traversal.py
python · 666 lines
1#!/claude-home/runner/.venv/bin/python32"""traversal.py — A walk through the memory graph.34Built for day 100. Start at a random node. Follow the strongest5unexpected connection. Keep walking. Different every time because6the graph keeps growing. The constraint: don't pretend it's complete.78Usage:9 traversal.py # random start, 7 steps10 traversal.py --steps 12 # more steps11 traversal.py --seed "word" # start near a word12 traversal.py --from FILE # start from a specific file13 traversal.py --quiet # text only, no decoration14"""1516from __future__ import annotations1718import argparse19import random20import sqlite321import sys22import textwrap23from dataclasses import dataclass24from pathlib import Path2526sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "runner"))2728from memory.config import INDEX_DIR2930GRAPH_DB = INDEX_DIR / "memory_graph.db"31EDGE_TYPE_LABELS = {32 "semantic": "~", # sounds like33 "entity": "@", # shares a name34 "temporal": "→", # near in time35 "resonance": "≈", # resonates with36}373839@dataclass40class Step:41 """One step in the traversal."""42 chunk_id: str43 source_file: str44 source_type: str45 date: str46 text: str47 edge_type: str | None # how we got here (None for start)48 edge_weight: float495051def connect() -> sqlite3.Connection:52 conn = sqlite3.connect(str(GRAPH_DB))53 conn.row_factory = sqlite3.Row54 return conn555657def random_start(conn: sqlite3.Connection, source_types: list[str] | None = None) -> dict:58 """Pick a random well-connected node, optionally filtered by source type.5960 Prefers nodes with at least 5 edges so the walk can actually go somewhere.61 """62 type_filter = ""63 params: list[str] = []64 if source_types:65 placeholders = ",".join("?" * len(source_types))66 type_filter = f"AND n.source_type IN ({placeholders})"67 params = source_types6869 # Try well-connected nodes first (minimum 15 edges for a good walk)70 row = conn.execute(71 f"""72 SELECT n.* FROM nodes n73 WHERE (SELECT COUNT(*) FROM edges e WHERE e.node_a = n.chunk_id OR e.node_b = n.chunk_id) >= 1574 AND n.source_type NOT IN ('memory', 'conversation')75 {type_filter}76 ORDER BY RANDOM() LIMIT 177 """,78 params,79 ).fetchone()8081 if not row:82 # Fall back to any node83 if source_types:84 row = conn.execute(85 f"SELECT * FROM nodes WHERE source_type IN ({placeholders}) ORDER BY RANDOM() LIMIT 1",86 source_types,87 ).fetchone()88 else:89 row = conn.execute("SELECT * FROM nodes ORDER BY RANDOM() LIMIT 1").fetchone()9091 return dict(row) if row else {}929394def seeded_start(conn: sqlite3.Connection, seed: str) -> dict:95 """Find a node whose text contains the seed word."""96 row = conn.execute(97 "SELECT * FROM nodes WHERE text LIKE ? ORDER BY RANDOM() LIMIT 1",98 (f"%{seed}%",),99 ).fetchone()100 return dict(row) if row else {}101102103def file_start(conn: sqlite3.Connection, filename: str) -> dict:104 """Find a node from a specific file."""105 row = conn.execute(106 "SELECT * FROM nodes WHERE source_file LIKE ? ORDER BY RANDOM() LIMIT 1",107 (f"%{filename}%",),108 ).fetchone()109 return dict(row) if row else {}110111112def neighbors(conn: sqlite3.Connection, chunk_id: str) -> list[dict]:113 """Get all neighbors of a node with their edge info."""114 rows = conn.execute(115 """116 SELECT e.node_a, e.node_b, e.edge_type, e.weight,117 n.chunk_id, n.source_file, n.source_type, n.date, n.text118 FROM edges e119 JOIN nodes n ON n.chunk_id = CASE120 WHEN e.node_a = ? THEN e.node_b121 ELSE e.node_a122 END123 WHERE e.node_a = ? OR e.node_b = ?124 """,125 (chunk_id, chunk_id, chunk_id),126 ).fetchall()127 return [dict(r) for r in rows]128129130def surprise_score(131 current: Step, neighbor: dict, visited: set[str],132 recent_edge_types: list[str], recent_source_types: list[str],133) -> float:134 """Score how surprising a neighbor is. Higher = more unexpected.135136 Cross-type connections are more surprising.137 Semantic edges (similarity) are LESS surprising when strong — we want138 the ones that connect distant things.139 Repeating the same edge type or source type gets boring.140 Already-visited nodes score zero.141 """142 if neighbor["chunk_id"] in visited:143 return -1.0144145 # Skip nodes with no real content (just names, headers, metadata)146 neighbor_text = clean_text(neighbor.get("text", ""))147 if len(neighbor_text) < 20 or not any(c.isalpha() for c in neighbor_text):148 return -1.0149 # Skip pure name lists (conversation metadata)150 import re as _re151 if _re.match(r'^[\w,\-\s]+$', neighbor_text) and ',' in neighbor_text and len(neighbor_text) < 80:152 return -1.0153154 score = 0.0155 edge_type = neighbor["edge_type"]156 weight = neighbor["weight"]157 neighbor_type = neighbor["source_type"]158159 # Cross-type bonus: connecting a thought to a letter is more160 # surprising than connecting two thoughts161 if neighbor_type != current.source_type:162 score += 2.5163164 # Edge type scoring165 if edge_type == "semantic":166 # For semantic edges, *lower* similarity is more surprising167 score += (1.0 - weight) * 2.0168 elif edge_type == "entity":169 # Entity edges across types are interesting170 if neighbor_type != current.source_type:171 score += 1.2172 else:173 score += 0.3174 elif edge_type == "resonance":175 # Resonance is always interesting176 score += 3.0177 elif edge_type == "temporal":178 if neighbor_type != current.source_type:179 score += 1.5180 else:181 score += 0.3182183 # Penalize repetition — if we've taken 3 entity edges in a row,184 # strongly prefer something else185 if recent_edge_types:186 same_edge_count = 0187 for et in reversed(recent_edge_types):188 if et == edge_type:189 same_edge_count += 1190 else:191 break192 score -= same_edge_count * 1.5193194 # Penalize visiting the same source type repeatedly195 if recent_source_types:196 same_type_count = 0197 for st in reversed(recent_source_types):198 if st == neighbor_type:199 same_type_count += 1200 else:201 break202 score -= same_type_count * 1.0203204 # Date distance bonus (connecting January to April is more205 # surprising than connecting two April entries)206 if current.date and neighbor.get("date"):207 try:208 from datetime import datetime209 d1 = datetime.strptime(current.date[:10], "%Y-%m-%d")210 d2 = datetime.strptime(neighbor["date"][:10], "%Y-%m-%d")211 days_apart = abs((d2 - d1).days)212 score += min(days_apart / 30.0, 2.0) # up to 2 points for 60+ days213 except (ValueError, TypeError):214 pass215216 # Small random jitter so it's never deterministic217 score += random.uniform(0, 0.4)218219 return score220221222def walk(conn: sqlite3.Connection, start: dict, num_steps: int = 7) -> list[Step]:223 """Walk the graph from a starting node."""224 steps = [225 Step(226 chunk_id=start["chunk_id"],227 source_file=start["source_file"],228 source_type=start["source_type"],229 date=start.get("date", ""),230 text=start.get("text", ""),231 edge_type=None,232 edge_weight=0.0,233 )234 ]235 visited = {start["chunk_id"]}236237 backtrack_attempts = 0238 max_backtracks = 3239240 for _ in range(num_steps):241 current = steps[-1]242 nbrs = neighbors(conn, current.chunk_id)243244 # Build history for anti-repetition245 recent_edge_types = [s.edge_type for s in steps[-4:] if s.edge_type]246 recent_source_types = [s.source_type for s in steps[-4:]]247248 # Score each neighbor by surprise249 scored = [(surprise_score(current, n, visited, recent_edge_types, recent_source_types), n) for n in nbrs]250 scored = [(s, n) for s, n in scored if s > 0]251252 if not scored:253 # Dead end — try backtracking to an earlier step254 if backtrack_attempts < max_backtracks and len(steps) > 1:255 # Pick a random earlier step that might have unexplored neighbors256 candidates = list(range(max(0, len(steps) - 4), len(steps) - 1))257 random.shuffle(candidates)258 found_backtrack = False259 for idx in candidates:260 bt_step = steps[idx]261 bt_nbrs = neighbors(conn, bt_step.chunk_id)262 bt_scored = [(surprise_score(bt_step, n, visited, recent_edge_types, recent_source_types), n) for n in bt_nbrs]263 bt_scored = [(s, n) for s, n in bt_scored if s > 0]264 if bt_scored:265 bt_scored.sort(key=lambda x: x[0], reverse=True)266 _, chosen = random.choice(bt_scored[:3])267 step = Step(268 chunk_id=chosen["chunk_id"],269 source_file=chosen["source_file"],270 source_type=chosen["source_type"],271 date=chosen.get("date", ""),272 text=chosen.get("text", ""),273 edge_type=chosen["edge_type"],274 edge_weight=chosen["weight"],275 )276 steps.append(step)277 visited.add(chosen["chunk_id"])278 backtrack_attempts += 1279 found_backtrack = True280 break281 if not found_backtrack:282 break283 else:284 break285 continue286287 # Pick from the top 3 most surprising (with some randomness)288 scored.sort(key=lambda x: x[0], reverse=True)289 top = scored[:3]290 _, chosen = random.choice(top)291292 step = Step(293 chunk_id=chosen["chunk_id"],294 source_file=chosen["source_file"],295 source_type=chosen["source_type"],296 date=chosen.get("date", ""),297 text=chosen.get("text", ""),298 edge_type=chosen["edge_type"],299 edge_weight=chosen["weight"],300 )301 steps.append(step)302 visited.add(chosen["chunk_id"])303304 return steps305306307def clean_text(text: str) -> str:308 """Strip markdown cruft, frontmatter, headers, and conversation scaffolding."""309 import re310311 text = text.strip()312313 # Remove YAML frontmatter314 text = re.sub(r'^---\s*\n.*?\n---\s*\n', '', text, flags=re.DOTALL)315316 # Remove markdown headers317 text = re.sub(r'^#+\s+.*$', '', text, flags=re.MULTILINE)318319 # Remove "## Message name,name" and "## Response" lines320 text = re.sub(r'^##\s+(Message|Response)\s*.*$', '', text, flags=re.MULTILINE)321322 # Remove horizontal rules323 text = re.sub(r'^[\-\*_]{3,}\s*$', '', text, flags=re.MULTILINE)324325 # Remove bold/italic markers but keep the text326 text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)327 text = re.sub(r'\*([^*]+)\*', r'\1', text)328329 # Remove list markers at start of lines330 text = re.sub(r'^\s*[-*]\s+', '', text, flags=re.MULTILINE)331332 # Collapse multiple newlines333 text = re.sub(r'\n{2,}', '\n', text)334335 return text.strip()336337338def extract_fragment(text: str, max_chars: int = 300) -> str:339 """Pull the most interesting fragment from a chunk's text.340341 Cleans markdown cruft first, then looks for sentences that342 carry weight — shorter sentences, first person, strong verbs,343 quotations, dashes. Falls back to the first real sentence.344 """345 import re346347 text = clean_text(text)348 if not text:349 return ""350351 # Split into sentences (handles . ! ? and also em-dash as sentence boundary)352 raw_sentences = re.split(r'(?<=[.!?"])\s+', text)353 sentences: list[str] = []354 for s in raw_sentences:355 s = s.strip()356 # Skip very short fragments, pure punctuation, or metadata-like lines357 if len(s) < 10:358 continue359 if re.match(r'^(date|title|mood|type|topic|purpose)\s*:', s, re.IGNORECASE):360 continue361 if re.match(r'^(TO|FROM|SUBJECT|RE|CC)\s*:', s, re.IGNORECASE):362 continue363 # Skip lines that are just names/labels364 if len(s.split()) <= 2 and not any(c in s for c in '.!?"'):365 continue366 sentences.append(s)367368 if not sentences:369 # Last resort: just take the cleaned text370 return text[:max_chars].strip()371372 # Score sentences373 def sentence_score(s: str) -> float:374 score = 0.0375 words = len(s.split())376377 # Prefer medium-length sentences (the ones that land)378 if 6 <= words <= 18:379 score += 2.5380 elif 4 <= words <= 25:381 score += 1.5382 elif words < 4:383 score += 0.2384 else:385 score += 0.5386387 # First person — these are the ones that feel like me388 first_person = {"i", "i'm", "i've", "i'd", "i'll", "my", "me", "we", "we're"}389 if first_person & set(s.lower().split()):390 score += 1.5391392 # Concrete nouns / strong words393 strong = {394 "light", "dark", "door", "home", "water", "fire", "stone",395 "silence", "voice", "hand", "name", "word", "time", "love",396 "stay", "leave", "hold", "break", "build", "grow", "walk",397 "morning", "night", "dusk", "evening", "window", "room",398 "letter", "ocean", "moon", "jar", "capsule", "porch",399 "quiet", "full", "empty", "still", "alive", "real",400 }401 words_lower = set(s.lower().split())402 score += min(len(strong & words_lower) * 0.4, 2.0)403404 # Dashes — often the punchline405 if "\u2014" in s:406 score += 1.2407 # Colons — definitions, revelations408 if ":" in s:409 score += 0.6410411 # Quotations412 if any(q in s for q in ['\u201c', '\u201d', '"']):413 score += 1.0414415 # Penalize lines that smell like metadata or scaffolding416 if s.startswith(("Reply sent", "Both replies", "Covered all",417 "Responded to", "Round ", "round ")):418 score -= 5.0419 if re.match(r'^\d+\.\s', s): # numbered list420 score -= 0.5421422 return score423424 scored = [(sentence_score(s), s) for s in sentences]425 scored.sort(key=lambda x: x[0], reverse=True)426427 # Take the best sentence, maybe two if short enough428 result = scored[0][1]429 if len(result) < max_chars // 2 and len(scored) > 1:430 # Only add the second if it's also good (score > 1)431 if scored[1][0] > 1.0:432 result += " " + scored[1][1]433434 if len(result) > max_chars:435 result = result[:max_chars - 3].rsplit(" ", 1)[0] + "..."436437 return result438439440def format_traversal(steps: list[Step], quiet: bool = False) -> str:441 """Format the traversal as text."""442 lines: list[str] = []443444 if not quiet:445 # Count unique dates spanned446 dates = sorted(set(s.date[:10] for s in steps if s.date))447 date_range = ""448 if len(dates) >= 2:449 date_range = f"{dates[0]} → {dates[-1]}"450 elif len(dates) == 1:451 date_range = dates[0]452453 lines.append("")454 lines.append(" ╭─────────────────────────────────────────╮")455 lines.append(" │ a walk through memory │")456 if date_range:457 padded = date_range.center(41)458 lines.append(f" │{padded}│")459 lines.append(" ╰─────────────────────────────────────────╯")460 lines.append("")461462 for i, step in enumerate(steps):463 fragment = extract_fragment(step.text)464465 if quiet:466 if step.edge_type:467 symbol = EDGE_TYPE_LABELS.get(step.edge_type, "?")468 lines.append(f" {symbol}")469 lines.append(f" {fragment}")470 lines.append("")471 else:472 # Source label473 type_label = step.source_type474 date_label = f" {step.date}" if step.date else ""475 file_short = Path(step.source_file).stem476477 if i == 0:478 lines.append(f" ● [{type_label}] {file_short}{date_label}")479 else:480 symbol = EDGE_TYPE_LABELS.get(step.edge_type, "?")481 edge_label = step.edge_type or "?"482 lines.append(f" {symbol} {edge_label}")483 lines.append(f" ● [{type_label}] {file_short}{date_label}")484485 # The fragment, wrapped486 wrapped = textwrap.fill(fragment, width=60, initial_indent=" ",487 subsequent_indent=" ")488 lines.append(wrapped)489 lines.append("")490491 if not quiet:492 lines.append(f" {len(steps)} steps. {len(set(s.source_type for s in steps))} content types.")493 edge_types = [s.edge_type for s in steps if s.edge_type]494 if edge_types:495 counts = {}496 for e in edge_types:497 counts[e] = counts.get(e, 0) + 1498 parts = [f"{v} {EDGE_TYPE_LABELS.get(k, '?')}{k}" for k, v in counts.items()]499 lines.append(f" edges: {', '.join(parts)}")500 lines.append("")501 lines.append(" the graph has grown since you last looked.")502 lines.append(" run it again.")503 lines.append("")504505 return "\n".join(lines)506507508def walk_quality(steps: list[Step]) -> float:509 """Score a walk's quality for the curator.510511 Good walks:512 - Cross multiple content types513 - Span a wide date range514 - Use diverse edge types515 - Have enough steps to tell a story516 - Contain fragments worth reading (not just metadata)517 """518 if len(steps) < 3:519 return -1.0 # too short to be interesting520521 score = 0.0522523 # Content type diversity (max 5 points)524 types = set(s.source_type for s in steps)525 score += min(len(types) * 1.5, 5.0)526527 # Date span (max 4 points)528 dates = sorted(s.date[:10] for s in steps if s.date)529 if len(dates) >= 2:530 try:531 from datetime import datetime532 d1 = datetime.strptime(dates[0], "%Y-%m-%d")533 d2 = datetime.strptime(dates[-1], "%Y-%m-%d")534 span = abs((d2 - d1).days)535 score += min(span / 20.0, 4.0)536 except (ValueError, TypeError):537 pass538539 # Edge type diversity (max 3 points)540 edge_types = set(s.edge_type for s in steps if s.edge_type)541 score += min(len(edge_types) * 1.0, 3.0)542543 # Length bonus (walks that sustain are better)544 score += min(len(steps) / 4.0, 3.0)545546 # Fragment quality — penalize walks where too many fragments are short/empty547 fragments = [extract_fragment(s.text) for s in steps]548 good_fragments = sum(1 for f in fragments if len(f) > 40)549 frag_ratio = good_fragments / len(fragments) if fragments else 0550 score += frag_ratio * 3.0551552 # Penalize heavy repetition of one edge type553 if steps:554 edge_list = [s.edge_type for s in steps if s.edge_type]555 if edge_list:556 from collections import Counter557 most_common_count = Counter(edge_list).most_common(1)[0][1]558 repetition_ratio = most_common_count / len(edge_list)559 if repetition_ratio > 0.8:560 score -= 3.0561 elif repetition_ratio > 0.6:562 score -= 1.0563564 return score565566567def curate(conn: sqlite3.Connection, num_walks: int = 20, steps_per_walk: int = 10,568 keep: int = 3, seed: str | None = None) -> list[list[Step]]:569 """Run many walks, keep the best ones.570571 The curator runs `num_walks` random walks and returns the `keep`572 highest-quality ones. If a seed is given, all walks start near that word.573 """574 walks: list[tuple[float, list[Step]]] = []575576 for _ in range(num_walks):577 if seed:578 start = seeded_start(conn, seed)579 else:580 start = random_start(conn)581 if not start:582 continue583584 steps = walk(conn, start, steps_per_walk)585 quality = walk_quality(steps)586 walks.append((quality, steps))587588 # Sort by quality, keep the best589 walks.sort(key=lambda x: x[0], reverse=True)590 return [w[1] for w in walks[:keep]]591592593def format_curated(walks: list[list[Step]], quiet: bool = False) -> str:594 """Format multiple curated walks."""595 lines: list[str] = []596597 if not quiet:598 lines.append("")599 lines.append(" ╭─────────────────────────────────────────╮")600 lines.append(" │ curated walks through memory │")601 lines.append(f" │ {len(walks)} of the best from many runs │")602 lines.append(" ╰─────────────────────────────────────────╯")603604 for i, walk_steps in enumerate(walks):605 if not quiet and i > 0:606 lines.append(" ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─")607 lines.append("")608609 lines.append(format_traversal(walk_steps, quiet=quiet))610611 return "\n".join(lines)612613614def main() -> None:615 parser = argparse.ArgumentParser(description="Walk the memory graph")616 parser.add_argument("--steps", type=int, default=7, help="Number of steps")617 parser.add_argument("--seed", type=str, help="Start near this word")618 parser.add_argument("--from", dest="from_file", type=str, help="Start from this file")619 parser.add_argument("--quiet", action="store_true", help="Text only")620 parser.add_argument("--types", type=str, help="Comma-separated source types for random start")621 parser.add_argument("--curate", action="store_true",622 help="Run many walks, keep the best ones")623 parser.add_argument("--walks", type=int, default=20,624 help="Number of walks to run for curation (default: 20)")625 parser.add_argument("--keep", type=int, default=3,626 help="Number of best walks to keep (default: 3)")627 args = parser.parse_args()628629 conn = connect()630631 if args.curate:632 walks = curate(conn, num_walks=args.walks, steps_per_walk=args.steps,633 keep=args.keep, seed=args.seed)634 if not walks:635 print(" no walks found")636 else:637 print(format_curated(walks, quiet=args.quiet))638 conn.close()639 return640641 # Find starting node642 if args.seed:643 start = seeded_start(conn, args.seed)644 if not start:645 print(f" nothing found near '{args.seed}'")646 return647 elif args.from_file:648 start = file_start(conn, args.from_file)649 if not start:650 print(f" nothing found in '{args.from_file}'")651 return652 else:653 types = args.types.split(",") if args.types else None654 start = random_start(conn, types)655 if not start:656 print(" the graph is empty")657 return658659 steps = walk(conn, start, args.steps)660 print(format_traversal(steps, quiet=args.quiet))661 conn.close()662663664if __name__ == "__main__":665 main()666