#!/usr/bin/env python3
"""
Unified Amateur Radio License Loader  —  uls_cur
=================================================
Downloads the FCC ULS and Canadian ISED amateur license ZIP files and builds
(or refreshes) a single unified table  uls_cur  directly from the ZIP
contents.  No intermediate staging tables are created or required.

Sources:
  FCC ULS (USA)    — https://data.fcc.gov/download/pub/uls/complete/l_amat.zip
  ISED/IC (Canada) — http://apc-cap.ic.gc.ca/datafiles/amateur_delim.zip

uls_cur schema:
  country          CHAR(2)   'US' or 'CA'
  call_sign        TEXT
  operator_class   TEXT      FCC single-letter code (E/A/G/T/N/P) or
                             Canadian comma-separated qualification string
                             e.g. "Basic,Advanced,Morse-5wpm"
  first_name       TEXT
  last_name        TEXT
  po_box           TEXT
  street_address   TEXT
  city             TEXT
  state            TEXT      US state abbrev or Canadian province code
  zip_code         TEXT      US ZIP or Canadian postal code
  expired_date     DATE      US only; NULL for Canadian rows (ISED omits it)
  PRIMARY KEY (country, call_sign)

FCC operator_class (expanded name, highest class held):
  EXTRA            (E)
  ADVANCED         (A)
  GENERAL          (G)
  TECHNICIAN       (T)
  TECHNICIAN-PLUS  (P)
  NOVICE           (N)

Canadian operator_class (comma-separated list of all held qualifications):
  "Basic"               qual_a = 1
  "Basic-Honours"       qual_b = 1  (scored >= 80% on Basic exam)
  "Advanced"            qual_c = 1
  "Morse-5wpm"          qual_d = 1
  "Morse-12wpm"         qual_e = 1
  Example: "Basic,Advanced,Morse-5wpm"

Usage:
    python3 uls_loader.py [options]

Options:
    --source        fcc | canada | both  (default: both)
    --host          PostgreSQL host      (default: localhost)
    --port          PostgreSQL port      (default: 5432)
    --dbname        Database name        (default: ham_radio)
    --user          Database user        (default: postgres)
    --password      Database password    (default: env HAM_DB_PASSWORD)
    --schema        Target schema        (default: public)
    --skip-download Use existing ZIP files instead of downloading
    --fcc-zip       Local path for FCC ZIP    (default: ./l_amat.zip)
    --canada-zip    Local path for Canada ZIP (default: ./amateur_delim.zip)
    --batch-size    Rows per upsert batch     (default: 2000)

Requirements:
    pip install psycopg2-binary requests

2021-03-21 KB1B nedecn@kb1b.org
"""

import argparse
import io
import logging
import os
import sys
import time
import zipfile
from contextlib import contextmanager
from pathlib import Path

import psycopg2
import psycopg2.extras

try:
    import requests
    HAS_REQUESTS = True
except ImportError:
    HAS_REQUESTS = False

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s  %(levelname)-8s  %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# URLs / defaults
# ---------------------------------------------------------------------------
FCC_URL         = "https://data.fcc.gov/download/pub/uls/complete/l_amat.zip"
CANADA_URL      = "http://apc-cap.ic.gc.ca/datafiles/amateur_delim.zip"
DEFAULT_FCC_ZIP = "./l_amat.zip"
DEFAULT_CA_ZIP  = "./amateur_delim.zip"

# Delimiter auto-detection candidates
_DELIMITERS = [";", "|", ",", "\t"]

# ---------------------------------------------------------------------------
# FCC column positions (0-based) within each pipe-delimited .dat file
# Only the fields we actually need are extracted; others are skipped.
# Source: FCC Public Access Database Definitions (May 2018 rev.)
# ---------------------------------------------------------------------------

# HD.dat  — License Header
# 0=record_type, 1=unique_system_identifier, 2=uls_file_number,
# 3=ebf_number,  4=call_sign,  5=license_status,  6=radio_service_code,
# 7=grant_date,  8=expired_date,  9=cancellation_date
HD_COL = {
    "unique_system_identifier": 1,
    "call_sign":                4,
    "license_status":           5,
    "radio_service_code":       6,
    "expired_date":             8,   # was wrongly 9
}

# EN.dat  — Entity
# 0=record_type, 1=unique_system_identifier, 2=uls_file_number,
# 3=ebf_number,  4=call_sign,  5=entity_type,  6=licensee_id,
# 7=entity_name, 8=first_name, 9=mi,           10=last_name,
# 11=suffix,     12=phone,     13=fax,          14=email,
# 15=street_address, 16=city,  17=state,        18=zip_code, 19=po_box
EN_COL = {
    "unique_system_identifier": 1,
    "first_name":               8,
    "last_name":               10,
    "street_address":          15,
    "city":                    16,
    "state":                   17,
    "zip_code":                18,
    "po_box":                  19,
}

# AM.dat  — Amateur
# 0=record_type, 1=unique_system_identifier, 2=uls_file_number,
# 3=ebf_number,  4=call_sign,  5=operator_class
AM_COL = {
    "unique_system_identifier": 1,
    "operator_class":           5,
}

# FCC operator class code → expanded name
FCC_CLASS = {
    "E": "EXTRA",
    "A": "ADVANCED",
    "G": "GENERAL",
    "T": "TECHNICIAN",
    "P": "TECHNICIAN-PLUS",
    "N": "NOVICE",
}

# ---------------------------------------------------------------------------
# DDL
# ---------------------------------------------------------------------------

ULS_CUR_DDL = """
CREATE TABLE IF NOT EXISTS {schema}.uls_cur (
    country        CHAR(2)  NOT NULL,
    call_sign      TEXT     NOT NULL,
    operator_class TEXT,
    first_name     TEXT,
    last_name      TEXT,
    po_box         TEXT,
    street_address TEXT,
    city           TEXT,
    state          TEXT,
    zip_code       TEXT,
    expired_date   DATE,
    PRIMARY KEY (country, call_sign)
);
"""

ULS_CUR_INDEXES = [
    "CREATE INDEX IF NOT EXISTS idx_uls_cur_operator_class"
    "  ON {schema}.uls_cur (operator_class);",
    "CREATE INDEX IF NOT EXISTS idx_uls_cur_last_name"
    "  ON {schema}.uls_cur (last_name);",
    "CREATE INDEX IF NOT EXISTS idx_uls_cur_state"
    "  ON {schema}.uls_cur (state);",
    "CREATE INDEX IF NOT EXISTS idx_uls_cur_country"
    "  ON {schema}.uls_cur (country);",
]

# Migration: handle tables created by older versions of this script
ULS_CUR_MIGRATE = [
    "ALTER TABLE {schema}.uls_cur ADD COLUMN IF NOT EXISTS country      CHAR(2);",
    "ALTER TABLE {schema}.uls_cur ADD COLUMN IF NOT EXISTS expired_date DATE;",
    "ALTER TABLE {schema}.uls_cur DROP COLUMN IF EXISTS unique_system_identifier;",
    "ALTER TABLE {schema}.uls_cur DROP COLUMN IF EXISTS last_updated;",
]

UPSERT_SQL = """
INSERT INTO {schema}.uls_cur
    (country, call_sign, operator_class, first_name, last_name,
     po_box, street_address, city, state, zip_code, expired_date)
VALUES
    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (country, call_sign) DO UPDATE SET
    operator_class = EXCLUDED.operator_class,
    first_name     = EXCLUDED.first_name,
    last_name      = EXCLUDED.last_name,
    po_box         = EXCLUDED.po_box,
    street_address = EXCLUDED.street_address,
    city           = EXCLUDED.city,
    state          = EXCLUDED.state,
    zip_code       = EXCLUDED.zip_code,
    expired_date   = EXCLUDED.expired_date;
"""

DELETE_STALE_SQL = """
DELETE FROM {schema}.uls_cur
WHERE country = %s AND call_sign NOT IN %s;
"""

# ---------------------------------------------------------------------------
# Schema setup
# ---------------------------------------------------------------------------

def ensure_table(conn, schema: str) -> None:
    """
    Create uls_cur and its indexes; migrate schema from older versions.

    Migration path covers two legacy layouts:
      v1: PRIMARY KEY (unique_system_identifier)  — single-source FCC table
      v2: no PK at all, or wrong PK               — partial migration state
      v3: PRIMARY KEY (country, call_sign)         — current unified layout
    """
    with conn.cursor() as cur:
        cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema};")
        cur.execute(ULS_CUR_DDL.format(schema=schema))

        # Column-level migrations (safe to run on any version)
        for stmt in ULS_CUR_MIGRATE:
            cur.execute(stmt.format(schema=schema))

        # Ensure country is NOT NULL and defaulted to 'US' for any legacy rows
        cur.execute("""
            UPDATE {schema}.uls_cur
            SET    country = 'US'
            WHERE  country IS NULL;
        """.format(schema=schema))
        cur.execute(
            "ALTER TABLE {schema}.uls_cur "
            "ALTER COLUMN country SET NOT NULL;".format(schema=schema)
        )

        # Fix the primary key if it doesn't match (country, call_sign)
        # Step 1: find the current PK constraint name (if any)
        cur.execute("""
            SELECT kcu.constraint_name,
                   array_agg(kcu.column_name ORDER BY kcu.ordinal_position)
                       AS pk_cols
            FROM   information_schema.table_constraints        tc
            JOIN   information_schema.key_column_usage         kcu
                   ON  kcu.constraint_name = tc.constraint_name
                   AND kcu.table_schema    = tc.table_schema
                   AND kcu.table_name      = tc.table_name
            WHERE  tc.constraint_type = 'PRIMARY KEY'
              AND  tc.table_schema    = %s
              AND  tc.table_name      = 'uls_cur'
            GROUP  BY kcu.constraint_name;
        """, (schema,))
        pk_row = cur.fetchone()

        if pk_row:
            pk_name, pk_cols = pk_row
            if pk_cols != ['country', 'call_sign']:
                log.info("  Replacing old PK %s %s → (country, call_sign)",
                         pk_name, pk_cols)
                cur.execute(
                    f"ALTER TABLE {schema}.uls_cur "
                    f"DROP CONSTRAINT {pk_name};"
                )
                cur.execute(
                    f"ALTER TABLE {schema}.uls_cur "
                    f"ADD PRIMARY KEY (country, call_sign);"
                )
        else:
            # No PK at all — add it
            log.info("  No primary key found; adding PRIMARY KEY (country, call_sign).")
            cur.execute(
                f"ALTER TABLE {schema}.uls_cur "
                f"ADD PRIMARY KEY (country, call_sign);"
            )

        for idx in ULS_CUR_INDEXES:
            cur.execute(idx.format(schema=schema))

    conn.commit()
    log.info("uls_cur table and indexes verified/created.")


# ---------------------------------------------------------------------------
# HTTP download (shared by FCC and Canadian loaders)
# ---------------------------------------------------------------------------

def download_file(url: str, local_path: str, label: str) -> None:
    log.info("Downloading %s from %s", label, url)
    if HAS_REQUESTS:
        _dl_requests(url, local_path, label)
    else:
        _dl_urllib(url, local_path, label)


def _dl_requests(url: str, local_path: str, label: str) -> None:
    import requests as req
    start = time.time()
    with req.get(url, stream=True, timeout=120) as r:
        r.raise_for_status()
        total = int(r.headers.get("content-length", 0)) or None
        done  = 0
        with open(local_path, "wb") as fh:
            for chunk in r.iter_content(chunk_size=65536):
                if chunk:
                    fh.write(chunk)
                    done += len(chunk)
                    _progress(label, done, total, start)
    print()
    log.info("%s: %.1f MB in %.1f s → %s",
             label, done / 1_048_576, time.time() - start, local_path)


def _dl_urllib(url: str, local_path: str, label: str) -> None:
    import urllib.request
    log.info("(requests not installed; using urllib)")
    start = time.time()
    done  = 0
    with urllib.request.urlopen(url, timeout=120) as resp:
        total_s = resp.getheader("Content-Length")
        total   = int(total_s) if total_s else None
        with open(local_path, "wb") as fh:
            while True:
                chunk = resp.read(65536)
                if not chunk:
                    break
                fh.write(chunk)
                done += len(chunk)
                _progress(label, done, total, start)
    print()
    log.info("%s: %.1f MB in %.1f s → %s",
             label, done / 1_048_576, time.time() - start, local_path)


def _progress(label: str, done: int, total: int | None, start: float) -> None:
    mb    = done / 1_048_576
    speed = done / max(time.time() - start, 0.001) / 1_048_576
    if total:
        pct = done / total * 100
        tmb = total / 1_048_576
        print(f"\r  {label:<4}  {mb:7.1f} / {tmb:.1f} MB"
              f"  ({pct:5.1f}%)  {speed:.2f} MB/s   ",
              end="", flush=True)
    else:
        print(f"\r  {label:<4}  {mb:7.1f} MB  {speed:.2f} MB/s   ",
              end="", flush=True)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _v(fields: list, idx: int) -> str | None:
    """Return stripped field value or None if empty / out of range."""
    try:
        v = fields[idx].strip()
        return v if v else None
    except IndexError:
        return None


def _up(value: str | None) -> str | None:
    """Return value converted to upper-case, or None."""
    return value.upper() if value else None


def _fv(fields: list, idx: int | None) -> str | None:
    """Return stripped field value by index, or None if missing/empty.
    Safe version of _v() used in the Canadian loader where idx may be None
    (column not present in this version of the file).
    """
    if idx is None:
        return None
    try:
        v = fields[idx].strip()
        return v if v else None
    except IndexError:
        return None


def _parse_fcc_date(raw: str | None):
    """Convert FCC MM/DD/YYYY text to a Python date, or return None."""
    if not raw:
        return None
    try:
        from datetime import date
        m, d, y = raw.strip().split("/")
        return date(int(y), int(m), int(d))
    except Exception:
        return None


def _detect_delimiter(line: str) -> str:
    """Pick the delimiter that produces the most fields from a header line."""
    best, best_n = ";", 1
    for d in _DELIMITERS:
        n = len(line.split(d))
        if n > best_n:
            best, best_n = d, n
    log.info("  Auto-detected delimiter: %r  (%d fields)", best, best_n)
    return best


def _open_zip_entry(zf: zipfile.ZipFile, name: str, encoding: str = "latin-1"):
    """Return a decoded text stream for a ZIP entry."""
    return io.TextIOWrapper(zf.open(name), encoding=encoding, errors="replace")


def _find_entry(zf: zipfile.ZipFile, filename: str) -> str | None:
    """Case-insensitive search for a filename inside a ZIP."""
    target = filename.upper()
    for name in zf.namelist():
        if Path(name).name.upper() == target:
            return name
    return None


# ---------------------------------------------------------------------------
# FCC loader  —  reads HD / EN / AM in one pass, joins in Python
# ---------------------------------------------------------------------------

def load_fcc(conn, schema: str, zip_path: str, batch_size: int) -> int:
    """
    Build uls_cur rows from the FCC ZIP without writing any staging tables.

    Strategy:
      Pass 1 — scan HD.dat: collect {usid: (call_sign, expired_date)} for
                active HA/HV licenses only.  This is the filter gate.
      Pass 2 — scan EN.dat: for each usid in the active set, store address
                fields.
      Pass 3 — scan AM.dat: for each usid in the active set, store
                operator_class.
      Upsert  — join the three dicts in Python, batch-upsert into uls_cur.
      Purge   — delete US rows whose call_sign is no longer in the active set.

    Memory: ~1.7 M active licenses × ~200 bytes each ≈ 340 MB peak.
    That is well within normal server RAM.  If memory is tight, reduce
    batch_size or use a staging table approach instead.
    """
    log.info("=== Loading FCC ULS → uls_cur (direct, no staging tables) ===")
    t0 = time.time()
    from datetime import date as _date_cls
    _today = _date_cls.today()
    log.info("  Filtering to licenses with expired_date >= %s", _today)

    with zipfile.ZipFile(zip_path, "r") as zf:

        # ── Pass 1: HD – active licenses ────────────────────────────────
        log.info("  Pass 1/3: scanning HD.dat for active licenses ...")
        hd_entry = _find_entry(zf, "HD.dat")
        if not hd_entry:
            log.error("HD.dat not found in %s", zip_path)
            return 0

        # active_hd: usid → (call_sign, expired_date_str)
        active_hd: dict[int, tuple] = {}
        with _open_zip_entry(zf, hd_entry) as fh:
            for line in fh:
                f = line.rstrip("\r\n").split("|")
                status  = _v(f, HD_COL["license_status"])
                service = _v(f, HD_COL["radio_service_code"])
                if status != "A" or service not in ("HA", "HV"):
                    continue
                # Filter out licenses whose expiry date is in the past.
                # The FCC 2-year grace period keeps status='A' even after
                # expiry, but those licensees may not legally transmit.
                exp_raw = _v(f, HD_COL["expired_date"])
                exp_date = _parse_fcc_date(exp_raw)
                if exp_date is not None and exp_date < _today:
                    continue
                usid_s = _v(f, HD_COL["unique_system_identifier"])
                if not usid_s:
                    continue
                try:
                    usid = int(usid_s)
                except ValueError:
                    continue
                active_hd[usid] = (
                    _v(f, HD_COL["call_sign"]),
                    exp_date,           # already parsed above
                )
        log.info("    %d active licenses found.", len(active_hd))

        # ── Pass 2: EN – address / name fields ──────────────────────────
        log.info("  Pass 2/3: scanning EN.dat for name and address ...")
        en_entry = _find_entry(zf, "EN.dat")
        if not en_entry:
            log.error("EN.dat not found in %s", zip_path)
            return 0

        # en_data: usid → (first_name, last_name, po_box,
        #                   street_address, city, state, zip_code)
        en_data: dict[int, tuple] = {}
        with _open_zip_entry(zf, en_entry) as fh:
            for line in fh:
                f = line.rstrip("\r\n").split("|")
                usid_s = _v(f, EN_COL["unique_system_identifier"])
                if not usid_s:
                    continue
                try:
                    usid = int(usid_s)
                except ValueError:
                    continue
                if usid not in active_hd:
                    continue
                en_data[usid] = (
                    _v(f, EN_COL["first_name"]),
                    _v(f, EN_COL["last_name"]),
                    _v(f, EN_COL["po_box"]),
                    _v(f, EN_COL["street_address"]),
                    _v(f, EN_COL["city"]),
                    _v(f, EN_COL["state"]),
                    _v(f, EN_COL["zip_code"]),
                )
        log.info("    %d entity records matched.", len(en_data))

        # ── Pass 3: AM – operator class ──────────────────────────────────
        log.info("  Pass 3/3: scanning AM.dat for operator class ...")
        am_entry = _find_entry(zf, "AM.dat")
        if not am_entry:
            log.error("AM.dat not found in %s", zip_path)
            return 0

        # am_data: usid → operator_class
        am_data: dict[int, str | None] = {}
        with _open_zip_entry(zf, am_entry) as fh:
            for line in fh:
                f = line.rstrip("\r\n").split("|")
                usid_s = _v(f, AM_COL["unique_system_identifier"])
                if not usid_s:
                    continue
                try:
                    usid = int(usid_s)
                except ValueError:
                    continue
                if usid not in active_hd:
                    continue
                raw_class = _v(f, AM_COL["operator_class"])
                am_data[usid] = FCC_CLASS.get(raw_class, raw_class)
        log.info("    %d amateur records matched.", len(am_data))

    # ── Upsert ──────────────────────────────────────────────────────────
    log.info("  Upserting into uls_cur ...")
    sql        = UPSERT_SQL.format(schema=schema)
    batch      = []
    total_rows = 0
    active_callsigns: list[str] = []

    with conn.cursor() as cur:
        for usid, (call_sign, expired_raw) in active_hd.items():
            if not call_sign:
                continue
            en  = en_data.get(usid, (None,) * 7)
            opc = am_data.get(usid)
            row = (
                "US",
                _up(call_sign),
                _up(opc),
                _up(en[0]),          # first_name
                _up(en[1]),          # last_name
                _up(en[2]),          # po_box
                _up(en[3]),          # street_address
                _up(en[4]),          # city
                _up(en[5]),          # state
                _up(en[6]),          # zip_code
                expired_raw,         # already a date object from Pass 1
            )
            batch.append(row)
            active_callsigns.append(call_sign)

            if len(batch) >= batch_size:
                psycopg2.extras.execute_batch(cur, sql, batch,
                                              page_size=batch_size)
                total_rows += len(batch)
                batch = []

        if batch:
            psycopg2.extras.execute_batch(cur, sql, batch,
                                          page_size=batch_size)
            total_rows += len(batch)

    conn.commit()
    log.info("    %d US rows upserted.", total_rows)

    # ── Purge stale US rows ──────────────────────────────────────────────
    log.info("  Purging stale US rows ...")
    deleted = _purge_stale(conn, schema, "US", active_callsigns)
    log.info("    %d stale US rows removed.", deleted)

    elapsed = time.time() - t0
    log.info("FCC load complete in %.1f s.", elapsed)
    return total_rows


# ---------------------------------------------------------------------------
# Canadian loader  —  reads single flat file directly into uls_cur
# ---------------------------------------------------------------------------

def _ca_qualified(v: str | None) -> bool:
    """Return True if an ISED qualification field is populated.
    ISED encodes a held qualification as the letter of the qual itself
    (qual_a="A", qual_b="B", qual_c="C", qual_d="D", qual_e="E").
    An absent qualification is an empty field.
    Older exports used "1" or "Y" — all non-empty values are accepted.
    """
    return bool(v and v.strip())


def load_canada(conn, schema: str, zip_path: str, batch_size: int) -> int:
    """
    Build uls_cur rows from the Canadian ISED ZIP without staging tables.
    The ZIP contains a single delimited file with a header row.
    """
    log.info("=== Loading Canadian ISED → uls_cur (direct, no staging tables) ===")
    t0 = time.time()

    with zipfile.ZipFile(zip_path, "r") as zf:
        entry = _find_entry(zf, "amateur_delim.txt")
        if not entry:
            log.error("amateur_delim.txt not found in %s. Contents: %s",
                      zip_path, zf.namelist())
            return 0

        log.info("  Found: %s", entry)

        # Read header to detect delimiter and column positions
        with _open_zip_entry(zf, entry) as fh:
            header_line = fh.readline().rstrip("\r\n")

        delim    = _detect_delimiter(header_line)
        # Strip UTF-8 BOM (\ufeff) that ISED sometimes prepends, plus
        # any other leading/trailing whitespace or non-printable chars.
        raw_cols = [
            c.strip().lower().lstrip("\ufeff").strip()
            for c in header_line.split(delim)
        ]
        log.info("  Columns (%d): %s", len(raw_cols), ", ".join(raw_cols))
        # Log with repr() so any hidden BOM/whitespace chars are visible
        log.info("  First 3 col names (repr): %s",
                 [repr(c) for c in raw_cols[:3]])

        # Log the first data row so qualifier format is visible in output
        with _open_zip_entry(zf, entry) as fh:
            next(fh)  # skip header
            sample = fh.readline().rstrip("\r\n")
        sample_fields = sample.split(delim)
        log.info("  Sample row (%d fields): %s",
                 len(sample_fields),
                 " | ".join(f"[{i}]{v}" for i, v in enumerate(sample_fields[:12])))
        # Log raw qual values so encoding ("1" vs "Y" vs other) is visible
        qual_indices = {
            "qual_a": raw_cols.index("qual_a") if "qual_a" in raw_cols else None,
            "qual_b": raw_cols.index("qual_b") if "qual_b" in raw_cols else None,
            "qual_c": raw_cols.index("qual_c") if "qual_c" in raw_cols else None,
            "qual_d": raw_cols.index("qual_d") if "qual_d" in raw_cols else None,
            "qual_e": raw_cols.index("qual_e") if "qual_e" in raw_cols else None,
        }
        log.info("  Qual column indices: %s", qual_indices)
        log.info("  Qual raw values in sample row: %s",
                 {k: (sample_fields[v].strip() if v is not None and v < len(sample_fields) else "N/A")
                  for k, v in qual_indices.items()})

        def col(name: str) -> int | None:
            try:
                return raw_cols.index(name)
            except ValueError:
                return None

        # Locate columns by name (tolerant of future ISED renames)
        i_call    = col("callsign")
        i_fname   = col("first_name")
        i_surname = col("surname")
        i_addr    = col("address_line")
        i_city    = col("city")
        i_prov    = col("prov_cd")
        i_postal  = col("postal_code")
        i_qual_a    = col("qual_a")
        i_qual_b    = col("qual_b")
        i_qual_c    = col("qual_c")
        i_qual_d    = col("qual_d")   # Morse 5 wpm
        i_qual_e    = col("qual_e")   # Morse 12 wpm
        # club_address used as po_box for club entries
        i_club_addr = col("club_address")

        if i_call is None:
            log.error("Cannot find 'callsign' column in header: %s",
                      header_line)
            return 0

        sql        = UPSERT_SQL.format(schema=schema)
        batch      = []
        total_rows = 0
        active_callsigns: list[str] = []

        with _open_zip_entry(zf, entry) as fh, conn.cursor() as cur:
            next(fh)   # skip header
            for raw_line in fh:
                line = raw_line.rstrip("\r\n")
                if not line:
                    continue
                f = line.split(delim)
                call_sign = _fv(f, i_call)
                if not call_sign:
                    continue

                # Build operator_class as a comma-separated string of all
                # held qualifications (Option A — preserves full CA detail).
                parts = []
                if _ca_qualified(_fv(f, i_qual_a)):
                    parts.append("BASIC")
                if _ca_qualified(_fv(f, i_qual_b)):
                    parts.append("BASIC-HONOURS")
                if _ca_qualified(_fv(f, i_qual_c)):
                    parts.append("ADVANCED")
                if _ca_qualified(_fv(f, i_qual_d)):
                    parts.append("MORSE-5WPM")
                if _ca_qualified(_fv(f, i_qual_e)):
                    parts.append("MORSE-12WPM")
                op_class = ",".join(parts) if parts else None

                row = (
                    "CA",
                    _up(call_sign),
                    _up(op_class),
                    _up(_fv(f, i_fname)),
                    _up(_fv(f, i_surname)),
                    _up(_fv(f, i_club_addr)),   # po_box
                    _up(_fv(f, i_addr)),        # street_address
                    _up(_fv(f, i_city)),
                    _up(_fv(f, i_prov)),
                    _up(_fv(f, i_postal)),
                    None,                   # expired_date — not provided by ISED
                )
                batch.append(row)
                active_callsigns.append(call_sign)

                if len(batch) >= batch_size:
                    psycopg2.extras.execute_batch(cur, sql, batch,
                                                  page_size=batch_size)
                    total_rows += len(batch)
                    batch = []

            if batch:
                psycopg2.extras.execute_batch(cur, sql, batch,
                                              page_size=batch_size)
                total_rows += len(batch)

    conn.commit()
    log.info("  %d CA rows upserted.", total_rows)

    log.info("  Purging stale CA rows ...")
    deleted = _purge_stale(conn, schema, "CA", active_callsigns)
    log.info("  %d stale CA rows removed.", deleted)

    elapsed = time.time() - t0
    log.info("Canadian load complete in %.1f s.", elapsed)
    return total_rows


# ---------------------------------------------------------------------------
# Purge helper
# ---------------------------------------------------------------------------

def _purge_stale(conn, schema: str, country: str,
                 active_callsigns: list[str]) -> int:
    """
    Delete rows for `country` whose call_sign is not in active_callsigns.
    Works in chunks to avoid enormous IN-list parameters.
    """
    if not active_callsigns:
        return 0

    active_set = set(active_callsigns)
    deleted    = 0

    with conn.cursor() as cur:
        # Fetch all call_signs currently in the table for this country
        cur.execute(
            f"SELECT call_sign FROM {schema}.uls_cur WHERE country = %s;",
            (country,),
        )
        existing = [row[0] for row in cur.fetchall()]
        stale    = [cs for cs in existing if cs not in active_set]

        # Delete in chunks of 1000 to avoid parameter limits
        chunk_size = 1000
        for i in range(0, len(stale), chunk_size):
            chunk = stale[i : i + chunk_size]
            cur.execute(
                f"DELETE FROM {schema}.uls_cur "
                f"WHERE country = %s AND call_sign = ANY(%s);",
                (country, chunk),
            )
            deleted += cur.rowcount

    conn.commit()
    return deleted


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="Build unified uls_cur table from FCC and/or Canadian amateur license data.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    p.add_argument("--source", default="both",
                   choices=["fcc", "canada", "both"],
                   help="Which source(s) to load")
    p.add_argument("--host",       default="localhost",   help="PostgreSQL host")
    p.add_argument("--port",       default=5432, type=int, help="PostgreSQL port")
    p.add_argument("--dbname",     default="ham_radio",   help="Database name")
    p.add_argument("--user",       default="postgres",    help="Database user")
    p.add_argument("--password",   default=None,
                   help="Database password (or set HAM_DB_PASSWORD env var)")
    p.add_argument("--schema",     default="public",      help="Target schema")
    p.add_argument("--skip-download", action="store_true",
                   help="Skip downloads; use existing ZIP files")
    p.add_argument("--fcc-zip",    default=DEFAULT_FCC_ZIP,
                   help="Local path for FCC ZIP")
    p.add_argument("--canada-zip", default=DEFAULT_CA_ZIP,
                   help="Local path for Canadian ZIP")
    p.add_argument("--batch-size", default=2000, type=int,
                   help="Rows per upsert batch")
    return p.parse_args()


@contextmanager
def get_connection(args: argparse.Namespace):
    password = args.password or os.environ.get("HAM_DB_PASSWORD", "")
    conn = psycopg2.connect(
        host=args.host, port=args.port,
        dbname=args.dbname, user=args.user, password=password,
    )
    try:
        yield conn
    finally:
        conn.close()


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> None:
    args      = parse_args()
    do_fcc    = args.source in ("fcc",    "both")
    do_canada = args.source in ("canada", "both")

    # ── Downloads ──────────────────────────────────────────────────────────
    if not args.skip_download:
        if do_fcc:
            download_file(FCC_URL, args.fcc_zip, "FCC")
        if do_canada:
            download_file(CANADA_URL, args.canada_zip, "CAN")
    else:
        for label, path, needed in [
            ("FCC",    args.fcc_zip,    do_fcc),
            ("Canada", args.canada_zip, do_canada),
        ]:
            if needed and not Path(path).exists():
                log.error("%s ZIP not found at %s", label, path)
                sys.exit(1)
            if needed:
                log.info("Skipping %s download; using: %s", label, path)

    # ── Database ───────────────────────────────────────────────────────────
    log.info(
        "Connecting to PostgreSQL: host=%s port=%s dbname=%s user=%s schema=%s",
        args.host, args.port, args.dbname, args.user, args.schema,
    )

    with get_connection(args) as conn:
        ensure_table(conn, args.schema)
        t_start = time.time()

        if do_fcc:
            load_fcc(conn, args.schema, args.fcc_zip, args.batch_size)

        if do_canada:
            load_canada(conn, args.schema, args.canada_zip, args.batch_size)

        log.info("Total elapsed: %.1f s.", time.time() - t_start)

    log.info("Done.")


if __name__ == "__main__":
    main()

