Source code for offsets_db_data.gld

import numpy as np  # noqa: F401, I001
import pandas as pd
import pandas_flavor as pf

from offsets_db_data.common import (
    BERKELEY_PROJECT_TYPE_UPATH,
    CREDIT_SCHEMA_UPATH,
    PROJECT_SCHEMA_UPATH,
    load_column_mapping,
    load_inverted_protocol_mapping,
    load_protocol_mapping,
    load_registry_project_column_mapping,
    load_type_category_mapping,
)
from offsets_db_data.credits import aggregate_issuance_transactions  # noqa: F401
from offsets_db_data.credits import filter_and_merge_transactions  # noqa: F401
from offsets_db_data.credits import merge_with_arb  # noqa: F401
from offsets_db_data.credits import harmonize_beneficiary_data
from offsets_db_data.models import credit_without_id_schema, project_schema
from offsets_db_data.projects import add_category  # noqa: F401
from offsets_db_data.projects import add_first_issuance_and_retirement_dates  # noqa: F401
from offsets_db_data.projects import add_is_compliance_flag  # noqa: F401
from offsets_db_data.projects import add_retired_and_issued_totals  # noqa: F401
from offsets_db_data.projects import harmonize_country_names  # noqa: F401
from offsets_db_data.projects import harmonize_status_codes  # noqa: F401
from offsets_db_data.projects import map_protocol  # noqa: F401


@pf.register_dataframe_method
def determine_gld_transaction_type(df: pd.DataFrame, *, download_type: str) -> pd.DataFrame:
    """
    Tag every Gold Standard record with the transaction type implied by the download.

    All rows of one download share a single type, so one label is looked up from
    ``download_type`` and broadcast to the whole frame.

    Parameters
    ----------
    df : pd.DataFrame
        Transaction records. The frame is modified in place.
    download_type : str
        Kind of download the records came from: 'issuances' or 'retirements'.

    Returns
    -------
    pd.DataFrame
        The same DataFrame, with 'transaction_type' set to 'issuance' or 'retirement'.

    Raises
    ------
    KeyError
        If ``download_type`` is not one of the supported values.
    """
    labels_by_download = {'issuances': 'issuance', 'retirements': 'retirement'}
    # Plain indexing (rather than .get) so an unsupported download type fails loudly.
    label = labels_by_download[download_type]
    df['transaction_type'] = label
    return df
@pf.register_dataframe_method
def add_gld_project_id(df: pd.DataFrame, *, prefix: str) -> pd.DataFrame:
    """
    Rewrite the 'project_id' column as prefixed Gold Standard project IDs.

    Parameters
    ----------
    df : pd.DataFrame
        Frame holding a 'project_id' column. The frame is modified in place.
    prefix : str
        Text prepended to every project ID (e.g. 'GLD').

    Returns
    -------
    pd.DataFrame
        The same DataFrame, with 'project_id' overwritten by ``prefix`` followed by
        the string form of the previous value.
    """
    # Cast first: the incoming ids are not guaranteed to be strings already.
    ids_as_text = df['project_id'].astype(str)
    df['project_id'] = prefix + ids_as_text
    return df
@pf.register_dataframe_method
def process_gld_credits(
    df: pd.DataFrame,
    *,
    download_type: str,
    registry_name: str = 'gold-standard',
    prefix: str = 'GLD',
    arb: pd.DataFrame | None = None,
    harmonize_beneficiary_info: bool = False,
) -> pd.DataFrame:
    """
    Turn a raw Gold Standard credits download into a schema-validated credits frame.

    For non-empty input the steps are: rename raw columns to schema names, set the
    registry, assign the transaction type, prefix the project IDs, reduce
    'transaction_date' to its date part and parse it, aggregate issuances (issuance
    downloads only), fill in missing schema columns, validate, and optionally merge
    with ``arb``. Empty input yields an empty frame carrying the schema's columns.
    In both cases the result is optionally passed through beneficiary harmonization
    and then re-validated.

    Parameters
    ----------
    df : pd.DataFrame
        Raw Gold Standard credits data. A copy is processed; the caller's frame is
        not modified.
    download_type : str
        Type of download ('issuances' or 'retirements').
    registry_name : str, optional
        Registry name used for the column mapping and the registry column
        (default is 'gold-standard').
    prefix : str, optional
        Prefix for generating project IDs (default is 'GLD').
    arb : pd.DataFrame | None, optional
        Extra data passed to ``merge_with_arb``. Only used when the input is
        non-empty and ``arb`` itself is neither None nor empty (default is None).
    harmonize_beneficiary_info : bool, optional
        When True, run ``harmonize_beneficiary_data`` on the processed credits
        before the final validation (default is False).

    Returns
    -------
    pd.DataFrame
        Processed credits validated against ``credit_without_id_schema``.
    """
    column_mapping = load_column_mapping(
        registry_name=registry_name, download_type=download_type, mapping_path=CREDIT_SCHEMA_UPATH
    )
    # The mapping is keyed by schema name; renaming needs raw name -> schema name.
    raw_to_schema = {raw: canonical for canonical, raw in column_mapping.items()}

    frame = df.copy()

    if frame.empty:
        # Nothing to transform: build an empty frame that still satisfies the schema.
        data = pd.DataFrame(columns=credit_without_id_schema.columns.keys())
        data = data.add_missing_columns(schema=credit_without_id_schema)
        data = data.convert_to_datetime(columns=['transaction_date'], format='%Y-%m-%d')
        # NOTE(review): add_missing_columns runs a second time here, exactly as in the
        # original pipeline; presumably a no-op, confirm before removing.
        data = data.add_missing_columns(schema=credit_without_id_schema)
        data = data.validate(schema=credit_without_id_schema)
    else:
        data = frame.rename(columns=raw_to_schema)
        data = data.set_registry(registry_name=registry_name)
        data = data.determine_gld_transaction_type(download_type=download_type)
        data = data.add_gld_project_id(prefix=prefix)

        # split on T and discard the time portion for consistency
        date_part = data['transaction_date'].str.split('T').str[0]
        data['transaction_date'] = date_part
        data = data.convert_to_datetime(columns=['transaction_date'], format='%Y-%m-%d')

        if download_type == 'issuances':
            data = data.aggregate_issuance_transactions()

        data = data.add_missing_columns(schema=credit_without_id_schema)
        data = data.validate(schema=credit_without_id_schema)

        has_arb_rows = arb is not None and not arb.empty
        if has_arb_rows:
            data = data.merge_with_arb(arb=arb)

    if harmonize_beneficiary_info:
        data = data.pipe(
            harmonize_beneficiary_data, registry_name=registry_name, download_type=download_type
        )

    # Re-apply the schema steps so the returned frame is validated on every path.
    data = data.add_missing_columns(schema=credit_without_id_schema)
    data = data.convert_to_datetime(columns=['transaction_date'], format='%Y-%m-%d')
    return data.validate(schema=credit_without_id_schema)
@pf.register_dataframe_method
def add_gld_project_url(df: pd.DataFrame) -> pd.DataFrame:
    """
    Attach a Gold Standard registry URL to every project.

    The URL is a registry search link built from the current 'project_id' value.
    GS project ids are different from the id used in Gold Standard urls, which is
    why a search query is used.

    Parameters
    ----------
    df : pd.DataFrame
        Gold Standard project data with a 'project_id' column. The frame is
        modified in place.

    Returns
    -------
    pd.DataFrame
        The same DataFrame, with a 'project_url' column holding one URL per project.
    """
    search_url_prefix = 'https://registry.goldstandard.org/projects?q=gs'
    ids_as_text = df['project_id'].apply(str)
    df['project_url'] = search_url_prefix + ids_as_text
    return df
@pf.register_dataframe_method
def process_gld_projects(
    df: pd.DataFrame,
    *,
    credits: pd.DataFrame,
    registry_name: str = 'gold-standard',
    prefix: str = 'GLD',
) -> pd.DataFrame:
    """
    Turn a raw Gold Standard projects download into a schema-validated projects frame.

    Non-empty input is renamed to schema column names and run through the project
    pipeline (registry, URL, prefixed ID, country/status harmonization, protocol
    mapping, project type inference and overrides, category, display name,
    compliance flag). Credit-derived totals and first issuance/retirement dates are
    added only when ``credits`` has rows. Empty input yields an empty frame carrying
    the schema's columns.

    Parameters
    ----------
    df : pd.DataFrame
        Raw Gold Standard projects data. A copy is processed; the caller's frame is
        not modified.
    credits : pd.DataFrame
        Credits data used for the issued/retired totals and the first
        issuance/retirement dates. A copy is used.
    registry_name : str, optional
        Registry name used for the column mapping and the registry column
        (default is 'gold-standard').
    prefix : str, optional
        Prefix for generating project IDs (default is 'GLD').

    Returns
    -------
    pd.DataFrame
        Processed projects validated against ``project_schema``.
    """
    registry_project_column_mapping = load_registry_project_column_mapping(
        registry_name=registry_name, file_path=PROJECT_SCHEMA_UPATH
    )
    # The mapping is keyed by schema name; renaming needs raw name -> schema name.
    raw_to_schema = {raw: canonical for canonical, raw in registry_project_column_mapping.items()}
    type_category_mapping = load_type_category_mapping()
    inverted_protocol_mapping = load_inverted_protocol_mapping()
    protocol_mapping = load_protocol_mapping()

    projects = df.copy()
    credit_records = credits.copy()

    if projects.empty:
        # No projects at all: return an empty frame that still satisfies the schema.
        data = pd.DataFrame(columns=project_schema.columns.keys())
        data = data.add_missing_columns(schema=project_schema)
        data = data.convert_to_datetime(
            columns=['listed_at', 'first_issuance_at', 'first_retirement_at']
        )
        data['is_compliance'] = data['is_compliance'].astype(bool)
        return data.validate(schema=project_schema)

    # Steps shared by every non-empty input, whether or not credits are available.
    data = (
        projects.rename(columns=raw_to_schema)
        .set_registry(registry_name=registry_name)
        # The URL is built before the ID is prefixed, so it uses the raw project id.
        .add_gld_project_url()
        .add_gld_project_id(prefix=prefix)
        .harmonize_country_names()
        .harmonize_status_codes()
        .map_protocol(inverted_protocol_mapping=inverted_protocol_mapping)
        .infer_project_type()
        .override_project_types(
            override_data_path=BERKELEY_PROJECT_TYPE_UPATH, source_str='berkeley'
        )
        .add_category(
            type_category_mapping=type_category_mapping,
            protocol_mapping=protocol_mapping,
        )  # category derived from protocol; project_type is independent
        .map_project_type_to_display_name(type_category_mapping=type_category_mapping)
        .add_is_compliance_flag()
    )

    if not credit_records.empty:
        # Credit-derived columns are only computed when there are credits to read.
        data = data.add_retired_and_issued_totals(credits=credit_records)
        data = data.add_first_issuance_and_retirement_dates(credits=credit_records)

    data = data.add_missing_columns(schema=project_schema)
    data = data.convert_to_datetime(
        columns=['listed_at', 'first_issuance_at', 'first_retirement_at']
    )
    return data.validate(schema=project_schema)