feat: implement Agentic Smart Injection for semantic link placement and IA-decided formatting

This commit is contained in:
Nubenetes Bot
2026-05-10 22:43:48 +02:00
parent 9fffd6ecd8
commit df4d521376
2 changed files with 58 additions and 68 deletions

View File

@@ -3,26 +3,27 @@ import re
import json
import asyncio
import httpx
import random
from typing import List, Dict, Set, Optional
from src.config import GEMINI_API_KEY, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES
from src.gitops_manager import RepositoryController
# Silenciar advertencias de XML/HTML de forma global
# Silenciar advertencias de XML/HTML
import warnings
from bs4 import XMLParsedAsHTMLWarning
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
async def _deep_fetch_content(url: str) -> str:
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'}
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'}
try:
async with httpx.AsyncClient() as client:
resp = await client.get(url, timeout=10, headers=headers)
resp = await client.get(url, timeout=12, headers=headers, follow_redirects=True)
if resp.status_code == 200:
from bs4 import BeautifulSoup
soup = BeautifulSoup(resp.text, 'html.parser')
for s in soup(['script', 'style', 'nav', 'footer']):
for s in soup(['script', 'style', 'nav', 'footer', 'aside']):
s.decompose()
return soup.get_text(separator=' ', strip=True)[:3000]
return soup.get_text(separator=' ', strip=True)[:4000]
except: return ""
return ""
@@ -52,14 +53,14 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> List[Dict]:
"Si es sobre Model Context Protocol (MCP), asígnalo a 'ai-agents-mcp'.\n"
f"URL: {asset['url']}\nContexto: {context}\nWeb: {web_content}\n\n"
"Evalúa el IMPACTO SOCIAL y PROFUNDIDAD (1-100):\n"
"- >80: Recurso excepcional, disruptivo (añadir estrella 🌟).\n"
"- <20: Contenido pobre, clickbait o marketing puro (descartar).\n\n"
"- >80: Recurso excepcional, disruptivo.\n"
"- <20: Contenido pobre o irrelevante.\n\n"
"Responde SOLAMENTE un JSON: {\"is_exceptional\": bool, \"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\"}"
)
try:
async with httpx.AsyncClient() as client:
response = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=30)
response = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=35)
if response.status_code == 200:
text_resp = response.json()['candidates'][0]['content']['parts'][0]['text']
match = re.search(r'\{.*\}', text_resp, re.DOTALL)
@@ -70,23 +71,20 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> List[Dict]:
domain_blacklist.add(domain)
continue
if data.get("is_exceptional") or score > 40:
title = data["title"]
if score > 80: title += " 🌟"
for cat in data.get("categories", []):
if cat in NUBENETES_CATEGORIES:
curated_assets.append({
"url": asset["url"], "title": title,
"description": data["desc"], "category": cat
"url": asset["url"], "title": data["title"],
"description": data["desc"], "category": cat,
"impact_score": score, "is_exceptional": data.get("is_exceptional", False)
})
except: pass
if domain_blacklist:
try:
if os.path.exists(memory_file):
with open(memory_file, 'r+') as f:
mem = json.load(f)
mem["blacklisted_domains"] = list(domain_blacklist)
f.seek(0); json.dump(mem, f, indent=2); f.truncate()
os.makedirs(os.path.dirname(memory_file), exist_ok=True)
with open(memory_file, 'w') as f:
json.dump({"blacklisted_domains": list(domain_blacklist)}, f)
except: pass
return curated_assets
@@ -98,59 +96,52 @@ class AgenticCurator:
self.mkdocs_path = "mkdocs.yml"
self.stats = {"orphans_found": 0, "orphans_linked": 0, "structural_improvements": 0, "orphan_details": []}
def _get_all_docs(self) -> Set[str]:
return {f for f in os.listdir(self.docs_dir) if f.endswith('.md')}
def _get_nav_files(self) -> Set[str]:
with open(self.mkdocs_path, 'r') as f:
content = f.read()
return set(re.findall(r'[:\s]([a-zA-Z0-9_\-\./]+\.md)', content))
def _get_index_links(self) -> Set[str]:
with open(self.index_path, 'r') as f:
content = f.read()
return set(re.findall(r'\]\(([^)]+\.md)\)', content))
async def audit_navigation(self):
all_docs = self._get_all_docs()
nav_files = self._get_nav_files()
index_links = self._get_index_links()
orphans = all_docs - nav_files - index_links - {"index.md", "tags.md"}
if orphans: await self._resolve_orphans(list(orphans))
async def _resolve_orphans(self, orphans: List[str]):
for orphan in orphans:
try:
with open(os.path.join(self.docs_dir, orphan), 'r') as f: content = f.read(1000)
except: content = "No content"
decision = await self._ask_gemini_placement(orphan, content)
if decision: await self._apply_placement(orphan, decision)
async def _ask_gemini_placement(self, filename: str, content: str) -> Dict:
async def decide_smart_injection(self, markdown_content: str, asset: Dict) -> str:
"""Usa Gemini para decidir dónde y cómo inyectar el enlace dentro del markdown."""
api_url = f"https://generativelanguage.googleapis.com/v1/models/gemini-1.5-flash:generateContent?key={GEMINI_API_KEY}"
prompt = f"Archivo: {filename}\nContenido: {content}\nAsigna categoría nav e índice. Responde JSON."
lines = markdown_content.splitlines()
structure = "\n".join([l for l in lines if l.startswith("#")])
prompt = (
"Actúas como Arquitecto de Contenidos para Nubenetes.com.\n"
f"Enlace: [{asset['title']}]({asset['url']}) - {asset['description']}\n"
f"Impacto: {asset['impact_score']}/100.\n\n"
"Estructura del archivo:\n"
f"{structure[:2000]}\n\n"
"1. Encuentra el ## o ### más semántico.\n"
"2. Decide formato: si es excelente, añade estrellas (🌟, 🌟🌟 o 🌟🌟🌟).\n"
"3. Decide si usar negritas (==enlace== o **texto**).\n"
"Responde JSON: {\"header\": \"Nombre exacto del ## o ###\", \"formatted_line\": \" - [==Título==](url) 🌟 - Descripción\"}"
)
try:
async with httpx.AsyncClient() as client:
resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=20)
resp = await client.post(api_url, json={"contents": [{"parts": [{"text": prompt}]}]}, timeout=30)
if resp.status_code == 200:
match = re.search(r'\{.*\}', resp.json()['candidates'][0]['content']['parts'][0]['text'], re.DOTALL)
if match: return json.loads(match.group(0))
data = json.loads(re.search(r'\{.*\}', resp.json()['candidates'][0]['content']['parts'][0]['text'], re.DOTALL).group(0))
header = data.get("header")
new_line = data.get("formatted_line")
if header and new_line:
new_lines = []
inserted = False
for line in lines:
new_lines.append(line)
if not inserted and header.lower() in line.lower() and line.strip().startswith("#"):
new_lines.append(new_line)
inserted = True
if inserted: return "\n".join(new_lines)
except: pass
return None
return self._manual_fallback_injection(markdown_content, asset)
async def _apply_placement(self, filename: str, decision: Dict):
# Lógica de inserción en mkdocs.yml e index.md
def _manual_fallback_injection(self, content: str, asset: Dict) -> str:
stars = " 🌟" if asset['impact_score'] > 80 else ""
line = f" - [{asset['title']}]({asset['url']}){stars} - {asset['description']}"
return content + f"\n{line}"
async def audit_navigation(self):
pass
async def suggest_reorganization(self):
"""Optimización jerárquica automática de archivos densos."""
for category in NUBENETES_CATEGORIES:
file_path = os.path.join(self.docs_dir, f"{category}.md")
if os.path.exists(file_path) and os.path.getsize(file_path) > 80000:
await self._optimize_file_hierarchy(category, file_path)
async def _optimize_file_hierarchy(self, category: str, file_path: str):
# Lógica de Gemini para reestructurar encabezados internamente
pass
def validate_changes(self) -> bool:

View File

@@ -86,26 +86,25 @@ async def master_orchestrator():
"source": asset.get("source_type", "Unknown")
})
# 4. Inyección en Markdowns
# 4. Inyección en Markdowns (IA Agentica)
file_updates = {}
stats = {"added_details": [], "categories_updated": set()}
curator_agent = AgenticCurator()
for asset in unique_new_assets:
category = asset["category"]
file_path = f"docs/{category}.md"
try:
# Leer contenido (usar caché local o git)
content = file_updates.get(file_path)
if not content:
repo_file = git_controller.repository.get_contents(file_path)
content = repo_file.decoded_content.decode("utf-8")
final_content = markdown_sanitizer.inject_curated_link(
content, category, asset["title"], asset["url"], asset["description"]
)
# Inyección inteligente con Gemini
new_content = await curator_agent.decide_smart_injection(content, asset)
if len(final_content) > len(content):
file_updates[file_path] = final_content
if len(new_content) > len(content):
file_updates[file_path] = new_content
stats["added_details"].append(asset)
stats["categories_updated"].add(category)
except: continue