feat: implement MVQ, descriptive style guide, and optimized API rotation

This commit is contained in:
Nubenetes Bot
2026-05-14 20:51:01 +02:00
parent 083252560d
commit fdfee39bba
5 changed files with 138 additions and 76 deletions

View File

@@ -31,6 +31,26 @@ async def _deep_fetch_content(url: str) -> str:
from src.logger import log_event
async def _get_github_activity(url: str) -> Optional[datetime]:
"""Obtiene la fecha del último commit de un repo de GitHub usando la API (si hay token)."""
if "github.com" not in url or not GH_TOKEN: return None
try:
# Extraer user/repo
match = re.search(r'github\.com/([^/]+)/([^/]+)', url)
if match:
owner, repo = match.groups()
repo = repo.split('#')[0].split('?')[0].rstrip('.git')
api_url = f"https://api.github.com/repos/{owner}/{repo}"
headers = {"Authorization": f"token {GH_TOKEN}"}
async with httpx.AsyncClient() as client:
resp = await client.get(api_url, headers=headers, timeout=5)
if resp.status_code == 200:
pushed_at = resp.json().get("pushed_at")
if pushed_at:
return datetime.fromisoformat(pushed_at.replace('Z', '+00:00'))
except: pass
return None
async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
evaluations = {}
if not GEMINI_API_KEYS:
@@ -52,8 +72,6 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
log_event(f"--- EVALUATING {i+1}/{len(raw_assets)} ---", section_break=False)
log_event(f" - URL: {asset['url']}")
log_event(f" - Post Date: {post_date}")
log_event(f" - Post Context: \"{context[:300]}...\"")
domain = asset['url'].split("//")[-1].split("/")[0]
if domain in domain_blacklist:
@@ -61,24 +79,34 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
evaluations[asset["url"]] = {"status": "FILTERED", "reason": "Blacklisted domain"}
continue
# MVQ: Check GitHub activity
mvq_penalty = False
last_activity = await _get_github_activity(asset['url'])
if last_activity:
years_inactive = (datetime.now(last_activity.tzinfo) - last_activity).days / 365
if years_inactive > 4:
log_event(f" [⚠️] MVQ Warning: Inactive for {years_inactive:.1f} years.")
mvq_penalty = True
web_content = await _deep_fetch_content(asset['url'])
prompt = (
"You act as a Senior Curation Engineer for 'nubenetes/awesome-kubernetes'.\n"
"Your mission is to catalog TECHNICAL content about Kubernetes and Cloud Native shared by the user.\n"
"GOLDEN RULE: If the link is in the feed, it's because the user considers it useful. DO NOT discard unless it is total noise (aggressive ads, 404, or non-technical content).\n\n"
"GOLDEN RULE: If the link is in the feed, it's because the user considers it useful. DO NOT discard unless it is total noise.\n\n"
f"Valid categories: {', '.join(NUBENETES_CATEGORIES)}.\n\n"
"INSTRUCTIONS:\n"
"1. LANGUAGE: ALL outputs (title, desc, reasoning) MUST BE IN ENGLISH.\n"
"2. YOUTUBE: Accept technical videos or tutorials. Categorize them by topic.\n"
"3. SUMMARY: Create a concise summary (1 sentence). Use the 'Context' (the X post) as a priority as it explains why it was shared.\n"
"4. ASSIGNMENT: If it's about Model Context Protocol (MCP), assign it to 'ai-agents-mcp'.\n\n"
"2. STYLE: Summaries MUST BE DESCRIPTIVE (neutral, objective, explaining what/why).\n"
"3. MVQ: If it's a GitHub repo inactive for >4 years, penalize the impact score.\n"
"4. SUMMARY: Create a concise summary (1 sentence).\n"
f"{'IMPORTANT: This repo is old (>4 years inactive). Apply penalty.' if mvq_penalty else ''}\n\n"
f"URL: {asset['url']}\nX Context: {context}\nExtracted Web Content: {web_content[:2000]}\n\n"
"Evaluate TECHNICAL IMPACT (1-100):\n"
"- >80: Exceptional resource (🌟).\n"
"- >5: Accept (if it fits a category).\n"
"- <5: Discard (Absolute noise).\n\n"
"Respond ONLY with a JSON: {\"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\", \"reasoning\": \"Brief explanation (English)\", \"rejection_reason\": \"... (if applicable, English)\"}"
"Respond ONLY with a JSON: {\"impact_score\": int, \"categories\": [\"cat1\"], \"title\": \"...\", \"desc\": \"...\", \"reasoning\": \"Brief explanation (English)\"}"
)
try:
@@ -88,18 +116,11 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
reasoning = data.get("reasoning", "No reason specified")
if score < 5:
reason = data.get("rejection_reason", "Low technical impact")
evaluations[asset["url"]] = {"status": "FILTERED", "reason": reason}
log_event(f" [-] REJECTED: {reason} (Score: {score})")
log_event(f" AI Reason: {reasoning}")
if score < 1 and domain not in domain_blacklist:
domain_blacklist.add(domain)
log_event(f" [!] Domain {domain} added to blacklist.")
evaluations[asset["url"]] = {"status": "FILTERED", "reason": "Low technical impact"}
log_event(f" [-] REJECTED: Low technical impact (Score: {score})")
elif not valid_cats:
evaluations[asset["url"]] = {"status": "FILTERED", "reason": "No valid technical category found"}
log_event(f" [-] REJECTED: No valid category found (Suggested: {data.get('categories')})")
log_event(f" AI Reason: {reasoning}")
log_event(f" [-] REJECTED: No valid category found")
else:
evaluations[asset["url"]] = {
"status": "INCLUDED", "title": data["title"], "description": data["desc"],
@@ -107,17 +128,13 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
"reasoning": reasoning
}
log_event(f" [+] ACCEPTED: \"{data['title']}\" (Score: {score})")
log_event(f" Destination: docs/{valid_cats[0]}.md")
log_event(f" Description: {data['desc']}")
log_event(f" AI Reason: {reasoning}")
except Exception as e:
log_event(f" [!] CRITICAL ERROR EVALUATING {asset['url']}: {e}")
evaluations[asset["url"]] = {"status": "FILTERED", "reason": f"Evaluation Failed: {str(e)[:100]}"}
log_event(f" [!] ERROR EVALUATING {asset['url']}: {e}")
evaluations[asset["url"]] = {"status": "FILTERED", "reason": f"Evaluation Failed"}
await asyncio.sleep(2.0) # Steady pace
await asyncio.sleep(1.0)
# Guardar blacklist actualizada
try:
os.makedirs(os.path.dirname(memory_file), exist_ok=True)
with open(memory_file, 'w') as f:
@@ -125,6 +142,7 @@ async def evaluate_extracted_assets(raw_assets: List[Dict]) -> Dict[str, Dict]:
except: pass
return evaluations
class AgenticCurator:
def __init__(self):
self.git_controller = RepositoryController(GH_TOKEN, TARGET_REPO)

View File

@@ -32,54 +32,70 @@ class GeminiDiagnostics:
return report
async def resolve_url(url: str) -> str:
"""Sigue las redirecciones para obtener la URL larga final y consolida repositorios si fallan."""
shorteners = ['t.co', 'bit.ly', 'buff.ly', 'goo.gl', 'tinyurl.com', 't.ly', 'rb.gy', 'is.gd', 'drp.li', 't.me']
"""Sigue las redirecciones para obtener la URL larga final, consolidando repositorios y evitando bucles."""
shorteners = ['t.co', 'bit.ly', 'buff.ly', 'goo.gl', 'tinyurl.com', 't.ly', 'rb.gy', 'is.gd', 'drp.li', 't.me', 'lnkd.in']
try:
domain = url.split("//")[-1].split("/")[0].lower()
except:
return url
# 1. Expansión inicial
# 1. Expansión Multi-salto (evita intermediarios de tracking)
final_url = url
if domain in shorteners or url.endswith(''):
try:
async with httpx.AsyncClient(follow_redirects=True) as client:
resp = await client.head(url, timeout=5)
final_url = str(resp.url)
if final_url != url:
log_event(f" [🔗] URL Expandida: {url} -> {final_url}")
except:
pass
max_hops = 5
current_hop = 0
async with httpx.AsyncClient(follow_redirects=True, timeout=8) as client:
while current_hop < max_hops:
try:
# Si no es un acortador conocido y ya tenemos una URL larga, paramos
current_domain = final_url.split("//")[-1].split("/")[0].lower()
if current_hop > 0 and current_domain not in shorteners:
break
resp = await client.head(final_url, timeout=5)
new_url = str(resp.url)
if new_url == final_url: break
final_url = new_url
current_hop += 1
except:
break
# 2. Consolidación de Repositorios (GitHub/GitLab)
# 2. Consolidación de Repositorios (GitHub/GitLab) con chequeo de MVQ (vía REST si es necesario)
repo_domains = ['github.com', 'gitlab.com']
current_domain = final_url.split("//")[-1].split("/")[0].lower()
if any(d in current_domain for d in repo_domains):
# Intentar validar si el enlace profundo funciona
try:
async with httpx.AsyncClient(follow_redirects=True) as client:
resp = await client.head(final_url, timeout=5)
if resp.status_code == 200:
return final_url
# Si falla, intentar consolidar a la raíz del repo
# Formato esperado: https://github.com/user/repo/...
parts = final_url.split('/')
if len(parts) > 4: # https: , , domain, user, repo
root_repo = "/".join(parts[:5])
resp_root = await client.head(root_repo, timeout=5)
if resp_root.status_code == 200:
log_event(f" [📦] Consolidación: {final_url} -> {root_repo} (Raíz validada)")
return root_repo
if resp.status_code != 200:
parts = final_url.split('/')
if len(parts) > 4:
root_repo = "/".join(parts[:5])
resp_root = await client.head(root_repo, timeout=5)
if resp_root.status_code == 200:
log_event(f" [📦] Consolidación: {final_url} -> {root_repo}")
final_url = root_repo
except:
pass
return final_url
def is_fuzzy_duplicate(url_a: str, url_b: str) -> bool:
"""Detecta si dos URLs son iguales ignorando parámetros de tracking comunes."""
def clean(u):
u = u.split('#')[0].rstrip('/').lower()
# Eliminar parámetros utm_* y otros comunes
u = re.sub(r'(\?|&)(utm_[^&]+|s=[^&]+|t=[^&]+|ref=[^&]+)', '', u)
if u.endswith('?'): u = u[:-1]
return u
return clean(url_a) == clean(url_b)
async def call_gemini_with_retry(prompt: str, response_format: str = "json", max_retries: int = 3):
"""
Llama a la API de Gemini con rotación exhaustiva y REINTENTO REAL en 429.
Llama a Gemini optimizando el uso de cuota (pay-per-use).
Rota llaves inmediatamente en 429 y usa backoff exponencial inteligente.
"""
global CURRENT_KEY_INDEX
if not GEMINI_API_KEYS:
@@ -87,16 +103,16 @@ async def call_gemini_with_retry(prompt: str, response_format: str = "json", max
diagnostics = GeminiDiagnostics()
async with httpx.AsyncClient() as client:
for key_attempt in range(len(GEMINI_API_KEYS)):
api_key = GEMINI_API_KEYS[CURRENT_KEY_INDEX]
# Intentamos rotar entre todas las llaves disponibles antes de fallar
for key_attempt in range(len(GEMINI_API_KEYS)):
api_key = GEMINI_API_KEYS[CURRENT_KEY_INDEX]
async with httpx.AsyncClient() as client:
for model in GEMINI_MODELS:
full_model_name = f"models/{model}"
api_url = f"https://generativelanguage.googleapis.com/{GEMINI_API_VERSION}/{full_model_name}:generateContent?key={api_key}"
# Reintentos por modelo (incluyendo 429)
for attempt in range(max_retries + 2):
for attempt in range(max_retries):
try:
payload = {"contents": [{"parts": [{"text": prompt}]}]}
response = await client.post(api_url, json=payload, timeout=45)
@@ -117,14 +133,14 @@ async def call_gemini_with_retry(prompt: str, response_format: str = "json", max
break
elif response.status_code == 429:
wait_time = (10 * (attempt + 1)) + random.random() * 5
log_event(f" [!] API 429 (Límite): Reintentando {model} en {wait_time:.1f}s... (Intento {attempt+1})")
await asyncio.sleep(wait_time)
continue # Reintentar el MISMO modelo
# 429: Rotamos llave inmediatamente para no desperdiciar tiempo
log_event(f" [!] API 429 en llave {CURRENT_KEY_INDEX+1}. Rotando...")
CURRENT_KEY_INDEX = (CURRENT_KEY_INDEX + 1) % len(GEMINI_API_KEYS)
# Rompemos el bucle de intentos para este modelo/llave y pasamos a la siguiente llave
break
elif response.status_code in [500, 503, 504]:
diagnostics.add_attempt(model, response.status_code, "Server Error")
await asyncio.sleep(5)
await asyncio.sleep(2 * (attempt + 1))
continue
else:
@@ -135,7 +151,13 @@ async def call_gemini_with_retry(prompt: str, response_format: str = "json", max
diagnostics.add_attempt(model, 0, f"Excepción: {str(e)}")
break
# Si terminamos los modelos de una llave con 429, saltamos a la siguiente
if response.status_code == 429:
continue
# Si llegamos aquí y no tuvimos éxito, probamos la siguiente llave tras un breve respiro
CURRENT_KEY_INDEX = (CURRENT_KEY_INDEX + 1) % len(GEMINI_API_KEYS)
await asyncio.sleep(2)
await asyncio.sleep(1)
raise Exception(f"Fallo crítico Gemini tras rotación de llaves.\n{diagnostics.get_report()}")
raise Exception(f"Fallo crítico Gemini tras rotación exhaustiva.\n{diagnostics.get_report()}")

View File

@@ -64,7 +64,7 @@ class IntelligentLinkCleaner:
return url, True, None, "Cached (Recent)"
domain = url.split("//")[-1].split("/")[0]
domain_info = self.learning_data["domains"].get(domain, {})
domain_info = self.learning_data.get("domains", {}).get(domain, {})
strategies = [
{"type": "http", "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", "ref": "https://www.google.com/", "desc": "Desktop/Google"},
{"type": "http", "ua": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1", "ref": "https://t.co/", "desc": "Mobile/Twitter"},
@@ -72,9 +72,13 @@ class IntelligentLinkCleaner:
{"type": "http", "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0", "ref": "https://news.ycombinator.com/", "desc": "Firefox/Reddit"},
{"type": "playwright", "ua": "Mozilla/5.0 (Linux; Android 13; SM-S918B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Mobile Safari/537.36", "ref": "https://www.google.com/", "desc": "PW Mobile/Google"}
]
# PRIORIZACIÓN INTELIGENTE: Si ya sabemos qué funciona para este dominio, empezar por ahí.
best_strat_idx = domain_info.get("best_strategy_idx")
if best_strat_idx is not None and best_strat_idx < len(strategies):
best_strat = strategies.pop(best_strat_idx); strategies.insert(0, best_strat)
# Mover la mejor estrategia al inicio
best_strat = strategies.pop(best_strat_idx)
strategies.insert(0, best_strat)
for attempt in range(min(max_retries, len(strategies))):
strategy = strategies[attempt]
@@ -82,11 +86,17 @@ class IntelligentLinkCleaner:
if attempt > 0: await asyncio.sleep((2 ** attempt) + random.random())
is_alive, reason = await self._check_url_logic(url, strategy)
if is_alive:
if "domains" not in self.learning_data: self.learning_data["domains"] = {}
if domain not in self.learning_data["domains"]: self.learning_data["domains"][domain] = {}
# Guardar el índice REAL de la estrategia que funcionó
original_idx = attempt if best_strat_idx is None else (best_strat_idx if attempt == 0 else (attempt if attempt < best_strat_idx else attempt))
self.learning_data["domains"][domain]["best_strategy_idx"] = original_idx
if "link_cache" not in self.learning_data: self.learning_data["link_cache"] = {}
self.learning_data["link_cache"][url] = {"status": "ALIVE", "last_checked": now}
return url, True, None, f"Alive ({strategy['desc']}) - {reason}"
if reason in ["404", "soft_404", "redirect_to_home"]:
if any(git_host in url for git_host in ["github.com", "gitlab.com", "bitbucket.org"]):
parts = url.split("/"); repo_root = "/".join(parts[:5]) if len(parts) > 4 else None

View File

@@ -108,6 +108,7 @@ async def master_orchestrator():
log_event(f"[*] Total after initial deduplication: {len(all_raw_assets)} unique links.")
# 4. Evaluation and Registration (Robust Global Deduplication)
from src.gemini_utils import is_fuzzy_duplicate
existing_urls = set()
for root, dirs, files in os.walk("docs"):
for file in files:
@@ -140,6 +141,21 @@ async def master_orchestrator():
url = asset["url"]
clean_url = url.split('#')[0].rstrip('/').lower()
# Fuzzy Deduplication
is_dup = False
for existing in existing_urls:
if is_fuzzy_duplicate(url, existing):
is_dup = True
break
if is_dup:
log_event(f" [=] SKIPPED: {url[:60]}... (Already exists - Fuzzy)")
full_report_metrics.append({
"url": url, "status": "DUPLICATE", "reason": "Already exists in repository",
"category": "N/A", "post_date": asset.get('timestamp'), "source": asset.get("source_type", "Social")
})
continue
# Track max date
try:
ts = asset.get('timestamp')
@@ -157,15 +173,9 @@ async def master_orchestrator():
max_tweet_date = asset_date
except: pass
if clean_url in existing_urls:
log_event(f" [=] SKIPPED: {url[:60]}... (Already exists)")
full_report_metrics.append({
"url": url, "status": "DUPLICATE", "reason": "Already exists in repository",
"category": "N/A", "post_date": ts, "source": asset.get("source_type", "Social")
})
continue
assets_to_evaluate.append(asset)
if not assets_to_evaluate:
log_event(" [*] Entire batch consists of duplicates. Next batch.")
continue