IDs from Juno

See the accompanying notebooks for details.

Setup

Code
%load_ext autoreload
%autoreload 2

Code
from discontinuitypy.datasets import IDsDataset
import polars as pl
from fastcore.utils import walk

from loguru import logger

from datetime import timedelta
Code
mission = "JNO"
ts = timedelta(seconds=1)
tau = timedelta(seconds=60)


data_dir = '../../../data'
dir_path = f'{data_dir}/03_primary/JNO_MAG_ts_{ts.seconds}s'
juno_state_path = f'{data_dir}/03_primary/JNO_STATE_ts_3600s.parquet'
vec_cols = ['v_x', 'v_y', 'v_z']

format = 'arrow'
fname = f'events.{mission}.ts_{ts.total_seconds():.2f}s_tau_{tau.seconds}s.{format}'
output_path = f'{data_dir}/05_reporting/{fname}'
Code
plasma_data = pl.scan_parquet(juno_state_path).sort('time')
logger.info(plasma_data.columns)

Standard Process

Code
juno_events = []
for mag_path in walk(dir_path):
    mag_data = pl.scan_parquet(mag_path).drop('X', 'Y', 'Z').sort('time')

    _juno_events = (
        IDsDataset(
            mag_data=mag_data,
            plasma_data=plasma_data,
            tau=tau,
            ts=ts,
            vec_cols=vec_cols,
            density_col="plasma_density",
            speed_col="plasma_speed",
            temperature_col="plasma_temperature",
        )
        .find_events(return_best_fit=False)
        .update_candidates_with_plasma_data()
        .events
    )
    
    juno_events.append(_juno_events)
    
juno_ids_dataset = IDsDataset(
    events=pl.concat(juno_events),
    mag_data=pl.scan_parquet(list(walk(dir_path))).drop('X', 'Y', 'Z').sort('time'),
)

juno_ids_dataset.export(output_path)

Check the discontinuities in the Juno cruise phase

Full time resolution data

The full-resolution data have a cadence between 0.03 s and 0.125 s; below they are resampled to a uniform 0.125 s.

Code
from space_analysis.missions.juno.fgm import download_data
from discontinuitypy.utils.basic import resample
from toolz import curry
from pipe import select
from fastcore.utils import mkdir
import os
Code
def preprocess(
    fp,
    every=timedelta(seconds=0.125),
    dir_path="../../../data/02_intermediate/JNO_MAG_8hz",
    update=False,
):
    """Resample one full-resolution file to a uniform cadence and cache it as Arrow IPC."""
    fname = fp.split('/')[-1]

    output_path = f"{dir_path}/{fname}"

    if not os.path.exists(output_path) or update:
        mkdir(dir_path, parents=True, exist_ok=True)
        df = pl.scan_ipc(fp).sort('time').pipe(resample, every=every)
        df.collect().write_ipc(output_path)
    return output_path

@curry
def process(fp, ids_dataset: IDsDataset, sparse_num=10, **kwargs):
    """Find events in one file (or chunk of files) and attach the plasma data."""
    df = pl.scan_ipc(fp).sort('time').unique('time')

    ids_dataset.data = df

    return (
        ids_dataset.find_events(return_best_fit=False, sparse_num=sparse_num, **kwargs)
        .update_candidates_with_plasma_data()
        .events
    )
Code
mag_paths = list(download_data(datatype="FULL") | select(preprocess))
Code
mag_paths
Code
ts = timedelta(seconds=0.125)
tau = timedelta(seconds=20)
method = "derivative"
# method = "fit"

fname = f'events.{mission}.{method}.ts_{ts.total_seconds():.2f}s_tau_{tau.seconds}s.{format}'
output_path = f'{data_dir}/05_reporting/{fname}'
logger.info(output_path)
Code
ids_ds = IDsDataset(
    plasma_data=plasma_data,
    tau=tau,
    ts=ts,
    vec_cols=vec_cols,
    density_col="plasma_density",
    speed_col="plasma_speed",
    temperature_col="plasma_temperature",
)

Splitting the data files into reasonably sized chunks may accelerate the processing.
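
`split_list` is not defined in this notebook; here is a minimal sketch, assuming `n` is the chunk size (each chunk is a list of paths that `pl.scan_ipc` can read together):

Code
def split_list(lst: list, n: int = 100) -> list[list]:
    """Split `lst` into consecutive chunks of length `n` (the last chunk may be shorter)."""
    return [lst[i : i + n] for i in range(0, len(lst), n)]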

Code
fps = split_list(mag_paths, n=100)

func = process(ids_dataset=ids_ds, sparse_num=10, method=method)

ids_ds.data = pl.scan_ipc(mag_paths)
ids_ds.events = pl.concat(fps | select(func))
ids_ds.export(output_path)

Superposed epoch analysis

Code
from discontinuitypy.utils.basic import df2ts
from discontinuitypy.core.pipeline import compress_data_by_interval
from xarray_einstats import linalg

from sea_norm import sean
import pandas as pd
import xarray as xr
import matplotlib.pyplot as plt
from matplotlib.pyplot import Figure, Axes

from discontinuitypy.integration import J_FACTOR

from tqdm.auto import tqdm

def keep_good_fit(df: pl.DataFrame, rsquared=0.95):
    """Keep only events whose fit R² exceeds the threshold."""
    return df.filter(pl.col('fit.stat.rsquared') > rsquared)
Code
from discontinuitypy.propeties.mva import minvar
import numpy as np


def get_dBdt_data(data: pl.LazyFrame):
    """
    Calculate the time derivative of the magnetic field
    """
    # TODO: compress data first by events, however, this will decrease the reliability of the derivative at the edges of the events
    ts = df2ts(data)

    vec_diff = ts.differentiate("time", datetime_unit="s")
    vec_diff_mag: xr.DataArray = linalg.norm(vec_diff, dims="v_dim")
    return vec_diff_mag.to_dataframe(name="dBdt")


def get_mva_data(
    data: pl.LazyFrame,
    starts: list,
    ends: list,
    columns=["B_l", "B_m", "B_n"],
    normalize=True,
):
    """Rotate each event into its minimum-variance frame and compute dB_l/dt."""
    for start, end in tqdm(zip(starts, ends)):
        event_data = data.filter(
            pl.col("time") >= start, pl.col("time") <= end
        ).collect()

        event_numpy = event_data.drop("time").to_numpy()
        time = event_data.get_column("time").to_numpy()

        vrot, _, _ = minvar(event_numpy)

        # orient B_l so that it increases across the event, then differentiate it
        vl = vrot[:, 0]
        vl = vl * np.sign(vl[-1] - vl[0])
        vl_ts = xr.DataArray(vl, dims="time", coords={"time": time})
        dvl_dt_df = vl_ts.differentiate("time", datetime_unit="s").to_dataframe(name="dBl_dt")

        if normalize:
            vrot = normalize_mva_data(vrot)
        vrot_df = pd.DataFrame(vrot, columns=columns, index=time)

        yield pd.concat([vrot_df, dvl_dt_df], axis=1)


def normalize_mva_data(
    data: np.ndarray, shift=False  # `shift` (translate B_l to the origin) is currently unused
):
    """
    Normalize the MVA components: B_l by B0 = 0.5*ΔB_l; B_m and B_n by the mean of their boundary values.
    """
    vl, vm, vn = data.T

    vl_norm_q = (vl[-1] - vl[0]) / 2
    vm_norm_q = (vm[0] + vm[-1]) / 2
    vn_norm_q = (vn[0] + vn[-1]) / 2

    return np.array([vl / vl_norm_q, vm / vm_norm_q, vn / vn_norm_q]).T
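
In formulas, the normalization implemented above (with "start"/"end" denoting the first and last samples of each event) is

$$
\hat{B}_l = \frac{B_l}{\tfrac{1}{2}\,(B_l^{\mathrm{end}} - B_l^{\mathrm{start}})}, \qquad
\hat{B}_m = \frac{B_m}{\tfrac{1}{2}\,(B_m^{\mathrm{start}} + B_m^{\mathrm{end}})}, \qquad
\hat{B}_n = \frac{B_n}{\tfrac{1}{2}\,(B_n^{\mathrm{start}} + B_n^{\mathrm{end}})}
$$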
Code
def sea_ids(
    ds: IDsDataset,
    event_cols=["t.d_start", "t.d_time", "t.d_end"],
    sea_cols=[
        "B_l",
        "j_m",
        "j_m_norm",
        "j_k",
        "j_k_norm",
        "B_m",
    ],
    bins=[10, 10],
    return_data=True,
):
    # converting to a list of numpy arrays

    events = keep_good_fit(ds.events)

    sea_events = [col.to_numpy() for col in events[event_cols]]

    mag_data = ds.data
    mag_data_c = compress_data_by_interval(
        mag_data.collect(), sea_events[0], sea_events[2]
    ).lazy()

    # dBdt_data = get_dBdt_data(ds.data)
    dBdt_data = get_dBdt_data(mag_data_c)
    mva_data = pd.concat(get_mva_data(mag_data_c, sea_events[0], sea_events[2]))

    b_data = dBdt_data.join(mva_data, on="time")

    p_data = (
        events[["time", "v_k", "j_Alfven"]].to_pandas().set_index("time").sort_index()
    )

    data = pd.merge_asof(
        b_data,
        p_data,
        left_index=True,
        right_index=True,
        direction="nearest",
    )

    # current densities from the field derivatives (J_FACTOR presumably absorbs
    # mu0 and unit conversions), normalized by the Alfven current j_Alfven
    data = data.assign(
        j_m=lambda df: df.dBl_dt / df.v_k * J_FACTOR,
        j_m_norm=lambda df: df.j_m / df.j_Alfven,
        j_k=lambda df: df.dBdt / df.v_k * J_FACTOR,
        j_k_norm=lambda df: df.j_k / df.j_Alfven,
    )

    return sean(data, sea_events, bins, return_data=return_data, cols=sea_cols)


def plot_SEA(SEAarray: pd.DataFrame, meta) -> tuple[Figure, list[Axes]]:
    
    cols = meta["sea_cols"]
    fig, axes = plt.subplots(nrows=len(cols), sharex=True, squeeze=True)

    if len(cols) == 1:
        axes = [axes]

    # loop over the columns that were analyzed
    for c, ax in zip(cols, axes):
        # for each column, select its summary statistics
        # (e.g. AE_mean, AE_median, AE_lowq, AE_upq), skipping the count column
        # mask = SEAarray.columns.str.startswith(c) & ~SEAarray.columns.str.endswith("cnt")
        mask = [c + "_mean", c + "_median", c + "_lowq", c + "_upq"]

        # plot the SEA data
        SEAarray.loc[:, mask].plot(
            ax=ax,
            style=["r-", "b-", "b--", "b--"],
            xlabel="Normalized Time",
            ylabel=c.replace("_", " "),
            legend=False,
        )

    return fig, axes

def plot_ids_sea(SEAarray: pd.DataFrame, meta) -> tuple[Figure, list[Axes]]:
    import scienceplots  # noqa: F401  (registers the 'science' matplotlib styles)

    # rescale the epoch index to normalized time (relies on the global `bin`)
    SEAarray.index = SEAarray.index.map(lambda x: x / bin / 2)

    with plt.style.context(['science', 'nature', 'notebook']):

        fig, axes = plot_SEA(SEAarray, meta)

        axes[0].set_ylabel(r"$B_x \ / \ B_0$")
        axes[1].set_ylabel(r"$J_y$")
        axes[2].set_ylabel(r"$J_y \ / \ J_A$")
        axes[3].set_ylabel(r"$J_k$")
        axes[4].set_ylabel(r"$J_k \ / \ J_A$")
        axes[5].set_ylabel(r"$B_m \ / \ B_g$")

        fig.tight_layout()
        fig.subplots_adjust(hspace=0)
        
        axes[0].legend(labels=["Mean", "Median", "LowQ", "UpQ"])
        # fig.savefig(f"../../../figures/sea/sea_juno_first_year_{freq}.png", dpi=300)
    return fig, axes

First year

Code
mag_path = sorted(list(walk(dir_path)))[0]
tau = timedelta(seconds=60)  # note: `ts` still holds the 0.125 s value set above
mag_data = pl.scan_parquet(mag_path).drop('X', 'Y', 'Z').sort('time')

ids_ds = (
    IDsDataset(
        mag_data=mag_data,
        plasma_data=plasma_data,
        tau=tau,
        ts=ts,
        vec_cols=vec_cols,
        density_col="plasma_density",
        speed_col="plasma_speed",
        temperature_col="plasma_temperature",
    )
    .find_events(return_best_fit=True)
    .update_candidates_with_plasma_data()
)
Code
mag_paths = list(download_data(datatype="FULL") | select(preprocess))
Code
freq = 'high'
# freq = 'low'
if freq == 'high':  # use the high time-resolution (8 Hz) data
    ids_ds.data = pl.scan_ipc(mag_paths[0:365]).drop('X', 'Y', 'Z').sort('time')
    bin = 16
else:  # use the low time-resolution (1 s) data
    bin = 8

SEAarray, meta, p1data, p2data = sea_ids(ids_ds, bins=[bin, bin])
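
The first-year figure is presumably produced the same way as the last-year one below; a sketch mirroring the commented-out path in `plot_ids_sea`:

Code
fig, axes = plot_ids_sea(SEAarray, meta)
fig.savefig(f"../../../figures/sea/sea_juno_first_year_{freq}.png", dpi=300)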
Figure 1: superposed epoch analysis of the first-year events.
Code
ids_ds.plot_candidates(num=20, plot_fit_data=True, predicates=(pl.col('fit.stat.rsquared') > 0.95))

Last year

Code
mag_path = sorted(list(walk(dir_path)))[-1]
# tau = timedelta(seconds=300)
tau = timedelta(seconds=60)
mag_data = pl.scan_parquet(mag_path).drop('X', 'Y', 'Z').sort('time')

ids_ds = (
    IDsDataset(
        mag_data=mag_data,
        plasma_data=plasma_data,
        tau=tau,
        ts=ts,
        vec_cols=vec_cols,
        density_col="plasma_density",
        speed_col="plasma_speed",
        temperature_col="plasma_temperature",
    )
    .find_events(return_best_fit=True)
    .update_candidates_with_plasma_data()
)
Code
mag_paths = list(download_data(datatype="FULL") | select(preprocess))

freq = 'high'
# freq = 'low'
if freq == 'high':  # use the high time-resolution (8 Hz) data
    ids_ds.data = pl.scan_ipc(mag_paths[-365:]).drop('X', 'Y', 'Z').sort('time')
    bin = 16
else:  # use the low time-resolution (1 s) data
    bin = 8

SEAarray, meta, p1data, p2data = sea_ids(ids_ds, bins=[bin, bin])
Code
fig, axes = plot_ids_sea(SEAarray, meta)
fig.savefig(f"../../../figures/sea/sea_juno_last_year_{freq}.png", dpi = 300)

Superposed epoch analysis: individual event profiles

Code
groupby = 'index'
columns = ['dBdt', 'j_k']

# Plot each group
fig, axes = plt.subplots(nrows=len(columns))

if len(columns) == 1:
    axes = [axes]

p1groups = p1data.groupby(groupby)
p2groups = p2data.groupby(groupby)

for ax, column in zip(axes, columns):
    # phase-1 (pre-epoch) curves, shifted onto [-1, 0]
    for name, group in p1groups:
        ax.plot(group['t_norm'] - 1, group[column], color='grey', alpha=0.3)

    # phase-2 (post-epoch) curves on [0, 1]
    for name, group in p2groups:
        ax.plot(group['t_norm'], group[column], color='grey', alpha=0.3)

# plt.yscale('log')
plt.show()
Code
ids_ds.plot_candidates(num=20, plot_fit_data=True, predicates=(pl.col('fit.stat.rsquared') > 0.95))

Processing the whole dataset

Obsolete

Estimate

One day of data resampled to a 1 s cadence is about 12 MB.

So one year of data is about 4 GB, and the six years of Juno cruise data are about 24 GB.

At a downloading rate of about 250 KB/s, the Juno data alone take roughly a day to fetch; adding the THEMIS files assumed in the estimate below (about 40 MB per day of data) brings the total to roughly five days.

Code
num_of_files = 6 * 365         # one file per day over six years
jno_file_size = 12e3           # Juno daily file size, in KB (~12 MB)
thm_file_size = 40e3           # THEMIS daily file size, in KB (~40 MB)
files_size = jno_file_size + thm_file_size
downloading_rate = 250         # KB/s
processing_rate = 1 / 60       # files per second (about one file per minute)

time_to_download = num_of_files * files_size / downloading_rate / 60 / 60  # hours
space_required = num_of_files * files_size / 1e6                           # GB
time_to_process = num_of_files / processing_rate / 60 / 60                 # hours

print(f"Time to download: {time_to_download:.2f} hours")
print(f"Disk space required: {space_required:.2f} GB")
print(f"Time to process: {time_to_process:.2f} hours")