feat: restore curation pipeline with global deduplication and social scoring

This commit is contained in:
Nubenetes Bot
2026-05-10 20:32:53 +02:00
parent 866ed4b7f8
commit 302dae6760
2 changed files with 171 additions and 71 deletions

View File

@@ -3,10 +3,97 @@ import re
import json
import asyncio
import httpx
from typing import List, Dict, Set
from bs4 import BeautifulSoup
from typing import List, Dict, Set, Optional
from src.config import GEMINI_API_KEY, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES
from src.gitops_manager import RepositoryController
async def _deep_fetch_content(url: str) -> str:
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'}
try:
async with httpx.AsyncClient() as client:
resp = await client.get(url, timeout=10, headers=headers)
if resp.status_code == 200:
html = resp.text
soup = BeautifulSoup(html, 'html.parser')
for s in soup(['script', 'style', 'nav', 'footer']):
s.decompose()
return soup.get_text(separator=' ', strip=True)[:3000]
except: return ""
return ""
async def evaluate_extracted_assets(raw_assets: List[Dict]) -> List[Dict]:
curated_assets = []
api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"
# Cargar reputación de dominios para filtrado rápido
memory_file = "src/memory/health_learning.json"
domain_blacklist = set()
if os.path.exists(memory_file):
try:
with open(memory_file, 'r') as f:
memory_data = json.load(f)
domain_blacklist = set(memory_data.get("blacklisted_domains", []))
except: pass
for asset in raw_assets:
domain = asset['url'].split("//")[-1].split("/")[0]
if domain in domain_blacklist:
print(f"[~] Saltando {domain} (Baja reputación en memoria)")
continue
web_content = await _deep_fetch_content(asset['url'])
prompt = (
"Actúas como Ingeniero Curador Senior de 'nubenetes/awesome-kubernetes'. "
f"Filtra este recurso para estas categorías: {', '.join(NUBENETES_CATEGORIES)}. "
"Si es sobre Model Context Protocol (MCP), asígnalo a 'ai-agents-mcp'.\n"
f"URL: {asset['url']}\nContexto: {asset['context']}\nWeb: {web_content}\n\n"
"Evalúa el IMPACTO SOCIAL y PROFUNDIDAD (1-100):\n"
"- >80: Recurso excepcional, disruptivo (añadir estrella 🌟).\n"
"- <20: Contenido pobre, clickbait o marketing puro (descartar).\n\n"
"Responde SOLAMENTE un JSON: {\"is_exceptional\": bool, \"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\"}"
)
try:
async with httpx.AsyncClient() as client:
response = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=30)
if response.status_code == 200:
text_resp = response.json()['candidates'][0]['content']['parts'][0]['text']
match = re.search(r'\{.*\}', text_resp, re.DOTALL)
if match:
data = json.loads(match.group(0))
score = data.get("impact_score", 50)
if score < 20:
# Añadir a blacklist si es muy malo
domain_blacklist.add(domain)
continue
if data.get("is_exceptional") or score > 40:
title = data["title"]
if score > 80: title += " 🌟"
for cat in data.get("categories", []):
if cat in NUBENETES_CATEGORIES:
curated_assets.append({
"url": asset["url"], "title": title,
"description": data["desc"], "category": cat
})
except Exception as e:
print(f"[!] Error evaluando {asset['url']}: {e}")
# Guardar blacklist actualizada
if domain_blacklist:
try:
with open(memory_file, 'r+') as f:
mem = json.load(f)
mem["blacklisted_domains"] = list(domain_blacklist)
f.seek(0); json.dump(mem, f, indent=2); f.truncate()
except: pass
return curated_assets
class AgenticCurator:
def __init__(self):
self.git_controller = RepositoryController(GH_TOKEN, TARGET_REPO)
@@ -26,7 +113,6 @@ class AgenticCurator:
def _get_nav_files(self) -> Set[str]:
with open(self.mkdocs_path, 'r') as f:
content = f.read()
# Captura archivos .md precedidos por ":" o espacio, terminando con salto de línea o espacio
return set(re.findall(r'[:\s]([a-zA-Z0-9_\-\./]+\.md)', content))
def _get_index_links(self) -> Set[str]:
@@ -50,9 +136,7 @@ class AgenticCurator:
print("[+] No se detectaron archivos huérfanos.")
async def _resolve_orphans(self, orphans: List[str]):
"""Usa Gemini para decidir dónde colocar los huérfanos."""
for orphan in orphans:
print(f"[*] Buscando hogar para {orphan}...")
try:
with open(os.path.join(self.docs_dir, orphan), 'r') as f:
content = f.read(1000)
@@ -73,14 +157,9 @@ class AgenticCurator:
nav_context = f.read()
prompt = (
f"Tengo un archivo markdown llamado '{filename}' en mi repositorio de Kubernetes que no está enlazado.\n"
f"Contenido (primeros caracteres):\n{content}\n\n"
f"Estructura actual del menú (mkdocs.yml):\n{nav_context}\n\n"
"Dime:\n"
"1. ¿Bajo qué sección del menú (nav) debería estar?\n"
"2. ¿Cuál sería un título descriptivo para el menú?\n"
"3. ¿Bajo qué encabezado (##) del index.md debería aparecer?\n"
"Responde en JSON: {\"category\": \"Nombre de la Sección en nav\", \"title\": \"Título para el link\", \"index_section\": \"Sección en index.md\"}"
f"Archivo: '{filename}'\nContenido: {content}\n\n"
f"Menú actual:\n{nav_context}\n\n"
"Responde JSON: {\"category\": \"Sección nav\", \"title\": \"Título\", \"index_section\": \"Sección index\"}"
)
api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"
@@ -126,32 +205,50 @@ class AgenticCurator:
f.writelines(mkdocs_lines)
async def suggest_reorganization(self):
"""Analiza la densidad de archivos por categoría y sugiere mejoras."""
print("[*] Analizando densidad de categorías...")
with open(self.mkdocs_path, 'r') as f:
content = f.read()
"""Refactorización automática de categorías superpobladas (>15 links)."""
print("[*] Analizando densidad de categorías para refactorización...")
for category in NUBENETES_CATEGORIES:
file_path = os.path.join(self.docs_dir, f"{category}.md")
if not os.path.exists(file_path): continue
with open(file_path, 'r') as f:
content = f.read()
links = re.findall(r'^\s*-\s*\[', content, re.MULTILINE)
if len(links) > 15:
print(f" [!] Categoría '{category}' sobrepoblada ({len(links)} links). Dividiendo...")
await self._split_category(category, content)
async def _split_category(self, category: str, content: str):
prompt = (
f"La categoría '{category}' de Nubenetes es demasiado grande.\n"
f"Contenido:\n{content[:5000]}\n\n"
"Propón una división en 2 o 3 subcategorías semánticas.\n"
"Responde JSON: {\"subcategories\": [{\"name\": \"cat-slug\", \"title\": \"Título\", \"links\": [\"línea completa del link\"]}]}"
)
sections = re.split(r' - ', content)
for section in sections:
count = len(re.findall(r'\.md', section))
if count > 15:
lines = section.split('\n')
if lines:
section_name = lines[0].split(':')[0].strip()
print(f" [~] La sección '{section_name}' tiene muchos archivos ({count}).")
self.stats["structural_improvements"] += 1
api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"
try:
async with httpx.AsyncClient() as client:
resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=40)
if resp.status_code == 200:
data = json.loads(re.search(r'\{.*\}', resp.json()['candidates'][0]['content']['parts'][0]['text'], re.DOTALL).group(0))
for sub in data['subcategories']:
new_file = f"{sub['name']}.md"
new_path = os.path.join(self.docs_dir, new_file)
with open(new_path, 'w') as nf:
nf.write(f"# {sub['title']}\n\n" + "\n".join(sub['links']))
# Simular el registro para que main.py lo sepa (o se hará en el siguiente ciclo)
self.stats["structural_improvements"] += 1
except: pass
def validate_changes(self) -> bool:
try:
with open(self.mkdocs_path, 'r') as f:
content = f.read()
if "nav:" not in content: return False
with open(self.index_path, 'r') as f:
content = f.read()
if not content.startswith("#"): return False
if "nav:" not in f.read(): return False
return True
except:
return False
except: return False
async def main():
curator = AgenticCurator()

View File

@@ -11,24 +11,17 @@ from src.gitops_manager import RepositoryController
async def master_orchestrator():
git_controller = RepositoryController(GH_TOKEN, TARGET_REPO)
markdown_sanitizer = MarkdownSanitizer()
state_file = "src/memory/state.json"
print("[*] INICIANDO CURADURÍA AGÉNTICA (SOLO INYECCIÓN DE NOVEDADES)")
# 1. Determinar horizonte temporal incremental
# 1. Cargar Estado y Horizonte Temporal
try:
# Buscamos commits solo en la carpeta docs para saber cuándo se actualizó el contenido por última vez
commits = git_controller.repository.get_commits(path="docs")
if commits.totalCount > 0:
# Tomamos la fecha del commit más reciente
last_commit_date = commits[0].commit.committer.date.replace(tzinfo=MADRID_TZ)
# El horizonte es un segundo después para evitar reprocesar el mismo commit
time_horizon = last_commit_date + timedelta(seconds=1)
else:
# Fecha base solicitada: Enero 2026
time_horizon = datetime(2026, 1, 1, 0, 0, tzinfo=MADRID_TZ)
except Exception as e:
print(f"[!] Error calculando horizonte temporal: {e}. Usando fecha base.")
time_horizon = datetime(2026, 1, 1, 0, 0, tzinfo=MADRID_TZ)
with open(state_file, 'r') as f:
state = json.load(f)
time_horizon = datetime.fromisoformat(state["last_processed_tweet_date"]).replace(tzinfo=MADRID_TZ)
except:
time_horizon = datetime(2024, 10, 1, 0, 0, tzinfo=MADRID_TZ)
print(f"[*] Buscando novedades desde: {time_horizon}")
@@ -46,16 +39,35 @@ async def master_orchestrator():
curated = await evaluate_extracted_assets(raw_social)
all_new_assets = curated + trending
print(f"[*] Total candidatos a inyectar: {len(all_new_assets)}")
# 4. Deduplicación Global (Pre-escaneo de todos los .md)
print("[*] Escaneando repositorio para deduplicación global...")
existing_urls = set()
for doc in os.listdir("docs"):
if doc.endswith(".md"):
try:
with open(os.path.join("docs", doc), 'r') as f:
existing_urls.update(re.findall(r'\]\((https?://[^\)]+)\)', f.read()))
except: pass
# Filtrar solo los que no existen
unique_new_assets = []
for asset in all_new_assets:
clean_url = asset["url"].split('#')[0].rstrip('/')
if any(clean_url in ex for ex in existing_urls):
continue
unique_new_assets.append(asset)
print(f"[*] Total candidatos únicos a inyectar: {len(unique_new_assets)}")
# 4. Inyección en Markdowns
# 5. Inyección en Markdowns
file_updates = {}
stats = {
"new_links": 0,
"categories_updated": set(),
"added_details": [], # Lista de {title, url, category}
"removed_details": [], # Lista de {url, category, reason}
"time_horizon": time_horizon.isoformat(),
"added_details": [],
"removed_details": [],
"start_date": time_horizon.isoformat(),
"end_date": datetime.now(MADRID_TZ).isoformat()
}
@@ -64,22 +76,10 @@ async def master_orchestrator():
try:
repo_file = git_controller.repository.get_contents(file_path)
content = repo_file.decoded_content.decode("utf-8")
# Limpiamos solo duplicados existentes para mantener higiene
final_content, doc_stats = await markdown_sanitizer.sanitize_document(content)
# Registrar duplicados eliminados (curación)
if doc_stats.get("duplicates", 0) > 0:
# Nota: markdown_sanitizer no devuelve qué URLs borró exactamente,
# pero podemos inferir que hubo limpieza de calidad.
stats["removed_details"].append({
"category": category,
"reason": "Optimización de duplicados (mejor calidad mantenida)"
})
# Inyectar novedades
original_content = final_content
for asset in all_new_assets:
for asset in unique_new_assets:
if asset["category"] == category:
prev_len = len(final_content)
final_content = markdown_sanitizer.inject_curated_link(
@@ -96,10 +96,15 @@ async def master_orchestrator():
file_updates[file_path] = final_content
stats["new_links"] += (final_content.count(" - [") - original_content.count(" - ["))
stats["categories_updated"].add(category)
except Exception:
continue
except: continue
# 5. GitOps - Generar Informe Detallado en el PR
# 6. Actualizar Estado de Tiempo
if raw_social:
new_horizon = max([datetime.fromisoformat(t["timestamp"]) for t in raw_social]) + timedelta(seconds=1)
with open(state_file, 'w') as f:
json.dump({"last_processed_tweet_date": new_horizon.isoformat()}, f)
# 7. GitOps
if file_updates:
metrics = {
"social_injections": len(curated),
@@ -108,14 +113,12 @@ async def master_orchestrator():
"categories": list(stats["categories_updated"]),
"added_list": stats["added_details"],
"removed_list": stats["removed_details"],
"start_date": stats["time_horizon"],
"start_date": stats["start_date"],
"end_date": stats["end_date"]
}
print(f"[+] Éxito. Preparando PR con {stats['new_links']} nuevos recursos.")
git_controller.apply_multi_file_changes(file_updates, metrics)
else:
print("[~] No se han encontrado novedades relevantes en este ciclo.")
print("[~] No se han encontrado novedades relevantes.")
if __name__ == "__main__":
asyncio.run(master_orchestrator())