#!/usr/bin/env python3
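"""Sales engagement analysis for Brandfolder asset downloads.

Pulls download events from a Postgres table, normalizes timestamps to UTC/EST,
assigns each asset to a product family (learning new families on the fly),
computes session-, IP-, and asset-level metrics, and writes a consolidated
processed_data.json for frontend consumption.

Typical flags (see main()): --env .env, --table brandfolder_asset_downloads,
--output-dir output, --categories categories.json
"""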
import os
import argparse
import json
import re
from itertools import combinations
from datetime import datetime
import pandas as pd

# timezone support
try:
    from zoneinfo import ZoneInfo
except ImportError:
    from backports.zoneinfo import ZoneInfo  # pip install backports.zoneinfo

import psycopg2
from dotenv import load_dotenv

# === CONFIG ===
DEFAULT_CATEGORY_FILE = "categories.json"
DEFAULT_CATEGORY_KEYWORDS = {
    "RollaStep": ["rollastep"],
    "G4 Offset Safety Cage": ["g4 offset safety cage", "g4", "offset safety cage"],
    "YellowGate": ["yellowgate"],
    "AeroStep": ["aerostep"],
    "UltraTech": ["ultratech"],
    "ErectaStep": ["erectastep"],
    "Gangway": ["gangway"],
    "SafeRack/Other": ["safelok", "saferack"],
}
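
# Matching in categorize_family() is a case-insensitive substring check against
# the asset name, and the first family whose keyword matches wins, so keyword
# lists should favor the most specific terms.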


# === Category file management ===
def load_category_keywords(filepath: str) -> dict:
    if os.path.exists(filepath):
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                data = json.load(f)
            if not isinstance(data, dict):
                raise ValueError("Category file malformed, resetting to defaults.")
            return data
        except Exception:
            # fallback to default if parse error
            print(f"Warning: failed to parse {filepath}, reinitializing with defaults.")
    # initialize with default
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(DEFAULT_CATEGORY_KEYWORDS, f, indent=2, sort_keys=True)
    return dict(DEFAULT_CATEGORY_KEYWORDS)


def save_category_keywords(keywords: dict, filepath: str):
    try:
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(keywords, f, indent=2, sort_keys=True)
    except Exception as e:
        print(f"Warning: failed to save category keywords to {filepath}: {e}")


def derive_family_and_keywords(asset_name: str, existing: dict):
    """
    Infer a new family name and initial keyword(s) from an unmatched asset name.
    """
    # Normalize and strip generic suffixes/terms
    stop_terms = {
        "sheet",
        "manual",
        "specification",
        "brochure",
        "series",
        "product",
        "installation",
        "data",
        "reference",
        "dealer",
        "access",
        "profiles",
        "booklet",
        "mp",
        "all",
        "offset",
        "safety",
        "cage",
        "ultratech",
        "safeRack".lower(),
        "yellowgate",
    }
    tokens = [t for t in re.split(r"\W+", asset_name) if t]
    if not tokens:
        # Nothing to infer from (empty or punctuation-only name).
        return "Unknown", []
    cleaned = [t for t in tokens if t.lower() not in stop_terms]
    if not cleaned:
        cleaned = tokens  # fall back to the raw tokens
    # Family candidate is first two cleaned words, title-cased
    family_candidate = " ".join(cleaned[:2]).title()
    # If that matches existing family (case-insensitive), use the real key
    for fam in existing:
        if fam.lower() == family_candidate.lower():
            # return existing family key and keyword
            return fam, [cleaned[0].lower()]
    # Else create new family; keyword is the first cleaned token lowercased
    return family_candidate, [cleaned[0].lower()]
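
# Illustrative example (assuming the default stop-term list above):
#   derive_family_and_keywords("RollaStep MP Series Sheet", {})
#   -> ("Rollastep", ["rollastep"])   # "mp", "series", "sheet" are stripped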


def categorize_family(name: str, category_keywords: dict, category_filepath: str):
    """
    Returns family. If no existing keyword matches, infers a new family/keyword and updates config.
    """
    if not isinstance(name, str):
        return "Unknown"
    n = name.lower()
    # Exact/substring matching
    for family, keywords in category_keywords.items():
        for kw in keywords:
            if kw.lower() in n:
                return family
    # No match -> infer new pattern
    new_family, new_keywords = derive_family_and_keywords(name, category_keywords)
    if not new_keywords:
        # Nothing usable was inferred; do not pollute the category file.
        return new_family
    added = False
    # If family exists (case-insensitive), unify key
    existing_key = None
    for fam in category_keywords:
        if fam.lower() == new_family.lower():
            existing_key = fam
            break
    if existing_key:
        # extend its keyword list
        for kw in new_keywords:
            if kw not in category_keywords[existing_key]:
                category_keywords[existing_key].append(kw)
                added = True
        family_used = existing_key
    else:
        # add new family
        category_keywords[new_family] = new_keywords
        added = True
        family_used = new_family
    if added:
        print(f"[category update] Discovered new pattern: asset_name='{name}' mapped to family='{family_used}' with keyword(s)={new_keywords}. Writing to {category_filepath}")
        save_category_keywords(category_keywords, category_filepath)
    return family_used
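
# Note: categorize_family mutates the passed-in category_keywords dict in place
# and rewrites the category file whenever a new family or keyword is learned,
# so later runs reuse what earlier runs discovered.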


# === DB loader ===
def load_env(env_path: str = ".env") -> dict:
    if os.path.exists(env_path):
        load_dotenv(env_path)
    else:
        print(f"Warning: .env file not found at {env_path}; attempting to read from environment.")
    conn_info = {
        "DB_HOST": os.getenv("DB_HOST", ""),
        "DB_PORT": os.getenv("DB_PORT", "5432"),
        "DB_NAME": os.getenv("DB_NAME", ""),
        "DB_USER": os.getenv("DB_USER", ""),
        "DB_PASSWORD": os.getenv("DB_PASSWORD", ""),
        "DB_SSLMODE": os.getenv("DB_SSLMODE", "prefer"),
    }
    required = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD"]
    missing = [k for k in required if not conn_info[k]]
    if missing:
        raise RuntimeError(f"Missing required DB connection variables in environment: {missing}")
    return conn_info
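
# Expected .env keys (values below are placeholders, not real credentials):
#   DB_HOST=db.example.internal      # required
#   DB_PORT=5432                     # optional, defaults to 5432
#   DB_NAME=analytics                # required
#   DB_USER=readonly_user            # required
#   DB_PASSWORD=...                  # required
#   DB_SSLMODE=prefer                # optional, defaults to "prefer"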


def load_from_postgres(table_name: str, conn_info: dict) -> pd.DataFrame:
    conn = None
    try:
        conn = psycopg2.connect(
            host=conn_info["DB_HOST"],
            port=conn_info.get("DB_PORT", 5432),
            dbname=conn_info["DB_NAME"],
            user=conn_info["DB_USER"],
            password=conn_info["DB_PASSWORD"],
            sslmode=conn_info.get("DB_SSLMODE", "prefer"),
        )
        query = f'SELECT * FROM "{table_name}"'
        df = pd.read_sql(query, conn)
        return df
    finally:
        if conn:
            conn.close()
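
# Note: pandas emits a UserWarning when read_sql is handed a raw DBAPI (psycopg2)
# connection instead of a SQLAlchemy connectable. The query still runs; a sketch
# of the SQLAlchemy variant (assuming sqlalchemy is installed) would be:
#   from sqlalchemy import create_engine
#   engine = create_engine("postgresql+psycopg2://user:password@host:5432/dbname")
#   df = pd.read_sql('SELECT * FROM "brandfolder_asset_downloads"', engine)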


# === Data normalization & enrichment ===
def normalize_timestamp(df: pd.DataFrame) -> pd.DataFrame:
    if "downloaded_at" not in df.columns:
        raise ValueError("Column 'downloaded_at' not found.")
    series = df["downloaded_at"]
    dt_utc = None
    try:
        if pd.api.types.is_integer_dtype(series) or pd.api.types.is_float_dtype(series) or series.astype(str).str.isnumeric().all():
            df["downloaded_at"] = series.astype(int)
            dt_utc = pd.to_datetime(df["downloaded_at"], unit="s", utc=True)
        else:
            dt_utc = pd.to_datetime(series, utc=True, errors="coerce")
    except Exception:
        dt_utc = pd.to_datetime(series, utc=True, errors="coerce")
    if dt_utc.isna().any():
        alternative = pd.to_datetime(series, errors="coerce")
        alternative = alternative.dt.tz_localize("UTC", ambiguous="infer", nonexistent="shift_forward")
        dt_utc = dt_utc.fillna(alternative)
    df["downloaded_dt_utc"] = dt_utc
    df["downloaded_dt_est"] = df["downloaded_dt_utc"].dt.tz_convert("America/New_York")
    return df
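
# normalize_timestamp accepts either numeric Unix epoch seconds or parseable
# datetime strings in "downloaded_at"; both paths produce the tz-aware
# "downloaded_dt_utc" and "downloaded_dt_est" columns, and unparseable values
# become NaT.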


def compute_metrics(df: pd.DataFrame):
    # Sessionization (asset_id is treated as the session identifier)
    session = df.groupby("asset_id").agg(
        downloads=("asset_name", "count"),
        unique_assets=("asset_name", "nunique"),
        unique_families=("product_family", "nunique"),
        start_time=("downloaded_dt_est", "min"),
        end_time=("downloaded_dt_est", "max"),
    )
    session["duration_minutes"] = (session["end_time"] - session["start_time"]).dt.total_seconds() / 60
    session = session.reset_index()

    # IP-level aggregation
    ip_group = df.groupby("ip_address").agg(
        total_downloads=("asset_name", "count"),
        unique_assets=("asset_name", "nunique"),
        unique_sessions=("asset_id", "nunique"),
        unique_families=("product_family", "nunique"),
    )

    # Popularity
    asset_pop = df["asset_name"].value_counts().rename_axis("asset_name").reset_index(name="download_count")
    asset_pop["pct_of_total"] = (asset_pop["download_count"] / len(df) * 100).round(2)
    family_pop = df["product_family"].value_counts().rename_axis("product_family").reset_index(name="download_count")
    family_pop["pct_of_total"] = (family_pop["download_count"] / len(df) * 100).round(2)

    # High-intent sessions
    high_intent_sessions = session[
        (session["unique_families"] >= 2) | (session["downloads"] >= 3)
    ].sort_values(["unique_families", "downloads"], ascending=False)

    # Co-download family-level
    co_download = {}
    for _, grp in df.groupby("asset_id"):
        families = sorted(set(grp["product_family"]))
        for a, b in combinations(families, 2):
            co_download.setdefault((a, b), 0)
            co_download[(a, b)] += 1
    if co_download:
        co_df = (
            pd.DataFrame([{"pair": f"{a} & {b}", "count": c} for (a, b), c in co_download.items()])
            .sort_values("count", ascending=False)
            .reset_index(drop=True)
        )
    else:
        # No session spans multiple families; keep an empty frame with the expected columns.
        co_df = pd.DataFrame(columns=["pair", "count"])

    # Temporal
    df["hour_est"] = df["downloaded_dt_est"].dt.hour
    hourly = df["hour_est"].value_counts().sort_index().reset_index()
    hourly.columns = ["hour_est", "download_count"]

    # Depth
    session_depth = session["downloads"].value_counts().sort_index().reset_index()
    session_depth.columns = ["downloads_in_session", "num_sessions"]


    summary = {
        "total_raw_downloads": len(df),
        "unique_rows": len(df.drop_duplicates()),
        "unique_asset_ids": df["asset_id"].nunique(),
        "unique_ips": df["ip_address"].nunique(),
        "unique_assets": df["asset_name"].nunique(),
        "top_assets": asset_pop.head(5).to_dict(orient="records"),
        "top_families": family_pop.head(5).to_dict(orient="records"),
        "high_intent_session_count": len(high_intent_sessions),
        "peak_hours_est": hourly.sort_values("download_count", ascending=False).head(3).to_dict(orient="records"),
    }

    return {
        "session": session,
        "ip_group": ip_group,
        "asset_pop": asset_pop,
        "family_pop": family_pop,
        "high_intent_sessions": high_intent_sessions,
        "co_df": co_df,
        "hourly": hourly,
        "session_depth": session_depth,
        "summary": summary,
    }


def persist_outputs(results: dict, outdir: str, df: pd.DataFrame):
    os.makedirs(outdir, exist_ok=True)

    def df_to_serializable(dframe):
        tmp = dframe.copy()
        for col in tmp.select_dtypes(include=["datetimetz", "datetime"]):
            tmp[col] = tmp[col].astype(str)
        return tmp.to_dict(orient="records")

    out = {
        "summary": results["summary"],
        "session": df_to_serializable(results["session"]),
        "ip_group": df_to_serializable(results["ip_group"].reset_index()),
        "asset_pop": df_to_serializable(results["asset_pop"]),
        "family_pop": df_to_serializable(results["family_pop"]),
        "high_intent_sessions": df_to_serializable(results["high_intent_sessions"]),
        "co_df": df_to_serializable(results["co_df"]),
        "hourly": df_to_serializable(results["hourly"]),
        "session_depth": df_to_serializable(results["session_depth"]),
        "raw_downloads": df.assign(downloaded_dt_est=df["downloaded_dt_est"].astype(str))[
            ["asset_id", "ip_address", "user_agent", "source", "asset_name", "product_family", "downloaded_dt_est"]
        ].to_dict(orient="records"),
    }
    json_path = os.path.join(outdir, "processed_data.json")
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(out, f, indent=2)
    print(f"\nProcessed data written to: {os.path.abspath(json_path)}")


def print_summary(results: dict):
    s = results["summary"]
    print("\n=== EXECUTIVE SUMMARY ===")
    print(f"Total raw download events: {s['total_raw_downloads']}")
    print(f"Deduped unique records: {s['unique_rows']}")
    print(f"Distinct sessions (asset_id): {s['unique_asset_ids']}")
    print(f"Distinct IPs: {s['unique_ips']}")
    print(f"Distinct assets: {s['unique_assets']}")
    print(f"High-intent sessions: {s['high_intent_session_count']}")
    print("\nTop assets:")
    print(results["asset_pop"].head(5).to_string(index=False))
    print("\nTop product families:")
    print(results["family_pop"].head(5).to_string(index=False))
    print("\nPeak download hours (EST):")
    print(pd.DataFrame(s["peak_hours_est"]).to_string(index=False))
    print("\nTop IPs by activity:")
    print(results["ip_group"].sort_values("total_downloads", ascending=False).head(5).to_string())
    print("\nSample high-intent sessions:")
    if not results["high_intent_sessions"].empty:
        print(
            results["high_intent_sessions"]
            .head(5)[["asset_id", "downloads", "unique_families", "duration_minutes"]]
            .to_string(index=False)
        )
    else:
        print("None detected.")


def main():
    parser = argparse.ArgumentParser(description="Sales engagement analysis pulling from Postgres with dynamic family categorization. Outputs a consolidated JSON for frontend consumption.")
    parser.add_argument("--env", help="Path to .env file", default=".env")
    parser.add_argument("--table", help="Postgres table name", default="brandfolder_asset_downloads")
    parser.add_argument("--output-dir", "-o", help="Directory to dump CSVs", default="output")
    parser.add_argument("--categories", help="Path to categories JSON file", default=DEFAULT_CATEGORY_FILE)
    args = parser.parse_args()

    # Load DB env
    conn_info = load_env(args.env)

    # Load or initialize category keywords
    category_keywords = load_category_keywords(args.categories)

    # Load data
    try:
        df = load_from_postgres(args.table, conn_info)
    except Exception as e:
        print(f"ERROR: failed to load data from Postgres: {e}")
        return

    # Normalize time and basic enrichment
    df = normalize_timestamp(df)

    # (Browser/platform user-agent parsing removed)

    # Apply dynamic family categorization (updates categories.json if new patterns appear)
    df["product_family"] = df["asset_name"].apply(lambda n: categorize_family(n, category_keywords, args.categories))

    # Compute metrics
    results = compute_metrics(df)

    # Show summary
    print_summary(results)

    # Persist detailed outputs
    persist_outputs(results, args.output_dir, df)


if __name__ == "__main__":
    main()