fix(curation): repair IndentationError and restore AgenticCurator structure

This commit is contained in:
Nubenetes Bot
2026-05-16 12:20:00 +02:00
parent 21bb5d1cd7
commit ac73f185fb

View File

@@ -7,9 +7,11 @@ import random
import difflib
from datetime import datetime
from typing import List, Dict, Set, Optional, Tuple
import yaml
from src.config import GEMINI_API_KEYS, GH_TOKEN, TARGET_REPO, NUBENETES_CATEGORIES
from src.gitops_manager import RepositoryController
from src.gemini_utils import call_gemini_with_retry
from src.logger import log_event
def normalize_url(url: str) -> str:
url = url.split("#")[0].split("?")[0].rstrip("/")
@@ -32,42 +34,132 @@ def get_best_category_match(suggested: str) -> Optional[str]:
async def _deep_fetch_content(url: str) -> str:
    """Download a page and return up to 4000 characters of its visible text.

    This is a best-effort helper: a non-200 response, a network error or a
    parsing failure all result in an empty string instead of an exception.

    Args:
        url: Address of the page to fetch.

    Returns:
        The page text with ``script``, ``style``, ``nav``, ``footer`` and
        ``aside`` elements removed, joined with single spaces and truncated
        to 4000 characters, or ``""`` when nothing could be extracted.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
    }
    try:
        timeout = httpx.Timeout(10.0, connect=5.0)
        # NOTE(security): verify=False disables TLS certificate validation.
        # It is kept to preserve existing behavior; the fetched text must be
        # treated as untrusted input.
        async with httpx.AsyncClient(timeout=timeout, verify=False) as client:
            resp = await client.get(url, headers=headers, follow_redirects=True)
            if resp.status_code == 200:
                from bs4 import BeautifulSoup

                soup = BeautifulSoup(resp.text, "html.parser")
                # Drop non-content elements before extracting the text.
                for tag in soup(["script", "style", "nav", "footer", "aside"]):
                    tag.decompose()
                return soup.get_text(separator=" ", strip=True)[:4000]
    except Exception:
        # `except Exception` instead of a bare `except` keeps the helper
        # best-effort while letting asyncio.CancelledError and
        # KeyboardInterrupt propagate.
        return ""
    return ""
async def _get_github_activity(url: str) -> Dict:
"""Obtiene metadatos de GitHub (estrellas, creación, actividad)."""
if "github.com" not in url or not GH_TOKEN: return {}
try:
match = re.search(r"github\.com/([^/]+)/([^/]+)", url)
if match:
owner, repo = match.groups()
repo = repo.split("#")[0].split("?")[0].rstrip(".git")
api_url = f"https://api.github.com/repos/{owner}/{repo}"
headers = {"Authorization": f"token {GH_TOKEN}"}
async with httpx.AsyncClient() as client:
resp = await client.get(api_url, headers=headers, timeout=5)
if resp.status_code == 200:
data = resp.json()
return {
"gh_pushed": data.get("pushed_at", "").split("T")[0],
"gh_created": data.get("created_at", "").split("T")[0],
"gh_stars": data.get("stargazers_count", 0)
}
except: pass
return {}
async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
    """Score each extracted asset with the LLM and decide whether to include it.

    For every asset the function skips blacklisted domains, gathers GitHub
    metadata and page text, asks the model for a JSON evaluation, and records
    the outcome. Accepted assets are also stored in the curator inventory,
    which is saved after each acceptance.

    Args:
        raw_assets: Asset dicts. ``url`` is required; ``source_type`` and
            ``timestamp`` are read when present.

    Returns:
        A mapping from the asset URL to either
        ``{"status": "FILTERED", "reason": ...}`` or an ``"INCLUDED"`` record
        with title, description, year, category, related categories, impact
        score and the ``is_exceptional`` flag.
    """
    evaluations = {}
    memory_file = "src/memory/health_learning.json"
    domain_blacklist = set()
    if os.path.exists(memory_file):
        try:
            with open(memory_file, "r", encoding="utf-8") as f:
                memory_data = json.load(f)
            domain_blacklist = set(memory_data.get("blacklisted_domains", []))
        except Exception as e:
            # A corrupt memory file must not block curation; continue with
            # an empty blacklist.
            log_event(f" [!] Could not load {memory_file}: {e}")

    curator = AgenticCurator()
    for i, asset in enumerate(raw_assets):
        source = asset.get("source_type", "Social")
        is_primary = "nubenetes" in source.lower()
        log_event(f"--- EVALUATING {i+1}/{len(raw_assets)} ---", section_break=False)
        log_event(f" - URL: {asset['url']}")

        norm_url = normalize_url(asset["url"])
        # Host part of the normalized URL (text between "//" and the next "/").
        if norm_url.split("//")[-1].split("/")[0] in domain_blacklist:
            log_event(f" [-] REJECTED: Blacklisted domain")
            evaluations[asset["url"]] = {"status": "FILTERED", "reason": "Blacklisted domain"}
            continue

        gh_meta = {}
        mvq_penalty = False
        if "github.com" in asset["url"]:
            gh_meta = await _get_github_activity(asset["url"])
            if gh_meta.get("gh_pushed"):
                try:
                    last_date = datetime.fromisoformat(gh_meta["gh_pushed"])
                    # Repositories without a push in more than 4 years are penalized.
                    if (datetime.now() - last_date).days > (365 * 4):
                        mvq_penalty = True
                except (TypeError, ValueError):
                    pass  # Unparseable date: no penalty is applied.

        web_content = await _deep_fetch_content(asset["url"])
        strictness_directive = "BE EXTREMELY SELECTIVE.\n" if not is_primary else ""
        prompt = (
            "You act as a Senior Technical Librarian for 'nubenetes/awesome-kubernetes' in 2026.\n"
            f"{strictness_directive}"
            "PHASE 1: SOPHISTICATED SYNTHESIS & DATING\n"
            "- Extract precise PUBLICATION DATE (YYYY-MM-DD or YYYY): Look for dates in URL, context, or text.\n"
            "- Identify ONE primary_category and up to TWO related_categories from the list.\n"
            "PHASE 2: MANDATORY PROFESSIONAL DESCRIPTIONS\n"
            "- Summaries MUST BE DESCRIPTIVE (neutral, objective, technical).\n"
            "PHASE 3: QUALITY & MVQ\n"
            "- Evaluate TECHNICAL IMPACT (1-100).\n"
            f"{'IMPORTANT: This repo is old (>4 years inactive). Apply penalty.' if mvq_penalty else ''}\n\n"
            f"Existing categories: {', '.join(NUBENETES_CATEGORIES)}.\n"
            f"URL: {asset['url']}\nExtracted Web Content: {web_content[:2000]}\n"
            "Respond ONLY with a JSON: {\"impact_score\": int, \"pub_date\": \"YYYY-MM-DD\", \"primary_category\": \"cat\", \"related_categories\": [\"cat1\", \"cat2\"], \"title\": \"...\", \"desc\": \"...\", \"reasoning\": \"...\"}"
        )
        try:
            data = await call_gemini_with_retry(prompt)
            score = data.get("impact_score", 50)
            year = data.get("pub_date", "N/A").split("-")[0] if data.get("pub_date") else "N/A"
            # The last push year from GitHub takes precedence over the model's date.
            if gh_meta.get("gh_pushed"):
                year = gh_meta["gh_pushed"].split("-")[0]
            primary_cat = get_best_category_match(data.get("primary_category"))
            related_cats = [get_best_category_match(rc) for rc in data.get("related_categories", [])]
            related_cats = [rc for rc in related_cats if rc and rc != primary_cat]

            # Primary (nubenetes) sources pass with a much lower threshold.
            min_score = 5 if is_primary else 80
            if score < min_score or not primary_cat:
                evaluations[asset["url"]] = {"status": "FILTERED", "reason": "Low impact or no category"}
                log_event(f" [-] REJECTED: Score {score}")
            else:
                evaluations[asset["url"]] = {
                    "status": "INCLUDED", "title": data["title"], "description": data["desc"],
                    "year": year, "category": primary_cat, "related_categories": related_cats[:2],
                    "impact_score": score, "is_exceptional": score > 80
                }
                curator.inventory[norm_url] = {
                    "title": data["title"], "description": data["desc"], "ai_summary": data["desc"],
                    "year": year, "pub_date": data.get("pub_date", "N/A"), "post_date": asset.get("timestamp", "N/A"),
                    "repo_created_at": gh_meta.get("gh_created", "N/A"), "repo_pushed_at": gh_meta.get("gh_pushed", "N/A"),
                    "stars": min(max(score // 20, 0), 5), "last_checked": datetime.now().timestamp()
                }
                curator._save_inventory()
                log_event(f" [+] ACCEPTED: {data['title']}")
        except Exception as e:
            # Never drop an asset silently: log the failure and make sure the
            # URL has an outcome. setdefault keeps an INCLUDED record that was
            # already stored before a later step (e.g. saving) failed.
            log_event(f" [!] ERROR EVALUATING {asset['url']}: {e}")
            evaluations.setdefault(asset["url"], {"status": "FILTERED", "reason": "Evaluation Failed"})
        # Pause between model calls.
        await asyncio.sleep(1.0)
    return evaluations
# YAML file read by _load_inventory and written by _save_inventory.
INVENTORY_PATH = "data/inventory.yaml"
# YAML file read by _load_structure_map and written by _save_structure_map.
STRUCTURE_MAP_PATH = "data/structure_map.yaml"
@@ -85,73 +177,45 @@ class AgenticCurator:
def _load_inventory(self) -> dict:
    """Load the inventory from INVENTORY_PATH.

    Returns:
        The parsed YAML content, or an empty dict when the file does not
        exist, is empty, or cannot be read or parsed.
    """
    if not os.path.exists(INVENTORY_PATH):
        return {}
    try:
        # Explicit UTF-8: the file is dumped with allow_unicode=True, so it
        # may contain raw non-ASCII characters.
        with open(INVENTORY_PATH, "r", encoding="utf-8") as f:
            return yaml.safe_load(f) or {}
    except (OSError, ValueError, yaml.YAMLError):
        # ValueError covers UnicodeDecodeError; unreadable data means "empty".
        return {}
def _save_inventory(self):
    """Write ``self.inventory`` to INVENTORY_PATH as YAML.

    The parent directory is created when missing. Key order is preserved
    (``sort_keys=False``) and non-ASCII characters are written unescaped.
    """
    os.makedirs(os.path.dirname(INVENTORY_PATH), exist_ok=True)
    # Explicit UTF-8 so that allow_unicode=True output does not depend on
    # the platform's default encoding.
    with open(INVENTORY_PATH, "w", encoding="utf-8") as f:
        yaml.dump(self.inventory, f, sort_keys=False, allow_unicode=True)
def _load_structure_map(self) -> dict:
    """Load the structure map from STRUCTURE_MAP_PATH.

    Returns:
        The parsed YAML content, or an empty dict when the file does not
        exist, is empty, or cannot be read or parsed.
    """
    if not os.path.exists(STRUCTURE_MAP_PATH):
        return {}
    try:
        # Explicit UTF-8: the file is dumped with allow_unicode=True, so it
        # may contain raw non-ASCII characters.
        with open(STRUCTURE_MAP_PATH, "r", encoding="utf-8") as f:
            return yaml.safe_load(f) or {}
    except (OSError, ValueError, yaml.YAMLError):
        # ValueError covers UnicodeDecodeError; unreadable data means "empty".
        return {}
def _save_structure_map(self):
    """Write ``self.structure_map`` to STRUCTURE_MAP_PATH as YAML.

    The parent directory is created when missing. Key order is preserved
    (``sort_keys=False``) and non-ASCII characters are written unescaped.
    """
    os.makedirs(os.path.dirname(STRUCTURE_MAP_PATH), exist_ok=True)
    # Explicit UTF-8 so that allow_unicode=True output does not depend on
    # the platform's default encoding.
    with open(STRUCTURE_MAP_PATH, "w", encoding="utf-8") as f:
        yaml.dump(self.structure_map, f, sort_keys=False, allow_unicode=True)
async def _rebuild_toc(self, content: str) -> str:
"""
Detecta y reconstruye el TOC interno de un archivo markdown.
Busca el patrón de lista numerada al inicio del archivo.
"""
lines = content.splitlines()
new_lines = []
headers = []
toc_start_idx = -1
toc_end_idx = -1
# 1. Extraer todos los headers (## y ###) para el nuevo TOC
for line in lines:
if line.startswith("## ") or line.startswith("### "):
title = line.strip("#").strip()
# Generar ancla simplificada (slug)
anchor = title.lower().replace(" ", "-").replace(".", "").replace("/", "").replace("(", "").replace(")", "").replace(",", "")
level = 2 if line.startswith("## ") else 3
headers.append({"title": title, "anchor": anchor, "level": level})
headers.append({"title": title, "anchor": anchor, "level": 2 if line.startswith("## ") else 3})
if not headers: return content
# 2. Localizar el TOC actual
toc_start_idx = -1
toc_end_idx = -1
for i, line in enumerate(lines):
if re.match(r'^\d+\.\s+\[', line.strip()):
if re.match(r"^\d+\.\s+\[", line.strip()):
if toc_start_idx == -1: toc_start_idx = i
toc_end_idx = i
elif toc_start_idx != -1 and line.strip() == "" and i < len(lines)-1 and re.match(r'^\d+\.\s+\[', lines[i+1].strip()):
continue # Espacios en blanco dentro del TOC
elif toc_start_idx != -1 and not re.match(r'^\s*\d+\.\s+\[', line.strip()) and line.strip() != "":
elif toc_start_idx != -1 and not re.match(r"^\s*\d+\.\s+\[", line.strip()) and line.strip() != "":
if toc_end_idx != -1: break
if toc_start_idx == -1: return content # No hay TOC que actualizar
# 3. Construir el nuevo TOC
if toc_start_idx == -1: return content
new_toc = []
h2_count = 0
h3_count = 0
h2_count, h3_count = 0, 0
for h in headers:
if h["level"] == 2:
h2_count += 1
@@ -160,120 +224,54 @@ class AgenticCurator:
else:
h3_count += 1
new_toc.append(f" {h3_count}. [{h['title']}](#{h['anchor']})")
# 4. Reensamblar el archivo
return "\n".join(lines[:toc_start_idx] + new_toc + lines[toc_end_idx + 1:])
async def decide_smart_injection(self, markdown_content: str, asset: Dict) -> str:
    """Insert a resource line under the header chosen by the model.

    The model is asked for a target header and whether that header is new.
    The resource is placed right after the first heading line containing the
    target. When the model proposes a new header that is not present in the
    document, a new section is appended at the end. The TOC is rebuilt
    whenever the header is flagged as new. If the model gives no usable
    answer, the resource is appended via ``_manual_fallback_injection``.

    Args:
        markdown_content: Current markdown text of the target file.
        asset: Resource dict; reads ``impact_score``, ``title``, ``url``,
            ``description`` and optionally ``year``.

    Returns:
        The markdown text with the resource line added.
    """
    lines = markdown_content.splitlines()
    structure = "\n".join(l for l in lines if l.startswith("#"))
    stars = " 🌟" if asset["impact_score"] > 80 else ""
    year_prefix = f"**({asset.get('year')})** " if asset.get("year") and asset.get("year") != "N/A" else ""
    formatted_line = f" - {year_prefix}[{asset['title']}]({asset['url']}){stars} - {asset['description']}"
    prompt = f"Inject resource: {formatted_line} into structure: {structure[:1000]}. JSON: {{\"target_header\": \"## ...\", \"is_new_header\": bool}}"
    try:
        data = await call_gemini_with_retry(prompt)
        target = data.get("target_header")
        is_new = data.get("is_new_header", False)
        if not target:
            return self._manual_fallback_injection(markdown_content, asset)

        needle = target.lower()
        new_lines = []
        inserted = False
        for line in lines:
            new_lines.append(line)
            if not inserted and line.startswith("#") and needle in line.lower():
                if is_new:
                    new_lines.append("")
                new_lines.append(formatted_line)
                inserted = True

        if not inserted and is_new:
            # A header flagged as new cannot match any existing heading, so
            # create the section at the end instead of silently dropping it.
            header = target if target.lstrip().startswith("#") else f"## {target}"
            new_lines.extend(["", header, formatted_line])
            inserted = True

        if inserted:
            res = "\n".join(new_lines)
            return await self._rebuild_toc(res) if is_new else res
    except Exception as e:
        # Best-effort: any failure in the model-driven path falls through to
        # the manual fallback. Cancellation is not swallowed.
        log_event(f" [!] Smart injection failed, using fallback: {e}")
    return self._manual_fallback_injection(markdown_content, asset)
def _manual_fallback_injection(self, content: str, asset: Dict) -> str:
stars = " 🌟" if asset['impact_score'] > 80 else ""
year_prefix = f"**({asset.get('year')})** " if asset.get('year') and asset.get('year') != "N/A" else ""
stars = " 🌟" if asset["impact_score"] > 80 else ""
year_prefix = f"**({asset.get('year')})** " if asset.get("year") and asset.get("year") != "N/A" else ""
line = f" - {year_prefix}[{asset['title']}]({asset['url']}){stars} - {asset['description']}"
# If no sections, add a generic header
if "##" not in content:
return content + f"\n\n## Tools and Resources\n{line}"
return content + f"\n{line}"
return content + f"\n{line}" if "##" in content else content + f"\n\n## Tools and Resources\n{line}"
async def suggest_reorganization(self):
    """Audit the markdown files in ``self.docs_dir`` and reorganize long ones.

    Every ``.md`` file except ``index.md`` with more than 25 list links is
    sent to the model to be regrouped into ``##`` sections. The result is
    written back, after rebuilding the TOC, only when it is long enough
    (more than 70% of the original) and still contains every link target of
    the original file. Files larger than the prompt window are skipped,
    because the model would only see a truncated copy.
    """
    prompt_window = 4000  # Number of characters of the file sent to the model.
    log_event("[*] Starting Internal Reorganization Audit...", section_break=True)
    for file in os.listdir(self.docs_dir):
        if not file.endswith(".md") or file == "index.md":
            continue
        path = os.path.join(self.docs_dir, file)
        # Explicit UTF-8: the curated markdown contains emoji markers.
        with open(path, "r", encoding="utf-8") as f:
            content = f.read()
        if len(re.findall(r"^\s*-\s*\[", content, re.MULTILINE)) <= 25:
            continue
        if len(content) > prompt_window:
            # Overwriting the file from a truncated prompt would drop its tail.
            log_event(f" [!] Skipping {file}: larger than {prompt_window} chars, cannot reorganize safely")
            continue

        log_event(f" [!] REORGANIZING: {file}")
        prompt = f"Reorganize '{file}' into logical sections (##). English headers only. Content:\n{content[:4000]}"
        try:
            reorganized = await call_gemini_with_retry(prompt, response_format="text")
            if not isinstance(reorganized, str) or len(reorganized) <= len(content) * 0.7:
                continue
            # Refuse to write a result that lost any link target.
            link_targets = set(re.findall(r"\]\(([^)\s]+)\)", content))
            missing = [target for target in link_targets if target not in reorganized]
            if missing:
                log_event(f" [!] Skipping {file}: {len(missing)} link(s) missing after reorganization")
                continue
            final = await self._rebuild_toc(reorganized)
            with open(path, "w", encoding="utf-8") as f:
                f.write(final)
            log_event(f" [OK] Reorganized: {file}")
        except Exception as e:
            log_event(f" [!] Error: {e}")
def validate_changes(self) -> bool:
    """Placeholder validation hook; currently always reports success."""
    return True