import contextlib
import json
import country_converter as coco
import janitor # noqa: F401
import numpy as np
import pandas as pd
import pandas_flavor as pf
[docs]
@pf.register_dataframe_method
def harmonize_country_names(df: pd.DataFrame, *, country_column: str = 'country') -> pd.DataFrame:
"""
Harmonize country names in the DataFrame to standardized country names.
Parameters
----------
df : pd.DataFrame
Input DataFrame with country data.
country_column : str, optional
The name of the column containing country names to be harmonized (default is 'country').
Returns
-------
pd.DataFrame
DataFrame with harmonized country names in the specified column.
"""
print('Harmonizing country names...')
cc = coco.CountryConverter()
df[country_column] = cc.pandas_convert(df[country_column], to='name')
print('Done converting country names...')
return df
[docs]
@pf.register_dataframe_method
def add_category(df: pd.DataFrame, *, type_category_mapping: dict) -> pd.DataFrame:
"""
Add a category to each record in the DataFrame based on its protocol.
Parameters
----------
df : pd.DataFrame
Input DataFrame containing protocol data.
type_category_mapping : dict
Dictionary mapping types to categories.
Returns
-------
pd.DataFrame
DataFrame with a new 'category' column, derived from the protocol information.
"""
print('Adding category based on protocol...')
df['category'] = (
df['project_type']
.str.lower()
.map({key.lower(): value['category'] for key, value in type_category_mapping.items()})
.fillna('unknown')
)
return df
[docs]
@pf.register_dataframe_method
def override_project_types(df: pd.DataFrame, *, override_data_path: str, source_str: str):
"""
Override project types to the DataFrame based on project characteristics
We treat Berkeley data as source of truth for most project types
Parameters
----------
df : pd.DataFrame
Input DataFrame containing project data.
override_data_path: str
Path to where json of override data lives
source: str
Value to write to `type_source` when applying override values
Returns
-------
pd.DataFrame
DataFrame with a 'project_type' column overridden by all values in override_data.
"""
override_d = json.load(open(override_data_path))
df['project_type'] = df['project_id'].map(override_d).fillna(df['project_type'])
df.loc[df['project_id'].isin(list(override_d.keys())), 'project_type_source'] = source_str
return df
[docs]
@pf.register_dataframe_method
def infer_project_type(df: pd.DataFrame) -> pd.DataFrame:
"""
Add project types to the DataFrame based on project characteristics
Parameters
----------
df : pd.DataFrame
Input DataFrame containing project data.
Returns
-------
pd.DataFrame
DataFrame with a new 'project_type' column, indicating the project's type. Defaults to None
"""
df.loc[:, 'project_type'] = 'unknown'
df.loc[:, 'project_type_source'] = 'carbonplan'
df.loc[df.apply(lambda x: 'art-trees' in x['protocol'], axis=1), 'project_type'] = 'redd+'
df.loc[df.apply(lambda x: 'acr-ifm-nonfed' in x['protocol'], axis=1), 'project_type'] = (
'improved forest management'
)
df.loc[df.apply(lambda x: 'acr-abandoned-wells' in x['protocol'], axis=1), 'project_type'] = (
'plugging oil & gas wells'
)
df.loc[df.apply(lambda x: 'arb-mine-methane' in x['protocol'], axis=1), 'project_type'] = (
'mine methane capture'
)
df.loc[df.apply(lambda x: 'vm0048' in x['protocol'], axis=1), 'project_type'] = 'redd+'
df.loc[df.apply(lambda x: 'vm0047' in x['protocol'], axis=1), 'project_type'] = (
'afforestation/reforestation'
)
df.loc[df.apply(lambda x: 'vm0045' in x['protocol'], axis=1), 'project_type'] = (
'improved forest management'
)
df.loc[df.apply(lambda x: 'vm0042' in x['protocol'], axis=1), 'project_type'] = 'agriculture'
df.loc[df.apply(lambda x: 'vm0007' in x['protocol'], axis=1), 'project_type'] = 'redd+'
return df
[docs]
@pf.register_dataframe_method
def map_project_type_to_display_name(
df: pd.DataFrame, *, type_category_mapping: dict
) -> pd.DataFrame:
"""
Map project types in the DataFrame to display names based on a mapping dictionary.
Parameters
----------
df : pd.DataFrame
Input DataFrame containing project data.
type_category_mapping : dict
Dictionary mapping project type strings to display names.
Returns
-------
pd.DataFrame
DataFrame with a new 'project_type' column, containing mapped display names.
"""
print('Mapping project types to display names...')
df['project_type'] = (
df['project_type']
.map(
{
key.lower(): value['project-type-display-name']
for key, value in type_category_mapping.items()
}
)
.fillna('Unknown')
)
return df
[docs]
@pf.register_dataframe_method
def add_is_compliance_flag(df: pd.DataFrame) -> pd.DataFrame:
"""
Add a compliance flag to the DataFrame based on the protocol.
Parameters
----------
df : pd.DataFrame
Input DataFrame containing protocol data.
Returns
-------
pd.DataFrame
DataFrame with a new 'is_compliance' column, indicating if the protocol starts with 'arb-'.
"""
print('Adding is_compliance flag...')
df['is_compliance'] = df.apply(
lambda row: np.any([protocol_str.startswith('arb-') for protocol_str in row['protocol']]),
axis=1,
)
return df
[docs]
@pf.register_dataframe_method
def map_protocol(
df: pd.DataFrame,
*,
inverted_protocol_mapping: dict,
original_protocol_column: str = 'original_protocol',
) -> pd.DataFrame:
"""
Map protocols in the DataFrame to standardized names based on an inverted protocol mapping.
Parameters
----------
df : pd.DataFrame
Input DataFrame containing protocol data.
inverted_protocol_mapping : dict
Dictionary mapping protocol strings to standardized protocol names.
original_protocol_column : str, optional
Name of the column containing original protocol information (default is 'original_protocol').
Returns
-------
pd.DataFrame
DataFrame with a new 'protocol' column, containing mapped protocol names.
"""
print('Mapping protocol based on known string...')
try:
df['protocol'] = df[original_protocol_column].apply(
lambda item: find_protocol(
search_string=item, inverted_protocol_mapping=inverted_protocol_mapping
)
)
except KeyError:
# art-trees doesnt have protocol column
df['protocol'] = [['unknown']] * len(df) # protocol column is nested list
return df
[docs]
@pf.register_dataframe_method
def harmonize_status_codes(df: pd.DataFrame, *, status_column: str = 'status') -> pd.DataFrame:
"""Harmonize project status codes across registries
Excludes ACR, as it requires special treatment across two columns
Parameters
----------
df : pd.DataFrame
Input DataFrame with project status data.
status_column : str, optional
Name of the column containing status codes to harmonize (default is 'status').
Returns
-------
pd.DataFrame
DataFrame with harmonized project status codes.
"""
print('Harmonizing status codes')
with contextlib.suppress(KeyError):
CAR_STATES = {
'Registered': 'registered',
'Completed': 'completed',
'Listed': 'listed',
'Transitioned': 'unknown',
}
VERRA_STATES = {
'Under validation': 'listed',
'Under development': 'listed',
'Registration requested': 'listed',
'Registration and verification approval requested': 'listed',
'Withdrawn': 'completed',
'On Hold': 'registered',
'Units Transferred from Approved GHG Program': 'unknown',
'Rejected by Administrator': 'completed',
'Crediting Period Renewal Requested': 'registered',
'Inactive': 'completed',
'Crediting Period Renewal and Verification Approval Requested': 'registered',
}
GS_STATES = {
'GOLD_STANDARD_CERTIFIED_PROJECT': 'registered',
'LISTED': 'listed',
'GOLD_STANDARD_CERTIFIED_DESIGN': 'registered',
}
state_dict = CAR_STATES | VERRA_STATES | GS_STATES
df[status_column] = df[status_column].apply(lambda x: state_dict.get(x, 'unknown'))
return df
[docs]
def find_protocol(
*, search_string: str, inverted_protocol_mapping: dict[str, list[str]]
) -> list[str]:
"""Match known strings of project methodologies to internal topology
Unmatched strings are passed through to the database, until such time that we update mapping data.
"""
if pd.isna(search_string): # handle nan case, which crops up in verra data right now
return ['unknown']
if known_match := inverted_protocol_mapping.get(search_string.strip()):
return known_match # inverted_mapping returns lst
print(f"'{search_string}' is unmapped in full protocol mapping")
return [search_string]
[docs]
def get_protocol_category(*, protocol_strs: list[str] | str, protocol_mapping: dict) -> list[str]:
"""
Get category based on protocol string
Parameters
----------
protocol_strs : str or list
single protocol string or list of protocol strings
protocol_mapping: dict
metadata about normalized protocol strings
Returns
-------
categories : list[str]
list of category strings
"""
def _get_category(protocol_str, protocol_mapping):
try:
return protocol_mapping.get(protocol_str).get('category', 'unknown')
except AttributeError:
return 'unknown'
if isinstance(protocol_strs, str):
protocol_strs = [protocol_strs]
categories = [_get_category(protocol_str, protocol_mapping) for protocol_str in protocol_strs]
return list(
set(categories)
) # if multiple protocols have same category, just return category once
[docs]
@pf.register_dataframe_method
def add_first_issuance_and_retirement_dates(
projects: pd.DataFrame, *, credits: pd.DataFrame
) -> pd.DataFrame:
"""
Add the first issuance date of carbon credits to each project in the projects DataFrame.
Parameters
----------
credits : pd.DataFrame
A pandas DataFrame containing credit issuance data with columns 'project_id', 'transaction_date', and 'transaction_type'.
projects : pd.DataFrame
A pandas DataFrame containing project data with a 'project_id' column.
Returns
-------
projects : pd.DataFrame
A pandas DataFrame which is the original projects DataFrame with two additional columns 'first_issuance_at' representing
the first issuance date of each project and 'first_retirement_at' representing the first retirement date of each project.
"""
first_issuance = (
credits[credits['transaction_type'] == 'issuance']
.groupby('project_id')['transaction_date']
.min()
.reset_index()
)
first_retirement = (
credits[credits['transaction_type'].str.contains('retirement')]
.groupby('project_id')['transaction_date']
.min()
.reset_index()
)
# Merge the projects DataFrame with the first issuance and retirement dates
projects_with_dates = pd.merge(projects, first_issuance, on='project_id', how='left')
projects_with_dates = pd.merge(
projects_with_dates, first_retirement, on='project_id', how='left'
)
# Rename the merged columns for clarity
projects_with_dates = projects_with_dates.rename(
columns={
'transaction_date_x': 'first_issuance_at',
'transaction_date_y': 'first_retirement_at',
}
)
return projects_with_dates
[docs]
@pf.register_dataframe_method
def add_retired_and_issued_totals(projects: pd.DataFrame, *, credits: pd.DataFrame) -> pd.DataFrame:
"""
Add total quantities of issued and retired credits to each project.
Parameters
----------
projects : pd.DataFrame
DataFrame containing project data.
credits : pd.DataFrame
DataFrame containing credit transaction data.
Returns
-------
pd.DataFrame
DataFrame with two new columns: 'issued' and 'retired', representing the total quantities of issued and retired credits.
"""
# Drop conflicting columns if they exist
projects = projects.drop(columns=['issued', 'retired'], errors='ignore')
# # filter out the projects that are not in the credits data
# credits = credits[credits['project_id'].isin(projects['project_id'].unique())]
# groupd and sum
credit_totals = (
credits.groupby(['project_id', 'transaction_type'])['quantity'].sum().reset_index()
)
# pivot the table
credit_totals_pivot = credit_totals.pivot(
index='project_id', columns='transaction_type', values='quantity'
).reset_index()
# merge with projects
projects_combined = pd.merge(
projects,
credit_totals_pivot[['project_id', 'issuance', 'retirement']],
left_on='project_id',
right_on='project_id',
how='left',
)
# rename columns for clarity
projects_combined = projects_combined.rename(
columns={'issuance': 'issued', 'retirement': 'retired'}
)
# replace Nans with 0 if any
projects_combined[['issued', 'retired']] = projects_combined[['issued', 'retired']].fillna(0)
return projects_combined