THEMIS State data pipeline

We use low-resolution OMNI data for the plasma state, as we did in the OMNI notebook.

::: {#cell-1 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

Code
import polars as pl
import pandas

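# `pipeline` is exported from `kedro.pipeline` directly in recent Kedro releases;
# the `kedro.pipeline.modular_pipeline` import path is deprecated.
from kedro.pipeline import node, pipeline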

:::

Solar wind state

We also have an additional data file that indicates whether THEMIS is in the solar wind.

::: {#cell-4 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

Code
def load_sw_data(raw_data: pandas.DataFrame) -> pl.DataFrame:
    """Convert the raw pandas table of solar wind intervals to a polars DataFrame."""
    return pl.from_pandas(raw_data)

:::

::: {#cell-5 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

Code
def preprocess_sw_data(
    raw_data: pl.LazyFrame,
) -> pl.LazyFrame:
    """
    Parse the `start` and `end` columns from strings to datetimes.

    The raw values are expected to hold year, day of year, and hour. A zero
    minute field is appended before parsing because `polars` needs either
    both hour and minute in the format, or neither.
    """

    return raw_data.with_columns(
        # `concat_str` keeps the name of its first input, so each expression
        # overwrites the original `start` / `end` column.
        pl.concat_str(pl.col("start"), pl.lit(" 00")).str.to_datetime(
            format="%Y %j %H %M"
        ),
        pl.concat_str(pl.col("end"), pl.lit(" 00")).str.to_datetime(
            format="%Y %j %H %M"
        ),
    )

:::
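
To make the format string above concrete, here is a minimal sketch of the same parsing step using only the standard library. It assumes the raw values look like `"YYYY DDD HH"` (year, day of year, hour), which is inferred from the format string rather than from the data file itself. The sample value and the helper name `parse_doy_hour` are hypothetical.

```python
from datetime import datetime

# Hypothetical sample value; the real file's contents are not shown in this notebook.
raw_start = "2010 123 12"


def parse_doy_hour(value: str) -> datetime:
    """Parse a 'YYYY DDD HH' string by appending a zero minute field."""
    # Same trick as in `preprocess_sw_data`: add " 00" so the format
    # contains both hour (%H) and minute (%M).
    return datetime.strptime(f"{value} 00", "%Y %j %H %M")


parsed = parse_doy_hour(raw_start)
print(parsed)  # 2010-05-03 12:00:00
```

Day 123 of a non-leap year falls on 3 May, so `%j` removes the need for separate month and day columns.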

Pipelines

::: {#cell-7 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

Code
def create_sw_pipeline(sat_id="THB", source="STATE"):
    namespace = f"{sat_id}.{source}"
    node_load_sw_data = node(
        load_sw_data,
        inputs="original_sw_data",
        outputs="raw_data_sw",
        name="load_solar_wind_data",
    )
    node_preprocess_sw_state = node(
        preprocess_sw_data,
        inputs="raw_data_sw",
        outputs="inter_data_sw",
        name="preprocess_solar_wind_data",
    )
    return pipeline(
        [
            node_load_sw_data,
            node_preprocess_sw_state,
        ],
        namespace=namespace,
    )

:::
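
Because the pipeline is created with a namespace, Kedro prefixes the dataset names used inside it, so the data catalog entries are expected to carry the same prefix. The sketch below only mimics that naming rule in plain Python to show which names to look for. The helper `namespaced` is hypothetical and is not part of Kedro.

```python
def namespaced(name: str, sat_id: str = "THB", source: str = "STATE") -> str:
    """Return the dataset name as it would appear with the namespace prefix."""
    # Mirrors `namespace = f"{sat_id}.{source}"` in `create_sw_pipeline`.
    namespace = f"{sat_id}.{source}"
    return f"{namespace}.{name}"


# Dataset names used by the two nodes in `create_sw_pipeline`.
for name in ["original_sw_data", "raw_data_sw", "inter_data_sw"]:
    print(namespaced(name))
```

With the default arguments, the loader's input is therefore looked up as `THB.STATE.original_sw_data`.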