feat: implement comprehensive PR audit report with matrix table and Mermaid metrics

This commit is contained in:
Nubenetes Bot
2026-05-10 22:15:25 +02:00
parent f794e6799b
commit f8a6eb7eb2
2 changed files with 107 additions and 146 deletions

View File

@@ -12,9 +12,6 @@ class RepositoryController:
self.repository.create_git_ref(ref=f"refs/heads/{branch_name}", sha=base_branch.commit.sha)
def apply_multi_file_changes(self, updates: dict, metrics: dict) -> None:
if not updates:
return
timestamp_slug = datetime.now().strftime("%Y%m%d-%H%M")
branch_name = f"bot/knowledge-update-{timestamp_slug}"
self._create_feature_branch(branch_name)
@@ -25,70 +22,52 @@ class RepositoryController:
try:
file_meta = self.repository.get_contents(file_path, ref=self.default_branch_name)
self.repository.update_file(
path=file_path,
message=commit_signature,
content=content,
sha=file_meta.sha,
branch=branch_name
path=file_path, message=commit_signature, content=content,
sha=file_meta.sha, branch=branch_name
)
except Exception as e:
# Si no existe (404), lo creamos
if "404" in str(e):
self.repository.create_file(
path=file_path,
message=f"chore: create {file_path} [{timestamp_slug}]",
content=content,
branch=branch_name
path=file_path, message=f"chore: create {file_path}",
content=content, branch=branch_name
)
else:
raise e
except Exception as e:
print(f"Error procesando {file_path}: {e}")
# Informe Visual en el PR
categories_str = ", ".join([f"`{c}`" for c in metrics.get('categories', [])])
# --- CONSTRUCCIÓN DEL REPORTE ÉLITE ---
full_report = metrics.get('full_report', [])
# Detalle de enlaces añadidos
added_md = ""
if metrics.get('added_list'):
added_md = "### Enlaces Añadidos\n| Recurso | Categoría | URL |\n| :--- | :--- | :--- |\n"
for item in metrics['added_list']:
added_md += f"| {item['title']} | `{item['category']}` | {item['url']} |\n"
# 1. Tabla Matricial de Auditoría
matrix_table = "### 📋 Matriz de Auditoría de Enlaces (Full Extraction)\n"
matrix_table += "| Estado | Motivo | Categoría | URL |\n| :--- | :--- | :--- | :--- |\n"
# Detalle de curación/borrado
removed_md = ""
if metrics.get('removed_list'):
removed_md = "### 🧹 Curación y Limpieza (Duplicados)\n| Categoría | Acción |\n| :--- | :--- |\n"
for item in metrics['removed_list']:
removed_md += f"| `{item['category']}` | {item['reason']} |\n"
counts = {"INCLUDED": 0, "DUPLICATE": 0, "FILTERED": 0}
for item in full_report:
status_emoji = {"INCLUDED": "", "DUPLICATE": "👯", "FILTERED": "🛡️"}.get(item['status'], "")
matrix_table += f"| {status_emoji} {item['status']} | {item['reason']} | `{item['category']}` | {item['url']} |\n"
counts[item['status']] = counts.get(item['status'], 0) + 1
# Informe de Diagnóstico de X.com
x_report = ""
if metrics.get('x_diagnostics'):
x_report = "### ⚠️ Informe de Diagnóstico: X.com\n"
for diag in metrics['x_diagnostics']:
# Escapar markdown básico en mensajes de error
safe_diag = diag.replace("|", "\\|").replace("`", "'")
x_report += f"- {safe_diag}\n"
x_report += "\n"
# 2. Diagrama Mermaid
mermaid_pie = "### 📊 Métricas de Decisión\n```mermaid\npie title Distribución de Decisión Agéntica\n"
mermaid_pie += f" \"Aceptados (Inyectados)\" : {counts['INCLUDED']}\n"
mermaid_pie += f" \"Duplicados (Ignorados)\" : {counts['DUPLICATE']}\n"
mermaid_pie += f" \"Filtrados (Calidad/Impacto)\" : {counts['FILTERED']}\n```\n"
# 3. Log de Ingesta
x_log = "### ⚡ Audit Trail de Ingesta (X.com)\n"
for entry in metrics.get('x_audit', []):
x_log += f"- {entry}\n"
pr_narrative = (
f"## 💎 Actualización de Conocimiento: Kubernetes & Cloud Native\n\n"
f"Este PR añade **{metrics.get('total_new', 0)}** nuevos recursos y optimiza los existentes.\n\n"
f"**Rango Temporal Analizado:** `{metrics.get('start_date')}` ➔ `{metrics.get('end_date')}`\n\n"
f"{x_report}"
f"### ✅ Resumen de Ingesta:\n"
f"```mermaid\n"
f"pie title Origen de los Recursos\n"
f" \"X (@nubenetes)\" : {metrics.get('social_injections', 0)}\n"
f" \"GitHub Trending\" : {metrics.get('trending_injections', 0)}\n"
f"```\n\n"
f"{added_md}\n"
f"{removed_md}\n"
f"### 📂 Categorías Impactadas:\n"
f"{categories_str}\n\n"
f"## 💎 Knowledge Update War Room: Kubernetes & Cloud Native\n\n"
f"Este reporte detalla el procesamiento de **{metrics.get('total_extracted', 0)}** enlaces detectados.\n\n"
f"**Ventana Temporal:** `{metrics.get('start_date')}` ➔ `{metrics.get('end_date')}`\n\n"
f"{mermaid_pie}\n"
f"{x_log}\n"
f"{matrix_table}\n"
f"---\n"
f"**Nota del Bot:** El bot utiliza heurísticas de calidad para decidir qué duplicados mantener (estrellas 🌟 y longitud de descripción)."
f"**Nota de Evaluación:** Este PR incluye {len(metrics.get('added_list', []))} novedades reales. "
f"La ventana temporal se ha calculado automáticamente basándose en el último PR mergeado con éxito."
)
self.repository.create_pull(

View File

@@ -14,38 +14,30 @@ from src.gitops_manager import RepositoryController
async def master_orchestrator():
git_controller = RepositoryController(GH_TOKEN, TARGET_REPO)
markdown_sanitizer = MarkdownSanitizer()
state_file = "src/memory/state.json"
print("[*] INICIANDO CURADURÍA AGÉNTICA (SOLO INYECCIÓN DE NOVEDADES)")
print("[*] INICIANDO CURADURÍA AGÉNTICA (ESTRATEGIA DE TRANSPARENCIA TOTAL)")
# 1. Cargar Estado y Horizonte Temporal
# 1. Determinar Horizonte Temporal según el último MERGE
time_horizon = datetime(2024, 10, 1, 0, 0, tzinfo=MADRID_TZ)
try:
with open(state_file, 'r') as f:
state = json.load(f)
time_horizon = datetime.fromisoformat(state["last_processed_tweet_date"]).replace(tzinfo=MADRID_TZ)
except:
time_horizon = datetime(2024, 10, 1, 0, 0, tzinfo=MADRID_TZ)
# Buscamos PRs cerradas y merged del bot
pulls = git_controller.repository.get_pulls(state='closed', sort='updated', direction='desc')
for pr in pulls:
if pr.merged and "💎 Knowledge Update" in pr.title:
time_horizon = pr.merged_at.replace(tzinfo=MADRID_TZ) + timedelta(seconds=1)
print(f"[+] Último PR mergeado encontrado ({pr.merged_at}). Retomando desde ahí.")
break
except Exception as e:
print(f"[!] No se pudieron consultar PRs mergeadas: {e}. Usando fallback Oct 2024.")
print(f"[*] Buscando novedades desde: {time_horizon}")
print(f"[*] Rango de búsqueda: {time_horizon} ➔ Ahora")
# 2. Ingesta Multi-fuente
twitter_client = SocialDataExtractor()
raw_social = await twitter_client.fetch_links_since(time_horizon)
x_diagnostics = twitter_client.audit_trail
x_audit_trail = twitter_client.audit_trail
print("[*] Buscando novedades en GitHub Trending...")
trending = await discover_trending_assets()
# 3. Evaluación con IA
curated = []
if raw_social:
print(f"[*] Evaluando {len(raw_social)} candidatos de X con Gemini...")
curated = await evaluate_extracted_assets(raw_social)
all_new_assets = curated + trending
# 4. Deduplicación Global (Pre-escaneo de todos los .md)
print("[*] Escaneando repositorio para deduplicación global...")
# 3. Evaluación y Registro de Auditoría (Deduplicación Global Previa)
existing_urls = set()
for doc in os.listdir("docs"):
if doc.endswith(".md"):
@@ -53,91 +45,81 @@ async def master_orchestrator():
with open(os.path.join("docs", doc), 'r') as f:
existing_urls.update(re.findall(r'\]\((https?://[^\)]+)\)', f.read()))
except: pass
# Filtrar solo los que no existen
full_extraction_report = []
unique_new_assets = []
for asset in all_new_assets:
clean_url = asset["url"].split('#')[0].rstrip('/')
if any(clean_url in ex for ex in existing_urls):
continue
unique_new_assets.append(asset)
print(f"[*] Total candidatos únicos a inyectar: {len(unique_new_assets)}")
if raw_social:
print(f"[*] Evaluando {len(raw_social)} candidatos con Gemini...")
curated = await evaluate_extracted_assets(raw_social)
# Mapear resultados para el reporte matricial
curated_urls = {a["url"]: a for a in curated}
for asset in raw_social:
url = asset["url"]
clean_url = url.split('#')[0].rstrip('/')
reason = "Aceptado"
status = "INCLUDED"
if clean_url in [u.split('#')[0].rstrip('/') for u in existing_urls]:
status = "DUPLICATE"
reason = "Ya existe en Nubenetes.com"
elif url not in curated_urls:
status = "FILTERED"
reason = "Bajo impacto o no encaja en categorías"
if status == "INCLUDED":
unique_new_assets.append(curated_urls[url])
full_extraction_report.append({
"url": url,
"status": status,
"reason": reason,
"category": curated_urls[url]["category"] if url in curated_urls else "N/A"
})
# 5. Inyección en Markdowns
# 4. Inyección en Markdowns
file_updates = {}
stats = {
"new_links": 0,
"categories_updated": set(),
"added_details": [],
"removed_details": [],
"start_date": time_horizon.isoformat(),
"end_date": datetime.now(MADRID_TZ).isoformat()
}
stats = {"added_details": [], "categories_updated": set()}
for category in NUBENETES_CATEGORIES:
for asset in unique_new_assets:
category = asset["category"]
file_path = f"docs/{category}.md"
try:
repo_file = git_controller.repository.get_contents(file_path)
content = repo_file.decoded_content.decode("utf-8")
final_content, doc_stats = await markdown_sanitizer.sanitize_document(content)
# Leer contenido (usar caché local o git)
content = file_updates.get(file_path)
if not content:
repo_file = git_controller.repository.get_contents(file_path)
content = repo_file.decoded_content.decode("utf-8")
original_content = final_content
for asset in unique_new_assets:
if asset["category"] == category:
prev_len = len(final_content)
final_content = markdown_sanitizer.inject_curated_link(
final_content, category, asset["title"], asset["url"], asset["description"]
)
if len(final_content) > prev_len:
stats["added_details"].append({
"title": asset["title"],
"url": asset["url"],
"category": category
})
final_content = markdown_sanitizer.inject_curated_link(
content, category, asset["title"], asset["url"], asset["description"]
)
if final_content.strip() != original_content.strip():
if len(final_content) > len(content):
file_updates[file_path] = final_content
stats["new_links"] += (final_content.count(" - [") - original_content.count(" - ["))
stats["added_details"].append(asset)
stats["categories_updated"].add(category)
except: continue
# 6. Actualizar Estado de Tiempo y Persistir en Repo
if raw_social:
try:
# Obtener el timestamp más reciente de los nuevos tweets
all_timestamps = [datetime.fromisoformat(t["timestamp"]) for t in raw_social]
new_horizon = max(all_timestamps) + timedelta(seconds=1)
state_data = {"last_processed_tweet_date": new_horizon.isoformat()}
new_state_json = json.dumps(state_data, indent=2)
# Guardar localmente
with open(state_file, 'w') as f:
f.write(new_state_json)
# Incluir en la subida a GitHub para "tener memoria"
file_updates[state_file] = new_state_json
print(f"[+] Memoria actualizada: Siguiente run desde {new_horizon.isoformat()}")
except Exception as e:
print(f"[!] Error actualizando memoria: {e}")
# 7. GitOps
if file_updates or x_diagnostics:
metrics = {
"social_injections": len(curated),
"trending_injections": len(trending),
"total_new": stats["new_links"],
"categories": list(stats["categories_updated"]),
"added_list": stats["added_details"],
"removed_list": stats["removed_details"],
"start_date": stats["start_date"],
"end_date": stats["end_date"],
"x_diagnostics": x_diagnostics
}
# 5. GitOps con Reporte Matricial
metrics = {
"social_injections": len(unique_new_assets),
"total_extracted": len(raw_social),
"full_report": full_extraction_report,
"x_audit": x_audit_trail,
"added_list": stats["added_details"],
"categories": list(stats["categories_updated"]),
"start_date": time_horizon.isoformat(),
"end_date": datetime.now(MADRID_TZ).isoformat()
}
if file_updates or full_extraction_report:
print(f"[+] Finalizado. Generando PR con auditoría completa.")
git_controller.apply_multi_file_changes(file_updates, metrics)
else:
print("[~] No se han encontrado novedades relevantes.")
print("[~] Sin novedades ni reportes que generar.")
if __name__ == "__main__":
asyncio.run(master_orchestrator())