import datetime
import pathlib
import subprocess
import tempfile
import uuid
import janitor # noqa: F401
import numpy as np
import pandas as pd
import pandas_flavor as pf
import upath
BENEFICIARY_MAPPING_UPATH = (
upath.UPath(__file__).parents[0] / 'configs' / 'beneficiary-mappings.json'
)
[docs]
@pf.register_dataframe_method
def aggregate_issuance_transactions(df: pd.DataFrame) -> pd.DataFrame:
"""
Aggregate issuance transactions by summing the quantity for each combination of project ID, transaction date, and vintage.
Parameters
----------
df : pd.DataFrame
Input DataFrame containing issuance transaction data.
Returns
-------
pd.DataFrame
DataFrame with aggregated issuance transactions, filtered to include only those with a positive quantity.
"""
# Check if 'transaction_type' exists in DataFrame columns
if 'transaction_type' not in df.columns:
raise KeyError("The column 'transaction_type' is missing.")
# Initialize df_issuance_agg to an empty DataFrame
df_issuance_agg = pd.DataFrame()
df_issuance = df[df['transaction_type'] == 'issuance']
if not df_issuance.empty:
df_issuance_agg = (
df_issuance.groupby(['project_id', 'transaction_date', 'vintage'])
.agg(
{
'quantity': 'sum',
'registry': 'first',
'transaction_type': 'first',
}
)
.reset_index()
)
df_issuance_agg = df_issuance_agg[df_issuance_agg['quantity'] > 0]
return df_issuance_agg
[docs]
@pf.register_dataframe_method
def filter_and_merge_transactions(
df: pd.DataFrame, arb_data: pd.DataFrame, project_id_column: str = 'project_id'
) -> pd.DataFrame:
"""
Filter transactions based on project ID intersection with ARB data and merge the filtered transactions.
Parameters
----------
df : pd.DataFrame
Input DataFrame with transaction data.
arb_data : pd.DataFrame
DataFrame containing ARB issuance data.
project_id_column : str, optional
The name of the column containing project IDs (default is 'project_id').
Returns
-------
pd.DataFrame
DataFrame with transactions from the input DataFrame, excluding those present in ARB data, merged with relevant ARB transactions.
"""
if intersection_values := list(
set(df[project_id_column]).intersection(set(arb_data[project_id_column]))
):
df = df[~df[project_id_column].isin(intersection_values)]
df = pd.concat(
[df, arb_data[arb_data[project_id_column].isin(intersection_values)]], ignore_index=True
)
return df
[docs]
@pf.register_dataframe_method
def handle_non_issuance_transactions(df: pd.DataFrame) -> pd.DataFrame:
"""
Filter the DataFrame to include only non-issuance transactions.
Parameters
----------
df : pd.DataFrame
Input DataFrame containing transaction data.
Returns
-------
pd.DataFrame
DataFrame containing only transactions where 'transaction_type' is not 'issuance'.
"""
df_non_issuance = df[df['transaction_type'] != 'issuance']
return df_non_issuance
[docs]
@pf.register_dataframe_method
def merge_with_arb(credits: pd.DataFrame, *, arb: pd.DataFrame) -> pd.DataFrame:
"""
ARB issuance table contains the authorative version of all credit transactions for ARB projects.
This function drops all registry crediting data and, isntead, patches in data from the ARB issuance table.
Parameters
----------
credits: pd.DataFrame
Pandas dataframe containing registry credit data
arb: pd.DataFrame
Pandas dataframe containing ARB issuance data
Returns
-------
pd.DataFrame
Pandas dataframe containing merged credit and ARB data
"""
df = credits
project_id_column = 'project_id'
if intersection_values := list(
set(df[project_id_column]).intersection(set(arb[project_id_column]))
):
df = df[~df[project_id_column].isin(intersection_values)]
df = pd.concat([df, arb], ignore_index=True)
return df
[docs]
def harmonize_beneficiary_data(
credits: pd.DataFrame, registry_name: str, download_type: str
) -> pd.DataFrame:
"""
Harmonize the beneficiary information via OpenRefine.
Parameters
----------
credits : pd.DataFrame
Input DataFrame containing credit data.
"""
tempdir = tempfile.gettempdir()
temp_path = pathlib.Path(tempdir) / f'{registry_name}-{download_type}-credits.csv'
if len(credits) == 0:
print(
f'Empty dataframe with shape={credits.shape} - columns:{credits.columns.tolist()}. No credits to harmonize'
)
data = credits.copy()
data['retirement_beneficiary_harmonized'] = pd.Series(dtype='str')
return data
credits.to_csv(temp_path, index=False)
project_name = f'{registry_name}-{download_type}-beneficiary-harmonization-{datetime.datetime.now().strftime("%Y%m%d%H%M%S")}-{uuid.uuid4()}'
output_path = pathlib.Path(tempdir) / f'{project_name}.csv'
try:
return _extract_harmonized_beneficiary_data_via_openrefine(
temp_path, project_name, str(BENEFICIARY_MAPPING_UPATH), str(output_path)
)
except subprocess.CalledProcessError as e:
raise ValueError(
f'Commad failed with return code: {e.returncode}\nOutput: {e.output}\nError output: {e.stderr}'
) from e
def _extract_harmonized_beneficiary_data_via_openrefine(
temp_path, project_name, beneficiary_mapping_path, output_path
):
result = subprocess.run(
[
'offsets-db-data-orcli',
'run',
'--',
'import',
'csv',
str(temp_path),
'--projectName',
f'{project_name}',
],
capture_output=True,
text=True,
check=True,
)
result = subprocess.run(
['offsets-db-data-orcli', 'run', '--', 'info', project_name],
capture_output=True,
text=True,
check=True,
)
result = subprocess.run(
[
'offsets-db-data-orcli',
'run',
'--',
'transform',
project_name,
beneficiary_mapping_path,
],
capture_output=True,
text=True,
check=True,
)
result = subprocess.run(
[
'offsets-db-data-orcli',
'run',
'--',
'export',
'csv',
project_name,
'--output',
output_path,
],
capture_output=True,
text=True,
check=True,
)
result = subprocess.run(
['offsets-db-data-orcli', 'run', '--', 'delete', project_name],
capture_output=True,
text=True,
check=True,
)
print(result.stdout)
data = pd.read_csv(output_path)
data['merged_beneficiary'] = data['merged_beneficiary'].fillna('').astype(str)
data['retirement_beneficiary_harmonized'] = np.where(
data['merged_beneficiary'].notnull() & (~data['merged_beneficiary'].str.contains(';%')),
data['merged_beneficiary'],
np.nan,
)
return data