
03 — LangExtract (fixed_size + semantic) with OpenAlex Topics, FAST + LLM Summaries

This notebook adds true LLM chunk summaries (multi-sentence, in the same language as the text) to the existing LangExtract pipeline. It preserves your speed features: parallelism, text dedup across buckets, and rate limiting — now for both extraction and summaries.

Inputs (fixed to your latest buckets):
- cleantech_data/silver_subsample_chunk/fixed_size/2025-09-03/chunks.parquet
- cleantech_data/silver_subsample_chunk/semantic/2025-09-03/chunks.parquet
- cleantech_data/silver/openalex/2025-08-09/ (topics, keywords, siblings, taxonomy refs)

Outputs:
- grounded_extractions.jsonl (per‑chunk LLM extractions with char spans)
- visualization_interactive.html (hover to inspect highlights)
- chunks_enriched.parquet (text + metadata JSON with entities, numeric facts, LLM chunk_summary + provenance)
- Topics: topics_enriched.parquet, graph nodes/edges, topic↔chunk joins, and injected topic metadata parquet
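The enriched parquet stores metadata as one JSON string per chunk. A minimal sketch of unpacking such a cell — the field names follow the ChunkMetadata schema defined later in this notebook, but the id and values here are invented for illustration:

```python
import json

# Hypothetical metadata cell as it would appear in chunks_enriched.parquet:
meta_json = json.dumps({
    "id": "media_001_chunk_0",
    "chunk_summary": "A 6.5 MW microgrid with 17 MWh of storage went live in Bordeaux in 2023.",
    "entities": {"person": [], "org": [], "location": ["Bordeaux"], "technology": ["microgrid"]},
    "numeric_facts": [{"name": "power_MW", "value": 6.5, "unit": "MW", "year": None}],
})

# Each cell round-trips through json.loads back to nested dicts/lists.
meta = json.loads(meta_json)
print(meta["entities"]["technology"])      # ['microgrid']
print(meta["numeric_facts"][0]["value"])   # 6.5
```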

Environment variables (examples)

Bash
# Keys
GEMINI_KEY=...           # preferred if present
OPENAI_KEY=...

# Processing
LEX_ONLY_TYPES=patent,media  # default
LEX_LIMIT_TOPIC=0            # 0 → skip topics in LLM step
LEX_LIMIT_PATENT=0           # 0 → process all allowed types
LEX_LIMIT_MEDIA=0
LEX_MAX_WORKERS=12
LEX_RPS=8
LEX_SKIP_STATIC_VIZ=1        # keep interactive viewer; skip static viz
LEX_WRITE_PYD_FILES=1        # 0 to skip many small per‑chunk JSONs (faster)
LEX_LANG_ORDER=meta,langdetect,llm

# NEW: Summaries
LEX_SUMMARY=1
LEX_SUMMARY_SENTENCES=3
LEX_SUMMARY_MAX_CHARS=8000
LEX_SUMMARY_RPS=8
# Optional override (default = same as extraction provider):
# LEX_SUMMARY_PROVIDER=gemini  # or: openai
# LANGEXTRACT_SUMMARY_MODEL_GEMINI=gemini-1.5-flash
# LANGEXTRACT_SUMMARY_MODEL_OPENAI=gpt-4o-mini

Install (one-time): pip install -U langextract google-genai openai langdetect python-dotenv pyarrow fastparquet

Python
# Install (one-time) if needed:
# !pip install -U langextract google-genai openai langdetect python-dotenv pyarrow fastparquet

from __future__ import annotations

import os, sys, json, pathlib, datetime as dt, re, glob, threading, time, hashlib, html
from typing import Optional, List, Dict, Any, Tuple
from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
from pydantic import BaseModel, Field
from dotenv import load_dotenv
import langextract as lx
from IPython.display import IFrame, display

# --- Repo/Data roots -------------------------------------------------------
def _find_repo_root(start: pathlib.Path) -> pathlib.Path:
    for p in [start] + list(start.parents):
        if (p / "src" / "pipeline").exists():
            return p
    return start

ROOT = pathlib.Path.cwd()
REPO = _find_repo_root(ROOT)
if (REPO / "src" / "pipeline").exists() and str(REPO / "src" / "pipeline") not in sys.path:
    sys.path.append(str(REPO / "src" / "pipeline"))

try:
    from ctclean.paths import data_root
    DATA_ROOT = data_root()
except Exception:
    DATA_ROOT = REPO / "cleantech_data"

print("Repo root:", REPO)
print("Data root:", DATA_ROOT)

load_dotenv()

# Keys & model defaults
GEMINI_KEY = os.getenv("GEMINI_KEY") or os.getenv("GOOGLE_API_KEY") or os.getenv("LANGEXTRACT_API_KEY")
OPENAI_KEY = os.getenv("OPENAI_KEY") or os.getenv("OPENAI_API_KEY")
MODEL_ID_GEMINI = os.getenv("LANGEXTRACT_MODEL_GEMINI", "gemini-1.5-flash")
MODEL_ID_OPENAI = os.getenv("LANGEXTRACT_MODEL_OPENAI", "gpt-4o-mini")

print("Keys present → GEMINI:", bool(GEMINI_KEY), "| OPENAI:", bool(OPENAI_KEY))
print("Defaults     → GEMINI model:", MODEL_ID_GEMINI, "| OPENAI model:", MODEL_ID_OPENAI)

# ---- Inputs fixed to requested dates --------------------------------------
FIXED_PARQUET = DATA_ROOT / "silver_subsample_chunk" / "fixed_size" / "2025-09-03" / "chunks.parquet"
SEM_PARQUET   = DATA_ROOT / "silver_subsample_chunk" / "semantic"   / "2025-09-03" / "chunks.parquet"
OPENALEX_DIR  = DATA_ROOT / "silver" / "openalex" / "2025-08-09"

for p in [FIXED_PARQUET, SEM_PARQUET, OPENALEX_DIR]:
    print("Check:", p, "→", ("OK" if p.exists() else "MISSING"))

# Quick validation (optional; comment out for max speed)
try:
    df_fixed_head = pd.read_parquet(FIXED_PARQUET, columns=["id","text","metadata"]).head(2)
    df_sem_head   = pd.read_parquet(SEM_PARQUET,   columns=["id","text","metadata"]).head(2)
    print("fixed_size sample:", df_fixed_head.shape)
    print("semantic   sample:", df_sem_head.shape)
except Exception as e:
    print("Parquet read check failed (continuing):", e)
Text Only
Repo root: C:\Users\gerbe\PycharmProjects\MT
Data root: C:\Users\gerbe\PycharmProjects\MT\cleantech_data
Keys present → GEMINI: True | OPENAI: True
Defaults     → GEMINI model: gemini-1.5-flash | OPENAI model: gpt-4o-mini
Check: C:\Users\gerbe\PycharmProjects\MT\cleantech_data\silver_subsample_chunk\fixed_size\2025-09-03\chunks.parquet → OK
Check: C:\Users\gerbe\PycharmProjects\MT\cleantech_data\silver_subsample_chunk\semantic\2025-09-03\chunks.parquet → OK
Check: C:\Users\gerbe\PycharmProjects\MT\cleantech_data\silver\openalex\2025-08-09 → OK
fixed_size sample: (2, 3)
semantic   sample: (2, 3)
Python
## Models & helpers (schema + language + span integrity)
# --- Pydantic models for structured output ---------------------------------
class Role(BaseModel):
    person: str
    role: str
    from_: Optional[int] = Field(None, alias="from")
    to:    Optional[int] = None

class Event(BaseModel):
    label: str
    year: int
    month: Optional[int] = None
    day:   Optional[int] = None

class NumericFact(BaseModel):
    name: str
    value: float
    unit: str
    year: Optional[int] = None

class Entities(BaseModel):
    person:   List[str] = []
    org:      List[str] = []
    location: List[str] = []
    technology: List[str] = []

class ChunkMetadata(BaseModel):
    id: str
    doc_id: Optional[str] = None
    doc_type: Optional[str] = None
    chunk_type: Optional[str] = None
    chunk_index: Optional[int] = None
    token_count: Optional[int] = None
    title: Optional[str] = None
    date: Optional[str] = None
    source: Optional[str] = None
    url: Optional[str] = None
    lang_source: Optional[str] = None
    lang_llm: Optional[str] = None
    lang: Optional[str] = None
    chunk_summary: str
    entities: Entities
    topic_tags:       List[str] = []
    event_dates:      List[Event] = []
    role_annotations: List[Role]  = []
    numeric_facts:    List[NumericFact] = []
    cpc_codes:        List[str] = []
    country_code:     Optional[str] = None
    content_year:     Optional[int] = None
    content_month:    Optional[int] = None
    extraction_count: int = 0
    span_integrity_pct: float = 0.0
    initiative:       List[str] = []
    grant_type:       List[str] = []

def _first_n_words(s: str, n: int = 30) -> str:
    return " ".join((s or "").split()[:n])

def _fallback_summary(text: str, words: int = 80) -> str:
    return _first_n_words(text, words)

def _parse_iso_date(iso: Optional[str]) -> Tuple[Optional[int], Optional[int]]:
    if not iso or str(iso) in {"<NA>", "NaT", "None", "nan"}: return (None, None)
    try:
        y = int(str(iso)[0:4])
        m = int(str(iso)[5:7]) if len(str(iso)) >= 7 and str(iso)[5:7].isdigit() else None
        return y, m
    except Exception:
        return None, None

def _safe_meta(meta_val: Any) -> Dict[str, Any]:
    if isinstance(meta_val, dict): return meta_val
    if isinstance(meta_val, str):
        try: return json.loads(meta_val)
        except Exception: return {}
    return {}

def _guess_chunk_index_from_id(cid: str) -> Optional[int]:
    m = re.search(r"(?:_|-)chunk[_-](\d+)$", str(cid))
    if m:
        try: return int(m.group(1))
        except Exception: return None
    return None

def _derive_doc_type(cid: str, meta: Dict[str, Any]) -> Optional[str]:
    dt = (meta.get("doc_type") or meta.get("document_type") or "").strip().lower()
    if dt in {"topic", "patent", "media"}: return dt
    did = (meta.get("doc_id") or "").lower()
    s = str(cid).lower()
    if did.startswith("topic_") or did.startswith("topic:") or s.startswith("topic_"): return "topic"
    if did.startswith("patent_") or did.startswith("patent:") or s.startswith("patent_"): return "patent"
    if did.startswith("media_")  or did.startswith("media:")  or s.startswith("media_"):  return "media"
    return None

def _coerce_list_str(val: Any) -> List[str]:
    if val is None: return []
    if isinstance(val, list): return [str(x) for x in val if str(x).strip()]
    if isinstance(val, str):
        s = val.strip()
        if not s: return []
        if s.startswith("[") and s.endswith("]"):
            try:
                arr = json.loads(s)
                if isinstance(arr, list): return [str(x) for x in arr if str(x).strip()]
            except Exception:
                return [s]
        return [s]
    return [str(val)] if str(val).strip() else []

def _normalize_topic_id(val: Any) -> Optional[str]:
    s = str(val or "").strip()
    if not s: return None
    m = re.search(r"(T\d+)", s, re.IGNORECASE)
    return m.group(1).upper() if m else None

def _clean_iso639_1(code_raw: str) -> Optional[str]:
    if not code_raw: return None
    s = str(code_raw).strip().lower()
    name2code = {"english":"en","german":"de","deutsch":"de","french":"fr","français":"fr","spanish":"es","español":"es",
                 "italian":"it","italiano":"it","dutch":"nl","portuguese":"pt","polish":"pl","swedish":"sv",
                 "norwegian":"no","danish":"da","finnish":"fi","chinese":"zh","japanese":"ja","korean":"ko"}
    if s in name2code: s = name2code[s]
    if len(s) == 2 and s.isalpha(): return s
    m = re.search(r"([a-zA-Z]{2})", s)
    return m.group(1).lower() if m else None
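The two regex normalizers above reduce messy metadata to canonical forms. Here are the same patterns exercised in isolation (the inputs are invented, and the standalone function names are for this sketch only):

```python
import re

def topic_id(val):
    # Same pattern as _normalize_topic_id: pull out a T-prefixed numeric id.
    m = re.search(r"(T\d+)", str(val or ""), re.IGNORECASE)
    return m.group(1).upper() if m else None

def chunk_index(cid):
    # Same pattern as _guess_chunk_index_from_id: trailing chunk suffix.
    m = re.search(r"(?:_|-)chunk[_-](\d+)$", str(cid))
    return int(m.group(1)) if m else None

print(topic_id("https://openalex.org/t10123"))  # T10123
print(chunk_index("media_42_chunk_7"))          # 7
print(chunk_index("no_trailing_suffix"))        # None
```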
Python
# --- Provider context helper -----------------------------------------------
class use_langextract_provider:
    def __init__(self, provider: str, gemini_key: Optional[str]=None, openai_key: Optional[str]=None):
        self.provider = provider
        self.gemini_key = gemini_key
        self.openai_key = openai_key
        self.keep = ["LANGEXTRACT_API_KEY", "GOOGLE_API_KEY", "OPENAI_API_KEY"]
        self.old = {k: os.environ.get(k) for k in self.keep}
    def __enter__(self):
        p = (self.provider or "").lower().strip()
        if p == "gemini":
            key = self.gemini_key or os.getenv("GEMINI_KEY") or os.getenv("GOOGLE_API_KEY") or os.getenv("LANGEXTRACT_API_KEY")
            if not key: raise RuntimeError("No Gemini key found")
            os.environ["LANGEXTRACT_API_KEY"] = key
            os.environ["GOOGLE_API_KEY"] = key
            os.environ.pop("OPENAI_API_KEY", None)
        elif p == "openai":
            key = self.openai_key or os.getenv("OPENAI_KEY") or os.getenv("OPENAI_API_KEY")
            if not key: raise RuntimeError("No OpenAI key found")
            os.environ["OPENAI_API_KEY"] = key
            os.environ.pop("LANGEXTRACT_API_KEY", None)
            os.environ.pop("GOOGLE_API_KEY", None)
        else:
            raise ValueError("provider must be 'gemini' or 'openai'")
        return self
    def __exit__(self, exc_type, exc, tb):
        for k, v in self.old.items():
            if v is None: os.environ.pop(k, None)
            else: os.environ[k] = v
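The context manager above works by snapshotting a fixed set of key variables on construction and restoring them on exit, so a failed extraction cannot leak the wrong provider key into later cells. The same save/restore pattern in miniature, with a made-up variable name:

```python
import os

class swap_env:
    """Set env vars on entry, restore the previous values on exit."""
    def __init__(self, **pairs):
        self.pairs = pairs
        self.old = {}
    def __enter__(self):
        for k, v in self.pairs.items():
            self.old[k] = os.environ.get(k)   # remember previous value (or absence)
            os.environ[k] = v
        return self
    def __exit__(self, *exc):
        for k, v in self.old.items():
            if v is None:
                os.environ.pop(k, None)       # was unset before → unset again
            else:
                os.environ[k] = v

os.environ.pop("DEMO_KEY", None)
with swap_env(DEMO_KEY="abc"):
    assert os.environ["DEMO_KEY"] == "abc"
assert "DEMO_KEY" not in os.environ           # restored on exit
```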
Python
# --- Language detection (LLM fallback) --------------------------------------
def _detect_language_llm(text: str, provider_choice: str) -> Optional[str]:
    snippet = (text or "")[:4000]
    if not snippet.strip(): return None
    prompt = (
        "Return ONLY the dominant language code of the text as a two-letter ISO 639-1 code "
        "(e.g., en, de, fr). Do not add explanations.\n\nTEXT:\n" + snippet
    ).strip()
    if provider_choice == "gemini" and (os.getenv("GEMINI_KEY") or os.getenv("GOOGLE_API_KEY") or os.getenv("LANGEXTRACT_API_KEY")):
        try:
            from google import genai
            key = os.getenv("GEMINI_KEY") or os.getenv("GOOGLE_API_KEY") or os.getenv("LANGEXTRACT_API_KEY")
            client = genai.Client(api_key=key)
            resp = client.models.generate_content(model=MODEL_ID_GEMINI, contents=[prompt])
            ans = getattr(resp, "text", None) or ""
            return _clean_iso639_1(ans)
        except Exception:
            return None
    if provider_choice == "openai" and (os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_KEY")):
        try:
            try:
                from openai import OpenAI
                client = OpenAI(api_key=os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_KEY"))
                resp = client.chat.completions.create(
                    model=MODEL_ID_OPENAI,
                    messages=[
                        {"role":"system","content":"You are a language detector. Output only two letters."},
                        {"role":"user","content": prompt},
                    ],
                    temperature=0.0,
                )
                ans = resp.choices[0].message.content.strip()
            except Exception:
                import openai
                openai.api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_KEY")
                resp = openai.ChatCompletion.create(
                    model=MODEL_ID_OPENAI,
                    messages=[
                        {"role":"system","content":"You are a language detector. Output only two letters."},
                        {"role":"user","content": prompt},
                    ],
                    temperature=0.0,
                )
                ans = resp["choices"][0]["message"]["content"].strip()
            return _clean_iso639_1(ans)
        except Exception:
            return None
    return None
Python
# --- Extraction prompt & examples ------------------------------------------
PROMPT = """Task: Extract grounded, schema-like metadata from cleantech text (EN/DE/FR).
Use EXACT substrings from the text for `extraction_text` (no paraphrase).
Return diverse, non-overlapping entities/values with accurate char spans.

Extraction classes (use when present):
- PERSON(person names)
- ORG(organizations, agencies, companies)
- LOCATION(cities, regions, countries, sites)
- TECHNOLOGY(tech terms: solar PV, battery, hydrogen, EGS, wind, EV charging, electrolyzer, etc.)
- DATE(year, month?, day?)  # use numeric substrings (e.g., “2023”, “Jan 2024” → “2024”, “01”)
- NUMERIC(quantity/value with unit). Attributes:
    - quantity_type in {capacity_MW, power_MW, power_kW, energy_MWh, energy_kWh,
                       efficiency_pct, investment_USD, co2_tonnes, voltage_V, current_A, area_m2}
    - value (numeric), unit (exact unit substring)
    - qualifier? (e.g., “peak”, “nameplate”, “behind-the-meter”)
- ROLE(links person-role-org if present). Attributes: person, role, org.

Rules:
- Do NOT invent facts; only extract what appears in the text.
- Keep extraction_text as an exact substring; provide correct char spans.
- Keep numbers & units as they appear; also normalize into attributes where relevant.
- Multilingual (EN/DE/FR) allowed; do not translate terms.
""".strip()

EXAMPLES = [
    lx.data.ExampleData(
        text="English: The microgrid provides 6.5 MW and 17 MWh of storage in 2023 (Bordeaux).",
        extractions=[
            lx.data.Extraction("TECHNOLOGY", "microgrid", attributes={"subdomain":"grid"}),
            lx.data.Extraction("NUMERIC", "6.5 MW", attributes={"quantity_type":"power_MW","value":"6.5","unit":"MW"}),
            lx.data.Extraction("NUMERIC", "17 MWh", attributes={"quantity_type":"energy_MWh","value":"17","unit":"MWh"}),
            lx.data.Extraction("DATE", "2023", attributes={"year":"2023"}),
            lx.data.Extraction("LOCATION", "Bordeaux", attributes={"type":"city"})
        ]
    ),
    lx.data.ExampleData(
        text="Deutsch: Ein Solarprojekt mit 140MW wurde 2023 in Betrieb genommen.",
        extractions=[
            lx.data.Extraction("TECHNOLOGY", "Solarprojekt", attributes={"subdomain":"solar"}),
            lx.data.Extraction("NUMERIC","140MW", attributes={"quantity_type":"capacity_MW","value":"140","unit":"MW"}),
            lx.data.Extraction("DATE","2023", attributes={"year":"2023"})
        ]
    ),
    lx.data.ExampleData(
        text="Français: Un électrolyseur alimente une borne de recharge de 50 kW à Paris.",
        extractions=[
            lx.data.Extraction("TECHNOLOGY","électrolyseur", attributes={"subdomain":"hydrogen"}),
            lx.data.Extraction("NUMERIC","50 kW", attributes={"quantity_type":"power_kW","value":"50","unit":"kW"}),
            lx.data.Extraction("LOCATION","Paris", attributes={"type":"city"})
        ]
    ),
]
Python
# --- Span integrity utilities ----------------------------------------------
def _norm_interval_from_obj_or_dict(span_obj) -> Tuple[Optional[int], Optional[int]]:
    if span_obj is None: return (None, None)
    if isinstance(span_obj, dict):
        s = span_obj.get("start")
        if s is None: s = span_obj.get("start_pos")
        e = span_obj.get("end")
        if e is None: e = span_obj.get("end_pos")
        return (s, e)
    s = getattr(span_obj, "start", None)
    e = getattr(span_obj, "end", None)
    if s is not None and e is not None: return (s, e)
    s = getattr(span_obj, "start_pos", None)
    e = getattr(span_obj, "end_pos", None)
    return (s, e)

def span_integrity_pct(text: str, ex_list: List[Dict[str, Any]]) -> float:
    if not ex_list: return 0.0
    ok = total = 0
    for ex in ex_list:
        etxt = ex.get("extraction_text") or ""
        if not etxt: continue
        s, e = _norm_interval_from_obj_or_dict(ex.get("char_span"))
        if not isinstance(s, int) or not isinstance(e, int):
            s, e = _norm_interval_from_obj_or_dict(ex.get("char_interval"))
        if isinstance(s, int) and isinstance(e, int) and 0 <= s < e <= len(text):
            total += 1
            if text[s:e] == etxt: ok += 1
    return (ok / total * 100.0) if total else 0.0
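Per extraction, span_integrity_pct reduces to two checks: the span must be in-bounds, and the text slice must equal the extracted substring exactly. A stripped-down illustration (the function name and the tuple-based input are simplifications for this sketch):

```python
def integrity_pct(text, spans):
    """spans: list of (start, end, expected_substring)."""
    ok = total = 0
    for s, e, expected in spans:
        if 0 <= s < e <= len(text):   # only in-bounds spans count toward the total
            total += 1
            if text[s:e] == expected: # exact-substring grounding check
                ok += 1
    return ok / total * 100.0 if total else 0.0

text = "The plant delivers 140 MW since 2023."
print(integrity_pct(text, [(19, 25, "140 MW"), (32, 36, "2023")]))  # 100.0
print(integrity_pct("abcdef", [(0, 3, "abc"), (3, 6, "xyz")]))      # 50.0
```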
Python
# --- Schema mapping (supports chunk_summary_override) ----------------------
def map_extractions_to_schema(
    chunk_id: str,
    text: str,
    doc_meta: Dict[str, Any],
    extractions: List[lx.data.Extraction],
    *,
    chunk_type_label: Optional[str] = None,
    lang_llm: Optional[str] = None,
    chunk_summary_override: Optional[str] = None,
) -> ChunkMetadata:
    persons, orgs, locs, techs = [], [], [], []
    events: List[Event] = []
    roles:  List[Role]  = []
    facts:  List[NumericFact] = []

    ex_dicts: List[Dict[str, Any]] = []
    for ex in extractions:
        d = {
            "extraction_class": getattr(ex, "extraction_class", None),
            "extraction_text": getattr(ex, "extraction_text", None),
            "attributes": getattr(ex, "attributes", None),
            "char_span": None,
            "char_interval": None,
        }
        cs = getattr(ex, "char_span", None)
        if cs is not None:
            s,e = _norm_interval_from_obj_or_dict(cs)
            d["char_span"] = {"start": s, "end": e}
        ci = getattr(ex, "char_interval", None)
        if ci is not None:
            s,e = _norm_interval_from_obj_or_dict(ci)
            d["char_interval"] = {"start_pos": s, "end_pos": e}
        ex_dicts.append(d)

        cls = (getattr(ex, "extraction_class", "") or "").lower().strip()
        etxt = (getattr(ex, "extraction_text", "") or "").strip()
        if not etxt: continue

        if cls == "person":   persons.append(etxt)
        elif cls == "org":    orgs.append(etxt)
        elif cls == "location": locs.append(etxt)
        elif cls == "technology": techs.append(etxt)
        elif cls == "date":
            year = month = day = None
            if getattr(ex, "attributes", None):
                y = ex.attributes.get("year"); m = ex.attributes.get("month"); d = ex.attributes.get("day")
                try: year = int(y) if y is not None else None
                except (TypeError, ValueError): pass
                try: month = int(m) if m is not None else None
                except (TypeError, ValueError): pass
                try: day = int(d) if d is not None else None
                except (TypeError, ValueError): pass
            if year:
                events.append(Event(label=etxt, year=year, month=month, day=day))
        elif cls == "numeric":
            if getattr(ex, "attributes", None):
                qtype = ex.attributes.get("quantity_type") or "value"
                unit  = ex.attributes.get("unit") or ""
                val   = ex.attributes.get("value")
                try:
                    v = float(str(val).replace(",", ".")) if val is not None else None
                except (TypeError, ValueError):
                    v = None
                if v is not None:
                    facts.append(NumericFact(name=str(qtype), value=v, unit=str(unit)))
        elif cls == "role":
            if getattr(ex, "attributes", None):
                person = (ex.attributes.get("person") or "").strip()
                role   = (ex.attributes.get("role") or "").strip()
                org    = (ex.attributes.get("org") or "").strip()
                if person and role:
                    roles.append(Role(person=person, role=role))
                    if org and org not in orgs: orgs.append(org)

    meta = _safe_meta(doc_meta)
    doc_id = meta.get("doc_id")
    doc_type = _derive_doc_type(chunk_id, meta)
    token_count = meta.get("token_count") or meta.get("tokens") or meta.get("n_tokens")
    try: token_count = int(token_count) if token_count is not None else None
    except Exception: token_count = None

    chunk_index = meta.get("chunk_index")
    if chunk_index is None: chunk_index = _guess_chunk_index_from_id(chunk_id)

    title = meta.get("title") or meta.get("title_clean") or None
    date_str = meta.get("date") or meta.get("publication_date") or None
    source = meta.get("source") or None
    url = meta.get("url") or None

    lang_source = meta.get("lang") or meta.get("language") or None
    lang_final = (lang_llm or lang_source)

    cpc_codes = _coerce_list_str(meta.get("cpc_codes"))
    country_code = meta.get("country_code") or meta.get("country") or None
    y, m = _parse_iso_date(date_str)

    sip = span_integrity_pct(text, ex_dicts)
    extraction_count = len(extractions)

    return ChunkMetadata(
        id=str(chunk_id),
        doc_id=doc_id,
        doc_type=doc_type,
        chunk_type=chunk_type_label,
        chunk_index=chunk_index,
        token_count=token_count,
        title=title,
        date=date_str,
        source=source,
        url=url,
        lang_source=(str(lang_source).lower() if lang_source else None),
        lang_llm=(str(lang_llm).lower() if lang_llm else None),
        lang=(str(lang_final).lower() if lang_final else None),
        chunk_summary=(chunk_summary_override or _first_n_words(text, 30)),
        entities=Entities(
            person=sorted(set(persons)),
            org=sorted(set(orgs)),
            location=sorted(set(locs)),
            technology=sorted(set(techs)),
        ),
        topic_tags=[],
        event_dates=events,
        role_annotations=roles,
        numeric_facts=facts,
        cpc_codes=cpc_codes,
        country_code=country_code,
        content_year=y,
        content_month=m,
        extraction_count=extraction_count,
        span_integrity_pct=sip,
        initiative=[],
        grant_type=[],
    )
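Numeric attribute values arrive as strings, sometimes with a European decimal comma (e.g. "6,5" in German or French text); the mapping above coerces them by replacing "," with "." before calling float(). The same coercion in isolation (the helper name is for this sketch only):

```python
def coerce_value(val):
    """Parse a numeric string, tolerating a European decimal comma."""
    try:
        return float(str(val).replace(",", ".")) if val is not None else None
    except (TypeError, ValueError):
        return None

print(coerce_value("6,5"))   # 6.5
print(coerce_value("140"))   # 140.0
print(coerce_value("n/a"))   # None
```

Note this would misread thousands separators like "1,200" as 1.2; the extraction prompt asks for units and values as they appear, so that ambiguity is inherited from the source text.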
Python
## Interactive viewer helpers (JSONL → HTML)
PALETTE = {
    "TECHNOLOGY": "#FFF59D",
    "NUMERIC":    "#B2EBF2",
    "DATE":       "#BBDEFB",
    "PERSON":     "#FFCC80",
    "ORG":        "#C5E1A5",
    "LOCATION":   "#E1BEE7",
    "ROLE":       "#F8BBD0",
}
ALL_CLASSES = ["DATE", "LOCATION", "NUMERIC", "ORG", "PERSON", "ROLE", "TECHNOLOGY"]

def _load_jsonl_records(jsonl_path: pathlib.Path) -> List[Dict[str, Any]]:
    text = pathlib.Path(jsonl_path).read_text(encoding="utf-8").strip()
    if not text: return []
    if text[0] == "[": return json.loads(text)
    recs = []
    for line in text.splitlines():
        line = line.strip()
        if line: recs.append(json.loads(line))
    return recs

def _norm_interval_view(ex: Dict[str, Any]) -> tuple[int, int] | None:
    span = ex.get("char_span") or {}
    s, e = span.get("start"), span.get("end")
    if isinstance(s, int) and isinstance(e, int): return (s, e)
    ci = ex.get("char_interval") or {}
    s, e = ci.get("start_pos"), ci.get("end_pos")
    if isinstance(s, int) and isinstance(e, int): return (s, e)
    return None

def _tip_html(ex: Dict[str, Any]) -> str:
    cls = (ex.get("extraction_class") or "").upper()
    etxt = ex.get("extraction_text") or ""
    attrs = ex.get("attributes") or {}
    attrs_kv = " · ".join(f"<b>{html.escape(str(k))}</b>=<i>{html.escape(str(v))}</i>" for k,v in attrs.items())
    body = f"<div><b>{html.escape(cls)}</b>: {html.escape(etxt)}</div>"
    if attrs_kv: body += f"<div style='margin-top:4px'>{attrs_kv}</div>"
    return body

def _render_one(text: str, extractions: List[Dict[str, Any]], allowed: List[str]) -> str:
    spans = []
    for ex in extractions:
        cls = (ex.get("extraction_class") or "").upper()
        if cls not in allowed: continue
        iv = _norm_interval_view(ex)
        if not iv: continue
        s, e = iv
        if not (isinstance(s, int) and isinstance(e, int) and 0 <= s < e <= len(text)): continue
        etxt = ex.get("extraction_text") or ""
        if text[s:e] != etxt:
            pos = text.find(etxt, max(0, s-20), min(len(text), e+20)) if etxt else -1
            if pos != -1: s, e = pos, pos + len(etxt)
        spans.append((s, e, cls, ex))
    spans.sort(key=lambda t: (t[0], t[1]))
    cleaned = []
    last_end = -1
    for s, e, cls, ex in spans:
        if s >= last_end:
            cleaned.append((s, e, cls, ex))
            last_end = e

    out = []
    cur = 0
    for s, e, cls, ex in cleaned:
        if cur < s: out.append(html.escape(text[cur:s]))
        seg = html.escape(text[s:e])
        tip = _tip_html(ex)
        out.append(
            f"<mark class='lx-mark cls-{cls}' data-cls='{cls}'>"
            f"{seg}"
            f"<span class='lx-tip'>{tip}</span>"
            f"</mark>"
        )
        cur = e
    if cur < len(text): out.append(html.escape(text[cur:]))
    return "".join(out)

def make_interactive_html(jsonl_path: pathlib.Path, out_html: pathlib.Path,
                          allowed_classes: Optional[List[str]] = None) -> pathlib.Path:
    allowed = allowed_classes or ALL_CLASSES
    recs = _load_jsonl_records(jsonl_path)
    used_counts: Dict[str, int] = {c: 0 for c in allowed}
    for r in recs:
        for ex in r.get("extractions", []):
            cls = (ex.get("extraction_class") or "").upper()
            if cls in used_counts: used_counts[cls] += 1
    if not recs:
        out_html.write_text("""<!DOCTYPE html>\n<html><head><meta charset='utf-8'><title>LangExtract Viewer</title></head>\n<body><p>No records found.</p></body></html>""", encoding="utf-8")
        return out_html
    css_colors = "\n".join([f"mark.lx-mark.cls-{k}{{background:{v};}}" for k,v in PALETTE.items() if k in allowed])
    html_parts = [f"""
<!DOCTYPE html>
<html>
<head>
<meta charset='utf-8'/>
<title>LangExtract Viewer</title>
<style>
  body {{ font-family: system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial, sans-serif; margin: 16px; }}
  .controls {{ background: #fafafa; border: 1px solid #90caf9; border-radius: 8px; padding: 10px 12px; margin-bottom: 14px; }}
  .controls h4 {{ margin: 0 0 8px 0; font-weight: 600; color: #1565c0; font-size: 14px; }}
  .ctrl-row {{ display:flex; flex-wrap:wrap; gap: 10px; align-items:center; }}
  .ctrl-row label {{ display:inline-flex; gap:6px; align-items:center; padding:4px 6px; border-radius: 4px; border:1px solid #e0e0e0; background:#fff; font-size: 12px; cursor: pointer; }}
  .ctrl-row .count {{ opacity: .7; font-variant-numeric: tabular-nums; }}
  button.ctrl {{ padding:4px 8px; border:1px solid #90caf9; background:#e3f2fd; border-radius:6px; cursor:pointer; }}
  button.ctrl:hover {{ background:#d6ecff; }}
  .doc-block {{ border: 1px solid #cfe2ff; background: #f8fbff; border-radius: 8px; padding: 12px 14px; margin-bottom: 12px; }}
  .doc-header {{ font-size: 12px; color: #1f6feb; margin-bottom: 6px; }}
  mark.lx-mark {{ position: relative; padding: 0 2px; border-radius: 2px; }}
  {css_colors}
  mark.lx-off {{ background: transparent !important; outline: none !important; box-shadow: none !important; }}
  mark.lx-off .lx-tip {{ display:none !important; }}
  .lx-tip {{ position: absolute; display: none; z-index: 1000; bottom: 125%; left: 50%; transform: translateX(-50%); background: #333; color:#fff; border-radius: 4px; padding: 6px 8px; font-size: 12px; max-width: 260px; white-space: normal; box-shadow: 0 2px 6px rgba(0,0,0,0.3); }}
  mark.lx-mark:hover .lx-tip {{ display: block; }}
</style>
</head>
<body>
  <div class='controls'>
    <h4>Highlight classes</h4>
    <div class='ctrl-row' id='class-controls'>
      <button class='ctrl' id='all-on'>All on</button>
      <button class='ctrl' id='all-off'>All off</button>
    """]
    for cls in allowed:
        count = used_counts.get(cls, 0)
        swatch = PALETTE.get(cls, "#fff59d")
        html_parts.append(
            f"""<label style='border-color:#ddd;'>
                  <input type='checkbox' class='cls-toggle' value='{cls}' checked />
                  <span style='display:inline-block;width:10px;height:10px;background:{swatch};border:1px solid #bbb;border-radius:2px;'></span>
                  <span>{cls}</span>
                  <span class='count'>({count})</span>
                </label>"""
        )
    html_parts.append("""
    </div>
  </div>
  <div class='legend'>Tip: hover on a highlight to see details. Use the toggles to de‑emphasize a class without hiding text.</div>
""")
    for r in recs:
        doc_id = r.get("document_id") or r.get("id") or "unknown"
        text = r.get("text") or ""
        exs = r.get("extractions") or []
        doc_html = _render_one(text, exs, allowed)
        html_parts.append(f"""
  <div class='doc-block'>
    <div class='doc-header'>{html.escape(str(doc_id))}</div>
    <div class='doc-text'>{doc_html}</div>
  </div>
""")
    html_parts.append("""
<script>
(function() {
  function setClassEnabled(cls, enabled) {
    document.querySelectorAll('mark.lx-mark[data-cls="'+cls+'"]').forEach(function(el){
      if(enabled) el.classList.remove('lx-off');
      else el.classList.add('lx-off');
    });
  }
  document.querySelectorAll('.cls-toggle').forEach(function(cb){
    cb.addEventListener('change', function(){ setClassEnabled(cb.value, cb.checked); });
  });
  document.getElementById('all-on').addEventListener('click', function(){
    document.querySelectorAll('.cls-toggle').forEach(function(cb){ cb.checked = true; setClassEnabled(cb.value, true); });
  });
  document.getElementById('all-off').addEventListener('click', function(){
    document.querySelectorAll('.cls-toggle').forEach(function(cb){ cb.checked = false; setClassEnabled(cb.value, false); });
  });
})();
</script>
</body>
</html>
""")
    out_html.write_text("".join(html_parts), encoding="utf-8")
    return out_html

def recount_extractions_from_file(jsonl_path: pathlib.Path) -> int:
    recs = _load_jsonl_records(jsonl_path)
    return sum(len(r.get("extractions", []) or []) for r in recs)

def rebuild_and_preview(res_dict: Dict[str, Any], title: str = ""):
    jsonl = pathlib.Path(res_dict["jsonl_path"])
    vis   = pathlib.Path(res_dict["vis_path"]).with_name("visualization_interactive.html")
    make_interactive_html(jsonl, vis, allowed_classes=ALL_CLASSES)
    print(f"{title} interactive viewer → {vis}")
    try:
        display(IFrame(src=str(vis), width="100%", height=420))
    except Exception:
        pass
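_render_one resolves overlapping highlights by sorting spans on (start, end) and greedily keeping each span whose start is at or past the previous kept end. The core selection step on toy data:

```python
# Overlapping (start, end) spans; only non-overlapping ones survive.
spans = [(0, 5), (3, 8), (10, 14), (12, 20)]
spans.sort()

kept, last_end = [], -1
for s, e in spans:
    if s >= last_end:       # no overlap with the last kept span
        kept.append((s, e))
        last_end = e

print(kept)  # [(0, 5), (10, 14)]
```

This is first-come-first-kept after sorting, so of two overlapping extractions the one starting earlier wins; the later one is silently dropped from the rendering (not from the JSONL).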
Python
## Fast runner (parallel + rate‑limited + dedup + Parquet‑only)
# ---- FAST CONFIG -----------------------------------------------------------
LEX_MAX_WORKERS      = int(os.getenv("LEX_MAX_WORKERS", "12"))
LEX_RPS              = float(os.getenv("LEX_RPS", "8"))
LEX_SKIP_STATIC_VIZ  = int(os.getenv("LEX_SKIP_STATIC_VIZ", "1"))
LEX_WRITE_PYD_FILES  = int(os.getenv("LEX_WRITE_PYD_FILES", "1"))
LEX_LANG_ORDER       = os.getenv("LEX_LANG_ORDER", "meta,langdetect,llm").split(",")
LEX_HASH_ALGO        = os.getenv("LEX_HASH_ALGO", "sha1")

_EXTRACT_CACHE_LOCK = threading.Lock()
_EXTRACT_CACHE: Dict[str, Any] = {}

def _hash_text(s: str) -> str:
    data = s.encode("utf-8", errors="ignore")
    return hashlib.sha256(data).hexdigest() if LEX_HASH_ALGO == "sha256" else hashlib.sha1(data).hexdigest()

class _RateLimiter:
    def __init__(self, rps: float):
        self.rps = max(0.1, float(rps))
        self.win = deque()
        self.lock = threading.Lock()
    def wait(self):
        with self.lock:
            now = time.monotonic()
            while self.win and now - self.win[0] > 1.0:
                self.win.popleft()
            if len(self.win) >= self.rps:
                to_sleep = 1.0 - (now - self.win[0])
                if to_sleep > 0: time.sleep(to_sleep)
            self.win.append(time.monotonic())
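A standalone copy of the limiter shows the sliding-window behaviour (the class body is duplicated here so the snippet runs on its own): the first `rps` calls pass immediately, then `wait()` blocks until the oldest timestamp ages out of the one-second window.

```python
# Self-contained copy of the sliding-window limiter above: at rps=5,
# ten wait() calls should take roughly one second in total.
import time, threading
from collections import deque

class SlidingWindowLimiter:
    def __init__(self, rps: float):
        self.rps = max(0.1, float(rps))
        self.win = deque()          # timestamps of recent calls
        self.lock = threading.Lock()
    def wait(self):
        with self.lock:
            now = time.monotonic()
            while self.win and now - self.win[0] > 1.0:
                self.win.popleft()  # drop calls older than 1s
            if len(self.win) >= self.rps:
                to_sleep = 1.0 - (now - self.win[0])
                if to_sleep > 0:
                    time.sleep(to_sleep)
            self.win.append(time.monotonic())

limiter = SlidingWindowLimiter(rps=5)
t0 = time.monotonic()
for _ in range(10):
    limiter.wait()
elapsed = time.monotonic() - t0
print(f"10 calls at 5 rps took {elapsed:.2f}s")
```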
Python
# --- Language selection pipeline -------------------------------------------
def _get_lang_fast(text: str, meta: dict, provider_choice: str) -> tuple[str|None, str|None, str|None]:
    ms = (meta.get("lang") or meta.get("language") or meta.get("lang_source"))
    ms = (str(ms).lower() if ms else None)
    md = None
    try:
        from langdetect import detect
        md = detect((text or "")[:4000]) if text and not ms else None
        if md: md = md.lower()[:2]
    except Exception:
        md = None
    ml = None
    need_llm = False
    order = [x.strip().lower() for x in LEX_LANG_ORDER]
    if "llm" in order:
        idx_llm = order.index("llm")
        if idx_llm == 0: need_llm = True
        elif idx_llm == 1 and not ms: need_llm = True
        elif idx_llm == 2 and not ms and not md: need_llm = True
    if need_llm:
        code = _detect_language_llm(text, provider_choice)
        ml = (code.lower() if code else None)
    candid = {"meta": ms, "langdetect": md, "llm": ml}
    final = None
    for key in order:
        val = candid.get(key)
        if val: final = val; break
    def _clean(code):
        if not code: return None
        c = code.strip().lower()
        return c[:2] if len(c) >= 2 else c
    return _clean(ms), _clean(ml), _clean(final)
Python
# === NEW: LLM Summaries (config + cache + helpers) =========================

# Config
LEX_SUMMARY_ENABLED     = int(os.getenv("LEX_SUMMARY", "1"))          # 1=on, 0=off
LEX_SUMMARY_SENTENCES   = int(os.getenv("LEX_SUMMARY_SENTENCES", "3"))
LEX_SUMMARY_MAX_CHARS   = int(os.getenv("LEX_SUMMARY_MAX_CHARS", "8000"))
LEX_SUMMARY_RPS         = float(os.getenv("LEX_SUMMARY_RPS", str(LEX_RPS)))
LEX_SUMMARY_PROVIDER    = os.getenv("LEX_SUMMARY_PROVIDER", "").strip().lower()  # "", "gemini", or "openai"
MODEL_ID_SUM_GEMINI     = os.getenv("LANGEXTRACT_SUMMARY_MODEL_GEMINI", MODEL_ID_GEMINI)
MODEL_ID_SUM_OPENAI     = os.getenv("LANGEXTRACT_SUMMARY_MODEL_OPENAI", MODEL_ID_OPENAI)

_SUMMARY_CACHE_LOCK = threading.Lock()
_SUMMARY_CACHE: Dict[str, str] = {}
_SUMMARY_LIMITER = _RateLimiter(LEX_SUMMARY_RPS)

def _summarize_text_llm(
    text: str,
    *,
    lang_hint: Optional[str],
    provider_choice: str,
    model_id_gemini: str = MODEL_ID_SUM_GEMINI,
    model_id_openai: str = MODEL_ID_SUM_OPENAI,
    max_chars: int = LEX_SUMMARY_MAX_CHARS,
    n_sentences: int = LEX_SUMMARY_SENTENCES,
) -> Optional[str]:
    """
    Return a concise multi-sentence summary in the same language as the text.
    """
    t = (text or "").strip()
    if not t:
        return None
    if max_chars and len(t) > max_chars:
        t = t[:max_chars]

    lang_line = f"Write in language: {lang_hint}." if lang_hint else "Write in the same language as the text."
    prompt = (
        f"{lang_line}\n"
        f"Summarize the text in {max(2, min(4, n_sentences))} sentences (max ~90 words). "
        f"Preserve key figures (MW, kWh/MWh, $, tCO2), names, and locations. "
        f"Do not invent facts.\n\n"
        f"TEXT:\n{t}"
    )

    p = (provider_choice or "").strip().lower()
    try:
        _SUMMARY_LIMITER.wait()
        if p == "gemini":
            from google import genai
            key = os.getenv("GEMINI_KEY") or os.getenv("GOOGLE_API_KEY") or os.getenv("LANGEXTRACT_API_KEY")
            if not key:
                return None
            client = genai.Client(api_key=key)
            resp = client.models.generate_content(model=model_id_gemini, contents=[prompt])
            return (getattr(resp, "text", None) or "").strip() or None

        elif p == "openai":
            try:
                from openai import OpenAI
                client = OpenAI(api_key=os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_KEY"))
                r = client.chat.completions.create(
                    model=model_id_openai,
                    messages=[
                        {"role":"system","content":"You are a precise summarizer. Output only the summary."},
                        {"role":"user","content": prompt},
                    ],
                    temperature=0.2,
                )
                return (r.choices[0].message.content or "").strip() or None
            except Exception:
                import openai
                openai.api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_KEY")
                r = openai.ChatCompletion.create(
                    model=model_id_openai,
                    messages=[
                        {"role":"system","content":"You are a precise summarizer. Output only the summary."},
                        {"role":"user","content": prompt},
                    ],
                    temperature=0.2,
                )
                return (r["choices"][0]["message"]["content"] or "").strip() or None
        else:
            return None
    except Exception:
        return None

def _get_summary_for_text(text: str, lang_final: Optional[str], provider: str) -> tuple[str, str]:
    """
    Returns (summary, source). source in {"llm","first_n_words"}.
    """
    if not LEX_SUMMARY_ENABLED:
        return _fallback_summary(text), "first_n_words"

    th = _hash_text(text)
    with _SUMMARY_CACHE_LOCK:
        cached = _SUMMARY_CACHE.get(th)
    if cached is not None:
        return cached, "llm"

    # Choose summary provider (default to extraction provider)
    p = (LEX_SUMMARY_PROVIDER or provider or ("gemini" if GEMINI_KEY else "openai")).lower().strip()
    s = _summarize_text_llm(text, lang_hint=lang_final, provider_choice=p)
    if s and s.strip():
        with _SUMMARY_CACHE_LOCK:
            _SUMMARY_CACHE[th] = s
        return s, "llm"
    return _fallback_summary(text), "first_n_words"

def _precompute_summaries_for_uniques(unique_by_hash: Dict[str, tuple[str, dict]], provider: str):
    """Optional speed-up: summarize unique texts in parallel before streaming."""
    if not LEX_SUMMARY_ENABLED:
        return

    to_summarize = []
    with _SUMMARY_CACHE_LOCK:
        for th, (text, meta) in unique_by_hash.items():
            if th not in _SUMMARY_CACHE:
                to_summarize.append((th, text, meta))

    if not to_summarize:
        return

    def _work(th: str, text: str, meta: dict):
        lang_source, lang_llm, lang_final = _get_lang_fast(text, meta, provider)
        s = _summarize_text_llm(text, lang_hint=lang_final, provider_choice=(LEX_SUMMARY_PROVIDER or provider))
        if s and s.strip():
            with _SUMMARY_CACHE_LOCK:
                _SUMMARY_CACHE[th] = s

    with use_langextract_provider((LEX_SUMMARY_PROVIDER or provider), gemini_key=GEMINI_KEY, openai_key=OPENAI_KEY):
        with ThreadPoolExecutor(max_workers=LEX_MAX_WORKERS) as pool:
            futs = [pool.submit(_work, th, text, meta) for th, text, meta in to_summarize]
            for _ in as_completed(futs):
                pass
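The precompute pattern above is a thread pool fanning out over unique texts, with a lock guarding the shared cache. A minimal sketch with a stand-in for the LLM call (`fake_summarize` is hypothetical):

```python
# Summarize unique texts in parallel, writing results into a shared
# dict under a lock — the same shape as _precompute_summaries_for_uniques.
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

cache, lock = {}, threading.Lock()

def fake_summarize(text: str) -> str:  # stand-in for the real LLM call
    return text.upper()[:20]

def work(th: str, text: str):
    s = fake_summarize(text)
    with lock:
        cache[th] = s

texts = {"h1": "grid storage", "h2": "heat pumps"}
with ThreadPoolExecutor(max_workers=4) as pool:
    futs = [pool.submit(work, th, t) for th, t in texts.items()]
    for _ in as_completed(futs):
        pass
print(sorted(cache))  # ['h1', 'h2']
```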
Python
# --- JSON-safe extraction flattener ----------------------------------------
def _jsonable_attrs(attrs: Any) -> Any:
    if attrs is None:
        return None
    if isinstance(attrs, (str, int, float, bool)):
        return attrs
    if isinstance(attrs, dict):
        return {k: _jsonable_attrs(v) for k, v in attrs.items()}
    if isinstance(attrs, (list, tuple, set)):
        return [_jsonable_attrs(x) for x in attrs]
    if hasattr(attrs, "model_dump"):
        try: return attrs.model_dump()
        except Exception: pass
    try: return dict(attrs)
    except Exception: pass
    try: return [x for x in attrs]
    except Exception: pass
    return str(attrs)

def _span_to_dict(obj: Any) -> Optional[Dict[str, int]]:
    if obj is None: return None
    if isinstance(obj, dict):
        s = obj.get("start") if obj.get("start") is not None else obj.get("start_pos")
        e = obj.get("end") if obj.get("end") is not None else obj.get("end_pos")
    else:
        s = getattr(obj, "start", None)
        if s is None: s = getattr(obj, "start_pos", None)
        e = getattr(obj, "end", None)
        if e is None: e = getattr(obj, "end_pos", None)
    # Compare against None rather than using `or`, so a span starting at
    # offset 0 is not silently dropped.
    if isinstance(s, int) and isinstance(e, int):
        return {"start": s, "end": e}
    return None

def _interval_to_dict(obj: Any) -> Optional[Dict[str, int]]:
    if obj is None: return None
    if isinstance(obj, dict):
        s = obj.get("start_pos") if obj.get("start_pos") is not None else obj.get("start")
        e = obj.get("end_pos") if obj.get("end_pos") is not None else obj.get("end")
    else:
        s = getattr(obj, "start_pos", None)
        if s is None: s = getattr(obj, "start", None)
        e = getattr(obj, "end_pos", None)
        if e is None: e = getattr(obj, "end", None)
    if isinstance(s, int) and isinstance(e, int):
        return {"start_pos": s, "end_pos": e}
    return None

def _jsonable_extraction(ex: Any) -> Dict[str, Any]:
    cs = _span_to_dict(getattr(ex, "char_span", None))
    if cs is None:
        cs = _span_to_dict(getattr(ex, "char_interval", None))
    ci = _interval_to_dict(getattr(ex, "char_interval", None))
    if ci is None:
        ci = _interval_to_dict(getattr(ex, "char_span", None))

    return {
        "extraction_class": getattr(ex, "extraction_class", None),
        "extraction_text": getattr(ex, "extraction_text", None),
        "attributes": _jsonable_attrs(getattr(ex, "attributes", None)),
        "char_span": cs,
        "char_interval": ci,
    }
Python
# --- Main runner (now computes LLM summaries) -------------------------------
def run_extraction_on_parquet(
    parquet_path: pathlib.Path,
    outdir: pathlib.Path,
    *,
    only_types: List[str] | None = None,
    per_type_limits: Dict[str, int] | None = None,
    chunk_type_label: Optional[str] = None,
    model_id: Optional[str] = None,
    temperature: float = 0.0,
    provider_choice: Optional[str] = None,
    override_lang_always: bool = True,
) -> Dict[str, Any]:
    outdir.mkdir(parents=True, exist_ok=True)
    pyd_dir = outdir / "pydantic_json"
    if int(os.getenv("LEX_WRITE_PYD_FILES", str(LEX_WRITE_PYD_FILES))):
        pyd_dir.mkdir(parents=True, exist_ok=True)

    df = pd.read_parquet(parquet_path, columns=["id","text","metadata"])

    # Filter by doc types if requested
    def _row_doc_type(row: pd.Series) -> str:
        meta = _safe_meta(row.get("metadata"))
        cid = str(row.get("id"))
        dt = _derive_doc_type(cid, meta) or "unknown"
        return dt
    def filter_df_by_doc_types(df: pd.DataFrame, allowed_types: List[str]) -> pd.DataFrame:
        keep = []
        for i, row in df.iterrows():
            dt = _row_doc_type(row)
            if dt in allowed_types: keep.append(i)
        return df.loc[keep].copy()

    if only_types:
        df = filter_df_by_doc_types(df, [t.lower() for t in only_types])

    # Per‑type limits (optional)
    counts = {}
    def select_rows_per_type(df: pd.DataFrame, per_type_limits: Dict[str, int]) -> pd.DataFrame:
        counters = {k: 0 for k in per_type_limits.keys()}
        keep_idx = []
        for i, row in df.iterrows():
            dt = _row_doc_type(row)
            if dt in per_type_limits and counters[dt] < per_type_limits[dt]:
                keep_idx.append(i)
                counters[dt] += 1
            if all(counters[k] >= per_type_limits[k] for k in per_type_limits):
                break
        out = df.loc[keep_idx].copy()
        out.attrs["counts_by_type"] = counters
        return out

    if per_type_limits:
        df = select_rows_per_type(df, per_type_limits)
        counts = df.attrs.get("counts_by_type", {})

    # Drop blanks
    df["text"] = df["text"].astype(str)
    df = df[df["text"].str.strip().ne("")]
    if df.empty:
        enriched_df = pd.DataFrame(columns=["id","text","metadata"])
        out_parquet = outdir / "chunks_enriched.parquet"
        try: enriched_df.to_parquet(out_parquet, index=False)
        except Exception as e: print("Parquet save failed:", e)
        return {
            "rows_processed": 0,
            "total_extractions": 0,
            "jsonl_path": str(outdir / "grounded_extractions.jsonl"),
            "vis_path": str(outdir / "visualization.html"),
            "pyd_dir": str(pyd_dir),
            "provider_used": provider_choice,
            "model_used": (model_id or (MODEL_ID_GEMINI if provider_choice=='gemini' else MODEL_ID_OPENAI)),
            "counts_by_type": counts,
            "enriched_parquet": str(out_parquet),
        }

    provider = provider_choice or ("gemini" if (GEMINI_KEY) else "openai")

    # Records & unique texts (for dedup)
    records = []
    for row in df.itertuples(index=False):
        cid = str(row.id)
        text = str(row.text or "")
        meta = row.metadata if isinstance(row.metadata, dict) else (json.loads(row.metadata) if isinstance(row.metadata, str) else {})
        thash = _hash_text(text)
        records.append((cid, text, meta, thash))
    unique_by_hash: Dict[str, tuple[str, dict]] = {}
    for cid, text, meta, thash in records:
        if thash not in unique_by_hash:
            unique_by_hash[thash] = (text, meta)

    # Precompute summaries (parallel & cached) before extraction stream
    _precompute_summaries_for_uniques(unique_by_hash, provider)

    limiter = _RateLimiter(LEX_RPS)

    def _extract_one(text: str, meta: dict) -> Any:
        th = _hash_text(text)
        with _EXTRACT_CACHE_LOCK:
            cached = _EXTRACT_CACHE.get(th)
        if cached is not None:
            return cached
        # Language fields are resolved per record in the streaming pass below;
        # computing them here as well would double the langdetect/LLM work.
        limiter.wait()
        res = lx.extract(
            text_or_documents=text,
            prompt_description=PROMPT,
            examples=EXAMPLES,
            model_id=(model_id or (MODEL_ID_GEMINI if provider=='gemini' else MODEL_ID_OPENAI)),
            language_model_params={"temperature": float(temperature)},
        )
        with _EXTRACT_CACHE_LOCK:
            _EXTRACT_CACHE[th] = res
        return res

    # Parallel extract uniques
    to_run = []
    with _EXTRACT_CACHE_LOCK:
        for thash, (text, meta) in unique_by_hash.items():
            if thash not in _EXTRACT_CACHE:
                to_run.append((thash, text, meta))

    if to_run:
        with use_langextract_provider(provider, gemini_key=GEMINI_KEY, openai_key=OPENAI_KEY):
            with ThreadPoolExecutor(max_workers=LEX_MAX_WORKERS) as ex:
                futs = {ex.submit(_extract_one, text, meta): (thash, text) for thash, text, meta in to_run}
                for fut in as_completed(futs):
                    _ = fut.result()

    # Stream JSONL and build enriched rows in original order
    jsonl_path = outdir / "grounded_extractions.jsonl"
    enriched_rows: List[Dict[str, Any]] = []

    with use_langextract_provider(provider, gemini_key=GEMINI_KEY, openai_key=OPENAI_KEY):
        with open(jsonl_path, "w", encoding="utf-8") as jf:
            for cid, text, meta, thash in records:
                with _EXTRACT_CACHE_LOCK:
                    result = _EXTRACT_CACHE.get(thash)
                if result is None:
                    result = _extract_one(text, meta)

                lang_source = lang_llm = lang_final = None
                if override_lang_always:
                    lang_source, lang_llm, lang_final = _get_lang_fast(text, meta, provider)

                # --- NEW: LLM summary (dedup + fallback)
                summary_text, summary_src = _get_summary_for_text(text, lang_final, provider)
                summary_provider_eff = (LEX_SUMMARY_PROVIDER or provider)
                summary_model_eff = (MODEL_ID_SUM_GEMINI if summary_provider_eff == "gemini" else MODEL_ID_SUM_OPENAI)

                payload = map_extractions_to_schema(
                    chunk_id=cid,
                    text=text,
                    doc_meta=meta,
                    extractions=result.extractions,
                    chunk_type_label=chunk_type_label,
                    lang_llm=lang_llm,
                    chunk_summary_override=summary_text,  # inject LLM summary
                )

                m = dict(meta)
                m["doc_id"] = m.get("doc_id") or payload.doc_id
                if payload.doc_type:   m["doc_type"]   = payload.doc_type
                if payload.chunk_type: m["chunk_type"] = payload.chunk_type
                if payload.chunk_index is not None: m["chunk_index"] = payload.chunk_index
                if payload.token_count is not None: m["token_count"] = payload.token_count
                if payload.title:      m["title"]  = payload.title
                if payload.date:       m["date"]   = payload.date
                if payload.source:     m["source"] = payload.source
                if payload.url is not None: m["url"] = payload.url

                if lang_source is not None: m["lang_source"] = lang_source
                if payload.lang_llm is not None: m["lang_llm"] = payload.lang_llm
                if lang_final is not None: m["lang"] = lang_final

                if payload.cpc_codes:          m["cpc_codes"] = payload.cpc_codes
                if payload.country_code:       m["country_code"] = payload.country_code
                if payload.content_year is not None:  m["content_year"] = payload.content_year
                if payload.content_month is not None: m["content_month"] = payload.content_month
                m["extraction_count"]   = payload.extraction_count
                m["span_integrity_pct"] = payload.span_integrity_pct

                # --- NEW: write summary + provenance
                m["chunk_summary"] = payload.chunk_summary
                m["chunk_summary_source"] = summary_src                # "llm" or "first_n_words"
                m["chunk_summary_provider"] = summary_provider_eff      # "gemini" | "openai"
                m["chunk_summary_model"] = summary_model_eff

                # Entities & others
                m["entities"]         = payload.entities.model_dump()
                m["topic_tags"]       = payload.topic_tags
                m["event_dates"]      = [e.model_dump() for e in payload.event_dates]
                m["role_annotations"] = [r.model_dump() for r in payload.role_annotations]
                m["numeric_facts"]    = [nf.model_dump() for nf in payload.numeric_facts]

                enriched_rows.append({"id": cid, "text": text, "metadata": json.dumps(m, ensure_ascii=False)})

                # JSON-safe extractions
                ex_json = [_jsonable_extraction(ex) for ex in result.extractions]
                jf.write(json.dumps({"document_id": cid, "text": text, "extractions": ex_json}, ensure_ascii=False) + "\n")

    vis_path = outdir / "visualization.html"
    if not LEX_SKIP_STATIC_VIZ:
        try:
            html_lib = lx.visualize(str(jsonl_path))
            with open(vis_path, "w", encoding="utf-8") as f:
                f.write(html_lib.data if hasattr(html_lib, "data") else html_lib)
        except Exception as e:
            print("Static visualization failed (skipped):", e)

    # Our interactive viewer
    make_interactive_html(jsonl_path, outdir / "visualization_interactive.html")

    enriched_df = pd.DataFrame(enriched_rows, columns=["id","text","metadata"])
    out_parquet = outdir / "chunks_enriched.parquet"
    try:
        enriched_df.to_parquet(out_parquet, index=False)
    except Exception as e:
        print("Parquet save failed:", e)

    total_extractions = recount_extractions_from_file(jsonl_path)
    return {
        "rows_processed": int(len(enriched_rows)),
        "total_extractions": int(total_extractions),
        "jsonl_path": str(jsonl_path),
        "vis_path": str(vis_path),
        "pyd_dir": str(pyd_dir),
        "provider_used": provider,
        "model_used": (model_id or (MODEL_ID_GEMINI if provider=='gemini' else MODEL_ID_OPENAI)),
        "counts_by_type": counts,
        "enriched_parquet": str(out_parquet),
    }
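The dedup strategy in the runner can be seen in miniature: identical chunk texts share one hash, so the LLM is called once per unique text and the cached result is fanned back out to every record.

```python
# Hash-based text dedup, as used by run_extraction_on_parquet: three
# records collapse to two unique texts before any LLM call.
import hashlib

def hash_text(s: str) -> str:
    return hashlib.sha1(s.encode("utf-8", errors="ignore")).hexdigest()

records = [("c1", "solar farm"), ("c2", "wind turbine"), ("c3", "solar farm")]
unique = {}
for cid, text in records:
    unique.setdefault(hash_text(text), text)

print(len(records), "records →", len(unique), "unique texts")  # 3 records → 2 unique texts
```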
Python
# --- Utility: span integrity check -----------------------------------------
def check_span_integrity(jsonl_path: pathlib.Path) -> float:
    ok = total = 0
    raw = pathlib.Path(jsonl_path).read_text(encoding="utf-8").strip()
    if not raw:
        print("Span integrity: 0/0 (0.0%)")
        return 0.0
    def check_record(rec: Dict[str, Any]):
        nonlocal ok, total
        text = rec.get("text") or ""
        for ex in rec.get("extractions", []):
            start = end = None
            span = ex.get("char_span")
            if isinstance(span, dict): start, end = span.get("start"), span.get("end")
            if start is None or end is None:
                ci = ex.get("char_interval") or {}
                start, end = ci.get("start_pos"), ci.get("end_pos")
            etxt = ex.get("extraction_text") or ""
            if isinstance(start, int) and isinstance(end, int):
                total += 1
                if text[start:end] == etxt: ok += 1
    if raw[0] == "[":  # rare
        try:
            arr = json.loads(raw)
            for r in arr: check_record(r)
        except Exception: pass
    else:
        for line in raw.splitlines():
            line = line.strip()
            if not line: continue
            try: rec = json.loads(line); check_record(rec)
            except Exception: continue
    pct = (ok / total * 100.0) if total else 0.0
    print(f"Span integrity: {ok}/{total} ({pct:.1f}%)")
    return pct
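The integrity criterion is simply that slicing the chunk text at the recorded offsets reproduces the extraction text. A toy record mirroring that check:

```python
# A span is "intact" when text[start:end] equals the extraction text.
rec = {"text": "Solar output hit 120 MW in 2024.",
       "extractions": [
           {"extraction_text": "120 MW", "char_span": {"start": 17, "end": 23}},
           {"extraction_text": "2024",   "char_span": {"start": 27, "end": 31}},
       ]}

total = len(rec["extractions"])
ok = sum(1 for ex in rec["extractions"]
         if rec["text"][ex["char_span"]["start"]:ex["char_span"]["end"]] == ex["extraction_text"])
print(f"Span integrity: {ok}/{total}")  # Span integrity: 2/2
```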
Python
## Run LLM extraction (patent + media only) with **LLM summaries** enabled
today = dt.date.today().isoformat()
OUT_BASE = REPO / "notebooks" / "reports" / "metadata_langextract"

ONLY_TYPES = [s.strip().lower() for s in os.getenv("LEX_ONLY_TYPES", "patent,media").split(",") if s.strip()]
LIMIT_TOPIC  = int(os.getenv("LEX_LIMIT_TOPIC",  "0"))
LIMIT_PATENT = int(os.getenv("LEX_LIMIT_PATENT", "0"))
LIMIT_MEDIA  = int(os.getenv("LEX_LIMIT_MEDIA",  "0"))

PER_TYPE_LIMITS = None
if any([LIMIT_TOPIC, LIMIT_PATENT, LIMIT_MEDIA]):
    PER_TYPE_LIMITS = {}
    if "topic"  in ONLY_TYPES:  PER_TYPE_LIMITS["topic"]  = LIMIT_TOPIC
    if "patent" in ONLY_TYPES:  PER_TYPE_LIMITS["patent"] = LIMIT_PATENT if LIMIT_PATENT else 10**9
    if "media"  in ONLY_TYPES:  PER_TYPE_LIMITS["media"]  = LIMIT_MEDIA  if LIMIT_MEDIA  else 10**9

provider_fixed = "gemini" if GEMINI_KEY else "openai"
provider_sem   = ("openai" if (GEMINI_KEY and OPENAI_KEY) else ("gemini" if GEMINI_KEY else "openai"))

# --- Fixed-size ---
out_fixed = OUT_BASE / "fixed_size" / "2025-09-03"
res_fixed = run_extraction_on_parquet(
    FIXED_PARQUET,
    out_fixed,
    only_types=ONLY_TYPES,
    per_type_limits=PER_TYPE_LIMITS,
    chunk_type_label="fixed",
    temperature=0.0,
    provider_choice=provider_fixed,
    override_lang_always=True,
)
print("fixed_size:", json.dumps(res_fixed, indent=2))
print("Fixed_size spans →")
check_span_integrity(pathlib.Path(res_fixed["jsonl_path"]))
rebuild_and_preview(res_fixed, title="fixed_size")

# --- Semantic ---
out_sem = OUT_BASE / "semantic" / "2025-09-03"
res_sem = run_extraction_on_parquet(
    SEM_PARQUET,
    out_sem,
    only_types=ONLY_TYPES,
    per_type_limits=PER_TYPE_LIMITS,
    chunk_type_label="semantic",
    temperature=0.0,
    provider_choice=provider_sem,
    override_lang_always=True,
)
print("\nsemantic:", json.dumps(res_sem, indent=2))
print("Semantic spans →")
check_span_integrity(pathlib.Path(res_sem["jsonl_path"]))
rebuild_and_preview(res_sem, title="semantic")

print("\nEnriched INPUT-SCHEMA outputs (id, text, metadata):")
print("  Fixed-size  →", res_fixed["enriched_parquet"]) 
print("  Semantic    →", res_sem["enriched_parquet"]) 
Text Only
fixed_size: {
  "rows_processed": 2,
  "total_extractions": 65,
  "jsonl_path": "C:\\Users\\gerbe\\PycharmProjects\\MT\\notebooks\\reports\\metadata_langextract\\fixed_size\\2025-09-03\\grounded_extractions.jsonl",
  "vis_path": "C:\\Users\\gerbe\\PycharmProjects\\MT\\notebooks\\reports\\metadata_langextract\\fixed_size\\2025-09-03\\visualization.html",
  "pyd_dir": "C:\\Users\\gerbe\\PycharmProjects\\MT\\notebooks\\reports\\metadata_langextract\\fixed_size\\2025-09-03\\pydantic_json",
  "provider_used": "gemini",
  "model_used": "gemini-1.5-flash",
  "counts_by_type": {
    "patent": 1,
    "media": 1
  },
  "enriched_parquet": "C:\\Users\\gerbe\\PycharmProjects\\MT\\notebooks\\reports\\metadata_langextract\\fixed_size\\2025-09-03\\chunks_enriched.parquet"
}
Fixed_size spans →
Span integrity: 60/65 (92.3%)
fixed_size interactive viewer → C:\Users\gerbe\PycharmProjects\MT\notebooks\reports\metadata_langextract\fixed_size\2025-09-03\visualization_interactive.html

Text Only
semantic: {
  "rows_processed": 2,
  "total_extractions": 10,
  "jsonl_path": "C:\\Users\\gerbe\\PycharmProjects\\MT\\notebooks\\reports\\metadata_langextract\\semantic\\2025-09-03\\grounded_extractions.jsonl",
  "vis_path": "C:\\Users\\gerbe\\PycharmProjects\\MT\\notebooks\\reports\\metadata_langextract\\semantic\\2025-09-03\\visualization.html",
  "pyd_dir": "C:\\Users\\gerbe\\PycharmProjects\\MT\\notebooks\\reports\\metadata_langextract\\semantic\\2025-09-03\\pydantic_json",
  "provider_used": "openai",
  "model_used": "gpt-4o-mini",
  "counts_by_type": {
    "patent": 1,
    "media": 1
  },
  "enriched_parquet": "C:\\Users\\gerbe\\PycharmProjects\\MT\\notebooks\\reports\\metadata_langextract\\semantic\\2025-09-03\\chunks_enriched.parquet"
}
Semantic spans →
Span integrity: 8/10 (80.0%)
semantic interactive viewer → C:\Users\gerbe\PycharmProjects\MT\notebooks\reports\metadata_langextract\semantic\2025-09-03\visualization_interactive.html

Text Only
Enriched INPUT-SCHEMA outputs (id, text, metadata):
  Fixed-size  → C:\Users\gerbe\PycharmProjects\MT\notebooks\reports\metadata_langextract\fixed_size\2025-09-03\chunks_enriched.parquet
  Semantic    → C:\Users\gerbe\PycharmProjects\MT\notebooks\reports\metadata_langextract\semantic\2025-09-03\chunks_enriched.parquet
Python
## Topics from Silver/OpenAlex (2025‑08‑09) — no LLM
def _find_one(base_dir: pathlib.Path, patterns: List[str]) -> Optional[pathlib.Path]:
    for pat in patterns:
        for ext in ["parquet", "csv", "jsonl", "json"]:
            for p in base_dir.glob(f"{pat}.{ext}"):
                return p
        for p in base_dir.glob(f"**/{pat}.*"):
            return p
    return None

def _read_any_table(path: pathlib.Path) -> pd.DataFrame:
    if path.suffix.lower() == ".parquet":
        return pd.read_parquet(path)
    if path.suffix.lower() == ".csv":
        return pd.read_csv(path)
    if path.suffix.lower() in {".jsonl", ".json"}:
        if path.suffix.lower() == ".jsonl":
            rows = [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines() if line.strip()]
            return pd.DataFrame(rows)
        else:
            data = json.loads(path.read_text(encoding="utf-8"))
            if isinstance(data, list): return pd.DataFrame(data)
            elif isinstance(data, dict): return pd.DataFrame([data])
    raise ValueError(f"Unsupported file type: {path}")

def load_openalex_topics(openalex_dir: pathlib.Path) -> Dict[str, pd.DataFrame]:
    cand_topics   = _find_one(openalex_dir, ["topics_canonical", "topics", "topics_clean"])
    cand_keywords = _find_one(openalex_dir, ["topic_keywords_m2m", "topic_keywords", "topics_keywords"])
    cand_siblings = _find_one(openalex_dir, ["topic_siblings_m2m", "topic_siblings", "topics_siblings"])
    cand_domains  = _find_one(openalex_dir, ["domains_ref", "domains"])
    cand_fields   = _find_one(openalex_dir, ["fields_ref", "fields"])
    cand_subf     = _find_one(openalex_dir, ["subfields_ref", "subfields"])
    tables = {}
    if cand_topics:   tables["topics"]    = _read_any_table(cand_topics)
    if cand_keywords: tables["keywords"]  = _read_any_table(cand_keywords)
    if cand_siblings: tables["siblings"]  = _read_any_table(cand_siblings)
    if cand_domains:  tables["domains"]   = _read_any_table(cand_domains)
    if cand_fields:   tables["fields"]    = _read_any_table(cand_fields)
    if cand_subf:     tables["subfields"] = _read_any_table(cand_subf)
    print("Loaded OpenAlex tables:")
    for k, v in tables.items():
        print("  ", k, v.shape, "cols:", list(v.columns)[:8], "...")
    return tables

tables = load_openalex_topics(OPENALEX_DIR)
for k in list(tables.keys()):
    df = tables[k]
    df.columns = [c.lower() for c in df.columns]
    tables[k] = df

topics    = tables.get("topics",    pd.DataFrame()).copy()
keywords  = tables.get("keywords",  pd.DataFrame()).copy()
siblings  = tables.get("siblings",  pd.DataFrame()).copy()
domains   = tables.get("domains",   pd.DataFrame()).copy()
fields    = tables.get("fields",    pd.DataFrame()).copy()
subfields = tables.get("subfields", pd.DataFrame()).copy()

if "topic_id" not in topics.columns and "id" in topics.columns:
    topics = topics.rename(columns={"id":"topic_id"})
if "display_name" not in topics.columns and "name" in topics.columns:
    topics = topics.rename(columns={"name":"display_name"})
topics["topic_id_norm"] = topics["topic_id"].map(_normalize_topic_id)

# --- keywords aggregation
if not keywords.empty:
    if "topic_id" not in keywords.columns:
        for alt in ["topic", "topicid", "topic_id_norm"]:
            if alt in keywords.columns:
                keywords = keywords.rename(columns={alt:"topic_id"})
                break
    kw_col = None
    for cand in ["keyword_norm","keyword","keyword_orig","kw","term"]:
        if cand in keywords.columns: kw_col = cand; break
    if kw_col is None and len(keywords.columns) >= 2:
        kw_col = keywords.columns[1]
    keywords["topic_id_norm"] = keywords["topic_id"].map(_normalize_topic_id)
    kw = keywords.groupby("topic_id_norm")[kw_col].apply(lambda s: sorted({str(x) for x in s if str(x).strip()})).reset_index(name="keywords")
else:
    kw = pd.DataFrame(columns=["topic_id_norm","keywords"])    

# --- siblings aggregation
if not siblings.empty:
    sib_src = next((c for c in ["topic_id","topic"] if c in siblings.columns), None)
    sib_dst = next((c for c in ["sibling_topic_id","sibling","topic_id_2","topic2","topic_b"] if c in siblings.columns and c != sib_src), None)
    if sib_src is None or sib_dst is None:
        cols = list(siblings.columns)
        if len(cols) >= 2: sib_src, sib_dst = cols[0], cols[1]
    siblings["src_norm"] = siblings[sib_src].map(_normalize_topic_id)
    siblings["dst_norm"] = siblings[sib_dst].map(_normalize_topic_id)
    sib_aggr = siblings.groupby("src_norm")["dst_norm"].apply(lambda s: sorted({x for x in s if x})).reset_index(name="sibling_ids")
    sib_aggr["sibling_degree"] = sib_aggr["sibling_ids"].map(lambda x: len(x))
else:
    sib_aggr = pd.DataFrame(columns=["src_norm","sibling_ids","sibling_degree"]) 

# --- taxonomy refs normalization
def _ref_pick_name_col(df: pd.DataFrame) -> str:
    for c in ["display_name", "name", "label", "title"]:
        if c in df.columns:
            return c
    return df.columns[1] if len(df.columns) > 1 else df.columns[0]

def _ref_pick_id_col(df: pd.DataFrame) -> str:
    for c in ["id", "domain_id", "field_id", "subfield_id", "code"]:
        if c in df.columns:
            return c
    return df.columns[0]

if not domains.empty:
    did_col = _ref_pick_id_col(domains); dname_col = _ref_pick_name_col(domains)
    domains = domains.rename(columns={did_col:"domain_id", dname_col:"domain_name"})
if not fields.empty:
    fid_col = _ref_pick_id_col(fields); fname_col = _ref_pick_name_col(fields)
    fields = fields.rename(columns={fid_col:"field_id", fname_col:"field_name"})
if not subfields.empty:
    sid_col = _ref_pick_id_col(subfields); sname_col = _ref_pick_name_col(subfields)
    subfields = subfields.rename(columns={sid_col:"subfield_id", sname_col:"subfield_name"})

for col in ["domain_id","field_id","subfield_id"]:
    if col not in topics.columns: topics[col] = None

if not subfields.empty: topics = topics.merge(subfields[["subfield_id","subfield_name"]], how="left", on="subfield_id")
if not fields.empty:    topics = topics.merge(fields[["field_id","field_name"]],       how="left", on="field_id")
if not domains.empty:   topics = topics.merge(domains[["domain_id","domain_name"]],    how="left", on="domain_id")

topics = topics.merge(kw, on="topic_id_norm", how="left")
topics = topics.merge(sib_aggr.rename(columns={"src_norm":"topic_id_norm"}), on="topic_id_norm", how="left")

topics["topic_url"] = topics["topic_id_norm"].map(lambda t: f"https://openalex.org/{t}" if t else None)
topics["topic_slug"] = topics["display_name"].map(lambda s: re.sub(r"[^a-z0-9]+","-", str(s).lower()).strip("-"))

topics["keywords"] = topics["keywords"].apply(lambda x: x if isinstance(x, list) else [])
topics["sibling_ids"] = topics["sibling_ids"].apply(lambda x: x if isinstance(x, list) else [])
topics["sibling_degree"] = topics["sibling_degree"].fillna(0).astype(int)

TOPIC_OUT_BASE = REPO / "notebooks" / "reports" / "metadata_langextract" / "topics"
TOPIC_OUT_BASE.mkdir(parents=True, exist_ok=True)
enriched_topics_path_parquet = TOPIC_OUT_BASE / "topics_enriched.parquet"
try:
    topics.to_parquet(enriched_topics_path_parquet, index=False)
except Exception as e:
    print("Parquet save failed:", e)
print("Topics enriched →", enriched_topics_path_parquet)
Text Only
Loaded OpenAlex tables:
   topics (4516, 15) cols: ['topic_id', 'display_name', 'description', 'works_count', 'cited_by_count', 'created_date', 'updated_date', 'lang_display_name'] ...
   keywords (45154, 3) cols: ['topic_id', 'keyword_norm', 'keyword_orig'] ...
   siblings (243006, 2) cols: ['topic_id', 'sibling_topic_id'] ...
   domains (4, 2) cols: ['id', 'name'] ...
   fields (26, 2) cols: ['id', 'name'] ...
   subfields (252, 2) cols: ['id', 'name'] ...
Topics enriched → C:\Users\gerbe\PycharmProjects\MT\notebooks\reports\metadata_langextract\topics\topics_enriched.parquet
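The helper `_normalize_topic_id` (defined earlier in the notebook) does the id canonicalization this whole section leans on. For readers jumping in here, a minimal hypothetical stand-in, assuming OpenAlex-style ids such as `T10001` or full `https://openalex.org/T10001` URLs, might look like:

```python
import re

def normalize_topic_id_sketch(value):
    """Hypothetical stand-in for _normalize_topic_id: pull an OpenAlex
    topic id (e.g. 'T10001') out of a bare id or a full URL."""
    if value is None:
        return None
    m = re.search(r"\bT\d+\b", str(value), flags=re.IGNORECASE)
    return m.group(0).upper() if m else None

print(normalize_topic_id_sketch("https://openalex.org/T10001"))  # T10001
print(normalize_topic_id_sketch("t10001"))                       # T10001
print(normalize_topic_id_sketch("no id here"))                   # None
```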
Python
## Topic graph (nodes & edges) — Parquet‑only
def _node(df: pd.DataFrame, id_col: str, name_col: str, ntype: str, extra_cols: List[str] = None) -> pd.DataFrame:
    extra_cols = extra_cols or []
    if df is None or df.empty:
        return pd.DataFrame(columns=["node_id","display_name","node_type"])
    cols = [id_col, name_col] + [c for c in extra_cols if c in df.columns]
    x = df[cols].copy()
    x = x.rename(columns={id_col:"node_id", name_col:"display_name"})
    x["node_type"] = ntype
    return x

topic_nodes = _node(
    topics, "topic_id_norm", "display_name", "topic",
    extra_cols=["keywords","sibling_degree","domain_id","field_id","subfield_id","domain_name","field_name","subfield_name","topic_url","topic_slug"]
)
domain_nodes   = _node(domains,   "domain_id",   "domain_name",   "domain")
field_nodes    = _node(fields,    "field_id",    "field_name",    "field")
subfield_nodes = _node(subfields, "subfield_id", "subfield_name", "subfield")

# Ensure clean string ids and a unique node key
frames = [topic_nodes, domain_nodes, field_nodes, subfield_nodes]
frames = [(df.assign(node_id=df["node_id"].astype(str).str.strip()) if not df.empty else df) for df in frames]
nodes = pd.concat(frames, ignore_index=True, sort=False)
nodes["node_key"] = nodes["node_type"].astype(str) + ":" + nodes["node_id"].astype(str)
nodes = nodes.drop_duplicates(subset=["node_key"]).reset_index(drop=True)
nodes["display_name"] = nodes["display_name"].astype(str)

edges = []
if not siblings.empty:
    sib_src = next((c for c in ["topic_id","topic"] if c in siblings.columns), None)
    sib_dst = next((c for c in ["sibling_topic_id","sibling","topic_id_2","topic2","topic_b"] if c in siblings.columns and c != sib_src), None)
    if sib_src is None or sib_dst is None:
        cols = list(siblings.columns)
        if len(cols) >= 2: sib_src, sib_dst = cols[0], cols[1]
    tmp = siblings.copy()
    tmp["src_norm"] = tmp[sib_src].map(_normalize_topic_id)
    tmp["dst_norm"] = tmp[sib_dst].map(_normalize_topic_id)
    tmp = tmp[["src_norm","dst_norm"]].dropna()
    tmp["edge_type"] = "sibling"
    tmp["src"] = "topic:" + tmp["src_norm"]
    tmp["dst"] = "topic:" + tmp["dst_norm"]
    edges.append(tmp[["src","dst","edge_type"]])

if "subfield_id" in topics.columns and not topics.empty:
    tmp = topics[["topic_id_norm","subfield_id"]].dropna().copy()
    tmp["src"] = "topic:" + tmp["topic_id_norm"].astype(str)
    tmp["dst"] = "subfield:" + tmp["subfield_id"].astype(str)
    tmp["edge_type"] = "belongs_to"
    edges.append(tmp[["src","dst","edge_type"]])

if not subfields.empty and "field_id" in subfields.columns:
    tmp = subfields[["subfield_id","field_id"]].dropna().copy()
    tmp["src"] = "subfield:" + tmp["subfield_id"].astype(str)
    tmp["dst"] = "field:" + tmp["field_id"].astype(str)
    tmp["edge_type"] = "belongs_to"
    edges.append(tmp[["src","dst","edge_type"]])

if not fields.empty and "domain_id" in fields.columns:
    tmp = fields[["field_id","domain_id"]].dropna().copy()
    tmp["src"] = "field:" + tmp["field_id"].astype(str)
    tmp["dst"] = "domain:" + tmp["domain_id"].astype(str)
    tmp["edge_type"] = "belongs_to"
    edges.append(tmp[["src","dst","edge_type"]])

edges_df = pd.concat(edges, ignore_index=True) if edges else pd.DataFrame(columns=["src","dst","edge_type"])
edges_df = edges_df.drop_duplicates().reset_index(drop=True)

node_keys = set(nodes["node_key"]) if not nodes.empty else set()
missing = sorted((set(edges_df["src"]) | set(edges_df["dst"])) - node_keys)
if missing:
    print(f"[warn] {len(missing)} edge endpoints are missing in nodes. Example:", missing[:5])

GRAPH_OUT = REPO / "notebooks" / "reports" / "metadata_langextract" / "topics"
GRAPH_OUT.mkdir(parents=True, exist_ok=True)
nodes_path = GRAPH_OUT / "topics_graph_nodes.parquet"
edges_path = GRAPH_OUT / "topics_graph_edges.parquet"
try:
    nodes.to_parquet(nodes_path, index=False)
    edges_df.to_parquet(edges_path, index=False)
except Exception as e:
    print("Parquet save failed:", e)
print("Graph nodes →", nodes_path)
print("Graph edges →", edges_path)
Text Only
Graph nodes → C:\Users\gerbe\PycharmProjects\MT\notebooks\reports\metadata_langextract\topics\topics_graph_nodes.parquet
Graph edges → C:\Users\gerbe\PycharmProjects\MT\notebooks\reports\metadata_langextract\topics\topics_graph_edges.parquet
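The nodes/edges Parquet pair plugs directly into standard graph tooling. A minimal sketch of climbing the `belongs_to` hierarchy, using toy frames in the same `node_key`/`src`/`dst` schema (in practice, read the two files written above with `pd.read_parquet`):

```python
import pandas as pd

# Toy frames in the same schema as topics_graph_nodes/edges.parquet
nodes = pd.DataFrame({
    "node_key": ["topic:T10001", "subfield:2105", "field:21", "domain:2"],
    "node_type": ["topic", "subfield", "field", "domain"],
})
edges = pd.DataFrame({
    "src": ["topic:T10001", "subfield:2105", "field:21"],
    "dst": ["subfield:2105", "field:21", "domain:2"],
    "edge_type": ["belongs_to", "belongs_to", "belongs_to"],
})

# belongs_to edges form a tree, so a src -> dst dict is enough to climb it
parent = dict(edges.loc[edges["edge_type"] == "belongs_to", ["src", "dst"]]
              .itertuples(index=False, name=None))

def ancestry(node_key):
    chain = [node_key]
    while chain[-1] in parent:
        chain.append(parent[chain[-1]])
    return chain

print(ancestry("topic:T10001"))
# ['topic:T10001', 'subfield:2105', 'field:21', 'domain:2']
```

The same ancestry chains could also be materialized with three left merges (topic→subfield→field→domain), which is what the enrichment cell above effectively does.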
Python
## Topic↔Chunk join and injection of topic fields into chunk metadata (Parquet only)
JOIN_OUT = REPO / "notebooks" / "reports" / "metadata_langextract" / "topics" / "topic_chunk_join"
JOIN_OUT.mkdir(parents=True, exist_ok=True)

def _topic_chunk_join(chunks_df: pd.DataFrame, name: str):
    rows = []
    for row in chunks_df.itertuples(index=False):
        cid = str(row.id)
        meta = _safe_meta(getattr(row, "metadata", {}))
        dt = _derive_doc_type(cid, meta) or "unknown"
        if dt != "topic":
            continue
        topic_id = _normalize_topic_id(meta.get("doc_id") or cid)
        if topic_id:
            rows.append({"topic_id": topic_id, "chunk_id": cid})
    out = pd.DataFrame(rows, columns=["topic_id","chunk_id"])
    outp = JOIN_OUT / f"topic_chunk_join__{name}.parquet"
    try:
        out.to_parquet(outp, index=False)
    except Exception as e:
        print("Parquet save failed:", e)
    print("Topic↔chunk join →", outp, "rows:", len(out))

def _inject_topic_meta_into_chunks(chunks_df: pd.DataFrame, topics_df: pd.DataFrame) -> pd.DataFrame:
    rows = []
    join_cols = [
        "topic_id_norm","display_name","keywords","domain_name","field_name","subfield_name",
        "domain_id","field_id","subfield_id","sibling_degree","sibling_ids","topic_url","topic_slug"
    ]
    join_cols = [c for c in join_cols if c in topics_df.columns]
    j = topics_df[join_cols].copy()

    for row in chunks_df.itertuples(index=False):
        cid = str(row.id)
        text = getattr(row, "text", None)
        meta = _safe_meta(getattr(row, "metadata", {}))
        dt   = _derive_doc_type(cid, meta) or "unknown"
        if dt != "topic":
            rows.append({"id": cid, "text": text, "metadata": json.dumps(meta, ensure_ascii=False)})
            continue
        topic_id = None
        for cand in [meta.get("doc_id"), cid]:
            tid = _normalize_topic_id(cand)
            if tid: topic_id = tid; break
        if topic_id and len(j):
            m = j[j["topic_id_norm"] == topic_id]
            if len(m):
                rec = m.iloc[0].to_dict()
                meta["topic_id"] = topic_id
                meta["topic_display_name"] = rec.get("display_name")
                if "keywords" in rec:        meta["topic_keywords"] = rec["keywords"]
                if "domain_name" in rec:     meta["topic_domain_name"] = rec["domain_name"]
                if "field_name" in rec:      meta["topic_field_name"] = rec["field_name"]
                if "subfield_name" in rec:   meta["topic_subfield_name"] = rec["subfield_name"]
                if "domain_id" in rec:       meta["topic_domain_id"] = rec["domain_id"]
                if "field_id" in rec:        meta["topic_field_id"] = rec["field_id"]
                if "subfield_id" in rec:     meta["topic_subfield_id"] = rec["subfield_id"]
                if "sibling_degree" in rec:  meta["topic_sibling_degree"] = int(rec["sibling_degree"])
                if "sibling_ids" in rec:     meta["topic_sibling_ids"] = rec["sibling_ids"]
                if "topic_url" in rec:       meta["topic_url"] = rec["topic_url"]
                if "topic_slug" in rec:      meta["topic_slug"] = rec["topic_slug"]
        rows.append({"id": cid, "text": text, "metadata": json.dumps(meta, ensure_ascii=False)})
    return pd.DataFrame(rows, columns=["id","text","metadata"])

chunks_fixed = pd.read_parquet(FIXED_PARQUET, columns=["id","text","metadata"]) 
chunks_sem   = pd.read_parquet(SEM_PARQUET,   columns=["id","text","metadata"]) 

_topic_chunk_join(chunks_fixed, "fixed_size_2025-09-03")
_topic_chunk_join(chunks_sem,   "semantic_2025-09-03")

fixed_with_topicmeta = _inject_topic_meta_into_chunks(chunks_fixed, topics)
sem_with_topicmeta   = _inject_topic_meta_into_chunks(chunks_sem, topics)

out_fixed_injected = JOIN_OUT / "fixed_size_2025-09-03__chunks_with_topicmeta.parquet"
out_sem_injected   = JOIN_OUT / "semantic_2025-09-03__chunks_with_topicmeta.parquet"
try:
    fixed_with_topicmeta.to_parquet(out_fixed_injected, index=False)
    sem_with_topicmeta.to_parquet(out_sem_injected, index=False)
except Exception as e:
    print("Parquet save failed:", e)
print("Injected chunk outputs:")
print("  Fixed-size →", out_fixed_injected)
print("  Semantic   →", out_sem_injected)
Text Only
Topic↔chunk join → C:\Users\gerbe\PycharmProjects\MT\notebooks\reports\metadata_langextract\topics\topic_chunk_join\topic_chunk_join__fixed_size_2025-09-03.parquet rows: 4516
Topic↔chunk join → C:\Users\gerbe\PycharmProjects\MT\notebooks\reports\metadata_langextract\topics\topic_chunk_join\topic_chunk_join__semantic_2025-09-03.parquet rows: 5835
Injected chunk outputs:
  Fixed-size → C:\Users\gerbe\PycharmProjects\MT\notebooks\reports\metadata_langextract\topics\topic_chunk_join\fixed_size_2025-09-03__chunks_with_topicmeta.parquet
  Semantic   → C:\Users\gerbe\PycharmProjects\MT\notebooks\reports\metadata_langextract\topics\topic_chunk_join\semantic_2025-09-03__chunks_with_topicmeta.parquet
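Downstream consumers should note that the `metadata` column in the injected Parquet files holds a JSON string, not a dict. A minimal decode sketch with a made-up row (the id, text, and field values are hypothetical; the `topic_*` keys mirror the injection above):

```python
import json
import pandas as pd

# Toy row in the same shape as *_chunks_with_topicmeta.parquet
df = pd.DataFrame([{
    "id": "topic_T10001_0",  # hypothetical chunk id
    "text": "Perovskite solar cells ...",
    "metadata": json.dumps({
        "doc_type": "topic",
        "topic_id": "T10001",
        "topic_display_name": "Solar Cells",
        "topic_keywords": ["photovoltaics", "perovskite"],
    }, ensure_ascii=False),
}])

# Decode the JSON strings back into dicts before using the topic fields
meta = df["metadata"].map(json.loads)
print(meta.iloc[0]["topic_display_name"])  # Solar Cells
print(meta.iloc[0]["topic_keywords"])      # ['photovoltaics', 'perovskite']
```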
Python
## Summary
print("\n=== Summary ===")
try:
    print("LLM Extraction + Summaries:")
    print("  Fixed-size rows processed:", res_fixed["rows_processed"],
          "| Provider:", res_fixed["provider_used"], "| Model:", res_fixed["model_used"])
    print("  Semantic   rows processed:", res_sem["rows_processed"],
          "| Provider:", res_sem["provider_used"],   "| Model:", res_sem["model_used"])
    print("  Summaries  → enabled:", bool(LEX_SUMMARY_ENABLED),
          "| provider override:", (LEX_SUMMARY_PROVIDER or "(same as extraction)"),
          "| sentences:", LEX_SUMMARY_SENTENCES)
except Exception:
    print("  (LLM section not run yet)")
try:
    print("Topic Enrichment:")
    print("  topics_enriched rows:", len(topics))
    print("  nodes:", len(nodes), "| edges:", len(edges_df))
except Exception:
    print("  (topics/graph not built yet)")
print("\nOutputs under notebooks/reports/metadata_langextract/:")
print("  - fixed_size/2025-09-03/ (JSONL, interactive HTML, chunks_enriched.parquet)")
print("  - semantic/2025-09-03/   (JSONL, interactive HTML, chunks_enriched.parquet)")
print("  - topics/ (topics_enriched.parquet, topics_graph_nodes.parquet, topics_graph_edges.parquet, topic_chunk_join/*.parquet)")
Text Only
=== Summary ===
LLM Extraction + Summaries:
  Fixed-size rows processed: 2 | Provider: gemini | Model: gemini-1.5-flash
  Semantic   rows processed: 2 | Provider: openai | Model: gpt-4o-mini
  Summaries  → enabled: True | provider override: openai | sentences: 3
Topic Enrichment:
  topics_enriched rows: 4516
  nodes: 4798 | edges: 247522

Outputs under notebooks/reports/metadata_langextract/:
  - fixed_size/2025-09-03/ (JSONL, interactive HTML, chunks_enriched.parquet)
  - semantic/2025-09-03/   (JSONL, interactive HTML, chunks_enriched.parquet)
  - topics/ (topics_enriched.parquet, topics_graph_nodes.parquet, topics_graph_edges.parquet, topic_chunk_join/*.parquet)