Source code for offsets_db_data.apx

import numpy as np  # noqa: F401
import pandas as pd
import pandas_flavor as pf

from offsets_db_data.common import (
    CREDIT_SCHEMA_UPATH,
    PROJECT_SCHEMA_UPATH,
    load_column_mapping,
    load_inverted_protocol_mapping,
    load_protocol_mapping,
    load_registry_project_column_mapping,
)
from offsets_db_data.credits import *  # noqa: F403
from offsets_db_data.models import credit_without_id_schema, project_schema
from offsets_db_data.projects import *  # noqa: F403


[docs] @pf.register_dataframe_method def determine_transaction_type(df: pd.DataFrame, *, download_type: str) -> pd.DataFrame: """ Assign a transaction type to each record in the DataFrame based on the download type. Parameters ---------- df : pd.DataFrame Input DataFrame containing transaction data. download_type : str Type of transaction ('issuances', 'retirements', 'cancellations') to determine the transaction type. Returns ------- pd.DataFrame DataFrame with a new 'transaction_type' column, containing assigned transaction types based on download_type. """ transaction_type_mapping = { 'issuances': 'issuance', 'retirements': 'retirement', 'cancellations': 'cancellation', } df['transaction_type'] = transaction_type_mapping[download_type] return df
[docs] @pf.register_dataframe_method def process_apx_credits( df: pd.DataFrame, *, download_type: str, registry_name: str, arb: pd.DataFrame | None = None ) -> pd.DataFrame: """ Process APX credits data by setting registry, determining transaction types, renaming columns, converting date columns, aggregating issuances (if applicable), and validating the schema. Parameters ---------- df : pd.DataFrame Input DataFrame with raw APX credits data. download_type : str Type of download ('issuances', 'retirements', etc.). registry_name : str Name of the registry for setting and mapping columns. arb : pd.DataFrame | None, optional Additional DataFrame for data merging (default is None). Returns ------- pd.DataFrame Processed DataFrame with APX credits data. """ df = df.copy() column_mapping = load_column_mapping( registry_name=registry_name, download_type=download_type, mapping_path=CREDIT_SCHEMA_UPATH ) columns = {v: k for k, v in column_mapping.items()} data = ( df.set_registry(registry_name=registry_name) .determine_transaction_type(download_type=download_type) .rename(columns=columns) ) # split the date and time and keeping only the date. this helps with the inconsistency in the date format data['transaction_date'] = data['transaction_date'].str.split().str[0] data = data.convert_to_datetime(columns=['transaction_date']) if download_type == 'issuances': data = data.aggregate_issuance_transactions() data = data.add_missing_columns(schema=credit_without_id_schema).validate( schema=credit_without_id_schema ) if arb is not None and not arb.empty: data = data.merge_with_arb(arb=arb) return data
[docs] def harmonize_acr_status(row: pd.Series) -> str: """Derive single project status for CAR and ACR projects Raw CAR and ACR data has two status columns -- one for compliance status, one for voluntary. Handle and harmonize. Parameters ---------- row : pd.Series A row from a pandas DataFrame Returns ------- value : str The status of the project """ if row['Compliance Program Status (ARB or Ecology)'] == 'Not ARB or Ecology Eligible': return row['Voluntary Status'].lower() ACR_COMPLIANCE_STATE_MAP = { 'Listed - Active ARB Project': 'active', 'ARB Completed': 'completed', 'ARB Inactive': 'completed', 'Listed - Proposed Project': 'listed', 'Listed - Active Registry Project': 'listed', 'ARB Terminated': 'completed', 'Submitted': 'listed', 'Transferred ARB or Ecology Project': 'active', 'Listed – Active ARB Project': 'active', } return ACR_COMPLIANCE_STATE_MAP.get( row['Compliance Program Status (ARB or Ecology)'], 'unknown' )
[docs] @pf.register_dataframe_method def add_project_url(df: pd.DataFrame, *, registry_name: str) -> pd.DataFrame: """ Add a project URL to each record in the DataFrame based on the registry name and project ID. Parameters ---------- df : pd.DataFrame Input DataFrame containing project data. registry_name : str Name of the registry ('american-carbon-registry', 'climate-action-reserve', 'art-trees'). Returns ------- pd.DataFrame DataFrame with a new 'project_url' column, containing URLs for each project. """ if registry_name == 'american-carbon-registry': base = 'https://acr2.apx.com/mymodule/reg/prjView.asp?id1=' elif registry_name == 'climate-action-reserve': base = 'https://thereserve2.apx.com/mymodule/reg/prjView.asp?id1=' elif registry_name == 'art-trees': base = 'https://art.apx.com/mymodule/reg/prjView.asp?id1=' else: raise ValueError(f'Unknown registry name: {registry_name}') df['project_url'] = base + df['project_id'].str[3:] return df
[docs] @pf.register_dataframe_method def process_apx_projects( df: pd.DataFrame, *, credits: pd.DataFrame, registry_name: str ) -> pd.DataFrame: """ Process APX projects data, including renaming, adding, and validating columns, harmonizing statuses, and merging with credits data. Parameters ---------- df : pd.DataFrame Input DataFrame with raw projects data. credits : pd.DataFrame DataFrame containing credits data for merging. registry_name : str Name of the registry for specific processing steps. Returns ------- pd.DataFrame Processed DataFrame with harmonized and validated APX projects data. """ df = df.copy() credits = credits.copy() registry_project_column_mapping = load_registry_project_column_mapping( registry_name=registry_name, file_path=PROJECT_SCHEMA_UPATH ) inverted_column_mapping = {value: key for key, value in registry_project_column_mapping.items()} protocol_mapping = load_protocol_mapping() inverted_protocol_mapping = load_inverted_protocol_mapping() data = df.rename(columns=inverted_column_mapping) if registry_name == 'art-trees': data['protocol'] = [['art-trees']] * len(data) data['category'] = [['forest']] * len(data) else: data = data.map_protocol(inverted_protocol_mapping=inverted_protocol_mapping).add_category( protocol_mapping=protocol_mapping ) if registry_name == 'american-carbon-registry': data['status'] = data.apply(harmonize_acr_status, axis=1) else: data = data.harmonize_status_codes() data = ( data.set_registry(registry_name=registry_name) .add_project_url(registry_name=registry_name) .harmonize_country_names() .add_is_compliance_flag() .add_retired_and_issued_totals(credits=credits) .add_first_issuance_and_retirement_dates(credits=credits) .add_missing_columns(schema=project_schema) .convert_to_datetime(columns=['listed_at']) .validate(schema=project_schema) ) return data