diff --git a/src/agentic_curator.py b/src/agentic_curator.py index 79789b3b..9820561d 100644 --- a/src/agentic_curator.py +++ b/src/agentic_curator.py @@ -34,8 +34,8 @@ from src.logger import log_event async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]: evaluations = {} if not GEMINI_API_KEYS: - log_event("[!] ERROR CRÍTICO: GEMINI_API_KEYS no encontrada en el entorno.") - return {a["url"]: {"status": "FILTERED", "reason": "Configuración: API KEY faltante"} for a in raw_assets} + log_event("[!] CRITICAL ERROR: GEMINI_API_KEYS not found in environment.") + return {a["url"]: {"status": "FILTERED", "reason": "Config: Missing API KEY"} for a in raw_assets} memory_file = "src/memory/health_learning.json" domain_blacklist = set() @@ -47,74 +47,75 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]: except: pass for i, asset in enumerate(raw_assets): - post_date = asset.get('timestamp', 'Fecha desconocida') - context = asset.get('context', asset.get('description', 'Sin contexto adicional')) + post_date = asset.get('timestamp', 'Unknown date') + context = asset.get('context', asset.get('description', 'No additional context')) - log_event(f"--- EVALUANDO {i+1}/{len(raw_assets)} ---", section_break=False) + log_event(f"--- EVALUATING {i+1}/{len(raw_assets)} ---", section_break=False) log_event(f" - URL: {asset['url']}") log_event(f" - Post Date: {post_date}") - log_event(f" - Contexto del Post: \"{context[:300]}...\"") + log_event(f" - Post Context: \"{context[:300]}...\"") domain = asset['url'].split("//")[-1].split("/")[0] if domain in domain_blacklist: - log_event(f" [-] RECHAZADO: Dominio en lista negra ({domain})") - evaluations[asset["url"]] = {"status": "FILTERED", "reason": "Dominio en lista negra"} + log_event(f" [-] REJECTED: Blacklisted domain ({domain})") + evaluations[asset["url"]] = {"status": "FILTERED", "reason": "Blacklisted domain"} continue web_content = await _deep_fetch_content(asset['url']) prompt = ( - "Actúas como Ingeniero Curador Senior de 'nubenetes/awesome-kubernetes'.\n" - "Tu misión es catalogar contenido TÉCNICO sobre Kubernetes y Cloud Native compartido por el usuario.\n" - "REGLA DE ORO: Si el enlace está en el feed, es porque el usuario lo considera útil. NO lo descartes a menos que sea ruido total (publicidad agresiva, error 404, o contenido no técnico).\n\n" - f"Categorías válidas: {', '.join(NUBENETES_CATEGORIES)}.\n\n" - "INSTRUCCIONES:\n" - "1. YOUTUBE: Acepta videos técnicos o tutoriales. Categorízalos según su temática.\n" - "2. RESUMEN: Crea un resumen conciso (1 frase). Usa prioritariamente el 'Contexto' (que es el post de X) ya que suele explicar por qué se compartió.\n" - "3. ASIGNACIÓN: Si es sobre Model Context Protocol (MCP), asígnalo a 'ai-agents-mcp'.\n\n" - f"URL: {asset['url']}\nContexto de X: {context}\nContenido Web Extraído: {web_content[:2000]}\n\n" - "Evalúa el IMPACTO TÉCNICO (1-100):\n" - "- >80: Recurso excepcional (🌟).\n" - "- >5: Aceptar (si encaja en alguna categoría).\n" - "- <5: Descartar (Ruido absoluto).\n\n" - "Responde SOLAMENTE un JSON: {\"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\", \"reasoning\": \"Breve explicación de por qué esta categoría y score\", \"rejection_reason\": \"... (si aplica)\"}" + "You act as a Senior Curation Engineer for 'nubenetes/awesome-kubernetes'.\n" + "Your mission is to catalog TECHNICAL content about Kubernetes and Cloud Native shared by the user.\n" + "GOLDEN RULE: If the link is in the feed, it's because the user considers it useful. DO NOT discard unless it is total noise (aggressive ads, 404, or non-technical content).\n\n" + f"Valid categories: {', '.join(NUBENETES_CATEGORIES)}.\n\n" + "INSTRUCTIONS:\n" + "1. LANGUAGE: ALL outputs (title, desc, reasoning) MUST BE IN ENGLISH.\n" + "2. YOUTUBE: Accept technical videos or tutorials. Categorize them by topic.\n" + "3. SUMMARY: Create a concise summary (1 sentence). Use the 'Context' (the X post) as a priority as it explains why it was shared.\n" + "4. ASSIGNMENT: If it's about Model Context Protocol (MCP), assign it to 'ai-agents-mcp'.\n\n" + f"URL: {asset['url']}\nX Context: {context}\nExtracted Web Content: {web_content[:2000]}\n\n" + "Evaluate TECHNICAL IMPACT (1-100):\n" + "- >80: Exceptional resource (🌟).\n" + "- >5: Accept (if it fits a category).\n" + "- <5: Discard (Absolute noise).\n\n" + "Respond ONLY with a JSON: {\"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\", \"reasoning\": \"Brief explanation (English)\", \"rejection_reason\": \"... (if applicable, English)\"}" ) try: data = await call_gemini_with_retry(prompt) score = data.get("impact_score", 50) valid_cats = [c for c in data.get("categories", []) if c in NUBENETES_CATEGORIES] - reasoning = data.get("reasoning", "Sin motivo especificado") + reasoning = data.get("reasoning", "No reason specified") if score < 5: - reason = data.get("rejection_reason", "Bajo impacto técnico") + reason = data.get("rejection_reason", "Low technical impact") evaluations[asset["url"]] = {"status": "FILTERED", "reason": reason} - log_event(f" [-] RECHAZADO: {reason} (Score: {score})") - log_event(f" Motivo IA: {reasoning}") + log_event(f" [-] REJECTED: {reason} (Score: {score})") + log_event(f" AI Reason: {reasoning}") if score < 1 and domain not in domain_blacklist: domain_blacklist.add(domain) - log_event(f" [!] Dominio {domain} añadido a lista negra.") + log_event(f" [!] Domain {domain} added to blacklist.") elif not valid_cats: - evaluations[asset["url"]] = {"status": "FILTERED", "reason": "Sin categoría técnica válida"} - log_event(f" [-] RECHAZADO: No se encontró categoría válida (Sugeridas: {data.get('categories')})") - log_event(f" Motivo IA: {reasoning}") + evaluations[asset["url"]] = {"status": "FILTERED", "reason": "No valid technical category found"} + log_event(f" [-] REJECTED: No valid category found (Suggested: {data.get('categories')})") + log_event(f" AI Reason: {reasoning}") else: evaluations[asset["url"]] = { "status": "INCLUDED", "title": data["title"], "description": data["desc"], "category": valid_cats[0], "impact_score": score, "is_exceptional": score > 80, "reasoning": reasoning } - log_event(f" [+] ACEPTADO: \"{data['title']}\" (Score: {score})") - log_event(f" Destino: docs/{valid_cats[0]}.md") - log_event(f" Descripción: {data['desc']}") - log_event(f" Motivo IA: {reasoning}") + log_event(f" [+] ACCEPTED: \"{data['title']}\" (Score: {score})") + log_event(f" Destination: docs/{valid_cats[0]}.md") + log_event(f" Description: {data['desc']}") + log_event(f" AI Reason: {reasoning}") except Exception as e: - log_event(f" [!] ERROR CRÍTICO EVALUANDO {asset['url']}: {e}") - evaluations[asset["url"]] = {"status": "FILTERED", "reason": f"Fallo Evaluación: {str(e)[:100]}"} + log_event(f" [!] CRITICAL ERROR EVALUATING {asset['url']}: {e}") + evaluations[asset["url"]] = {"status": "FILTERED", "reason": f"Evaluation Failed: {str(e)[:100]}"} - await asyncio.sleep(2.0) # Ritmo estable + await asyncio.sleep(2.0) # Steady pace # Guardar blacklist actualizada try: @@ -182,7 +183,7 @@ class AgenticCurator: async def decide_smart_injection(self, markdown_content: str, asset: Dict) -> str: """ - Inyecta un enlace de forma inteligente y actualiza el TOC si es necesario. + Smartly injects a link and updates the TOC if necessary. """ lines = markdown_content.splitlines() structure = "\n".join([l for l in lines if l.startswith("#")]) @@ -191,15 +192,15 @@ class AgenticCurator: formatted_line = f" - [{asset['title']}]({asset['url']}){stars} - {asset['description']}" prompt = ( - "Actúas como Arquitecto de Contenidos de Nubenetes.com.\n" - f"Tu misión es inyectar este nuevo recurso en el archivo markdown de forma lógica:\n" - f"RECURSO: {formatted_line}\n" - "ESTRUCTURA ACTUAL:\n" + "You act as a Content Architect for Nubenetes.com.\n" + f"Your mission is to logically inject this new resource into the markdown file (LANGUAGE: ENGLISH):\n" + f"RESOURCE: {formatted_line}\n" + "CURRENT STRUCTURE:\n" f"{structure[:1500]}\n\n" - "INSTRUCCIONES:\n" - "1. Identifica el header (##) más adecuado.\n" - "2. Si no existe, PROPÓN UNO NUEVO.\n" - "Responde JSON: {\"target_header\": \"## ...\", \"is_new_header\": bool, \"insert_after_header\": \"## ...\"}" + "INSTRUCTIONS:\n" + "1. Identify the most suitable header (##).\n" + "2. If it doesn't exist, PROPOSE A NEW ONE (in English).\n" + "Respond JSON: {\"target_header\": \"## ...\", \"is_new_header\": bool, \"insert_after_header\": \"## ...\"}" ) try: @@ -236,19 +237,29 @@ class AgenticCurator: new_content_raw = "\n".join(new_lines) if inserted: - # Si se añadió un header nuevo, reconstruir el TOC + # If a new header was added, rebuild the TOC if is_new: + log_event(f" [🏠] AI decided: Section '{target_header}' (NEW)") return await self._rebuild_toc(new_content_raw) + log_event(f" [🏠] AI decided: Section '{target_header}' (EXISTING)") return new_content_raw except: pass return self._manual_fallback_injection(markdown_content, asset) + def _manual_fallback_injection(self, content: str, asset: Dict) -> str: + stars = " 🌟" if asset['impact_score'] > 80 else "" + line = f" - [{asset['title']}]({asset['url']}){stars} - {asset['description']}" + # If no sections, add a generic header + if "##" not in content: + return content + f"\n\n## Tools and Resources\n{line}" + return content + f"\n{line}" + async def suggest_reorganization(self): """ - Audita archivos y los reorganiza INTERNAMENTE, reconstruyendo el TOC. + Audits files and reorganizes them INTERNALLY, rebuilding the TOC. """ - log_event("[*] Iniciando Auditoría de Reorganización Interna...", section_break=True) + log_event("[*] Starting Internal Reorganization Audit...", section_break=True) for file in os.listdir(self.docs_dir): if not file.endswith(".md") or file == "index.md": continue @@ -260,23 +271,24 @@ class AgenticCurator: headers = re.findall(r'^##\s+', content, re.MULTILINE) if len(links) > 25 and len(headers) < 3: - log_event(f" [!] REORGANIZANDO: {file}") + log_event(f" [!] REORGANIZING: {file}") prompt = ( - f"Reorganiza el archivo '{file}' en secciones (##) lógicas.\n" - "MANTÉN TODOS LOS ENLACES. NO incluyas el TOC (yo lo generaré).\n" - f"CONTENIDO ACTUAL:\n{content[:5000]}" + f"Reorganize the file '{file}' into logical sections (##).\n" + "KEEP ALL LINKS. DO NOT include the TOC (I will generate it).\n" + "ALL HEADERS MUST BE IN ENGLISH.\n" + f"CURRENT CONTENT:\n{content[:5000]}" ) try: reorganized = await call_gemini_with_retry(prompt, response_format="text") if len(reorganized) > len(content) * 0.7: - # Reconstruir el TOC después de la reorganización masiva + # Rebuild the TOC after massive reorganization final_content = await self._rebuild_toc(reorganized) with open(path, 'w') as f: f.write(final_content) - log_event(f" [OK] Reorganización y TOC actualizados para {file}") + log_event(f" [OK] Reorganization and TOC updated for {file}") except Exception as e: - log_event(f" [!] Error reorganizando {file}: {e}") + log_event(f" [!] Error reorganizing {file}: {e}") def validate_changes(self) -> bool: return True diff --git a/src/main.py b/src/main.py index 78442c6b..f70f0276 100644 --- a/src/main.py +++ b/src/main.py @@ -18,17 +18,17 @@ async def master_orchestrator(): git_controller = RepositoryController(GH_TOKEN, TARGET_REPO) start_time = datetime.now(MADRID_TZ) - log_event("INICIANDO CURADURÍA AGÉNTICA (CRONOLOGÍA Y TRANSPARENCIA)", section_break=True) + log_event("STARTING AGENTIC CURATION (CHRONOLOGY & TRANSPARENCY)", section_break=True) - # 1. Horizonte Temporal Dinámico / Histórico + # 1. Dynamic / Historical Time Horizon is_historical = os.getenv("HISTORICAL_MODE", "false").lower() == "true" if is_historical: - # Modo Histórico por Tramos (Ej: tramos de 180 días) + # Historical Mode by Chunks (e.g., 180-day chunks) final_stop_date = datetime(2024, 10, 1, 0, 0, tzinfo=MADRID_TZ) chunk_days = int(os.getenv("HISTORICAL_CHUNK_DAYS", "180")) - # El tramo actual termina donde el anterior empezó (o en 'ahora' si es el primero) + # Current chunk ends where the previous one started (or 'now' if first) until_str = os.getenv("HISTORICAL_UNTIL_DATE") if until_str: until_date = datetime.fromisoformat(until_str).replace(tzinfo=MADRID_TZ) @@ -39,22 +39,22 @@ async def master_orchestrator(): if since_date < final_stop_date: since_date = final_stop_date - log_event(f"[*] MODO HISTÓRICO: Tramo {since_date.date()} -> {until_date.date()}") + log_event(f"[*] HISTORICAL MODE: Chunk {since_date.date()} -> {until_date.date()}") else: - # Modo Normal: Usar CURATION_START_DATE si existe, si no state.json + # Normal Mode: Use CURATION_START_DATE if exists, else state.json env_start = os.getenv("CURATION_START_DATE") if env_start: try: since_date = datetime.fromisoformat(env_start).replace(tzinfo=MADRID_TZ) - log_event(f"[*] Modo Normal: Desde fecha manual del workflow {since_date.date()}") + log_event(f"[*] Normal Mode: From manual workflow date {since_date.date()}") except: since_date = get_last_date() - log_event(f"[*] Modo Normal: Error parseando fecha manual, usando state.json {since_date.date()}") + log_event(f"[*] Normal Mode: Error parsing manual date, using state.json {since_date.date()}") else: since_date = get_last_date() - log_event(f"[*] Modo Normal: Desde la última fecha guardada {since_date.date()}") + log_event(f"[*] Normal Mode: From last saved date {since_date.date()}") - # 2. Ingesta Multi-fuente + # 2. Multi-source Ingestion backup_file = os.getenv("BACKUP_FILE") x_audit_trail = [] if backup_file and os.path.exists(backup_file): @@ -68,10 +68,10 @@ async def master_orchestrator(): raw_social = await twitter_client.fetch_links_since(since_date, until_date=until_date, strategy=strategy) x_audit_trail = twitter_client.audit_trail - # GitHub Trending solo en modo normal (para no repetir) + # GitHub Trending only in normal mode (to avoid repetition) trending = [] if not is_historical and not backup_file: - log_event("[*] Buscando novedades en GitHub Trending...") + log_event("[*] Searching for news in GitHub Trending...") trending = await discover_trending_assets() for t in trending: t["source_type"] = "GitHub Trending" @@ -79,13 +79,13 @@ async def master_orchestrator(): all_raw_assets = raw_social + trending if not all_raw_assets: - log_event("[!] No se encontraron nuevos enlaces para procesar.") + log_event("[!] No new links found to process.") return - # 3. Expansión y Deduplicación Inicial - log_event(f"[*] Expandiendo y deduplicando {len(all_raw_assets)} enlaces brutos...") + # 3. Expansion and Initial Deduplication + log_event(f"[*] Expanding and deduplicating {len(all_raw_assets)} raw links...") - semaphore = asyncio.Semaphore(20) # Máximo 20 peticiones simultáneas + semaphore = asyncio.Semaphore(20) # Max 20 simultaneous requests async def process_asset(asset): async with semaphore: @@ -105,9 +105,9 @@ async def master_orchestrator(): unique_assets_map[clean_url] = asset all_raw_assets = list(unique_assets_map.values()) - log_event(f"[*] Total tras deduplicación inicial: {len(all_raw_assets)} enlaces únicos.") + log_event(f"[*] Total after initial deduplication: {len(all_raw_assets)} unique links.") - # 4. Evaluación y Registro (Deduplicación Global Robusta) + # 4. Evaluation and Registration (Robust Global Deduplication) existing_urls = set() for root, dirs, files in os.walk("docs"): for file in files: @@ -120,9 +120,9 @@ async def master_orchestrator(): existing_urls.add(url.split('#')[0].rstrip('/').lower()) except: pass - log_event(f"[*] Deduplicación Global: {len(existing_urls)} URLs existentes cargadas.") + log_event(f"[*] Global Deduplication: {len(existing_urls)} existing URLs loaded.") - # --- INICIO PROCESAMIENTO POR LOTES --- + # --- START BATCH PROCESSING --- BATCH_SIZE = 40 all_raw_assets_batches = [all_raw_assets[i:i + BATCH_SIZE] for i in range(0, len(all_raw_assets), BATCH_SIZE)] @@ -133,14 +133,14 @@ async def master_orchestrator(): modified_files_content = {} for batch_index, batch_assets in enumerate(all_raw_assets_batches): - log_event(f">>> INICIANDO LOTE {batch_index + 1}/{len(all_raw_assets_batches)} ({len(batch_assets)} enlaces)", section_break=True) + log_event(f">>> STARTING BATCH {batch_index + 1}/{len(all_raw_assets_batches)} ({len(batch_assets)} links)", section_break=True) assets_to_evaluate = [] for asset in batch_assets: url = asset["url"] clean_url = url.split('#')[0].rstrip('/').lower() - # Trackear fecha máxima + # Track max date try: ts = asset.get('timestamp') asset_date = None @@ -158,16 +158,16 @@ async def master_orchestrator(): except: pass if clean_url in existing_urls: - log_event(f" [=] SALTADO: {url[:60]}... (Ya existe)") + log_event(f" [=] SKIPPED: {url[:60]}... (Already exists)") full_report_metrics.append({ - "url": url, "status": "DUPLICATE", "reason": "Ya existe en repositorio", + "url": url, "status": "DUPLICATE", "reason": "Already exists in repository", "category": "N/A", "post_date": ts, "source": asset.get("source_type", "Social") }) continue assets_to_evaluate.append(asset) if not assets_to_evaluate: - log_event(" [*] El lote completo consiste en duplicados. Siguiente lote.") + log_event(" [*] Entire batch consists of duplicates. Next batch.") continue evaluations = await evaluate_extracted_assets(assets_to_evaluate) @@ -175,10 +175,10 @@ async def master_orchestrator(): for asset in assets_to_evaluate: url = asset["url"] - evaluation = evaluations.get(url, {"status": "FILTERED", "reason": "No evaluado por IA"}) + evaluation = evaluations.get(url, {"status": "FILTERED", "reason": "Not evaluated by AI"}) full_report_metrics.append({ - "url": url, "status": evaluation["status"], "reason": evaluation.get("reason", "Aceptado"), + "url": url, "status": evaluation["status"], "reason": evaluation.get("reason", "Accepted"), "category": evaluation.get("category", "N/A"), "post_date": asset.get("timestamp"), "source": asset.get("source_type", "Social") }) @@ -192,9 +192,9 @@ async def master_orchestrator(): }) existing_urls.add(url.split('#')[0].rstrip('/').lower()) - # Inyección inmediata + # Immediate injection if unique_new_assets: - log_event(f">>> APLICANDO {len(unique_new_assets)} INYECCIONES EN MARKDOWN...", section_break=True) + log_event(f">>> APPLYING {len(unique_new_assets)} INJECTIONS IN MARKDOWN...", section_break=True) for asset in unique_new_assets: category = asset["category"] file_path = f"docs/{category}.md" @@ -212,20 +212,20 @@ async def master_orchestrator(): if len(new_content) > len(content): modified_files_content[file_path] = new_content with open(file_path, 'w') as f: f.write(new_content) - log_event(f" [>>>] ÉXITO: Inyectado en docs/{category}.md -> {asset['url']}") + log_event(f" [>>>] SUCCESS: Injected into docs/{category}.md -> {asset['url']}") else: - log_event(f" [!] ADVERTENCIA: La inyección no modificó el archivo para {asset['url']}") + log_event(f" [!] WARNING: Injection did not modify file for {asset['url']}") except Exception as e: - log_event(f" [!] Error inyectando {asset['url']}: {e}") + log_event(f" [!] Error injecting {asset['url']}: {e}") total_processed += len(batch_assets) if batch_index < len(all_raw_assets_batches) - 1: - log_event(f"[*] Pausa de seguridad: 5s para el siguiente lote...") + log_event(f"[*] Safety pause: 5s for the next batch...") await asyncio.sleep(5) - # 4. Finalización y PR + # 4. Finalization and PR if modified_files_content: - log_event(">>> GENERANDO PULL REQUEST...", section_break=True) + log_event(">>> GENERATING PULL REQUEST...", section_break=True) metrics = { "total_extracted": len(all_raw_assets), "start_date": since_date.isoformat(), @@ -236,22 +236,22 @@ async def master_orchestrator(): try: git_controller.apply_multi_file_changes(modified_files_content, metrics) except Exception as e: - log_event(f"[!] Error creando PR: {e}") + log_event(f"[!] Error creating PR: {e}") - # Auditoría de reorganización + # Reorganization audit await curator_agent.suggest_reorganization() - # Actualizar estado + # Update state if max_tweet_date > since_date: save_state(max_tweet_date + timedelta(seconds=1)) - # Lógica de re-disparo para Modo Histórico en GitHub Actions + # Re-trigger logic for Historical Mode in GitHub Actions if is_historical and since_date > final_stop_date: - # Imprimir para que el YAML lo capture + # Print for YAML to capture print(f"\nNEXT_CHUNK_START: {since_date.isoformat()}") - log_event(f"[*] TRAMO FINALIZADO. Sugiriendo siguiente tramo desde: {since_date.date()}", section_break=True) + log_event(f"[*] CHUNK FINISHED. Suggesting next chunk from: {since_date.date()}", section_break=True) - log_event("PROCESO FINALIZADO CON ÉXITO.", section_break=True) + log_event("PROCESS FINISHED SUCCESSFULLY.", section_break=True) if __name__ == "__main__": asyncio.run(master_orchestrator())