mirror of
https://github.com/nubenetes/awesome-kubernetes.git
synced 2026-05-24 10:04:07 +00:00
feat: enhance curation engine with robust retries, detailed logging, and branch-specific workflows
This commit is contained in:
@@ -48,109 +48,87 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
|
||||
|
||||
for i, asset in enumerate(raw_assets):
|
||||
post_date = asset.get('timestamp', 'Fecha desconocida')
|
||||
log_event(f"--- EVALUANDO {i+1}/{len(raw_assets)} ---")
|
||||
log_event(f" - URL: {asset['url']}\n - Post Date: {post_date}")
|
||||
context = asset.get('context', asset.get('description', 'Sin contexto adicional'))
|
||||
|
||||
log_event(f"--- EVALUANDO {i+1}/{len(raw_assets)} ---", section_break=False)
|
||||
log_event(f" - URL: {asset['url']}")
|
||||
log_event(f" - Post Date: {post_date}")
|
||||
log_event(f" - Contexto del Post: \"{context[:300]}...\"")
|
||||
|
||||
domain = asset['url'].split("//")[-1].split("/")[0]
|
||||
if domain in domain_blacklist:
|
||||
eval_res = {"status": "FILTERED", "reason": "Dominio en lista negra de reputación"}
|
||||
evaluations[asset["url"]] = eval_res
|
||||
log_event(f" [-] RECHAZADO: {eval_res['reason']}")
|
||||
log_event(f" [-] RECHAZADO: Dominio en lista negra ({domain})")
|
||||
evaluations[asset["url"]] = {"status": "FILTERED", "reason": "Dominio en lista negra"}
|
||||
continue
|
||||
|
||||
web_content = await _deep_fetch_content(asset['url'])
|
||||
context = asset.get('context', asset.get('description', 'Sin contexto adicional'))
|
||||
|
||||
prompt = (
|
||||
"Actúas como Ingeniero Curador Senior de 'nubenetes/awesome-kubernetes'.\n"
|
||||
"Tu misión es catalogar contenido TÉCNICO sobre Kubernetes y Cloud Native compartido por el usuario.\n"
|
||||
"REGLA DE ORO: Si el enlace está en el feed, es porque el usuario lo considera útil. NO lo descartes a menos que sea ruido total.\n\n"
|
||||
f"Categorías válidas: {', '.join(NUBENETES_CATEGORIES)}.\n\n"
|
||||
"INSTRUCCIONES:\n"
|
||||
"1. YOUTUBE: Acepta videos técnicos o tutoriales. Categorízalos.\n"
|
||||
"2. RESUMEN: Crea un resumen conciso (1 frase). Usa prioritariamente el 'Contexto' (que es el post de X).\n"
|
||||
"3. ASIGNACIÓN: Si es sobre Model Context Protocol (MCP), asígnalo a 'ai-agents-mcp'. Si es técnico pero no sabes dónde, usa 'kubernetes-tools'.\n\n"
|
||||
f"URL: {asset['url']}\nContexto de X: {context}\nContenido Web Extraído: {web_content[:1500]}\n\n"
|
||||
"Evalúa (1-100):\n"
|
||||
"- >80: Recurso excepcional (🌟).\n"
|
||||
"- >1: Aceptar (si es técnico o útil).\n\n"
|
||||
"Responde SOLAMENTE un JSON: {\"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\", \"reasoning\": \"Breve explicación de por qué esta categoría y score\", \"rejection_reason\": \"... (si aplica)\"}"
|
||||
)
|
||||
|
||||
...
|
||||
try:
|
||||
data = await call_gemini_with_retry(prompt)
|
||||
score = data.get("impact_score", 50)
|
||||
valid_cats = [c for c in data.get("categories", []) if c in NUBENETES_CATEGORIES]
|
||||
reasoning = data.get("reasoning", "Sin motivo especificado")
|
||||
|
||||
if score < 1:
|
||||
if score < 20:
|
||||
reason = data.get("rejection_reason", "Bajo impacto técnico")
|
||||
evaluations[asset["url"]] = {"status": "FILTERED", "reason": reason}
|
||||
log_event(f" [-] RECHAZADO: {reason} (Score: {score})\n Motivo IA: {data.get('reasoning')}")
|
||||
log_event(f" [-] RECHAZADO: {reason} (Score: {score})")
|
||||
log_event(f" Motivo IA: {reasoning}")
|
||||
|
||||
if score < 10 and domain not in domain_blacklist:
|
||||
domain_blacklist.add(domain)
|
||||
log_event(f" [!] Dominio {domain} añadido a lista negra.")
|
||||
elif not valid_cats:
|
||||
evaluations[asset["url"]] = {"status": "FILTERED", "reason": "No se encontró categoría técnica válida"}
|
||||
log_event(f" [-] RECHAZADO: Sin categoría válida (Cats sugeridas: {data.get('categories')})\n Motivo IA: {data.get('reasoning')}")
|
||||
evaluations[asset["url"]] = {"status": "FILTERED", "reason": "Sin categoría técnica válida"}
|
||||
log_event(f" [-] RECHAZADO: No se encontró categoría válida (Sugeridas: {data.get('categories')})")
|
||||
log_event(f" Motivo IA: {reasoning}")
|
||||
else:
|
||||
evaluations[asset["url"]] = {
|
||||
"status": "INCLUDED", "title": data["title"], "description": data["desc"],
|
||||
"category": valid_cats[0], "impact_score": score, "is_exceptional": score > 80,
|
||||
"reasoning": data.get("reasoning")
|
||||
"reasoning": reasoning
|
||||
}
|
||||
log_event(f" [+] ACEPTADO: {data['title']} -> {valid_cats[0]} (Score: {score})\n Desc: {data['desc']}\n Motivo IA: {data.get('reasoning')}")
|
||||
log_event(f" [+] ACEPTADO: \"{data['title']}\" -> {valid_cats[0]} (Score: {score})")
|
||||
log_event(f" Descripción: {data['desc']}")
|
||||
log_event(f" Motivo IA: {reasoning}")
|
||||
|
||||
except Exception as e:
|
||||
err_msg = str(e)
|
||||
if "Rate Limit" in err_msg or "429" in err_msg:
|
||||
log_event(f" [!] RATE LIMIT DETECTADO. Entrando en modo COOL DOWN (2 min)...")
|
||||
await asyncio.sleep(120)
|
||||
|
||||
err_log = f" [!] ERROR GEMINI: {err_msg[:200]}"
|
||||
evaluations[asset["url"]] = {"status": "FILTERED", "reason": err_log}
|
||||
log_event(err_log)
|
||||
log_event(f" [!] ERROR CRÍTICO EVALUANDO {asset['url']}: {e}")
|
||||
evaluations[asset["url"]] = {"status": "FILTERED", "reason": f"Fallo Evaluación: {str(e)[:100]}"}
|
||||
|
||||
await asyncio.sleep(5.0)
|
||||
await asyncio.sleep(2.0) # Ritmo estable
|
||||
|
||||
if domain_blacklist:
|
||||
try:
|
||||
os.makedirs(os.path.dirname(memory_file), exist_ok=True)
|
||||
with open(memory_file, 'w') as f:
|
||||
json.dump({"blacklisted_domains": list(domain_blacklist)}, f)
|
||||
except: pass
|
||||
# Guardar blacklist actualizada
|
||||
try:
|
||||
os.makedirs(os.path.dirname(memory_file), exist_ok=True)
|
||||
with open(memory_file, 'w') as f:
|
||||
json.dump({"blacklisted_domains": list(domain_blacklist)}, f, indent=2)
|
||||
except: pass
|
||||
return evaluations
|
||||
|
||||
class AgenticCurator:
|
||||
def __init__(self):
    """Create the repository controller and record the documentation paths.

    Also initialises ``self.stats``, the dictionary of run counters, with
    every counter at zero and an empty details list.
    """
    docs_root = "docs"

    self.git_controller = RepositoryController(GH_TOKEN, TARGET_REPO)
    self.docs_dir = docs_root
    self.index_path = os.path.join(docs_root, "index.md")
    self.mkdocs_path = "mkdocs.yml"

    # Run statistics; everything starts empty.
    self.stats = dict(
        orphans_found=0,
        orphans_linked=0,
        structural_improvements=0,
        orphan_details=[],
    )
|
||||
|
||||
async def decide_smart_injection(self, markdown_content: str, asset: Dict) -> str:
|
||||
"""Usa Gemini para decidir dónde y cómo inyectar el enlace dentro del markdown."""
|
||||
lines = markdown_content.splitlines()
|
||||
structure = "\n".join([l for l in lines if l.startswith("#")])
|
||||
|
||||
prompt = (
|
||||
"Actúas como Arquitecto de Contenidos para Nubenetes.com.\n"
|
||||
"Actúas como Arquitecto de Contenidos.\n"
|
||||
f"Enlace: [{asset['title']}]({asset['url']}) - {asset['description']}\n"
|
||||
f"Impacto: {asset['impact_score']}/100.\n\n"
|
||||
"Estructura del archivo:\n"
|
||||
f"{structure[:2000]}\n\n"
|
||||
"1. Encuentra el ## o ### más semántico.\n"
|
||||
"2. Decide formato: si es excelente, añade estrellas (🌟, 🌟🌟 o 🌟🌟🌟).\n"
|
||||
"3. Decide si usar negritas (==enlace== o **texto**).\n"
|
||||
"Responde JSON: {\"header\": \"Nombre exacto del ## o ###\", \"formatted_line\": \" - [==Título==](url) 🌟 - Descripción\", \"reasoning\": \"Breve por qué de esta ubicación/formato\"}"
|
||||
"Estructura:\n"
|
||||
f"{structure[:1500]}\n\n"
|
||||
"Responde JSON: {\"header\": \"## ...\", \"formatted_line\": \" - [Título](url) - Desc\", \"reasoning\": \"...\"}"
|
||||
)
|
||||
|
||||
try:
|
||||
data = await call_gemini_with_retry(prompt)
|
||||
header = data.get("header")
|
||||
new_line = data.get("formatted_line")
|
||||
reasoning = data.get("reasoning", "Sin motivo especificado")
|
||||
|
||||
if header and new_line:
|
||||
log_event(f" [>>>] UBICACIÓN: Header '{header}'\n Formato: {new_line}\n Motivo IA: {reasoning}")
|
||||
|
||||
new_lines = []
|
||||
inserted = False
|
||||
for line in lines:
|
||||
@@ -159,9 +137,7 @@ class AgenticCurator:
|
||||
new_lines.append(new_line)
|
||||
inserted = True
|
||||
if inserted: return "\n".join(new_lines)
|
||||
except Exception as e:
|
||||
log_event(f"[!] Error en decide_smart_injection: {e}")
|
||||
pass
|
||||
except: pass
|
||||
return self._manual_fallback_injection(markdown_content, asset)
|
||||
|
||||
def _manual_fallback_injection(self, content: str, asset: Dict) -> str:
|
||||
@@ -169,11 +145,36 @@ class AgenticCurator:
|
||||
line = f" - [{asset['title']}]({asset['url']}){stars} - {asset['description']}"
|
||||
return content + f"\n{line}"
|
||||
|
||||
async def audit_navigation(self):
    """Placeholder: not implemented yet; awaiting it does nothing."""
    return None
|
||||
|
||||
async def suggest_reorganization(self, max_links: int = 15):
    """Log which documentation files hold more than ``max_links`` list links.

    Scans every ``.md`` file directly inside ``self.docs_dir`` (``index.md``
    excluded), counts the lines that open a Markdown list link (``- [``) and
    logs a subdivision suggestion for each file over the limit. Nothing is
    written to disk and no model call is made; the physical split of files is
    left for a later phase.

    Args:
        max_links: Highest number of list links a file may hold before it is
            reported as saturated. Defaults to 15.
    """
    log_event("[*] Iniciando Auditoría de Reorganización Estructural...", section_break=True)

    try:
        # Sorted so the report order does not depend on the filesystem.
        candidates = sorted(os.listdir(self.docs_dir))
    except OSError as e:
        # A missing or unreadable docs directory must not abort the caller.
        log_event(f" [!] No se pudo leer el directorio {self.docs_dir}: {e}")
        return

    link_pattern = re.compile(r'^\s*-\s*\[', re.MULTILINE)

    for file in candidates:
        if not file.endswith(".md") or file == "index.md":
            continue

        path = os.path.join(self.docs_dir, file)
        try:
            # The docs contain non-ASCII characters, so never rely on the
            # platform default encoding.
            with open(path, 'r', encoding="utf-8") as f:
                content = f.read()
        except (OSError, UnicodeDecodeError) as e:
            log_event(f" [!] No se pudo leer {path}: {e}")
            continue

        count = len(link_pattern.findall(content))
        if count <= max_links:
            continue

        # For now only the intention is logged so the main flow is not
        # disturbed; the file content is not kept once it has been counted.
        log_event(f" [!] CATEGORÍA SATURADA: {file} tiene {count} enlaces. Proponiendo subdivisión...")
        log_event(f" [>>>] SUGERENCIA: Subdividir {file} para mejorar legibilidad.")
|
||||
|
||||
def validate_changes(self) -> bool:
    """Report whether the pending changes are valid.

    Stub: no validation is performed yet, so this always returns ``True``.
    Defined once; a second identical definition would only shadow this one.
    """
    return True
|
||||
|
||||
@@ -26,9 +26,10 @@ GH_TOKEN = os.getenv("GH_TOKEN")
|
||||
# Gemini Configuration (May 2026)
|
||||
GEMINI_API_VERSION = "v1beta"
|
||||
GEMINI_MODELS = [
|
||||
"gemini-2.5-flash",
|
||||
"gemini-2.5-flash-lite",
|
||||
"gemini-2.0-flash"
|
||||
"gemini-2.0-flash",
|
||||
"gemini-1.5-flash",
|
||||
"gemini-2.5-flash"
|
||||
]
|
||||
|
||||
TARGET_REPO = "nubenetes/awesome-kubernetes"
|
||||
|
||||
@@ -5,6 +5,7 @@ import json
|
||||
import re
|
||||
from typing import Dict, Any, List, Optional
|
||||
from src.config import GEMINI_API_KEYS, GEMINI_API_VERSION, GEMINI_MODELS
|
||||
from src.logger import log_event
|
||||
|
||||
# Global para mantener el índice de la API Key actual
|
||||
CURRENT_KEY_INDEX = 0
|
||||
@@ -30,9 +31,9 @@ class GeminiDiagnostics:
|
||||
report += "\n"
|
||||
return report
|
||||
|
||||
async def call_gemini_with_retry(prompt: str, response_format: str = "json", max_retries: int = 5):
|
||||
async def call_gemini_with_retry(prompt: str, response_format: str = "json", max_retries: int = 3):
|
||||
"""
|
||||
Llama a la API de Gemini con rotación de modelos Y rotación de API Keys.
|
||||
Llama a la API de Gemini con rotación exhaustiva y REINTENTO REAL en 429.
|
||||
"""
|
||||
global CURRENT_KEY_INDEX
|
||||
if not GEMINI_API_KEYS:
|
||||
@@ -41,91 +42,54 @@ async def call_gemini_with_retry(prompt: str, response_format: str = "json", max
|
||||
diagnostics = GeminiDiagnostics()
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
# Intentamos con las llaves disponibles si una falla por cuota
|
||||
for _ in range(len(GEMINI_API_KEYS)):
|
||||
for key_attempt in range(len(GEMINI_API_KEYS)):
|
||||
api_key = GEMINI_API_KEYS[CURRENT_KEY_INDEX]
|
||||
|
||||
for model in GEMINI_MODELS:
|
||||
# Usamos el nombre completo del modelo como requiere la v1beta
|
||||
full_model_name = f"models/{model}"
|
||||
api_url = f"https://generativelanguage.googleapis.com/{GEMINI_API_VERSION}/{full_model_name}:generateContent?key={api_key}"
|
||||
|
||||
for attempt in range(max_retries):
|
||||
# Reintentos por modelo (incluyendo 429)
|
||||
for attempt in range(max_retries + 2):
|
||||
try:
|
||||
payload = {"contents": [{"parts": [{"text": prompt}]}]}
|
||||
response = await client.post(api_url, json=payload, timeout=35)
|
||||
response = await client.post(api_url, json=payload, timeout=45)
|
||||
|
||||
if response.status_code == 200:
|
||||
try:
|
||||
resp_json = response.json()
|
||||
if 'candidates' not in resp_json or not resp_json['candidates']:
|
||||
diagnostics.add_attempt(model, 200, "Respuesta vacía (no candidates)", response.text)
|
||||
break
|
||||
|
||||
resp_json = response.json()
|
||||
if 'candidates' in resp_json and resp_json['candidates']:
|
||||
text_resp = resp_json['candidates'][0]['content']['parts'][0]['text']
|
||||
if response_format == "json":
|
||||
match = re.search(r'\{.*\}|\[.*\]', text_resp, re.DOTALL)
|
||||
if match:
|
||||
data = json.loads(match.group(0))
|
||||
if isinstance(data, list):
|
||||
return data[0] if len(data) > 0 else {}
|
||||
return data
|
||||
diagnostics.add_attempt(model, 200, "JSON no encontrado en texto", text_resp)
|
||||
return data[0] if isinstance(data, list) and len(data) > 0 else data
|
||||
diagnostics.add_attempt(model, 200, "JSON no encontrado", text_resp)
|
||||
break
|
||||
return text_resp
|
||||
except Exception as e:
|
||||
diagnostics.add_attempt(model, 200, f"Error parseo: {str(e)}", response.text)
|
||||
break
|
||||
diagnostics.add_attempt(model, 200, "Sin candidates")
|
||||
break
|
||||
|
||||
elif response.status_code == 404:
|
||||
diagnostics.add_attempt(model, 404, f"Modelo {full_model_name} no encontrado")
|
||||
break # Probar siguiente modelo
|
||||
elif response.status_code == 429:
|
||||
wait_time = (10 * (attempt + 1)) + random.random() * 5
|
||||
log_event(f" [!] API 429 (Límite): Reintentando {model} en {wait_time:.1f}s... (Intento {attempt+1})")
|
||||
await asyncio.sleep(wait_time)
|
||||
continue # Reintentar el MISMO modelo
|
||||
|
||||
elif response.status_code in [429, 503]:
|
||||
# Si es un error de cuota (429), probamos a rotar la API Key inmediatamente
|
||||
if response.status_code == 429:
|
||||
reason = "Rate Limit / Quota Exceeded"
|
||||
diagnostics.add_attempt(model, 429, reason)
|
||||
# Loguear rotación en el log central
|
||||
with open("/home/inafev/.gemini/tmp/awesome-kubernetes/curation_progress.log", "a") as log_f:
|
||||
log_f.write(f" [!] Llave {CURRENT_KEY_INDEX + 1} agotada. Probando rotación...\n")
|
||||
break # Rompe el bucle de reintentos para cambiar de llave o modelo
|
||||
|
||||
reason = "Service Unavailable"
|
||||
diagnostics.add_attempt(model, response.status_code, reason)
|
||||
wait = (5 * (2 ** attempt)) + random.random() * 5
|
||||
await asyncio.sleep(wait)
|
||||
elif response.status_code in [500, 503, 504]:
|
||||
diagnostics.add_attempt(model, response.status_code, "Server Error")
|
||||
await asyncio.sleep(5)
|
||||
continue
|
||||
|
||||
else:
|
||||
diagnostics.add_attempt(model, response.status_code, "Error API", response.text)
|
||||
diagnostics.add_attempt(model, response.status_code, "API Error", response.text)
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
diagnostics.add_attempt(model, 0, f"Excepción: {str(e)}")
|
||||
if attempt == max_retries - 1:
|
||||
break
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Si llegamos aquí por un 429, el bucle 'attempt' se rompió.
|
||||
# Salimos también del bucle 'model' para rotar la llave.
|
||||
if diagnostics.attempts and diagnostics.attempts[-1]['status'] == 429:
|
||||
break
|
||||
break
|
||||
|
||||
CURRENT_KEY_INDEX = (CURRENT_KEY_INDEX + 1) % len(GEMINI_API_KEYS)
|
||||
await asyncio.sleep(2)
|
||||
|
||||
# Si la última falla fue un 429, rotamos la llave y probamos el siguiente ciclo
|
||||
if diagnostics.attempts and diagnostics.attempts[-1]['status'] == 429:
|
||||
CURRENT_KEY_INDEX = (CURRENT_KEY_INDEX + 1) % len(GEMINI_API_KEYS)
|
||||
# Opcional: una pequeña espera antes de usar la nueva llave
|
||||
await asyncio.sleep(2)
|
||||
continue
|
||||
else:
|
||||
# Si no fue un 429 o éxito, salimos del bucle de llaves
|
||||
if diagnostics.attempts and diagnostics.attempts[-1]['status'] == 200:
|
||||
# En caso de éxito (que devuelve directamente arriba), no llegamos aquí.
|
||||
# Este break es para otros errores terminales.
|
||||
pass
|
||||
break
|
||||
|
||||
# Si logramos el éxito, la función ya habría retornado dentro del bucle de éxito (200)
|
||||
# Si llegamos aquí, es porque todas las llaves y modelos fallaron.
|
||||
raise Exception(f"Fallo crítico Gemini tras rotación.\n{diagnostics.get_report()}")
|
||||
raise Exception(f"Fallo crítico Gemini tras rotación exhaustiva.\n{diagnostics.get_report()}")
|
||||
|
||||
163
src/main.py
163
src/main.py
@@ -12,8 +12,11 @@ from src.autonomous_discovery import discover_trending_assets
|
||||
from src.gitops_manager import RepositoryController
|
||||
from src.logger import log_event
|
||||
|
||||
from src.state_manager import get_last_date, save_state
|
||||
|
||||
async def master_orchestrator():
|
||||
git_controller = RepositoryController(GH_TOKEN, TARGET_REPO)
|
||||
start_time = datetime.now(MADRID_TZ)
|
||||
|
||||
log_event("INICIANDO CURADURÍA AGÉNTICA (CRONOLOGÍA Y TRANSPARENCIA)", section_break=True)
|
||||
|
||||
@@ -38,14 +41,22 @@ async def master_orchestrator():
|
||||
|
||||
log_event(f"[*] MODO HISTÓRICO: Tramo {since_date.date()} -> {until_date.date()}")
|
||||
else:
|
||||
# Modo Normal (30 días)
|
||||
days_back = int(os.getenv("CURATION_DAYS_BACK", "30"))
|
||||
since_date = datetime.now(MADRID_TZ) - timedelta(days=days_back)
|
||||
until_date = None
|
||||
log_event(f"[*] Modo Normal: Desde {since_date.date()}")
|
||||
# Modo Normal: Usar CURATION_START_DATE si existe, si no state.json
|
||||
env_start = os.getenv("CURATION_START_DATE")
|
||||
if env_start:
|
||||
try:
|
||||
since_date = datetime.fromisoformat(env_start).replace(tzinfo=MADRID_TZ)
|
||||
log_event(f"[*] Modo Normal: Desde fecha manual del workflow {since_date.date()}")
|
||||
except:
|
||||
since_date = get_last_date()
|
||||
log_event(f"[*] Modo Normal: Error parseando fecha manual, usando state.json {since_date.date()}")
|
||||
else:
|
||||
since_date = get_last_date()
|
||||
log_event(f"[*] Modo Normal: Desde la última fecha guardada {since_date.date()}")
|
||||
|
||||
# 2. Ingesta Multi-fuente
|
||||
backup_file = os.getenv("BACKUP_FILE")
|
||||
x_audit_trail = []
|
||||
if backup_file and os.path.exists(backup_file):
|
||||
from src.ingestion_backup import BackupDataExtractor
|
||||
extractor = BackupDataExtractor(backup_file)
|
||||
@@ -67,77 +78,149 @@ async def master_orchestrator():
|
||||
t["timestamp"] = datetime.now(MADRID_TZ).isoformat()
|
||||
|
||||
all_raw_assets = raw_social + trending
|
||||
|
||||
# 3. Evaluación y Registro (Ignorar duplicados locales)
|
||||
if not all_raw_assets:
|
||||
log_event("[!] No se encontraron nuevos enlaces para procesar.")
|
||||
return
|
||||
|
||||
# 3. Evaluación y Registro (Deduplicación Global Robusta)
|
||||
existing_urls = set()
|
||||
for doc in os.listdir("docs"):
|
||||
if doc.endswith(".md"):
|
||||
try:
|
||||
with open(os.path.join("docs", doc), 'r') as f:
|
||||
existing_urls.update(re.findall(r'\]\((https?://[^\)]+)\)', f.read()))
|
||||
except: pass
|
||||
for root, dirs, files in os.walk("docs"):
|
||||
for file in files:
|
||||
if file.endswith(".md"):
|
||||
try:
|
||||
with open(os.path.join(root, file), 'r') as f:
|
||||
content = f.read()
|
||||
found = re.findall(r'\]\((https?://[^\)]+)\)', content)
|
||||
for url in found:
|
||||
existing_urls.add(url.split('#')[0].rstrip('/').lower())
|
||||
except: pass
|
||||
|
||||
log_event(f"[*] Deduplicación Global: {len(existing_urls)} URLs existentes cargadas.")
|
||||
|
||||
# --- INICIO PROCESAMIENTO POR LOTES ---
|
||||
BATCH_SIZE = 50
|
||||
BATCH_SIZE = 40
|
||||
all_raw_assets_batches = [all_raw_assets[i:i + BATCH_SIZE] for i in range(0, len(all_raw_assets), BATCH_SIZE)]
|
||||
|
||||
curator_agent = AgenticCurator()
|
||||
total_processed = 0
|
||||
max_tweet_date = since_date
|
||||
full_report_metrics = []
|
||||
modified_files_content = {}
|
||||
|
||||
for batch_index, batch_assets in enumerate(all_raw_assets_batches):
|
||||
log_event(f">>> INICIANDO LOTE {batch_index + 1}/{len(all_raw_assets_batches)} ({len(batch_assets)} enlaces)", section_break=True)
|
||||
|
||||
full_extraction_report = []
|
||||
unique_new_assets = []
|
||||
|
||||
evaluations = await evaluate_extracted_assets(batch_assets)
|
||||
|
||||
assets_to_evaluate = []
|
||||
for asset in batch_assets:
|
||||
url = asset["url"]
|
||||
clean_url = url.split('#')[0].rstrip('/')
|
||||
clean_url = url.split('#')[0].rstrip('/').lower()
|
||||
|
||||
# Trackear fecha máxima
|
||||
try:
|
||||
ts = asset.get('timestamp')
|
||||
asset_date = None
|
||||
if ts:
|
||||
if isinstance(ts, str):
|
||||
try:
|
||||
# Twitter format: 'Tue Oct 01 19:56:51 +0000 2024'
|
||||
asset_date = datetime.strptime(ts, '%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo=MADRID_TZ)
|
||||
except:
|
||||
try: asset_date = datetime.fromisoformat(ts.replace('Z', '+00:00'))
|
||||
except: pass
|
||||
|
||||
if asset_date and asset_date > max_tweet_date:
|
||||
max_tweet_date = asset_date
|
||||
except: pass
|
||||
|
||||
if clean_url in existing_urls:
|
||||
log_event(f" [=] SALTADO: {url[:60]}... (Ya existe)")
|
||||
full_report_metrics.append({
|
||||
"url": url, "status": "DUPLICATE", "reason": "Ya existe en repositorio",
|
||||
"category": "N/A", "post_date": ts, "source": asset.get("source_type", "Social")
|
||||
})
|
||||
continue
|
||||
assets_to_evaluate.append(asset)
|
||||
|
||||
if not assets_to_evaluate:
|
||||
log_event(" [*] El lote completo consiste en duplicados. Siguiente lote.")
|
||||
continue
|
||||
|
||||
evaluations = await evaluate_extracted_assets(assets_to_evaluate)
|
||||
unique_new_assets = []
|
||||
|
||||
for asset in assets_to_evaluate:
|
||||
url = asset["url"]
|
||||
evaluation = evaluations.get(url, {"status": "FILTERED", "reason": "No evaluado por IA"})
|
||||
status = evaluation["status"]
|
||||
reason = evaluation.get("reason", "Aceptado")
|
||||
category = evaluation.get("category", "N/A")
|
||||
|
||||
if clean_url in [u.split('#')[0].rstrip('/') for u in existing_urls]:
|
||||
status = "DUPLICATE"
|
||||
reason = "Ya existe en Nubenetes.com"
|
||||
log_event(f" [=] DUPLICADO: El enlace ya está en el repositorio.")
|
||||
|
||||
if status == "INCLUDED":
|
||||
full_report_metrics.append({
|
||||
"url": url, "status": evaluation["status"], "reason": evaluation.get("reason", "Aceptado"),
|
||||
"category": evaluation.get("category", "N/A"), "post_date": asset.get("timestamp"),
|
||||
"source": asset.get("source_type", "Social")
|
||||
})
|
||||
|
||||
if evaluation["status"] == "INCLUDED":
|
||||
unique_new_assets.append({
|
||||
"url": url, "title": evaluation["title"],
|
||||
"description": evaluation["description"], "category": category,
|
||||
"description": evaluation["description"], "category": evaluation.get("category", "kubernetes-tools"),
|
||||
"impact_score": evaluation["impact_score"],
|
||||
"reasoning": evaluation.get("reasoning")
|
||||
})
|
||||
existing_urls.add(url.split('#')[0].rstrip('/').lower())
|
||||
|
||||
# Inyección inmediata de este lote
|
||||
# Inyección inmediata
|
||||
if unique_new_assets:
|
||||
log_event(">>> APLICANDO INYECCIONES EN MARKDOWN...", section_break=True)
|
||||
|
||||
log_event(f">>> APLICANDO {len(unique_new_assets)} INYECCIONES EN MARKDOWN...", section_break=True)
|
||||
for asset in unique_new_assets:
|
||||
category = asset["category"]
|
||||
file_path = f"docs/{category}.md"
|
||||
try:
|
||||
with open(file_path, 'r') as f: content = f.read()
|
||||
if file_path in modified_files_content:
|
||||
content = modified_files_content[file_path]
|
||||
else:
|
||||
if not os.path.exists(file_path):
|
||||
content = f"# {category.capitalize()}\n\n"
|
||||
else:
|
||||
with open(file_path, 'r') as f: content = f.read()
|
||||
|
||||
new_content = await curator_agent.decide_smart_injection(content, asset)
|
||||
|
||||
if len(new_content) > len(content):
|
||||
# Actualizar archivo físico inmediatamente
|
||||
modified_files_content[file_path] = new_content
|
||||
with open(file_path, 'w') as f: f.write(new_content)
|
||||
log_event(f" [>>>] INYECTADO: {asset['url']}")
|
||||
except Exception as e:
|
||||
log_event(f" [!] Error inyectando {asset['url']}: {e}")
|
||||
|
||||
total_processed += len(batch_assets)
|
||||
log_event(f"Fin del Lote {batch_index + 1}. Total procesado: {total_processed}/{len(all_raw_assets)}", section_break=True)
|
||||
|
||||
# Pausa entre lotes para dejar respirar a la API
|
||||
if batch_index < len(all_raw_assets_batches) - 1:
|
||||
log_event("[*] Esperando 30 segundos para el siguiente lote...")
|
||||
await asyncio.sleep(30)
|
||||
log_event(f"[*] Pausa de seguridad: 5s para el siguiente lote...")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
# 4. Finalización y PR
|
||||
if modified_files_content:
|
||||
log_event(">>> GENERANDO PULL REQUEST...", section_break=True)
|
||||
metrics = {
|
||||
"total_extracted": len(all_raw_assets),
|
||||
"start_date": since_date.isoformat(),
|
||||
"end_date": datetime.now(MADRID_TZ).isoformat(),
|
||||
"full_report": full_report_metrics,
|
||||
"x_audit": x_audit_trail
|
||||
}
|
||||
try:
|
||||
git_controller.apply_multi_file_changes(modified_files_content, metrics)
|
||||
except Exception as e:
|
||||
log_event(f"[!] Error creando PR: {e}")
|
||||
|
||||
# Auditoría de reorganización
|
||||
await curator_agent.suggest_reorganization()
|
||||
|
||||
# Actualizar estado
|
||||
if max_tweet_date > since_date:
|
||||
save_state(max_tweet_date + timedelta(seconds=1))
|
||||
|
||||
log_event("PROCESO FINALIZADO CON ÉXITO.", section_break=True)
|
||||
|
||||
|
||||
|
||||
log_event("PROCESO FINALIZADO CON ÉXITO.", section_break=True)
|
||||
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"blacklisted_domains": []
|
||||
}
|
||||
36
src/state_manager.py
Normal file
36
src/state_manager.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import os
|
||||
import json
|
||||
from datetime import datetime
|
||||
from src.config import MADRID_TZ
|
||||
from src.logger import log_event
|
||||
|
||||
STATE_FILE = "src/memory/state.json"
|
||||
|
||||
def load_state() -> dict:
    """Load the persisted curation state from ``STATE_FILE``.

    Returns:
        A dictionary that always contains ``"last_processed_tweet_date"``.
        Values read from the file override the built-in defaults; when the
        file is missing, unreadable, not valid JSON, or does not hold a JSON
        object, the defaults are returned instead.
    """
    default_state = {
        "last_processed_tweet_date": "2024-10-01T00:00:00"
    }

    if not os.path.exists(STATE_FILE):
        return default_state

    try:
        with open(STATE_FILE, 'r', encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        log_event(f"[!] Error cargando state.json: {e}")
        return default_state

    if not isinstance(loaded, dict):
        # Callers use dict access on the result; a list or scalar is corrupt.
        log_event(f"[!] Error cargando state.json: se esperaba un objeto JSON, no {type(loaded).__name__}")
        return default_state

    # Merge over the defaults so a file lacking a key still yields a full state.
    return {**default_state, **loaded}
|
||||
|
||||
def save_state(last_date: datetime):
    """Persist ``last_date`` as the last processed tweet date.

    The current state is loaded, its ``"last_processed_tweet_date"`` entry is
    replaced with ``last_date.isoformat()`` and the result is written back to
    ``STATE_FILE`` as indented JSON. Failures to create the directory or to
    write the file are logged, never raised.

    Args:
        last_date: Date to store.
    """
    state = load_state()
    if not isinstance(state, dict):
        # Guard against a state file that held a non-object JSON value.
        state = {}
    state["last_processed_tweet_date"] = last_date.isoformat()

    try:
        # Directory creation is part of the guarded section: an OSError here
        # should be logged like any other write failure, not abort the run.
        os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
        with open(STATE_FILE, 'w', encoding="utf-8") as f:
            json.dump(state, f, indent=2)
    except (OSError, TypeError, ValueError) as e:
        log_event(f"[!] Error guardando state.json: {e}")
    else:
        log_event(f"[*] Estado guardado: última fecha procesada {last_date.date()}")
|
||||
|
||||
def get_last_date() -> datetime:
    """Return the last processed tweet date as an aware datetime in ``MADRID_TZ``.

    The stored ISO string is parsed; a naive value is tagged with
    ``MADRID_TZ``, while a value that already carries a UTC offset is
    converted to ``MADRID_TZ`` so that it still denotes the same instant.
    A missing or unparsable value falls back to the default start date.

    Returns:
        Timezone-aware datetime expressed in ``MADRID_TZ``.
    """
    fallback = "2024-10-01T00:00:00"  # same default that load_state() uses

    state = load_state()
    date_str = state.get("last_processed_tweet_date") if isinstance(state, dict) else None

    try:
        parsed = datetime.fromisoformat(date_str or fallback)
    except (TypeError, ValueError) as e:
        log_event(f"[!] Fecha inválida en state.json ({date_str!r}): {e}")
        parsed = datetime.fromisoformat(fallback)

    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=MADRID_TZ)
    # replace() would discard the stored offset and silently shift the instant.
    return parsed.astimezone(MADRID_TZ)
|
||||
Reference in New Issue
Block a user