ID properties

Full feature extraction


source

get_candidates

 get_candidates (candidates:modin.pandas.dataframe.DataFrame,
                 candidate_type=None, num:int=4)

source

get_candidate_data

 get_candidate_data (candidate:dict, data:xarray.core.dataarray.DataArray,
                     neighbor:int=0)
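
The commented-out benchmark cases further down slice the field data directly between a candidate's start and stop times, which suggests what this helper does for the default neighbor=0. A hedged sketch (not necessarily the exact implementation; the neighbor argument, presumably widening the window, is not handled here):

Code
def get_candidate_data_sketch(candidate: dict, data, neighbor: int = 0):
    # Hedged sketch for neighbor=0: slice the DataArray between the candidate's
    # start and stop times (column names follow the test cells below).
    return data.sel(time=slice(candidate["tstart"], candidate["tstop"]))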

Duration


source

calc_candidate_duration

 calc_candidate_duration (candidate, data, **kwargs)
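
The function is meant to be applied row-wise over a candidates DataFrame, which is exactly how the benchmark cells below exercise it. A minimal usage sketch, with candidates_pd and sat_fgm as prepared in the test section:

Code
# Row-wise application over the candidate rows (mirrors the benchmark cell below).
durations = candidates_pd.apply(
    lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1
)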

Minimum variance analysis (MVA) features

Field rotation angles

The PDF of the field rotation angles across the solar-wind IDs is well fitted by the exponential function exp(−θ/)…
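
Written out, the fitted form is (with θ₀ used here only as a stand-in symbol for the characteristic angle, whose value is elided above):

P(θ) ∝ exp(−θ / θ₀)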


source

get_data_at_times

 get_data_at_times (data:xarray.core.dataarray.DataArray, times)

Select data at specified times.
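
A small self-contained illustration of this kind of selection with xarray (a sketch of the idea, not necessarily the exact implementation); nearest-neighbour matching keeps the lookup robust when the requested times fall between samples:

Code
import numpy as np
import pandas as pd
import xarray as xr

# Toy DataArray indexed by time, then pick the samples closest to two requested times.
time = pd.date_range("2012-01-01", periods=10, freq="1s")
da = xr.DataArray(np.arange(10), coords={"time": time}, dims="time")
picked = da.sel(time=[time[2], time[7]], method="nearest")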


source

calc_rotation_angle

 calc_rotation_angle (v1, v2)

Computes the rotation angle between two vectors.

Parameters:
- v1: The first vector(s).
- v2: The second vector(s).
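
A minimal sketch of the standard construction (angle from the normalized dot product, here returned in degrees; the actual implementation may differ in units or in how it broadcasts over arrays of vectors):

Code
import numpy as np

def rotation_angle_sketch(v1, v2):
    # Angle between two vectors via the normalized dot product, in degrees.
    v1 = np.asarray(v1, dtype=float)
    v2 = np.asarray(v2, dtype=float)
    cos_theta = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
    # Clip guards against round-off pushing |cos θ| slightly above 1.
    return np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0)))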


source

calc_events_rotation_angle

 calc_events_rotation_angle (events, data:xarray.core.dataarray.DataArray)

Computes the rotation angle(s) at two different time steps.
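
That is, for each event the field is looked up at the two bounding time steps and passed to calc_rotation_angle. A hedged usage sketch, assuming an events frame with tstart/tstop columns and the sat_fgm DataArray from the test section below:

Code
# Field vectors at the event start and stop times, then the angle between them.
b_start = get_data_at_times(sat_fgm, events["tstart"])
b_stop = get_data_at_times(sat_fgm, events["tstop"])
angles = calc_rotation_angle(b_start, b_stop)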

Normal direction


source

calc_normal_direction

 calc_normal_direction (v1, v2, normalize=True)

Computes the normal direction of two vectors.

|           | Type       | Default | Details                |
|-----------|------------|---------|------------------------|
| v1        | array_like |         | The first vector(s).   |
| v2        | array_like |         | The second vector(s).  |
| normalize | bool       | True    |                        |
| Returns   | ndarray    |         |                        |
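
A minimal sketch of the usual construction (the normalized cross product of the two vectors); the actual implementation may broadcast differently over arrays of vectors:

Code
import numpy as np

def normal_direction_sketch(v1, v2, normalize=True):
    # Normal to the plane spanned by v1 and v2, via the cross product.
    n = np.cross(np.asarray(v1, dtype=float), np.asarray(v2, dtype=float))
    if normalize:
        n = n / np.linalg.norm(n, axis=-1, keepdims=True)
    return n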

source

calc_events_normal_direction

 calc_events_normal_direction (events,
                               data:xarray.core.dataarray.DataArray)

Computes the normal direction(s) at two different time steps.


source

calc_events_vec_change

 calc_events_vec_change (events, data:xarray.core.dataarray.DataArray)

Utility function to calculate features related to the change of the magnetic field.
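
A hedged sketch of what such a change amounts to: the difference between the field sampled at each event's stop and start times (again assuming tstart/tstop columns, the helpers above, and the sat_fgm DataArray from the test section below):

Code
# Vector change of the magnetic field across each event.
dB = get_data_at_times(sat_fgm, events["tstop"]) - get_data_at_times(sat_fgm, events["tstart"])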

Pipelines

Patch pdp.ApplyToRows to work with modin and xorbits DataFrames.

Pipeline class for processing IDs

Notes: A lambda function is used instead of partial, because partial freezes the arguments and decreases performance.
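
To make the trade-off concrete, the two styles look like this when applied row-wise (a sketch using candidates_pd and sat_fgm from the test section below; the timings quoted in the benchmark cells refer to the lambda style):

Code
from functools import partial

# Style used in the pipeline: close over the extra argument with a lambda.
candidates_pd.apply(lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1)

# Alternative: freeze the argument with partial (reported slower here).
candidates_pd.apply(partial(calc_candidate_duration, data=sat_fgm), axis=1)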


source

IDsPdPipeline

 IDsPdPipeline ()

Initialize self. See help(type(self)) for accurate signature.


source

process_events

 process_events (candidates_pl:polars.dataframe.frame.DataFrame,
                 sat_fgm:xarray.core.dataarray.DataArray,
                 data_resolution:datetime.timedelta, modin=True,
                 method:Literal['fit','derivative']='fit', **kwargs)

Process candidates DataFrame

|                 | Type      | Default | Details                        |
|-----------------|-----------|---------|--------------------------------|
| candidates_pl   | DataFrame |         | potential candidates DataFrame |
| sat_fgm         | DataArray |         | satellite FGM data             |
| data_resolution | timedelta |         | time resolution of the data    |
| modin           | bool      | True    |                                |
| method          | Literal   | fit     |                                |
| kwargs          |           |         |                                |
| Returns         | DataFrame |         |                                |
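
A minimal usage sketch, with the candidates and FGM data prepared as in the test section below:

Code
events = process_events(
    candidates,       # potential candidates (polars DataFrame)
    sat_fgm,          # satellite FGM data (DataArray)
    data_resolution,  # time resolution of the data
    modin=True,
    method="fit",
)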

Test

Test parallelization

Generally, mapply and modin are the fastest. xorbits was expected to be the fastest, but in practice it is the slowest.

#| notest
sat = 'jno'
coord = 'se'
cols = ["BX", "BY", "BZ"]
tau = timedelta(seconds=60)
data_resolution = timedelta(seconds=1)

if True:
    year = 2012
    files = f'../data/{sat}_data_{year}.parquet'
    output = f'../data/{sat}_candidates_{year}_tau_{tau.seconds}.parquet'

    data = pl.scan_parquet(files).set_sorted('time').collect()

    indices = compute_indices(data, tau)
    # filter condition
    sparse_num = tau / data_resolution // 3
    filter_condition = filter_indices(sparse_num = sparse_num)

    candidates = indices.filter(filter_condition).with_columns(pl_format_time(tau)).sort('time')
    
    data_c = compress_data_by_events(data, candidates, tau)
    sat_fgm = df2ts(data_c, cols, attrs={"units": "nT"})
Code
candidates_pd = candidates.to_pandas()
candidates_modin = mpd.DataFrame(candidates_pd)
# candidates_x = xpd.DataFrame(candidates_pd)
Test different libraries to parallelize the computation
if True:
    pdp_test = pdp.ApplyToRows(
        lambda candidate: calc_candidate_duration(candidate, sat_fgm),  # fast a little bit
        # lambda candidate: calc_duration(get_candidate_data_xr(candidate, sat_fgm)),
        # lambda candidate: calc_duration(sat_fgm.sel(time=slice(candidate['tstart'], candidate['tstop']))),
        func_desc="calculating duration parameters",
    )
    
    # process_events(candidates_modin, sat_fgm, sat_state, data_resolution)
    
    # ---
    # successful cases
    # ---
    # candidates_pd.mapply(lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1) # this works, 4.2 secs
    # candidates_pd.mapply(calc_candidate_duration, axis=1, data=sat_fgm) # this works, but a little bit slower, 6.7 secs
    
    # candidates_pd.apply(calc_candidate_duration, axis=1, data=sat_fgm) # Standard case: 24+s secs
    # candidates_pd.swifter.apply(calc_candidate_duration, axis=1, data=sat_fgm) # this works with dask, 80 secs
    # candidates_pd.swifter.set_dask_scheduler(scheduler="threads").apply(calc_candidate_duration, axis=1, data=sat_fgm) # this works with dask, 60 secs
    # candidates_modin.apply(lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1) # this works with ray, 6 secs # NOTE: can not work with dask
    # candidates_x.apply(calc_candidate_duration, axis=1, data=sat_fgm) # 30 seconds
    # pdp_test(candidates_modin) # this works, 8 secs
    
    # ---
    # failed cases
    # ---
    # candidates_modin.apply(calc_candidate_duration, axis=1, data=sat_fgm) # AttributeError: 'DataFrame' object has no attribute 'sel'
Code
import timeit
from functools import partial
Code
def benchmark(task_dict, number=1):
    """Time each (data, task) pair; return seconds per run, or the error message if it fails."""
    results = {}
    for name, (data, task) in task_dict.items():
        try:
            time_taken = timeit.timeit(
                lambda: task(data),
                number=number
            )
            results[name] = time_taken / number
        except Exception as e:
            results[name] = str(e)
    return results
Code
def benchmark_results(sat_fgm):
    """Benchmark the row-wise duration calculation with different DataFrame libraries."""
    func = partial(calc_candidate_duration, data=sat_fgm)
    # candidates_pd / candidates_modin are the frames prepared in the cells above
    task_dict = {
        'pandas': (candidates_pd, lambda _: _.apply(func, axis=1)),
        'pandas-mapply': (candidates_pd, lambda _: _.mapply(func, axis=1)),
        'modin': (candidates_modin, lambda _: _.apply(func, axis=1)),
        # 'xorbits': (candidates_x, lambda _: _.apply(func, axis=1)),
    }

    results = benchmark(task_dict)
    return results