feat(health): detailed per-link action log in PR report

This commit is contained in:
Nubenetes Bot
2026-05-09 15:33:04 +02:00
parent 991ecd6fc2
commit e68874f066

View File

@@ -21,12 +21,13 @@ class IntelligentLinkCleaner:
self.sanitizer = MarkdownSanitizer()
self.curator = AgenticCurator()
self.link_registry: Dict[str, List[Dict]] = {}
self.dead_links: Dict[str, str] = {} # URL -> Reason/Fallback URL
self.dead_links: Dict[str, Tuple[str, str]] = {} # URL -> (Fallback/DEAD, Reason)
self.learning_data = self._load_memory()
self.action_log: List[Dict] = [] # List of {file, url, action, reason}
self.detailed_stats = {
"total_scanned": 0,
"by_file": {}, # file_path -> {"removed": 0, "modified": 0, "created": 0}
"by_category": {}, # category -> {"removed": 0, "modified": 0, "created": 0}
"by_file": {},
"by_category": {},
"operation_types": {"removals": 0, "archived": 0, "consolidated": 0, "orphans": 0}
}
self.stats = {
@@ -52,7 +53,6 @@ class IntelligentLinkCleaner:
json.dump(self.learning_data, f, indent=2)
async def _check_wayback(self, url: str) -> Optional[str]:
"""Busca una versión archivada en Wayback Machine."""
api_url = f"https://archive.org/wayback/available?url={url}"
try:
async with httpx.AsyncClient(timeout=10) as client:
@@ -64,53 +64,47 @@ class IntelligentLinkCleaner:
except: pass
return None
async def _check_url_with_retries(self, url: str, max_retries=3) -> Tuple[str, bool, Optional[str], str]:
    """Check *url*, retrying with exponential backoff, and suggest a fallback.

    Returns a ``(url, is_alive, fallback, reason)`` tuple:

    * alive: ``(url, True, None, "Alive")``;
    * dead link on a Git host whose repository root still answers:
      ``(url, False, "REPO_ROOT:<root>", "Consolidated (Original: <reason>)")``;
    * dead link with a Wayback Machine snapshot:
      ``(url, False, "ARCHIVE:<snapshot>", "Archived (Original: <reason>)")``;
    * dead link with no fallback: ``(url, False, None, <reason>)``;
    * every attempt raised or stayed inconclusive:
      ``(url, True, None, "Conservative Keep (Error)")`` -- the link is kept.
    """
    domain = url.split("//")[-1].split("/")[0]
    domains = self.learning_data["domains"]
    use_playwright_first = domains.get(domain, {}).get("requires_playwright", False)
    git_hosts = ("github.com", "gitlab.com", "bitbucket.org")

    for attempt in range(max_retries):
        try:
            if attempt > 0:
                # Exponential backoff with jitter between attempts.
                await asyncio.sleep((2 ** attempt) + random.random())

            is_alive, reason = await self._check_url_logic(url, use_playwright_first)

            if is_alive:
                # The domain entry may already exist without counters (it is
                # created as a bare dict when a domain is flagged as needing
                # Playwright), so never assume "success_count" is present.
                entry = domains.setdefault(domain, {})
                entry.setdefault("success_count", 0)
                entry.setdefault("fail_count", 0)
                entry["success_count"] += 1
                return url, True, None, "Alive"

            if reason in ["404", "soft_404", "redirect_to_home"]:
                # Repo consolidation: a dead deep link into a repository is
                # pointed at the repository root when the root still answers.
                if any(git_host in url for git_host in git_hosts):
                    parts = url.split("/")
                    if len(parts) > 4:
                        repo_root = "/".join(parts[:5])
                        root_alive, _ = await self._check_url_logic(repo_root, False)
                        if root_alive:
                            return url, False, f"REPO_ROOT:{repo_root}", f"Consolidated (Original: {reason})"

                # Archive fallback: look for a Wayback Machine snapshot.
                archived = await self._check_wayback(url)
                if archived:
                    return url, False, f"ARCHIVE:{archived}", f"Archived (Original: {reason})"
                return url, False, None, reason
        except Exception as e:
            print(f" [!] Intento {attempt+1} fallido para {url}: {e}")

    # Nothing conclusive after all retries: keep the link rather than risk
    # deleting a healthy one.
    return url, True, None, "Conservative Keep (Error)"
async def _check_url_logic(self, url: str, force_playwright: bool) -> Tuple[bool, str]:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Referer": "https://www.google.com/"
}
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", "Referer": "https://www.google.com/"}
if not force_playwright:
try:
async with httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=12) as client:
@@ -119,17 +113,13 @@ class IntelligentLinkCleaner:
if resp.status_code < 300:
final_url = str(resp.url).rstrip('/')
original_base = "/".join(url.split("/")[:3])
if len(url) > len(original_base) + 10 and final_url == original_base:
pass # Sospechoso -> Playwright
else:
return True, "ok"
if len(url) > len(original_base) + 10 and final_url == original_base: pass
else: return True, "ok"
if resp.status_code in [403, 429, 401]:
domain = url.split("//")[-1].split("/")[0]
if domain not in self.learning_data["domains"]: self.learning_data["domains"][domain] = {}
self.learning_data["domains"][domain]["requires_playwright"] = True
except: pass
# Tier 2: Playwright
try:
from playwright.async_api import async_playwright
async with async_playwright() as p:
@@ -139,23 +129,16 @@ class IntelligentLinkCleaner:
response = await page.goto(url, wait_until="domcontentloaded", timeout=25000)
if not response: return True, "timeout"
if response.status in [404, 410]: return False, "404"
content = (await page.content()).lower()
title = (await page.title()).lower()
soft_404_keywords = ["page not found", "404 not found", "artículo no encontrado", "página no encontrada"]
if any(kw in title for kw in soft_404_keywords) or (("404" in title) and any(kw in content for kw in soft_404_keywords)):
return False, "soft_404"
if any(kw in title for kw in soft_404_keywords) or (("404" in title) and any(kw in content for kw in soft_404_keywords)): return False, "soft_404"
final_url = page.url.rstrip('/')
original_base = "/".join(url.split("/")[:3])
if len(url) > len(original_base) + 10 and final_url == original_base:
return False, "redirect_to_home"
if len(url) > len(original_base) + 10 and final_url == original_base: return False, "redirect_to_home"
return True, "ok"
finally:
await browser.close()
except:
return True, "engine_error"
finally: await browser.close()
except: return True, "engine_error"
async def build_global_registry(self):
print("[*] Construyendo registro global de enlaces...")
@@ -163,8 +146,7 @@ class IntelligentLinkCleaner:
for file_path in all_files:
try:
if os.path.exists(file_path):
with open(file_path, 'r') as f:
content = f.read()
with open(file_path, 'r') as f: content = f.read()
lines = content.splitlines()
for i, line in enumerate(lines):
match = self.sanitizer.link_pattern.search(line)
@@ -177,129 +159,97 @@ class IntelligentLinkCleaner:
except: pass
async def validate_links_tiered(self):
    """Validate every registered URL in shuffled batches and record dead ones.

    URLs are taken from ``self.link_registry``, shuffled, and checked
    concurrently in batches of 20 through ``_check_url_with_retries``.
    Each dead URL is stored in ``self.dead_links`` as
    ``(fallback or "DEAD", reason)``. The learning memory is saved once all
    batches have been processed.
    """
    print(f"[*] Validando {len(self.link_registry)} URLs...")
    unique_urls = list(self.link_registry)
    random.shuffle(unique_urls)

    batch_size = 20
    for start in range(0, len(unique_urls), batch_size):
        batch = unique_urls[start:start + batch_size]
        # return_exceptions=True: one failing check must not abort the whole
        # run (and with it the memory save at the end).
        results = await asyncio.gather(
            *(self._check_url_with_retries(url) for url in batch),
            return_exceptions=True,
        )
        for url, outcome in zip(batch, results):
            if isinstance(outcome, BaseException):
                # Conservative: a link whose check blew up is kept.
                print(f" [!] Validación fallida para {url}: {outcome}")
                continue
            _, is_alive, fallback, reason = outcome
            if not is_alive:
                self.dead_links[url] = (fallback or "DEAD", reason)

    self._save_memory()
async def apply_changes(self):
    """Rewrite or drop dead links in the affected files and open a PR.

    For every entry of ``self.dead_links`` and every occurrence recorded in
    ``self.link_registry``:

    * ``ARCHIVE:<url>`` fallback -- the link is pointed at the snapshot and
      tagged ``[ARCHIVED]``;
    * ``REPO_ROOT:<url>`` fallback -- the link is pointed at the repo root;
    * anything else -- the whole line is removed, unless the file is listed
      in ``CORE_FILES``.

    Each action is counted in ``self.detailed_stats`` and appended to
    ``self.action_log``. Only files whose content actually changed are handed
    to ``_create_pr``; when the curator linked orphan pages, the index and
    mkdocs files are included as well.
    """
    print("[*] Aplicando cambios y registrando detalles...")
    file_updates = {}   # path -> list of lines; None marks a removed line
    original_text = {}  # path -> content exactly as read from disk
    operation_types = self.detailed_stats["operation_types"]

    def track(file, op, url, reason, cat=None):
        """Count one operation per file and per category and log it."""
        by_file = self.detailed_stats["by_file"]
        by_file.setdefault(file, {"removed": 0, "modified": 0, "created": 0})[op] += 1
        if not cat:
            cat = file.replace("docs/", "").replace(".md", "")
        by_category = self.detailed_stats["by_category"]
        by_category.setdefault(cat, {"removed": 0, "modified": 0, "created": 0})[op] += 1
        self.action_log.append({"file": file, "url": url, "action": op, "reason": reason})

    for url, (fallback, reason) in self.dead_links.items():
        for occ in self.link_registry.get(url, []):
            file_path = occ["file"]
            if file_path not in file_updates:
                with open(file_path, 'r', encoding="utf-8") as f:
                    file_updates[file_path] = f.readlines()
                original_text[file_path] = "".join(file_updates[file_path])

            lines = file_updates[file_path]
            line_idx = occ["line_index"]
            line = lines[line_idx]
            if line is None:
                # The line was already removed because of another dead link.
                continue

            if fallback and fallback.startswith("ARCHIVE:"):
                # Strip only the marker prefix, never a later occurrence.
                real_fallback = fallback[len("ARCHIVE:"):]
                line = line.replace(url, real_fallback)
                if "[ARCHIVED]" not in line:
                    rewritten_link = "](" + real_fallback
                    if rewritten_link in line:
                        # Tag only the link that was just rewritten, not
                        # every other link sharing the line.
                        line = line.replace(rewritten_link, " [ARCHIVED]]( " + real_fallback)
                    else:
                        # Unusual link syntax: fall back to tagging by "](".
                        line = line.replace("](", " [ARCHIVED]]( ")
                lines[line_idx] = line
                track(file_path, "modified", url, reason)
                operation_types["archived"] += 1
            elif fallback and fallback.startswith("REPO_ROOT:"):
                real_fallback = fallback[len("REPO_ROOT:"):]
                lines[line_idx] = line.replace(url, real_fallback)
                track(file_path, "modified", url, reason)
                operation_types["consolidated"] += 1
            elif file_path not in CORE_FILES:
                lines[line_idx] = None
                track(file_path, "removed", url, reason)
                operation_types["removals"] += 1

    orphans_linked = self.curator.stats["orphans_linked"]
    if orphans_linked > 0:
        for orphan in self.curator.stats.get("orphan_details", []):
            track("Navigation", "created", orphan["file"], "Orphan Linked")
    operation_types["orphans"] = orphans_linked

    final_payload = {}
    for path, lines in file_updates.items():
        new_text = "".join(line for line in lines if line is not None)
        # Files that were loaded but left untouched (e.g. dead links inside
        # CORE_FILES) must not produce no-op commits or an empty PR.
        if new_text != original_text[path]:
            final_payload[path] = new_text

    if orphans_linked > 0:
        with open(self.curator.index_path, 'r', encoding="utf-8") as f:
            final_payload[self.curator.index_path] = f.read()
        with open(self.curator.mkdocs_path, 'r', encoding="utf-8") as f:
            final_payload[self.curator.mkdocs_path] = f.read()

    if final_payload:
        self._create_pr(final_payload)
def _create_pr(self, updates: Dict[str, str]):
    """Commit *updates* to a fresh bot branch and open the health-report PR.

    Args:
        updates: mapping of repository path -> full new file content.

    A branch named ``bot/autonomous-health-<timestamp>`` is created, each
    file is updated on it, and a pull request against ``master`` is opened.
    The PR body holds the summary table, the per-link action log and the
    per-file statistics. A file that cannot be updated is reported on stdout
    and skipped so the remaining files and the PR still go through.
    """
    timestamp = datetime.now().strftime("%Y%m%d-%H%M")
    branch_name = f"bot/autonomous-health-{timestamp}"
    self.git_controller._create_feature_branch(branch_name)
    repository = self.git_controller.repository

    for path, content in updates.items():
        try:
            file_meta = repository.get_contents(path)
            repository.update_file(
                path=path,
                message=f"fix(autonomous): engine update in {path}",
                content=content,
                sha=file_meta.sha,
                branch=branch_name,
            )
        except Exception as e:
            # Best effort per file, but never silently.
            print(f" [!] No se pudo actualizar {path}: {e}")

    def cell(value):
        """Make *value* safe inside a Markdown table cell."""
        return str(value).replace("|", "\\|").replace("\r", " ").replace("\n", " ")

    ops = self.detailed_stats["operation_types"]
    # Same symbols as the column headers of the statistics table below.
    action_emojis = {"removed": "❌", "modified": "🔄", "created": "✨"}

    parts = [
        "## 🧠 Nubenetes Autonomous Health & Curation Engine\n\n",
        "### 📊 Resumen Ejecutivo\n",
        "| Operación | Cantidad | Descripción |\n| :--- | :--- | :--- |\n",
        f"| 💀 Eliminados | **{ops['removals']}** | Enlaces 404/Muertos |\n",
        f"| 🏛️ Archivados | **{ops['archived']}** | Vía Wayback Machine |\n",
        f"| 🎯 Consolidados | **{ops['consolidated']}** | Raíz de Repositorio Git |\n",
        f"| 🖇️ Nuevos | **{ops['orphans']}** | Páginas huérfanas vinculadas |\n\n",
        "### 📝 Registro Detallado de Acciones\n",
        "| Archivo | Acción | URL / Recurso | Motivo / Detalle |\n",
        "| :--- | :---: | :--- | :--- |\n",
    ]
    for log in sorted(self.action_log, key=lambda entry: entry["file"]):
        action_emoji = action_emojis.get(log["action"], "")
        parts.append(f"| `{log['file']}` | {action_emoji} | {cell(log['url'])} | {cell(log['reason'])} |\n")

    parts.append("\n### 📂 Estadísticas por Documento\n")
    parts.append("| Archivo | ❌ Elim | 🔄 Mod | ✨ Crea |\n| :--- | :---: | :---: | :---: |\n")
    for file, s in sorted(self.detailed_stats["by_file"].items()):
        parts.append(f"| `{file}` | {s['removed']} | {s['modified']} | {s['created']} |\n")

    parts.append(f"\n\n---\n*📈 Dominios aprendidos: `{len(self.learning_data['domains'])}`*")
    report = "".join(parts)

    repository.create_pull(
        title=f"🧹 Autonomous Engine Health Report: {datetime.now().strftime('%d %b %Y')}",
        body=report,
        head=branch_name,
        base="master",
    )
async def main():
cleaner = IntelligentLinkCleaner()
@@ -308,10 +258,7 @@ async def main():
await cleaner.curator.audit_navigation()
await cleaner.curator.suggest_reorganization()
cleaner.stats["orphans_fixed"] = cleaner.curator.stats["orphans_linked"]
if cleaner.curator.validate_changes():
await cleaner.apply_changes()
else:
await cleaner.apply_changes()
await cleaner.apply_changes()
if __name__ == "__main__":
asyncio.run(main())