# Source code for offsets_db_data.common

import json
import typing
from collections import defaultdict

import numpy as np
import pandas as pd
import pandas_flavor as pf
import pandera as pa
import upath

# Directory with the JSON configuration files shipped alongside this module.
_CONFIGS_DIR = upath.UPath(__file__).parents[0] / 'configs'

# Per-registry raw-column-name mapping for credits downloads.
CREDIT_SCHEMA_UPATH = _CONFIGS_DIR / 'credits-raw-columns-mapping.json'

# Protocol metadata (including known string aliases) for all registries.
PROTOCOL_MAPPING_UPATH = _CONFIGS_DIR / 'all-protocol-mapping.json'

# Per-registry raw-column-name mapping for project downloads.
PROJECT_SCHEMA_UPATH = _CONFIGS_DIR / 'projects-raw-columns-mapping.json'


def load_registry_project_column_mapping(
    *, registry_name: str, file_path: upath.UPath = PROJECT_SCHEMA_UPATH
) -> dict:
    """
    Load the project raw-column mapping for a single registry.

    The config file maps each normalized column name to a per-registry dict of
    raw column names; this inverts it to registry -> {normalized: raw}.

    Parameters
    ----------
    registry_name : str
        Registry whose mapping to return. Raises ``KeyError`` if the registry
        never appears in the config file.
    file_path : upath.UPath, optional
        Path to the projects raw-columns mapping JSON.

    Returns
    -------
    dict
        Mapping of normalized column name -> raw column name for the registry.
    """
    with open(file_path) as fp:
        raw_config = json.load(fp)

    by_registry: dict = {}
    for normalized_column, per_registry in raw_config.items():
        for registry, raw_column in per_registry.items():
            # Create the registry bucket even for falsy values so the final
            # lookup's KeyError behavior matches the set of known registries.
            bucket = by_registry.setdefault(registry, {})
            if raw_column:
                bucket[normalized_column] = raw_column
    return by_registry[registry_name]
def load_protocol_mapping(path: upath.UPath = PROTOCOL_MAPPING_UPATH) -> dict:
    """
    Load the protocol-mapping configuration.

    Parameters
    ----------
    path : upath.UPath, optional
        Path to the protocol mapping JSON file.

    Returns
    -------
    dict
        Parsed protocol metadata keyed by protocol name.
    """
    raw_text = path.read_text()
    return json.loads(raw_text)
def load_inverted_protocol_mapping() -> dict:
    """
    Invert the protocol mapping: known string alias -> list of protocol names.

    Returns
    -------
    dict
        ``defaultdict(list)`` mapping each entry of a protocol's
        ``known-strings`` list to the protocol names that declare it.
    """
    inverted: dict = defaultdict(list)
    for protocol_name, metadata in load_protocol_mapping().items():
        for alias in metadata.get('known-strings', []):
            inverted[alias].append(protocol_name)
    return inverted
def load_column_mapping(*, registry_name: str, download_type: str, mapping_path: str) -> dict:
    """
    Load the raw-column mapping for one registry and download type.

    Parameters
    ----------
    registry_name : str
        Registry key in the mapping file; ``KeyError`` if absent.
    download_type : str
        Download-type key under the registry; ``KeyError`` if absent.
    mapping_path : str
        Path to the JSON mapping file.

    Returns
    -------
    dict
        The column mapping for the given registry and download type.
    """
    with open(mapping_path) as fp:
        all_mappings = json.load(fp)
    registry_mappings = all_mappings[registry_name]
    return registry_mappings[download_type]
@pf.register_dataframe_method
def set_registry(df: pd.DataFrame, registry_name: str) -> pd.DataFrame:
    """
    Stamp every record with the originating registry name.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame; modified in place.
    registry_name : str
        Registry identifier to record.

    Returns
    -------
    pd.DataFrame
        The same DataFrame with a ``registry`` column set to ``registry_name``.
    """
    # Assign in place so both the return value and the caller's frame carry it.
    df['registry'] = registry_name
    return df
@pf.register_dataframe_method
def convert_to_datetime(
    df: pd.DataFrame, *, columns: list, utc: bool = True, **kwargs: typing.Any
) -> pd.DataFrame:
    """
    Convert the named columns to normalized (midnight) datetimes, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame; modified in place.
    columns : list
        Column names to convert; each must exist in ``df``.
    utc : bool, optional
        Whether to localize/convert to UTC (default True).
    **kwargs : typing.Any
        Extra keyword arguments forwarded to ``pd.to_datetime``.

    Returns
    -------
    pd.DataFrame
        The same DataFrame with the listed columns converted.

    Raises
    ------
    KeyError
        If any requested column is not present. Columns earlier in the list
        are converted before the error is raised.
    """
    for name in columns:
        if name not in df.columns:
            raise KeyError(f"The column '{name}' is missing.")
        converted = pd.to_datetime(df[name], utc=utc, **kwargs)
        # normalize() truncates the time component to midnight.
        df[name] = converted.dt.normalize()
    return df
@pf.register_dataframe_method
def add_missing_columns(df: pd.DataFrame, *, schema: pa.DataFrameSchema) -> pd.DataFrame:
    """
    Add any schema columns missing from the DataFrame, filled with a
    dtype-appropriate default, in place.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame; modified in place.
    schema : pa.DataFrameSchema
        Pandera schema whose columns should all be present.

    Returns
    -------
    pd.DataFrame
        The same DataFrame with every schema column present.
    """
    # Default fill value per numpy dtype; dtypes not listed fall back to None.
    fill_by_dtype = {
        np.dtype('int64'): 0,
        np.dtype('int32'): 0,
        np.dtype('float64'): 0.0,
        np.dtype('float32'): 0.0,
        np.dtype('O'): None,
        np.dtype('<U'): None,
        np.dtype('U'): None,
        np.dtype('bool'): False,
        np.dtype('<M8[ns]'): pd.NaT,  # datetime64[ns]
    }
    for name, column_schema in schema.columns.items():
        if name in df.columns:
            continue
        dtype = column_schema.dtype.type
        fill = fill_by_dtype.get(dtype)
        df[name] = pd.Series([fill] * len(df), index=df.index, dtype=dtype)
    return df
@pf.register_dataframe_method
def validate(df: pd.DataFrame, schema: pa.DataFrameSchema) -> pd.DataFrame:
    """
    Validate the DataFrame against a Pandera schema and return it with
    columns sorted alphabetically per the schema.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame.
    schema : pa.DataFrameSchema
        Pandera schema to validate against.

    Returns
    -------
    pd.DataFrame
        Validated DataFrame restricted to the schema's columns, sorted by name.
    """
    validated = schema.validate(df)
    # Iterating the schema's column dict yields its keys; sort for a stable order.
    ordered_columns = sorted(schema.columns)
    return validated[ordered_columns]
@pf.register_dataframe_method
def clean_and_convert_numeric_columns(df: pd.DataFrame, *, columns: list[str]) -> pd.DataFrame:
    """
    Clean and convert the named columns to numeric dtype, in place.

    String-like columns have thousands-separator commas stripped before
    conversion; values that still fail to parse become NaN.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame; modified in place.
    columns : list[str]
        Column names to clean and convert.

    Returns
    -------
    pd.DataFrame
        The same DataFrame with the listed columns converted to numeric.
    """
    for column in columns:
        series = df[column]
        # Only string-like columns need (or support) the `.str` accessor;
        # the previous unconditional `.str.replace` raised AttributeError
        # when a listed column was already numeric (e.g. on a re-run).
        if pd.api.types.is_object_dtype(series) or pd.api.types.is_string_dtype(series):
            # Literal comma removal — no regex semantics needed.
            series = series.str.replace(',', '', regex=False)
        df[column] = pd.to_numeric(series, errors='coerce')
    return df