diff --git a/src/agentic_curator.py b/src/agentic_curator.py index 92864b1f..056477c8 100644 --- a/src/agentic_curator.py +++ b/src/agentic_curator.py @@ -1,63 +1,155 @@ -import json +import os import re -import aiohttp +import json +import asyncio import httpx -from bs4 import BeautifulSoup -from pydantic import BaseModel -from typing import List, Optional -from src.config import GEMINI_API_KEY, NUBENETES_CATEGORIES +from typing import List, Dict, Set +from src.config import GEMINI_API_KEY, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES +from src.gitops_manager import RepositoryController -async def _deep_fetch_content(url: str) -> str: - headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'} - try: - async with aiohttp.ClientSession() as session: - async with session.get(url, timeout=10, headers=headers) as resp: - if resp.status == 200: - html = await resp.text() - soup = BeautifulSoup(html, 'html.parser') - for s in soup(['script', 'style', 'nav', 'footer']): - s.decompose() - return soup.get_text(separator=' ', strip=True)[:3000] - except: return "" - return "" +class AgenticCurator: + def __init__(self): + self.git_controller = RepositoryController(GH_TOKEN, TARGET_REPO) + self.docs_dir = "docs" + self.index_path = os.path.join(self.docs_dir, "index.md") + self.mkdocs_path = "mkdocs.yml" + self.stats = {"orphans_found": 0, "orphans_linked": 0, "structural_improvements": 0} -async def evaluate_extracted_assets(raw_assets: list[dict]) -> list[dict]: - curated_assets = [] - - # URL de la API REST Directa (v1 estable) - api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}" + def _get_all_docs(self) -> Set[str]: + return {f for f in os.listdir(self.docs_dir) if f.endswith('.md')} - for asset in raw_assets: - web_content = await _deep_fetch_content(asset['url']) + def _get_nav_files(self) -> Set[str]: + with open(self.mkdocs_path, 'r') as f: + content = f.read() + return set(re.findall(r'[:\s]([a-zA-Z0-9_-]+\.md)', content)) + + def _get_index_links(self) -> Set[str]: + with open(self.index_path, 'r') as f: + content = f.read() + return set(re.findall(r'\]\(([^)]+\.md)\)', content)) + + async def audit_navigation(self): + print("[*] Iniciando auditoría de navegación...") + all_docs = self._get_all_docs() + nav_files = self._get_nav_files() + index_links = self._get_index_links() + + orphans = all_docs - nav_files - index_links - {"index.md", "tags.md"} + self.stats["orphans_found"] = len(orphans) + + if orphans: + print(f"[!] Se encontraron {len(orphans)} archivos huérfanos: {orphans}") + await self._resolve_orphans(list(orphans)) + else: + print("[+] No se detectaron archivos huérfanos.") + + async def _resolve_orphans(self, orphans: List[str]): + """Usa Gemini para decidir dónde colocar los huérfanos.""" + for orphan in orphans: + print(f"[*] Buscando hogar para {orphan}...") + try: + with open(os.path.join(self.docs_dir, orphan), 'r') as f: + content = f.read(1000) + except: content = "No content available" + + decision = await self._ask_gemini_placement(orphan, content) + if decision: + await self._apply_placement(orphan, decision) + self.stats["orphans_linked"] += 1 + + async def _ask_gemini_placement(self, filename: str, content: str) -> Dict: + with open(self.mkdocs_path, 'r') as f: + nav_context = f.read() prompt = ( - "Actúas como Ingeniero Curador de 'nubenetes/awesome-kubernetes'. " - f"Filtra este recurso para estas categorías: {', '.join(NUBENETES_CATEGORIES)}. " - "Si es sobre Model Context Protocol (MCP), asígnalo a 'ai-agents-mcp'. " - f"URL: {asset['url']}\nContexto: {asset['context']}\nWeb: {web_content}\n\n" - "Responde SOLAMENTE un JSON: {\"is_exceptional\": bool, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\"}" + f"Tengo un archivo markdown llamado '{filename}' en mi repositorio de Kubernetes que no está enlazado.\n" + f"Contenido (primeros caracteres):\n{content}\n\n" + f"Estructura actual del menú (mkdocs.yml):\n{nav_context}\n\n" + "Dime:\n" + "1. ¿Bajo qué sección del menú (nav) debería estar?\n" + "2. ¿Cuál sería un título descriptivo para el menú?\n" + "3. ¿Bajo qué encabezado (##) del index.md debería aparecer?\n" + "Responde en JSON: {\"category\": \"Nombre de la Sección en nav\", \"title\": \"Título para el link\", \"index_section\": \"Sección en index.md\"}" ) - payload = {"contents": [{"parts": [{"text": prompt}]}]} - + api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}" try: async with httpx.AsyncClient() as client: - response = await client.post(api_url, json=payload, timeout=30) - if response.status_code == 200: - res_data = response.json() - text_resp = res_data['candidates'][0]['content']['parts'][0]['text'] - # Extraer JSON del texto - match = re.search(r'\{.*\}', text_resp, re.DOTALL) - if match: - data = json.loads(match.group(0)) - if data.get("is_exceptional"): - for cat in data.get("categories", []): - if cat in NUBENETES_CATEGORIES: - curated_assets.append({ - "url": asset["url"], "title": data["title"], - "description": data["desc"], "category": cat - }) - except Exception as e: - print(f"[!] Error REST Gemini: {e}") - - return curated_assets + resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=20) + if resp.status_code == 200: + text = resp.json()['candidates'][0]['content']['parts'][0]['text'] + match = re.search(r'\{.*\}', text, re.DOTALL) + if match: return json.loads(match.group(0)) + except: pass + return None + + async def _apply_placement(self, filename: str, decision: Dict): + section = decision.get("index_section", "More References") + title = decision.get("title", filename.replace(".md", "")) + + with open(self.index_path, 'r') as f: + index_lines = f.readlines() + + section_found = False + for i, line in enumerate(index_lines): + if section.lower() in line.lower() and line.startswith("##"): + index_lines.insert(i + 1, f"- [{title}]({filename})\n") + section_found = True + break + + if not section_found: + index_lines.append(f"\n## {section}\n- [{title}]({filename})\n") + + with open(self.index_path, 'w') as f: + f.writelines(index_lines) + + with open(self.mkdocs_path, 'r') as f: + mkdocs_lines = f.readlines() + + for i, line in enumerate(mkdocs_lines): + if line.strip().startswith("- About:"): + mkdocs_lines.insert(i, f" - {title}: {filename}\n") + break + + with open(self.mkdocs_path, 'w') as f: + f.writelines(mkdocs_lines) + + async def suggest_reorganization(self): + """Analiza la densidad de archivos por categoría y sugiere mejoras.""" + print("[*] Analizando densidad de categorías...") + with open(self.mkdocs_path, 'r') as f: + content = f.read() + + sections = re.split(r' - ', content) + for section in sections: + count = len(re.findall(r'\.md', section)) + if count > 15: + lines = section.split('\n') + if lines: + section_name = lines[0].split(':')[0].strip() + print(f" [~] La sección '{section_name}' tiene muchos archivos ({count}).") + self.stats["structural_improvements"] += 1 + + def validate_changes(self) -> bool: + try: + with open(self.mkdocs_path, 'r') as f: + content = f.read() + if "nav:" not in content: return False + with open(self.index_path, 'r') as f: + content = f.read() + if not content.startswith("#"): return False + return True + except: + return False + +async def main(): + curator = AgenticCurator() + await curator.audit_navigation() + await curator.suggest_reorganization() + if curator.validate_changes(): + print("[+] Estructura validada.") + else: + print("[!] Error en validación.") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/intelligent_health_checker.py b/src/intelligent_health_checker.py index 7d2d4e67..f6c6796d 100644 --- a/src/intelligent_health_checker.py +++ b/src/intelligent_health_checker.py @@ -3,254 +3,248 @@ import json import os import re import httpx +import random from datetime import datetime -from typing import Dict, List, Set, Tuple +from typing import Dict, List, Set, Tuple, Optional from src.config import GH_TOKEN, TARGET_REPO, GEMINI_API_KEY, NUBENETES_CATEGORIES, MADRID_TZ from src.gitops_manager import RepositoryController from src.markdown_ast import MarkdownSanitizer +from src.agentic_curator import AgenticCurator -# Configuración de Excepciones (Archivos que no se podan) +# Configuración de Excepciones CORE_FILES = ["docs/index.md", "README.md"] +MEMORY_FILE = "src/memory/health_learning.json" class IntelligentLinkCleaner: def __init__(self): self.git_controller = RepositoryController(GH_TOKEN, TARGET_REPO) self.sanitizer = MarkdownSanitizer() - self.link_registry: Dict[str, List[Dict]] = {} # URL -> List of {file, line_content, score} - self.dead_links: Set[str] = set() + self.curator = AgenticCurator() + self.link_registry: Dict[str, List[Dict]] = {} + self.dead_links: Dict[str, str] = {} # URL -> Reason/Fallback URL + self.learning_data = self._load_memory() self.stats = { "total_links": 0, "dead_links_removed": 0, "duplicates_pruned": 0, - "ai_decisions": 0 + "ai_decisions": 0, + "archived_fallbacks": 0, + "orphans_fixed": 0 } - async def build_global_registry(self): - print("[*] Construyendo registro global de enlaces...") - # Incluimos archivos core + categorías - all_files = CORE_FILES + [f"docs/{cat}.md" for cat in NUBENETES_CATEGORIES] - - for file_path in all_files: + def _load_memory(self) -> Dict: + if os.path.exists(MEMORY_FILE): try: - if not os.path.exists(file_path): - # Intentar obtener del repo si no está local (aunque debería estar) - repo_file = self.git_controller.repository.get_contents(file_path) - content = repo_file.decoded_content.decode("utf-8") - else: - with open(file_path, 'r') as f: - content = f.read() - - lines = content.splitlines() - for i, line in enumerate(lines): - match = self.sanitizer.link_pattern.search(line) - if match: - title, url = match.groups() - clean_url = url.split('#')[0].rstrip('/') - if "github.com" in clean_url and "/blob/" in clean_url: - continue # Evitar validar enlaces internos profundos de git por ahora - - score = self.sanitizer._calculate_link_score(line) - if clean_url not in self.link_registry: - self.link_registry[clean_url] = [] - - self.link_registry[clean_url].append({ - "file": file_path, - "line_index": i, - "content": line, - "score": score, - "title": title - }) - self.stats["total_links"] += 1 - except Exception as e: - print(f"[!] Error procesando {file_path}: {e}") + with open(MEMORY_FILE, 'r') as f: + return json.load(f) + except: pass + return {"domains": {}, "known_soft_404_patterns": []} - async def validate_links_tiered(self): - """Validación en dos niveles: HTTP -> Playwright""" - print(f"[*] Validando {len(self.link_registry)} URLs únicas...") - - unique_urls = list(self.link_registry.keys()) - # Para evitar saturar, validamos en batches - batch_size = 50 - for i in range(0, len(unique_urls), batch_size): - batch = unique_urls[i:i+batch_size] - tasks = [self._check_url_sophisticated(url) for url in batch] - results = await asyncio.gather(*tasks) - for url, is_alive in results: - if not is_alive: - self.dead_links.add(url) - print(f" - Progreso: {min(i+batch_size, len(unique_urls))}/{len(unique_urls)}") + def _save_memory(self): + os.makedirs(os.path.dirname(MEMORY_FILE), exist_ok=True) + with open(MEMORY_FILE, 'w') as f: + json.dump(self.learning_data, f, indent=2) - async def _check_url_sophisticated(self, url: str) -> Tuple[str, bool]: - # TIER 1: HTTP Fast - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" - } + async def _check_wayback(self, url: str) -> Optional[str]: + """Busca una versión archivada en Wayback Machine.""" + api_url = f"https://archive.org/wayback/available?url={url}" try: - async with httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=10) as client: - resp = await client.get(url) - if resp.status_code < 400: - return url, True - if resp.status_code not in [403, 429, 401]: - return url, False # 404, 500 etc son muertos - except Exception: - pass # Errores de conexión pasan a Tier 2 + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get(api_url) + if resp.status_code == 200: + data = resp.json() + if data.get("archived_snapshots", {}).get("closest"): + return data["archived_snapshots"]["closest"]["url"] + except: pass + return None - # TIER 2: Playwright (Solo si Tier 1 falla con sospecha de bloqueo) + async def _check_url_with_retries(self, url: str, max_retries=3) -> Tuple[str, bool, Optional[str]]: + domain = url.split("//")[-1].split("/")[0] + domain_info = self.learning_data["domains"].get(domain, {}) + + use_playwright_first = domain_info.get("requires_playwright", False) + + for attempt in range(max_retries): + try: + wait_time = (2 ** attempt) + random.random() + if attempt > 0: + await asyncio.sleep(wait_time) + + is_alive, reason = await self._check_url_logic(url, use_playwright_first) + + if is_alive: + if domain not in self.learning_data["domains"]: + self.learning_data["domains"][domain] = {"success_count": 0, "fail_count": 0} + self.learning_data["domains"][domain]["success_count"] += 1 + return url, True, None + + if reason in ["404", "soft_404", "redirect_to_home"]: + archived = await self._check_wayback(url) + if archived: + return url, False, archived + return url, False, None + + except Exception as e: + print(f" [!] Intento {attempt+1} fallido para {url}: {e}") + + return url, True, None + + async def _check_url_logic(self, url: str, force_playwright: bool) -> Tuple[bool, str]: + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", + "Referer": "https://www.google.com/" + } + + if not force_playwright: + try: + async with httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=12) as client: + resp = await client.get(url) + if resp.status_code in [404, 410]: return False, "404" + if resp.status_code < 300: + final_url = str(resp.url).rstrip('/') + original_base = "/".join(url.split("/")[:3]) + if len(url) > len(original_base) + 10 and final_url == original_base: + pass # Sospechoso -> Playwright + else: + return True, "ok" + if resp.status_code in [403, 429, 401]: + domain = url.split("//")[-1].split("/")[0] + if domain not in self.learning_data["domains"]: self.learning_data["domains"][domain] = {} + self.learning_data["domains"][domain]["requires_playwright"] = True + except: pass + + # Tier 2: Playwright try: from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=True) - page = await browser.new_page() - # User agent real para Playwright - await page.set_extra_http_headers({"User-Agent": headers["User-Agent"]}) - response = await page.goto(url, wait_until="networkidle", timeout=20000) - is_alive = response.status < 400 if response else False - await browser.close() - return url, is_alive - except Exception as e: - # Si Playwright también falla, asumimos que puede estar muerto o es inaccesible - # pero para ser conservadores, solo marcamos como muerto si es un error claro - return url, True # Conservador: Si todo falla, no lo borramos todavía + page = await browser.new_page(user_agent=headers["User-Agent"]) + try: + response = await page.goto(url, wait_until="domcontentloaded", timeout=25000) + if not response: return True, "timeout" + if response.status in [404, 410]: return False, "404" + + content = (await page.content()).lower() + title = (await page.title()).lower() + soft_404_keywords = ["page not found", "404 not found", "artículo no encontrado", "página no encontrada"] + if any(kw in title for kw in soft_404_keywords) or (("404" in title) and any(kw in content for kw in soft_404_keywords)): + return False, "soft_404" + + final_url = page.url.rstrip('/') + original_base = "/".join(url.split("/")[:3]) + if len(url) > len(original_base) + 10 and final_url == original_base: + return False, "redirect_to_home" + + return True, "ok" + finally: + await browser.close() + except: + return True, "engine_error" - async def resolve_duplicates_with_ai(self): - print("[*] Resolviendo duplicados globales con Gemini...") - for url, occurrences in self.link_registry.items(): - if len(occurrences) <= 1 or url in self.dead_links: - continue - - # Si alguna ocurrencia está en CORE_FILES, esa manda pero no borra el resto necesariamente - # a menos que Gemini diga que es redundante. - - # Filtrar ocurrencias que NO están en archivos core para ver qué podemos podar - prunable = [occ for occ in occurrences if occ["file"] not in CORE_FILES] - if len(prunable) <= 1 and len(occurrences) - len(prunable) >= 1: - # Ya está en un CORE_FILE y solo en un sitio más, lo dejamos estar - continue + async def build_global_registry(self): + print("[*] Construyendo registro global de enlaces...") + all_files = CORE_FILES + [f"docs/{cat}.md" for cat in NUBENETES_CATEGORIES] + for file_path in all_files: + try: + if os.path.exists(file_path): + with open(file_path, 'r') as f: + content = f.read() + lines = content.splitlines() + for i, line in enumerate(lines): + match = self.sanitizer.link_pattern.search(line) + if match: + title, url = match.groups() + clean_url = url.split('#')[0].rstrip('/') + if clean_url not in self.link_registry: self.link_registry[clean_url] = [] + self.link_registry[clean_url].append({"file": file_path, "line_index": i, "content": line, "title": title}) + self.stats["total_links"] += 1 + except: pass - if len(prunable) > 1: - # Preguntar a Gemini - decision = await self._ask_gemini_dedup(url, occurrences) - self.stats["ai_decisions"] += 1 - - # 'decision' debería decirnos qué archivos mantener - files_to_keep = decision.get("keep_in_files", []) - for occ in prunable: - if occ["file"] not in files_to_keep: - occ["should_prune"] = True - self.stats["duplicates_pruned"] += 1 - - async def _ask_gemini_dedup(self, url: str, occurrences: List[Dict]) -> Dict: - api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}" - contexts = "\n".join([f"- Archivo: {occ['file']}, Contexto: {occ['content']}" for occ in occurrences]) - prompt = ( - f"El enlace {url} aparece en múltiples archivos de mi repositorio de Kubernetes.\n" - f"Ocurrencias:\n{contexts}\n\n" - "Analiza si el enlace es fundamental en todos esos contextos o si es redundante y debería estar solo en el más relevante.\n" - "Responde en JSON: {\"keep_in_files\": [\"lista de archivos donde mantenerlo\"], \"reason\": \"...\"}" - ) + async def validate_links_tiered(self): + print(f"[*] Validando {len(self.link_registry)} URLs con aprendizaje activo...") + unique_urls = list(self.link_registry.keys()) + random.shuffle(unique_urls) - try: - async with httpx.AsyncClient() as client: - resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=20) - if resp.status_code == 200: - text = resp.json()['candidates'][0]['content']['parts'][0]['text'] - match = re.search(r'\{.*\}', text, re.DOTALL) - if match: - return json.loads(match.group(0)) - except: pass - # Fallback: Mantener solo el que tiene mayor score - best_file = max(occurrences, key=lambda x: x["score"])["file"] - return {"keep_in_files": [best_file]} + batch_size = 20 + for i in range(0, len(unique_urls), batch_size): + batch = unique_urls[i:i+batch_size] + tasks = [self._check_url_with_retries(url) for url in batch] + results = await asyncio.gather(*tasks) + + for url, is_alive, fallback in results: + if not is_alive: + self.dead_links[url] = fallback if fallback else "DEAD" + + print(f" - Progreso: {min(i+batch_size, len(unique_urls))}/{len(unique_urls)}") + self._save_memory() async def apply_changes(self): - print("[*] Aplicando limpieza a los archivos...") + print("[*] Aplicando cambios y sustituciones...") file_updates = {} - - # Agrupar podas por archivo - prunes_by_file = {} - for url, occurrences in self.link_registry.items(): - is_dead = url in self.dead_links + for url, fallback in self.dead_links.items(): + occurrences = self.link_registry.get(url, []) for occ in occurrences: - if is_dead or occ.get("should_prune"): - if occ["file"] not in prunes_by_file: - prunes_by_file[occ["file"]] = [] - # Guardamos si es por muerto para la lógica de excepciones - prunes_by_file[occ["file"]].append({ - "idx": occ["line_index"], - "is_dead": is_dead, - "url": url - }) - - for file_path, tasks in prunes_by_file.items(): - try: - with open(file_path, 'r') as f: - lines = f.readlines() + file_path = occ["file"] + if file_path not in file_updates: + with open(file_path, 'r') as f: + file_updates[file_path] = f.readlines() - original_count = len(lines) - # Borrar de atrás hacia adelante para no arruinar índices - for task in sorted(tasks, key=lambda x: x["idx"], reverse=True): - idx = task["idx"] - is_dead = task["is_dead"] - - # Regla: Solo borramos de CORE_FILES si el link está MUERTO. - # Los duplicados se permiten en CORE_FILES. - if file_path not in CORE_FILES or is_dead: - if idx < len(lines): - lines.pop(idx) - if is_dead: - self.stats["dead_links_removed"] += 1 - else: - self.stats["duplicates_pruned"] += 1 + line_idx = occ["line_index"] + if fallback != "DEAD": + old_line = file_updates[file_path][line_idx] + new_line = old_line.replace(url, fallback) + if "[ARCHIVED]" not in new_line: + new_line = new_line.replace("](", " [ARCHIVED]](") + file_updates[file_path][line_idx] = new_line + self.stats["archived_fallbacks"] += 1 + else: + if file_path not in CORE_FILES: + file_updates[file_path][line_idx] = None + self.stats["dead_links_removed"] += 1 - if len(lines) < original_count: - file_updates[file_path] = "".join(lines) - print(f" - {file_path}: {original_count - len(lines)} líneas eliminadas.") - except Exception as e: - print(f"[!] Error al procesar limpieza en {file_path}: {e}") + final_payload = {} + for path, lines in file_updates.items(): + new_content = "".join([l for l in lines if l is not None]) + final_payload[path] = new_content - if file_updates: - print(f"[+] Generando PR con {len(file_updates)} archivos modificados.") - metrics = { - "total_cleaned": self.stats["dead_links_removed"] + self.stats["duplicates_pruned"], - "dead_removed": self.stats["dead_links_removed"], - "duplicates_pruned": self.stats["duplicates_pruned"], - "ai_decisions": self.stats["ai_decisions"], - "files_impacted": list(file_updates.keys()) - } - self._create_pr(file_updates, metrics) - else: - print("[~] No se encontraron mejoras necesarias (todo limpio).") + # Añadir cambios de navegación/huérfanos si existen + if self.curator.stats["orphans_linked"] > 0: + with open(self.curator.index_path, 'r') as f: + final_payload[self.curator.index_path] = f.read() + with open(self.curator.mkdocs_path, 'r') as f: + final_payload[self.curator.mkdocs_path] = f.read() - def _create_pr(self, updates: Dict[str, str], metrics: Dict): - # Usamos el git_controller para aplicar cambios - # (Modificado para este script específico) + if final_payload: + self._create_pr(final_payload) + + def _create_pr(self, updates: Dict[str, str]): timestamp = datetime.now().strftime("%Y%m%d-%H%M") - branch_name = f"bot/intelligent-clean-{timestamp}" + branch_name = f"bot/autonomous-update-{timestamp}" self.git_controller._create_feature_branch(branch_name) for path, content in updates.items(): - file_meta = self.git_controller.repository.get_contents(path) - self.git_controller.repository.update_file( - path=path, - message=f"fix(clean): limpieza inteligente de enlaces en {path}", - content=content, - sha=file_meta.sha, - branch=branch_name - ) + try: + file_meta = self.git_controller.repository.get_contents(path) + self.git_controller.repository.update_file( + path=path, + message=f"fix(autonomous): engine update in {path}", + content=content, + sha=file_meta.sha, + branch=branch_name + ) + except: pass body = ( - f"## 🤖 Limpieza Inteligente de Enlaces (May 2026)\n\n" - f"He completado un ciclo de revisión global utilizando **Playwright** para evasión de bloqueos y **Gemini** para deduplicación inteligente.\n\n" - f"### 📊 Resumen de Ejecución:\n" - f"- 💀 Enlaces muertos eliminados: `{metrics['dead_removed']}`\n" - f"- ✂️ Duplicados globales podados: `{metrics['duplicates_pruned']}`\n" - f"- 🧠 Decisiones asistidas por IA: `{metrics['ai_decisions']}`\n\n" - f"### 📂 Archivos Optimizados:\n" + - "\n".join([f"- `{f}`" for f in metrics['files_impacted']]) + f"## 🧠 Nubenetes Autonomous Health & Curation Engine\n\n" + f"Ciclo completado con aprendizaje persistente y auditoría de navegación.\n\n" + f"### 📊 Métricas del Ciclo:\n" + f"- 💀 Enlaces eliminados: `{self.stats['dead_links_removed']}`\n" + f"- 🏛️ Enlaces recuperados vía Wayback Machine: `{self.stats['archived_fallbacks']}`\n" + f"- 🖇️ Páginas huérfanas vinculadas: `{self.stats['orphans_fixed']}`\n" + f"- 📈 Dominios aprendidos: `{len(self.learning_data['domains'])}`" ) - self.git_controller.repository.create_pull( - title=f"🧹 Intelligent Link Clean & Dedup: {datetime.now().strftime('%d %b %Y')}", + title=f"🧹 Autonomous Engine Update: {datetime.now().strftime('%d %b %Y')}", body=body, head=branch_name, base="master" @@ -258,10 +252,23 @@ class IntelligentLinkCleaner: async def main(): cleaner = IntelligentLinkCleaner() + + # 1. Auditoría de Enlaces await cleaner.build_global_registry() await cleaner.validate_links_tiered() - await cleaner.resolve_duplicates_with_ai() - await cleaner.apply_changes() + + # 2. Auditoría de Navegación y Huérfanos + await cleaner.curator.audit_navigation() + await cleaner.curator.suggest_reorganization() + + # Actualizar stats + cleaner.stats["orphans_fixed"] = cleaner.curator.stats["orphans_linked"] + + # 3. Aplicar todos los cambios + if cleaner.curator.validate_changes(): + await cleaner.apply_changes() + else: + print("[!] Validación fallida. No se aplicarán cambios estructurales.") if __name__ == "__main__": asyncio.run(main()) diff --git a/src/memory/health_learning.json b/src/memory/health_learning.json new file mode 100644 index 00000000..e69de29b