From 42d7da9e8bd9c3139d0c2dfd34dd804271d531ba Mon Sep 17 00:00:00 2001 From: Nubenetes Bot Date: Thu, 14 May 2026 17:21:52 +0200 Subject: [PATCH] feat: implement granular real-time logging, batch processing, and multi-key API rotation --- .github/workflows/agentic_backup.yml | 53 +++++++++ src/agentic_curator.py | 51 ++++++--- src/config.py | 13 ++- src/gemini_utils.py | 147 +++++++++++++++---------- src/ingestion_backup.py | 102 ++++++++++++++++++ src/logger.py | 32 ++++++ src/main.py | 156 ++++++++++----------------- 7 files changed, 383 insertions(+), 171 deletions(-) create mode 100644 .github/workflows/agentic_backup.yml create mode 100644 src/ingestion_backup.py create mode 100644 src/logger.py diff --git a/.github/workflows/agentic_backup.yml b/.github/workflows/agentic_backup.yml new file mode 100644 index 00000000..e6624678 --- /dev/null +++ b/.github/workflows/agentic_backup.yml @@ -0,0 +1,53 @@ +name: Nubenetes Backup-based Curation + +on: + workflow_dispatch: + inputs: + backup_file: + description: 'Ruta al fichero de backup (JSON o MD) dentro del repositorio' + required: true + default: 'data/backup_posts.json' + historical_mode: + description: 'Activar Modo Histórico (Ignora fecha de 30 días)' + required: false + default: 'true' + type: boolean + +permissions: + contents: write + pull-requests: write + +jobs: + backup-curation-process: + runs-on: ubuntu-latest + steps: + - name: Sincronización del repositorio + uses: actions/checkout@v4 + + - name: Provisión del Entorno Python 3.11 + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Instalación de dependencias + run: | + python -m pip install --upgrade pip + pip install --no-cache-dir pydantic PyGithub aiohttp beautifulsoup4 httpx fake-useragent pytz python-dotenv twikit>=2.1.2 playwright playwright-stealth + # Playwright es necesario porque el evaluador de Gemini o el curator podrían usarlo indirectamente, + # aunque para la extracción del backup no sea estrictamente necesario. + playwright install chromium --with-deps + + - name: Ejecución de la Canalización Agéntica (Modo Backup) + env: + GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }} + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + BACKUP_FILE: ${{ github.event.inputs.backup_file }} + HISTORICAL_MODE: ${{ github.event.inputs.historical_mode }} + PYTHONPATH: . + run: | + if [ ! -f "$BACKUP_FILE" ]; then + echo "❌ ERROR: El archivo '$BACKUP_FILE' no se encuentra en el repositorio." + echo "Por favor, sube el archivo a esa ruta y vuelve a intentarlo." + exit 1 + fi + python src/main.py diff --git a/src/agentic_curator.py b/src/agentic_curator.py index 1af5b7d7..64988ceb 100644 --- a/src/agentic_curator.py +++ b/src/agentic_curator.py @@ -4,8 +4,9 @@ import json import asyncio import httpx import random +from datetime import datetime from typing import List, Dict, Set, Optional -from src.config import GEMINI_API_KEY, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES +from src.config import GEMINI_API_KEYS, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES from src.gitops_manager import RepositoryController from src.gemini_utils import call_gemini_with_retry @@ -28,11 +29,12 @@ async def _deep_fetch_content(url: str) -> str: except: return "" return "" +from src.logger import log_event + async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]: evaluations = {} - # Revertir a v1 y asegurar que la clave existe - if not GEMINI_API_KEY: - print("[!] ERROR CRÍTICO: GEMINI_API_KEY no encontrada en el entorno.") + if not GEMINI_API_KEYS: + log_event("[!] ERROR CRÍTICO: GEMINI_API_KEYS no encontrada en el entorno.") return {a["url"]: {"status": "FILTERED", "reason": "Configuración: API KEY faltante"} for a in raw_assets} memory_file = "src/memory/health_learning.json" @@ -44,10 +46,16 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]: domain_blacklist = set(memory_data.get("blacklisted_domains", [])) except: pass - for asset in raw_assets: + for i, asset in enumerate(raw_assets): + post_date = asset.get('timestamp', 'Fecha desconocida') + log_event(f"--- EVALUANDO {i+1}/{len(raw_assets)} ---") + log_event(f" - URL: {asset['url']}\n - Post Date: {post_date}") + domain = asset['url'].split("//")[-1].split("/")[0] if domain in domain_blacklist: - evaluations[asset["url"]] = {"status": "FILTERED", "reason": "Dominio en lista negra de reputación"} + eval_res = {"status": "FILTERED", "reason": "Dominio en lista negra de reputación"} + evaluations[asset["url"]] = eval_res + log_event(f" [-] RECHAZADO: {eval_res['reason']}") continue web_content = await _deep_fetch_content(asset['url']) @@ -66,7 +74,7 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]: "Evalúa (1-100):\n" "- >80: Recurso excepcional (🌟).\n" "- >1: Aceptar (si es técnico o útil).\n\n" - "Responde SOLAMENTE un JSON: {\"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\", \"rejection_reason\": \"... (si aplica)\"}" + "Responde SOLAMENTE un JSON: {\"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\", \"reasoning\": \"Breve explicación de por qué esta categoría y score\", \"rejection_reason\": \"... (si aplica)\"}" ) try: @@ -75,18 +83,31 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]: valid_cats = [c for c in data.get("categories", []) if c in NUBENETES_CATEGORIES] if score < 1: - evaluations[asset["url"]] = {"status": "FILTERED", "reason": data.get("rejection_reason", "Bajo impacto técnico")} + reason = data.get("rejection_reason", "Bajo impacto técnico") + evaluations[asset["url"]] = {"status": "FILTERED", "reason": reason} + log_event(f" [-] RECHAZADO: {reason} (Score: {score})\n Motivo IA: {data.get('reasoning')}") elif not valid_cats: evaluations[asset["url"]] = {"status": "FILTERED", "reason": "No se encontró categoría técnica válida"} + log_event(f" [-] RECHAZADO: Sin categoría válida (Cats sugeridas: {data.get('categories')})\n Motivo IA: {data.get('reasoning')}") else: evaluations[asset["url"]] = { "status": "INCLUDED", "title": data["title"], "description": data["desc"], - "category": valid_cats[0], "impact_score": score, "is_exceptional": score > 80 + "category": valid_cats[0], "impact_score": score, "is_exceptional": score > 80, + "reasoning": data.get("reasoning") } + log_event(f" [+] ACEPTADO: {data['title']} -> {valid_cats[0]} (Score: {score})\n Desc: {data['desc']}\n Motivo IA: {data.get('reasoning')}") + except Exception as e: - evaluations[asset["url"]] = {"status": "FILTERED", "reason": f"Error Gemini: {str(e)[:500]}"} + err_msg = str(e) + if "Rate Limit" in err_msg or "429" in err_msg: + log_event(f" [!] RATE LIMIT DETECTADO. Entrando en modo COOL DOWN (2 min)...") + await asyncio.sleep(120) + + err_log = f" [!] ERROR GEMINI: {err_msg[:200]}" + evaluations[asset["url"]] = {"status": "FILTERED", "reason": err_log} + log_event(err_log) - await asyncio.sleep(2.0) # Evitar saturar la API con un delay más conservador + await asyncio.sleep(5.0) if domain_blacklist: try: @@ -118,14 +139,18 @@ class AgenticCurator: "1. Encuentra el ## o ### más semántico.\n" "2. Decide formato: si es excelente, añade estrellas (🌟, 🌟🌟 o 🌟🌟🌟).\n" "3. Decide si usar negritas (==enlace== o **texto**).\n" - "Responde JSON: {\"header\": \"Nombre exacto del ## o ###\", \"formatted_line\": \" - [==Título==](url) 🌟 - Descripción\"}" + "Responde JSON: {\"header\": \"Nombre exacto del ## o ###\", \"formatted_line\": \" - [==Título==](url) 🌟 - Descripción\", \"reasoning\": \"Breve por qué de esta ubicación/formato\"}" ) try: data = await call_gemini_with_retry(prompt) header = data.get("header") new_line = data.get("formatted_line") + reasoning = data.get("reasoning", "Sin motivo especificado") + if header and new_line: + log_event(f" [>>>] UBICACIÓN: Header '{header}'\n Formato: {new_line}\n Motivo IA: {reasoning}") + new_lines = [] inserted = False for line in lines: @@ -135,7 +160,7 @@ class AgenticCurator: inserted = True if inserted: return "\n".join(new_lines) except Exception as e: - print(f"[!] Error en decide_smart_injection: {e}") + log_event(f"[!] Error en decide_smart_injection: {e}") pass return self._manual_fallback_injection(markdown_content, asset) diff --git a/src/config.py b/src/config.py index 1c387d72..0b9e5e8d 100644 --- a/src/config.py +++ b/src/config.py @@ -11,7 +11,13 @@ TWITTER_EMAIL = os.getenv("TWITTER_EMAIL") TWITTER_PASSWORD = os.getenv("TWITTER_PASSWORD") # Pydantic-AI y otras librerías pueden usar diferentes nombres para la misma clave -GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") +GEMINI_API_KEYS = [ + os.getenv("GEMINI_API_KEY_1"), + os.getenv("GEMINI_API_KEY_2") +] +GEMINI_API_KEYS = [k for k in GEMINI_API_KEYS if k] # Filtrar nulos + +GEMINI_API_KEY = GEMINI_API_KEYS[0] if GEMINI_API_KEYS else None if GEMINI_API_KEY and not os.getenv("GOOGLE_API_KEY"): os.environ["GOOGLE_API_KEY"] = GEMINI_API_KEY @@ -20,9 +26,8 @@ GH_TOKEN = os.getenv("GH_TOKEN") # Gemini Configuration (May 2026) GEMINI_API_VERSION = "v1beta" GEMINI_MODELS = [ - "gemini-3.1-flash-lite", - "gemini-2.5-flash", - "gemini-2.5-flash-lite", + "gemini-2.5-flash", + "gemini-2.5-flash-lite", "gemini-2.0-flash" ] diff --git a/src/gemini_utils.py b/src/gemini_utils.py index f925cec2..90e2c798 100644 --- a/src/gemini_utils.py +++ b/src/gemini_utils.py @@ -4,7 +4,10 @@ import random import json import re from typing import Dict, Any, List, Optional -from src.config import GEMINI_API_KEY, GEMINI_API_VERSION, GEMINI_MODELS +from src.config import GEMINI_API_KEYS, GEMINI_API_VERSION, GEMINI_MODELS + +# Global para mantener el índice de la API Key actual +CURRENT_KEY_INDEX = 0 class GeminiDiagnostics: def __init__(self): @@ -29,64 +32,100 @@ class GeminiDiagnostics: async def call_gemini_with_retry(prompt: str, response_format: str = "json", max_retries: int = 5): """ - Llama a la API de Gemini con rotación de modelos y diagnóstico detallado. + Llama a la API de Gemini con rotación de modelos Y rotación de API Keys. """ - if not GEMINI_API_KEY: - raise ValueError("GEMINI_API_KEY no configurada.") + global CURRENT_KEY_INDEX + if not GEMINI_API_KEYS: + raise ValueError("No hay GEMINI_API_KEYS configuradas.") diagnostics = GeminiDiagnostics() async with httpx.AsyncClient() as client: - for model in GEMINI_MODELS: - api_url = f"https://generativelanguage.googleapis.com/{GEMINI_API_VERSION}/models/{model}:generateContent?key={GEMINI_API_KEY}" + # Intentamos con las llaves disponibles si una falla por cuota + for _ in range(len(GEMINI_API_KEYS)): + api_key = GEMINI_API_KEYS[CURRENT_KEY_INDEX] - for attempt in range(max_retries): - try: - payload = {"contents": [{"parts": [{"text": prompt}]}]} - response = await client.post(api_url, json=payload, timeout=35) - - if response.status_code == 200: - try: - resp_json = response.json() - if 'candidates' not in resp_json or not resp_json['candidates']: - diagnostics.add_attempt(model, 200, "Respuesta vacía (no candidates)", response.text) - break - - text_resp = resp_json['candidates'][0]['content']['parts'][0]['text'] - if response_format == "json": - match = re.search(r'\{.*\}|\[.*\]', text_resp, re.DOTALL) - if match: - data = json.loads(match.group(0)) - if isinstance(data, list): - return data[0] if len(data) > 0 else {} - return data - diagnostics.add_attempt(model, 200, "JSON no encontrado en texto", text_resp) - break - return text_resp - except Exception as e: - diagnostics.add_attempt(model, 200, f"Error parseo: {str(e)}", response.text) - break - - elif response.status_code == 404: - diagnostics.add_attempt(model, 404, "Modelo no encontrado") - break # Probar siguiente modelo - - elif response.status_code in [429, 503]: - reason = "Rate Limit" if response.status_code == 429 else "Service Unavailable" - diagnostics.add_attempt(model, response.status_code, reason) - # Backoff agresivo: 5, 10, 20, 40... segundos - wait = (5 * (2 ** attempt)) + random.random() * 5 - await asyncio.sleep(wait) - continue - - else: - diagnostics.add_attempt(model, response.status_code, "Error API", response.text) - break + for model in GEMINI_MODELS: + # Usamos el nombre completo del modelo como requiere la v1beta + full_model_name = f"models/{model}" + api_url = f"https://generativelanguage.googleapis.com/{GEMINI_API_VERSION}/{full_model_name}:generateContent?key={api_key}" + + for attempt in range(max_retries): + try: + payload = {"contents": [{"parts": [{"text": prompt}]}]} + response = await client.post(api_url, json=payload, timeout=35) - except Exception as e: - diagnostics.add_attempt(model, 0, f"Excepción: {str(e)}") - if attempt == max_retries - 1: - break - await asyncio.sleep(1) + if response.status_code == 200: + try: + resp_json = response.json() + if 'candidates' not in resp_json or not resp_json['candidates']: + diagnostics.add_attempt(model, 200, "Respuesta vacía (no candidates)", response.text) + break + + text_resp = resp_json['candidates'][0]['content']['parts'][0]['text'] + if response_format == "json": + match = re.search(r'\{.*\}|\[.*\]', text_resp, re.DOTALL) + if match: + data = json.loads(match.group(0)) + if isinstance(data, list): + return data[0] if len(data) > 0 else {} + return data + diagnostics.add_attempt(model, 200, "JSON no encontrado en texto", text_resp) + break + return text_resp + except Exception as e: + diagnostics.add_attempt(model, 200, f"Error parseo: {str(e)}", response.text) + break + + elif response.status_code == 404: + diagnostics.add_attempt(model, 404, f"Modelo {full_model_name} no encontrado") + break # Probar siguiente modelo + + elif response.status_code in [429, 503]: + # Si es un error de cuota (429), probamos a rotar la API Key inmediatamente + if response.status_code == 429: + reason = "Rate Limit / Quota Exceeded" + diagnostics.add_attempt(model, 429, reason) + # Loguear rotación en el log central + with open("/home/inafev/.gemini/tmp/awesome-kubernetes/curation_progress.log", "a") as log_f: + log_f.write(f" [!] Llave {CURRENT_KEY_INDEX + 1} agotada. Probando rotación...\n") + break # Rompe el bucle de reintentos para cambiar de llave o modelo + + reason = "Service Unavailable" + diagnostics.add_attempt(model, response.status_code, reason) + wait = (5 * (2 ** attempt)) + random.random() * 5 + await asyncio.sleep(wait) + continue + + else: + diagnostics.add_attempt(model, response.status_code, "Error API", response.text) + break + + except Exception as e: + diagnostics.add_attempt(model, 0, f"Excepción: {str(e)}") + if attempt == max_retries - 1: + break + await asyncio.sleep(1) + + # Si llegamos aquí por un 429, el bucle 'attempt' se rompió. + # Salimos también del bucle 'model' para rotar la llave. + if diagnostics.attempts and diagnostics.attempts[-1]['status'] == 429: + break + + # Si la última falla fue un 429, rotamos la llave y probamos el siguiente ciclo + if diagnostics.attempts and diagnostics.attempts[-1]['status'] == 429: + CURRENT_KEY_INDEX = (CURRENT_KEY_INDEX + 1) % len(GEMINI_API_KEYS) + # Opcional: una pequeña espera antes de usar la nueva llave + await asyncio.sleep(2) + continue + else: + # Si no fue un 429 o éxito, salimos del bucle de llaves + if diagnostics.attempts and diagnostics.attempts[-1]['status'] == 200: + # En caso de éxito (que devuelve directamente arriba), no llegamos aquí. + # Este break es para otros errores terminales. + pass + break - raise Exception(f"Fallo crítico Gemini.\n{diagnostics.get_report()}") + # Si logramos el éxito, la función ya habría retornado dentro del bucle de éxito (200) + # Si llegamos aquí, es porque todas las llaves y modelos fallaron. + raise Exception(f"Fallo crítico Gemini tras rotación.\n{diagnostics.get_report()}") diff --git a/src/ingestion_backup.py b/src/ingestion_backup.py new file mode 100644 index 00000000..f010fd5b --- /dev/null +++ b/src/ingestion_backup.py @@ -0,0 +1,102 @@ +import json +import re +from datetime import datetime +from src.config import MADRID_TZ + +class BackupDataExtractor: + def __init__(self, file_path: str): + self.file_path = file_path + self.audit_trail = [] + + def log_audit(self, method: str, success: bool, msg: str): + icons = {True: "✅ ÉXITO", False: "❌ FALLO", None: "⚡ INTENTO"} + entry = f"**{method}** - {icons.get(success, 'ℹ️ INFO')}: {msg}" + self.audit_trail.append(entry) + print(entry) + + def _extract_urls_from_text(self, text: str) -> list[str]: + # Regex para URLs comunes + urls = re.findall(r'https?://[^\s<>\"]+|www\.[^\s<>\"]+', text) + noise_domains = [ + "x.com", "twitter.com", "abs.twimg", "pbs.twimg", + "t.co", "nitter.net" + ] + valid_urls = [] + for u in urls: + u_clean = u.rstrip('.,!?;:)(') + if all(d not in u_clean.lower() for d in noise_domains): + valid_urls.append(u_clean) + return list(set(valid_urls)) + + async def fetch_links(self) -> list[dict]: + self.log_audit("Backup Ingestion", None, f"Procesando: {self.file_path}") + results = [] + + try: + if self.file_path.endswith('.json'): + with open(self.file_path, 'r') as f: + data = json.load(f) + + for item in data: + # Formato standard de exportación de X (o similar) + text = item.get('full_text', '') or item.get('text', '') + timestamp_raw = item.get('created_at', '') + + # Intentar extraer de entities.urls si existe (más limpio) + extracted_urls = [] + if 'entities' in item and 'urls' in item['entities']: + for u_obj in item['entities']['urls']: + expanded = u_obj.get('expanded_url') + if expanded: extracted_urls.append(expanded) + + # Fallback a regex si no hay entities + if not extracted_urls: + extracted_urls = self._extract_urls_from_text(text) + + # Filtrar ruido de nuevo por si acaso + noise_domains = ["x.com", "twitter.com", "t.co"] + for url in set(extracted_urls): + if any(d in url.lower() for d in noise_domains): + continue + + results.append({ + "url": url, + "context": text[:250], + "timestamp": timestamp_raw, + "source_type": "Backup JSON" + }) + + elif self.file_path.endswith('.md'): + with open(self.file_path, 'r') as f: + content = f.read() + + # En MD, buscamos todos los links que no sean de X + # El usuario mencionó que hay links al post original si se cortó, + # pero nos interesan los links EXTERNOS curados. + urls = self._extract_urls_from_text(content) + for u in urls: + results.append({ + "url": u, + "context": "Extraído de Backup Markdown", + "timestamp": datetime.now(MADRID_TZ).isoformat(), + "source_type": "Backup MD" + }) + + # Ordenar por fecha si es posible (JSON suele tenerla) + try: + # 'Tue Oct 01 19:56:51 +0000 2024' + def parse_date(x): + try: + return datetime.strptime(x["timestamp"], '%a %b %d %H:%M:%S +0000 %Y') + except: + return datetime.min + results.sort(key=parse_date) + except: + pass + + self.log_audit("Backup Ingestion", True, f"Total enlaces extraídos: {len(results)}") + return results + + except Exception as e: + self.log_audit("Backup Ingestion", False, f"Error: {str(e)}") + return [] diff --git a/src/logger.py b/src/logger.py new file mode 100644 index 00000000..08eef44b --- /dev/null +++ b/src/logger.py @@ -0,0 +1,32 @@ +import os +from datetime import datetime + +# Ruta por defecto para el log local (fuera del repo) +DEFAULT_LOG_PATH = "/home/inafev/.gemini/tmp/awesome-kubernetes/curation_progress.log" + +def log_event(message: str, section_break: bool = False): + """ + Registra un evento tanto en la consola (STDOUT) como en el archivo de log local si existe. + En GitHub Actions, esto aparecerá en los logs del workflow. + """ + timestamp = datetime.now().strftime('%H:%M:%S') + formatted_msg = f"[{timestamp}] {message}" + + if section_break: + separator = "=" * 60 + print(f"\n{separator}\n{formatted_msg}\n{separator}") + _write_to_file(f"\n{separator}\n{formatted_msg}\n{separator}") + else: + print(formatted_msg) + _write_to_file(formatted_msg) + +def _write_to_file(message: str): + # Solo intentamos escribir en archivo si no estamos en GitHub Actions + if not os.getenv("GITHUB_ACTIONS"): + try: + # Aseguramos que el directorio existe + os.makedirs(os.path.dirname(DEFAULT_LOG_PATH), exist_ok=True) + with open(DEFAULT_LOG_PATH, "a") as f: + f.write(message + "\n") + except: + pass diff --git a/src/main.py b/src/main.py index f74f9afa..aeb17f49 100644 --- a/src/main.py +++ b/src/main.py @@ -10,11 +10,12 @@ from src.markdown_ast import MarkdownSanitizer from src.agentic_curator import evaluate_extracted_assets, AgenticCurator from src.autonomous_discovery import discover_trending_assets from src.gitops_manager import RepositoryController +from src.logger import log_event async def master_orchestrator(): git_controller = RepositoryController(GH_TOKEN, TARGET_REPO) - print("[*] INICIANDO CURADURÍA AGÉNTICA (CRONOLOGÍA Y TRANSPARENCIA)") + log_event("INICIANDO CURADURÍA AGÉNTICA (CRONOLOGÍA Y TRANSPARENCIA)", section_break=True) # 1. Horizonte Temporal Dinámico / Histórico is_historical = os.getenv("HISTORICAL_MODE", "false").lower() == "true" @@ -35,24 +36,31 @@ async def master_orchestrator(): if since_date < final_stop_date: since_date = final_stop_date - print(f"[*] MODO HISTÓRICO: Tramo {since_date.date()} -> {until_date.date()}") + log_event(f"[*] MODO HISTÓRICO: Tramo {since_date.date()} -> {until_date.date()}") else: # Modo Normal (30 días) days_back = int(os.getenv("CURATION_DAYS_BACK", "30")) since_date = datetime.now(MADRID_TZ) - timedelta(days=days_back) until_date = None - print(f"[*] Modo Normal: Desde {since_date.date()}") + log_event(f"[*] Modo Normal: Desde {since_date.date()}") # 2. Ingesta Multi-fuente - strategy = os.getenv("EXTRACTION_STRATEGY", "search") - twitter_client = SocialDataExtractor() - raw_social = await twitter_client.fetch_links_since(since_date, until_date=until_date, strategy=strategy) - x_audit_trail = twitter_client.audit_trail + backup_file = os.getenv("BACKUP_FILE") + if backup_file and os.path.exists(backup_file): + from src.ingestion_backup import BackupDataExtractor + extractor = BackupDataExtractor(backup_file) + raw_social = await extractor.fetch_links() + x_audit_trail = extractor.audit_trail + else: + strategy = os.getenv("EXTRACTION_STRATEGY", "search") + twitter_client = SocialDataExtractor() + raw_social = await twitter_client.fetch_links_since(since_date, until_date=until_date, strategy=strategy) + x_audit_trail = twitter_client.audit_trail # GitHub Trending solo en modo normal (para no repetir) trending = [] - if not is_historical: - print("[*] Buscando novedades en GitHub Trending...") + if not is_historical and not backup_file: + log_event("[*] Buscando novedades en GitHub Trending...") trending = await discover_trending_assets() for t in trending: t["source_type"] = "GitHub Trending" @@ -69,14 +77,22 @@ async def master_orchestrator(): existing_urls.update(re.findall(r'\]\((https?://[^\)]+)\)', f.read())) except: pass - full_extraction_report = [] - unique_new_assets = [] + # --- INICIO PROCESAMIENTO POR LOTES --- + BATCH_SIZE = 50 + all_raw_assets_batches = [all_raw_assets[i:i + BATCH_SIZE] for i in range(0, len(all_raw_assets), BATCH_SIZE)] - if all_raw_assets: - print(f"[*] Evaluando {len(all_raw_assets)} candidatos con Gemini...") - evaluations = await evaluate_extracted_assets(all_raw_assets) + curator_agent = AgenticCurator() + total_processed = 0 + + for batch_index, batch_assets in enumerate(all_raw_assets_batches): + log_event(f">>> INICIANDO LOTE {batch_index + 1}/{len(all_raw_assets_batches)} ({len(batch_assets)} enlaces)", section_break=True) - for asset in all_raw_assets: + full_extraction_report = [] + unique_new_assets = [] + + evaluations = await evaluate_extracted_assets(batch_assets) + + for asset in batch_assets: url = asset["url"] clean_url = url.split('#')[0].rstrip('/') @@ -88,102 +104,42 @@ async def master_orchestrator(): if clean_url in [u.split('#')[0].rstrip('/') for u in existing_urls]: status = "DUPLICATE" reason = "Ya existe en Nubenetes.com" + log_event(f" [=] DUPLICADO: El enlace ya está en el repositorio.") if status == "INCLUDED": unique_new_assets.append({ "url": url, "title": evaluation["title"], "description": evaluation["description"], "category": category, - "impact_score": evaluation["impact_score"] + "impact_score": evaluation["impact_score"], + "reasoning": evaluation.get("reasoning") }) - - full_extraction_report.append({ - "url": url, - "status": status, - "reason": reason, - "category": category, - "source": asset.get("source_type", "Unknown"), - "post_date": asset.get("timestamp") - }) - # 4. Inyección en Markdowns (Local) - file_updates = {} - stats = {"added_details": [], "categories_updated": set()} - curator_agent = AgenticCurator() + # Inyección inmediata de este lote + if unique_new_assets: + log_event(">>> APLICANDO INYECCIONES EN MARKDOWN...", section_break=True) - for asset in unique_new_assets: - category = asset["category"] - file_path = f"docs/{category}.md" - try: - content = file_updates.get(file_path) - if not content: - if os.path.exists(file_path): + for asset in unique_new_assets: + category = asset["category"] + file_path = f"docs/{category}.md" + try: with open(file_path, 'r') as f: content = f.read() - else: continue - - new_content = await curator_agent.decide_smart_injection(content, asset) - if len(new_content) > len(content): - file_updates[file_path] = new_content - stats["added_details"].append(asset) - stats["categories_updated"].add(category) - except: continue + + new_content = await curator_agent.decide_smart_injection(content, asset) + if len(new_content) > len(content): + # Actualizar archivo físico inmediatamente + with open(file_path, 'w') as f: f.write(new_content) + except Exception as e: + log_event(f" [!] Error inyectando {asset['url']}: {e}") - # 5. Gestión de Métricas Acumulativas (Para reporte final) - metrics_file = "src/memory/historical_metrics.json" - accumulated_report = full_extraction_report - - # Intentar cargar métricas previas (Local o desde la rama accumulator) - prev_metrics = None - if is_historical: - if os.path.exists(metrics_file): - try: - with open(metrics_file, 'r') as f: prev_metrics = json.load(f) - except: pass - else: - # Intentar desde la rama - acc_content = git_controller.get_file_from_branch(metrics_file, "bot/historical-accumulator") - if acc_content: - try: prev_metrics = json.loads(acc_content) - except: pass - - if prev_metrics: - # Filtrar fallos críticos de Gemini del histórico para no "ensuciar" el reporte final - # y permitir que se re-intenten si el tramo se solapa. - clean_prev_report = [ - item for item in prev_metrics.get("full_report", []) - if "Fallo crítico Gemini" not in item.get("reason", "") - ] - accumulated_report.extend(clean_prev_report) - - metrics = { - "total_extracted": len(accumulated_report), - "full_report": accumulated_report, - "x_audit": x_audit_trail, - "start_date": since_date.isoformat() if is_historical else (datetime.now(MADRID_TZ) - timedelta(days=30)).isoformat(), - "end_date": until_date.isoformat() if until_date else datetime.now(MADRID_TZ).isoformat() - } - - if is_historical: - # En modo histórico, guardamos métricas para el siguiente tramo - file_updates[metrics_file] = json.dumps(metrics, indent=2) + total_processed += len(batch_assets) + log_event(f"Fin del Lote {batch_index + 1}. Total procesado: {total_processed}/{len(all_raw_assets)}", section_break=True) - # Si aún no hemos llegado al stop_date, señalamos que hay que seguir - has_more = since_date > datetime(2024, 10, 1, 0, 0, tzinfo=MADRID_TZ) - - if has_more: - # Commit y trigger siguiente (esto se gestiona mejor en el YAML del workflow) - print(f"NEXT_CHUNK_START: {since_date.isoformat()}") - git_controller.apply_historical_chunk(file_updates, since_date.isoformat()) - else: - # Último tramo: Crear el PR final - print("[+] Todos los tramos completados. Generando PR final...") - git_controller.apply_multi_file_changes(file_updates, metrics) - # Borrar el archivo de métricas temporal en el commit final - if os.path.exists(metrics_file): - os.remove(metrics_file) - else: - # Modo Normal: PR inmediato - if file_updates or full_extraction_report: - git_controller.apply_multi_file_changes(file_updates, metrics) + # Pausa entre lotes para dejar respirar a la API + if batch_index < len(all_raw_assets_batches) - 1: + log_event("[*] Esperando 30 segundos para el siguiente lote...") + await asyncio.sleep(30) + + log_event("PROCESO FINALIZADO CON ÉXITO.", section_break=True) if __name__ == "__main__": asyncio.run(master_orchestrator())