JUNO State data pipeline
JUNO state data is composed of two parts: the plasma data from the MHD model and the 1-hour-averaged magnetic field data from FGM, which provides background interplanetary magnetic field (IMF) information.
::: {#cell-1 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
from datetime import timedelta
import polars as pl
import pandas as pd
from discontinuitypy.utils.basic import resample
from discontinuitypy.pipelines.default.data import create_pipeline_template
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline
from typing import Dict, Callable:::
::: {#cell-2 .cell 0=‘h’ 1=‘i’ 2=‘d’ 3=‘e’ 4=’ ’ 5=‘d’ 6=‘e’ 7=‘f’ 8=‘a’ 9=‘u’ 10=‘l’ 11=‘t’ 12=’_’ 13=‘e’ 14=‘x’ 15=‘p’ 16=’ ’ 17=‘p’ 18=‘i’ 19=‘p’ 20=‘e’ 21=‘l’ 22=‘i’ 23=‘n’ 24=‘e’ 25=‘s’ 26=‘/’ 27=‘j’ 28=‘u’ 29=‘n’ 30=‘o’ 31=‘/’ 32=‘s’ 33=‘t’ 34=‘a’ 35=‘t’ 36=‘e’}
Code
# IPython magics (notebook-only): reload edited modules automatically before
# executing code, so changes to imported source files are picked up.
%load_ext autoreload
%autoreload 2:::
Getting the background magnetic field
::: {#cell-4 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def process_IMF_data(
    raw_data: Dict[str, Callable[..., pl.LazyFrame]],
    ts: float = 3600,  # time resolution in seconds
) -> pl.LazyFrame | pl.DataFrame:
    """
    Resample magnetic-field data to provide the background magnetic field.

    Each loader in ``raw_data`` is called. Its frame is resampled with
    ``resample`` into ``ts``-second bins (period equal to the bin width,
    offset by half a bin). The resampled partitions are then concatenated.

    Parameters
    ----------
    raw_data
        Mapping of partition name to a zero-argument callable that returns
        that partition's frame. Presumably this is a kedro partitioned
        dataset; confirm against the catalog.
    ts
        Output time resolution, in seconds.

    Returns
    -------
    A frame with one row per ``time`` value, sorted by ``time``, in which the
    ``BX SE``/``BY SE``/``BZ SE`` columns are renamed to
    ``B_background_x``/``B_background_y``/``B_background_z``.

    Raises
    ------
    ValueError
        If ``raw_data`` contains no partitions.
    """
    every = timedelta(seconds=ts)
    period = every
    offset = every / 2

    frames = [
        resample(load(), every=every, period=period, offset=offset)
        for load in raw_data.values()
    ]
    if not frames:
        raise ValueError("process_IMF_data: `raw_data` contains no partitions")

    name_mapping = {
        "BX SE": "B_background_x",
        "BY SE": "B_background_y",
        "BZ SE": "B_background_z",
    }
    # If partitions overlap at their edges, the same time bin can appear more
    # than once. Keep the first occurrence so the output is deterministic;
    # polars' default ``keep="any"`` gives no guarantee which row survives.
    return (
        pl.concat(frames)
        .unique("time", keep="first")
        .sort("time")
        .rename(name_mapping)
    )
Loading data
For the solar wind interpolated to JUNO’s location, see the model output file.
::: {#cell-6 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def load_data(
    raw_data: pd.DataFrame,
    start: str,
    end: str,
) -> pl.LazyFrame:
    """
    Convert the raw pandas model output into a lazy polars frame.

    ``start`` and ``end`` are accepted only so the signature matches the
    inputs wired in by the pipeline. They are not used here, and no time
    filtering is applied.
    """
    eager_frame = pl.from_pandas(raw_data)
    return eager_frame.lazy()
Preprocessing data
Coordinate System: HGI
Variables:
Date_Time: date and time in ISO format [UT]
hour: elapsed time since trajectory start [hr]
r: radial coordinate in HGI [AU]
phi: longitude coordinate in HGI [deg]
Rho: density [amu/cm^3]
Ux, Uy, Uz: bulk velocity components in HGI [km/s]
Bx, By, Bz: magnetic field components in HGI [nT]
Ti: ion temperature [K]
::: {#cell-8 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def preprocess_data(
    raw_data: pl.LazyFrame,
):
    """
    Preprocess the raw dataset using only minor transformations.

    - Parse the ``Date_Time`` strings into a datetime column named ``time``.
    - Sort the rows by ``time``.
    - Drop the now-redundant ``Date_Time`` and ``hour`` columns.

    This function writes no files. Any change of storage format (for example
    ``csv`` to ``parquet``) presumably happens when the output dataset is
    saved.
    """
    parsed_time = pl.col("Date_Time").str.to_datetime()
    redundant_columns = ["Date_Time", "hour"]

    timed = raw_data.with_columns(time=parsed_time).sort("time")
    return timed.drop(redundant_columns)
Processing data
Combining the plasma data with the background magnetic field
::: {#cell-10 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
# Vector component column names before (HGI) and after (RTN) rotation.
bcols_hgi = ["bx", "by", "bz"]
bcols_rtn = ["b_r", "b_t", "b_n"]
vcols_hgi = ["ux", "uy", "uz"]
vcols_rtn = ["v_r", "v_t", "v_n"]


def hgi2rtn(df: pl.LazyFrame | pl.DataFrame):
    """
    Rotate magnetic-field and velocity vectors from HGI to RTN.

    Only the longitude ``phi`` (in degrees) enters the rotation:

        R = ( cos(phi), sin(phi), 0)
        T = (-sin(phi), cos(phi), 0)
        N = (0, 0, 1)

    As a result, the z components pass through unchanged as the N
    components.

    The function adds ``b_r``, ``b_t``, ``b_n``, ``v_r``, ``v_t``, ``v_n``
    and ``plasma_speed`` (the magnitude of the velocity vector). It drops
    ``phi`` and the HGI vector columns.
    """
    # The temporary ``phi_rad`` column is added first and dropped at the end.
    phi_rad = pl.col("phi_rad")
    cos_phi, sin_phi = phi_rad.cos(), phi_rad.sin()
    bx, by, bz = (pl.col(name) for name in bcols_hgi)
    ux, uy, uz = (pl.col(name) for name in vcols_hgi)

    with_angle = df.with_columns(phi_rad=pl.col("phi").radians())
    rotated = with_angle.with_columns(
        b_r=bx * cos_phi + by * sin_phi,
        b_t=-bx * sin_phi + by * cos_phi,
        b_n=bz,
        v_r=ux * cos_phi + uy * sin_phi,
        v_t=-ux * sin_phi + uy * cos_phi,
        v_n=uz,
        plasma_speed=(ux**2 + uy**2 + uz**2).sqrt(),
    )
    return rotated.drop(["phi", "phi_rad", *bcols_hgi, *vcols_hgi])
::: {#cell-11 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def process_data(
    model_data: pl.LazyFrame,
    imf_data: pl.LazyFrame,
) -> pl.LazyFrame | pl.DataFrame:
    """
    Build the primary data layer, transforming source data models into
    domain data models.

    - Transform the model vectors to the RTN (Radial-Tangential-Normal)
      coordinate system via ``hgi2rtn``.
    - Apply the column naming conventions.
    - Inner-join the background IMF data on ``time``.

    Returns a frame of the same kind (lazy or eager) as ``model_data``.
    """
    # NOTE(review): this map mixes a lowercase source name (``rho``) with a
    # capitalized one (``Ti``), while the model-file documentation lists
    # ``Rho``. Recent polars versions make ``rename`` raise on missing
    # columns, so confirm these names match the raw model file.
    columns_name_mapping = {
        "r": "radial_distance",
        "v_r": "v_x",
        "v_t": "v_y",
        "v_n": "v_z",
        "b_r": "model_b_r",
        "b_n": "model_b_n",
        "b_t": "model_b_t",
        "Ti": "plasma_temperature",
        "rho": "plasma_density",
    }

    # Polars cannot join a LazyFrame with an eager DataFrame (or the reverse).
    # The IMF dataset may be handed over in either form, so coerce it to the
    # same kind as the model frame.
    if isinstance(model_data, pl.LazyFrame):
        imf_data = imf_data.lazy()
    elif isinstance(imf_data, pl.LazyFrame):
        imf_data = imf_data.collect()

    return (
        model_data.pipe(hgi2rtn)
        .rename(columns_name_mapping)
        .join(imf_data, on="time")
    )
Pipeline
::: {#cell-13 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def create_IMF_pipeline(sat_id: str = "JNO", source: str = "STATE"):
    """
    Build the kedro pipeline that produces the background IMF dataset.

    A single node runs ``process_IMF_data`` on
    ``{sat_id}.MAG.inter_data_1SEC`` and writes the result to
    ``{sat_id}.{source}.IMF_data``.

    With the defaults these are ``JNO.MAG.inter_data_1SEC`` and
    ``JNO.STATE.IMF_data``.
    """
    node_process_IMF_data = node(
        process_IMF_data,
        inputs=f"{sat_id}.MAG.inter_data_1SEC",
        outputs=f"{sat_id}.{source}.IMF_data",
    )
    return pipeline([node_process_IMF_data])
::: {#cell-14 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
# Dataset/parameter names fed to ``load_data``, keyed by its argument names.
load_inputs = {
    "raw_data": "model_data",
    "start": "params:start_date",  # necessary for the pipeline to work
    "end": "params:end_date",
}
# Dataset names fed to ``process_data``, keyed by its argument names.
process_inputs = {
    "model_data": "inter_data_hourly",
    "imf_data": "IMF_data",
}


def create_pipeline(sat_id="JNO", source="STATE"):
    """
    Assemble the complete state pipeline.

    The result combines two pipelines:

    - the IMF resampling pipeline from ``create_IMF_pipeline()``, which is
      called without arguments;
    - the generic load/preprocess/process template from
      ``create_pipeline_template``, configured with this module's node
      functions and input mappings.
    """
    imf_pipeline = create_IMF_pipeline()
    state_pipeline = create_pipeline_template(
        sat_id=sat_id,
        source=source,
        load_data_fn=load_data,
        preprocess_data_fn=preprocess_data,
        process_data_fn=process_data,
        load_inputs=load_inputs,
        process_inputs=process_inputs,
    )
    return imf_pipeline + state_pipeline