THEMIS State data pipeline
We use low-resolution OMNI data for the plasma state, as we did in the OMNI notebook.
::: {#cell-1 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
import polars as pl
import pandas
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline:::
Solar wind state
We also have an additional data file that indicates whether THEMIS is in the solar wind or not.
::: {#cell-4 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def load_sw_data(raw_data: pandas.DataFrame):
    """Convert the raw solar-wind table from pandas to polars.

    Args:
        raw_data: The table as a pandas DataFrame.

    Returns:
        Whatever ``pl.from_dataframe`` produces for ``raw_data``.
    """
    polars_frame = pl.from_dataframe(raw_data)
    return polars_frame
::: {#cell-5 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def preprocess_sw_data(
    raw_data: pl.LazyFrame,
) -> pl.LazyFrame:
    """Parse the ``start`` and ``end`` columns from strings into datetimes.

    Each value gets a literal ``" 00"`` appended and is then parsed with the
    format ``"%Y %j %H %M"``.

    Args:
        raw_data: Frame holding string columns named ``start`` and ``end``.

    Returns:
        The result of ``raw_data.with_columns`` with both parsed expressions.
    """
    # Note: For `polars`, please either specify both hour and minute, or neither.
    # The appended " 00" supplies the value consumed by the "%M" field.
    timestamp_format = "%Y %j %H %M"

    parsed_bounds = [
        pl.concat_str(pl.col(bound), pl.lit(" 00")).str.to_datetime(
            format=timestamp_format
        )
        for bound in ("start", "end")
    ]
    return raw_data.with_columns(*parsed_bounds)
Pipelines
::: {#cell-7 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
Code
def create_sw_pipeline(sat_id="THB", source="STATE"):
    """Assemble the solar-wind pipeline: load the data, then preprocess it.

    Args:
        sat_id: First component of the pipeline namespace.
        source: Second component of the pipeline namespace.

    Returns:
        The object returned by ``pipeline`` for the two nodes, namespaced as
        ``"{sat_id}.{source}"``.
    """
    # The second node consumes "raw_data_sw", which the first node produces.
    steps = [
        node(
            load_sw_data,
            inputs="original_sw_data",
            outputs="raw_data_sw",
            name="load_solar_wind_data",
        ),
        node(
            preprocess_sw_data,
            inputs="raw_data_sw",
            outputs="inter_data_sw",
            name="preprocess_solar_wind_data",
        ),
    ]
    return pipeline(steps, namespace=f"{sat_id}.{source}")