import modin.pandas as mpd
# import xorbits.pandas as xpd

candidates_pd = candidates.to_pandas()
candidates_modin = mpd.DataFrame(candidates_pd)
# candidates_x = xpd.DataFrame(candidates_pd)
Full feature extraction
get_candidates (candidates:modin.pandas.dataframe.DataFrame, candidate_type=None, num:int=4)
get_candidate_data (candidate:dict, data:xarray.core.dataarray.DataArray, neighbor:int=0)
calc_candidate_duration (candidate, data, **kwargs)
The PDF of the field rotation angles across the solar-wind IDs is well fitted by the exponential function exp(−θ/)…
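The scale in the denominator of the exponential is cut off above, so the sketch below calls it theta_0 (a hypothetical name). Assuming the fitted form is p(θ) = exp(−θ/θ0)/θ0 for θ ≥ 0, the maximum-likelihood estimate of θ0 is simply the sample mean of the rotation angles. This is one standard estimator, not necessarily the fitting method behind the quoted result, and the angles used here are synthetic.

```python
import numpy as np


def fit_exponential_scale(theta):
    """Maximum-likelihood estimate of theta_0 for p(theta) = exp(-theta/theta_0) / theta_0.

    For an exponential distribution on theta >= 0 the MLE of the scale is the sample mean.
    theta_0 is a hypothetical name for the scale that is elided in the text.
    """
    theta = np.asarray(theta, dtype=float)
    if np.any(theta < 0):
        raise ValueError("rotation angles must be non-negative")
    return theta.mean()


# Synthetic angles (degrees) drawn with a known scale, only to check the estimator
rng = np.random.default_rng(0)
samples = rng.exponential(scale=30.0, size=100_000)
theta_0_hat = fit_exponential_scale(samples)
print(f"estimated theta_0 = {theta_0_hat:.2f} deg")
```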
get_data_at_times (data:xarray.core.dataarray.DataArray, times)
Select data at specified times.
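With xarray, selecting samples closest to given times is usually written as DataArray.sel(time=times, method="nearest"). The NumPy sketch below spells out that nearest-neighbour lookup, assuming a sorted time axis with at least two samples. select_nearest is a hypothetical helper, and whether get_data_at_times uses nearest-neighbour matching (rather than exact matching) is an assumption.

```python
import numpy as np


def select_nearest(time, values, query_times):
    """Return the rows of `values` whose timestamps are closest to `query_times`.

    Hypothetical helper; assumes `time` is sorted ascending and has at least two samples.
    """
    time = np.asarray(time)
    values = np.asarray(values)
    query_times = np.asarray(query_times)
    # Index of the first sample at or after each query time, kept in [1, n-1]
    right = np.clip(np.searchsorted(time, query_times, side="left"), 1, len(time) - 1)
    left = right - 1
    # Choose whichever neighbour is closer; ties go to the earlier sample
    use_left = (query_times - time[left]) <= (time[right] - query_times)
    return values[np.where(use_left, left, right)]


time = np.array([0.0, 1.0, 2.0, 3.0])
b_field = np.array([[0, 0, 0], [1, 1, 1], [2, 2, 2], [3, 3, 3]])
selected = select_nearest(time, b_field, [0.4, 2.6, 5.0, -1.0])
```

Query times outside the data range are mapped to the first or last sample; an explicit tolerance check would be needed to reject them instead.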
calc_rotation_angle (v1, v2)
Computes the rotation angle between two vectors.
Parameters:
- v1: The first vector(s).
- v2: The second vector(s).
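Assuming the rotation angle is the angle between the two field vectors, θ = arccos(v1·v2 / (|v1||v2|)), a vectorized sketch that also accepts stacks of vectors looks like this. rotation_angle is a hypothetical stand-in, and returning degrees is an assumption; the original may return radians.

```python
import numpy as np


def rotation_angle(v1, v2):
    """Angle in degrees between v1 and v2 (vectors along the last axis). Hypothetical stand-in."""
    v1 = np.asarray(v1, dtype=float)
    v2 = np.asarray(v2, dtype=float)
    dot = np.sum(v1 * v2, axis=-1)
    norms = np.linalg.norm(v1, axis=-1) * np.linalg.norm(v2, axis=-1)
    # Clip to [-1, 1] so rounding error cannot push arccos out of its domain
    cos_theta = np.clip(dot / norms, -1.0, 1.0)
    return np.degrees(np.arccos(cos_theta))


angles = rotation_angle([[1, 0, 0], [1, 0, 0]], [[0, 1, 0], [-1, 0, 0]])
```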
calc_events_rotation_angle (events, data:xarray.core.dataarray.DataArray)
Computes the rotation angle(s) at two different time steps.
calc_normal_direction (v1, v2, normalize=True)
Computes the normal direction of two vectors.
Parameter | Type | Default | Details
---|---|---|---
v1 | array_like | | The first vector(s).
v2 | array_like | | The second vector(s).
normalize | bool | True |
Returns | ndarray | |
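For two magnetic-field vectors taken on either side of a discontinuity, a common estimate of the discontinuity normal is their cross product, optionally scaled to unit length. The sketch below assumes that is what calc_normal_direction computes; normal_direction is a hypothetical stand-in that mirrors its v1, v2 and normalize parameters.

```python
import numpy as np


def normal_direction(v1, v2, normalize=True):
    """Cross product v1 x v2 along the last axis, optionally scaled to unit length. Hypothetical stand-in."""
    n = np.cross(np.asarray(v1, dtype=float), np.asarray(v2, dtype=float))
    if normalize:
        # Parallel input vectors give a zero cross product and therefore NaN here
        n = n / np.linalg.norm(n, axis=-1, keepdims=True)
    return n


unit_normal = normal_direction([1, 0, 0], [0, 1, 0])
raw_normal = normal_direction([2, 0, 0], [0, 3, 0], normalize=False)
batched = normal_direction([[1, 0, 0], [0, 1, 0]], [[0, 1, 0], [0, 0, 1]])
```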
calc_events_normal_direction (events, data:xarray.core.dataarray.DataArray)
Computes the normal direction(s) at two different time steps.
calc_events_vec_change (events, data:xarray.core.dataarray.DataArray)
Utility function to calculate features related to the change of the magnetic field.
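The exact features returned by calc_events_vec_change are not listed here. As a hypothetical illustration of change features between the field at two time steps, the sketch below computes the vector change ΔB, its magnitude |ΔB|, and the change in field strength Δ|B|. A pure rotation, as in the example, has a large |ΔB| but Δ|B| = 0. The function and feature names are assumptions.

```python
import numpy as np


def vec_change_features(b_start, b_stop):
    """Hypothetical change features between two sets of field vectors with shape (n, 3)."""
    b_start = np.atleast_2d(np.asarray(b_start, dtype=float))
    b_stop = np.atleast_2d(np.asarray(b_stop, dtype=float))
    d_b = b_stop - b_start
    return {
        "dB": d_b,  # vector change B_stop - B_start
        "dB_mag": np.linalg.norm(d_b, axis=-1),  # |B_stop - B_start|
        "d_B_mag": np.linalg.norm(b_stop, axis=-1) - np.linalg.norm(b_start, axis=-1),  # change of |B|
    }


features = vec_change_features([[1, 0, 0]], [[0, 1, 0]])
```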
Patch pdp.ApplyToRows to work with modin and xorbits DataFrames.
Pipelines
Class for processing IDs
Note: a lambda function is used instead of partial, because freezing the arguments with partial decreased performance.
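The two ways of binding the data argument can be compared with the sketch below; duration is a hypothetical stand-in for calc_candidate_duration and the data are toy values. Both produce identical results. The timing printed here only reflects plain-Python call overhead; the slowdown noted above was observed when handing the function to DataFrame backends, where how the callable and its bound data are serialized may matter, so these numbers need not reproduce it.

```python
import timeit
from functools import partial


def duration(candidate, data):
    """Hypothetical stand-in for calc_candidate_duration."""
    return data[candidate["tstop"]] - data[candidate["tstart"]]


data = [0.5 * i for i in range(100)]
rows = [{"tstart": i, "tstop": i + 5} for i in range(90)]

# Two ways of binding `data` so that the row function takes a single argument
with_lambda = lambda candidate: duration(candidate, data)  # noqa: E731
with_partial = partial(duration, data=data)

results_lambda = [with_lambda(row) for row in rows]
results_partial = [with_partial(row) for row in rows]

if __name__ == "__main__":
    for name, func in [("lambda", with_lambda), ("partial", with_partial)]:
        seconds = timeit.timeit(lambda: [func(row) for row in rows], number=1000)
        print(f"{name}: {seconds:.3f} s")
```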
IDsPdPipeline ()
Initialize self. See help(type(self)) for accurate signature.
process_events (candidates_pl:polars.dataframe.frame.DataFrame, sat_fgm:xarray.core.dataarray.DataArray, data_resolution:datetime.timedelta, modin=True, method:Literal['fit','derivative']='fit', **kwargs)
Process candidates DataFrame
Parameter | Type | Default | Details
---|---|---|---
candidates_pl | DataFrame | | Potential candidates DataFrame
sat_fgm | DataArray | | Satellite FGM data
data_resolution | timedelta | | Time resolution of the data
modin | bool | True |
method | Literal | fit |
kwargs | | |
Returns | DataFrame | |
In general, mapply and modin are the fastest. xorbits was expected to be the fastest, but in these tests it is the slowest of the three.
#| notest
from datetime import timedelta

import polars as pl

sat = 'jno'
coord = 'se'
cols = ["BX", "BY", "BZ"]
tau = timedelta(seconds=60)
data_resolution = timedelta(seconds=1)

if True:
    year = 2012
    files = f'../data/{sat}_data_{year}.parquet'
    output = f'../data/{sat}_candidates_{year}_tau_{tau.seconds}.parquet'
    data = pl.scan_parquet(files).set_sorted('time').collect()
    indices = compute_indices(data, tau)
    # filter condition
    sparse_num = tau / data_resolution // 3
    filter_condition = filter_indices(sparse_num=sparse_num)
    candidates = indices.filter(filter_condition).with_columns(pl_format_time(tau)).sort('time')
    data_c = compress_data_by_events(data, candidates, tau)
    sat_fgm = df2ts(data_c, cols, attrs={"units": "nT"})
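For the settings above, tau / data_resolution is the number of samples in one 60 s window, and sparse_num is a third of that. Dividing two timedelta objects gives a float, so sparse_num is 20.0 rather than the integer 20. How filter_indices uses the value is not shown in this section.

```python
from datetime import timedelta

tau = timedelta(seconds=60)
data_resolution = timedelta(seconds=1)

samples_per_window = tau / data_resolution  # timedelta / timedelta -> float: 60.0
sparse_num = samples_per_window // 3  # floor division of a float stays a float: 20.0
print(samples_per_window, sparse_num)
```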
if True:
    pdp_test = pdp.ApplyToRows(
        lambda candidate: calc_candidate_duration(candidate, sat_fgm),  # slightly faster
        # lambda candidate: calc_duration(get_candidate_data_xr(candidate, sat_fgm)),
        # lambda candidate: calc_duration(sat_fgm.sel(time=slice(candidate['tstart'], candidate['tstop']))),
        func_desc="calculating duration parameters",
    )
    # process_events(candidates, sat_fgm, data_resolution)
    # ---
    # successful cases
    # ---
    # candidates_pd.mapply(lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1)  # works, 4.2 s
    # candidates_pd.mapply(calc_candidate_duration, axis=1, data=sat_fgm)  # works, but slightly slower, 6.7 s
    # candidates_pd.apply(calc_candidate_duration, axis=1, data=sat_fgm)  # standard case, 24+ s
    # candidates_pd.swifter.apply(calc_candidate_duration, axis=1, data=sat_fgm)  # works with dask, 80 s
    # candidates_pd.swifter.set_dask_scheduler(scheduler="threads").apply(calc_candidate_duration, axis=1, data=sat_fgm)  # works with dask, 60 s
    # candidates_modin.apply(lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1)  # works with ray, 6 s # NOTE: does not work with dask
    # candidates_x.apply(calc_candidate_duration, axis=1, data=sat_fgm)  # 30 s
    # pdp_test(candidates_modin)  # works, 8 s
    # ---
    # failed cases
    # ---
    # candidates_modin.apply(calc_candidate_duration, axis=1, data=sat_fgm)  # AttributeError: 'DataFrame' object has no attribute 'sel'
from functools import partial


def benchmark_results(sat_fgm):
    # Bind the data once so every backend receives a one-argument row function
    func = partial(calc_candidate_duration, data=sat_fgm)
    task_dict = {
        'pandas': (candidates_pd, lambda _: _.apply(func, axis=1)),
        'pandas-mapply': (candidates_pd, lambda _: _.mapply(func, axis=1)),
        'modin': (candidates_modin, lambda _: _.apply(func, axis=1)),
        # 'xorbits': (candidates_x, lambda _: _.apply(func, axis=1)),
    }
    # `benchmark` is defined elsewhere (not shown in this section)
    return benchmark(task_dict)
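benchmark is called above but not defined in this section. If it is not at hand, a minimal hypothetical stand-in compatible with the task_dict layout (label → (data, function)) could look like the sketch below; its behaviour is an assumption, not the project's helper. Keeping the fastest of several repeats reduces noise from other processes.

```python
import time


def benchmark(task_dict, repeat=1):
    """Hypothetical timing helper.

    `task_dict` maps a label to a `(data, func)` pair; each `func(data)` call is
    timed `repeat` times and the fastest wall-clock time in seconds is kept.
    """
    results = {}
    for name, (data, func) in task_dict.items():
        timings = []
        for _ in range(repeat):
            start = time.perf_counter()
            func(data)
            timings.append(time.perf_counter() - start)
        results[name] = min(timings)
    return results


timings = benchmark(
    {"sum": (list(range(10_000)), sum), "max": (list(range(10_000)), max)},
    repeat=3,
)
```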