import contextlib
import json
from decimal import Decimal
import country_converter as coco
import janitor # noqa: F401
import numpy as np
import pandas as pd
import pandas_flavor as pf
from offsets_db_data.common import convert_to_datetime, validate
from offsets_db_data.models import project_schema
# Base URL prefix of each registry's public project page, keyed by registry slug.
# Placeholder project rows append a project-specific suffix to these prefixes.
_REGISTRY_PROJECT_URLS = dict(
    [
        ('verra', 'https://registry.verra.org/app/projectDetail/VCS/'),
        ('gold-standard', 'https://registry.goldstandard.org/projects?q=gs'),
        ('american-carbon-registry', 'https://acr2.apx.com/mymodule/reg/prjView.asp?id1='),
        ('climate-action-reserve', 'https://thereserve2.apx.com/mymodule/reg/prjView.asp?id1='),
        ('art-trees', 'https://art.apx.com/mymodule/reg/prjView.asp?id1='),
        ('cercarbono', 'https://www.ecoregistry.io/projects/CDC-'),
    ]
)
[docs]
@pf.register_dataframe_method
def harmonize_country_names(df: pd.DataFrame, *, country_column: str = 'country') -> pd.DataFrame:
    """
    Replace the values of a country column with standardized country names.

    The conversion is delegated to ``country_converter`` (``to='name'``) and the
    result is written back into ``df`` in place.

    Parameters
    ----------
    df : pd.DataFrame
        Frame holding the country data; modified in place.
    country_column : str, optional
        Column whose values are converted (default is 'country').

    Returns
    -------
    pd.DataFrame
        The same frame, with ``country_column`` holding the converted names.
    """
    print('Harmonizing country names...')
    converter = coco.CountryConverter()
    standardized = converter.pandas_convert(df[country_column], to='name')
    df[country_column] = standardized
    print('Done converting country names...')
    return df
[docs]
@pf.register_dataframe_method
def add_category(
    df: pd.DataFrame, *, type_category_mapping: dict, protocol_mapping: dict | None = None
) -> pd.DataFrame:
    """
    Attach a 'category' column derived from each record's protocol(s).

    When ``protocol_mapping`` is supplied, the category of a record is the first
    non-empty, non-'unknown' ``category`` found among its protocols, which keeps
    category independent of project_type. Otherwise the legacy path looks up the
    lower-cased ``project_type`` in ``type_category_mapping``.

    Parameters
    ----------
    df : pd.DataFrame
        Frame with a 'protocol' column (or 'project_type' for the legacy path);
        modified in place.
    type_category_mapping : dict
        project_type -> metadata dict with a 'category' key (legacy fallback only).
    protocol_mapping : dict, optional
        Normalized protocol id -> metadata dict that may carry a 'category' key.

    Returns
    -------
    pd.DataFrame
        The same frame with a 'category' column; unresolved rows get 'unknown'.
    """
    print('Adding category based on protocol...')

    if protocol_mapping is None:
        # Legacy fallback: derive category from project_type (case-insensitive).
        lowered_types = df['project_type'].str.lower()
        category_by_type = {
            type_name.lower(): details['category']
            for type_name, details in type_category_mapping.items()
        }
        df['category'] = lowered_types.map(category_by_type).fillna('unknown')
        return df

    def _first_known_category(protocols: list | None) -> str:
        if not protocols:
            return 'unknown'
        # Lazily evaluated, so lookups stop at the first usable category.
        candidates = (protocol_mapping.get(p, {}).get('category') for p in protocols)
        return next((c for c in candidates if c and c != 'unknown'), 'unknown')

    df['category'] = df['protocol'].apply(_first_known_category)
    return df
[docs]
@pf.register_dataframe_method
def override_project_types(df: pd.DataFrame, *, override_data_path: str, source_str: str):
    """
    Override project types using an external project_id -> project_type mapping.

    We treat Berkeley data as the source of truth for most project types.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing 'project_id' and 'project_type' columns;
        modified in place.
    override_data_path : str
        Path to a JSON file holding an object that maps project_id to project_type.
    source_str : str
        Value written to 'project_type_source' for every row whose project_id
        appears in the override data.

    Returns
    -------
    pd.DataFrame
        The same frame; 'project_type' is replaced wherever the override data
        has a non-null value for the row's project_id, otherwise kept.
    """
    # Close the file deterministically; JSON is UTF-8 by specification, so don't
    # depend on the platform's locale encoding.
    with open(override_data_path, encoding='utf-8') as f:
        override_d = json.load(f)
    df['project_type'] = df['project_id'].map(override_d).fillna(df['project_type'])
    df.loc[df['project_id'].isin(list(override_d)), 'project_type_source'] = source_str
    return df
[docs]
@pf.register_dataframe_method
def infer_project_type(df: pd.DataFrame) -> pd.DataFrame:
    """
    Infer project types from the normalized protocol(s) of each project.

    Each row's 'protocol' value is checked against an ordered rule table. When
    several rules match, the one listed LAST wins. Rows with no matching rule,
    or a missing (None/NaN) protocol, get 'unknown'.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with a 'protocol' column (a collection of protocol ids,
        or None); modified in place.

    Returns
    -------
    pd.DataFrame
        The same frame with 'project_type' set as described above and
        'project_type_source' set to 'carbonplan' for every row.
    """
    # (protocol id, project type), evaluated in order; later matches override
    # earlier ones.
    protocol_type_rules = (
        ('art-trees', 'redd+'),
        ('acr-ifm-nonfed', 'improved forest management'),
        ('acr-refridge', 'advanced refrigerants'),
        ('acr-abandoned-wells', 'plugging oil & gas wells'),
        ('arb-mine-methane', 'mine methane capture'),
        ('vm0048', 'redd+'),
        ('vm0047', 'afforestation/reforestation'),
        ('vm0045', 'improved forest management'),
        ('vm0042', 'sustainable agriculture'),
        ('vm0007', 'redd+'),
        ('acm0001', 'landfill methane'),
        ('acm0002', 're bundled'),
        ('iso-refor', 'afforestation/reforestation'),
        ('iso-biochar', 'biochar'),
        ('iso-bio-burial', 'biomass burial'),
        ('iso-bio-geo', 'biomass injection'),
        ('iso-bio-oil', 'biomass injection'),
        ('iso-dac', 'direct air capture'),
        ('iso-erw', 'enhanced rock weathering'),
        ('ccb-refor', 'afforestation/reforestation'),
        ('ccb-redd', 'redd+'),
        ('car-forest-mx', 'improved forest management'),
        ('gs-reforest', 'afforestation/reforestation'),
        ('gs-drinking-water', 'clean water'),
    )

    def _infer(protocols) -> str:
        project_type = 'unknown'
        if protocols is None or (isinstance(protocols, float) and np.isnan(protocols)):
            return project_type
        for protocol_id, candidate in protocol_type_rules:
            if protocol_id in protocols:
                project_type = candidate
        return project_type

    # A single column-wise pass instead of one row-wise DataFrame.apply per rule;
    # this also works on an empty frame.
    df['project_type'] = df['protocol'].map(_infer)
    df['project_type_source'] = 'carbonplan'
    return df
[docs]
@pf.register_dataframe_method
def map_project_type_to_display_name(
    df: pd.DataFrame, *, type_category_mapping: dict
) -> pd.DataFrame:
    """
    Map project types in the DataFrame to display names based on a mapping dictionary.

    Matching is case-insensitive: both the mapping keys and the frame's
    project_type values are lower-cased before lookup.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with a 'project_type' column; modified in place.
    type_category_mapping : dict
        project_type -> metadata dict carrying a 'project-type-display-name' key.

    Returns
    -------
    pd.DataFrame
        The same frame with 'project_type' replaced by its display name, or
        'Unknown' when no mapping entry matches.
    """
    print('Mapping project types to display names...')
    display_names = {
        key.lower(): value['project-type-display-name']
        for key, value in type_category_mapping.items()
    }
    # Keys are lower-cased above, so normalize the looked-up values as well
    # (mirrors add_category's fallback); non-string values pass through unchanged.
    normalized = df['project_type'].map(lambda v: v.lower() if isinstance(v, str) else v)
    df['project_type'] = normalized.map(display_names).fillna('Unknown')
    return df
[docs]
@pf.register_dataframe_method
def add_is_compliance_flag(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add a compliance flag to the DataFrame based on the protocol.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with a 'protocol' column (a collection of protocol id
        strings, or None/NaN); modified in place.

    Returns
    -------
    pd.DataFrame
        The same frame with a boolean 'is_compliance' column that is True when
        any of the row's protocols starts with 'arb-'.
    """
    print('Adding is_compliance flag...')

    def _is_compliance(protocols) -> bool:
        if protocols is None or (isinstance(protocols, float) and np.isnan(protocols)):
            return False
        return any(protocol_str.startswith('arb-') for protocol_str in protocols)

    # Column-wise map (rather than a row-wise DataFrame.apply) also behaves on an
    # empty frame; astype(bool) keeps the column dtype boolean in that case.
    df['is_compliance'] = df['protocol'].map(_is_compliance).astype(bool)
    return df
[docs]
@pf.register_dataframe_method
def map_protocol(
    df: pd.DataFrame,
    *,
    inverted_protocol_mapping: dict,
    original_protocol_column: str = 'original_protocol',
) -> pd.DataFrame:
    """
    Resolve raw protocol strings into normalized protocol ids via find_protocol.

    Parameters
    ----------
    df : pd.DataFrame
        Frame holding the raw protocol strings; modified in place.
    inverted_protocol_mapping : dict
        Known raw protocol string -> list of normalized protocol ids.
    original_protocol_column : str, optional
        Column containing the raw protocol strings (default is 'original_protocol').

    Returns
    -------
    pd.DataFrame
        The same frame with 'protocol' (matched ids or None) and
        'protocol_unassigned' (unmatched raw value or None) columns. If a
        KeyError occurs (e.g. the raw column is missing), both columns are all None.
    """
    print('Mapping protocol based on known string...')
    try:
        matches = df[original_protocol_column].apply(
            lambda raw: find_protocol(
                search_string=raw, inverted_protocol_mapping=inverted_protocol_mapping
            )
        )
        mapped_ids = [pair[0] for pair in matches]
        unmatched_raw = [pair[1] for pair in matches]
        df['protocol'] = mapped_ids
        df['protocol_unassigned'] = unmatched_raw
    except KeyError:
        row_count = len(df)
        df['protocol'] = [None] * row_count
        df['protocol_unassigned'] = [None] * row_count
    return df
[docs]
@pf.register_dataframe_method
def harmonize_status_codes(df: pd.DataFrame, *, status_column: str = 'status') -> pd.DataFrame:
    """Harmonize project status codes across registries.

    Excludes ACR, as it requires special treatment across two columns.
    Codes from the CAR, Verra and Gold Standard tables below are translated;
    any other value becomes 'unknown'. If ``status_column`` is absent, the
    frame is returned untouched.

    Parameters
    ----------
    df : pd.DataFrame
        Frame with project status data; modified in place.
    status_column : str, optional
        Column holding the status codes (default is 'status').

    Returns
    -------
    pd.DataFrame
        The same frame with harmonized status codes.
    """
    print('Harmonizing status codes')
    car_states = {
        'Registered': 'registered',
        'Completed': 'completed',
        'Listed': 'listed',
        'Transitioned': 'unknown',
    }
    verra_states = {
        'Under validation': 'listed',
        'Under development': 'listed',
        'Registration requested': 'listed',
        'Registration and verification approval requested': 'listed',
        'Withdrawn': 'completed',
        'On Hold': 'registered',
        'Units Transferred from Approved GHG Program': 'unknown',
        'Rejected by Administrator': 'completed',
        'Crediting Period Renewal Requested': 'registered',
        'Inactive': 'completed',
        'Crediting Period Renewal and Verification Approval Requested': 'registered',
    }
    gs_states = {
        'GOLD_STANDARD_CERTIFIED_PROJECT': 'registered',
        'LISTED': 'listed',
        'GOLD_STANDARD_CERTIFIED_DESIGN': 'registered',
    }
    state_dict = {**car_states, **verra_states, **gs_states}

    try:
        raw_statuses = df[status_column]
    except KeyError:
        # Best effort: frames without a status column are passed through as-is.
        return df
    df[status_column] = raw_statuses.apply(lambda code: state_dict.get(code, 'unknown'))
    return df
[docs]
def find_protocol(
*, search_string: str, inverted_protocol_mapping: dict[str, list[str]]
) -> tuple[list[str] | None, list[str] | None]:
"""Match known strings of project methodologies to internal topology.
Returns a ``(mapped, unmatched)`` tuple:
* ``mapped`` — list of normalised protocol IDs when the string is recognised, else ``None``.
* ``unmatched`` — list containing the raw string when it is present but unrecognised, else ``None``.
NaN, empty, and whitespace-only strings yield ``(None, None)``.
"""
if pd.isna(search_string) or not str(search_string).strip():
return None, None
stripped = search_string.strip()
if known_match := inverted_protocol_mapping.get(stripped):
if known_match == ['unknown']:
return None, [search_string]
return known_match, None
print(f"'{search_string}' is unmapped in full protocol mapping")
return None, [search_string]
[docs]
def get_protocol_category(*, protocol_strs: list[str] | str, protocol_mapping: dict) -> list[str]:
"""
Get category based on protocol string
Parameters
----------
protocol_strs : str or list
single protocol string or list of protocol strings
protocol_mapping: dict
metadata about normalized protocol strings
Returns
-------
categories : list[str]
list of category strings
"""
def _get_category(protocol_str, protocol_mapping):
try:
return protocol_mapping.get(protocol_str).get('category', 'unknown')
except AttributeError:
return 'unknown'
if isinstance(protocol_strs, str):
protocol_strs = [protocol_strs]
categories = [_get_category(protocol_str, protocol_mapping) for protocol_str in protocol_strs]
return list(
set(categories)
) # if multiple protocols have same category, just return category once
[docs]
@pf.register_dataframe_method
def add_first_issuance_and_retirement_dates(
    projects: pd.DataFrame, *, credits: pd.DataFrame
) -> pd.DataFrame:
    """
    Add the first issuance and first retirement dates of credits to each project.

    Parameters
    ----------
    projects : pd.DataFrame
        A pandas DataFrame containing project data with a 'project_id' column.
    credits : pd.DataFrame
        A pandas DataFrame containing credit data with columns 'project_id',
        'transaction_date', and 'transaction_type'. Rows whose transaction_type
        equals 'issuance' count as issuances; rows whose transaction_type contains
        'retirement' count as retirements (missing types are ignored).

    Returns
    -------
    projects : pd.DataFrame
        A new DataFrame with the projects' columns plus 'first_issuance_at' and
        'first_retirement_at' (the earliest matching transaction_date, or NaN/NaT
        for projects without such a transaction). Any pre-existing columns of
        those names are replaced.
    """
    is_issuance = credits['transaction_type'] == 'issuance'
    # na=False: a missing transaction_type must not poison the boolean mask
    is_retirement = credits['transaction_type'].str.contains('retirement', na=False)

    # Name the date columns before merging instead of relying on _x/_y suffixes,
    # which only line up when `projects` has no overlapping column names.
    first_issuance = (
        credits[is_issuance]
        .groupby('project_id')['transaction_date']
        .min()
        .rename('first_issuance_at')
        .reset_index()
    )
    first_retirement = (
        credits[is_retirement]
        .groupby('project_id')['transaction_date']
        .min()
        .rename('first_retirement_at')
        .reset_index()
    )

    # Drop stale columns (e.g. on re-runs) so the merges don't create duplicates.
    projects = projects.drop(columns=['first_issuance_at', 'first_retirement_at'], errors='ignore')
    projects_with_dates = pd.merge(projects, first_issuance, on='project_id', how='left')
    projects_with_dates = pd.merge(
        projects_with_dates, first_retirement, on='project_id', how='left'
    )
    return projects_with_dates
[docs]
@pf.register_dataframe_method
def add_retired_and_issued_totals(projects: pd.DataFrame, *, credits: pd.DataFrame) -> pd.DataFrame:
    """
    Add total quantities of issued and retired credits to each project.

    Parameters
    ----------
    projects : pd.DataFrame
        DataFrame containing project data with a 'project_id' column.
    credits : pd.DataFrame
        DataFrame containing credit transaction data with 'project_id',
        'transaction_type' and 'quantity' columns.

    Returns
    -------
    pd.DataFrame
        A new DataFrame with two columns: 'issued' (sum of quantity over
        transaction_type == 'issuance') and 'retired' (sum over
        transaction_type == 'retirement'). Projects without such credits get 0.
        Any pre-existing 'issued'/'retired' columns are replaced.
    """
    # Drop conflicting columns if they exist
    projects = projects.drop(columns=['issued', 'retired'], errors='ignore')

    # Infer the source precision (max decimal places of any quantity) and round after
    # summing to avoid float64 representation errors
    # (e.g. 153.89 + 235.53 = 389.41999... in binary float).
    quantities = credits['quantity'].dropna()
    if quantities.empty:
        # Nothing to sum; avoids int(NaN) from .max() on an empty series.
        source_precision = 0
    else:
        source_precision = int(
            quantities.apply(lambda x: max(0, -Decimal(repr(x)).as_tuple().exponent)).max()
        )

    credit_totals = (
        credits.groupby(['project_id', 'transaction_type'])['quantity']
        .sum()
        .round(source_precision)
        .reset_index()
    )
    # One row per project, one column per transaction type. reindex guarantees the
    # 'issuance' and 'retirement' columns exist even when no credit of that type
    # is present (plain column selection raised KeyError in that case).
    credit_totals_pivot = (
        credit_totals.pivot(index='project_id', columns='transaction_type', values='quantity')
        .reset_index()
        .reindex(columns=['project_id', 'issuance', 'retirement'])
    )

    projects_combined = pd.merge(projects, credit_totals_pivot, on='project_id', how='left')
    projects_combined = projects_combined.rename(
        columns={'issuance': 'issued', 'retirement': 'retired'}
    )
    # Projects with no matching credits have NaN totals; report them as 0.
    projects_combined[['issued', 'retired']] = projects_combined[['issued', 'retired']].fillna(0)
    return projects_combined
[docs]
def add_placeholder_projects(
    *,
    credits: pd.DataFrame,
    projects: pd.DataFrame,
) -> pd.DataFrame:
    """
    Append placeholder project rows for any project IDs found in credits but absent from projects.

    Computes issued/retired totals and first issuance/retirement dates from credit data so that
    placeholder rows arrive in the database with correct summary stats rather than zeroed-out
    defaults. Call this on the combined (all-registry) DataFrames before writing to parquet.

    Every orphan project ID gets a row, including IDs whose credits contain neither
    issuances nor retirements (their totals are 0 and their dates are missing).
    Missing (NaN) project IDs in ``credits`` are ignored.

    Parameters
    ----------
    credits : pd.DataFrame
        Combined credits DataFrame (all registries).
    projects : pd.DataFrame
        Combined projects DataFrame (all registries).

    Returns
    -------
    pd.DataFrame
        ``projects`` unchanged when there are no orphans; otherwise a new frame with
        placeholder rows appended (in sorted project-ID order), passed through
        ``convert_to_datetime`` and validated against ``project_schema``.
    """
    from offsets_db_data.registry import get_registry_from_project_id

    # A missing project_id cannot be attributed to any project, so skip it.
    orphan_ids = set(credits['project_id'].dropna().unique()) - set(
        projects['project_id'].unique()
    )
    if not orphan_ids:
        return projects

    print(f'Found {len(orphan_ids)} project IDs in credits with no project record: {orphan_ids}')

    orphan_credits = credits[credits['project_id'].isin(orphan_ids)]
    is_issuance = orphan_credits['transaction_type'] == 'issuance'
    is_retirement = orphan_credits['transaction_type'].str.contains('retirement', na=False)
    issuances = orphan_credits[is_issuance].groupby('project_id')
    retirements = orphan_credits[is_retirement].groupby('project_id')

    stats = (
        pd.concat(
            [
                issuances['quantity'].sum().rename('issued'),
                retirements['quantity'].sum().rename('retired'),
                issuances['transaction_date'].min().rename('first_issuance_at').dt.as_unit('ns'),
                retirements['transaction_date']
                .min()
                .rename('first_retirement_at')
                .dt.as_unit('ns'),
            ],
            axis=1,
        )
        # Without this, orphans that have neither issuances nor retirements would be
        # missing from `stats` and never get a placeholder row.
        .reindex(pd.Index(sorted(orphan_ids, key=str), name='project_id'))
        .reset_index()
    )
    stats[['issued', 'retired']] = stats[['issued', 'retired']].fillna(0)

    rows = []
    for _, row in stats.iterrows():
        project_id = row['project_id']
        try:
            registry = get_registry_from_project_id(project_id)
        except KeyError:
            registry = 'unknown'
        base_url = _REGISTRY_PROJECT_URLS.get(registry)
        # NOTE(review): assumes every registry prefix is 3 characters — confirm for new registries.
        project_url = f'{base_url}{project_id[3:]}' if base_url else None
        rows.append(
            {
                'project_id': project_id,
                'registry': registry,
                'project_type': 'Unknown',
                'project_type_source': 'carbonplan',
                'category': 'unknown',
                'protocol': None,
                'protocol_unassigned': None,
                'issued': row.get('issued', 0.0),
                'retired': row.get('retired', 0.0),
                'first_issuance_at': row.get('first_issuance_at'),
                'first_retirement_at': row.get('first_retirement_at'),
                'is_compliance': False,
                'project_url': project_url,
                'name': None,
                'proponent': None,
                'status': None,
                'country': None,
                'listed_at': None,
            }
        )

    data = pd.concat([projects, pd.DataFrame(rows)], ignore_index=True)
    data = convert_to_datetime(
        data, columns=['first_issuance_at', 'first_retirement_at', 'listed_at']
    )
    return validate(data, schema=project_schema)