feat(health): autonomous engine with learning, retries, and navigation audit

This commit is contained in:
Nubenetes Bot
2026-05-09 10:00:11 +02:00
parent b3847eb583
commit 0b0c525152
3 changed files with 356 additions and 257 deletions

View File

@@ -1,63 +1,155 @@
import json
import os
import re
import aiohttp
import json
import asyncio
import httpx
from bs4 import BeautifulSoup
from pydantic import BaseModel
from typing import List, Optional
from src.config import GEMINI_API_KEY, NUBENETES_CATEGORIES
from typing import List, Dict, Set
from src.config import GEMINI_API_KEY, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES
from src.gitops_manager import RepositoryController
async def _deep_fetch_content(url: str) -> str:
    """Fetch *url* and return up to 3000 characters of its visible text.

    The page is requested with a browser-like User-Agent, parsed with
    BeautifulSoup, and stripped of ``script``, ``style``, ``nav`` and
    ``footer`` elements before the text is extracted.

    Args:
        url: Address of the page to download.

    Returns:
        The page text joined with single spaces and truncated to 3000
        characters, or an empty string when the response status is not 200
        or the fetch/parsing fails.
    """
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'}
    # Same 10 second total budget as before, expressed with the documented
    # timeout object instead of a bare number.
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=timeout, headers=headers) as resp:
                if resp.status == 200:
                    html = await resp.text()
                    soup = BeautifulSoup(html, 'html.parser')
                    # Drop non-content elements so they do not pollute the text.
                    for s in soup(['script', 'style', 'nav', 'footer']):
                        s.decompose()
                    return soup.get_text(separator=' ', strip=True)[:3000]
    except Exception:
        # Best-effort fetch: any ordinary failure yields "no content".
        # Unlike a bare ``except:``, this lets task cancellation and
        # KeyboardInterrupt/SystemExit propagate.
        return ""
    return ""
class AgenticCurator:
def __init__(self):
    """Create the repository controller, the documentation paths and the run counters."""
    docs_root = "docs"
    self.git_controller = RepositoryController(GH_TOKEN, TARGET_REPO)
    self.docs_dir = docs_root
    self.index_path = os.path.join(docs_root, "index.md")
    self.mkdocs_path = "mkdocs.yml"
    # Counters updated by the audit / reorganization steps; all start at zero.
    self.stats = dict.fromkeys(
        ("orphans_found", "orphans_linked", "structural_improvements"), 0
    )
async def evaluate_extracted_assets(raw_assets: list[dict]) -> list[dict]:
curated_assets = []
# URL de la API REST Directa (v1 estable)
api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"
def _get_all_docs(self) -> Set[str]:
return {f for f in os.listdir(self.docs_dir) if f.endswith('.md')}
for asset in raw_assets:
web_content = await _deep_fetch_content(asset['url'])
def _get_nav_files(self) -> Set[str]:
with open(self.mkdocs_path, 'r') as f:
content = f.read()
return set(re.findall(r'[:\s]([a-zA-Z0-9_-]+\.md)', content))
def _get_index_links(self) -> Set[str]:
with open(self.index_path, 'r') as f:
content = f.read()
return set(re.findall(r'\]\(([^)]+\.md)\)', content))
async def audit_navigation(self):
    """Find Markdown files that no navigation entry or index link references.

    A file is an orphan when it is listed by ``_get_all_docs`` but appears
    neither in ``_get_nav_files`` nor in ``_get_index_links``; ``index.md``
    and ``tags.md`` are never considered orphans. The orphan count is stored
    in ``self.stats["orphans_found"]`` and any orphans are handed to
    ``_resolve_orphans``.
    """
    print("[*] Iniciando auditoría de navegación...")
    all_docs = self._get_all_docs()
    nav_files = self._get_nav_files()
    index_links = self._get_index_links()
    orphans = all_docs - nav_files - index_links - {"index.md", "tags.md"}
    self.stats["orphans_found"] = len(orphans)
    if orphans:
        print(f"[!] Se encontraron {len(orphans)} archivos huérfanos: {orphans}")
        # Sort before resolving: set iteration order changes between runs,
        # which would make the generated edits non-reproducible.
        await self._resolve_orphans(sorted(orphans))
    else:
        print("[+] No se detectaron archivos huérfanos.")
async def _resolve_orphans(self, orphans: List[str]):
"""Usa Gemini para decidir dónde colocar los huérfanos."""
for orphan in orphans:
print(f"[*] Buscando hogar para {orphan}...")
try:
with open(os.path.join(self.docs_dir, orphan), 'r') as f:
content = f.read(1000)
except: content = "No content available"
decision = await self._ask_gemini_placement(orphan, content)
if decision:
await self._apply_placement(orphan, decision)
self.stats["orphans_linked"] += 1
async def _ask_gemini_placement(self, filename: str, content: str) -> Dict:
with open(self.mkdocs_path, 'r') as f:
nav_context = f.read()
prompt = (
"Actúas como Ingeniero Curador de 'nubenetes/awesome-kubernetes'. "
f"Filtra este recurso para estas categorías: {', '.join(NUBENETES_CATEGORIES)}. "
"Si es sobre Model Context Protocol (MCP), asígnalo a 'ai-agents-mcp'. "
f"URL: {asset['url']}\nContexto: {asset['context']}\nWeb: {web_content}\n\n"
"Responde SOLAMENTE un JSON: {\"is_exceptional\": bool, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\"}"
f"Tengo un archivo markdown llamado '{filename}' en mi repositorio de Kubernetes que no está enlazado.\n"
f"Contenido (primeros caracteres):\n{content}\n\n"
f"Estructura actual del menú (mkdocs.yml):\n{nav_context}\n\n"
"Dime:\n"
"1. ¿Bajo qué sección del menú (nav) debería estar?\n"
"2. ¿Cuál sería un título descriptivo para el menú?\n"
"3. ¿Bajo qué encabezado (##) del index.md debería aparecer?\n"
"Responde en JSON: {\"category\": \"Nombre de la Sección en nav\", \"title\": \"Título para el link\", \"index_section\": \"Sección en index.md\"}"
)
payload = {"contents": [{"parts": [{"text": prompt}]}]}
api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"
try:
async with httpx.AsyncClient() as client:
response = await client.post(api_url, json=payload, timeout=30)
if response.status_code == 200:
res_data = response.json()
text_resp = res_data['candidates'][0]['content']['parts'][0]['text']
# Extraer JSON del texto
match = re.search(r'\{.*\}', text_resp, re.DOTALL)
if match:
data = json.loads(match.group(0))
if data.get("is_exceptional"):
for cat in data.get("categories", []):
if cat in NUBENETES_CATEGORIES:
curated_assets.append({
"url": asset["url"], "title": data["title"],
"description": data["desc"], "category": cat
})
except Exception as e:
print(f"[!] Error REST Gemini: {e}")
return curated_assets
resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=20)
if resp.status_code == 200:
text = resp.json()['candidates'][0]['content']['parts'][0]['text']
match = re.search(r'\{.*\}', text, re.DOTALL)
if match: return json.loads(match.group(0))
except: pass
return None
async def _apply_placement(self, filename: str, decision: Dict):
section = decision.get("index_section", "More References")
title = decision.get("title", filename.replace(".md", ""))
with open(self.index_path, 'r') as f:
index_lines = f.readlines()
section_found = False
for i, line in enumerate(index_lines):
if section.lower() in line.lower() and line.startswith("##"):
index_lines.insert(i + 1, f"- [{title}]({filename})\n")
section_found = True
break
if not section_found:
index_lines.append(f"\n## {section}\n- [{title}]({filename})\n")
with open(self.index_path, 'w') as f:
f.writelines(index_lines)
with open(self.mkdocs_path, 'r') as f:
mkdocs_lines = f.readlines()
for i, line in enumerate(mkdocs_lines):
if line.strip().startswith("- About:"):
mkdocs_lines.insert(i, f" - {title}: {filename}\n")
break
with open(self.mkdocs_path, 'w') as f:
f.writelines(mkdocs_lines)
async def suggest_reorganization(self):
    """Report navigation sections that reference more than 15 Markdown files.

    The mkdocs file is split on the `` - `` separator; for each piece the
    number of ``.md`` occurrences is counted. Every piece above the threshold
    is printed (named after the text before the first ``:`` on its first
    line) and counted in ``self.stats["structural_improvements"]``.
    """
    print("[*] Analizando densidad de categorías...")
    with open(self.mkdocs_path, 'r', encoding='utf-8') as f:
        content = f.read()
    for section in re.split(r' - ', content):
        count = section.count('.md')
        if count > 15:
            # str.split always yields at least one element, so the first
            # line can be taken without an emptiness check.
            section_name = section.split('\n')[0].split(':')[0].strip()
            print(f" [~] La sección '{section_name}' tiene muchos archivos ({count}).")
            self.stats["structural_improvements"] += 1
def validate_changes(self) -> bool:
    """Run a minimal sanity check on the mkdocs and index files.

    Returns:
        True when the mkdocs file contains ``nav:`` and the index file starts
        with ``#``; False otherwise, including when either file cannot be
        read or decoded.
    """
    try:
        with open(self.mkdocs_path, 'r', encoding='utf-8') as f:
            if "nav:" not in f.read():
                return False
        with open(self.index_path, 'r', encoding='utf-8') as f:
            return f.read().startswith("#")
    except (OSError, UnicodeDecodeError):
        # Unreadable files count as a failed validation; anything else
        # (e.g. KeyboardInterrupt) is no longer swallowed.
        return False
async def main():
    """Run the navigation audit and the density report, then print the validation result."""
    curator = AgenticCurator()
    await curator.audit_navigation()
    await curator.suggest_reorganization()
    outcome = (
        "[+] Estructura validada."
        if curator.validate_changes()
        else "[!] Error en validación."
    )
    print(outcome)


# Run the curator only when the module is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -3,254 +3,248 @@ import json
import os
import re
import httpx
import random
from datetime import datetime
from typing import Dict, List, Set, Tuple
from typing import Dict, List, Set, Tuple, Optional
from src.config import GH_TOKEN, TARGET_REPO, GEMINI_API_KEY, NUBENETES_CATEGORIES, MADRID_TZ
from src.gitops_manager import RepositoryController
from src.markdown_ast import MarkdownSanitizer
from src.agentic_curator import AgenticCurator
# Exception configuration: files in CORE_FILES are checked against before
# lines are pruned (presumably so core pages are never emptied - confirm).
# MEMORY_FILE is the JSON file the health checker loads and saves its
# learning data from/to.
CORE_FILES = ["docs/index.md", "README.md"]
MEMORY_FILE = "src/memory/health_learning.json"
class IntelligentLinkCleaner:
def __init__(self):
self.git_controller = RepositoryController(GH_TOKEN, TARGET_REPO)
self.sanitizer = MarkdownSanitizer()
self.link_registry: Dict[str, List[Dict]] = {} # URL -> List of {file, line_content, score}
self.dead_links: Set[str] = set()
self.curator = AgenticCurator()
self.link_registry: Dict[str, List[Dict]] = {}
self.dead_links: Dict[str, str] = {} # URL -> Reason/Fallback URL
self.learning_data = self._load_memory()
self.stats = {
"total_links": 0,
"dead_links_removed": 0,
"duplicates_pruned": 0,
"ai_decisions": 0
"ai_decisions": 0,
"archived_fallbacks": 0,
"orphans_fixed": 0
}
async def build_global_registry(self):
print("[*] Construyendo registro global de enlaces...")
# Incluimos archivos core + categorías
all_files = CORE_FILES + [f"docs/{cat}.md" for cat in NUBENETES_CATEGORIES]
for file_path in all_files:
def _load_memory(self) -> Dict:
if os.path.exists(MEMORY_FILE):
try:
if not os.path.exists(file_path):
# Intentar obtener del repo si no está local (aunque debería estar)
repo_file = self.git_controller.repository.get_contents(file_path)
content = repo_file.decoded_content.decode("utf-8")
else:
with open(file_path, 'r') as f:
content = f.read()
lines = content.splitlines()
for i, line in enumerate(lines):
match = self.sanitizer.link_pattern.search(line)
if match:
title, url = match.groups()
clean_url = url.split('#')[0].rstrip('/')
if "github.com" in clean_url and "/blob/" in clean_url:
continue # Evitar validar enlaces internos profundos de git por ahora
score = self.sanitizer._calculate_link_score(line)
if clean_url not in self.link_registry:
self.link_registry[clean_url] = []
self.link_registry[clean_url].append({
"file": file_path,
"line_index": i,
"content": line,
"score": score,
"title": title
})
self.stats["total_links"] += 1
except Exception as e:
print(f"[!] Error procesando {file_path}: {e}")
with open(MEMORY_FILE, 'r') as f:
return json.load(f)
except: pass
return {"domains": {}, "known_soft_404_patterns": []}
async def validate_links_tiered(self):
    """Check every registered URL in batches of 50 and record the dead ones.

    Each URL is checked with ``_check_url_sophisticated``; URLs reported as
    not alive are added to ``self.dead_links``. Progress is printed after
    every batch.
    """
    print(f"[*] Validando {len(self.link_registry)} URLs únicas...")
    pending = list(self.link_registry.keys())
    total = len(pending)
    # Batching keeps the number of simultaneous checks bounded.
    chunk = 50
    for start in range(0, total, chunk):
        checks = [
            self._check_url_sophisticated(candidate)
            for candidate in pending[start:start + chunk]
        ]
        for checked_url, alive in await asyncio.gather(*checks):
            if alive:
                continue
            self.dead_links.add(checked_url)
        print(f" - Progreso: {min(start + chunk, total)}/{total}")
def _save_memory(self):
    """Write ``self.learning_data`` to ``MEMORY_FILE`` as indented JSON.

    The parent directory of ``MEMORY_FILE`` is created when missing.

    Raises:
        TypeError: If ``self.learning_data`` is not JSON-serializable. The
            existing file is left untouched in that case.
        OSError: If the directory or file cannot be written.
    """
    # Serialize before opening: open(..., 'w') truncates the file, so a
    # failure during serialization would otherwise wipe the stored memory.
    serialized = json.dumps(self.learning_data, indent=2)
    os.makedirs(os.path.dirname(MEMORY_FILE), exist_ok=True)
    with open(MEMORY_FILE, 'w', encoding='utf-8') as f:
        f.write(serialized)
async def _check_url_sophisticated(self, url: str) -> Tuple[str, bool]:
# TIER 1: HTTP Fast
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}
async def _check_wayback(self, url: str) -> Optional[str]:
"""Busca una versión archivada en Wayback Machine."""
api_url = f"https://archive.org/wayback/available?url={url}"
try:
async with httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=10) as client:
resp = await client.get(url)
if resp.status_code < 400:
return url, True
if resp.status_code not in [403, 429, 401]:
return url, False # 404, 500 etc son muertos
except Exception:
pass # Errores de conexión pasan a Tier 2
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(api_url)
if resp.status_code == 200:
data = resp.json()
if data.get("archived_snapshots", {}).get("closest"):
return data["archived_snapshots"]["closest"]["url"]
except: pass
return None
# TIER 2: Playwright (Solo si Tier 1 falla con sospecha de bloqueo)
async def _check_url_with_retries(self, url: str, max_retries=3) -> Tuple[str, bool, Optional[str]]:
domain = url.split("//")[-1].split("/")[0]
domain_info = self.learning_data["domains"].get(domain, {})
use_playwright_first = domain_info.get("requires_playwright", False)
for attempt in range(max_retries):
try:
wait_time = (2 ** attempt) + random.random()
if attempt > 0:
await asyncio.sleep(wait_time)
is_alive, reason = await self._check_url_logic(url, use_playwright_first)
if is_alive:
if domain not in self.learning_data["domains"]:
self.learning_data["domains"][domain] = {"success_count": 0, "fail_count": 0}
self.learning_data["domains"][domain]["success_count"] += 1
return url, True, None
if reason in ["404", "soft_404", "redirect_to_home"]:
archived = await self._check_wayback(url)
if archived:
return url, False, archived
return url, False, None
except Exception as e:
print(f" [!] Intento {attempt+1} fallido para {url}: {e}")
return url, True, None
async def _check_url_logic(self, url: str, force_playwright: bool) -> Tuple[bool, str]:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Referer": "https://www.google.com/"
}
if not force_playwright:
try:
async with httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=12) as client:
resp = await client.get(url)
if resp.status_code in [404, 410]: return False, "404"
if resp.status_code < 300:
final_url = str(resp.url).rstrip('/')
original_base = "/".join(url.split("/")[:3])
if len(url) > len(original_base) + 10 and final_url == original_base:
pass # Sospechoso -> Playwright
else:
return True, "ok"
if resp.status_code in [403, 429, 401]:
domain = url.split("//")[-1].split("/")[0]
if domain not in self.learning_data["domains"]: self.learning_data["domains"][domain] = {}
self.learning_data["domains"][domain]["requires_playwright"] = True
except: pass
# Tier 2: Playwright
try:
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
# User agent real para Playwright
await page.set_extra_http_headers({"User-Agent": headers["User-Agent"]})
response = await page.goto(url, wait_until="networkidle", timeout=20000)
is_alive = response.status < 400 if response else False
await browser.close()
return url, is_alive
except Exception as e:
# Si Playwright también falla, asumimos que puede estar muerto o es inaccesible
# pero para ser conservadores, solo marcamos como muerto si es un error claro
return url, True # Conservador: Si todo falla, no lo borramos todavía
page = await browser.new_page(user_agent=headers["User-Agent"])
try:
response = await page.goto(url, wait_until="domcontentloaded", timeout=25000)
if not response: return True, "timeout"
if response.status in [404, 410]: return False, "404"
content = (await page.content()).lower()
title = (await page.title()).lower()
soft_404_keywords = ["page not found", "404 not found", "artículo no encontrado", "página no encontrada"]
if any(kw in title for kw in soft_404_keywords) or (("404" in title) and any(kw in content for kw in soft_404_keywords)):
return False, "soft_404"
final_url = page.url.rstrip('/')
original_base = "/".join(url.split("/")[:3])
if len(url) > len(original_base) + 10 and final_url == original_base:
return False, "redirect_to_home"
return True, "ok"
finally:
await browser.close()
except:
return True, "engine_error"
async def resolve_duplicates_with_ai(self):
print("[*] Resolviendo duplicados globales con Gemini...")
for url, occurrences in self.link_registry.items():
if len(occurrences) <= 1 or url in self.dead_links:
continue
# Si alguna ocurrencia está en CORE_FILES, esa manda pero no borra el resto necesariamente
# a menos que Gemini diga que es redundante.
# Filtrar ocurrencias que NO están en archivos core para ver qué podemos podar
prunable = [occ for occ in occurrences if occ["file"] not in CORE_FILES]
if len(prunable) <= 1 and len(occurrences) - len(prunable) >= 1:
# Ya está en un CORE_FILE y solo en un sitio más, lo dejamos estar
continue
async def build_global_registry(self):
    """Index the first link of every line in the core and category files.

    The files scanned are ``CORE_FILES`` plus ``docs/<category>.md`` for each
    entry of ``NUBENETES_CATEGORIES``; files that do not exist locally are
    skipped. For each line where ``self.sanitizer.link_pattern`` matches, the
    URL (fragment and trailing slash removed) is recorded in
    ``self.link_registry`` with the file, line index, line text and title,
    and ``self.stats["total_links"]`` is incremented.
    """
    print("[*] Construyendo registro global de enlaces...")
    all_files = CORE_FILES + [f"docs/{cat}.md" for cat in NUBENETES_CATEGORIES]
    for file_path in all_files:
        try:
            if not os.path.exists(file_path):
                continue
            with open(file_path, 'r', encoding='utf-8') as f:
                lines = f.read().splitlines()
            for i, line in enumerate(lines):
                match = self.sanitizer.link_pattern.search(line)
                if not match:
                    continue
                title, url = match.groups()
                clean_url = url.split('#')[0].rstrip('/')
                self.link_registry.setdefault(clean_url, []).append(
                    {"file": file_path, "line_index": i, "content": line, "title": title}
                )
                self.stats["total_links"] += 1
        except Exception as e:
            # Keep scanning the remaining files, but report the failure
            # instead of discarding it silently.
            print(f"[!] Error procesando {file_path}: {e}")
if len(prunable) > 1:
# Preguntar a Gemini
decision = await self._ask_gemini_dedup(url, occurrences)
self.stats["ai_decisions"] += 1
# 'decision' debería decirnos qué archivos mantener
files_to_keep = decision.get("keep_in_files", [])
for occ in prunable:
if occ["file"] not in files_to_keep:
occ["should_prune"] = True
self.stats["duplicates_pruned"] += 1
async def _ask_gemini_dedup(self, url: str, occurrences: List[Dict]) -> Dict:
api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"
contexts = "\n".join([f"- Archivo: {occ['file']}, Contexto: {occ['content']}" for occ in occurrences])
prompt = (
f"El enlace {url} aparece en múltiples archivos de mi repositorio de Kubernetes.\n"
f"Ocurrencias:\n{contexts}\n\n"
"Analiza si el enlace es fundamental en todos esos contextos o si es redundante y debería estar solo en el más relevante.\n"
"Responde en JSON: {\"keep_in_files\": [\"lista de archivos donde mantenerlo\"], \"reason\": \"...\"}"
)
async def validate_links_tiered(self):
print(f"[*] Validando {len(self.link_registry)} URLs con aprendizaje activo...")
unique_urls = list(self.link_registry.keys())
random.shuffle(unique_urls)
try:
async with httpx.AsyncClient() as client:
resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=20)
if resp.status_code == 200:
text = resp.json()['candidates'][0]['content']['parts'][0]['text']
match = re.search(r'\{.*\}', text, re.DOTALL)
if match:
return json.loads(match.group(0))
except: pass
# Fallback: Mantener solo el que tiene mayor score
best_file = max(occurrences, key=lambda x: x["score"])["file"]
return {"keep_in_files": [best_file]}
batch_size = 20
for i in range(0, len(unique_urls), batch_size):
batch = unique_urls[i:i+batch_size]
tasks = [self._check_url_with_retries(url) for url in batch]
results = await asyncio.gather(*tasks)
for url, is_alive, fallback in results:
if not is_alive:
self.dead_links[url] = fallback if fallback else "DEAD"
print(f" - Progreso: {min(i+batch_size, len(unique_urls))}/{len(unique_urls)}")
self._save_memory()
async def apply_changes(self):
print("[*] Aplicando limpieza a los archivos...")
print("[*] Aplicando cambios y sustituciones...")
file_updates = {}
# Agrupar podas por archivo
prunes_by_file = {}
for url, occurrences in self.link_registry.items():
is_dead = url in self.dead_links
for url, fallback in self.dead_links.items():
occurrences = self.link_registry.get(url, [])
for occ in occurrences:
if is_dead or occ.get("should_prune"):
if occ["file"] not in prunes_by_file:
prunes_by_file[occ["file"]] = []
# Guardamos si es por muerto para la lógica de excepciones
prunes_by_file[occ["file"]].append({
"idx": occ["line_index"],
"is_dead": is_dead,
"url": url
})
for file_path, tasks in prunes_by_file.items():
try:
with open(file_path, 'r') as f:
lines = f.readlines()
file_path = occ["file"]
if file_path not in file_updates:
with open(file_path, 'r') as f:
file_updates[file_path] = f.readlines()
original_count = len(lines)
# Borrar de atrás hacia adelante para no arruinar índices
for task in sorted(tasks, key=lambda x: x["idx"], reverse=True):
idx = task["idx"]
is_dead = task["is_dead"]
# Regla: Solo borramos de CORE_FILES si el link está MUERTO.
# Los duplicados se permiten en CORE_FILES.
if file_path not in CORE_FILES or is_dead:
if idx < len(lines):
lines.pop(idx)
if is_dead:
self.stats["dead_links_removed"] += 1
else:
self.stats["duplicates_pruned"] += 1
line_idx = occ["line_index"]
if fallback != "DEAD":
old_line = file_updates[file_path][line_idx]
new_line = old_line.replace(url, fallback)
if "[ARCHIVED]" not in new_line:
new_line = new_line.replace("](", " [ARCHIVED]](")
file_updates[file_path][line_idx] = new_line
self.stats["archived_fallbacks"] += 1
else:
if file_path not in CORE_FILES:
file_updates[file_path][line_idx] = None
self.stats["dead_links_removed"] += 1
if len(lines) < original_count:
file_updates[file_path] = "".join(lines)
print(f" - {file_path}: {original_count - len(lines)} líneas eliminadas.")
except Exception as e:
print(f"[!] Error al procesar limpieza en {file_path}: {e}")
final_payload = {}
for path, lines in file_updates.items():
new_content = "".join([l for l in lines if l is not None])
final_payload[path] = new_content
if file_updates:
print(f"[+] Generando PR con {len(file_updates)} archivos modificados.")
metrics = {
"total_cleaned": self.stats["dead_links_removed"] + self.stats["duplicates_pruned"],
"dead_removed": self.stats["dead_links_removed"],
"duplicates_pruned": self.stats["duplicates_pruned"],
"ai_decisions": self.stats["ai_decisions"],
"files_impacted": list(file_updates.keys())
}
self._create_pr(file_updates, metrics)
else:
print("[~] No se encontraron mejoras necesarias (todo limpio).")
# Añadir cambios de navegación/huérfanos si existen
if self.curator.stats["orphans_linked"] > 0:
with open(self.curator.index_path, 'r') as f:
final_payload[self.curator.index_path] = f.read()
with open(self.curator.mkdocs_path, 'r') as f:
final_payload[self.curator.mkdocs_path] = f.read()
def _create_pr(self, updates: Dict[str, str], metrics: Dict):
# Usamos el git_controller para aplicar cambios
# (Modificado para este script específico)
if final_payload:
self._create_pr(final_payload)
def _create_pr(self, updates: Dict[str, str]):
timestamp = datetime.now().strftime("%Y%m%d-%H%M")
branch_name = f"bot/intelligent-clean-{timestamp}"
branch_name = f"bot/autonomous-update-{timestamp}"
self.git_controller._create_feature_branch(branch_name)
for path, content in updates.items():
file_meta = self.git_controller.repository.get_contents(path)
self.git_controller.repository.update_file(
path=path,
message=f"fix(clean): limpieza inteligente de enlaces en {path}",
content=content,
sha=file_meta.sha,
branch=branch_name
)
try:
file_meta = self.git_controller.repository.get_contents(path)
self.git_controller.repository.update_file(
path=path,
message=f"fix(autonomous): engine update in {path}",
content=content,
sha=file_meta.sha,
branch=branch_name
)
except: pass
body = (
f"## 🤖 Limpieza Inteligente de Enlaces (May 2026)\n\n"
f"He completado un ciclo de revisión global utilizando **Playwright** para evasión de bloqueos y **Gemini** para deduplicación inteligente.\n\n"
f"### 📊 Resumen de Ejecución:\n"
f"- 💀 Enlaces muertos eliminados: `{metrics['dead_removed']}`\n"
f"- Duplicados globales podados: `{metrics['duplicates_pruned']}`\n"
f"- 🧠 Decisiones asistidas por IA: `{metrics['ai_decisions']}`\n\n"
f"### 📂 Archivos Optimizados:\n" +
"\n".join([f"- `{f}`" for f in metrics['files_impacted']])
f"## 🧠 Nubenetes Autonomous Health & Curation Engine\n\n"
f"Ciclo completado con aprendizaje persistente y auditoría de navegación.\n\n"
f"### 📊 Métricas del Ciclo:\n"
f"- 💀 Enlaces eliminados: `{self.stats['dead_links_removed']}`\n"
f"- 🏛 Enlaces recuperados vía Wayback Machine: `{self.stats['archived_fallbacks']}`\n"
f"- 🖇️ Páginas huérfanas vinculadas: `{self.stats['orphans_fixed']}`\n"
f"- 📈 Dominios aprendidos: `{len(self.learning_data['domains'])}`"
)
self.git_controller.repository.create_pull(
title=f"🧹 Intelligent Link Clean & Dedup: {datetime.now().strftime('%d %b %Y')}",
title=f"🧹 Autonomous Engine Update: {datetime.now().strftime('%d %b %Y')}",
body=body,
head=branch_name,
base="master"
@@ -258,10 +252,23 @@ class IntelligentLinkCleaner:
async def main():
cleaner = IntelligentLinkCleaner()
# 1. Auditoría de Enlaces
await cleaner.build_global_registry()
await cleaner.validate_links_tiered()
await cleaner.resolve_duplicates_with_ai()
await cleaner.apply_changes()
# 2. Auditoría de Navegación y Huérfanos
await cleaner.curator.audit_navigation()
await cleaner.curator.suggest_reorganization()
# Actualizar stats
cleaner.stats["orphans_fixed"] = cleaner.curator.stats["orphans_linked"]
# 3. Aplicar todos los cambios
if cleaner.curator.validate_changes():
await cleaner.apply_changes()
else:
print("[!] Validación fallida. No se aplicarán cambios estructurales.")
if __name__ == "__main__":
asyncio.run(main())

View File