Reference¶
This page is the API reference for the MT project, grouped by pipeline layer. It is rendered with mkdocstrings (Python handler).
See also the operator-focused CLI cheatsheet.
API: /query¶
The API provides a single query endpoint that wraps retrieval and (optionally) an LLM answer. It lives in service/main.py.
Request¶
- Method: `POST`
- Path: `/query`
- Header: `x-api-key: <RAG_API_KEY>` (only required if `RAG_API_KEY` is set on the server)
Body schema (top-level):
- `query` (string, required): user query text.
- `params` (object, optional): retrieval parameter overrides (see below).
- `include_answer` (bool, default true): run the follow-up answer stage.
- `include_retrieval` (bool, default true): include the full retrieval payload in the response.
- `llm_cfg` (object, optional): override LLM provider settings (url/model/key).
Supported params keys:
`dataset`, `date`, `ingest_tag`, `level`, `seed_k`, `fetch_top_n`, `expand_ratio`, `expand_limit`, `rerank_mode`, `prefer`, `alpha`, `sort_by`, `diversify`, `mmr_lambda`, `max_per_doc`, `min_occurrence`, `top_k`, `graph_required`, `coverage`, `fusion`, `timeout_s`
Example request:
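The example payload did not survive extraction; below is a minimal sketch of a request, assuming a server at `http://localhost:8000` (host, port, and all field values are illustrative, not project defaults):

```python
import json
import urllib.request

# Illustrative request body; only "query" is required.
payload = {
    "query": "What are recent trends in solid-state battery patents?",
    "params": {"top_k": 10, "rerank_mode": "dense"},
    "include_answer": True,
    "include_retrieval": False,
}

req = urllib.request.Request(
    "http://localhost:8000/query",           # assumed host/port
    data=json.dumps(payload).encode("utf-8"),
    headers={
        "Content-Type": "application/json",
        "x-api-key": "YOUR_RAG_API_KEY",     # only needed if RAG_API_KEY is set server-side
    },
    method="POST",
)
# resp = urllib.request.urlopen(req)         # uncomment against a running server
```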
Response¶
Top-level keys:
- `answer`: string (may be empty if `include_answer=false` or if the LLM fails).
- `sources`: list of preview rows used as citations (filtered if the LLM returned citations).
- `retrieval`: full retrieval payload (only when `include_retrieval=true`).
- `error`: error string when answer generation fails.
`retrieval` contains:

- `fusion_json`: the fused retrieval output.
- `ctx_pack`: normalized context pack passed into the LLM pipeline.
- `scores_map`: per-stage score maps used in fusion.
- `preview_rows`: enriched rows for UI rendering.
- `latency_ms`: end-to-end retrieval latency.
Example response (truncated):
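The truncated example was also lost in extraction; here is a sketch of the response shape using the field names documented above (all values are invented):

```python
# Illustrative response shape; real payloads are much larger.
response = {
    "answer": "Solid-state battery filings grew sharply after 2020 ...",
    "sources": [
        {"chunk_id": "patent:US1234567:3", "title": "Solid electrolyte ...", "score": 0.83},
    ],
    "retrieval": {                 # present only when include_retrieval=true
        "fusion_json": {"...": "..."},
        "ctx_pack": {"...": "..."},
        "scores_map": {"bm25": {}, "dense": {}},
        "preview_rows": [],
        "latency_ms": 412,
    },
}
```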
Notes:
- The API answer uses the follow-up answer stage (not the full constraints/discovery/ideation/prioritization pipeline).
- Use `include_answer=false` to return retrieval only.
- Defaults are applied from `DEFAULT_RETRIEVAL_SETTINGS` when fields are omitted.
Bronze — cleantech_pipeline¶
Console entrypoint & config¶
Data sources¶
Kaggle dataset downloader for the Bronze layer.
KaggleDataset ¶
Bases: DataSource
Kaggle downloader: Bronze-conformant (original ZIP + JSONL.gz mirror + manifest).
fetch ¶
Downloads the ZIP unchanged, optionally mirrors it to JSONL.gz, and writes a manifest. Returns the Bronze directory of this run.
OpenAlex works downloader for the Bronze layer.
OpenAlexDataset ¶
Bases: DataSource
Cursor-based collector for OpenAlex Works (Bronze tier).
- Loads data page by page using `cursor` (the method recommended by OpenAlex).
- Writes one document per line as JSONL.gz to: {DATA_ROOT}/bronze/openalex/{YYYY-MM-DD}/raw.jsonl.gz
- Writes a manifest line to: {DATA_ROOT}/bronze/openalex/{YYYY-MM-DD}/raw_manifest.jsonl
- Additionally returns the loaded entries as a list (as before).

Caution: for large runs this can consume a lot of RAM; for production runs, prefer `save=True` and ignore the returned list.
fetch ¶
Loads Works via cursor, optionally saves them as JSONL.gz + manifest, and also returns the loaded records as a list (intended for small runs).
OpenAlex topics downloader for the Bronze layer.
OpenAlexTopics ¶
Bases: DataSource
Cursor-based collector for OpenAlex topics (bronze tier).
- Endpoint: /topics
- Writes 1 topic per line as JSONL.gz: {DATA_ROOT}/bronze/openalex/topics/{YYYY-MM-DD}/topics.jsonl.gz
- Manifest line: {DATA_ROOT}/bronze/openalex/topics/{YYYY-MM-DD}/raw_manifest.jsonl
- Optional additional plain mirror: {DATA_ROOT}/bronze/openalex/topics/{YYYY-MM-DD}/extracted/topics.jsonl
Silver — ctclean¶
Console entrypoint & CLI helpers¶
Command-line interface for the Silver cleaning pipeline.
This module exposes the Typer application used to clean Bronze level data into
Silver artifacts. It is re-exported as the ctclean console script.
media ¶
media(n_rows=Option(None, help='Read only top N rows (for debugging)'), bronze_dir=None, silver_dir=None, include_listings=Option(False, help='Keep listing/archive pages in canonicalization (the notebook keeps them).'))
Notebook sequence
1. prepare()
2. non-article suppression
3. quality gate (min 200 chars)
4. canonicalize
Pipelines¶
Cleaning steps for media articles in the Silver pipeline.
prepare ¶
Notebook-equivalent media prep
- url_clean, domain, is_listing
- stoplines (from RAW content)
- title_clean / content_clean
- lang_title/_score and lang_content/_score
- counts + content_sha1 + title_fp
- published_dt
- url_key
- drops CSV index artifacts (e.g. 'Unnamed: 0')
quality_gate ¶
Drop near-empty content AFTER hygiene, before canonicalization (notebook Step 5).
canonicalize ¶
Merge keys (notebook parity):

- A) identical url_key
- B) exact content_sha1, with guards (len >= 200 already via the quality gate; also require >= 60 words)
- C) (domain, title_fp), gated by length ratio >= 0.90 and date span <= 7 days; prefer a non-listing canonical

Returns:

- canon_df (one row per canonical)
- links_df (member_id → canonical_id with reason and group_key)
Cleaning steps for patent records in the Silver pipeline.
prepare ¶
Light cleaning that mirrors the notebook
- title_clean / abstract_clean
- abstract length gate (>=40 chars)
- robust publication_date_dt parsing
- NEW: language detection (lang_title/_score, lang_abs/_score)
canonicalize ¶
Choose a single canonical row per publication_number using
- abstract_len (desc), englishness(text) (desc),
- publication_date_dt (asc), title_len (desc)
Returns (canonical_df, links_df).
normalize ¶
Aggregate to one row per publication_number with
- cpc_codes: list[str]
- inventors: list[str]
- application_number: list[str]
- country_code: str (primary) + country_codes: list[str] (full set)
- title_clean: longest title
- abstract_clean: longest abstract
- publication_date_dt: earliest
- NEW: lang_title/lang_title_score, lang_abs/lang_abs_score (per best score)
Processing utilities for OpenAlex topic data in the Silver pipeline.
Change: we no longer detect language for OpenAlex topics. OpenAlex returns
English-localized fields for display_name and description, so we mark them
as 'en' unconditionally.
process ¶
Turns raw OpenAlex topic dicts into
1. topics_df (topics_canonical): one row per topic, with taxonomy columns
2. kw_df (topic_keywords_m2m)
3. sib_df (topic_siblings_m2m)
4. dom_df (domains_ref) -> columns: id, name (id is a short token)
5. field_df (fields_ref) -> columns: id, name (id is a short token)
6. sub_df (subfields_ref) -> columns: id, name (id is a short token)
Text processing & fingerprints¶
Text cleaning helpers used across Silver pipeline modules.
force_text ¶
Return a safe unicode string from lists/bytes/hex/None, stripping NULs.
canonicalize_url ¶
Notebook-style canonicalizer
- lowercase host, strip fragment
- drop tracking params (utm_*, fbclid, gclid, mc_cid, mc_eid)
- keep original path
- keep (sorted) query
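The behavior above can be sketched with the standard library (the function name mirrors the docs, but this is an illustration, not the project's implementation):

```python
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode

TRACKING = {"fbclid", "gclid", "mc_cid", "mc_eid"}

def canonicalize_url(url: str) -> str:
    """Notebook-style canonicalizer sketch: lowercase host, strip fragment,
    drop tracking params (utm_*, fbclid, ...), keep path, keep sorted query."""
    parts = urlsplit(url)
    query = [
        (k, v) for k, v in parse_qsl(parts.query, keep_blank_values=True)
        if not k.startswith("utm_") and k not in TRACKING
    ]
    query.sort()
    return urlunsplit((parts.scheme, parts.netloc.lower(), parts.path,
                       urlencode(query), ""))  # "" drops the fragment
```

For example, `canonicalize_url("https://Example.com/Path?b=2&utm_source=x&a=1#frag")` yields `"https://example.com/Path?a=1&b=2"`.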
build_stoplines ¶
Learn high-frequency lines per domain to drop later (repeaters/footers). Mirrors the notebook logic.
clean_content_advanced ¶
Advanced cleaner for media content (notebook parity + fixes):

- robust coercion + HTML strip + control-char scrub
- preserve paragraph breaks from list-like inputs
- remove known cookie/signup/script prefixes
- drop deduped paragraphs
- drop high-frequency stoplines (per-domain and generic)
- domain-tail drop (Energy-XPRT, AZoCleantech Azthena/OpenAI block, etc.)
- NEW: drop AZo promo/consent lines anywhere in the doc (not just the tail)
- tidy whitespace/newlines
Utilities for generating content fingerprints.
title_fingerprint ¶
Notebook-style title fingerprint
- normalize, lowercase
- tokens [a-z0-9]+
- drop small stopword set
- tokens with len>1
- unique + sort
- join by '|'
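The steps above can be sketched as follows (the stopword set here is a small illustrative stand-in for the project's actual set):

```python
import re
import unicodedata

STOPWORDS = {"the", "a", "an", "of", "and", "in", "to", "for"}  # illustrative small set

def title_fingerprint(title: str) -> str:
    """Sketch: normalize, lowercase, tokenize [a-z0-9]+, drop stopwords and
    single-char tokens, unique + sort, join by '|'."""
    text = unicodedata.normalize("NFKD", title).lower()
    tokens = re.findall(r"[a-z0-9]+", text)
    keep = {t for t in tokens if len(t) > 1 and t not in STOPWORDS}
    return "|".join(sorted(keep))
```

For example, `title_fingerprint("The Rise of Solar Power in the EU")` yields `"eu|power|rise|solar"`.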
url_key ¶
Notebook-style normalized key used for exact URL merges
- start from canonicalize_url (drops trackers, sorts query, strips fragment)
- drop scheme entirely
- lowercase host and remove 'www.' prefix
- path default to '/', trim trailing '/' (except if path == '/')
- keep sorted query (if present) -> 'example.com/path?b=2&a=3' (query sorted)
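A self-contained sketch of this key (canonicalization is inlined here; the real helper builds on canonicalize_url):

```python
from urllib.parse import urlsplit, parse_qsl, urlencode

TRACKING = {"fbclid", "gclid", "mc_cid", "mc_eid"}

def url_key(url: str) -> str:
    """Sketch of the normalized merge key: drop scheme, lowercase host,
    strip 'www.', default path to '/', trim trailing '/', keep sorted query."""
    parts = urlsplit(url)
    host = parts.netloc.lower()
    if host.startswith("www."):
        host = host[4:]
    path = parts.path or "/"
    if path != "/" and path.endswith("/"):
        path = path.rstrip("/")
    query = sorted(
        (k, v) for k, v in parse_qsl(parts.query, keep_blank_values=True)
        if not k.startswith("utm_") and k not in TRACKING
    )
    key = host + path
    if query:
        key += "?" + urlencode(query)
    return key
```

For example, `url_key("https://www.Example.com/path/?b=2&a=3")` yields `"example.com/path?a=3&b=2"`.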
Additional utilities¶
parse_date_col ¶
Notebook-style robust parser
- generic pandas parse
- force 8-digit YYYYMMDD
- 10-digit epoch (seconds)
- 13-digit epoch (milliseconds)
Returns pandas datetime (naive, not UTC) with errors coerced.
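The per-value logic can be sketched without pandas (the real helper vectorizes this over a pandas column with errors coerced; `parse_date_value` is a hypothetical name for illustration):

```python
from datetime import datetime, timezone

def parse_date_value(v):
    """Sketch: 8-digit YYYYMMDD, 10-digit epoch seconds, 13-digit epoch
    milliseconds; anything else via fromisoformat; None on failure."""
    s = str(v).strip()
    try:
        if s.isdigit() and len(s) == 8:      # YYYYMMDD
            return datetime.strptime(s, "%Y%m%d")
        if s.isdigit() and len(s) == 10:     # epoch seconds
            return datetime.fromtimestamp(int(s), tz=timezone.utc).replace(tzinfo=None)
        if s.isdigit() and len(s) == 13:     # epoch milliseconds
            return datetime.fromtimestamp(int(s) / 1000, tz=timezone.utc).replace(tzinfo=None)
        return datetime.fromisoformat(s)
    except ValueError:
        return None
```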
detect (cached) ¶
Detect the language of a single string and return (iso639-1 code, score ∈ [0,1] or None).

- Uses langid (if installed) → pycld3 → langdetect, in that order.
- Returns (None, None) for short/low-alpha inputs.
detect_series ¶
Vectorized detection for a pandas Series. Returns (codes, scores) as two Series.
Validation & Unify¶
unify ¶
Read the latest Silver buckets (media, patents, openalex), build a unified DataFrame, and write it to Parquet. Returns the Path to the written Parquet file.
Gold — Subsample, Chunk, Enrich, Patch¶
Subsample¶
run ¶
run(input_path=Option(None, '--input', '-i', help='Unified docs dataset (Parquet/CSV).'), outdir=Option(None, '--outdir', '-o', help='Output folder for subsample + manifest.'), by=Option('doc_type,lang', help='Comma-separated strata columns.'), n=Option(None, help='Target sample size (mutually exclusive with --frac).'), frac=Option(None, help='Sample fraction in (0,1].'), min_per_stratum=Option(0, help='Guarantee at least this many per non-empty stratum (if feasible).'), cap_per_stratum=Option(None, help='Cap items per stratum.'), seed=Option(42, help='Random seed.'), dedupe=Option(False, help='Drop duplicates by doc_id before sampling.'), write_csv=Option(False, help='Also write CSV.GZ alongside Parquet.'))
Create a stratified subsample of the unified docs dataset.
Only media and patent rows are subsampled. All topic rows (and any
other doc_type not in that set) are kept in full and concatenated back to
the sampled output.
stratified_sample ¶
stratified_sample(df, by, *, n=None, frac=None, min_per_stratum=0, cap_per_stratum=None, seed=42, dedupe_on=None)
Draw a stratified sample by columns by.
Returns (sample_df, allocations_per_group).
evaluate_distribution ¶
evaluate_distribution(population, sample, *, categorical=('doc_type', 'lang'), date_col='date')
Return TV distances for the requested categorical columns and for monthly date buckets.
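The TV (total variation) distance it reports can be sketched as follows (a minimal illustration of the metric; the real function also buckets dates by month):

```python
from collections import Counter

def tv_distance(population, sample):
    """Total variation distance between two categorical distributions:
    0 = identical marginals, 1 = completely disjoint support."""
    p, q = Counter(population), Counter(sample)
    n_p, n_q = sum(p.values()), sum(q.values())
    cats = set(p) | set(q)
    return 0.5 * sum(abs(p[c] / n_p - q[c] / n_q) for c in cats)
```

A sample that preserves the population's category proportions scores 0.0.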
write_manifest ¶
write_manifest(path, *, input_path, output_path, by, n, frac, seed, min_per_stratum, cap_per_stratum, allocation, metrics)
Write a JSON manifest capturing parameters, allocations and metrics.
Chunk¶
fixed ¶
fixed(input_path=Option(None, '--input', '-i', help='Unified docs path (default auto-detected).'), outdir=Option(None, '--outdir', '-o', help='Output dir (default = silver_subsample_chunk/fixed_size/<date>).'), n_rows=Option(None, help='Top N rows only (debug).'), doc_types=Option(None, help="Comma-separated filter, e.g. 'media,patent'."), max_tokens=Option(512, help='Max tokens per chunk.'), overlap=Option(64, help='Token overlap between chunks.'), prepend_title=Option(True, help='Prepend title to each chunk.'))
Fixed-size (~512) overlapping token window chunking (Chroma-ready output).
semantic ¶
semantic(input_path=Option(None, '--input', '-i', help='Unified docs path (default auto-detected).'), outdir=Option(None, '--outdir', '-o', help='Output dir (default = silver_subsample_chunk/semantic/<date>).'), n_rows=Option(None, help='Top N rows only (debug).'), doc_types=Option(None, help="Comma-separated filter, e.g. 'media,patent'."), sim_threshold=Option(0.75, help='Cosine similarity threshold to merge adjacent paragraphs.'), min_tokens=Option(50, help='Strict minimum tokens per semantic chunk.'), embedding_model=Option('sentence-transformers/distiluse-base-multilingual-cased-v2', help='Sentence-Transformer model name.'), prepend_title=Option(True, help='Prepend title to each chunk.'))
Semantic chunking (topic-shift detection) with strict min size (Chroma-ready output).
both ¶
Convenience wrapper: run both fixed and semantic with defaults.
Enrichment¶
EnvKeys (dataclass) ¶
Expose the only keys we support (same .env as the notebook).
data_root_override ¶
Optional project layout override for data root resolution.
use_langextract_provider ¶
Temporarily set environment variables so the LangExtract provider plugin uses the provider you asked for, without adding any new required envs.
- For 'gemini': copies GEMINI_KEY -> GOOGLE_API_KEY and LANGEXTRACT_API_KEY
- For 'openai': copies OPENAI_KEY -> OPENAI_API_KEY
We also temporarily clear the opposite provider var to avoid accidental fallback.
RAG Build — Graph, Communities, Dense, BM25¶
These modules live under `src/rag` and are executed as `python -m <pkg>`.
Ensure `src/rag` is on `PYTHONPATH` during `mkdocs build` (see tip above).
Graph CSV + Ingest¶
cmd_csv ¶
cmd_csv(kind=Option(..., '--dataset', help='fixed_size | semantic'), date=Option(None, '--date', help="YYYY-MM-DD | omit for 'latest'"), data_root_override=Option(None, '--data-root', exists=True, dir_okay=True, file_okay=False))
Build nodes/edges from GOLD and write gr_nodes.csv / gr_edges.csv next to chunks_enriched.*.
cmd_ingest ¶
cmd_ingest(kind=Option(..., '--dataset', help='fixed_size | semantic'), date=Option(None, '--date', help="YYYY-MM-DD | omit for 'latest'"), data_root_override=Option(None, '--data-root', exists=True, dir_okay=True, file_okay=False), ingest_tag=Option(..., '--ingest-tag', help='Tag to mark nodes/edges and allow clean re-ingest.'), delete_tag=Option(None, '--delete-tag', help='Delete previous ingest by tag before ingest.'), batch_size=Option(1000, '--batch-size'))
Build nodes/edges from GOLD and ingest directly into Neo4j (uses env NEO4J_*).
build_from_gold ¶
Build (nodes, edges) DataFrames from a GOLD enriched bucket directory, e.g., .../gold_subsample_chunk/fixed_size/2025-09-03
Returns:
| Name | Type | Description |
|---|---|---|
| nodes_df | DataFrame | columns [node_id, node_type, display_name, props_b64] |
| edges_df | DataFrame | columns [src, dst, edge_type, props_b64] |
Communities (GraphRAG)¶
cmd_ensure_index ¶
cmd_ensure_index(dataset=Option(..., '--dataset', help='Dataset for which to ensure the vector index (fixed_size | semantic)'), level=Option('C1', '--level', help='Community level to target (e.g., C1)'), dim=Option(None, '--dim', help='Embedding dimension (optional, if not provided will auto-detect from data)'))
Ensure that the vector index on Community.summaryEmbedding exists and is ONLINE.
ensure_constraints_and_indexes ¶
Create unique key and supporting indexes for :Community. (Vector index is created later when we know the embedding dimension.)
project_entity_cooc_graph ¶
Build an undirected entity co-occurrence graph in GDS using a Cypher aggregation subquery. We aggregate pairs (entity A, entity B) co-mentioned by the same Chunk into a single edge with a 'weight' property, then set the projection to undirected.
write_level ¶
Run the Leiden community detection at the given resolution and write results to Neo4j: creates (:Community) nodes and [:IN_COMMUNITY] relationships. Only communities with at least min_size members are kept.
embed_texts ¶
Batch embeddings with enrichment-style env controls and robust fallback.

- Batch size: OA_COMM_EMBED_BATCH → OA_EMBED_BATCH → 32
- Provider: OA_COMM_EMBED_PROVIDER → OA_EMBED_PROVIDER → auto
- Model: OA_COMM_EMBED_MODEL → OA_EMBED_MODEL_{OPENAI|GEMINI} → sensible default
- Fallback: if the primary provider fails for any chunk, retry the WHOLE list with the other provider (to keep the output matrix dimension consistent).
ensure_vector_index ¶
Create a vector index on Community.summaryEmbedding if it does not exist, and wait for it to be ONLINE. Returns the index name.
summarize_and_embed_all ¶
Summarize & embed all communities for the given dataset and level. If refresh=False, only communities missing a summaryEmbedding are processed. Returns (number_processed, embedding_dimension).
search_communities ¶
Perform a semantic vector search over community summaries. Returns a list of (cid, score, dataset, summary) for the top_k matching communities.
retrieve_chunks ¶
retrieve_chunks(query, *, dataset='fixed_size', level='C1', ingest_tag, k_comms=24, top_k=100, chunk_type=None, rerank=True, rerank_mode='dense', max_candidates=400, dense_date=None, dense_persist=None)
RRF-ready retriever for GraphRAG.
Returns a ranked list of dicts
- id: stable chunk id (string)
- doc_id: derived parent document id (string)
- rank: 1-based rank (int)
- score: local similarity if reranked, else None (float|None)
- source: literal 'graphrag' (string)
Pipeline:

1. vector search over community summaries,
2. expand matched communities to member chunks,
3. (optional) local re-rank:
   - 'dense': cosine(query, prebuilt chunk embedding from Chroma)
   - 'summary': cosine(query, live-embedded chunk summary text)
Dense (Chroma)¶
DenseChunkIndex ¶
Thin wrapper around the dense (Chroma) index:
- opens the collection for
DenseRetriever ¶
Lightweight retriever backed by a Chroma collection.
build_chroma_from_gold ¶
build_chroma_from_gold(kind='fixed_size', date=None, persist_dir=None, batch=128, limit=None, use_summary=True)
Build a Chroma vector DB from GOLD chunks.
BM25 (lexical)¶
build_bm25_from_gold ¶
build_bm25_from_gold(*, dataset, date=None, persist_dir=None, limit=None, use_summary=False, k1=1.2, b=0.75, sharding=None)
Build a BM25Okapi index over GOLD chunks.

- use_summary=True → English summaries only (metadata.chunk_summary)
- use_summary=False → full text with polyglot tokenization
- sharding="mono" → one mixed-language shard
- sharding="lang" → per-language shards using metadata.lang; tokenization is still polyglot
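The Okapi BM25 scoring such an index applies can be sketched in pure Python (`bm25_score` is illustrative; the builder uses a prebuilt BM25Okapi implementation with the same k1/b defaults):

```python
import math
from collections import Counter

def bm25_score(query_terms, doc_terms, corpus, k1=1.2, b=0.75):
    """Okapi BM25 score of one tokenized document for a query, given the
    tokenized corpus (used for document frequencies and average length)."""
    N = len(corpus)
    avgdl = sum(len(d) for d in corpus) / N
    tf = Counter(doc_terms)
    score = 0.0
    for t in query_terms:
        df = sum(1 for d in corpus if t in d)            # document frequency
        idf = math.log(1 + (N - df + 0.5) / (df + 0.5))  # smoothed idf
        denom = tf[t] + k1 * (1 - b + b * len(doc_terms) / avgdl)
        score += idf * (tf[t] * (k1 + 1)) / denom
    return score
```

Terms absent from the document contribute zero, so only matching terms raise the score.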
Hybrid retrieval¶
hybrid_retrieve ¶
hybrid_retrieve(query, *, dataset, date=None, top_k=50, seed_k=120, rrf_k=60, w_bm25=1.0, w_dense=1.5, graph_expand=False, level='C1', ingest_tag=None, expand_ratio=2.0, expand_limit=800, rerank_mode='dense', dense_date=None, dense_persist=None, w_graph=1.0, bm25_index=None, dense_index=None)
Seed with BM25 + Dense, fuse with weighted RRF. If graph_expand=True, expand seeds via graph and re-rank candidates; otherwise return the fused seeds. Returns list of (chunk_id, fused_score) sorted desc.
Emits stage-level latency JSONL when HYB_LATENCY_LOG is set.
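The fusion step above can be sketched as weighted Reciprocal Rank Fusion (`weighted_rrf` is illustrative; the parameter names mirror `rrf_k`, `w_bm25`, and `w_dense` from the signature, not the project's internals):

```python
def weighted_rrf(ranked_lists, weights, rrf_k=60):
    """Weighted Reciprocal Rank Fusion: each list contributes
    w / (rrf_k + rank) per item; items are re-sorted by fused score."""
    fused = {}
    for ranking, w in zip(ranked_lists, weights):
        for rank, item in enumerate(ranking, start=1):
            fused[item] = fused.get(item, 0.0) + w / (rrf_k + rank)
    return sorted(fused.items(), key=lambda kv: kv[1], reverse=True)

# e.g. fuse a BM25 seed list and a dense seed list with the default weights
seeds = weighted_rrf(
    [["c1", "c2", "c3"], ["c2", "c4"]],  # bm25 ranking, dense ranking
    weights=[1.0, 1.5],
)
```

Items appearing in both rankings (here `c2`) accumulate contributions from each list and rise to the top of the fused order.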
Benchmarks — rag.bench¶
CLI & orchestration¶
latency ¶
latency(end_to_end=Option(None, '--end-to-end', help='Path to BENCH_LATENCY_LOG JSONL'), stages=Option(None, '--stages', help='Path to HYB_LATENCY_LOG JSONL'))
Summarize latency JSONL files produced during a bench run:
- end-to-end: BENCH_LATENCY_LOG (per-query adapter.search time)
- stages: HYB_LATENCY_LOG (Hybrid seeds/graph/rerank/fuse timings)
ir_table ¶
ir_table(dataset=Option(..., '--dataset'), date=Option(..., '--date'), root=Option(Path('.'), '--root', help='Project root containing bench_out/'), overlay_log=Option(None, '--overlay-log', help='(Optional) path to a specific overlay JSONL'), out_csv=Option(None, '--out-csv', help='Output CSV path; default: evals/ir_latency_table.csv'), out_md=Option(None, '--out-md', help='Output Markdown path; default: evals/ir_latency_table.md'))
Build an IR latency + quality table by joining leaderboard metrics with stage latencies and (optionally) rerank overlay e2e medians per run.
thesis_pack ¶
thesis_pack(dataset=Option(..., '--dataset'), date=Option(..., '--date'), out=Option(None, '--out', help='Output dir; default: bench_out/<dataset>/<date>/thesis_eval'), tables=Option('all', '--tables', help="Comma-separated: benchmark_master,rq1,rq3,pr1,pr2,auto_qrels_overview,judge_agreement,significance_ci,rq_status or 'all'"), seed=Option(42, '--seed', help='Deterministic sampling seed'), smoke=Option(False, '--smoke', help='Fast mode: tiny QA/sample sizes'), max_qas=Option(None, '--max-qas', '--limit-qas', help='Limit QA items for quick runs (applies to all table builders)'), pr2_sample=Option(None, '--pr2-sample', help='Annotation sample size; defaults: 25 (full), 2 (smoke)'), run_bench=Option(True, '--run-bench/--no-run-bench', help='Auto-bootstrap benchmark runs if leaderboard.csv is missing.'), bench_top_k=Option(100, '--bench-top-k', help='Top-k depth used for benchmark bootstrap runs.'), bench_profile=Option('baseline', '--bench-profile', help='Benchmark bootstrap profile: baseline|full|thesis'), bench_reset=Option(False, '--bench-reset', help='Reset benchmark leaderboards before bootstrap.'), latex=Option(True, '--latex/--no-latex', help='Generate thesis_tables.tex with compact LaTeX tabulars.'), llm_cost=Option(False, '--llm-cost/--no-llm-cost', help='Include LLM token and estimated cost metrics in PR1 output.'), llm_pricing_json=Option(None, '--llm-pricing-json', help='Optional pricing JSON for model-based USD estimates.'), silver_auto_qrels=Option(False, '--silver-auto-qrels/--no-silver-auto-qrels', help='Enable fully automatic silver qrels pipeline (pooled retrieval + dual LLM judging).'), silver_force=Option(False, '--silver-force/--no-silver-force', help='Force regeneration of pooled candidates/judgments even if cached files exist.'), silver_pool_top_n=Option(50, '--silver-pool-top-n', help='Top-N candidates per system to include in pooled candidate set.'), silver_pool_systems=Option('bm25,dense,hybrid', '--silver-pool-systems', 
help='Comma-separated pooling systems: bm25,dense,hybrid'), silver_judge_provider_a=Option('auto', '--silver-judge-provider-a', help='Judge A provider: auto|openai|gemini|none'), silver_judge_provider_b=Option('auto', '--silver-judge-provider-b', help='Judge B provider: auto|openai|gemini|none'), silver_judge_model_a=Option(None, '--silver-judge-model-a', help='Optional model override for judge A.'), silver_judge_model_b=Option(None, '--silver-judge-model-b', help='Optional model override for judge B.'), silver_binary_threshold=Option(1, '--silver-binary-threshold', help='Binary qrels threshold from graded labels (1 => rel>=1 is relevant).'), silver_n_boot=Option(1000, '--silver-n-boot', help='Bootstrap samples for confidence intervals in significance table.'), silver_n_perm=Option(1000, '--silver-n-perm', help='Permutation samples for paired randomization p-values in significance table.'))
Build thesis-ready evaluation tables into bench_out/
QA build & schema¶
build_qa_from_gold ¶
build_qa_from_gold(dataset, date=None, n=50, use_text=True, provider=None, out_dir=None, seed=1337)
Create a single-hop QA set (one gold chunk per Q) from GOLD.
Returns (qa_parquet, qrels_parquet, meta_json). Also writes JSON compatibility files when RAG_BENCH_WRITE_JSON=1 (default).
QAItem (dataclass) ¶
Single-hop QA item grounded in one chunk.
Runs, scoring, fusion, latency¶
build_run ¶
Build TREC-style run from adapter for the given QA set.
Returned structure: { qid: {chunk_id: score, ...}, ... }
Adds end-to-end latency logging when BENCH_LATENCY_LOG is set. Also exports BENCH_QID so stage-level instrumentation (Hybrid) can tag records.
BaseAdapter ¶
HybridAdapter ¶
Bases: BaseAdapter
Stage A: Weighted RRF fusion of BM25 and Dense. Stage B (optional): Graph expansion + re-rank (dense or summary), fused again via RRF.
summarize_end_to_end ¶
BENCH_LATENCY_LOG JSONL → p50/p90/p95. If 'adapter' is provided, only keep records where rec['adapter'] == adapter (case-insensitive). If 'qids' is provided, keep only records whose 'qid' is in the set.
summarize_stages ¶
HYB_LATENCY_LOG JSONL → p50/p90/p95 per stage; optionally filter to 'qids'. Stage logs are produced by Hybrid only.
summarize_end_to_end_for ¶
Return {e2e_n, e2e_p50_ms, e2e_p90_ms, e2e_p95_ms} using only records whose 'qid'
is in qids. If adapter is provided, also require rec['adapter'] to match
(case-insensitive).
summarize_stages_for ¶
Return stage_*_{p50,p90,p95}_ms only for records whose qid is in qids.
Includes: seed, fuse_seeds, graph_expand, rerank, fuse_final, total (if present).
summarize_single_stage ¶
Summarize a JSONL file that only logs a single stage (e.g. RERANK_LATENCY_LOG).
Does NOT filter by qid; returns: { 'n', 'p50_ms', 'p90_ms', 'p95_ms' } or {} if no matching records.