THEMIS magnetic field data pipeline

Loading data

Code
from space_analysis.utils.speasy import Variables
from datetime import timedelta
Code
# Analysis window (start, stop); the commented line below is a short
# one-week window that can be swapped in for quick iterations.
timerange = ["2011-08-25", "2016-06-30"]
# timerange = ["2011-08-25", "2011-09-01"]
mission = "THB"

# Time step and window length. Not referenced in this cell; presumably
# consumed by later cells — confirm before removing.
ts = timedelta(seconds=1)
tau = timedelta(seconds=60)

# Magnetic-field request: data source, dataset id and the parameter(s) to load.
provider = "archive/local"
mag_dataset = "THB_L2_FGM"
mag_parameters = ["thb_fgl_gse"]

# Build the request and fetch the data in one expression; `retrieve_data`
# returns the same `Variables` instance, now holding the downloaded data.
mag_vars = Variables(
    timerange=timerange,
    dataset=mag_dataset,
    parameters=mag_parameters,
    provider=provider,
).retrieve_data()
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[10], line 14
      7 mag_dataset = "THB_L2_FGM"
      8 mag_parameters = ["thb_fgl_gse"]
     10 mag_vars = Variables(
     11     dataset=mag_dataset,
     12     parameters=mag_parameters,
     13     timerange=timerange,
---> 14 ).retrieve_data()
     16 plasma_dataset = 'THB_L2_MOM'
     17 plasma_parameters = ["thb_peim_densityQ", "thb_peim_velocity_gseQ"]

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/space_analysis/utils/speasy.py:84, in Variables.retrieve_data(self)
     80     self.data = spz.get_data(
     81         self.products, self.timerange
     82     )
     83 else:
---> 84     self.data = spz.get_data(
     85         self.products, self.timerange, disable_proxy=self._disable_proxy
     86     )
     87 return self

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/requests_scheduling/request_dispatch.py:316, in get_data(*args, **kwargs)
    314 product = args[0]
    315 if is_collection(product) and not isinstance(product, SpeasyIndex):
--> 316     return list(map(lambda p: get_data(p, *args[1:], **kwargs), progress_bar(leave=True, **kwargs)(product)))
    318 if len(args) == 1:
    319     return _get_catalog_or_timetable(*args, **kwargs)

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/requests_scheduling/request_dispatch.py:316, in get_data.<locals>.<lambda>(p)
    314 product = args[0]
    315 if is_collection(product) and not isinstance(product, SpeasyIndex):
--> 316     return list(map(lambda p: get_data(p, *args[1:], **kwargs), progress_bar(leave=True, **kwargs)(product)))
    318 if len(args) == 1:
    319     return _get_catalog_or_timetable(*args, **kwargs)

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/requests_scheduling/request_dispatch.py:323, in get_data(*args, **kwargs)
    321 t_range = args[1]
    322 if _is_dtrange(t_range):
--> 323     return _get_timeserie1(*args, **kwargs)
    324 if is_collection(t_range):
    325     return list(
    326         map(lambda r: get_data(product, r, *args[2:], **kwargs),
    327             progress_bar(leave=False, **kwargs)(t_range)))

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/requests_scheduling/request_dispatch.py:166, in _get_timeserie1(index, dtrange, **kwargs)
    165 def _get_timeserie1(index, dtrange, **kwargs):
--> 166     return _scalar_get_data(index, dtrange[0], dtrange[1], **kwargs)

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/requests_scheduling/request_dispatch.py:157, in _scalar_get_data(index, *args, **kwargs)
    155 provider_uid, product_uid = provider_and_product(index)
    156 if provider_uid in PROVIDERS:
--> 157     return PROVIDERS[provider_uid].get_data(product_uid, *args, **kwargs)
    158 raise ValueError(f"Can't find a provider for {index}")

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/__init__.py:223, in AllowedKwargs.__call__.<locals>.wrapped(*args, **kwargs)
    220 unexpected_args = list(
    221     filter(lambda arg_name: arg_name not in self.allowed_list, kwargs.keys()))
    222 if not unexpected_args:
--> 223     return func(*args, **kwargs)
    224 raise TypeError(
    225     f"Unexpected keyword argument {unexpected_args}, allowed keyword arguments are {self.allowed_list}")

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/dataprovider.py:30, in ParameterRangeCheck.__call__.<locals>.wrapped(wrapped_self, product, start_time, stop_time, **kwargs)
     28     log.warning(f"You are requesting {product} outside of its definition range {p_range}")
     29     return None
---> 30 return get_data(wrapped_self, product=product, start_time=start_time, stop_time=stop_time, **kwargs)

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/cache/_providers_caches.py:247, in UnversionedProviderCache.__call__.<locals>.wrapped(wrapped_self, product, start_time, stop_time, **kwargs)
    243 fragment_duration = timedelta(hours=fragment_hours)
    244 data_chunks, maybe_outdated_fragments, missing_fragments = self.split_fragments(fragments, product,
    245                                                                                 fragment_duration, **kwargs)
    246 data_chunks += \
--> 247     list(filter(lambda d: d is not None, [
    248         self._cache.add_to_cache(
    249             get_data(
    250                 wrapped_self, product=product, start_time=fragment_group[0],
    251                 stop_time=fragment_group[-1] + fragment_duration, **kwargs),
    252             fragments=fragment_group, product=product, fragment_duration_hours=fragment_hours,
    253             version=datetime.utcnow(), **kwargs)
    254         for fragment_group
    255         in progress_bar(leave=False, desc="Downloading missing fragments from cache", **kwargs)(
    256             missing_fragments)]))
    258 for group in progress_bar(leave=False, desc="Checking if cache fragments are outdated", **kwargs)(
    259     maybe_outdated_fragments):
    260     oldest = max(group, key=lambda item: item[1].version)[1].version

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/cache/_providers_caches.py:249, in <listcomp>(.0)
    243 fragment_duration = timedelta(hours=fragment_hours)
    244 data_chunks, maybe_outdated_fragments, missing_fragments = self.split_fragments(fragments, product,
    245                                                                                 fragment_duration, **kwargs)
    246 data_chunks += \
    247     list(filter(lambda d: d is not None, [
    248         self._cache.add_to_cache(
--> 249             get_data(
    250                 wrapped_self, product=product, start_time=fragment_group[0],
    251                 stop_time=fragment_group[-1] + fragment_duration, **kwargs),
    252             fragments=fragment_group, product=product, fragment_duration_hours=fragment_hours,
    253             version=datetime.utcnow(), **kwargs)
    254         for fragment_group
    255         in progress_bar(leave=False, desc="Downloading missing fragments from cache", **kwargs)(
    256             missing_fragments)]))
    258 for group in progress_bar(leave=False, desc="Checking if cache fragments are outdated", **kwargs)(
    259     maybe_outdated_fragments):
    260     oldest = max(group, key=lambda item: item[1].version)[1].version

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/requests_scheduling/split_large_requests.py:25, in SplitLargeRequests.__call__.<locals>.wrapped(wrapped_self, product, start_time, stop_time, **kwargs)
     22 else:
     23     fragments = range.split(max_range_per_request)
     24     return var_merge(
---> 25         [get_data(wrapped_self, product=product, start_time=r.start_time, stop_time=r.stop_time, **kwargs)
     26          for r in fragments])

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/requests_scheduling/split_large_requests.py:25, in <listcomp>(.0)
     22 else:
     23     fragments = range.split(max_range_per_request)
     24     return var_merge(
---> 25         [get_data(wrapped_self, product=product, start_time=r.start_time, stop_time=r.stop_time, **kwargs)
     26          for r in fragments])

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/proxy/__init__.py:129, in Proxyfiable.__call__.<locals>.wrapped(*args, **kwargs)
    127     except:  # lgtm [py/catch-base-exception]
    128         log.error(f"Can't get data from proxy server {proxy_cfg.url()}")
--> 129 return func(*args, **kwargs)

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/webservices/cda/__init__.py:173, in CDA_Webservice.get_data(self, product, start_time, stop_time, if_newer_than, extra_http_headers)
    164 @AllowedKwargs(
    165     PROXY_ALLOWED_KWARGS + CACHE_ALLOWED_KWARGS + GET_DATA_ALLOWED_KWARGS + ['if_newer_than'])
    166 @ParameterRangeCheck()
   (...)
    170 def get_data(self, product, start_time: datetime, stop_time: datetime, if_newer_than: datetime or None = None,
    171              extra_http_headers: Dict or None = None):
    172     dataset, variable = self._to_dataset_and_variable(product)
--> 173     return self._dl_variable(start_time=start_time, stop_time=stop_time, dataset=dataset,
    174                              variable=variable, if_newer_than=if_newer_than, extra_http_headers=extra_http_headers)

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/webservices/cda/__init__.py:155, in CDA_Webservice._dl_variable(self, dataset, variable, start_time, stop_time, if_newer_than, extra_http_headers)
    153 log.debug(resp.url)
    154 if resp.status_code == 200 and 'FileDescription' in resp.json():
--> 155     return cdf.load_variable(file=resp.json()['FileDescription'][0]['Name'], variable=variable)
    156 elif not resp.ok:
    157     if resp.status_code == 404 and "No data available" in resp.json().get('Message', [""])[0]:

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/cdf/__init__.py:77, in load_variable(variable, file, cache_remote_files)
     74     if is_local_file(file):
     75         return _load_variable(variable=variable, file=urlparse(url=file).path)
     76     return _load_variable(variable=variable,
---> 77                           buffer=any_loc_open(file, mode='rb', cache_remote_files=cache_remote_files).read())
     78 if type(file) is bytes:
     79     return _load_variable(variable=variable, buffer=bytes)

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/any_files.py:102, in any_loc_open(url, timeout, headers, mode, cache_remote_files)
    100 else:
    101     if cache_remote_files:
--> 102         last_modified = http.head(url).getheader('last-modified', str(datetime.now()))
    103         cache_item: Optional[CacheItem] = get_item(url)
    104         if cache_item is None or last_modified != cache_item.version:

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/speasy/core/http.py:93, in _HttpVerb.__call__(self, url, headers, params, timeout)
     91 headers = headers or {}
     92 headers['User-Agent'] = USER_AGENT
---> 93 return Response(self._verb(url=url, headers=headers, fields=params, timeout=timeout))

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/urllib3/_request_methods.py:110, in RequestMethods.request(self, method, url, body, fields, headers, json, **urlopen_kw)
    107     urlopen_kw["body"] = body
    109 if method in self._encode_url_methods:
--> 110     return self.request_encode_url(
    111         method,
    112         url,
    113         fields=fields,  # type: ignore[arg-type]
    114         headers=headers,
    115         **urlopen_kw,
    116     )
    117 else:
    118     return self.request_encode_body(
    119         method, url, fields=fields, headers=headers, **urlopen_kw
    120     )

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/urllib3/_request_methods.py:143, in RequestMethods.request_encode_url(self, method, url, fields, headers, **urlopen_kw)
    140 if fields:
    141     url += "?" + urlencode(fields)
--> 143 return self.urlopen(method, url, **extra_kw)

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/urllib3/poolmanager.py:444, in PoolManager.urlopen(self, method, url, redirect, **kw)
    442     response = conn.urlopen(method, url, **kw)
    443 else:
--> 444     response = conn.urlopen(method, u.request_uri, **kw)
    446 redirect_location = redirect and response.get_redirect_location()
    447 if not redirect_location:

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/urllib3/connectionpool.py:790, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
    787 response_conn = conn if not release_conn else None
    789 # Make the request on the HTTPConnection object
--> 790 response = self._make_request(
    791     conn,
    792     method,
    793     url,
    794     timeout=timeout_obj,
    795     body=body,
    796     headers=headers,
    797     chunked=chunked,
    798     retries=retries,
    799     response_conn=response_conn,
    800     preload_content=preload_content,
    801     decode_content=decode_content,
    802     **response_kw,
    803 )
    805 # Everything went great!
    806 clean_exit = True

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/urllib3/connectionpool.py:536, in HTTPConnectionPool._make_request(self, conn, method, url, body, headers, retries, timeout, chunked, response_conn, preload_content, decode_content, enforce_content_length)
    534 # Receive the response from the server
    535 try:
--> 536     response = conn.getresponse()
    537 except (BaseSSLError, OSError) as e:
    538     self._raise_timeout(err=e, url=url, timeout_value=read_timeout)

File ~/micromamba/envs/psp_conjunction/lib/python3.11/site-packages/urllib3/connection.py:461, in HTTPConnection.getresponse(self)
    458 from .response import HTTPResponse
    460 # Get the response from http.client.HTTPConnection
--> 461 httplib_response = super().getresponse()
    463 try:
    464     assert_header_parsing(httplib_response.msg)

File ~/micromamba/envs/psp_conjunction/lib/python3.11/http/client.py:1386, in HTTPConnection.getresponse(self)
   1384 try:
   1385     try:
-> 1386         response.begin()
   1387     except ConnectionError:
   1388         self.close()

File ~/micromamba/envs/psp_conjunction/lib/python3.11/http/client.py:325, in HTTPResponse.begin(self)
    323 # read until we get a non-100 response
    324 while True:
--> 325     version, status, reason = self._read_status()
    326     if status != CONTINUE:
    327         break

File ~/micromamba/envs/psp_conjunction/lib/python3.11/http/client.py:286, in HTTPResponse._read_status(self)
    285 def _read_status(self):
--> 286     line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
    287     if len(line) > _MAXLINE:
    288         raise LineTooLong("status line")

File ~/micromamba/envs/psp_conjunction/lib/python3.11/socket.py:706, in SocketIO.readinto(self, b)
    704 while True:
    705     try:
--> 706         return self._sock.recv_into(b)
    707     except timeout:
    708         self._timeout_occurred = True

File ~/micromamba/envs/psp_conjunction/lib/python3.11/ssl.py:1315, in SSLSocket.recv_into(self, buffer, nbytes, flags)
   1311     if flags != 0:
   1312         raise ValueError(
   1313           "non-zero flags not allowed in calls to recv_into() on %s" %
   1314           self.__class__)
-> 1315     return self.read(nbytes, buffer)
   1316 else:
   1317     return super().recv_into(buffer, nbytes, flags)

File ~/micromamba/envs/psp_conjunction/lib/python3.11/ssl.py:1167, in SSLSocket.read(self, len, buffer)
   1165 try:
   1166     if buffer is not None:
-> 1167         return self._sslobj.read(len, buffer)
   1168     else:
   1169         return self._sslobj.read(len)

KeyboardInterrupt: 
Code
# Display the retrieved variables as a polars frame. The rendered output below
# is a lazy query plan, so this returns a LazyFrame (nothing is collected yet).
mag_vars.to_polars()
naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

DF ["Bx FGL-D", "By FGL-D", "Bz FGL-D", "time"]; PROJECT */4 COLUMNS; SELECTION: "None"

::: {#cell-5 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

Code
def preprocess_data(
    raw_data: pl.LazyFrame,
    datatype: str | None = None,
) -> pl.LazyFrame:
    """
    Preprocess the raw dataset (only minor transformations).

    - Apply naming conventions to the field-component columns
      (``"Bx {DATATYPE}-D"`` -> ``"B_x"``, and likewise for y and z)
    - Drop rows with a duplicate ``time`` value, keeping the first occurrence
    - Sort rows by ``time``

    Parameters
    ----------
    raw_data
        Raw frame with a ``time`` column and component columns named
        ``"Bx {DATATYPE}-D"``, ``"By {DATATYPE}-D"``, ``"Bz {DATATYPE}-D"``
        (e.g. ``"Bx FGL-D"``).
    datatype
        Data-type tag used in the raw column names; matched case-insensitively
        (e.g. ``"fgl"``). Required: the ``None`` default is kept only for
        signature compatibility.

    Returns
    -------
    pl.LazyFrame
        Deduplicated, time-sorted frame with renamed component columns.

    Raises
    ------
    ValueError
        If ``datatype`` is not given.
    """
    if datatype is None:
        raise ValueError(
            "`datatype` is required (e.g. 'fgl' for columns like 'Bx FGL-D')"
        )

    tag = datatype.upper()
    name_mapping = {f"B{axis} {tag}-D": f"B_{axis}" for axis in "xyz"}

    # `unique` does not preserve row order by default (maintain_order=False),
    # so sorting *before* it is discarded. Dedupe first (deterministically
    # keeping the first occurrence), then sort by time.
    return (
        raw_data.unique(subset="time", keep="first")
        .sort("time")
        .rename(name_mapping)
    )

:::