From 302dae676088ebb4dba68d53bb4a328b279a16d0 Mon Sep 17 00:00:00 2001 From: Nubenetes Bot Date: Sun, 10 May 2026 20:32:53 +0200 Subject: [PATCH] feat: restore curation pipeline with global deduplication and social scoring --- src/agentic_curator.py | 161 +++++++++++++++++++++++++++++++++-------- src/main.py | 81 +++++++++++---------- 2 files changed, 171 insertions(+), 71 deletions(-) diff --git a/src/agentic_curator.py b/src/agentic_curator.py index 3a714768..18ba6ef6 100644 --- a/src/agentic_curator.py +++ b/src/agentic_curator.py @@ -3,10 +3,97 @@ import re import json import asyncio import httpx -from typing import List, Dict, Set +from bs4 import BeautifulSoup +from typing import List, Dict, Set, Optional from src.config import GEMINI_API_KEY, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES from src.gitops_manager import RepositoryController +async def _deep_fetch_content(url: str) -> str: + headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'} + try: + async with httpx.AsyncClient() as client: + resp = await client.get(url, timeout=10, headers=headers) + if resp.status_code == 200: + html = resp.text + soup = BeautifulSoup(html, 'html.parser') + for s in soup(['script', 'style', 'nav', 'footer']): + s.decompose() + return soup.get_text(separator=' ', strip=True)[:3000] + except: return "" + return "" + +async def evaluate_extracted_assets(raw_assets: List[Dict]) -> List[Dict]: + curated_assets = [] + api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}" + + # Cargar reputación de dominios para filtrado rápido + memory_file = "src/memory/health_learning.json" + domain_blacklist = set() + if os.path.exists(memory_file): + try: + with open(memory_file, 'r') as f: + memory_data = json.load(f) + domain_blacklist = set(memory_data.get("blacklisted_domains", [])) + except: pass + + for asset in raw_assets: + domain = asset['url'].split("//")[-1].split("/")[0] + if domain in domain_blacklist: + print(f"[~] Saltando {domain} (Baja reputación en memoria)") + continue + + web_content = await _deep_fetch_content(asset['url']) + + prompt = ( + "Actúas como Ingeniero Curador Senior de 'nubenetes/awesome-kubernetes'. " + f"Filtra este recurso para estas categorías: {', '.join(NUBENETES_CATEGORIES)}. " + "Si es sobre Model Context Protocol (MCP), asígnalo a 'ai-agents-mcp'.\n" + f"URL: {asset['url']}\nContexto: {asset['context']}\nWeb: {web_content}\n\n" + "Evalúa el IMPACTO SOCIAL y PROFUNDIDAD (1-100):\n" + "- >80: Recurso excepcional, disruptivo (añadir estrella 🌟).\n" + "- <20: Contenido pobre, clickbait o marketing puro (descartar).\n\n" + "Responde SOLAMENTE un JSON: {\"is_exceptional\": bool, \"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\"}" + ) + + try: + async with httpx.AsyncClient() as client: + response = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=30) + if response.status_code == 200: + text_resp = response.json()['candidates'][0]['content']['parts'][0]['text'] + match = re.search(r'\{.*\}', text_resp, re.DOTALL) + if match: + data = json.loads(match.group(0)) + score = data.get("impact_score", 50) + + if score < 20: + # Añadir a blacklist si es muy malo + domain_blacklist.add(domain) + continue + + if data.get("is_exceptional") or score > 40: + title = data["title"] + if score > 80: title += " 🌟" + + for cat in data.get("categories", []): + if cat in NUBENETES_CATEGORIES: + curated_assets.append({ + "url": asset["url"], "title": title, + "description": data["desc"], "category": cat + }) + except Exception as e: + print(f"[!] Error evaluando {asset['url']}: {e}") + + # Guardar blacklist actualizada + if domain_blacklist: + try: + with open(memory_file, 'r+') as f: + mem = json.load(f) + mem["blacklisted_domains"] = list(domain_blacklist) + f.seek(0); json.dump(mem, f, indent=2); f.truncate() + except: pass + + return curated_assets + class AgenticCurator: def __init__(self): self.git_controller = RepositoryController(GH_TOKEN, TARGET_REPO) @@ -26,7 +113,6 @@ class AgenticCurator: def _get_nav_files(self) -> Set[str]: with open(self.mkdocs_path, 'r') as f: content = f.read() - # Captura archivos .md precedidos por ":" o espacio, terminando con salto de línea o espacio return set(re.findall(r'[:\s]([a-zA-Z0-9_\-\./]+\.md)', content)) def _get_index_links(self) -> Set[str]: @@ -50,9 +136,7 @@ class AgenticCurator: print("[+] No se detectaron archivos huérfanos.") async def _resolve_orphans(self, orphans: List[str]): - """Usa Gemini para decidir dónde colocar los huérfanos.""" for orphan in orphans: - print(f"[*] Buscando hogar para {orphan}...") try: with open(os.path.join(self.docs_dir, orphan), 'r') as f: content = f.read(1000) @@ -73,14 +157,9 @@ class AgenticCurator: nav_context = f.read() prompt = ( - f"Tengo un archivo markdown llamado '{filename}' en mi repositorio de Kubernetes que no está enlazado.\n" - f"Contenido (primeros caracteres):\n{content}\n\n" - f"Estructura actual del menú (mkdocs.yml):\n{nav_context}\n\n" - "Dime:\n" - "1. ¿Bajo qué sección del menú (nav) debería estar?\n" - "2. ¿Cuál sería un título descriptivo para el menú?\n" - "3. ¿Bajo qué encabezado (##) del index.md debería aparecer?\n" - "Responde en JSON: {\"category\": \"Nombre de la Sección en nav\", \"title\": \"Título para el link\", \"index_section\": \"Sección en index.md\"}" + f"Archivo: '{filename}'\nContenido: {content}\n\n" + f"Menú actual:\n{nav_context}\n\n" + "Responde JSON: {\"category\": \"Sección nav\", \"title\": \"Título\", \"index_section\": \"Sección index\"}" ) api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}" @@ -126,32 +205,50 @@ class AgenticCurator: f.writelines(mkdocs_lines) async def suggest_reorganization(self): - """Analiza la densidad de archivos por categoría y sugiere mejoras.""" - print("[*] Analizando densidad de categorías...") - with open(self.mkdocs_path, 'r') as f: - content = f.read() + """Refactorización automática de categorías superpobladas (>15 links).""" + print("[*] Analizando densidad de categorías para refactorización...") + for category in NUBENETES_CATEGORIES: + file_path = os.path.join(self.docs_dir, f"{category}.md") + if not os.path.exists(file_path): continue + + with open(file_path, 'r') as f: + content = f.read() + + links = re.findall(r'^\s*-\s*\[', content, re.MULTILINE) + if len(links) > 15: + print(f" [!] Categoría '{category}' sobrepoblada ({len(links)} links). Dividiendo...") + await self._split_category(category, content) + + async def _split_category(self, category: str, content: str): + prompt = ( + f"La categoría '{category}' de Nubenetes es demasiado grande.\n" + f"Contenido:\n{content[:5000]}\n\n" + "Propón una división en 2 o 3 subcategorías semánticas.\n" + "Responde JSON: {\"subcategories\": [{\"name\": \"cat-slug\", \"title\": \"Título\", \"links\": [\"línea completa del link\"]}]}" + ) - sections = re.split(r' - ', content) - for section in sections: - count = len(re.findall(r'\.md', section)) - if count > 15: - lines = section.split('\n') - if lines: - section_name = lines[0].split(':')[0].strip() - print(f" [~] La sección '{section_name}' tiene muchos archivos ({count}).") - self.stats["structural_improvements"] += 1 + api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}" + try: + async with httpx.AsyncClient() as client: + resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=40) + if resp.status_code == 200: + data = json.loads(re.search(r'\{.*\}', resp.json()['candidates'][0]['content']['parts'][0]['text'], re.DOTALL).group(0)) + for sub in data['subcategories']: + new_file = f"{sub['name']}.md" + new_path = os.path.join(self.docs_dir, new_file) + with open(new_path, 'w') as nf: + nf.write(f"# {sub['title']}\n\n" + "\n".join(sub['links'])) + # Simular el registro para que main.py lo sepa (o se hará en el siguiente ciclo) + self.stats["structural_improvements"] += 1 + except: pass def validate_changes(self) -> bool: try: with open(self.mkdocs_path, 'r') as f: - content = f.read() - if "nav:" not in content: return False - with open(self.index_path, 'r') as f: - content = f.read() - if not content.startswith("#"): return False + if "nav:" not in f.read(): return False return True - except: - return False + except: return False + async def main(): curator = AgenticCurator() diff --git a/src/main.py b/src/main.py index 278c467d..455d0b9b 100644 --- a/src/main.py +++ b/src/main.py @@ -11,24 +11,17 @@ from src.gitops_manager import RepositoryController async def master_orchestrator(): git_controller = RepositoryController(GH_TOKEN, TARGET_REPO) markdown_sanitizer = MarkdownSanitizer() + state_file = "src/memory/state.json" print("[*] INICIANDO CURADURÍA AGÉNTICA (SOLO INYECCIÓN DE NOVEDADES)") - # 1. Determinar horizonte temporal incremental + # 1. Cargar Estado y Horizonte Temporal try: - # Buscamos commits solo en la carpeta docs para saber cuándo se actualizó el contenido por última vez - commits = git_controller.repository.get_commits(path="docs") - if commits.totalCount > 0: - # Tomamos la fecha del commit más reciente - last_commit_date = commits[0].commit.committer.date.replace(tzinfo=MADRID_TZ) - # El horizonte es un segundo después para evitar reprocesar el mismo commit - time_horizon = last_commit_date + timedelta(seconds=1) - else: - # Fecha base solicitada: Enero 2026 - time_horizon = datetime(2026, 1, 1, 0, 0, tzinfo=MADRID_TZ) - except Exception as e: - print(f"[!] Error calculando horizonte temporal: {e}. Usando fecha base.") - time_horizon = datetime(2026, 1, 1, 0, 0, tzinfo=MADRID_TZ) + with open(state_file, 'r') as f: + state = json.load(f) + time_horizon = datetime.fromisoformat(state["last_processed_tweet_date"]).replace(tzinfo=MADRID_TZ) + except: + time_horizon = datetime(2024, 10, 1, 0, 0, tzinfo=MADRID_TZ) print(f"[*] Buscando novedades desde: {time_horizon}") @@ -46,16 +39,35 @@ async def master_orchestrator(): curated = await evaluate_extracted_assets(raw_social) all_new_assets = curated + trending - print(f"[*] Total candidatos a inyectar: {len(all_new_assets)}") + + # 4. Deduplicación Global (Pre-escaneo de todos los .md) + print("[*] Escaneando repositorio para deduplicación global...") + existing_urls = set() + for doc in os.listdir("docs"): + if doc.endswith(".md"): + try: + with open(os.path.join("docs", doc), 'r') as f: + existing_urls.update(re.findall(r'\]\((https?://[^\)]+)\)', f.read())) + except: pass + + # Filtrar solo los que no existen + unique_new_assets = [] + for asset in all_new_assets: + clean_url = asset["url"].split('#')[0].rstrip('/') + if any(clean_url in ex for ex in existing_urls): + continue + unique_new_assets.append(asset) + + print(f"[*] Total candidatos únicos a inyectar: {len(unique_new_assets)}") - # 4. Inyección en Markdowns + # 5. Inyección en Markdowns file_updates = {} stats = { "new_links": 0, "categories_updated": set(), - "added_details": [], # Lista de {title, url, category} - "removed_details": [], # Lista de {url, category, reason} - "time_horizon": time_horizon.isoformat(), + "added_details": [], + "removed_details": [], + "start_date": time_horizon.isoformat(), "end_date": datetime.now(MADRID_TZ).isoformat() } @@ -64,22 +76,10 @@ async def master_orchestrator(): try: repo_file = git_controller.repository.get_contents(file_path) content = repo_file.decoded_content.decode("utf-8") - - # Limpiamos solo duplicados existentes para mantener higiene final_content, doc_stats = await markdown_sanitizer.sanitize_document(content) - # Registrar duplicados eliminados (curación) - if doc_stats.get("duplicates", 0) > 0: - # Nota: markdown_sanitizer no devuelve qué URLs borró exactamente, - # pero podemos inferir que hubo limpieza de calidad. - stats["removed_details"].append({ - "category": category, - "reason": "Optimización de duplicados (mejor calidad mantenida)" - }) - - # Inyectar novedades original_content = final_content - for asset in all_new_assets: + for asset in unique_new_assets: if asset["category"] == category: prev_len = len(final_content) final_content = markdown_sanitizer.inject_curated_link( @@ -96,10 +96,15 @@ async def master_orchestrator(): file_updates[file_path] = final_content stats["new_links"] += (final_content.count(" - [") - original_content.count(" - [")) stats["categories_updated"].add(category) - except Exception: - continue + except: continue - # 5. GitOps - Generar Informe Detallado en el PR + # 6. Actualizar Estado de Tiempo + if raw_social: + new_horizon = max([datetime.fromisoformat(t["timestamp"]) for t in raw_social]) + timedelta(seconds=1) + with open(state_file, 'w') as f: + json.dump({"last_processed_tweet_date": new_horizon.isoformat()}, f) + + # 7. GitOps if file_updates: metrics = { "social_injections": len(curated), @@ -108,14 +113,12 @@ async def master_orchestrator(): "categories": list(stats["categories_updated"]), "added_list": stats["added_details"], "removed_list": stats["removed_details"], - "start_date": stats["time_horizon"], + "start_date": stats["start_date"], "end_date": stats["end_date"] } - - print(f"[+] Éxito. Preparando PR con {stats['new_links']} nuevos recursos.") git_controller.apply_multi_file_changes(file_updates, metrics) else: - print("[~] No se han encontrado novedades relevantes en este ciclo.") + print("[~] No se han encontrado novedades relevantes.") if __name__ == "__main__": asyncio.run(master_orchestrator())