From df4d52137625bea76c3fdea8a539ef09f85b904b Mon Sep 17 00:00:00 2001 From: Nubenetes Bot Date: Sun, 10 May 2026 22:43:48 +0200 Subject: [PATCH] feat: implement Agentic Smart Injection for semantic link placement and IA-decided formatting --- src/agentic_curator.py | 113 +++++++++++++++++++---------------------- src/main.py | 13 +++-- 2 files changed, 58 insertions(+), 68 deletions(-) diff --git a/src/agentic_curator.py b/src/agentic_curator.py index 5b49ce16..633228b4 100644 --- a/src/agentic_curator.py +++ b/src/agentic_curator.py @@ -3,26 +3,27 @@ import re import json import asyncio import httpx +import random from typing import List, Dict, Set, Optional from src.config import GEMINI_API_KEY, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES from src.gitops_manager import RepositoryController -# Silenciar advertencias de XML/HTML de forma global +# Silenciar advertencias de XML/HTML import warnings from bs4 import XMLParsedAsHTMLWarning warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning) async def _deep_fetch_content(url: str) -> str: - headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'} + headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'} try: async with httpx.AsyncClient() as client: - resp = await client.get(url, timeout=10, headers=headers) + resp = await client.get(url, timeout=12, headers=headers, follow_redirects=True) if resp.status_code == 200: from bs4 import BeautifulSoup soup = BeautifulSoup(resp.text, 'html.parser') - for s in soup(['script', 'style', 'nav', 'footer']): + for s in soup(['script', 'style', 'nav', 'footer', 'aside']): s.decompose() - return soup.get_text(separator=' ', strip=True)[:3000] + return soup.get_text(separator=' ', strip=True)[:4000] except: return "" return "" @@ -52,14 +53,14 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> List[Dict]: "Si es sobre Model Context Protocol (MCP), asígnalo a 'ai-agents-mcp'.\n" f"URL: {asset['url']}\nContexto: {context}\nWeb: {web_content}\n\n" "Evalúa el IMPACTO SOCIAL y PROFUNDIDAD (1-100):\n" - "- >80: Recurso excepcional, disruptivo (añadir estrella 🌟).\n" - "- <20: Contenido pobre, clickbait o marketing puro (descartar).\n\n" + "- >80: Recurso excepcional, disruptivo.\n" + "- <20: Contenido pobre o irrelevante.\n\n" "Responde SOLAMENTE un JSON: {\"is_exceptional\": bool, \"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\"}" ) try: async with httpx.AsyncClient() as client: - response = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=30) + response = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=35) if response.status_code == 200: text_resp = response.json()['candidates'][0]['content']['parts'][0]['text'] match = re.search(r'\{.*\}', text_resp, re.DOTALL) @@ -70,23 +71,20 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> List[Dict]: domain_blacklist.add(domain) continue if data.get("is_exceptional") or score > 40: - title = data["title"] - if score > 80: title += " 🌟" for cat in data.get("categories", []): if cat in NUBENETES_CATEGORIES: curated_assets.append({ - "url": asset["url"], "title": title, - "description": data["desc"], "category": cat + "url": asset["url"], "title": data["title"], + "description": data["desc"], "category": cat, + "impact_score": score, "is_exceptional": data.get("is_exceptional", False) }) except: pass if domain_blacklist: try: - if os.path.exists(memory_file): - with open(memory_file, 'r+') as f: - mem = json.load(f) - mem["blacklisted_domains"] = list(domain_blacklist) - f.seek(0); json.dump(mem, f, indent=2); f.truncate() + os.makedirs(os.path.dirname(memory_file), exist_ok=True) + with open(memory_file, 'w') as f: + json.dump({"blacklisted_domains": list(domain_blacklist)}, f) except: pass return curated_assets @@ -98,59 +96,52 @@ class AgenticCurator: self.mkdocs_path = "mkdocs.yml" self.stats = {"orphans_found": 0, "orphans_linked": 0, "structural_improvements": 0, "orphan_details": []} - def _get_all_docs(self) -> Set[str]: - return {f for f in os.listdir(self.docs_dir) if f.endswith('.md')} - - def _get_nav_files(self) -> Set[str]: - with open(self.mkdocs_path, 'r') as f: - content = f.read() - return set(re.findall(r'[:\s]([a-zA-Z0-9_\-\./]+\.md)', content)) - - def _get_index_links(self) -> Set[str]: - with open(self.index_path, 'r') as f: - content = f.read() - return set(re.findall(r'\]\(([^)]+\.md)\)', content)) - - async def audit_navigation(self): - all_docs = self._get_all_docs() - nav_files = self._get_nav_files() - index_links = self._get_index_links() - orphans = all_docs - nav_files - index_links - {"index.md", "tags.md"} - if orphans: await self._resolve_orphans(list(orphans)) - - async def _resolve_orphans(self, orphans: List[str]): - for orphan in orphans: - try: - with open(os.path.join(self.docs_dir, orphan), 'r') as f: content = f.read(1000) - except: content = "No content" - decision = await self._ask_gemini_placement(orphan, content) - if decision: await self._apply_placement(orphan, decision) - - async def _ask_gemini_placement(self, filename: str, content: str) -> Dict: + async def decide_smart_injection(self, markdown_content: str, asset: Dict) -> str: + """Usa Gemini para decidir dónde y cómo inyectar el enlace dentro del markdown.""" api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}" - prompt = f"Archivo: {filename}\nContenido: {content}\nAsigna categoría nav e índice. Responde JSON." + lines = markdown_content.splitlines() + structure = "\n".join([l for l in lines if l.startswith("#")]) + + prompt = ( + "Actúas como Arquitecto de Contenidos para Nubenetes.com.\n" + f"Enlace: [{asset['title']}]({asset['url']}) - {asset['description']}\n" + f"Impacto: {asset['impact_score']}/100.\n\n" + "Estructura del archivo:\n" + f"{structure[:2000]}\n\n" + "1. Encuentra el ## o ### más semántico.\n" + "2. Decide formato: si es excelente, añade estrellas (🌟, 🌟🌟 o 🌟🌟🌟).\n" + "3. Decide si usar negritas (==enlace== o **texto**).\n" + "Responde JSON: {\"header\": \"Nombre exacto del ## o ###\", \"formatted_line\": \" - [==Título==](url) 🌟 - Descripción\"}" + ) + try: async with httpx.AsyncClient() as client: - resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=20) + resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=30) if resp.status_code == 200: - match = re.search(r'\{.*\}', resp.json()['candidates'][0]['content']['parts'][0]['text'], re.DOTALL) - if match: return json.loads(match.group(0)) + data = json.loads(re.search(r'\{.*\}', resp.json()['candidates'][0]['content']['parts'][0]['text'], re.DOTALL).group(0)) + header = data.get("header") + new_line = data.get("formatted_line") + if header and new_line: + new_lines = [] + inserted = False + for line in lines: + new_lines.append(line) + if not inserted and header.lower() in line.lower() and line.strip().startswith("#"): + new_lines.append(new_line) + inserted = True + if inserted: return "\n".join(new_lines) except: pass - return None + return self._manual_fallback_injection(markdown_content, asset) - async def _apply_placement(self, filename: str, decision: Dict): - # Lógica de inserción en mkdocs.yml e index.md + def _manual_fallback_injection(self, content: str, asset: Dict) -> str: + stars = " 🌟" if asset['impact_score'] > 80 else "" + line = f" - [{asset['title']}]({asset['url']}){stars} - {asset['description']}" + return content + f"\n{line}" + + async def audit_navigation(self): pass async def suggest_reorganization(self): - """Optimización jerárquica automática de archivos densos.""" - for category in NUBENETES_CATEGORIES: - file_path = os.path.join(self.docs_dir, f"{category}.md") - if os.path.exists(file_path) and os.path.getsize(file_path) > 80000: - await self._optimize_file_hierarchy(category, file_path) - - async def _optimize_file_hierarchy(self, category: str, file_path: str): - # Lógica de Gemini para reestructurar encabezados internamente pass def validate_changes(self) -> bool: diff --git a/src/main.py b/src/main.py index ab2131fd..03512ef3 100644 --- a/src/main.py +++ b/src/main.py @@ -86,26 +86,25 @@ async def master_orchestrator(): "source": asset.get("source_type", "Unknown") }) - # 4. Inyección en Markdowns + # 4. Inyección en Markdowns (IA Agentica) file_updates = {} stats = {"added_details": [], "categories_updated": set()} + curator_agent = AgenticCurator() for asset in unique_new_assets: category = asset["category"] file_path = f"docs/{category}.md" try: - # Leer contenido (usar caché local o git) content = file_updates.get(file_path) if not content: repo_file = git_controller.repository.get_contents(file_path) content = repo_file.decoded_content.decode("utf-8") - final_content = markdown_sanitizer.inject_curated_link( - content, category, asset["title"], asset["url"], asset["description"] - ) + # Inyección inteligente con Gemini + new_content = await curator_agent.decide_smart_injection(content, asset) - if len(final_content) > len(content): - file_updates[file_path] = final_content + if len(new_content) > len(content): + file_updates[file_path] = new_content stats["added_details"].append(asset) stats["categories_updated"].add(category) except: continue