Source code for utils.importers

"""Module containing functions for importing existing data to database."""

import re
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.db.utils import IntegrityError
from django.http import Http404
from django.shortcuts import get_object_or_404

from core.models import (
    Contribution,
    Contributor,
    Cycle,
    Handle,
    Reward,
    RewardType,
    SocialPlatform,
)
from utils.constants.core import REWARDS_COLLECTION
from utils.helpers import get_env_variable, social_platform_prefixes

ADDRESSES_CSV_COLUMNS = ["handle", "address"]
CONTRIBUTION_CSV_COLUMNS = [
    "contributor",
    "cycle_start",
    "cycle_end",
    "platform",
    "url",
    "type",
    "level",
    "percentage",
    "reward",
    "comment",
]


def _append_gaps_to_cycles_dataframe(df):
    """Append gap periods to cycles dataframe and return sorted by start date.

    Identifies periods between existing cycles that are not covered and adds them
    as additional rows to the dataframe. The resulting dataframe contains both
    original cycles and the identified gaps, sorted by cycle_start date.

    :param df: DataFrame with cycle_start and cycle_end columns in 'yyyy-mm-dd' format
    :type df: pandas.DataFrame
    :return: DataFrame with original cycles and gap periods appended
    :rtype: pandas.DataFrame
    """
    # Create a working copy with datetime columns
    df_working = df.copy()
    df_working["cycle_start_dt"] = pd.to_datetime(df_working["cycle_start"])
    df_working["cycle_end_dt"] = pd.to_datetime(df_working["cycle_end"])

    # Sort by start date
    df_working.sort_values(by="cycle_start_dt", inplace=True)
    df_working.reset_index(drop=True, inplace=True)

    # Find the gaps
    gaps = []

    # Find gaps between periods
    for i in range(1, len(df_working)):
        prev_end = df_working.loc[i - 1, "cycle_end_dt"]
        curr_start = df_working.loc[i, "cycle_start_dt"]

        # If there's a gap between the end of previous and start of current
        if prev_end < curr_start - pd.Timedelta(days=1):
            gap_start = prev_end + pd.Timedelta(days=1)
            gap_end = curr_start - pd.Timedelta(days=1)
            gaps.append(
                {
                    "cycle_start": gap_start.strftime("%Y-%m-%d"),
                    "cycle_end": gap_end.strftime("%Y-%m-%d"),
                }
            )

    # Append gaps to original dataframe
    if gaps:
        gaps_df = pd.DataFrame(gaps)
        result_df = pd.concat([df, gaps_df], ignore_index=True)
    else:
        result_df = df.copy()

    # Convert back to datetime for sorting, then back to string
    result_df["cycle_start_dt"] = pd.to_datetime(result_df["cycle_start"])
    result_df.sort_values(by="cycle_start_dt", inplace=True)
    result_df.reset_index(drop=True, inplace=True)
    result_df = result_df[["cycle_start", "cycle_end"]]  # Keep only original columns

    return result_df


def _check_current_cycle(cycle_instance):
    """Check if current cycle has ended and create new cycle if needed.

    :param cycle_instance: The latest cycle instance to check
    :type cycle_instance: :class:`core.models.Cycle`
    """
    if datetime.now().date() > cycle_instance.end:
        start = cycle_instance.end + timedelta(days=1)
        end = start + timedelta(days=92)
        # Adjust to end of month
        end = datetime(end.year, end.month, 1) + timedelta(days=-1)
        Cycle.objects.create(start=start, end=end)


def _create_active_rewards():
    """Create or activate reward objects based on REWARDS_COLLECTION."""
    for index in range(len(REWARDS_COLLECTION)):
        reward = REWARDS_COLLECTION[index]
        reward_name = reward[0]
        for level, amount in enumerate(reward):
            if level == 0:
                continue

            label, name = (
                reward_name.split(" ", 1)[0].strip("[]"),
                reward_name.split(" ", 1)[1].strip(),
            )
            reward_type = get_object_or_404(RewardType, label=label, name=name)
            try:
                reward = Reward.objects.get(
                    type=reward_type, level=level, amount=amount
                )
                reward.active = True
                reward.save()

            except ObjectDoesNotExist:
                Reward.objects.create(
                    type=reward_type, level=level, amount=amount, active=True
                )


def _create_superusers():
    """Create initial superusers from environment variables."""
    superusers_str = get_env_variable("INITIAL_SUPERUSERS", "")
    passwords_str = get_env_variable("INITIAL_SUPERUSER_PASSWORDS", "")
    addresses_str = get_env_variable("INITIAL_SUPERUSER_ADDRESSES", "")

    superusers = [user for user in superusers_str.split(",") if user.strip()]
    passwords = [pwd for pwd in passwords_str.split(",") if pwd.strip()]
    addresses = [adr for adr in addresses_str.split(",") if len(addresses_str) > 50]

    assert len(superusers) == len(passwords)
    assert len(addresses) == 0 or len(addresses) == len(superusers)

    for index, superuser in enumerate(superusers):
        user = User.objects.create_superuser(superuser, password=passwords[index])
        if addresses and addresses[index]:
            address = addresses[index]
            contributor = Contributor.objects.filter(address=address).first()
            if not contributor:
                contributor = Contributor.objects.from_full_handle(
                    user.username, address=address
                )

            user.profile.contributor = contributor
            user.profile.save()


def _dataframe_from_csv(filename, columns=CONTRIBUTION_CSV_COLUMNS):
    """Create pandas DataFrame from CSV file.

    :param filename: Path to the CSV file
    :type filename: str
    :param columns: List of column names for the DataFrame
    :type columns: list
    :return: DataFrame with specified columns or None if file not found/empty
    :rtype: :class:`pandas.DataFrame` or None
    """
    try:
        data = pd.read_csv(filename, header=None, sep=",")
    except (pd.errors.EmptyDataError, FileNotFoundError):
        return None
    data.columns = columns
    return data


def _import_contributions(data, parse_callback, amount_callback):
    """Import contributions from DataFrame to database.

    :param data: DataFrame containing contribution data
    :type data: :class:`pandas.DataFrame`
    :param parse_callback: Function to parse reward type from string
    :type parse_callback: callable
    :param amount_callback: Function to calculate reward amount
    :type amount_callback: callable
    """
    for _, row in data.iterrows():
        contributor = Contributor.objects.from_full_handle(row["contributor"])
        cycle = Cycle.objects.get(start=row["cycle_start"])
        platform = SocialPlatform.objects.get(name__iexact=row["platform"])
        label, name = parse_callback(row["type"])
        reward_type = get_object_or_404(RewardType, label=label, name=name)
        reward = Reward.objects.get(
            type=reward_type,
            level=row["level"] if not pd.isna(row["level"]) else 1,
            amount=amount_callback(row["reward"]),
        )
        percentage = row["percentage"] if not pd.isna(row["percentage"]) else 1
        url = row["url"] if not pd.isna(row["url"]) else None
        comment = row["comment"] if not pd.isna(row["comment"]) else None
        Contribution.objects.create(
            contributor=contributor,
            cycle=cycle,
            platform=platform,
            reward=reward,
            percentage=percentage,
            url=url,
            comment=comment,
            confirmed=True,
        )


def _import_rewards(data, parse_callback, amount_callback):
    """Import rewards from DataFrame to database.

    :param data: DataFrame containing reward data
    :type data: :class:`pandas.DataFrame`
    :param parse_callback: Function to parse reward type from string
    :type parse_callback: callable
    :param amount_callback: Function to calculate reward amount
    :type amount_callback: callable
    """
    for typ, level, reward in data.values.tolist():
        label, name = parse_callback(typ)
        try:
            reward_type = get_object_or_404(RewardType, label=label, name=name)

        except Http404:
            reward_type = RewardType.objects.create(label=label, name=name)

        try:
            Reward.objects.create(
                type=reward_type,
                level=level if not pd.isna(level) else 1,
                amount=amount_callback(reward),
                active=False,
            )
        except IntegrityError:
            pass


def _parse_addresses():
    """Parse addresses from CSV file and group by address.

    :return: List of addresses with associated handles
    :rtype: list
    """
    addresses_filename = (
        Path(__file__).resolve().parent.parent / "fixtures" / "addresses.csv"
    )
    addresses = _dataframe_from_csv(addresses_filename, columns=ADDRESSES_CSV_COLUMNS)

    users_filename = (
        Path(__file__).resolve().parent.parent
        / "fixtures"
        / "users_without_addresses.csv"
    )
    users = _dataframe_from_csv(users_filename, columns=ADDRESSES_CSV_COLUMNS)

    # Handle cases where one or both files are missing/empty
    if addresses is None and users is None:
        return []
    elif addresses is None:
        data = users
    elif users is None:
        data = addresses
    else:
        data = pd.concat([addresses, users])

    data = data[["handle", "address"]].drop_duplicates()
    grouped = (
        data.groupby("address")["handle"]
        .apply(lambda x: x.tolist()[::-1])
        .reset_index()
    )
    return grouped.values.tolist()


def _parse_label_and_name_from_reward_type_legacy(typ):
    """Parse reward type label and name from legacy format.

    :param typ: Reward type string in legacy format
    :type typ: str
    :return: Tuple of (label, name)
    :rtype: tuple
    """
    label, name = _parse_label_and_name_from_reward_type(typ)
    if name == "Custom":
        # Handle None or empty strings
        if not typ:
            return "S", "Suggestion"

        typ_lower = typ.lower()
        if "feature request" in typ_lower:
            return "F", "Feature Request"

        if "bug report" in typ_lower:
            return "B", "Bug Report"

        if "ecosystem research" in typ_lower:
            return "ER", "Ecosystem Research"

        return "S", "Suggestion"

    return label, name


def _parse_label_and_name_from_reward_type(typ):
    """Parse reward type label and name from standard format.

    :param typ: Reward type string in format "[LABEL] Name"
    :type typ: str
    :return: Tuple of (label, name)
    :rtype: tuple
    """
    if not pd.isna(typ):
        pattern = r"\[([^\]]+)\]\s*(.+)"
        match = re.match(pattern, typ)
        if match:
            return match.group(1), match.group(2)

    return "CST", "Custom"


def _reward_amount(reward):
    """Calculate reward amount in base units.

    :param reward: Reward amount
    :type reward: float
    :return: Reward amount in base units
    :rtype: int
    """
    return round(reward * 1_000_000) if not pd.isna(reward) else 0


def _reward_amount_legacy(reward):
    """Calculate legacy reward amount in base units.

    :param reward: Reward amount
    :type reward: float
    :return: Reward amount in base units
    :rtype: int
    """
    return round(round(reward, 2) * 1_000_000) if not pd.isna(reward) else 0


[docs] def import_from_csv(contributions_path, legacy_contributions_path): """Import contributions from CSV files to database. :param contributions_path: Path to current contributions CSV file :type contributions_path: str :param legacy_contributions_path: Path to legacy contributions CSV file :type legacy_contributions_path: str :return: Error message string or False if successful :rtype: str or bool """ # # CHECK if len(SocialPlatform.objects.all()): return "ERROR: Database is not empty!" # # PLATFORMS SocialPlatform.objects.bulk_create( SocialPlatform(name=name, prefix=prefix) for name, prefix in social_platform_prefixes() ) print("Social platforms created: ", len(SocialPlatform.objects.all())) # # ADDRESSES addresses = _parse_addresses() Contributor.objects.bulk_create( Contributor(name=handles[0], address=address) for address, handles in addresses ) print("Contributors imported: ", len(Contributor.objects.all())) for address, handles in addresses: for full_handle in handles: handle = Handle.objects.from_address_and_full_handle(address, full_handle) handle.save() print("Handles imported: ", len(Handle.objects.all())) # # CONTRIBUTIONS data = _dataframe_from_csv(contributions_path) legacy_data = _dataframe_from_csv(legacy_contributions_path) # Handle case where CSV files are missing or empty if data is None: data = pd.DataFrame(columns=CONTRIBUTION_CSV_COLUMNS) if legacy_data is None: legacy_data = pd.DataFrame(columns=CONTRIBUTION_CSV_COLUMNS) cycles_data = data[["cycle_start", "cycle_end"]].drop_duplicates() legacy_cycles_data = legacy_data[["cycle_start", "cycle_end"]].drop_duplicates() all_cycles_data = _append_gaps_to_cycles_dataframe( pd.concat([cycles_data, legacy_cycles_data]).sort_values(by=["cycle_start"]) ) Cycle.objects.bulk_create( Cycle(start=start, end=end) for start, end in all_cycles_data.values.tolist() ) _check_current_cycle(Cycle.objects.latest("end")) print("Cycles imported: ", len(Cycle.objects.all())) _import_rewards( data[["type", "level", "reward"]], _parse_label_and_name_from_reward_type, _reward_amount, ) _import_rewards( legacy_data[["type", "level", "reward"]], _parse_label_and_name_from_reward_type_legacy, _reward_amount_legacy, ) _create_active_rewards() print("Rewards imported: ", len(Reward.objects.all())) _import_contributions( legacy_data, _parse_label_and_name_from_reward_type_legacy, _reward_amount_legacy, ) _import_contributions( data, _parse_label_and_name_from_reward_type, _reward_amount, ) print("Contributions imported: ", len(Contribution.objects.all())) _create_superusers() return False