feat: implement granular real-time logging, batch processing, and multi-key API rotation

This commit is contained in:
Nubenetes Bot
2026-05-14 17:21:52 +02:00
parent a788992459
commit 42d7da9e8b
7 changed files with 383 additions and 171 deletions

53
.github/workflows/agentic_backup.yml vendored Normal file
View File

@@ -0,0 +1,53 @@
name: Nubenetes Backup-based Curation
on:
workflow_dispatch:
inputs:
backup_file:
description: 'Ruta al fichero de backup (JSON o MD) dentro del repositorio'
required: true
default: 'data/backup_posts.json'
historical_mode:
description: 'Activar Modo Histórico (Ignora fecha de 30 días)'
required: false
default: 'true'
type: boolean
permissions:
contents: write
pull-requests: write
jobs:
backup-curation-process:
runs-on: ubuntu-latest
steps:
- name: Sincronización del repositorio
uses: actions/checkout@v4
- name: Provisión del Entorno Python 3.11
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Instalación de dependencias
run: |
python -m pip install --upgrade pip
pip install --no-cache-dir pydantic PyGithub aiohttp beautifulsoup4 httpx fake-useragent pytz python-dotenv twikit>=2.1.2 playwright playwright-stealth
# Playwright es necesario porque el evaluador de Gemini o el curator podrían usarlo indirectamente,
# aunque para la extracción del backup no sea estrictamente necesario.
playwright install chromium --with-deps
- name: Ejecución de la Canalización Agéntica (Modo Backup)
env:
GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }}
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
BACKUP_FILE: ${{ github.event.inputs.backup_file }}
HISTORICAL_MODE: ${{ github.event.inputs.historical_mode }}
PYTHONPATH: .
run: |
if [ ! -f "$BACKUP_FILE" ]; then
echo "❌ ERROR: El archivo '$BACKUP_FILE' no se encuentra en el repositorio."
echo "Por favor, sube el archivo a esa ruta y vuelve a intentarlo."
exit 1
fi
python src/main.py

View File

@@ -4,8 +4,9 @@ import json
import asyncio
import httpx
import random
from datetime import datetime
from typing import List, Dict, Set, Optional
from src.config import GEMINI_API_KEY, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES
from src.config import GEMINI_API_KEYS, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES
from src.gitops_manager import RepositoryController
from src.gemini_utils import call_gemini_with_retry
@@ -28,11 +29,12 @@ async def _deep_fetch_content(url: str) -> str:
except: return ""
return ""
from src.logger import log_event
async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
evaluations = {}
# Revertir a v1 y asegurar que la clave existe
if not GEMINI_API_KEY:
print("[!] ERROR CRÍTICO: GEMINI_API_KEY no encontrada en el entorno.")
if not GEMINI_API_KEYS:
log_event("[!] ERROR CRÍTICO: GEMINI_API_KEYS no encontrada en el entorno.")
return {a["url"]: {"status": "FILTERED", "reason": "Configuración: API KEY faltante"} for a in raw_assets}
memory_file = "src/memory/health_learning.json"
@@ -44,10 +46,16 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
domain_blacklist = set(memory_data.get("blacklisted_domains", []))
except: pass
for asset in raw_assets:
for i, asset in enumerate(raw_assets):
post_date = asset.get('timestamp', 'Fecha desconocida')
log_event(f"--- EVALUANDO {i+1}/{len(raw_assets)} ---")
log_event(f" - URL: {asset['url']}\n - Post Date: {post_date}")
domain = asset['url'].split("//")[-1].split("/")[0]
if domain in domain_blacklist:
evaluations[asset["url"]] = {"status": "FILTERED", "reason": "Dominio en lista negra de reputación"}
eval_res = {"status": "FILTERED", "reason": "Dominio en lista negra de reputación"}
evaluations[asset["url"]] = eval_res
log_event(f" [-] RECHAZADO: {eval_res['reason']}")
continue
web_content = await _deep_fetch_content(asset['url'])
@@ -66,7 +74,7 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
"Evalúa (1-100):\n"
"- >80: Recurso excepcional (🌟).\n"
"- >1: Aceptar (si es técnico o útil).\n\n"
"Responde SOLAMENTE un JSON: {\"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\", \"rejection_reason\": \"... (si aplica)\"}"
"Responde SOLAMENTE un JSON: {\"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\", \"reasoning\": \"Breve explicación de por qué esta categoría y score\", \"rejection_reason\": \"... (si aplica)\"}"
)
try:
@@ -75,18 +83,31 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
valid_cats = [c for c in data.get("categories", []) if c in NUBENETES_CATEGORIES]
if score < 1:
evaluations[asset["url"]] = {"status": "FILTERED", "reason": data.get("rejection_reason", "Bajo impacto técnico")}
reason = data.get("rejection_reason", "Bajo impacto técnico")
evaluations[asset["url"]] = {"status": "FILTERED", "reason": reason}
log_event(f" [-] RECHAZADO: {reason} (Score: {score})\n Motivo IA: {data.get('reasoning')}")
elif not valid_cats:
evaluations[asset["url"]] = {"status": "FILTERED", "reason": "No se encontró categoría técnica válida"}
log_event(f" [-] RECHAZADO: Sin categoría válida (Cats sugeridas: {data.get('categories')})\n Motivo IA: {data.get('reasoning')}")
else:
evaluations[asset["url"]] = {
"status": "INCLUDED", "title": data["title"], "description": data["desc"],
"category": valid_cats[0], "impact_score": score, "is_exceptional": score > 80
"category": valid_cats[0], "impact_score": score, "is_exceptional": score > 80,
"reasoning": data.get("reasoning")
}
log_event(f" [+] ACEPTADO: {data['title']} -> {valid_cats[0]} (Score: {score})\n Desc: {data['desc']}\n Motivo IA: {data.get('reasoning')}")
except Exception as e:
evaluations[asset["url"]] = {"status": "FILTERED", "reason": f"Error Gemini: {str(e)[:500]}"}
err_msg = str(e)
if "Rate Limit" in err_msg or "429" in err_msg:
log_event(f" [!] RATE LIMIT DETECTADO. Entrando en modo COOL DOWN (2 min)...")
await asyncio.sleep(120)
err_log = f" [!] ERROR GEMINI: {err_msg[:200]}"
evaluations[asset["url"]] = {"status": "FILTERED", "reason": err_log}
log_event(err_log)
await asyncio.sleep(2.0) # Evitar saturar la API con un delay más conservador
await asyncio.sleep(5.0)
if domain_blacklist:
try:
@@ -118,14 +139,18 @@ class AgenticCurator:
"1. Encuentra el ## o ### más semántico.\n"
"2. Decide formato: si es excelente, añade estrellas (🌟, 🌟🌟 o 🌟🌟🌟).\n"
"3. Decide si usar negritas (==enlace== o **texto**).\n"
"Responde JSON: {\"header\": \"Nombre exacto del ## o ###\", \"formatted_line\": \" - [==Título==](url) 🌟 - Descripción\"}"
"Responde JSON: {\"header\": \"Nombre exacto del ## o ###\", \"formatted_line\": \" - [==Título==](url) 🌟 - Descripción\", \"reasoning\": \"Breve por qué de esta ubicación/formato\"}"
)
try:
data = await call_gemini_with_retry(prompt)
header = data.get("header")
new_line = data.get("formatted_line")
reasoning = data.get("reasoning", "Sin motivo especificado")
if header and new_line:
log_event(f" [>>>] UBICACIÓN: Header '{header}'\n Formato: {new_line}\n Motivo IA: {reasoning}")
new_lines = []
inserted = False
for line in lines:
@@ -135,7 +160,7 @@ class AgenticCurator:
inserted = True
if inserted: return "\n".join(new_lines)
except Exception as e:
print(f"[!] Error en decide_smart_injection: {e}")
log_event(f"[!] Error en decide_smart_injection: {e}")
pass
return self._manual_fallback_injection(markdown_content, asset)

View File

@@ -11,7 +11,13 @@ TWITTER_EMAIL = os.getenv("TWITTER_EMAIL")
TWITTER_PASSWORD = os.getenv("TWITTER_PASSWORD")
# Pydantic-AI y otras librerías pueden usar diferentes nombres para la misma clave
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
GEMINI_API_KEYS = [
os.getenv("GEMINI_API_KEY_1"),
os.getenv("GEMINI_API_KEY_2")
]
GEMINI_API_KEYS = [k for k in GEMINI_API_KEYS if k] # Filtrar nulos
GEMINI_API_KEY = GEMINI_API_KEYS[0] if GEMINI_API_KEYS else None
if GEMINI_API_KEY and not os.getenv("GOOGLE_API_KEY"):
os.environ["GOOGLE_API_KEY"] = GEMINI_API_KEY
@@ -20,9 +26,8 @@ GH_TOKEN = os.getenv("GH_TOKEN")
# Gemini Configuration (May 2026)
GEMINI_API_VERSION = "v1beta"
GEMINI_MODELS = [
"gemini-3.1-flash-lite",
"gemini-2.5-flash",
"gemini-2.5-flash-lite",
"gemini-2.5-flash",
"gemini-2.5-flash-lite",
"gemini-2.0-flash"
]

View File

@@ -4,7 +4,10 @@ import random
import json
import re
from typing import Dict, Any, List, Optional
from src.config import GEMINI_API_KEY, GEMINI_API_VERSION, GEMINI_MODELS
from src.config import GEMINI_API_KEYS, GEMINI_API_VERSION, GEMINI_MODELS
# Global para mantener el índice de la API Key actual
CURRENT_KEY_INDEX = 0
class GeminiDiagnostics:
def __init__(self):
@@ -29,64 +32,100 @@ class GeminiDiagnostics:
async def call_gemini_with_retry(prompt: str, response_format: str = "json", max_retries: int = 5):
"""
Llama a la API de Gemini con rotación de modelos y diagnóstico detallado.
Llama a la API de Gemini con rotación de modelos Y rotación de API Keys.
"""
if not GEMINI_API_KEY:
raise ValueError("GEMINI_API_KEY no configurada.")
global CURRENT_KEY_INDEX
if not GEMINI_API_KEYS:
raise ValueError("No hay GEMINI_API_KEYS configuradas.")
diagnostics = GeminiDiagnostics()
async with httpx.AsyncClient() as client:
for model in GEMINI_MODELS:
api_url = f"https://generativelanguage.googleapis.com/{GEMINI_API_VERSION}/models/{model}:generateContent?key={GEMINI_API_KEY}"
# Intentamos con las llaves disponibles si una falla por cuota
for _ in range(len(GEMINI_API_KEYS)):
api_key = GEMINI_API_KEYS[CURRENT_KEY_INDEX]
for attempt in range(max_retries):
try:
payload = {"contents": [{"parts": [{"text": prompt}]}]}
response = await client.post(api_url, json=payload, timeout=35)
if response.status_code == 200:
try:
resp_json = response.json()
if 'candidates' not in resp_json or not resp_json['candidates']:
diagnostics.add_attempt(model, 200, "Respuesta vacía (no candidates)", response.text)
break
text_resp = resp_json['candidates'][0]['content']['parts'][0]['text']
if response_format == "json":
match = re.search(r'\{.*\}|\[.*\]', text_resp, re.DOTALL)
if match:
data = json.loads(match.group(0))
if isinstance(data, list):
return data[0] if len(data) > 0 else {}
return data
diagnostics.add_attempt(model, 200, "JSON no encontrado en texto", text_resp)
break
return text_resp
except Exception as e:
diagnostics.add_attempt(model, 200, f"Error parseo: {str(e)}", response.text)
break
elif response.status_code == 404:
diagnostics.add_attempt(model, 404, "Modelo no encontrado")
break # Probar siguiente modelo
elif response.status_code in [429, 503]:
reason = "Rate Limit" if response.status_code == 429 else "Service Unavailable"
diagnostics.add_attempt(model, response.status_code, reason)
# Backoff agresivo: 5, 10, 20, 40... segundos
wait = (5 * (2 ** attempt)) + random.random() * 5
await asyncio.sleep(wait)
continue
else:
diagnostics.add_attempt(model, response.status_code, "Error API", response.text)
break
for model in GEMINI_MODELS:
# Usamos el nombre completo del modelo como requiere la v1beta
full_model_name = f"models/{model}"
api_url = f"https://generativelanguage.googleapis.com/{GEMINI_API_VERSION}/{full_model_name}:generateContent?key={api_key}"
for attempt in range(max_retries):
try:
payload = {"contents": [{"parts": [{"text": prompt}]}]}
response = await client.post(api_url, json=payload, timeout=35)
except Exception as e:
diagnostics.add_attempt(model, 0, f"Excepción: {str(e)}")
if attempt == max_retries - 1:
break
await asyncio.sleep(1)
if response.status_code == 200:
try:
resp_json = response.json()
if 'candidates' not in resp_json or not resp_json['candidates']:
diagnostics.add_attempt(model, 200, "Respuesta vacía (no candidates)", response.text)
break
text_resp = resp_json['candidates'][0]['content']['parts'][0]['text']
if response_format == "json":
match = re.search(r'\{.*\}|\[.*\]', text_resp, re.DOTALL)
if match:
data = json.loads(match.group(0))
if isinstance(data, list):
return data[0] if len(data) > 0 else {}
return data
diagnostics.add_attempt(model, 200, "JSON no encontrado en texto", text_resp)
break
return text_resp
except Exception as e:
diagnostics.add_attempt(model, 200, f"Error parseo: {str(e)}", response.text)
break
elif response.status_code == 404:
diagnostics.add_attempt(model, 404, f"Modelo {full_model_name} no encontrado")
break # Probar siguiente modelo
elif response.status_code in [429, 503]:
# Si es un error de cuota (429), probamos a rotar la API Key inmediatamente
if response.status_code == 429:
reason = "Rate Limit / Quota Exceeded"
diagnostics.add_attempt(model, 429, reason)
# Loguear rotación en el log central
with open("/home/inafev/.gemini/tmp/awesome-kubernetes/curation_progress.log", "a") as log_f:
log_f.write(f" [!] Llave {CURRENT_KEY_INDEX + 1} agotada. Probando rotación...\n")
break # Rompe el bucle de reintentos para cambiar de llave o modelo
reason = "Service Unavailable"
diagnostics.add_attempt(model, response.status_code, reason)
wait = (5 * (2 ** attempt)) + random.random() * 5
await asyncio.sleep(wait)
continue
else:
diagnostics.add_attempt(model, response.status_code, "Error API", response.text)
break
except Exception as e:
diagnostics.add_attempt(model, 0, f"Excepción: {str(e)}")
if attempt == max_retries - 1:
break
await asyncio.sleep(1)
# Si llegamos aquí por un 429, el bucle 'attempt' se rompió.
# Salimos también del bucle 'model' para rotar la llave.
if diagnostics.attempts and diagnostics.attempts[-1]['status'] == 429:
break
# Si la última falla fue un 429, rotamos la llave y probamos el siguiente ciclo
if diagnostics.attempts and diagnostics.attempts[-1]['status'] == 429:
CURRENT_KEY_INDEX = (CURRENT_KEY_INDEX + 1) % len(GEMINI_API_KEYS)
# Opcional: una pequeña espera antes de usar la nueva llave
await asyncio.sleep(2)
continue
else:
# Si no fue un 429 o éxito, salimos del bucle de llaves
if diagnostics.attempts and diagnostics.attempts[-1]['status'] == 200:
# En caso de éxito (que devuelve directamente arriba), no llegamos aquí.
# Este break es para otros errores terminales.
pass
break
raise Exception(f"Fallo crítico Gemini.\n{diagnostics.get_report()}")
# Si logramos el éxito, la función ya habría retornado dentro del bucle de éxito (200)
# Si llegamos aquí, es porque todas las llaves y modelos fallaron.
raise Exception(f"Fallo crítico Gemini tras rotación.\n{diagnostics.get_report()}")

102
src/ingestion_backup.py Normal file
View File

@@ -0,0 +1,102 @@
import json
import re
from datetime import datetime
from src.config import MADRID_TZ
class BackupDataExtractor:
def __init__(self, file_path: str):
self.file_path = file_path
self.audit_trail = []
def log_audit(self, method: str, success: bool, msg: str):
icons = {True: "✅ ÉXITO", False: "❌ FALLO", None: "⚡ INTENTO"}
entry = f"**{method}** - {icons.get(success, ' INFO')}: {msg}"
self.audit_trail.append(entry)
print(entry)
def _extract_urls_from_text(self, text: str) -> list[str]:
# Regex para URLs comunes
urls = re.findall(r'https?://[^\s<>\"]+|www\.[^\s<>\"]+', text)
noise_domains = [
"x.com", "twitter.com", "abs.twimg", "pbs.twimg",
"t.co", "nitter.net"
]
valid_urls = []
for u in urls:
u_clean = u.rstrip('.,!?;:)(')
if all(d not in u_clean.lower() for d in noise_domains):
valid_urls.append(u_clean)
return list(set(valid_urls))
async def fetch_links(self) -> list[dict]:
self.log_audit("Backup Ingestion", None, f"Procesando: {self.file_path}")
results = []
try:
if self.file_path.endswith('.json'):
with open(self.file_path, 'r') as f:
data = json.load(f)
for item in data:
# Formato standard de exportación de X (o similar)
text = item.get('full_text', '') or item.get('text', '')
timestamp_raw = item.get('created_at', '')
# Intentar extraer de entities.urls si existe (más limpio)
extracted_urls = []
if 'entities' in item and 'urls' in item['entities']:
for u_obj in item['entities']['urls']:
expanded = u_obj.get('expanded_url')
if expanded: extracted_urls.append(expanded)
# Fallback a regex si no hay entities
if not extracted_urls:
extracted_urls = self._extract_urls_from_text(text)
# Filtrar ruido de nuevo por si acaso
noise_domains = ["x.com", "twitter.com", "t.co"]
for url in set(extracted_urls):
if any(d in url.lower() for d in noise_domains):
continue
results.append({
"url": url,
"context": text[:250],
"timestamp": timestamp_raw,
"source_type": "Backup JSON"
})
elif self.file_path.endswith('.md'):
with open(self.file_path, 'r') as f:
content = f.read()
# En MD, buscamos todos los links que no sean de X
# El usuario mencionó que hay links al post original si se cortó,
# pero nos interesan los links EXTERNOS curados.
urls = self._extract_urls_from_text(content)
for u in urls:
results.append({
"url": u,
"context": "Extraído de Backup Markdown",
"timestamp": datetime.now(MADRID_TZ).isoformat(),
"source_type": "Backup MD"
})
# Ordenar por fecha si es posible (JSON suele tenerla)
try:
# 'Tue Oct 01 19:56:51 +0000 2024'
def parse_date(x):
try:
return datetime.strptime(x["timestamp"], '%a %b %d %H:%M:%S +0000 %Y')
except:
return datetime.min
results.sort(key=parse_date)
except:
pass
self.log_audit("Backup Ingestion", True, f"Total enlaces extraídos: {len(results)}")
return results
except Exception as e:
self.log_audit("Backup Ingestion", False, f"Error: {str(e)}")
return []

32
src/logger.py Normal file
View File

@@ -0,0 +1,32 @@
import os
from datetime import datetime
# Ruta por defecto para el log local (fuera del repo)
DEFAULT_LOG_PATH = "/home/inafev/.gemini/tmp/awesome-kubernetes/curation_progress.log"
def log_event(message: str, section_break: bool = False):
"""
Registra un evento tanto en la consola (STDOUT) como en el archivo de log local si existe.
En GitHub Actions, esto aparecerá en los logs del workflow.
"""
timestamp = datetime.now().strftime('%H:%M:%S')
formatted_msg = f"[{timestamp}] {message}"
if section_break:
separator = "=" * 60
print(f"\n{separator}\n{formatted_msg}\n{separator}")
_write_to_file(f"\n{separator}\n{formatted_msg}\n{separator}")
else:
print(formatted_msg)
_write_to_file(formatted_msg)
def _write_to_file(message: str):
# Solo intentamos escribir en archivo si no estamos en GitHub Actions
if not os.getenv("GITHUB_ACTIONS"):
try:
# Aseguramos que el directorio existe
os.makedirs(os.path.dirname(DEFAULT_LOG_PATH), exist_ok=True)
with open(DEFAULT_LOG_PATH, "a") as f:
f.write(message + "\n")
except:
pass

View File

@@ -10,11 +10,12 @@ from src.markdown_ast import MarkdownSanitizer
from src.agentic_curator import evaluate_extracted_assets, AgenticCurator
from src.autonomous_discovery import discover_trending_assets
from src.gitops_manager import RepositoryController
from src.logger import log_event
async def master_orchestrator():
git_controller = RepositoryController(GH_TOKEN, TARGET_REPO)
print("[*] INICIANDO CURADURÍA AGÉNTICA (CRONOLOGÍA Y TRANSPARENCIA)")
log_event("INICIANDO CURADURÍA AGÉNTICA (CRONOLOGÍA Y TRANSPARENCIA)", section_break=True)
# 1. Horizonte Temporal Dinámico / Histórico
is_historical = os.getenv("HISTORICAL_MODE", "false").lower() == "true"
@@ -35,24 +36,31 @@ async def master_orchestrator():
if since_date < final_stop_date:
since_date = final_stop_date
print(f"[*] MODO HISTÓRICO: Tramo {since_date.date()} -> {until_date.date()}")
log_event(f"[*] MODO HISTÓRICO: Tramo {since_date.date()} -> {until_date.date()}")
else:
# Modo Normal (30 días)
days_back = int(os.getenv("CURATION_DAYS_BACK", "30"))
since_date = datetime.now(MADRID_TZ) - timedelta(days=days_back)
until_date = None
print(f"[*] Modo Normal: Desde {since_date.date()}")
log_event(f"[*] Modo Normal: Desde {since_date.date()}")
# 2. Ingesta Multi-fuente
strategy = os.getenv("EXTRACTION_STRATEGY", "search")
twitter_client = SocialDataExtractor()
raw_social = await twitter_client.fetch_links_since(since_date, until_date=until_date, strategy=strategy)
x_audit_trail = twitter_client.audit_trail
backup_file = os.getenv("BACKUP_FILE")
if backup_file and os.path.exists(backup_file):
from src.ingestion_backup import BackupDataExtractor
extractor = BackupDataExtractor(backup_file)
raw_social = await extractor.fetch_links()
x_audit_trail = extractor.audit_trail
else:
strategy = os.getenv("EXTRACTION_STRATEGY", "search")
twitter_client = SocialDataExtractor()
raw_social = await twitter_client.fetch_links_since(since_date, until_date=until_date, strategy=strategy)
x_audit_trail = twitter_client.audit_trail
# GitHub Trending solo en modo normal (para no repetir)
trending = []
if not is_historical:
print("[*] Buscando novedades en GitHub Trending...")
if not is_historical and not backup_file:
log_event("[*] Buscando novedades en GitHub Trending...")
trending = await discover_trending_assets()
for t in trending:
t["source_type"] = "GitHub Trending"
@@ -69,14 +77,22 @@ async def master_orchestrator():
existing_urls.update(re.findall(r'\]\((https?://[^\)]+)\)', f.read()))
except: pass
full_extraction_report = []
unique_new_assets = []
# --- INICIO PROCESAMIENTO POR LOTES ---
BATCH_SIZE = 50
all_raw_assets_batches = [all_raw_assets[i:i + BATCH_SIZE] for i in range(0, len(all_raw_assets), BATCH_SIZE)]
if all_raw_assets:
print(f"[*] Evaluando {len(all_raw_assets)} candidatos con Gemini...")
evaluations = await evaluate_extracted_assets(all_raw_assets)
curator_agent = AgenticCurator()
total_processed = 0
for batch_index, batch_assets in enumerate(all_raw_assets_batches):
log_event(f">>> INICIANDO LOTE {batch_index + 1}/{len(all_raw_assets_batches)} ({len(batch_assets)} enlaces)", section_break=True)
for asset in all_raw_assets:
full_extraction_report = []
unique_new_assets = []
evaluations = await evaluate_extracted_assets(batch_assets)
for asset in batch_assets:
url = asset["url"]
clean_url = url.split('#')[0].rstrip('/')
@@ -88,102 +104,42 @@ async def master_orchestrator():
if clean_url in [u.split('#')[0].rstrip('/') for u in existing_urls]:
status = "DUPLICATE"
reason = "Ya existe en Nubenetes.com"
log_event(f" [=] DUPLICADO: El enlace ya está en el repositorio.")
if status == "INCLUDED":
unique_new_assets.append({
"url": url, "title": evaluation["title"],
"description": evaluation["description"], "category": category,
"impact_score": evaluation["impact_score"]
"impact_score": evaluation["impact_score"],
"reasoning": evaluation.get("reasoning")
})
full_extraction_report.append({
"url": url,
"status": status,
"reason": reason,
"category": category,
"source": asset.get("source_type", "Unknown"),
"post_date": asset.get("timestamp")
})
# 4. Inyección en Markdowns (Local)
file_updates = {}
stats = {"added_details": [], "categories_updated": set()}
curator_agent = AgenticCurator()
# Inyección inmediata de este lote
if unique_new_assets:
log_event(">>> APLICANDO INYECCIONES EN MARKDOWN...", section_break=True)
for asset in unique_new_assets:
category = asset["category"]
file_path = f"docs/{category}.md"
try:
content = file_updates.get(file_path)
if not content:
if os.path.exists(file_path):
for asset in unique_new_assets:
category = asset["category"]
file_path = f"docs/{category}.md"
try:
with open(file_path, 'r') as f: content = f.read()
else: continue
new_content = await curator_agent.decide_smart_injection(content, asset)
if len(new_content) > len(content):
file_updates[file_path] = new_content
stats["added_details"].append(asset)
stats["categories_updated"].add(category)
except: continue
new_content = await curator_agent.decide_smart_injection(content, asset)
if len(new_content) > len(content):
# Actualizar archivo físico inmediatamente
with open(file_path, 'w') as f: f.write(new_content)
except Exception as e:
log_event(f" [!] Error inyectando {asset['url']}: {e}")
# 5. Gestión de Métricas Acumulativas (Para reporte final)
metrics_file = "src/memory/historical_metrics.json"
accumulated_report = full_extraction_report
# Intentar cargar métricas previas (Local o desde la rama accumulator)
prev_metrics = None
if is_historical:
if os.path.exists(metrics_file):
try:
with open(metrics_file, 'r') as f: prev_metrics = json.load(f)
except: pass
else:
# Intentar desde la rama
acc_content = git_controller.get_file_from_branch(metrics_file, "bot/historical-accumulator")
if acc_content:
try: prev_metrics = json.loads(acc_content)
except: pass
if prev_metrics:
# Filtrar fallos críticos de Gemini del histórico para no "ensuciar" el reporte final
# y permitir que se re-intenten si el tramo se solapa.
clean_prev_report = [
item for item in prev_metrics.get("full_report", [])
if "Fallo crítico Gemini" not in item.get("reason", "")
]
accumulated_report.extend(clean_prev_report)
metrics = {
"total_extracted": len(accumulated_report),
"full_report": accumulated_report,
"x_audit": x_audit_trail,
"start_date": since_date.isoformat() if is_historical else (datetime.now(MADRID_TZ) - timedelta(days=30)).isoformat(),
"end_date": until_date.isoformat() if until_date else datetime.now(MADRID_TZ).isoformat()
}
if is_historical:
# En modo histórico, guardamos métricas para el siguiente tramo
file_updates[metrics_file] = json.dumps(metrics, indent=2)
total_processed += len(batch_assets)
log_event(f"Fin del Lote {batch_index + 1}. Total procesado: {total_processed}/{len(all_raw_assets)}", section_break=True)
# Si aún no hemos llegado al stop_date, señalamos que hay que seguir
has_more = since_date > datetime(2024, 10, 1, 0, 0, tzinfo=MADRID_TZ)
if has_more:
# Commit y trigger siguiente (esto se gestiona mejor en el YAML del workflow)
print(f"NEXT_CHUNK_START: {since_date.isoformat()}")
git_controller.apply_historical_chunk(file_updates, since_date.isoformat())
else:
# Último tramo: Crear el PR final
print("[+] Todos los tramos completados. Generando PR final...")
git_controller.apply_multi_file_changes(file_updates, metrics)
# Borrar el archivo de métricas temporal en el commit final
if os.path.exists(metrics_file):
os.remove(metrics_file)
else:
# Modo Normal: PR inmediato
if file_updates or full_extraction_report:
git_controller.apply_multi_file_changes(file_updates, metrics)
# Pausa entre lotes para dejar respirar a la API
if batch_index < len(all_raw_assets_batches) - 1:
log_event("[*] Esperando 30 segundos para el siguiente lote...")
await asyncio.sleep(30)
log_event("PROCESO FINALIZADO CON ÉXITO.", section_break=True)
if __name__ == "__main__":
asyncio.run(master_orchestrator())