from discontinuitypy.datasets import IDsDataset
import polars as pl
from fastcore.utils import walk
from loguru import logger
from datetime import timedelta
See following notebooks for details:
mission = "JNO"
ts = timedelta(seconds=1)
tau = timedelta(seconds=60)
data_dir = '../../../data'
dir_path = f'{data_dir}/03_primary/JNO_MAG_ts_{ts.seconds}s'
juno_state_path = f'{data_dir}/03_primary/JNO_STATE_ts_3600s.parquet'
vec_cols = ['v_x', 'v_y', 'v_z']
format = 'arrow'
fname = f'events.{mission}.ts_{ts.total_seconds():.2f}s_tau_{tau.seconds}s.{format}'
output_path = f'{data_dir}/05_reporting/{fname}'
plasma_data = pl.scan_parquet(juno_state_path)  # plasma/state data (assumed to come from the hourly state file defined above)
juno_events = []
for mag_path in walk(dir_path):
    mag_data = pl.scan_parquet(mag_path).drop('X', 'Y', 'Z').sort('time')
    _juno_events = (
        IDsDataset(
            mag_data=mag_data,
            plasma_data=plasma_data,
            tau=tau,
            ts=ts,
            vec_cols=vec_cols,
            density_col="plasma_density",
            speed_col="plasma_speed",
            temperature_col="plasma_temperature",
        )
        .find_events(return_best_fit=False)
        .update_candidates_with_plasma_data()
        .events
    )
    juno_events.append(_juno_events)
juno_ids_dataset = IDsDataset(
    events=pl.concat(juno_events),
    mag_data=pl.scan_parquet(list(walk(dir_path))).drop('X', 'Y', 'Z').sort('time'),
)
juno_ids_dataset.export(output_path)
The full-resolution magnetic field data have a time resolution between 0.03 s and 0.125 s; `preprocess` below resamples them to a uniform 0.125 s (8 Hz) cadence.
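The `resample` helper used by `preprocess` comes from the project's utilities and is not shown in this section. A minimal stand-in, assuming it only averages the non-time columns onto a regular time grid (the name `resample_sketch` and its implementation are illustrative, not discontinuitypy's actual code), could look like:

def resample_sketch(df: pl.LazyFrame, every: timedelta) -> pl.LazyFrame:
    # Illustrative resampler: bin rows into windows of width `every`
    # and average every non-time column within each window.
    return (
        df.sort("time")
        .group_by_dynamic("time", every=every)
        .agg(pl.exclude("time").mean())
    )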
import os

from fastcore.utils import mkdir  # assumed source of `mkdir` (fastcore re-exports it alongside `walk`)

def preprocess(
    fp,
    every=timedelta(seconds=0.125),
    dir_path="../../../data/02_intermediate/JNO_MAG_8hz",
    update=False,
):
    fname = fp.split('/')[-1]
    output_path = f"{dir_path}/{fname}"
    if not os.path.exists(output_path) or update:
        mkdir(dir_path, parents=True, exist_ok=True)
        df = pl.scan_ipc(fp).sort('time').pipe(resample, every=every)
        df.collect().write_ipc(output_path)
    return output_path
# `curry` (e.g. from toolz) is assumed to be imported elsewhere in the notebook
@curry
def process(fp, ids_dataset: IDsDataset, sparse_num=10, **kwargs):
    df = pl.scan_ipc(fp).sort('time').unique('time')
    ids_dataset.data = df
    return (
        ids_dataset.find_events(return_best_fit=False, sparse_num=sparse_num, **kwargs)
        .update_candidates_with_plasma_data()
        .events
    )
Splitting the data files into reasonably sized chunks may speed up processing; a sketch of one way to do this follows.
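For instance, a long Arrow/IPC file could be partitioned into daily files before running `preprocess`/`process` on each chunk. This is only a sketch under that assumption; the `split_by_day` helper and its output layout are illustrative, not part of discontinuitypy.

import os

def split_by_day(fp: str, out_dir: str) -> list[str]:
    # Illustrative helper: split one large IPC/Arrow file into daily files,
    # so each day can be preprocessed and searched for events independently.
    os.makedirs(out_dir, exist_ok=True)
    df = (
        pl.scan_ipc(fp)
        .sort("time")
        .with_columns(pl.col("time").dt.date().alias("date"))
        .collect()
    )
    paths = []
    for daily in df.partition_by("date", maintain_order=True):
        date = daily["date"][0]
        path = f"{out_dir}/{date}.arrow"
        daily.drop("date").write_ipc(path)
        paths.append(path)
    return paths

Each resulting path can then be passed to `process` independently (and in parallel if desired).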
from discontinuitypy.utils.basic import df2ts
from discontinuitypy.core.pipeline import compress_data_by_interval
from xarray_einstats import linalg
from sea_norm import sean
import pandas as pd
import xarray as xr
import matplotlib.pyplot as plt
from matplotlib.pyplot import Figure, Axes
from discontinuitypy.integration import J_FACTOR
from tqdm.auto import tqdm
def keep_good_fit(df: pl.DataFrame, rsquared=0.95):
    """Keep only events whose best-fit R² exceeds the given threshold."""
    return df.filter(pl.col('fit.stat.rsquared') > rsquared)
from discontinuitypy.propeties.mva import minvar
import numpy as np
def get_dBdt_data(data: pl.LazyFrame):
    """
    Calculate the time derivative of the magnetic field.
    """
    # TODO: compress the data by events first; however, this would decrease the
    # reliability of the derivative at the edges of the events
    ts = df2ts(data)
    vec_diff = ts.differentiate("time", datetime_unit="s")
    vec_diff_mag: xr.DataArray = linalg.norm(vec_diff, dims="v_dim")
    return vec_diff_mag.to_dataframe(name="dBdt")
def get_mva_data(
    data: pl.LazyFrame,
    starts: list,
    ends: list,
    columns=["B_l", "B_m", "B_n"],
    normalize=True,
):
    for start, end in tqdm(zip(starts, ends)):
        event_data = data.filter(
            pl.col("time") >= start, pl.col("time") <= end
        ).collect()
        event_numpy = event_data.drop("time").to_numpy()
        time = event_data.get_column("time").to_numpy()

        vrot, _, _ = minvar(event_numpy)

        # orient B_l so that it increases across the event, then compute its time derivative
        vl = vrot[:, 0]
        vl = vl * np.sign(vl[-1] - vl[0])
        vl_ts = xr.DataArray(vl, dims="time", coords={"time": time})
        dvl_dt_df = vl_ts.differentiate("time", datetime_unit="s").to_dataframe(name="dBl_dt")

        if normalize:
            vrot = normalize_mva_data(vrot)

        vrot_df = pd.DataFrame(vrot, columns=columns, index=time)
        yield pd.concat([vrot_df, dvl_dt_df], axis=1)
def normalize_mva_data(
    data: np.ndarray,
    shift=False,  # shift the data in the l direction to the origin
):
    """
    Normalize the MVA components: B_l, B_m, and B_n are normalized to
    B_0 = 0.5 ΔB_l, <B_m>, and <B_n>, respectively, where the averages are
    taken over the two boundary values of the interval.
    """
    vl, vm, vn = data.T
    vl_norm_q = (vl[-1] - vl[0]) / 2
    vm_norm_q = (vm[0] + vm[-1]) / 2
    vn_norm_q = (vn[0] + vn[-1]) / 2
    return np.array([vl / vl_norm_q, vm / vm_norm_q, vn / vn_norm_q]).T
def sea_ids(
    ds: IDsDataset,
    event_cols=["t.d_start", "t.d_time", "t.d_end"],
    sea_cols=[
        "B_l",
        "j_m",
        "j_m_norm",
        "j_k",
        "j_k_norm",
        "B_m",
    ],
    bins=[10, 10],
    return_data=True,
):
    # converting to a list of numpy arrays
    events = keep_good_fit(ds.events)
    sea_events = [col.to_numpy() for col in events[event_cols]]

    mag_data = ds.data
    mag_data_c = compress_data_by_interval(
        mag_data.collect(), sea_events[0], sea_events[2]
    ).lazy()

    # dBdt_data = get_dBdt_data(ds.data)
    dBdt_data = get_dBdt_data(mag_data_c)
    mva_data = pd.concat(get_mva_data(mag_data_c, sea_events[0], sea_events[2]))

    b_data = dBdt_data.join(mva_data, on="time")
    p_data = (
        events[["time", "v_k", "j_Alfven"]].to_pandas().set_index("time").sort_index()
    )

    data = pd.merge_asof(
        b_data,
        p_data,
        left_index=True,
        right_index=True,
        direction="nearest",
    )

    data = data.assign(
        j_m=lambda df: df.dBl_dt / df.v_k * J_FACTOR,
        j_m_norm=lambda df: df.j_m / df.j_Alfven,
        j_k=lambda df: df.dBdt / df.v_k * J_FACTOR,
        j_k_norm=lambda df: df.j_k / df.j_Alfven,
    )

    return sean(data, sea_events, bins, return_data=return_data, cols=sea_cols)
def plot_SEA(SEAarray: pd.DataFrame, meta) -> tuple[Figure, list[Axes]]:
    cols = meta["sea_cols"]

    fig, axes = plt.subplots(nrows=len(cols), sharex=True, squeeze=True)
    if len(cols) == 1:
        axes = [axes]

    # loop over columns that were analyzed
    for c, ax in zip(cols, axes):
        # for each column, identify the column titles that start with `c`
        # and do not end with 'cnt', e.g. for the AE columns
        # AE_mean, AE_median, AE_lowq, AE_upq, AE_cnt
        # find the columns AE_mean, AE_median, AE_lowq, AE_upq
        # mask = SEAarray.columns.str.startswith(c) & ~SEAarray.columns.str.endswith("cnt")
        mask = [c + "_mean", c + "_median", c + "_lowq", c + "_upq"]

        # plot the SEA data
        SEAarray.loc[:, mask].plot(
            ax=ax,
            style=["r-", "b-", "b--", "b--"],
            xlabel="Normalized Time",
            ylabel=c.replace("_", " "),
            legend=False,
        )

    return fig, axes
def plot_ids_sea(SEAarray: pd.DataFrame, meta) -> tuple[Figure, list[Axes]]:
    import scienceplots  # noqa: F401 — registers the 'science' matplotlib styles

    SEAarray.index = SEAarray.index.map(lambda x: x / bin / 2)

    with plt.style.context(['science', 'nature', 'notebook']):
        fig, axes = plot_SEA(SEAarray, meta)

        axes[0].set_ylabel(r"$B_x \ / \ B_0$")
        axes[1].set_ylabel(r"$J_y$")
        axes[2].set_ylabel(r"$J_y \ / \ J_A$")
        axes[3].set_ylabel(r"$J_k$")
        axes[4].set_ylabel(r"$J_k \ / \ J_A$")
        axes[5].set_ylabel(r"$B_m \ / \ B_g$")

        fig.tight_layout()
        fig.subplots_adjust(hspace=0)
        axes[0].legend(labels=["Mean", "Median", "LowQ", "UpQ"])

    return fig, axes
# fig.savefig(f"../../../figures/sea/sea_juno_first_year_{freq}.png", dpi = 300)
mag_path = sorted(list(walk(dir_path)))[0]
tau = timedelta(seconds=60)
mag_data = pl.scan_parquet(mag_path).drop('X', 'Y', 'Z').sort('time')
ids_ds = (
    IDsDataset(
        mag_data=mag_data,
        plasma_data=plasma_data,
        tau=tau,
        ts=ts,
        vec_cols=vec_cols,
        density_col="plasma_density",
        speed_col="plasma_speed",
        temperature_col="plasma_temperature",
    )
    .find_events(return_best_fit=True)
    .update_candidates_with_plasma_data()
)
# mag_path = sorted(list(walk(dir_path)))[-1]
tau = timedelta(seconds=300)
tau = timedelta(seconds=60)
mag_path = sorted(list(walk(dir_path)))[-1]
mag_data = pl.scan_parquet(mag_path).drop('X', 'Y', 'Z').sort('time')
ids_ds = (
    IDsDataset(
        mag_data=mag_data,
        plasma_data=plasma_data,
        tau=tau,
        ts=ts,
        vec_cols=vec_cols,
        density_col="plasma_density",
        speed_col="plasma_speed",
        temperature_col="plasma_temperature",
    )
    .find_events(return_best_fit=True)
    .update_candidates_with_plasma_data()
)
# `download_data` and `select` are assumed to be imported elsewhere in the notebook (not shown in this section)
mag_paths = list(download_data(datatype="FULL") | select(preprocess))
freq = 'high'
# freq = 'low'

if freq == 'high':  # use the high time-resolution data
    ids_ds.data = pl.scan_ipc(mag_paths[-365:]).drop('X', 'Y', 'Z').sort('time')
    bin = 16
else:  # use the low time-resolution data
    bin = 8

SEAarray, meta, p1data, p2data = sea_ids(ids_ds, bins=[bin, bin])
groupby = 'index'
columns = ['dBdt', 'j_k']

# Plot each group
fig, axes = plt.subplots(nrows=len(columns))
if len(columns) == 1:
    axes = [axes]

p1groups = p1data.groupby(groupby)
p2groups = p2data.groupby(groupby)

for ax, column in zip(axes, columns):
    for name, group in p1groups:
        ax.plot(group['t_norm'] - 1, group[column], color='grey', alpha=0.3)
    for name, group in p2groups:
        ax.plot(group['t_norm'], group[column], color='grey', alpha=0.3)

# plt.yscale('log')
plt.show()
One day of data resampled to 1 s is about 12 MB, so one year is about 4 GB and the six years of Juno cruise data are roughly 24 GB.
At a download rate of about 250 KB/s, downloading everything takes several days; the estimate below also includes the THEMIS files.
num_of_files = 6 * 365  # one file per day for six years
jno_file_size = 12e3    # Juno daily file size, in KB (~12 MB)
thm_file_size = 40e3    # THEMIS daily file size, in KB (~40 MB)
files_size = jno_file_size + thm_file_size  # KB downloaded per day of data
downloading_rate = 250  # KB/s
processing_rate = 1 / 60  # files processed per second (about one file per minute)

time_to_download = num_of_files * files_size / downloading_rate / 60 / 60  # hours
space_required = num_of_files * files_size / 1e6  # GB
time_to_process = num_of_files / processing_rate / 60 / 60  # hours

print(f"Time to download: {time_to_download:.2f} hours")
print(f"Disk space required: {space_required:.2f} GB")
print(f"Time to process: {time_to_process:.2f} hours")