diff --git a/src/hapiutils.py b/src/hapiutils.py index 67880aa..0214e42 100644 --- a/src/hapiutils.py +++ b/src/hapiutils.py @@ -1,19 +1,22 @@ -from hapiclient.hapitime import hapitime2datetime, datetime2hapitime -import numpy.lib.recfunctions as nrecfun +import copy +from typing import Literal + import numpy as np +import numpy.lib.recfunctions as nrecfun import pandas as pd -import copy -from datetime import datetime, timedelta +from hapiclient.hapi import compute_dt +from hapiclient.hapitime import hapitime2datetime, datetime2hapitime def nparray_unpack_to_list(arr) -> list: + """Converts a np.ndarray to a list.""" if type(arr) == np.ndarray: return arr.tolist() else: return arr -def merge_dtypes(dataA, dataB, trim="Time"): +def merge_dtypes(dataA, dataB, trim: str = "Time"): """could not use stackoverflow comprehensives of forms a[0],str(a[1]) because it fails on 2D items like ('Field_Vector', ' pd.DataFrame: + """Convert hapi data array to a pandas DataFrame, preserving data types. - # if round_to_sec: - # dataA['Time'] = round_hapitime(dataA['Time']) + Args: + data (_type_): Hapi data array. + round_to_sec (bool, optional): Round time to nearest second. Defaults to False. - has_multiD = False - multiD = {} - namelist = list(dataA.dtype.names) - for name in dataA.dtype.names: - try: - if dataA[name].shape[1]: - has_multiD = True - multiD[name] = True - except: - multiD[name] = False - - if has_multiD: - dfA = pd.DataFrame({"Time": dataA["Time"]}) # ,dtype='string') - namelist.remove("Time") - for name in namelist: - if multiD[name]: - # dfA[name] = pd.Series(dtype='object') - dfA[name] = list(dataA[name]) # list or tuple work - # dfA[name] = dataA[name].astype(object) - # ",".join([str(val) for val in dataA[name]]) - else: - dfA[name] = dataA[name] - else: - # easy case, all 1-D data so no fussing needed - dfA = pd.DataFrame(dataA) + Returns: + pd.DataFrame: Hapi data in a pandas DataFrame. 
+ """ + data_dict = {} + for name in data.dtype.names: + column_data = data[name] - # clean times - dfA["Time"] = pd.to_datetime( - dfA["Time"].str.decode("utf-8") - ) # hapitime2datetime(np.array(dfA['Time']),**ops) - if round_to_sec: - dfA["Time"] = dfA["Time"].dt.round("S") - dfA["Time"] = dfA["Time"].dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + # Check if the field's dtype includes a subarray shape (multi-dimensional) + if len(column_data.shape) > 1: + # Convert subarray fields into lists or tuples + data_dict[name] = [element for element in column_data] + else: + # Directly assign the data for 1d dtypes + data_dict[name] = column_data - return dfA + df = pd.DataFrame(data_dict) + df = clean_time(df, round_to_sec=round_to_sec) + return df def merge_hapi( - dataA, metaA, dataC, metaC, round_to_sec=False, fill_nan=False, join_all=True + dataA, + metaA, + dataB, + metaB, + how: Literal["left", "right", "outer", "inner"] = "outer", + round_to_sec: bool = False, + fill_nan: bool = False, ): - metaAC = copy.deepcopy(metaA) - for ele in metaC["parameters"]: - if ele["name"] != "Time" and (join_all or (ele not in metaA["parameters"])): - # adjust both dataframe name and hapi metadata - if ele in metaA["parameters"]: - name = ele["name"] - new_name = ( - f"{name}_{metaC['x_dataset']}" # does x_dataset always exist? - ) - dataC = nrecfun.rename_fields(dataC, {name: new_name}) - ele["name"] = new_name - metaAC["parameters"].append(ele) - + """Merge two hapi data arrays to single array via specified merge type. Returns + merged hapi data and meta objects. + + Args: + dataA: Left hapi array to merge + metaA: Meta corresponding with dataA + dataB: Right hapi array to merge + metaB: Meta corresponding with dataB + how (str, optional): Type of merge: 'left', 'right', 'outer', or 'inner'. See + documentation for pandas.merge_ordered for descriptions. Defaults to 'outer'. + round_to_sec (bool, optional): Rounds time to nearest second. Defaults to False. 
+ fill_nan (bool, optional): Fill NaNs according to fill_value from meta. Defaults + to False. + """ + metaAB = copy.deepcopy(metaA) + new_names = {} + for param in metaB["parameters"]: + if param["name"] != "Time": + # If the field is already in the left array, change the field name + if param in metaA["parameters"]: + new_name = f"{param['name']}_{metaB['x_dataset']}" # Does x_dataset always exist? + new_names[param["name"]] = new_name + param["name"] = new_name + # Update meta + metaAB["parameters"].append(param) + + dataB = nrecfun.rename_fields(dataB, new_names) + + # Convert structured arrays to DataFrames and merge on "Time" fields dfA = hapi_to_df(dataA, round_to_sec=round_to_sec) - dfC = hapi_to_df(dataC, round_to_sec=round_to_sec) - dt = merge_dtypes(dataA, dataC, trim="Time") - dfAC = pd.merge_ordered(dfA, dfC) # Works! + dfB = hapi_to_df(dataB, round_to_sec=round_to_sec) + dfAB = pd.merge_ordered(dfA, dfB, on="Time", how=how) - # walk through dfAC and fill all numeric 'NaN' with 'fill' from meta if fill_nan: - for ele in metaAC["parameters"]: - name = ele["name"] + dfAB = df_fill_nans(dfAB, metaAB) + + dataAB = df_to_hapi(dfAB, metaAB) + + return dataAB, metaAB + + +def clean_time(df: pd.DataFrame, round_to_sec: bool = False) -> pd.DataFrame: + """Converts time to hapi specified format and optionally rounds time to nearest second. + + Args: + df (pd.DataFrame): DataFrame containing "Time" column to clean. + round_to_sec (bool, optional): Rounds time to nearest second. Defaults to False. + + Returns: + pd.DataFrame: DataFrame with "Time" column cleaned. 
+ """ + df["Time"] = pd.to_datetime(df["Time"].str.decode("utf-8")) + if round_to_sec: + df["Time"] = df["Time"].dt.round("s") + df["Time"] = df["Time"].dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + return df + + +def df_fill_nans(df, meta) -> pd.DataFrame: + """Returns new hapi DataFrame with all NaN values filled according to fill_value in meta.""" + for param in meta["parameters"]: + name = param["name"] + if param["fill"] is None: + print(f"No fill: {param['name']} --> {param['fill']}") + else: try: - if ele["fill"] != None: - fill = float(ele["fill"]) - print("Filling with: ", fill, ele) - dfAC[name] = dfAC[name].fillna(fill) + fill = float(param["fill"]) + df[name] = df[name].fillna(fill) + print(f"Fill successful: {param['name']} --> {param['fill']}") except: - print("NO fill for ", ele["fill"], ele) + print(f"Fill failed: {param['name']} -> {param['fill']}") pass + return df + + +def dtypes_from_data(data) -> list: + """Returns parameter data types from hapi data array.""" + dt = [(param, data.dtype.fields[param][0]) for param in data.dtype.names] + return dt - newAC = dfAC.to_records(index=False, column_dtypes={"Time": "S30"}) - newAC = np.array( - [tuple([nparray_unpack_to_list(e) for e in elm]) for elm in newAC], dtype=dt + +def dtypes_from_meta(meta) -> list: + """Returns parameter data types from meta.""" + dt, _, _, _, _, _ = compute_dt(meta, {"format": ""}) + return dt + + +def df_to_hapi(df, meta): + """Converts a hapi DataFrame to a hapi array.""" + dt = dtypes_from_meta(meta) + data = df.to_records(index=False, column_dtypes={"Time": "S30"}) + data = np.array( + [tuple([nparray_unpack_to_list(e) for e in elm]) for elm in data], dtype=dt ) - newAC = np.array([tuple(i) for i in newAC], dtype=dt) + data = np.array([tuple(i) for i in data], dtype=dt) + return data + + +def resample_hapi( + data, + meta, + interval: str, + round_to_sec: bool = False, + tolerance: float | None = None, + start_time: str | None = None, + end_time: str | None = None, + limit: int 
| None = None, +): + """ + Resample hapi data at specified intervals. If data for a time does not exist, + uses data value from the nearest time. + """ + df = hapi_to_df(data, round_to_sec=round_to_sec) + + # Format dataframe for pandas resampling + # df["Time"] = pd.to_datetime(df["Time"]) + df["Time"] = hapitime2datetime(df["Time"].values) + df = df.set_index("Time") + + if start_time is not None: + start_time = pd.to_datetime(start_time) + df = df[df.index >= start_time] + + if end_time is not None: + end_time = pd.to_datetime(end_time) + df = df[df.index <= end_time] - return newAC, metaAC + if tolerance: + target_times = pd.date_range( + start=df.index.min(), end=df.index.max(), freq=interval + ) + tolerance_timedelta = pd.to_timedelta(tolerance) + + # Iterate over each target time and find the closest time within the tolerance + sampled_rows = [] + for target_time in target_times: + time_diffs = abs((df.index - target_time).total_seconds()) + closest_idx = time_diffs.idxmin() + if time_diffs[closest_idx] <= tolerance_timedelta: + sampled_rows.append(df.loc[closest_idx]) + + resampled_df = pd.DataFrame(sampled_rows).reset_index() + + else: + # Resample the DataFrame using the nearest value + resampled_df = df.resample(interval, origin="start").nearest(limit=limit) + resampled_df = resampled_df.reset_index() + + resampled_meta = copy.deepcopy(meta) # Does meta need to be updated? + resampled_df["Time"] = resampled_df["Time"].dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + # resampled_df["Time"] = datetime2hapitime(resampled_df["Time"].values) + resampled_data = df_to_hapi(resampled_df, resampled_meta) + return resampled_data, resampled_meta # , resampled_df diff --git a/tests/test2.py b/tests/test2.py new file mode 100644 index 0000000..4b6ca5b --- /dev/null +++ b/tests/test2.py @@ -0,0 +1 @@ +# stub to test git