Code
import polars as pl
import pandas
from discontinuitypy.pipelines.default.data import create_pipeline_template
magplasma Data
For STEREO’s mission, We use 1-hour averaged merged data from COHOWeb.
See STEREO ASCII merged data and one sample file [here](https://spdf.gsfc.nasa.gov/pub/data/stereo/ahead/l2/merged/stereoa2011.asc
Plasma in RTN (Radial-Tangential-Normal) coordinate system - Proton Flow Speed, km/sec - Proton Flow Elevation Angle/Latitude, deg. - Proton Flow Azimuth Angle/Longitude, deg. - Proton Density, n/cc - Proton Temperature, K)
Notes - Note1: There is a big gap 2014/12/16 - 2015/07/20 in plasma data - Note2: There is a big gap 2015/03/21 - 2015/07/09 and 2015/10/27 - 2015/11/15 in mag data - Note that for missing data, fill values consisting of a blank followed by 9’s which together constitute the format are used
def download_data(
start: str,
end: str,
datatype,
) -> list[str]:
start_time = pandas.Timestamp(start)
end_time = pandas.Timestamp(end)
url = "https://spdf.gsfc.nasa.gov/pub/data/stereo/ahead/l2/merged/stereoa{year}.asc"
files = list(
range(start_time.year, end_time.year + 1)
| select(lambda x: url.format(year=x))
| pmap(pooch.retrieve, known_hash=None)
)
return files
headers = """Year
DOY
Hour
Radial Distance, AU
HGI Lat. of the S/C
HGI Long. of the S/C
IMF BR, nT (RTN)
IMF BT, nT (RTN)
IMF BN, nT (RTN)
IMF B Scalar, nT
SW Plasma Speed, km/s
SW Lat. Angle RTN, deg.
SW Long. Angle RTN, deg.
SW Plasma Density, N/cm^3
SW Plasma Temperature, K
1.8-3.6 MeV H flux,LET
4.0-6.0 MeV H flux,LET
6.0-10.0 MeV H flux, LET
10.0-12.0 MeV H flux,LET
13.6-15.1 MeV H flux, HET
14.9-17.1 MeV H flux, HET
17.0-19.3 MeV H flux, HET
20.8-23.8 MeV H flux, HET
23.8-26.4 MeV H flux, HET
26.3-29.7 MeV H flux, HET
29.5-33.4 MeV H flux, HET
33.4-35.8 MeV H flux, HET
35.5-40.5 MeV H flux, HET
40.0-60.0 MeV H flux, HET
60.0-100.0 MeV H flux, HET
0.320-0.452 MeV H flux, SIT
0.452-0.64 MeV H flux, SIT
0.640-0.905 MeV H flux, SIT
0.905-1.28 MeV H flux, SIT
1.280-1.81 MeV H flux, SIT
1.810-2.56 MeV H flux, SIT
2.560-3.62 MeV H flux, SIT"""
def load_data(
start: str,
end: str,
datatype = 'hourly',
) -> pl.DataFrame:
"""
- Downloading data
- Reading data into a proper data structure, like dataframe.
- Parsing original data (dealing with delimiters, missing values, etc.)
"""
files = download_data(start, end, datatype=datatype)
labels = headers.split("\n")
missing_values = ["999.99", "9999.9", "9999999."]
df = pl.concat(
files
| pmap(
pandas.read_csv,
delim_whitespace=True,
names=labels,
na_values=missing_values,
)
| select(pl.from_pandas)
)
return df
def preprocess_data(
raw_data: pl.DataFrame,
) -> pl.DataFrame:
"""
Preprocess the raw dataset (only minor transformations)
- Parsing and typing data (like from string to datetime for time columns)
- Changing storing format (like from `csv` to `parquet`)
"""
return raw_data.with_columns(
time=pl.datetime(pl.col("Year"), month=1, day=1)
+ pl.duration(days=pl.col("DOY") - 1, hours=pl.col("Hour"))
)
STATE_POSITION_COLS = [
"Radial Distance, AU",
"HGI Lat. of the S/C",
"HGI Long. of the S/C",
]
STATE_IMF_COLS = [
"IMF BR, nT (RTN)",
"IMF BT, nT (RTN)",
"IMF BN, nT (RTN)",
"IMF B Scalar, nT",
]
STATE_PLASMA_COLS = [
"SW Plasma Speed, km/s",
"SW Lat. Angle RTN, deg.",
"SW Long. Angle RTN, deg.",
"SW Plasma Density, N/cm^3",
"SW Plasma Temperature, K",
]
DEFAULT_columns_name_mapping = {
"SW Plasma Speed, km/s": "plasma_speed",
"SW Lat. Angle RTN, deg.": "sw_elevation",
"SW Long. Angle RTN, deg.": "sw_azimuth",
"SW Plasma Density, N/cm^3": "plasma_density",
"SW Plasma Temperature, K": "plasma_temperature",
"Radial Distance, AU": "radial_distance",
}
def convert_state_to_rtn(df: pl.DataFrame) -> pl.DataFrame:
"""Convert state data to RTN coordinates"""
plasma_speed = pl.col("plasma_speed")
sw_elevation = pl.col("sw_elevation").radians()
sw_azimuth = pl.col("sw_azimuth").radians()
return df.with_columns(
sw_vel_r=plasma_speed * sw_elevation.cos() * sw_azimuth.cos(),
sw_vel_t=plasma_speed * sw_elevation.cos() * sw_azimuth.sin(),
sw_vel_n=plasma_speed * sw_elevation.sin(),
).drop(["sw_elevation", "sw_azimuth"])
def process_data(
raw_data: pl.DataFrame,
ts=None, # time resolution
columns: list[str] = STATE_POSITION_COLS + STATE_PLASMA_COLS + STATE_IMF_COLS,
) -> pl.DataFrame:
"""
Corresponding to primary data layer, where source data models are transformed into domain data models
- Applying naming conventions for columns
- Transforming data to RTN (Radial-Tangential-Normal) coordinate system
- Discarding unnecessary columns
"""
name_mapping = {
"sw_vel_r": "v_x",
"sw_vel_t": "v_y",
"sw_vel_n": "v_z",
"IMF BR, nT (RTN)": "B_background_x",
"IMF BT, nT (RTN)": "B_background_y",
"IMF BN, nT (RTN)": "B_background_z",
}
return (
raw_data.select("time", *columns)
.rename(DEFAULT_columns_name_mapping)
.pipe(convert_state_to_rtn)
.rename(name_mapping)
)