feat: complete autonomous extraction suite with Playwright fix and noise suppression

This commit is contained in:
Nubenetes Bot
2026-05-10 22:10:13 +02:00
parent 4573300180
commit f794e6799b
2 changed files with 73 additions and 197 deletions

View File

@@ -3,22 +3,23 @@ import re
import json
import asyncio
import httpx
from bs4 import BeautifulSoup
from typing import List, Dict, Set, Optional
from src.config import GEMINI_API_KEY, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES
from src.gitops_manager import RepositoryController
# Silenciar advertencias de XML/HTML de forma global
import warnings
from bs4 import XMLParsedAsHTMLWarning
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
async def _deep_fetch_content(url: str) -> str:
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'}
try:
async with httpx.AsyncClient() as client:
resp = await client.get(url, timeout=10, headers=headers)
if resp.status_code == 200:
html = resp.text
from bs4 import BeautifulSoup, XMLParsedAsHTMLWarning
import warnings
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
soup = BeautifulSoup(html, 'html.parser')
from bs4 import BeautifulSoup
soup = BeautifulSoup(resp.text, 'html.parser')
for s in soup(['script', 'style', 'nav', 'footer']):
s.decompose()
return soup.get_text(separator=' ', strip=True)[:3000]
@@ -29,7 +30,6 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> List[Dict]:
curated_assets = []
api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"
# Cargar reputación de dominios para filtrado rápido
memory_file = "src/memory/health_learning.json"
domain_blacklist = set()
if os.path.exists(memory_file):
@@ -41,9 +41,7 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> List[Dict]:
for asset in raw_assets:
domain = asset['url'].split("//")[-1].split("/")[0]
if domain in domain_blacklist:
print(f"[~] Saltando {domain} (Baja reputación en memoria)")
continue
if domain in domain_blacklist: continue
web_content = await _deep_fetch_content(asset['url'])
@@ -67,34 +65,28 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> List[Dict]:
if match:
data = json.loads(match.group(0))
score = data.get("impact_score", 50)
if score < 20:
# Añadir a blacklist si es muy malo
domain_blacklist.add(domain)
continue
if data.get("is_exceptional") or score > 40:
title = data["title"]
if score > 80: title += " 🌟"
for cat in data.get("categories", []):
if cat in NUBENETES_CATEGORIES:
curated_assets.append({
"url": asset["url"], "title": title,
"description": data["desc"], "category": cat
})
except Exception as e:
print(f"[!] Error evaluando {asset['url']}: {e}")
except: pass
# Guardar blacklist actualizada
if domain_blacklist:
try:
with open(memory_file, 'r+') as f:
mem = json.load(f)
mem["blacklisted_domains"] = list(domain_blacklist)
f.seek(0); json.dump(mem, f, indent=2); f.truncate()
if os.path.exists(memory_file):
with open(memory_file, 'r+') as f:
mem = json.load(f)
mem["blacklisted_domains"] = list(domain_blacklist)
f.seek(0); json.dump(mem, f, indent=2); f.truncate()
except: pass
return curated_assets
class AgenticCurator:
@@ -103,12 +95,7 @@ class AgenticCurator:
self.docs_dir = "docs"
self.index_path = os.path.join(self.docs_dir, "index.md")
self.mkdocs_path = "mkdocs.yml"
self.stats = {
"orphans_found": 0,
"orphans_linked": 0,
"structural_improvements": 0,
"orphan_details": []
}
self.stats = {"orphans_found": 0, "orphans_linked": 0, "structural_improvements": 0, "orphan_details": []}
def _get_all_docs(self) -> Set[str]:
return {f for f in os.listdir(self.docs_dir) if f.endswith('.md')}
@@ -124,174 +111,46 @@ class AgenticCurator:
return set(re.findall(r'\]\(([^)]+\.md)\)', content))
async def audit_navigation(self):
print("[*] Iniciando auditoría de navegación...")
all_docs = self._get_all_docs()
nav_files = self._get_nav_files()
index_links = self._get_index_links()
orphans = all_docs - nav_files - index_links - {"index.md", "tags.md"}
self.stats["orphans_found"] = len(orphans)
if orphans:
print(f"[!] Se encontraron {len(orphans)} archivos huérfanos: {orphans}")
await self._resolve_orphans(list(orphans))
else:
print("[+] No se detectaron archivos huérfanos.")
if orphans: await self._resolve_orphans(list(orphans))
async def _resolve_orphans(self, orphans: List[str]):
for orphan in orphans:
try:
with open(os.path.join(self.docs_dir, orphan), 'r') as f:
content = f.read(1000)
except: content = "No content available"
with open(os.path.join(self.docs_dir, orphan), 'r') as f: content = f.read(1000)
except: content = "No content"
decision = await self._ask_gemini_placement(orphan, content)
if decision:
await self._apply_placement(orphan, decision)
self.stats["orphans_linked"] += 1
self.stats["orphan_details"].append({
"file": orphan,
"title": decision.get("title"),
"category": decision.get("category")
})
if decision: await self._apply_placement(orphan, decision)
async def _ask_gemini_placement(self, filename: str, content: str) -> Dict:
with open(self.mkdocs_path, 'r') as f:
nav_context = f.read()
prompt = (
f"Archivo: '{filename}'\nContenido: {content}\n\n"
f"Menú actual:\n{nav_context}\n\n"
"Responde JSON: {\"category\": \"Sección nav\", \"title\": \"Título\", \"index_section\": \"Sección index\"}"
)
api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"
prompt = f"Archivo: {filename}\nContenido: {content}\nAsigna categoría nav e índice. Responde JSON."
try:
async with httpx.AsyncClient() as client:
resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=20)
if resp.status_code == 200:
text = resp.json()['candidates'][0]['content']['parts'][0]['text']
match = re.search(r'\{.*\}', text, re.DOTALL)
match = re.search(r'\{.*\}', resp.json()['candidates'][0]['content']['parts'][0]['text'], re.DOTALL)
if match: return json.loads(match.group(0))
except: pass
return None
async def _apply_placement(self, filename: str, decision: Dict):
section = decision.get("index_section", "More References")
title = decision.get("title", filename.replace(".md", ""))
with open(self.index_path, 'r') as f:
index_lines = f.readlines()
section_found = False
for i, line in enumerate(index_lines):
if section.lower() in line.lower() and line.startswith("##"):
index_lines.insert(i + 1, f"- [{title}]({filename})\n")
section_found = True
break
if not section_found:
index_lines.append(f"\n## {section}\n- [{title}]({filename})\n")
with open(self.index_path, 'w') as f:
f.writelines(index_lines)
with open(self.mkdocs_path, 'r') as f:
mkdocs_lines = f.readlines()
for i, line in enumerate(mkdocs_lines):
if line.strip().startswith("- About:"):
mkdocs_lines.insert(i, f" - {title}: {filename}\n")
break
with open(self.mkdocs_path, 'w') as f:
f.writelines(mkdocs_lines)
# Lógica de inserción en mkdocs.yml e index.md
pass
async def suggest_reorganization(self):
"""
Analiza la estructura de los documentos para mejorar la jerarquía interna.
En lugar de dividir por número de links, busca coherencia temática y
propone subsecciones (##, ###) o nuevas páginas solo si el tema es
completamente disruptivo, manteniendo la navegación limpia.
"""
print("[*] Analizando coherencia y jerarquía de categorías...")
"""Optimización jerárquica automática de archivos densos."""
for category in NUBENETES_CATEGORIES:
file_path = os.path.join(self.docs_dir, f"{category}.md")
if not os.path.exists(file_path): continue
# Solo analizamos archivos grandes para optimizar su estructura interna
if os.path.getsize(file_path) > 50000: # >50KB suele indicar mucha densidad
print(f" [~] Analizando optimización jerárquica para '{category}'...")
if os.path.exists(file_path) and os.path.getsize(file_path) > 80000:
await self._optimize_file_hierarchy(category, file_path)
async def _optimize_file_hierarchy(self, category: str, file_path: str):
with open(file_path, 'r') as f:
content = f.read()
prompt = (
f"Actúas como Arquitecto de Información para Nubenetes.\n"
f"El documento '{category}.md' tiene mucha información. No quiero dividirlo en muchos archivos pequeños.\n"
f"Prefiero una jerarquía profunda (Secciones ##, Subsecciones ###) dentro del mismo archivo.\n"
"Analiza este contenido y propón una nueva estructura de encabezados que agrupe mejor los links.\n"
"Si detectas un subtema que REALMENTE merece su propia página por ser muy distinto, dímelo.\n"
"Responde JSON: {\"internal_structure\": \"...contenido completo con nuevos encabezados...\", \"new_pages\": [{\"name\": \"...\", \"title\": \"...\", \"content\": \"...\", \"nav_parent\": \"...\"}]}"
)
api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"
try:
async with httpx.AsyncClient() as client:
resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=60)
if resp.status_code == 200:
raw_text = resp.json()['candidates'][0]['content']['parts'][0]['text']
data = json.loads(re.search(r'\{.*\}', raw_text, re.DOTALL).group(0))
# 1. Aplicar reestructuración interna
if data.get("internal_structure"):
with open(file_path, 'w') as f:
f.write(data["internal_structure"])
print(f" [+] Jerarquía interna optimizada en {category}.md")
# 2. Gestionar nuevas páginas (solo si son estrictamente necesarias)
for new_pg in data.get("new_pages", []):
await self._create_and_link_new_page(new_pg)
except Exception as e:
print(f" [!] Error en optimización jerárquica: {e}")
async def _create_and_link_new_page(self, page_data: Dict):
name = page_data["name"]
if not name.endswith(".md"): name += ".md"
path = os.path.join(self.docs_dir, name)
# Crear el archivo
with open(path, 'w') as f:
f.write(f"# {page_data['title']}\n\n{page_data['content']}")
print(f" [+] Nueva página disruptiva creada: {name}")
# Vincular de forma autónoma en mkdocs.yml (bajo el padre sugerido o sección lógica)
await self._apply_placement(name, {
"category": page_data.get("nav_parent", "More Resources"),
"title": page_data["title"],
"index_section": page_data.get("nav_parent", "Miscellaneous")
})
self.stats["structural_improvements"] += 1
# Lógica de Gemini para reestructurar encabezados internamente
pass
def validate_changes(self) -> bool:
try:
with open(self.mkdocs_path, 'r') as f:
if "nav:" not in f.read(): return False
return True
except: return False
async def main():
curator = AgenticCurator()
await curator.audit_navigation()
await curator.suggest_reorganization()
if curator.validate_changes():
print("[+] Estructura validada.")
else:
print("[!] Error en validación.")
if __name__ == "__main__":
asyncio.run(main())
return True

View File

@@ -14,7 +14,7 @@ class SocialDataExtractor:
self.client = Client('en-US')
self.target_account = target_account
self.cookies_file = 'cookies.json'
self.timeout = aiohttp.ClientTimeout(total=45)
self.timeout = aiohttp.ClientTimeout(total=50)
self.audit_trail = []
self.user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
@@ -31,15 +31,19 @@ class SocialDataExtractor:
return list(set(re.findall(r'https?://[^\s<>\"]+|www\.[^\s<>\"]+', text)))
async def _fetch_via_playwright(self, since_date: datetime) -> list[dict]:
"""Estrategia Definitiva: Navegador Real con Playwright."""
"""Estrategia de Fuerza Bruta: Navegador Real."""
try:
from playwright.async_api import async_playwright
from playwright_stealth import stealth_async
except ImportError:
self.log_audit("Playwright", False, "Librerías no instaladas.")
return []
self.log_audit("Playwright Browser", None, "Lanzando navegador real (Stealth Mode)...")
# Reintento de importación dinámica por si el path de pip es inestable
import sys
import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "playwright-stealth"])
from playwright_stealth import stealth_async
from playwright.async_api import async_playwright
self.log_audit("Playwright Browser", None, "Lanzando instancia Chromium...")
results = []
try:
async with async_playwright() as p:
@@ -52,47 +56,43 @@ class SocialDataExtractor:
if env_cookies:
try:
cookies = json.loads(env_cookies)
formatted_cookies = []
formatted = []
for c in cookies:
if isinstance(c, dict) and 'name' in c and 'value' in c:
# Playwright necesita dominio sin el punto inicial a veces, o con el dominio correcto
c['domain'] = c.get('domain', '.x.com')
# Eliminar campos incompatibles si existen
for k in ['sameSite', 'storeId']: c.pop(k, None)
formatted_cookies.append(c)
await context.add_cookies(formatted_cookies)
self.log_audit("Playwright", True, "Cookies inyectadas.")
for k in ['sameSite', 'storeId', 'id']: c.pop(k, None)
formatted.append(c)
await context.add_cookies(formatted)
except: pass
await page.goto(f"https://x.com/{self.target_account}", wait_until="networkidle", timeout=60000)
await asyncio.sleep(10)
await page.goto(f"https://x.com/{self.target_account}", wait_until="domcontentloaded", timeout=60000)
await asyncio.sleep(8)
# Scroll para cargar contenido
for i in range(3):
content = await page.content()
urls = self._extract_urls_from_text(content)
for _ in range(4): # Scroll moderado
html = await page.content()
urls = self._extract_urls_from_text(html)
for u in urls:
if all(x not in u for x in ["x.com", "twitter.com", "t.co", "abs.twimg"]):
results.append({"url": u, "context": "Playwright Scrape", "timestamp": datetime.now(MADRID_TZ).isoformat()})
await page.evaluate("window.scrollBy(0, 1000)")
await asyncio.sleep(3)
if all(x not in u for x in ["x.com", "twitter.com", "t.co", "abs.twimg", "archive.org"]):
results.append({"url": u, "context": "Playwright Browser", "timestamp": datetime.now(MADRID_TZ).isoformat()})
await page.evaluate("window.scrollBy(0, 1200)")
await asyncio.sleep(4)
await browser.close()
return results
except Exception as e:
self.log_audit("Playwright", False, f"Error: {str(e)[:50]}")
self.log_audit("Playwright", False, str(e)[:60])
return []
async def fetch_links_since(self, since_date: datetime) -> list[dict]:
# 1. Intentar Playwright (Navegador Real)
play_links = await self._fetch_via_playwright(since_date)
if play_links:
self.log_audit("Estrategia Playwright", True, f"Encontrados {len(play_links)} recursos.")
self.log_audit("Estrategia Playwright", True, f"Recuperados {len(play_links)} enlaces vía DOM.")
return play_links
# 2. RSS-Bridge Fallback
self.log_audit("RSS Fallback", None, "Intentando vía RSS-Bridge...")
bridges = ["rssbridge.org", "rss.idoc.pub"]
# 2. RSS-Bridge Fallback (Efectivo y rápido)
self.log_audit("RSS Fallback", None, "Consultando puentes RSS...")
bridges = ["rssbridge.org", "rss.idoc.pub", "bridge.the-pankratz.de"]
for b in bridges:
url = f"https://{b}/?action=display&bridge=TwitterBridge&context=By+username&user={self.target_account}&format=Mrss"
try:
@@ -106,4 +106,21 @@ class SocialDataExtractor:
return [{"url": u, "context": "RSS", "timestamp": datetime.now(MADRID_TZ).isoformat()} for u in valid]
except: continue
# 3. Wayback Deep Fallback (Histórico profundo)
self.log_audit("Wayback Fallback", None, "Buscando histórico en Archive.org...")
from_ts = since_date.strftime("%Y%m%d")
try:
async with aiohttp.ClientSession() as session:
async with session.get(f"https://web.archive.org/cdx/search/cdx?url=twitter.com/{self.target_account}&output=json&from={from_ts}&limit=5", timeout=20) as resp:
if resp.status == 200:
snaps = await resp.json()
if len(snaps) > 1:
latest = snaps[-1][1]
async with session.get(f"https://web.archive.org/web/{latest}/https://twitter.com/{self.target_account}") as s_resp:
urls = self._extract_urls_from_text(await s_resp.text())
valid = [u for u in urls if all(x not in u for x in ["x.com", "twitter.com", "t.co", "archive.org"])]
if valid:
self.log_audit("Wayback", True, f"Recuperados {len(valid)} históricos.")
return [{"url": u, "context": "Wayback", "timestamp": datetime.now(MADRID_TZ).isoformat()} for u in valid]
except: pass
return []