
Reference

This page is the API reference for the MT project, grouped by pipeline layer. It is rendered with mkdocstrings (Python handler).
See also the operator-focused CLI cheatsheet.


API: /query

The API provides a single query endpoint that wraps retrieval and (optionally) an LLM answer. It lives in service/main.py.

Request

  • Method: POST
  • Path: /query
  • Header: x-api-key: <RAG_API_KEY> (only required if RAG_API_KEY is set on the server)

Body schema (top-level):

  • query (string, required): user query text.
  • params (object, optional): retrieval parameter overrides (see table below).
  • include_answer (bool, default true): run the follow-up answer stage.
  • include_retrieval (bool, default true): include full retrieval payload in the response.
  • llm_cfg (object, optional): override LLM provider settings (url/model/key).

Supported params keys:

  • dataset, date, ingest_tag, level
  • seed_k, fetch_top_n
  • expand_ratio, expand_limit
  • rerank_mode, prefer, alpha, sort_by
  • diversify, mmr_lambda, max_per_doc, min_occurrence, top_k
  • graph_required, coverage, fusion
  • timeout_s

Example request:

JSON
{
  "query": "solid-state battery supply chain risks in Europe",
  "params": {
    "dataset": "fixed_size",
    "date": "2025-09-14",
    "ingest_tag": "comm_fixed_C1_g1_2",
    "level": "C1",
    "seed_k": 120,
    "fetch_top_n": 120,
    "expand_ratio": 2.0,
    "expand_limit": 800,
    "rerank_mode": "dense",
    "prefer": "summary",
    "alpha": 0.7,
    "sort_by": "final",
    "diversify": "mmr",
    "mmr_lambda": 0.5,
    "max_per_doc": 2,
    "min_occurrence": 3,
    "top_k": 20,
    "graph_required": true,
    "coverage": true,
    "fusion": "summary",
    "timeout_s": 120
  },
  "include_answer": true,
  "include_retrieval": true,
  "llm_cfg": {
    "url": "http://<vllm_host>:8000/v1/chat/completions",
    "model": "<model_id>",
    "api_key_env": "VLLM_API_KEY"
  }
}

Response

Top-level keys:

  • answer: string (may be empty if include_answer=false or if LLM fails).
  • sources: list of preview rows used as citations (filtered if the LLM returned citations).
  • retrieval: full retrieval payload (only when include_retrieval=true).
  • error: error string when answer generation fails.

retrieval contains:

  • fusion_json: the fused retrieval output.
  • ctx_pack: normalized context pack passed into the LLM pipeline.
  • scores_map: per-stage score maps used in fusion.
  • preview_rows: enriched rows for UI rendering.
  • latency_ms: end-to-end retrieval latency.

Example response (truncated):

JSON
{
  "answer": "Key risks include ...",
  "sources": [
    { "chunk_id": "chunk_001", "doc_id": "doc_123", "doc_type": "media", "title": "..." }
  ],
  "retrieval": {
    "query": "solid-state battery supply chain risks in Europe",
    "params": { "dataset": "fixed_size", "top_k": 20 },
    "fusion_json": { "items": [ { "id": "chunk_001", "score": 0.92 } ] },
    "ctx_pack": { "items": [ { "chunk_id": "chunk_001", "chunk_summary": "..." } ] },
    "scores_map": { "dense": { "chunk_001": 0.92 } },
    "preview_rows": [ { "chunk_id": "chunk_001", "title": "..." } ],
    "latency_ms": 143.2
  }
}

Notes:

  • The API answer uses the follow-up answer stage (not the full constraints/discovery/ideation/prioritization pipeline).
  • Use include_answer=false to return retrieval only.
  • Defaults are applied from DEFAULT_RETRIEVAL_SETTINGS when fields are omitted.
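
A minimal client sketch for the endpoint above, assuming a deployment at http://localhost:8000 (BASE_URL is hypothetical; adjust to your server):

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # hypothetical deployment URL


def build_query_payload(text, include_answer=False, **params):
    """Assemble a /query request body (retrieval-only by default)."""
    return {
        "query": text,
        "params": params,                   # e.g. dataset="fixed_size", top_k=20
        "include_answer": include_answer,   # False -> retrieval only
        "include_retrieval": True,
    }


def post_query(text, api_key=None, **params):
    """POST to /query; x-api-key is only needed when RAG_API_KEY is set server-side."""
    req = urllib.request.Request(
        f"{BASE_URL}/query",
        data=json.dumps(build_query_payload(text, **params)).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    if api_key:
        req.add_header("x-api-key", api_key)
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```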

Bronze — cleantech_pipeline

Console entrypoint & config

Command-line interface for fetching Bronze layer data.

Central configuration & environment-variable handling.

Data sources

Abstract interface all data sources must implement.

DataSource

Bases: ABC

All concrete sources must implement name and fetch methods.

fetch abstractmethod

Python
fetch(**kwargs)

Retrieve data and return a handle to it.
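
A toy subclass illustrating the contract (LocalFileSource is hypothetical, not part of the project):

```python
from abc import ABC, abstractmethod
from pathlib import Path


class DataSource(ABC):
    """Sketch of the abstract interface described above."""

    name: str  # every concrete source declares a name

    @abstractmethod
    def fetch(self, **kwargs):
        """Retrieve data and return a handle to it (e.g. an output directory)."""


class LocalFileSource(DataSource):
    # Hypothetical toy source: "fetches" by returning an existing local path.
    name = "local_file"

    def __init__(self, path):
        self.path = Path(path)

    def fetch(self, **kwargs):
        return self.path
```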

Kaggle dataset downloader for the Bronze layer.

KaggleDataset

Bases: DataSource

Kaggle downloader, Bronze-conformant (original ZIP + JSONL.gz mirror + manifest).

fetch

Python
fetch(mirror=True, keep_extracted=False)

Stores the ZIP unchanged, optionally mirrors it to JSONL.gz, and writes a manifest. Returns: the Bronze directory of this run.

OpenAlex works downloader for the Bronze layer.

OpenAlexDataset

Bases: DataSource

Cursor-based collector for OpenAlex Works (Bronze tier).

  • Loads data page by page using cursor (recommended method by OpenAlex).
  • Writes 1 document per line as JSONL.gz to: {DATA_ROOT}/bronze/openalex/{YYYY-MM-DD}/raw.jsonl.gz
  • Writes manifest line to: {DATA_ROOT}/bronze/openalex/{YYYY-MM-DD}/raw_manifest.jsonl
  • Additionally returns the loaded entries as a list. Caution: for large runs this can consume a lot of RAM; for production runs, prefer save=True and ignore the returned list.
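
The cursor loop can be sketched generically; fetch_page stands in for the HTTP call to https://api.openalex.org/works, and the page shape follows the OpenAlex cursor convention (results list plus meta.next_cursor):

```python
def iter_cursor_pages(fetch_page, start_cursor="*"):
    """Yield result batches following OpenAlex-style cursor pagination.

    fetch_page(cursor) must return a dict like
    {"results": [...], "meta": {"next_cursor": <str or None>}}.
    """
    cursor = start_cursor
    while cursor:
        page = fetch_page(cursor)
        results = page.get("results", [])
        if not results:
            break  # empty page -> done
        yield results
        cursor = page.get("meta", {}).get("next_cursor")
```

In the real collector each batch would then be appended to the JSONL.gz file instead of accumulated in memory.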

fetch

Python
fetch(save=True)

Loads Works via cursor. Saves (optionally) as JSONL.gz + manifest. Also returns a list of loaded records (for small runs).

OpenAlex topics downloader for the Bronze layer.

OpenAlexTopics

Bases: DataSource

Cursor-based collector for OpenAlex topics (bronze tier).

  • Endpoint: /topics
  • Writes 1 topic per line as JSONL.gz: {DATA_ROOT}/bronze/openalex/topics/{YYYY-MM-DD}/topics.jsonl.gz
  • Manifest-line: {DATA_ROOT}/bronze/openalex/topics/{YYYY-MM-DD}/raw_manifest.jsonl
  • Optional additional plain mirror: {DATA_ROOT}/bronze/openalex/topics/{YYYY-MM-DD}/extracted/topics.jsonl

Silver — ctclean

Console entrypoint & CLI helpers

Command-line interface for the Silver cleaning pipeline.

This module exposes the Typer application used to clean Bronze level data into Silver artifacts. It is re-exported as the ctclean console script.

media

Python
media(n_rows=Option(None, help='Read only top N rows (for debugging)'), bronze_dir=None, silver_dir=None, include_listings=Option(False, help='Keep listing/archive pages in canonicalization (the notebook keeps them).'))
Notebook sequence

1) prepare()
2) non-article suppression
3) quality gate (min 200 chars)
4) canonicalize

main

Python
main()

Entry point for the ctclean console script.

Filesystem path helpers for the pipeline.

project_root

Python
project_root()

Resolve the repository root (the folder that contains cleantech_data).

latest_bucket

Python
latest_bucket(base)

Choose the newest subdirectory whose name looks like YYYY-MM-DD. Fallback to most-recent mtime if names aren’t date-like.
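
A plausible implementation under those rules (a sketch, not the project's actual code):

```python
import re
from pathlib import Path

DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")


def latest_bucket(base):
    """Pick the newest YYYY-MM-DD subdirectory; fall back to most-recent mtime."""
    base = Path(base)
    subdirs = [p for p in base.iterdir() if p.is_dir()]
    dated = [p for p in subdirs if DATE_RE.match(p.name)]
    if dated:
        # ISO dates sort lexicographically == chronologically
        return max(dated, key=lambda p: p.name)
    return max(subdirs, key=lambda p: p.stat().st_mtime)
```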

Pipelines

Cleaning steps for media articles in the Silver pipeline.

prepare

Python
prepare(df)
Notebook-equivalent media prep
  • url_clean, domain, is_listing
  • stoplines (from RAW content)
  • title_clean / content_clean
  • lang_title/_score and lang_content/_score
  • counts + content_sha1 + title_fp
  • published_dt
  • url_key
  • drops CSV index artifacts (e.g. 'Unnamed: 0')

quality_gate

Python
quality_gate(df, min_chars=200)

Drop near-empty content AFTER hygiene, before canonicalization (notebook Step 5).

canonicalize

Python
canonicalize(df)

Merge keys (notebook parity):

  • A) identical url_key
  • B) exact content_sha1 with guards (len >= 200 already via quality gate; also require >= 60 words)
  • C) (domain, title_fp), gated by length ratio >= 0.90 and date span <= 7 days; prefer a non-listing canonical

Returns:

  • canon_df (one row per canonical)
  • links_df (member_id → canonical_id with reason and group_key)

Cleaning steps for patent records in the Silver pipeline.

prepare

Python
prepare(df)
Light cleaning that mirrors the notebook
  • title_clean / abstract_clean
  • abstract length gate (>=40 chars)
  • robust publication_date_dt parsing
  • NEW: language detection (lang_title/_score, lang_abs/_score)

englishness

Python
englishness(text)

Very simple 'englishness' proxy: fraction of ASCII chars.
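
The proxy is a one-liner; this sketch assumes empty input maps to 0.0:

```python
def englishness(text):
    """Fraction of ASCII characters — a crude proxy for English-ness."""
    if not text:
        return 0.0
    return sum(ch.isascii() for ch in text) / len(text)
```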

canonicalize

Python
canonicalize(df)
Choose a single canonical row per publication_number using
  • abstract_len (desc), englishness(text) (desc),
  • publication_date_dt (asc), title_len (desc)

Returns (canonical_df, links_df).

normalize

Python
normalize(df)
Aggregate to one row per publication_number with
  • cpc_codes: list[str]
  • inventors: list[str]
  • application_number: list[str]
  • country_code: str (primary) + country_codes: list[str] (full set)
  • title_clean: longest title
  • abstract_clean: longest abstract
  • publication_date_dt: earliest
  • NEW: lang_title/lang_title_score, lang_abs/lang_abs_score (per best score)

Processing utilities for OpenAlex topic data in the Silver pipeline.

Change: we no longer detect language for OpenAlex topics. OpenAlex returns English-localized fields for display_name and description, so we mark them as 'en' unconditionally.

process

Python
process(records)
Turns raw OpenAlex topic dicts into

1) topics_df (topics_canonical): one row per topic, with taxonomy columns
2) kw_df (topic_keywords_m2m)
3) sib_df (topic_siblings_m2m)
4) dom_df (domains_ref) -> columns: id, name (id is a short token)
5) field_df (fields_ref) -> columns: id, name (id is a short token)
6) sub_df (subfields_ref) -> columns: id, name (id is a short token)

Text processing & fingerprints

Text cleaning helpers used across Silver pipeline modules.

force_text

Python
force_text(x)

Return a safe unicode string from lists/bytes/hex/None, stripping NULs.

canonicalize_url

Python
canonicalize_url(url)
Notebook-style canonicalizer
  • lowercase host, strip fragment
  • drop tracking params (utm_*, fbclid, gclid, mc_cid, mc_eid)
  • keep original path
  • keep (sorted) query

build_stoplines

Python
build_stoplines(df, text_col, domain_col, min_len=40, max_len=240, min_freq=10)

Learn high-frequency lines per domain to drop later (repeaters/footers). Mirrors the notebook logic.
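
A simplified sketch of the idea; the real helper operates on a DataFrame, whereas this version takes a {domain: [texts]} dict:

```python
from collections import Counter, defaultdict


def build_stoplines(texts_by_domain, *, min_len=40, max_len=240, min_freq=10):
    """Collect lines repeated >= min_freq times within a domain (footers, repeaters)."""
    stop = defaultdict(set)
    for domain, texts in texts_by_domain.items():
        counts = Counter(
            line.strip()
            for text in texts
            for line in text.splitlines()
            if min_len <= len(line.strip()) <= max_len  # ignore very short/long lines
        )
        stop[domain] = {line for line, c in counts.items() if c >= min_freq}
    return dict(stop)
```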

clean_title

Python
clean_title(x)

Light, safe title cleaning (HTML/entities/whitespace).

clean_content

Python
clean_content(x)

Simple content cleaner used outside the media pipeline.

clean_content_advanced

Python
clean_content_advanced(x, domain=None, stoplines=None, mode='lo')

Advanced cleaner for media content (notebook parity + fixes):

  • robust coercion + HTML strip + control-char scrub
  • preserve paragraph breaks from list-like inputs
  • remove known cookie/signup/script prefixes
  • drop duplicated paragraphs
  • drop high-frequency stoplines (per-domain and generic)
  • domain-tail drop (Energy-XPRT, AZoCleantech Azthena/OpenAI block, etc.)
  • NEW: drop AZo promo/consent lines anywhere in the doc (not just the tail)
  • tidy whitespace/newlines

Utilities for generating content fingerprints.

title_fingerprint

Python
title_fingerprint(title)
Notebook-style title fingerprint
  • normalize, lowercase
  • tokens [a-z0-9]+
  • drop small stopword set
  • tokens with len>1
  • unique + sort
  • join by '|'
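
A sketch under an assumed stopword set (STOP here is illustrative, not the project's list):

```python
import re
import unicodedata

STOP = {"the", "a", "an", "of", "and", "to", "in", "for", "on"}  # assumed set


def title_fingerprint(title):
    """Normalize, tokenize, drop stopwords/short tokens, unique-sort, join by '|'."""
    norm = unicodedata.normalize("NFKD", title or "").lower()
    tokens = re.findall(r"[a-z0-9]+", norm)
    keep = sorted({t for t in tokens if len(t) > 1 and t not in STOP})
    return "|".join(keep)
```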

url_key

Python
url_key(url)
Notebook-style normalized key used for exact URL merges
  • start from canonicalize_url (drops trackers, sorts query, strips fragment)
  • drop scheme entirely
  • lowercase host and remove 'www.' prefix
  • path default to '/', trim trailing '/' (except if path == '/')
  • keep sorted query (if present) -> 'example.com/path?a=3&b=2' (query sorted)
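
A sketch of that normalization; the exact tracking-parameter list and edge-case handling are assumptions:

```python
from urllib.parse import parse_qsl, urlencode, urlsplit

TRACKING_EXACT = {"fbclid", "gclid", "mc_cid", "mc_eid"}  # plus any utm_* key


def url_key(url):
    """Approximate sketch of the normalized URL merge key described above."""
    parts = urlsplit(url.strip())          # also strips the fragment into parts.fragment
    host = parts.netloc.lower()
    if host.startswith("www."):
        host = host[4:]
    path = parts.path or "/"
    if path != "/":
        path = path.rstrip("/")            # trim trailing '/' except for the root path
    # drop tracking params, then sort the survivors for a stable key
    pairs = [(k, v) for k, v in parse_qsl(parts.query, keep_blank_values=True)
             if not k.startswith("utm_") and k not in TRACKING_EXACT]
    query = urlencode(sorted(pairs))
    return host + path + (("?" + query) if query else "")
```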

Additional utilities

Lightweight IO helpers for reading and writing pipeline data.

read_json_auto

Python
read_json_auto(path, n_rows=None)

Read JSON that may be array, NDJSON, concatenated JSON, or gzipped.

safe_write

Python
safe_write(df, path)

Write DataFrame to Parquet; on failure, sanitize types and retry. If it still fails, fall back to CSV.GZ (and log why).

parse_date_col

Python
parse_date_col(s, name='date')
Notebook-style robust parser
  • generic pandas parse
  • force 8-digit YYYYMMDD
  • 10-digit epoch (seconds)
  • 13-digit epoch (milliseconds)

Returns pandas datetime (naive, not UTC) with errors coerced.
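
A scalar stdlib sketch of the same rules; the real function is vectorized over a pandas Series, and parse_date_value is an illustrative name:

```python
from datetime import datetime, timezone


def parse_date_value(v):
    """Scalar sketch of the parser rules above; returns None when coercion fails."""
    s = str(v).strip()
    if s.isdigit():
        if len(s) == 8:     # forced YYYYMMDD
            return datetime.strptime(s, "%Y%m%d")
        if len(s) == 10:    # epoch seconds
            return datetime.fromtimestamp(int(s), tz=timezone.utc).replace(tzinfo=None)
        if len(s) == 13:    # epoch milliseconds
            return datetime.fromtimestamp(int(s) / 1000, tz=timezone.utc).replace(tzinfo=None)
    try:
        return datetime.fromisoformat(s)   # generic parse
    except ValueError:
        return None                        # the NaT analogue of errors='coerce'
```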

detect (cached)

Python
detect(text, *, min_chars=20, min_alpha=10)

Detect the language of a single string and return (iso639-1, score ∈ [0,1] or None).

  • Uses langid (installed) → pycld3 → langdetect, in that order.
  • Returns (None, None) for short/low-alpha inputs.

detect_series

Python
detect_series(series, *, min_chars=20, min_alpha=10)

Vectorized detection for a pandas Series. Returns (codes, scores) as two Series.


Validation & Unify

unify

Python
unify(output_path=None)

Read the latest Silver buckets (media, patents, openalex), build a unified DataFrame, and write it to Parquet. Returns the Path to the written Parquet file.


Gold — Subsample, Chunk, Enrich, Patch

Subsample

run

Python
run(input_path=Option(None, '--input', '-i', help='Unified docs dataset (Parquet/CSV).'), outdir=Option(None, '--outdir', '-o', help='Output folder for subsample + manifest.'), by=Option('doc_type,lang', help='Comma-separated strata columns.'), n=Option(None, help='Target sample size (mutually exclusive with --frac).'), frac=Option(None, help='Sample fraction in (0,1].'), min_per_stratum=Option(0, help='Guarantee at least this many per non-empty stratum (if feasible).'), cap_per_stratum=Option(None, help='Cap items per stratum.'), seed=Option(42, help='Random seed.'), dedupe=Option(False, help='Drop duplicates by doc_id before sampling.'), write_csv=Option(False, help='Also write CSV.GZ alongside Parquet.'))

Create a stratified subsample of the unified docs dataset.

Only media and patent rows are subsampled. All topic rows (and any other doc_type not in that set) are kept in full and concatenated back to the sampled output.

data_root

Python
data_root()

Return the repository data root (…/cleantech_data).

default_input_dir

Python
default_input_dir()

Directory that should contain the unified docs artifact.

default_input_path

Python
default_input_path()

Resolve the unified docs file with a handful of common suffixes.

default_output_dir

Python
default_output_dir()

Where subsampled artifacts are written.

default_output_path

Python
default_output_path(stem='unified_docs_subsample')

Full output path for the subsample artifact.

stratified_sample

Python
stratified_sample(df, by, *, n=None, frac=None, min_per_stratum=0, cap_per_stratum=None, seed=42, dedupe_on=None)

Draw a stratified sample by columns by. Returns (sample_df, allocations_per_group).
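
A simplified stdlib sketch of the proportional allocation (list-of-dicts instead of a DataFrame; no cap_per_stratum or dedupe handling):

```python
import math
import random
from collections import defaultdict


def stratified_sample(rows, by, *, frac, min_per_stratum=0, seed=42):
    """Proportional allocation per stratum over a list of dict rows."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[c] for c in by)].append(row)

    rng = random.Random(seed)
    sample, alloc = [], {}
    for key, members in groups.items():
        k = max(min_per_stratum, math.floor(len(members) * frac))
        k = min(k, len(members))            # never oversample a stratum
        alloc[key] = k
        sample.extend(rng.sample(members, k))
    return sample, alloc
```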

evaluate_distribution

Python
evaluate_distribution(population, sample, *, categorical=('doc_type', 'lang'), date_col='date')

Return TV distances for the requested categorical columns and for monthly date buckets.

write_manifest

Python
write_manifest(path, *, input_path, output_path, by, n, frac, seed, min_per_stratum, cap_per_stratum, allocation, metrics)

Write a JSON manifest capturing parameters, allocations and metrics.

Chunk

fixed

Python
fixed(input_path=Option(None, '--input', '-i', help='Unified docs path (default auto-detected).'), outdir=Option(None, '--outdir', '-o', help='Output dir (default = silver_subsample_chunk/fixed_size/<date>).'), n_rows=Option(None, help='Top N rows only (debug).'), doc_types=Option(None, help="Comma-separated filter, e.g. 'media,patent'."), max_tokens=Option(512, help='Max tokens per chunk.'), overlap=Option(64, help='Token overlap between chunks.'), prepend_title=Option(True, help='Prepend title to each chunk.'))

Fixed-size (~512) overlapping token window chunking (Chroma-ready output).

semantic

Python
semantic(input_path=Option(None, '--input', '-i', help='Unified docs path (default auto-detected).'), outdir=Option(None, '--outdir', '-o', help='Output dir (default = silver_subsample_chunk/semantic/<date>).'), n_rows=Option(None, help='Top N rows only (debug).'), doc_types=Option(None, help="Comma-separated filter, e.g. 'media,patent'."), sim_threshold=Option(0.75, help='Cosine similarity threshold to merge adjacent paragraphs.'), min_tokens=Option(50, help='Strict minimum tokens per semantic chunk.'), embedding_model=Option('sentence-transformers/distiluse-base-multilingual-cased-v2', help='Sentence-Transformer model name.'), prepend_title=Option(True, help='Prepend title to each chunk.'))

Semantic chunking (topic-shift detection) with strict min size (Chroma-ready output).

both

Python
both(input_path=Option(None, '--input', '-i'), n_rows=Option(None), doc_types=Option(None))

Convenience wrapper: run both fixed and semantic with defaults.

chunk_root

Python
chunk_root()

Root directory for chunked subsample artifacts.

bucket_dir

Python
bucket_dir(kind, date=None)

kind ∈ {"fixed_size", "semantic"}. Writes under cleantech_data/silver_subsample_chunk/<kind>/<date>/

default_input_path

Python
default_input_path()

Locate unified_docs_subsample.* (parquet preferred). Re-uses subsample's output resolution logic if available.

Enrichment

build_viewer_for_output_dir

Python
build_viewer_for_output_dir(out_dir)

Convenience wrapper if you already know the exact output directory.

build_viewers_for_gold_root

Python
build_viewers_for_gold_root(gold_root, dataset_types=('fixed_size', 'semantic'))

Auto-discover all date subfolders under each dataset type and build viewers.

gold_root should be

.../cleantech_data/gold_subsample_chunk

latest_input_chunks

Python
latest_input_chunks(kind)

Return path to the latest chunks.parquet for a given kind.

source_date_from_input

Python
source_date_from_input(path)

Expect: .../silver_subsample_chunk/<kind>/<date>/chunks.parquet

EnvKeys dataclass

Expose the only keys we support (same .env as the notebook).

data_root_override

Python
data_root_override()

Optional project layout override for data root resolution.

use_langextract_provider

Python
use_langextract_provider(provider)

Temporarily set environment variables so the LangExtract provider plugin uses the provider you asked for, without adding any new required envs.

  • For 'gemini': copies GEMINI_KEY -> GOOGLE_API_KEY and LANGEXTRACT_API_KEY
  • For 'openai': copies OPENAI_KEY -> OPENAI_API_KEY

We also temporarily clear the opposite provider var to avoid accidental fallback.


RAG Build — Graph, Communities, Dense, BM25

These modules live under src/rag and are executed as python -m <pkg>.
Ensure src/rag is on PYTHONPATH during mkdocs build (see tip above).

Graph CSV + Ingest

cmd_csv

Python
cmd_csv(kind=Option(..., '--dataset', help='fixed_size | semantic'), date=Option(None, '--date', help="YYYY-MM-DD | omit for 'latest'"), data_root_override=Option(None, '--data-root', exists=True, dir_okay=True, file_okay=False))

Build nodes/edges from GOLD and write gr_nodes.csv / gr_edges.csv next to chunks_enriched.*.

cmd_ingest

Python
cmd_ingest(kind=Option(..., '--dataset', help='fixed_size | semantic'), date=Option(None, '--date', help="YYYY-MM-DD | omit for 'latest'"), data_root_override=Option(None, '--data-root', exists=True, dir_okay=True, file_okay=False), ingest_tag=Option(..., '--ingest-tag', help='Tag to mark nodes/edges and allow clean re-ingest.'), delete_tag=Option(None, '--delete-tag', help='Delete previous ingest by tag before ingest.'), batch_size=Option(1000, '--batch-size'))

Build nodes/edges from GOLD and ingest directly into Neo4j (uses env NEO4J_*).

build_from_gold

Python
build_from_gold(out_dir)

Build (nodes, edges) DataFrames from a GOLD enriched bucket directory, e.g., .../gold_subsample_chunk/fixed_size/2025-09-03

Returns:

  • nodes_df (DataFrame): columns [node_id, node_type, display_name, props_b64]
  • edges_df (DataFrame): columns [src, dst, edge_type, props_b64]

Neo4jStore

ensure_constraints

Python
ensure_constraints()

Unique id per label + supporting indexes for quick MERGE.

delete_by_tag

Python
delete_by_tag(ingest_tag)

Remove nodes/edges created with this tag (safe re-ingest).

Communities (GraphRAG)

cmd_ensure_index

Python
cmd_ensure_index(dataset=Option(..., '--dataset', help='Dataset for which to ensure the vector index (fixed_size | semantic)'), level=Option('C1', '--level', help='Community level to target (e.g., C1)'), dim=Option(None, '--dim', help='Embedding dimension (optional, if not provided will auto-detect from data)'))

Ensure that the vector index on Community.summaryEmbedding exists and is ONLINE.

main

Python
main()

Module entrypoint used by communities/main.py.

ensure_constraints_and_indexes

Python
ensure_constraints_and_indexes(session, *, vector_dim=None)

Create unique key and supporting indexes for :Community. (Vector index is created later when we know the embedding dimension.)

project_entity_cooc_graph

Python
project_entity_cooc_graph(session, *, dataset, graph_name, min_weight=1)

Build an undirected entity co-occurrence graph in GDS using a Cypher aggregation subquery. We aggregate pairs (entity A, entity B) co-mentioned by the same Chunk into a single edge with a 'weight' property, then set the projection to undirected.

write_level

Python
write_level(session, *, graph_name, dataset, level, resolution, ingest_tag, min_size=8)

Run the Leiden community detection at the given resolution and write results to Neo4j: creates (:Community) nodes and [:IN_COMMUNITY] relationships. Only communities with at least min_size members are kept.

embed_texts

Python
embed_texts(texts)

Batch embeddings with enrichment-style env controls and robust fallback.

  • Batch size: OA_COMM_EMBED_BATCH → OA_EMBED_BATCH → 32
  • Provider: OA_COMM_EMBED_PROVIDER → OA_EMBED_PROVIDER → auto
  • Model: OA_COMM_EMBED_MODEL → OA_EMBED_MODEL_{OPENAI|GEMINI} → sensible default
  • Fallback: if the primary provider fails for any chunk, retry the WHOLE list with the other provider (to keep the output matrix dimension consistent).

ensure_vector_index

Python
ensure_vector_index(session, *, dim, name='community_summary_vec')

Create a vector index on Community.summaryEmbedding if it does not exist, and wait for it to be ONLINE. Returns the index name.

summarize_and_embed_all

Python
summarize_and_embed_all(*, dataset, level, ingest_tag, refresh=False, limit=None)

Summarize & embed all communities for the given dataset and level. If refresh=False, only communities missing a summaryEmbedding are processed. Returns (number_processed, embedding_dimension).

search_communities

Python
search_communities(query, *, level='C1', top_k=10, dataset=None)

Perform a semantic vector search over community summaries. Returns a list of (cid, score, dataset, summary) for the top_k matching communities.

retrieve_chunks

Python
retrieve_chunks(query, *, dataset='fixed_size', level='C1', ingest_tag, k_comms=24, top_k=100, chunk_type=None, rerank=True, rerank_mode='dense', max_candidates=400, dense_date=None, dense_persist=None)

RRF-ready retriever for GraphRAG.

Returns a ranked list of dicts
  • id: stable chunk id (string)
  • doc_id: derived parent document id (string)
  • rank: 1-based rank (int)
  • score: local similarity if reranked, else None (float|None)
  • source: literal 'graphrag' (string)
Pipeline

1) vector search over community summaries
2) expand matched communities to member chunks
3) (optional) local re-rank:
  • 'dense': cosine(query, prebuilt chunk embedding from Chroma)
  • 'summary': cosine(query, live-embedded chunk summary text)

Dense (Chroma)

DenseChunkIndex

Thin wrapper around the dense (Chroma) index:

  • opens the collection
  • fetches embeddings by chunk ids
  • embeds query text with the same Gemini model used by 'dense'

DenseRetriever

Lightweight retriever backed by a Chroma collection.

build_chroma_from_gold

Python
build_chroma_from_gold(kind='fixed_size', date=None, persist_dir=None, batch=128, limit=None, use_summary=True)

Build a Chroma vector DB from GOLD chunks.

BM25 (lexical)

build_bm25_from_gold

Python
build_bm25_from_gold(*, dataset, date=None, persist_dir=None, limit=None, use_summary=False, k1=1.2, b=0.75, sharding=None)

Build a BM25Okapi index over GOLD chunks.

  • use_summary=True → English summaries only (metadata.chunk_summary)
  • use_summary=False → full text with polyglot tokenization
  • sharding="mono" → one mixed-language shard
  • sharding="lang" → per-language shards using metadata.lang; tokenization is still polyglot

Hybrid retrieval

hybrid_retrieve

Python
hybrid_retrieve(query, *, dataset, date=None, top_k=50, seed_k=120, rrf_k=60, w_bm25=1.0, w_dense=1.5, graph_expand=False, level='C1', ingest_tag=None, expand_ratio=2.0, expand_limit=800, rerank_mode='dense', dense_date=None, dense_persist=None, w_graph=1.0, bm25_index=None, dense_index=None)

Seed with BM25 + Dense, fuse with weighted RRF. If graph_expand=True, expand seeds via graph and re-rank candidates; otherwise return the fused seeds. Returns list of (chunk_id, fused_score) sorted desc.

Emits stage-level latency JSONL when HYB_LATENCY_LOG is set.


Benchmarks — rag.bench

CLI & orchestration

latency

Python
latency(end_to_end=Option(None, '--end-to-end', help='Path to BENCH_LATENCY_LOG JSONL'), stages=Option(None, '--stages', help='Path to HYB_LATENCY_LOG JSONL'))

Summarize latency JSONL files produced during a bench run:

  • end-to-end: BENCH_LATENCY_LOG (per-query adapter.search time)
  • stages: HYB_LATENCY_LOG (Hybrid seeds/graph/rerank/fuse timings)

ir_table

Python
ir_table(dataset=Option(..., '--dataset'), date=Option(..., '--date'), root=Option(Path('.'), '--root', help='Project root containing bench_out/'), overlay_log=Option(None, '--overlay-log', help='(Optional) path to a specific overlay JSONL'), out_csv=Option(None, '--out-csv', help='Output CSV path; default: evals/ir_latency_table.csv'), out_md=Option(None, '--out-md', help='Output Markdown path; default: evals/ir_latency_table.md'))

Build an IR latency + quality table by joining leaderboard metrics with stage latencies and (optionally) rerank overlay e2e medians per run.

thesis_pack

Python
thesis_pack(dataset=Option(..., '--dataset'), date=Option(..., '--date'), out=Option(None, '--out', help='Output dir; default: bench_out/<dataset>/<date>/thesis_eval'), tables=Option('all', '--tables', help="Comma-separated: benchmark_master,rq1,rq3,pr1,pr2,auto_qrels_overview,judge_agreement,significance_ci,rq_status or 'all'"), seed=Option(42, '--seed', help='Deterministic sampling seed'), smoke=Option(False, '--smoke', help='Fast mode: tiny QA/sample sizes'), max_qas=Option(None, '--max-qas', '--limit-qas', help='Limit QA items for quick runs (applies to all table builders)'), pr2_sample=Option(None, '--pr2-sample', help='Annotation sample size; defaults: 25 (full), 2 (smoke)'), run_bench=Option(True, '--run-bench/--no-run-bench', help='Auto-bootstrap benchmark runs if leaderboard.csv is missing.'), bench_top_k=Option(100, '--bench-top-k', help='Top-k depth used for benchmark bootstrap runs.'), bench_profile=Option('baseline', '--bench-profile', help='Benchmark bootstrap profile: baseline|full|thesis'), bench_reset=Option(False, '--bench-reset', help='Reset benchmark leaderboards before bootstrap.'), latex=Option(True, '--latex/--no-latex', help='Generate thesis_tables.tex with compact LaTeX tabulars.'), llm_cost=Option(False, '--llm-cost/--no-llm-cost', help='Include LLM token and estimated cost metrics in PR1 output.'), llm_pricing_json=Option(None, '--llm-pricing-json', help='Optional pricing JSON for model-based USD estimates.'), silver_auto_qrels=Option(False, '--silver-auto-qrels/--no-silver-auto-qrels', help='Enable fully automatic silver qrels pipeline (pooled retrieval + dual LLM judging).'), silver_force=Option(False, '--silver-force/--no-silver-force', help='Force regeneration of pooled candidates/judgments even if cached files exist.'), silver_pool_top_n=Option(50, '--silver-pool-top-n', help='Top-N candidates per system to include in pooled candidate set.'), silver_pool_systems=Option('bm25,dense,hybrid', '--silver-pool-systems', help='Comma-separated pooling systems: bm25,dense,hybrid'), silver_judge_provider_a=Option('auto', '--silver-judge-provider-a', help='Judge A provider: auto|openai|gemini|none'), silver_judge_provider_b=Option('auto', '--silver-judge-provider-b', help='Judge B provider: auto|openai|gemini|none'), silver_judge_model_a=Option(None, '--silver-judge-model-a', help='Optional model override for judge A.'), silver_judge_model_b=Option(None, '--silver-judge-model-b', help='Optional model override for judge B.'), silver_binary_threshold=Option(1, '--silver-binary-threshold', help='Binary qrels threshold from graded labels (1 => rel>=1 is relevant).'), silver_n_boot=Option(1000, '--silver-n-boot', help='Bootstrap samples for confidence intervals in significance table.'), silver_n_perm=Option(1000, '--silver-n-perm', help='Permutation samples for paired randomization p-values in significance table.'))

Build thesis-ready evaluation tables into bench_out/<dataset>/<date>/thesis_eval.

QA build & schema

build_qa_from_gold

Python
build_qa_from_gold(dataset, date=None, n=50, use_text=True, provider=None, out_dir=None, seed=1337)

Create a single-hop QA set (one gold chunk per Q) from GOLD.

Returns (qa_parquet, qrels_parquet, meta_json). Also writes JSON compatibility files when RAG_BENCH_WRITE_JSON=1 (default).

QAItem dataclass

Single-hop QA item grounded in one chunk.

Runs, scoring, fusion, latency

build_run

Python
build_run(qa_items, adapter, top_k=100)

Build TREC-style run from adapter for the given QA set.

Returned structure: { qid: {chunk_id: score, ...}, ... }

Adds end-to-end latency logging when BENCH_LATENCY_LOG is set. Also exports BENCH_QID so stage-level instrumentation (Hybrid) can tag records.

build_chunk_to_doc_map

Python
build_chunk_to_doc_map(dataset, date)

Load GOLD (enriched) table and return {chunk_id -> doc_id}. Falls back to chunk_id when doc_id is missing.

evaluate_run_doclevel

Python
evaluate_run_doclevel(qrels_chunk, run_chunk, chunk_to_doc, metrics=None, reduce='max')

Evaluate by doc_id: a query is counted correct if any chunk from the gold doc is retrieved.
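
A sketch of the doc-level reduction described here; reduce_run_to_doclevel and its signature are illustrative, not the project's API:

```python
def reduce_run_to_doclevel(run_chunk, chunk_to_doc, reduce="max"):
    """Collapse a chunk-level run {qid: {chunk_id: score}} to doc level."""
    out = {}
    for qid, scored in run_chunk.items():
        docs = {}
        for chunk_id, score in scored.items():
            doc_id = chunk_to_doc.get(chunk_id, chunk_id)  # fall back to chunk_id
            if reduce == "max":
                docs[doc_id] = max(docs.get(doc_id, float("-inf")), score)
            else:  # 'sum'
                docs[doc_id] = docs.get(doc_id, 0.0) + score
        out[qid] = docs
    return out
```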

BaseAdapter

search

Python
search(query, top_k=100)

Return list of (chunk_id, score) sorted desc by score.

HybridAdapter

Bases: BaseAdapter

Stage A: Weighted RRF fusion of BM25 and Dense. Stage B (optional): Graph expansion + re-rank (dense or summary), fused again via RRF.

rrf_fuse

Python
rrf_fuse(runs, weights, k=60)

Weighted RRF fusion for multiple runs.
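
A compact sketch of weighted RRF over best-first rankings; the real function consumes TREC-style run dicts, so the list-of-rankings form here is a simplification:

```python
def rrf_fuse(runs, weights, k=60):
    """Weighted Reciprocal Rank Fusion: score(d) = sum_i w_i / (k + rank_i(d))."""
    scores = {}
    for ranking, w in zip(runs, weights):
        for rank, item in enumerate(ranking, start=1):  # 1-based ranks
            scores[item] = scores.get(item, 0.0) + w / (k + rank)
    # best fused score first
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
```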

summarize_end_to_end

Python
summarize_end_to_end(path, *, adapter=None, qids=None)

BENCH_LATENCY_LOG JSONL → p50/p90/p95. If 'adapter' is provided, only keep records where rec['adapter'] == adapter (case-insensitive). If 'qids' is provided, keep only records whose 'qid' is in the set.
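
A stdlib sketch of the percentile summary; the 'ms' record key and the floor-index percentile are assumptions, not the bench module's exact behavior:

```python
import json


def percentile(values, q):
    """Floor-index percentile over a sorted copy (q in [0, 100])."""
    vals = sorted(values)
    return vals[int(q / 100 * (len(vals) - 1))]


def summarize_latency_jsonl(path, key="ms"):
    """One JSON object per line -> {n, p50_ms, p90_ms, p95_ms}."""
    with open(path) as fh:
        ms = [json.loads(line)[key] for line in fh if line.strip()]
    if not ms:
        return {}
    return {"n": len(ms),
            "p50_ms": percentile(ms, 50),
            "p90_ms": percentile(ms, 90),
            "p95_ms": percentile(ms, 95)}
```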

summarize_stages

Python
summarize_stages(path, *, qids=None)

HYB_LATENCY_LOG JSONL → p50/p90/p95 per stage; optionally filter to 'qids'. Stage logs are produced by Hybrid only.

summarize_end_to_end_for

Python
summarize_end_to_end_for(path, qids, *, adapter=None)

Return {e2e_n, e2e_p50_ms, e2e_p90_ms, e2e_p95_ms} using only records whose 'qid' is in qids. If adapter is provided, also require rec['adapter'] to match (case-insensitive).

summarize_stages_for

Python
summarize_stages_for(path, qids)

Return stage_*_{p50,p90,p95}_ms only for records whose qid is in qids. Includes: seed, fuse_seeds, graph_expand, rerank, fuse_final, total (if present).

summarize_single_stage

Python
summarize_single_stage(path, *, stage='rerank')

Summarize a JSONL file that only logs a single stage (e.g. RERANK_LATENCY_LOG).

Does NOT filter by qid; returns: { 'n', 'p50_ms', 'p90_ms', 'p95_ms' } or {} if no matching records.