# JUNO State data pipeline

JUNO state data is composed of two parts: plasma data from an MHD model and 1-hour-averaged magnetic field data from FGM, which provides the background interplanetary magnetic field (IMF).

::: {#cell-1 .cell}

```python
#| export
from datetime import timedelta
import polars as pl
import pandas as pd

from discontinuitypy.utils.basic import resample
from discontinuitypy.pipelines.default.data import create_pipeline_template

from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline

from typing import Dict, Callable
```

:::

::: {#cell-2 .cell}

```python
#| hide
#| default_exp pipelines/juno/state
%load_ext autoreload
%autoreload 2
```

:::

## Getting background magnetic field

::: {#cell-4 .cell}

```python
#| export
def process_IMF_data(
    raw_data: Dict[str, Callable[..., pl.LazyFrame]],
    ts: int = 3600,  # time resolution in seconds
) -> pl.DataFrame | dict[str, pl.DataFrame]:
    """
    Resample the high-resolution FGM data to provide the background magnetic field
    """

    every = timedelta(seconds=ts)
    period = every
    offset = every / 2

    data = pl.concat(
        resample(func(), every=every, period=period, offset=offset)
        for func in raw_data.values()
    )

    name_mapping = {
        "BX SE": "B_background_x",
        "BY SE": "B_background_y",
        "BZ SE": "B_background_z",
    }

    return data.unique("time").sort("time").rename(name_mapping)
```

:::
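
The `resample` helper from `discontinuitypy.utils.basic` is applied to each partition of the 1-second FGM data before concatenation. Its exact implementation lives in that module; the sketch below shows the equivalent operation in plain Polars for a toy frame, under the assumption that `resample` amounts to a windowed mean over windows of length `period`, spaced `every` apart and shifted by `offset` (the column names `BX SE`, `BY SE`, `BZ SE` match the rename map above).

```python
# Sketch only: approximates what `resample` is assumed to do (windowed mean)
from datetime import datetime, timedelta
import polars as pl

every = timedelta(hours=1)
offset = every / 2

# Toy 1-second FGM-like data covering two hours
n = 7200
times = [datetime(2016, 7, 5) + timedelta(seconds=i) for i in range(n)]
fgm = pl.DataFrame(
    {"time": times, "BX SE": [1.0] * n, "BY SE": [0.5] * n, "BZ SE": [-0.2] * n}
)

hourly = (
    fgm.sort("time")
    .group_by_dynamic("time", every=every, period=every, offset=offset)
    .agg(pl.col(["BX SE", "BY SE", "BZ SE"]).mean())
)
print(hourly)
```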

## Loading data

For the solar wind parameters interpolated to JUNO's location, see the model output file.

::: {#cell-6 .cell}

```python
#| export
def load_data(
    raw_data: pd.DataFrame,
    start: str,
    end: str,
) -> pl.LazyFrame:
    """Convert the raw model output (loaded as a pandas DataFrame) into a Polars LazyFrame.

    `start` and `end` are unused here but are part of the pipeline template's interface.
    """
    return pl.from_pandas(raw_data).lazy()

:::
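
A minimal usage sketch (the values and dates here are placeholders; in the pipeline they come from the data catalog and `params:start_date` / `params:end_date`):

```python
import pandas as pd

# Placeholder model output with a subset of the columns described below
toy = pd.DataFrame({"Date_Time": ["2016-07-05T00:00:00"], "hour": [0.0]})

lf = load_data(toy, start="2016-07-05", end="2016-07-06")
print(lf.collect())
```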

## Preprocessing data

```
Coordinate System:  HGI
Variables:
  Date_Time: date and time in ISO format [UT]
  hour: elapsed time since trajectory start [hr]
  r: radial coordinate in HGI [AU]
  phi: longitude coordinate in HGI [deg]
  Rho: density [amu/cm^3]
  Ux, Uy, Uz: bulk velocity components in HGI [km/s]
  Bx, By, Bz: magnetic field components in HGI [nT]
  Ti: ion temperature [K]
```

::: {#cell-8 .cell}

```python
#| export
def preprocess_data(
    raw_data: pl.LazyFrame,
) -> pl.LazyFrame:
    """
    Preprocess the raw dataset (only minor transformations)

    - Parsing and typing data (like from string to datetime for time columns)
    - Changing the storage format (like from `csv` to `parquet`)
    """
    df = (
        raw_data
        .with_columns(
            time=pl.col("Date_Time").str.to_datetime(),
        )
        .sort("time")
        .drop(["Date_Time", "hour"])
    )
    return df
```

:::
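
A quick sanity check on a one-row frame (only a subset of the columns listed above, with made-up illustrative values):

```python
import polars as pl

toy = (
    pl.DataFrame(
        {
            "Date_Time": ["2016-07-05T12:00:00"],
            "hour": [12.0],
            "r": [5.4],     # AU
            "phi": [90.0],  # deg
            "Ti": [5.0e4],  # K
        }
    )
    .lazy()
)

# `Date_Time` is parsed into a `time` column; `Date_Time` and `hour` are dropped
print(preprocess_data(toy).collect())
```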

## Processing data

Combining the plasma data and the background magnetic field; the model vectors are first rotated from HGI into the RTN coordinate system, as sketched below.
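
With $\phi$ the HGI longitude of the spacecraft, the radial (R), tangential (T) and normal (N) components follow from the HGI $x$, $y$, $z$ components by a rotation about the $z$ axis; this is the transform implemented in `hgi2rtn`:

$$
\begin{pmatrix} B_R \\ B_T \\ B_N \end{pmatrix}
=
\begin{pmatrix}
\cos\phi & \sin\phi & 0 \\
-\sin\phi & \cos\phi & 0 \\
0 & 0 & 1
\end{pmatrix}
\begin{pmatrix} B_x \\ B_y \\ B_z \end{pmatrix},
\qquad
\text{and likewise for } (U_x, U_y, U_z) \to (V_R, V_T, V_N).
$$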

::: {#cell-10 .cell}

```python
#| export
# Column names in the HGI frame and their RTN counterparts
bcols_hgi = ["bx", "by", "bz"]
bcols_rtn = ["b_r", "b_t", "b_n"]
vcols_hgi = ["ux", "uy", "uz"]
vcols_rtn = ["v_r", "v_t", "v_n"]


def hgi2rtn(df: pl.LazyFrame | pl.DataFrame):
    """Transform coordinates from HGI to RTN"""

    phi_rad = pl.col("phi_rad")
    ux = pl.col("ux")
    uy = pl.col("uy")
    uz = pl.col("uz")
    result = (
        df.with_columns(
            phi_rad=pl.col("phi").radians(),
        )
        .with_columns(
            # Rotation about z: R = (cos φ, sin φ, 0), T = (-sin φ, cos φ, 0), N = z
            b_r=pl.col("bx") * phi_rad.cos() + pl.col("by") * phi_rad.sin(),
            b_t=-pl.col("bx") * phi_rad.sin() + pl.col("by") * phi_rad.cos(),
            b_n=pl.col("bz"),
            v_r=ux * phi_rad.cos() + uy * phi_rad.sin(),
            v_t=-ux * phi_rad.sin() + uy * phi_rad.cos(),
            v_n=uz,
            plasma_speed=(ux**2 + uy**2 + uz**2).sqrt(),
        )
        .drop(["phi", "phi_rad"] + bcols_hgi + vcols_hgi)
    )
    return result
```

:::
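
A worked check with made-up values: at $\phi = 90°$ the HGI $x$ axis maps onto $-\hat{T}$ and the $y$ axis onto $+\hat{R}$, so a field of $(1, 0, 0)$ nT becomes $(B_R, B_T, B_N) \approx (0, -1, 0)$ and a flow of $(0, 400, 0)$ km/s becomes purely radial:

```python
import polars as pl

toy = pl.DataFrame(
    {
        "phi": [90.0],                             # deg
        "bx": [1.0], "by": [0.0], "bz": [0.0],     # nT
        "ux": [0.0], "uy": [400.0], "uz": [0.0],   # km/s
    }
)

# Expect b_r ~ 0, b_t ~ -1, b_n = 0, v_r ~ 400, v_t ~ 0, v_n = 0, plasma_speed = 400
print(hgi2rtn(toy))
```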

::: {#cell-11 .cell}

```python
#| export
def process_data(
    model_data: pl.LazyFrame,
    imf_data: pl.LazyFrame,
) -> pl.DataFrame:
    """
    Corresponds to the primary data layer, where source data models are transformed into domain data models

    - Transforming data to the RTN (Radial-Tangential-Normal) coordinate system
    - Applying naming conventions for columns
    """

    columns_name_mapping = {
        "r": "radial_distance",
        "v_r": "v_x",
        "v_t": "v_y",
        "v_n": "v_z",
        "b_r": "model_b_r",
        "b_n": "model_b_n",
        "b_t": "model_b_t",
        "Ti": "plasma_temperature",
        "rho": "plasma_density",
    }

    return (
        model_data.pipe(hgi2rtn)
        .rename(columns_name_mapping)
        .join(imf_data, on="time")
    )
```

:::
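
A minimal end-to-end sketch with one matching timestamp (made-up values; it assumes the model columns arrive lowercased, as `hgi2rtn` expects):

```python
from datetime import datetime
import polars as pl

t = datetime(2016, 7, 5, 12)

model = pl.DataFrame(
    {
        "time": [t], "r": [5.4], "phi": [90.0],
        "bx": [1.0], "by": [0.0], "bz": [0.0],
        "ux": [0.0], "uy": [400.0], "uz": [0.0],
        "Ti": [5.0e4], "rho": [0.2],
    }
).lazy()

imf = pl.DataFrame(
    {"time": [t], "B_background_x": [0.3], "B_background_y": [-0.1], "B_background_z": [0.05]}
).lazy()

# One row combining rotated model vectors with the background IMF
print(process_data(model, imf).collect())
```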

## Pipeline

::: {#cell-13 .cell}

```python
#| export
def create_IMF_pipeline():
    node_process_IMF_data = node(
        process_IMF_data,
        inputs="JNO.MAG.inter_data_1SEC",
        outputs="JNO.STATE.IMF_data",
    )

    return pipeline([node_process_IMF_data])
```

:::

::: {#cell-14 .cell}

```python
#| export
load_inputs = dict(
    raw_data="model_data",
    start="params:start_date",  # required by the pipeline template, even though `load_data` ignores them
    end="params:end_date",
)

process_inputs = dict(
    model_data="inter_data_hourly",
    imf_data="IMF_data",
)


def create_pipeline(sat_id="JNO", source="STATE"):
    return create_IMF_pipeline() + create_pipeline_template(
        sat_id=sat_id,
        source=source,
        load_data_fn=load_data,
        preprocess_data_fn=preprocess_data,
        process_data_fn=process_data,
        load_inputs=load_inputs,
        process_inputs=process_inputs,
    )
```

:::
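
In a Kedro project this pipeline would typically be picked up by the pipeline registry; a hypothetical sketch is shown below (the module path follows `default_exp pipelines/juno/state`, and the package name `discontinuitypy` is an assumption):

```python
# Hypothetical pipeline_registry.py entry; module and registry names are assumptions
from kedro.pipeline import Pipeline

from discontinuitypy.pipelines.juno.state import create_pipeline as create_juno_state_pipeline


def register_pipelines() -> dict[str, Pipeline]:
    juno_state = create_juno_state_pipeline(sat_id="JNO", source="STATE")
    return {"JNO_STATE": juno_state, "__default__": juno_state}
```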