Source code for offsets_db_data.vcs

import numpy as np  # noqa: F401
import pandas as pd
import pandas_flavor as pf

from offsets_db_data.common import (
    BERKELEY_PROJECT_TYPE_UPATH,
    CREDIT_SCHEMA_UPATH,
    PROJECT_SCHEMA_UPATH,
    load_column_mapping,
    load_inverted_protocol_mapping,
    load_protocol_mapping,
    load_registry_project_column_mapping,
    load_type_category_mapping,
)
from offsets_db_data.credits import *  # noqa: F403
from offsets_db_data.credits import harmonize_beneficiary_data
from offsets_db_data.models import credit_without_id_schema, project_schema
from offsets_db_data.projects import *  # noqa: F403


@pf.register_dataframe_method
def generate_vcs_project_ids(df: pd.DataFrame, *, prefix: str) -> pd.DataFrame:
    """
    Build Verra project IDs from the 'ID' column.

    Every value of the 'ID' column is cast to a string and the given prefix is
    placed in front of it. The result is written to a 'project_id' column on the
    DataFrame that was passed in (the input is modified in place).

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame holding an 'ID' column.
    prefix : str
        Text prepended to every ID.

    Returns
    -------
    pd.DataFrame
        The same DataFrame object, now carrying a 'project_id' column.
    """
    id_strings = df['ID'].astype(str)
    df['project_id'] = prefix + id_strings
    return df
@pf.register_dataframe_method
def determine_vcs_transaction_type(df: pd.DataFrame, *, date_column: str) -> pd.DataFrame:
    """
    Label each Verra transaction as a retirement or an issuance.

    Rows whose ``date_column`` value is non-null are labeled 'retirement'; all
    other rows are labeled 'issuance'. The label is written to a
    'transaction_type' column on the DataFrame that was passed in (the input is
    modified in place).

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with transaction data.
    date_column : str
        Name of the column whose null / non-null state decides the label.

    Returns
    -------
    pd.DataFrame
        The same DataFrame object with a 'transaction_type' column.
    """
    # Verra doesn't have a transaction type column, and doesn't differentiate between
    # retirements and cancellations. So we use the date column to decide whether a
    # transaction is a retirement or an issuance and set the transaction type accordingly.
    has_date = df[date_column].notnull()

    # Vectorized selection on the null mask instead of a per-row Python lambda.
    df['transaction_type'] = np.where(has_date, 'retirement', 'issuance')
    return df
@pf.register_dataframe_method
def set_vcs_transaction_dates(
    df: pd.DataFrame, *, date_column: str, fallback_column: str
) -> pd.DataFrame:
    """
    Fill a 'transaction_date' column from a primary column with a fallback.

    For each row the value of ``date_column`` is used when it is non-null;
    otherwise the value of ``fallback_column`` is used. The result is written to
    a 'transaction_date' column on the DataFrame that was passed in.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with transaction data.
    date_column : str
        Column preferred as the source of the transaction date.
    fallback_column : str
        Column consulted for rows where ``date_column`` is null.

    Returns
    -------
    pd.DataFrame
        The same DataFrame object with a 'transaction_date' column.
    """
    primary = df[date_column]
    fallback = df[fallback_column]

    # Replace only the missing primary values; present ones are kept untouched.
    df['transaction_date'] = primary.mask(primary.isnull(), fallback)
    return df
@pf.register_dataframe_method
def set_vcs_vintage_year(df: pd.DataFrame, *, date_column: str) -> pd.DataFrame:
    """
    Parse a date column and derive the vintage year from it.

    The column is first parsed with the explicit format '%d/%m/%Y' (UTC). If
    that raises ``ValueError``, parsing is retried without an explicit format.
    The parsed values replace ``date_column`` and their year is stored in a
    'vintage' column. The input DataFrame is modified in place.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with transaction data.
    date_column : str
        Name of the column holding the dates to parse.

    Returns
    -------
    pd.DataFrame
        The same DataFrame object with ``date_column`` converted to datetimes
        and a 'vintage' column holding the year.
    """
    raw_dates = df[date_column]
    try:
        parsed = pd.to_datetime(raw_dates, format='%d/%m/%Y', utc=True)
    except ValueError:
        # The strict day/month/year format did not fit; let pandas infer the format.
        parsed = pd.to_datetime(raw_dates, utc=True)

    df[date_column] = parsed
    df['vintage'] = parsed.dt.year
    return df
@pf.register_dataframe_method
def calculate_vcs_issuances(df: pd.DataFrame) -> pd.DataFrame:
    """
    Derive Verra issuance rows from preprocessed transaction data.

    Verra allows rolling/partial issuances, so the issuance for a vintage is
    inferred from `Total Vintage Quantity`: rows are sorted by
    'transaction_date' and only the first row of each
    ('vintage', 'project_id', 'Total Vintage Quantity') combination is kept.
    `Total Vintage Quantity` is then renamed to 'quantity' and every remaining
    row is labeled as an issuance.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with preprocessed transaction data.

    Returns
    -------
    pd.DataFrame
        New DataFrame with the deduplicated rows, a 'quantity' column and
        'transaction_type' set to 'issuance' on every row.
    """
    dedup_keys = ['vintage', 'project_id', 'Total Vintage Quantity']

    earliest_first = df.sort_values('transaction_date')
    unique_rows = earliest_first.drop_duplicates(dedup_keys, keep='first')

    issuances = unique_rows.rename(columns={'Total Vintage Quantity': 'quantity'})
    issuances['transaction_type'] = 'issuance'
    return issuances
@pf.register_dataframe_method
def calculate_vcs_retirements(df: pd.DataFrame) -> pd.DataFrame:
    """
    Select the non-issuance rows of Verra transaction data.

    Retirements and cancellations cannot be told apart in the data, so every
    row whose 'transaction_type' is not 'issuance' is returned. The
    'Quantity Issued' column is renamed to 'quantity'.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with Verra transaction data and a 'transaction_type'
        column.

    Returns
    -------
    pd.DataFrame
        New DataFrame with the non-issuance rows and the renamed column.
    """
    not_issuance = df['transaction_type'] != 'issuance'
    return df.loc[not_issuance].rename(columns={'Quantity Issued': 'quantity'})
@pf.register_dataframe_method
def process_vcs_credits(
    df: pd.DataFrame,
    *,
    download_type: str = 'transactions',
    registry_name: str = 'verra',
    prefix: str = 'VCS',
    arb: pd.DataFrame | None = None,
    harmonize_beneficiary_info: bool = False,
) -> pd.DataFrame:
    """
    Run the full Verra credits pipeline on raw transaction data.

    Works on a copy of the input. The steps are: set the registry, generate
    project IDs, label transaction types, set transaction dates, clean the
    numeric quantity columns, derive the vintage year, split the data into
    issuances and retirements, rename columns according to the credit column
    mapping, aggregate issuances, and validate against
    ``credit_without_id_schema``.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with raw credits data.
    download_type : str, optional
        Download type used to look up the column mapping (default is 'transactions').
    registry_name : str, optional
        Name of the registry (default is 'verra').
    prefix : str, optional
        Prefix for generating project IDs (default is 'VCS').
    arb : pd.DataFrame | None, optional
        When given and not empty, the processed data is passed through
        ``merge_with_arb`` with this DataFrame (default is None).
    harmonize_beneficiary_info : bool, optional
        When True, the processed data is piped through
        ``harmonize_beneficiary_data`` (default is False).

    Returns
    -------
    pd.DataFrame
        Processed DataFrame with Verra credits data.
    """
    retirement_date_column = 'Retirement/Cancellation Date'

    prepared = (
        df.copy()
        .set_registry(registry_name=registry_name)
        .generate_vcs_project_ids(prefix=prefix)
        .determine_vcs_transaction_type(date_column=retirement_date_column)
        .set_vcs_transaction_dates(
            date_column=retirement_date_column, fallback_column='Issuance Date'
        )
        .clean_and_convert_numeric_columns(columns=['Total Vintage Quantity', 'Quantity Issued'])
        .set_vcs_vintage_year(date_column='Vintage End')
        .convert_to_datetime(columns=['transaction_date'], dayfirst=True)
    )

    raw_issuances = prepared.calculate_vcs_issuances()
    raw_retirements = prepared.calculate_vcs_retirements()

    column_mapping = load_column_mapping(
        registry_name=registry_name, download_type=download_type, mapping_path=CREDIT_SCHEMA_UPATH
    )
    # Invert the mapping so its values become the keys that rename() looks up.
    rename_map = {current: target for target, current in column_mapping.items()}

    combined = (
        pd.concat([raw_issuances, raw_retirements])
        .reset_index(drop=True)
        .rename(columns=rename_map)
    )

    issuances = combined.aggregate_issuance_transactions()
    is_retirement = combined['transaction_type'].str.contains('retirement')
    retirements = combined[is_retirement]

    data = (
        pd.concat([issuances, retirements])
        .reset_index(drop=True)
        .add_missing_columns(schema=credit_without_id_schema)
        .validate(schema=credit_without_id_schema)
    )

    if arb is not None and not arb.empty:
        data = data.merge_with_arb(arb=arb)

    if harmonize_beneficiary_info:
        data = data.pipe(
            harmonize_beneficiary_data, registry_name=registry_name, download_type=download_type
        )

    # Re-apply the schema after the optional steps above, which may have changed the columns.
    return (
        data.add_missing_columns(schema=credit_without_id_schema)
        .convert_to_datetime(columns=['transaction_date'], format='%Y-%m-%d')
        .validate(schema=credit_without_id_schema)
    )
@pf.register_dataframe_method
def add_vcs_compliance_projects(df: pd.DataFrame) -> pd.DataFrame:
    """
    Append two hard-coded compliance projects to the projects data.

    The two rows describe projects from the mostly unused Verra compliance
    registry portal. They are concatenated after the existing rows and the
    index of the result is renumbered.

    Parameters
    ----------
    df : pd.DataFrame
        A pandas DataFrame containing project data with a 'project_id' column.

    Returns
    -------
    pd.DataFrame
        New DataFrame holding the input rows followed by the two compliance
        project rows.
    """
    corinth_mine_methane = {
        'project_id': 'VCSOPR2',
        'name': 'Corinth Abandoned Mine Methane Recovery Project',
        'protocol': ['arb-mine-methane'],
        'category': 'ghg-management',
        'project_type': 'mine methane capture',
        'project_type_source': 'carbonplan',
        'proponent': 'Keyrock Energy LLC',
        'country': 'United States',
        'status': 'registered',
        'is_compliance': True,
        'registry': 'verra',
        'project_url': 'https://registry.verra.org/app/projectDetail/VCS/2265',
    }
    alford_forest = {
        'project_id': 'VCSOPR10',
        'name': 'Blue Source-Alford Improved Forest Management Project',
        'protocol': ['arb-forest'],
        'category': 'forest',
        'project_type': 'improved forest management',
        'project_type_source': 'carbonplan',
        'proponent': 'Ozark Regional Land Trust',
        'country': 'United States',
        'status': 'registered',
        'is_compliance': True,
        'registry': 'verra',
        'project_url': 'https://registry.verra.org/app/projectDetail/VCS/2271',
    }

    compliance_rows = pd.DataFrame([corinth_mine_methane, alford_forest])
    return pd.concat([df, compliance_rows], ignore_index=True)
@pf.register_dataframe_method
def add_vcs_project_url(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build the Verra registry URL for each project.

    The URL is the registry's project-detail base address followed by the
    'project_id' value with its first three characters removed. The result is
    written to a 'project_url' column on the DataFrame that was passed in.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with a string 'project_id' column.

    Returns
    -------
    pd.DataFrame
        The same DataFrame object with a 'project_url' column.
    """
    base_url = 'https://registry.verra.org/app/projectDetail/VCS/'

    # Skip the first three characters of the ID (the length of the 'VCS' prefix).
    id_without_prefix = df['project_id'].str.slice(start=3)
    df['project_url'] = base_url + id_without_prefix
    return df
@pf.register_dataframe_method
def add_vcs_project_id(df: pd.DataFrame) -> pd.DataFrame:
    """
    Prefix every project ID with 'VCS'.

    Each value of the 'project_id' column is converted to a string and
    prepended with 'VCS'. The column is overwritten on the DataFrame that was
    passed in.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with a 'project_id' column.

    Returns
    -------
    pd.DataFrame
        The same DataFrame object with the prefixed 'project_id' column.
    """

    def _with_vcs_prefix(raw_id) -> str:
        # str() so that non-string IDs (e.g. integers) can be concatenated.
        return 'VCS' + str(raw_id)

    df['project_id'] = df['project_id'].apply(_with_vcs_prefix)
    return df
@pf.register_dataframe_method
def process_vcs_projects(
    df: pd.DataFrame,
    *,
    credits: pd.DataFrame,
    registry_name: str = 'verra',
) -> pd.DataFrame:
    """
    Run the full Verra projects pipeline on raw project data.

    Works on copies of both inputs. Columns are renamed using the inverted
    registry project column mapping, then the data goes through the registry,
    ID, URL, country, status, protocol, project-type, category and compliance
    steps, gets credit totals and first issuance/retirement dates from
    ``credits``, and is validated against ``project_schema``.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with raw projects data.
    credits : pd.DataFrame
        DataFrame containing credits data, used for the issued/retired totals
        and the first issuance/retirement dates.
    registry_name : str, optional
        Name of the registry (default is 'verra').

    Returns
    -------
    pd.DataFrame
        Processed DataFrame with harmonized and validated Verra projects data.
    """
    projects = df.copy()
    credits_copy = credits.copy()

    column_mapping = load_registry_project_column_mapping(
        registry_name=registry_name, file_path=PROJECT_SCHEMA_UPATH
    )
    # Invert the mapping so its values become the keys that rename() looks up.
    rename_map = {current: target for target, current in column_mapping.items()}

    type_category_mapping = load_type_category_mapping()
    inverted_protocol_mapping = load_inverted_protocol_mapping()
    protocol_mapping = load_protocol_mapping()

    # Stage 1: identify and harmonize the raw project records.
    harmonized = (
        projects.rename(columns=rename_map)
        .set_registry(registry_name=registry_name)
        .add_vcs_project_id()
        .add_vcs_project_url()
        .harmonize_country_names()
        .harmonize_status_codes()
    )

    # Stage 2: protocol, project type and category classification.
    # NOTE(review): the original comment states that category is derived from protocol
    # and that project_type is independent -- confirm against add_category.
    classified = (
        harmonized.map_protocol(inverted_protocol_mapping=inverted_protocol_mapping)
        .infer_project_type()
        .override_project_types(
            override_data_path=BERKELEY_PROJECT_TYPE_UPATH, source_str='berkeley'
        )
        .add_category(
            type_category_mapping=type_category_mapping,
            protocol_mapping=protocol_mapping,
        )
        .add_is_compliance_flag()
        .add_vcs_compliance_projects()
        .map_project_type_to_display_name(type_category_mapping=type_category_mapping)
    )

    # Stage 3: enrich with credit information and enforce the project schema.
    return (
        classified.add_retired_and_issued_totals(credits=credits_copy)
        .add_first_issuance_and_retirement_dates(credits=credits_copy)
        .add_missing_columns(schema=project_schema)
        .convert_to_datetime(columns=['listed_at'], dayfirst=True)
        .validate(schema=project_schema)
    )