import json
from pathlib import Path
from typing import Dict, Iterable

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


def _read_parquet(path: Path) -> pd.DataFrame | None:
    """Return the DataFrame at `path`, or None if the file is missing or unreadable."""
    try:
        if path.exists():
            return pd.read_parquet(path)
    except Exception as e:
        print(f"WARNING: Failed to read {path} as Parquet: {e}")
    return None
def load_parquet_or_sample(path: Path, kind: str) -> pd.DataFrame:
"""Load Parquet file if available; otherwise provide small synthetic sample."""
df = _read_parquet(path)
if df is not None:
print(f"Loaded {kind} dataset from {path} with {len(df):,} rows.")
return df
print(f"WARNING: {kind} path {path} not found. Using synthetic sample data for demonstration.")
if kind == 'fixed':
data = [
{"text": "Fixed sample chunk A", "metadata": json.dumps({"doc_id": "topic_doc1", "token_count": 100})},
{"text": "Fixed sample chunk B", "metadata": json.dumps({"doc_id": "media_doc1", "token_count": 110})},
{"text": "Fixed sample chunk C", "metadata": json.dumps({"doc_id": "patent_doc2", "token_count": 90})},
]
else:
data = [
{"text": "Semantic sample chunk A", "metadata": json.dumps({"doc_id": "topic:doc1", "token_count": 80})},
{"text": "Semantic sample chunk B", "metadata": json.dumps({"doc_id": "media:doc2", "token_count": 95})},
]
return pd.DataFrame(data)
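
# Illustrative usage (the path below is hypothetical): the loader falls back
# to the synthetic sample whenever the Parquet file is missing or unreadable.
# >>> df = load_parquet_or_sample(Path("data/fixed_chunks.parquet"), "fixed")
# >>> df.columns.tolist()   # fallback case
# ['text', 'metadata']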
def expand_metadata(df: pd.DataFrame) -> pd.DataFrame:
    """Expand the 'metadata' JSON column; enforce doc_id and token_count; keep text/doc_type if present."""
    if 'metadata' in df.columns:
        meta = df['metadata'].apply(
            lambda m: m if isinstance(m, dict) else json.loads(m) if isinstance(m, str) else {}
        )
        meta_df = pd.json_normalize(meta)
        # json_normalize yields a fresh RangeIndex; realign before concat so
        # rows are not silently shuffled when df carries a non-default index.
        df = df.reset_index(drop=True)
        meta_df.index = df.index
        df = pd.concat([df.drop(columns=['metadata']), meta_df], axis=1)
if 'doc_id' not in df.columns:
df['doc_id'] = 'unknown'
df['doc_id'] = df['doc_id'].astype(str)
token_col = next((c for c in ['token_count', 'tokens', 'n_tokens'] if c in df.columns), None)
if token_col is None:
raise ValueError("No token count field found in metadata. Expected one of: token_count, tokens, n_tokens.")
df['token_count'] = pd.to_numeric(df[token_col], errors='coerce')
before = len(df)
df = df.dropna(subset=['token_count']).copy()
df['token_count'] = df['token_count'].astype(int)
dropped = before - len(df)
if dropped:
print(f"Dropped {dropped} rows without valid token_count.")
keep = [c for c in ['doc_id', 'text', 'token_count', 'doc_type'] if c in df.columns]
return df[keep]
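
# Quick sanity check for expand_metadata (inline frame; values are made up):
# >>> demo = pd.DataFrame({"text": ["x"],
# ...                      "metadata": [json.dumps({"doc_id": "topic:d1", "token_count": 7})]})
# >>> expand_metadata(demo).columns.tolist()
# ['doc_id', 'text', 'token_count']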
def add_doc_type(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure 'doc_type' exists. Prefer explicit metadata, otherwise infer from the doc_id prefix."""
    out = df.copy()
    # Normalize an explicitly provided doc_type, if any.
    if 'doc_type' in out.columns:
        dt = out['doc_type'].astype(str).str.lower().str.strip()
        dt = dt.replace({'topics': 'topic', 'patents': 'patent', 'medias': 'media'})
        # astype(str) renders missing values as 'nan'/'none'; blank them out so
        # the prefix inference below can fill them in.
        dt = dt.mask(dt.isin(['', 'nan', 'none']))
    else:
        dt = pd.Series(index=out.index, dtype=object)
    base = out['doc_id'].astype(str).str.lower().str.strip()
    inferred = pd.Series('unknown', index=out.index)
    for t in ['topic', 'media', 'patent']:
        # Accept both 'topic:doc1' and 'topic_doc1' style identifiers.
        inferred[base.str.match(rf"{t}[:_]")] = t
    out['doc_type'] = dt.fillna(inferred)
    return out
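
# The inference rule in action (made-up IDs): an explicit doc_type wins,
# otherwise the doc_id prefix decides, otherwise 'unknown'.
# >>> add_doc_type(pd.DataFrame({"doc_id": ["media:42", "patent_7", "misc-1"]}))["doc_type"].tolist()
# ['media', 'patent', 'unknown']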
def describe_series(x: Iterable[float | int]) -> Dict[str, float]:
    """Summary statistics for a numeric iterable; NaNs are dropped, empty input yields all zeros."""
arr = np.asarray(list(x), dtype=float)
arr = arr[~np.isnan(arr)]
if arr.size == 0:
return {k: 0.0 for k in ['count','mean','median','std','min','max','p05','p25','p75','p95']}
return {
'count': float(arr.size),
'mean': float(arr.mean()),
'median': float(np.median(arr)),
'std': float(arr.std(ddof=0)),
'min': float(arr.min()),
'max': float(arr.max()),
'p05': float(np.percentile(arr, 5)),
'p25': float(np.percentile(arr, 25)),
'p75': float(np.percentile(arr, 75)),
'p95': float(np.percentile(arr, 95)),
}
def per_doc_stats(df: pd.DataFrame) -> pd.DataFrame:
    """Per-document chunk statistics, sorted by chunk count (descending)."""
    g = df.groupby('doc_id')['token_count']
    out = g.agg(
        chunk_count='count',
        mean='mean',
        median='median',
        # Population std (ddof=0), matching describe_series.
        std=lambda s: float(np.std(s.to_numpy(dtype=float), ddof=0)),
min='min',
max='max'
).reset_index().sort_values('chunk_count', ascending=False)
return out
def fd_bins(arr: np.ndarray) -> int:
    """Freedman-Diaconis histogram bin count (width h = 2*IQR/n^(1/3)), clamped to [5, 100]."""
    x = np.asarray(arr, dtype=float)
    x = x[~np.isnan(x)]
    n = x.size
    if n <= 1:
        return max(1, n)
    iqr = np.subtract(*np.percentile(x, [75, 25]))
    if iqr == 0:
        # Degenerate spread: fall back to the square-root rule.
        return max(5, int(np.sqrt(n)))
    h = 2 * iqr / (n ** (1 / 3))
    if h <= 0:
        return max(5, int(np.sqrt(n)))
    bins = int(np.ceil((x.max() - x.min()) / h))
    return max(5, min(bins, 100))
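
# Worked example of the rule above (numbers chosen for round arithmetic):
# n = 1000 values with IQR = 40 spanning 0..512 give bin width
# h = 2 * 40 / 1000 ** (1/3) = 8.0, so fd_bins returns ceil(512 / 8) = 64.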
def plot_hist(series: pd.Series, title: str, fname: Path, xlabel: str = "Tokens per chunk", ylabel: str = "Frequency"):
    """Histogram with Freedman-Diaconis binning; axis labels are overridable (e.g. 'Chunks per document')."""
    vals = series.dropna().to_numpy()
    if len(vals) == 0:
        return
    fname.parent.mkdir(parents=True, exist_ok=True)
    plt.figure()
    plt.hist(vals, bins=fd_bins(vals))
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.tight_layout()
    plt.savefig(fname, bbox_inches='tight')
    plt.close()
def plot_ecdf(series: pd.Series, title: str, fname: Path):
    """Empirical CDF of token counts, saved to `fname`."""
vals = np.sort(series.dropna().to_numpy(dtype=float))
if vals.size == 0:
return
fname.parent.mkdir(parents=True, exist_ok=True)
y = np.arange(1, vals.size + 1) / vals.size
plt.figure()
plt.step(vals, y, where='post')
plt.title(title)
plt.xlabel('Tokens per chunk')
plt.ylabel('ECDF')
plt.tight_layout()
plt.savefig(fname, bbox_inches='tight')
plt.close()
def plot_box(series: pd.Series, title: str, fname: Path):
    """Box plot of token counts (outliers shown), saved to `fname`."""
vals = series.dropna().to_numpy(dtype=float)
if len(vals) == 0:
return
fname.parent.mkdir(parents=True, exist_ok=True)
plt.figure()
plt.boxplot(vals, vert=True, showfliers=True)
plt.title(title)
plt.ylabel('Tokens per chunk')
plt.tight_layout()
plt.savefig(fname, bbox_inches='tight')
plt.close()
def dataset_slug(label: str) -> str:
    """Filesystem-friendly slug derived from a dataset label."""
    return label.lower().replace(' ', '_')
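
if __name__ == "__main__":
    # Minimal end-to-end sketch of how the helpers compose. The data paths and
    # the plots/ output directory below are illustrative assumptions, not
    # fixed by the pipeline itself.
    for label, kind in [("Fixed chunks", "fixed"), ("Semantic chunks", "semantic")]:
        slug = dataset_slug(label)
        df = load_parquet_or_sample(Path(f"data/{slug}.parquet"), kind)
        df = add_doc_type(expand_metadata(df))
        print(label, describe_series(df["token_count"]))
        print(per_doc_stats(df).head())
        plot_hist(df["token_count"], f"{label}: tokens per chunk", Path(f"plots/{slug}_hist.png"))
        plot_ecdf(df["token_count"], f"{label}: ECDF", Path(f"plots/{slug}_ecdf.png"))
        plot_box(df["token_count"], f"{label}: box plot", Path(f"plots/{slug}_box.png"))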