JUNO State data pipeline
JUNO state data is composed of two parts: the plasma data from the MHD model and 1-hour-averaged magnetic field data from FGM, which provides background interplanetary magnetic field (IMF) information.
::: {#cell-1 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
from datetime import timedelta
import polars as pl
import pandas as pd
from discontinuitypy.utils.basic import resample
from discontinuitypy.pipelines.default.data import create_pipeline_template
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline
from typing import Dict, Callable
:::
::: {#cell-2 .cell 0=‘h’ 1=‘i’ 2=‘d’ 3=‘e’ 4=’ ’ 5=‘d’ 6=‘e’ 7=‘f’ 8=‘a’ 9=‘u’ 10=‘l’ 11=‘t’ 12=’_’ 13=‘e’ 14=‘x’ 15=‘p’ 16=’ ’ 17=‘p’ 18=‘i’ 19=‘p’ 20=‘e’ 21=‘l’ 22=‘i’ 23=‘n’ 24=‘e’ 25=‘s’ 26=‘/’ 27=‘j’ 28=‘u’ 29=‘n’ 30=‘o’ 31=‘/’ 32=‘s’ 33=‘t’ 34=‘a’ 35=‘t’ 36=‘e’}
Code
%load_ext autoreload
%autoreload 2
:::
Getting background magnetic field
::: {#cell-4 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def process_IMF_data(
    raw_data: Dict[str, Callable[..., pl.LazyFrame]],
    ts: int = 3600,  # time resolution
) -> pl.DataFrame | dict[str, pl.DataFrame]:
    """
    Resampling data to provide background magnetic field

    Args:
        raw_data: Mapping whose values are callables; each is called with no
            arguments and its result is passed to ``resample``.
        ts: Time resolution of the resampled output, in seconds.

    Returns:
        The concatenated, resampled frames with duplicate ``time`` rows
        removed, sorted by ``time``, and the ``BX SE`` / ``BY SE`` / ``BZ SE``
        columns renamed to ``B_background_x`` / ``_y`` / ``_z``.
    """
    every = timedelta(seconds=ts)
    period = every
    # NOTE(review): the half-window offset presumably centres each averaging
    # window on its timestamp -- confirm against `resample`.
    offset = every / 2

    data = pl.concat(
        resample(func(), every=every, period=period, offset=offset)
        for func in raw_data.values()
    )

    name_mapping = {
        "BX SE": "B_background_x",
        "BY SE": "B_background_y",
        "BZ SE": "B_background_z",
    }
    return data.unique("time").sort("time").rename(name_mapping)
:::
Loading data
For interpolated solar wind at JUNO’s location, see model output file.
::: {#cell-6 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def load_data(
    raw_data: pd.DataFrame,
    start: str,
    end: str,
) -> pl.LazyFrame:
    """Convert the pandas model output into a polars ``LazyFrame``.

    Args:
        raw_data: Model output as a pandas ``DataFrame``.
        start: Unused by this function; accepted so the signature matches the
            inputs the pipeline passes to the loader.
        end: Unused by this function; see ``start``.

    Returns:
        ``raw_data`` converted with ``pl.from_pandas`` and made lazy.
    """
    return pl.from_pandas(raw_data).lazy()
:::
Preprocessing data
Coordinate System: HGI
Variables:
Date_Time: date and time in ISO format [UT]
hour: elapsed time since trajectory start [hr]
r: radial coordinate in HGI [AU]
phi: longitude coordinate in HGI [deg]
Rho: density [amu/cm^3]
Ux, Uy, Uz: bulk velocity components in HGI [km/s]
Bx, By, Bz: magnetic field components in HGI [nT]
Ti: ion temperature [K]
::: {#cell-8 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def preprocess_data(
    raw_data: pl.LazyFrame,
):
    """
    Preprocess the raw dataset (only minor transformations)

    - Parsing and typing data (like from string to datetime for time columns)
    - Changing storing format (like from `csv` to `parquet`)

    Adds a ``time`` column parsed from the ``Date_Time`` string column, sorts
    by ``time``, and drops the ``Date_Time`` and ``hour`` columns.
    """
    df = (
        raw_data.with_columns(
            time=pl.col("Date_Time").str.to_datetime(),
        )
        .sort("time")
        .drop(["Date_Time", "hour"])
    )
    return df
:::
Processing data
Combining plasma data and background magnetic field
::: {#cell-10 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
# Column names of the magnetic-field (b) and velocity (v) vector components
# in the HGI input frame and in the RTN output frame.
bcols_hgi = ["bx", "by", "bz"]
bcols_rtn = ["b_r", "b_t", "b_n"]
vcols_hgi = ["ux", "uy", "uz"]
vcols_rtn = ["v_r", "v_t", "v_n"]
def hgi2rtn(df: pl.LazyFrame | pl.DataFrame):
    """Transform coordinates from HGI to RTN

    The x/y components of the magnetic field (``bx``, ``by``) and of the
    velocity (``ux``, ``uy``) are rotated by the angle in the ``phi`` column
    (converted to radians first); the z components are carried over
    unchanged as the normal components. A ``plasma_speed`` column holding the
    velocity magnitude is added as well.

    The ``phi``, ``phi_rad`` and HGI component columns are dropped from the
    result.
    """
    phi_rad = pl.col("phi_rad")
    ux = pl.col("ux")
    uy = pl.col("uy")
    uz = pl.col("uz")

    result = (
        # `phi_rad` must exist as a column before the expressions below can
        # reference it, hence the separate `with_columns` step.
        df.with_columns(
            phi_rad=pl.col("phi").radians(),
        )
        .with_columns(
            b_r=pl.col("bx") * phi_rad.cos() + pl.col("by") * phi_rad.sin(),
            b_t=-pl.col("bx") * phi_rad.sin() + pl.col("by") * phi_rad.cos(),
            b_n=pl.col("bz"),
            v_r=ux * phi_rad.cos() + uy * phi_rad.sin(),
            v_t=-ux * phi_rad.sin() + uy * phi_rad.cos(),
            v_n=uz,
            plasma_speed=(ux**2 + uy**2 + uz**2).sqrt(),
        )
        .drop(["phi", "phi_rad"] + bcols_hgi + vcols_hgi)
    )
    return result
:::
::: {#cell-11 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def process_data(
    model_data: pl.LazyFrame,
    imf_data: pl.LazyFrame,
) -> pl.DataFrame:
    """
    Corresponding to primary data layer, where source data models are transformed into domain data models

    - Transforming data to RTN (Radial-Tangential-Normal) coordinate system
    - Applying naming conventions for columns

    Args:
        model_data: Preprocessed model output, transformed with ``hgi2rtn``.
        imf_data: Background magnetic field data, joined on ``time``.

    Returns:
        The renamed model data joined with ``imf_data`` on ``time``.
        NOTE(review): with lazy inputs the result is presumably a
        ``LazyFrame`` rather than the annotated ``DataFrame`` -- confirm.
    """
    columns_name_mapping = {
        "r": "radial_distance",
        "v_r": "v_x",
        "v_t": "v_y",
        "v_n": "v_z",
        "b_r": "model_b_r",
        "b_n": "model_b_n",
        "b_t": "model_b_t",
        "Ti": "plasma_temperature",
        "rho": "plasma_density",
    }
    return (
        model_data.pipe(hgi2rtn)
        .rename(columns_name_mapping)
        .join(imf_data, on="time")
    )
:::
Pipeline
::: {#cell-13 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def create_IMF_pipeline():
    """Build the pipeline that derives the background IMF data.

    It holds a single node running ``process_IMF_data`` with the
    ``JNO.MAG.inter_data_1SEC`` dataset as input and ``JNO.STATE.IMF_data``
    as output.
    """
    node_process_IMF_data = node(
        process_IMF_data,
        inputs="JNO.MAG.inter_data_1SEC",
        outputs="JNO.STATE.IMF_data",
    )
    return pipeline([node_process_IMF_data])
:::
::: {#cell-14 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
# Maps the parameters of `load_data` to dataset / parameter names.
load_inputs = dict(
    raw_data="model_data",
    start="params:start_date",  # necessary for the pipeline to work
    end="params:end_date",
)

# Maps the parameters of `process_data` to dataset names.
process_inputs = dict(
    model_data="inter_data_hourly",
    imf_data="IMF_data",
)
def create_pipeline(sat_id="JNO", source="STATE"):
    """Assemble the full JUNO state pipeline.

    The result is the IMF pipeline from ``create_IMF_pipeline`` combined with
    the pipeline built by ``create_pipeline_template`` from this module's
    load / preprocess / process functions and their input mappings.

    Args:
        sat_id: Satellite identifier forwarded to the template.
        source: Data source identifier forwarded to the template.
    """
    return create_IMF_pipeline() + create_pipeline_template(
        sat_id=sat_id,
        source=source,
        load_data_fn=load_data,
        preprocess_data_fn=preprocess_data,
        process_data_fn=process_data,
        load_inputs=load_inputs,
        process_inputs=process_inputs,
    )
:::