refactor: implementar saneamiento inteligente de enlaces y auditoría robusta

This commit is contained in:
Inaki Fernandez
2026-04-25 12:03:42 +02:00
parent 674aac7063
commit e10ef16e25
3 changed files with 119 additions and 93 deletions

View File

@@ -5,18 +5,19 @@ from src.config import NUBENETES_CATEGORIES
class LinkEvaluationResult(BaseModel):
is_exceptional_value: bool = Field(description="¿Es un recurso avanzado o disruptivo?")
category_assignment: Optional[str] = Field(description="Categoría asignada.", enum=NUBENETES_CATEGORIES)
canonical_title: str = Field(description="Título formal.")
technical_description: str = Field(description="Descripción técnica corta.")
category_assignments: List[str] = Field(description="Lista de categorías/archivos donde encaja este recurso.", min_items=1)
canonical_title: str = Field(description="Título formal y directo.")
technical_description: str = Field(description="Descripción técnica de máx 150 caracteres.")
evaluation_rationale: str = Field(description="Razonamiento de la decisión.")
curation_agent = Agent(
'google-gla:gemini-2.0-flash-exp',
result_type=LinkEvaluationResult,
system_prompt=(
"Actúas como el Ingeniero Curador Principal para 'nubenetes/awesome-kubernetes'. "
"Descarta tutoriales genéricos. Privilegia Agentes de IA, Model Context Protocol (MCP), automatización avanzada, GitOps y operadores K8s. "
"Usa una categoría existente. Redacta descripciones asépticas y técnicas."
"Actúas como el Ingeniero Curador Principal de 'nubenetes/awesome-kubernetes'. "
"Tu misión es filtrar recursos de altísima calidad sobre K8s, Agentes de IA, MCP y Cloud Native. "
"Puedes asignar un recurso a MÁS DE UNA categoría si es estrictamente necesario, pero intenta ser preciso. "
"Categorías válidas: " + ", ".join(NUBENETES_CATEGORIES)
)
)
@@ -26,14 +27,16 @@ async def evaluate_extracted_assets(raw_assets: list[dict]) -> list[dict]:
cognitive_prompt = f"Evalúa este candidato:\nURL: {asset['url']}\nContexto: {asset['context']}"
try:
response = await curation_agent.run(cognitive_prompt)
evaluation = response.data
if evaluation.is_exceptional_value and evaluation.category_assignment:
curated_assets.append({
"url": asset["url"],
"title": evaluation.canonical_title,
"description": evaluation.technical_description,
"category": evaluation.category_assignment
})
ev = response.data
if ev.is_exceptional_value:
for cat in ev.category_assignments:
if cat in NUBENETES_CATEGORIES:
curated_assets.append({
"url": asset["url"],
"title": ev.canonical_title,
"description": ev.technical_description,
"category": cat
})
except Exception as e:
print(f"Error evaluando {asset['url']}: {str(e)}")
return curated_assets

View File

@@ -17,56 +17,60 @@ async def master_orchestrator():
except:
time_horizon = datetime(2024, 10, 5, 18, 36, tzinfo=MADRID_TZ)
print(f">>> Iniciando curaduría desde: {time_horizon}")
print(f"[*] Iniciando ciclo agéntico. Horizonte: {time_horizon}")
# 1. Obtención de datos
# 1. Ingesta
twitter_client = SocialDataExtractor()
raw_social_links = await twitter_client.fetch_links_since(time_horizon)
autonomous_links = await discover_trending_assets()
raw_social = await twitter_client.fetch_links_since(time_horizon)
trending = await discover_trending_assets()
# 2. Evaluación con IA
curated_social_links = await evaluate_extracted_assets(raw_social_links)
total_new_assets = curated_social_links + autonomous_links
# 2. IA - Evaluación y Clasificación
curated = await evaluate_extracted_assets(raw_social)
all_new_assets = curated + trending
# 3. Preparar cambios
# 3. Saneamiento y Aplicación Global
markdown_sanitizer = MarkdownSanitizer()
file_updates = {}
# Identificar qué archivos necesitan ser procesados
categories_to_update = set([a["category"] for asset in total_new_assets])
# 4. Procesar inyecciones y saneamiento
global_stats = {"fixed": 0, "removed": 0, "duplicates": 0, "new": 0}
for category in NUBENETES_CATEGORIES:
file_path = f"docs/{category}.md"
try:
repo_file = git_controller.repository.get_contents(file_path)
content = repo_file.decoded_content.decode("utf-8")
# Saneamiento (siempre lo hacemos para mantener la salud)
new_content = await markdown_sanitizer.sanitize_document(content)
# Saneamiento inteligente (Redirecciones + Borrado de muertos)
purified, stats = await markdown_sanitizer.sanitize_document(content)
# Inyección si hay activos para esta categoría
for asset in total_new_assets:
# Inyección de novedades
final_content = purified
for asset in all_new_assets:
if asset["category"] == category:
new_content = markdown_sanitizer.inject_curated_link(
new_content, category, asset["title"], asset["url"], asset["description"]
prev_content = final_content
final_content = markdown_sanitizer.inject_curated_link(
final_content, category, asset["title"], asset["url"], asset["description"]
)
if new_content.strip() != content.strip():
file_updates[file_path] = new_content
if final_content != prev_content:
global_stats["new"] += 1
# Consolidar estadísticas
for k in ["fixed", "removed", "duplicates"]:
global_stats[k] += stats[k]
if final_content.strip() != content.strip():
file_updates[file_path] = final_content
except:
continue
# 5. Aplicar cambios vía GitOps
# 4. GitOps - Entrega de Valor
if file_updates:
metrics = {
"social_injections": len(curated_social_links),
"autonomous_injections": len(autonomous_links)
}
git_controller.apply_multi_file_changes(file_updates, metrics)
print(f">>> Éxito: PR abierta con cambios en {len(file_updates)} archivos.")
git_controller.apply_multi_file_changes(file_updates, global_stats)
print(f"[+] Proceso completado. PR generada con {len(file_updates)} archivos mejorados.")
print(f" - Enlaces actualizados (Redir): {global_stats['fixed']}")
print(f" - Enlaces eliminados (Muertos): {global_stats['removed']}")
print(f" - Enlaces nuevos: {global_stats['new']}")
else:
print(">>> Sin cambios necesarios.")
print("[~] No hay cambios necesarios en este ciclo.")
if __name__ == "__main__":
asyncio.run(master_orchestrator())

View File

@@ -1,70 +1,89 @@
import re
import aiohttp
import asyncio
from typing import Tuple, Optional
class MarkdownSanitizer:
def __init__(self):
# Captura [texto](url)
self.link_pattern = re.compile(r'\[([^\]]+)\]\((https?://[^\)]+)\)')
async def _verify_link_health(self, session: aiohttp.ClientSession, url: str) -> bool:
async def _check_url_robust(self, session: aiohttp.ClientSession, url: str, retries: int = 3) -> Tuple[bool, Optional[str]]:
"""
Retorna (is_alive, final_url).
Si final_url es distinto a url, significa que hubo una redirección permanente.
"""
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
try:
async with session.head(url, timeout=15, allow_redirects=True, headers=headers) as response:
if response.status < 400:
return True
if response.status in [404, 405]:
async with session.get(url, timeout=15, headers=headers) as get_resp:
return get_resp.status < 400
except:
return False
return False
for attempt in range(retries):
try:
async with session.get(url, timeout=20, allow_redirects=True, headers=headers) as response:
if response.status < 400:
final_url = str(response.url).rstrip('/')
original_url = url.split('#')[0].rstrip('/')
if final_url != original_url and response.status in [301, 308]:
return True, str(response.url) # Actualización recomendada
return True, None
if response.status >= 500: # Error de servidor, reintentar
await asyncio.sleep(2 ** attempt)
continue
return False, None
except:
if attempt < retries - 1:
await asyncio.sleep(2 ** attempt)
continue
return False, None
async def sanitize_document(self, markdown_content: str) -> str:
all_links = self.link_pattern.findall(markdown_content)
unique_url_registry = set()
duplicates_flagged = set()
unique_link_pairs = []
for text, url in all_links:
clean_url = url.split('#')[0].rstrip('/')
if clean_url in unique_url_registry:
duplicates_flagged.add((text, url))
else:
unique_url_registry.add(clean_url)
unique_link_pairs.append((text, url))
async def sanitize_document(self, markdown_content: str) -> Tuple[str, dict]:
lines = markdown_content.splitlines()
new_lines = []
stats = {"fixed": 0, "removed": 0, "duplicates": 0}
seen_in_file = set()
healthy_urls = set()
connector = aiohttp.TCPConnector(limit=50)
connector = aiohttp.TCPConnector(limit=30)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [self._verify_link_health(session, url) for _, url in unique_link_pairs]
health_results = await asyncio.gather(*tasks)
for (text, url), is_healthy in zip(unique_link_pairs, health_results):
if is_healthy:
healthy_urls.add(url.split('#')[0].rstrip('/'))
for line in lines:
match = self.link_pattern.search(line)
if not match:
new_lines.append(line)
continue
reconstructed_lines = []
for line in markdown_content.splitlines():
links_in_line = self.link_pattern.findall(line)
should_retain_line = True
for txt, uri in links_in_line:
clean_uri = uri.split('#')[0].rstrip('/')
if (txt, uri) in duplicates_flagged or clean_uri not in healthy_urls:
should_retain_line = False
if (txt, uri) in duplicates_flagged:
duplicates_flagged.remove((txt, uri))
break
if should_retain_line:
reconstructed_lines.append(line)
return "\n".join(reconstructed_lines)
text, url = match.groups()
clean_url = url.split('#')[0].rstrip('/')
# 1. Check Duplicados dentro del mismo archivo
if clean_url in seen_in_file:
stats["duplicates"] += 1
continue # Eliminar duplicado literal
# 2. Check Salud e Inteligencia de redirección
is_alive, new_url = await self._check_url_robust(session, url)
if is_alive:
seen_in_file.add(clean_url)
if new_url: # El enlace se ha movido permanentemente
line = line.replace(url, new_url)
stats["fixed"] += 1
new_lines.append(line)
else:
stats["removed"] += 1
# No añadimos la línea, por lo que se elimina
return "\n".join(new_lines), stats
def inject_curated_link(self, markdown_text: str, category: str, title: str, url: str, description: str) -> str:
# Evitar duplicados antes de inyectar
if url.split('#')[0].rstrip('/') in markdown_text:
return markdown_text
new_entry = f" - [{title}]({url}) - {description}"
lines = markdown_text.splitlines()
for index, line in enumerate(lines):
# Buscar el mejor sitio (debajo del encabezado de la categoría o al final)
for i, line in enumerate(lines):
if category.lower() in line.lower() and (line.startswith("#") or line.startswith("-")):
lines.insert(index + 1, new_entry)
lines.insert(i + 1, new_entry)
return "\n".join(lines)
lines.append(f"\n### {category}")
lines.append(f"\n## {category}\n")
lines.append(new_entry)
return "\n".join(lines)