Duration

There are multiple ways to define the duration of a discontinuity. Here are some possibilities:

Notes:

Caveats:

Maximum distance method


source

ts_max_distance

 ts_max_distance (ts:xarray.core.dataarray.DataArray, coord:str='time')

Compute the time interval over which the time series has its maximum cumulative variation

test for ts_max_distance function
# Ten daily samples tracing a half circle (sin x, cos x, 0) embedded in three
# dimensions: the first and last points sit at opposite ends of a diameter,
# so the largest distance is between them.
time = pd.date_range('2000-01-01', periods=10)
x = np.linspace(0, np.pi, 10)
data = np.column_stack([np.sin(x), np.cos(x), np.zeros(10)])
ts = xr.DataArray(data, coords={'time': time}, dims=['time', 'space'])

# The detected interval is expected to span the whole series.
start, end = ts_max_distance(ts)
assert start == time[0]
assert end == time[-1]

Maximum derivative method


source

get_time_from_condition

 get_time_from_condition (vec:xarray.core.dataarray.DataArray, threshold,
                          condition_type)

source

find_start_end_times

 find_start_end_times (vec_diff_mag:xarray.core.dataarray.DataArray,
                       d_time, threshold)

source

ts_max_derivative

 ts_max_derivative (vec:xarray.core.dataarray.DataArray,
                    threshold_ratio=0.25)

source

calc_duration

 calc_duration (ts:xarray.core.dataarray.DataArray,
                method:Literal['distance','derivative']='distance',
                **kwargs)

Obsolete code

This code is obsolete because the time windows now overlap, so there is no need to handle magnetic discontinuities that fall on the boundary of a single time window.

Code
def calc_candidate_d_duration(candidate, data) -> pd.Series:
    """Return the start and end times of one candidate's discontinuity.

    If both ``'t.d_start'`` and ``'t.d_end'`` are already set on
    ``candidate`` they are returned unchanged. Otherwise the candidate's
    data is fetched with ``get_candidate_data(candidate, data, neighbor=1)``
    and the times are recomputed by ``calc_d_duration`` from the candidate's
    ``'d_time'`` and ``'threshold'``.

    Parameters
    ----------
    candidate
        Mapping-like record read by key (``'t.d_start'``, ``'t.d_end'``,
        ``'d_time'``, ``'threshold'`` and, for error reporting, ``'time'``).
    data
        Forwarded untouched to ``get_candidate_data``.

    Returns
    -------
    pd.Series
        Series with the entries ``'t.d_start'`` and ``'t.d_end'``.

    Raises
    ------
    Exception
        Any error raised while computing is printed and then re-raised.
    """
    try:
        # Short-circuits exactly like the original null test: 't.d_end' is
        # only inspected when 't.d_start' is present.
        times_missing = pd.isnull(candidate['t.d_start']) or pd.isnull(candidate['t.d_end'])
        if not times_missing:
            return pd.Series({
                't.d_start': candidate['t.d_start'],
                't.d_end': candidate['t.d_end'],
            })

        candidate_window = get_candidate_data(candidate, data, neighbor=1)
        return calc_d_duration(
            candidate_window, candidate['d_time'], candidate['threshold']
        )
    except Exception as e:
        print(f"Error for candidate {candidate} at {candidate['time']}: {str(e)}")
        raise e
Code
def calc_d_duration(vec: xr.DataArray, d_time, threshold) -> pd.Series:
    """Derive the start and end times of a discontinuity from ``vec``.

    ``vec`` is differentiated along ``"time"`` (per second), the norm of the
    derivative is taken over the ``'v_dim'`` dimension, and the result is
    handed to ``find_start_end_times`` together with ``d_time`` and
    ``threshold``.

    Returns
    -------
    pd.Series
        Series with the entries ``'t.d_start'`` and ``'t.d_end'``.
    """
    derivative_magnitude = linalg.norm(
        vec.differentiate("time", datetime_unit="s"), dims='v_dim'
    )
    t_start, t_end = find_start_end_times(derivative_magnitude, d_time, threshold)
    return pd.Series({'t.d_start': t_start, 't.d_end': t_end})

Calibrates candidate duration

This calibration assumes that the magnetic discontinuity is symmetric about its central time (`d_time`), which is not always true.

So instead of calibrating the duration, we drop the events.

- Cons: this might influence the statistics of the occurrence rate.
- Pros: it gives more robust results about the properties of the magnetic discontinuity.

Code
def calibrate_candidate_duration(
    candidate: pd.Series, data: "xr.DataArray", data_resolution, ratio=3/4
) -> pd.Series:
    """
    Calibrate the start and end times of one candidate.

    - If only one of 't.d_start' or 't.d_end' is provided, the missing one is
      obtained by mirroring the provided one about 'd_time'.
    - If there are then not enough data points between 't.d_start' and
      't.d_end', both are returned as None.
    - If neither is provided, both are returned as None.

    Parameters
    ----------
    - candidate (pd.Series): The input candidate with potentially missing
      't.d_start' or 't.d_end'; 'd_time' is read when exactly one is missing.
    - data (xr.DataArray): Data whose ``time`` coordinate is used to count the
      points between 't.d_start' and 't.d_end'.
    - data_resolution: Sampling interval of ``data``; the duration is divided
      by it to get the expected number of points.
    - ratio: Fraction of the expected number of points that must be exceeded
      for the interval to be kept.

    Returns
    -------
    - pd.Series: The calibrated 't.d_start' and 't.d_end'.
    """
    # Plain local names: the original assigned to ``t.d_start``/``t.d_end``,
    # i.e. attributes of an undefined name ``t``, which raises NameError.
    d_start = candidate['t.d_start']
    d_end = candidate['t.d_end']
    has_start = pd.notnull(d_start)
    has_end = pd.notnull(d_end)

    if not has_start and not has_end:
        return pd.Series({
            't.d_start': None,
            't.d_end': None,
        })

    if not has_end:
        # Mirror the known start about d_time.
        d_time = candidate['d_time']
        d_end = d_time - d_start + d_time
    elif not has_start:
        # Mirror the known end about d_time.
        d_time = candidate['d_time']
        d_start = d_time - d_end + d_time

    duration = d_end - d_start
    num_of_points_between = data.time.sel(time=slice(d_start, d_end)).count().item()

    # Too few samples inside the interval: treat the duration as unknown.
    if num_of_points_between <= (duration / data_resolution) * ratio:
        d_start = None
        d_end = None

    return pd.Series({
        't.d_start': d_start,
        't.d_end': d_end,
    })
Code
def calibrate_candidates_duration(candidates, sat_fgm, data_resolution):
    """Calibrate the duration of every candidate with a missing start or end.

    Rows of ``candidates`` whose 't.d_start' or 't.d_end' is null are passed
    through ``calibrate_candidate_duration`` and the results are written back
    with ``candidates.update``. Rows with both times set are left untouched.

    Parameters
    ----------
    candidates
        DataFrame with the columns 't.d_start' and 't.d_end'. It is updated
        in place.
    sat_fgm
        Forwarded to ``calibrate_candidate_duration`` as its ``data``.
    data_resolution
        Forwarded to ``calibrate_candidate_duration``.

    Returns
    -------
    The same ``candidates`` object that was passed in.
    """
    # Boolean-mask selection is used instead of
    # candidates.query('t.d_start.isnull() | t.d_end.isnull()'),
    # which is not implemented in `modin`.
    needs_calibration = candidates["t.d_start"].isnull() | candidates["t.d_end"].isnull()
    temp_candidates = candidates.loc[needs_calibration]

    if temp_candidates.empty:
        return candidates

    calibrate_duration = pdp.ApplyToRows(
        lambda candidate: calibrate_candidate_duration(
            candidate, sat_fgm, data_resolution
        ),
        func_desc="calibrating duration parameters if needed",
    )

    # The row function already closes over sat_fgm and data_resolution, so
    # the stage is applied directly to the rows that need calibration. The
    # original called the stage itself with (sat_fgm, data_resolution).
    # NOTE(review): confirm whether ApplyToRows replaces or appends the
    # returned 't.d_start'/'t.d_end' columns before relying on update().
    temp_candidates_updated = calibrate_duration.apply(temp_candidates)
    candidates.update(temp_candidates_updated)
    return candidates