feat(ops): harden Special Asset protection and eliminate safety audit false positives

This commit is contained in:
Nubenetes Bot
2026-05-17 19:11:36 +02:00
parent 6fd01c545b
commit 7df92b5128
2 changed files with 55 additions and 72 deletions

View File

@@ -57,13 +57,12 @@ class SafetyGuard:
def validate_special_assets_completeness(self):
"""Mandate 27: Inclusión exhaustiva de Activos Especiales en V2."""
if not os.path.exists(SPECIAL_ASSETS_PATH) or not os.path.exists(V2_DIR): return
if not os.path.exists(SPECIAL_ASSETS_PATH): return
with open(SPECIAL_ASSETS_PATH, "r") as f:
special = yaml.safe_load(f).get("special_assets", [])
for sa in special:
if "Include 100%" in sa.get("v2_rule", ""):
if "Include 100%" in sa.get("v2_rule", "") or "Exhaustive" in sa.get("v2_rule", ""):
file_name = sa["file"]
v1_path = os.path.join(V1_DIR, file_name)
if os.path.exists(v1_path):
@@ -71,8 +70,9 @@ class SafetyGuard:
for link in v1_links:
nu = normalize_url(link)
if nu in self.inventory and self.inventory[nu].get("status") == "online":
if not self.inventory[nu].get("v2_locations"):
self.errors.append(f"💎 **Special Asset Leak**: `{link}` from `{file_name}` is missing in V2 portal")
# Check for inherited is_special flag instead of v2_locations (which are built later)
if not self.inventory[nu].get("is_special"):
self.errors.append(f"💎 **VIP Flag Missing**: `{link}` from `{file_name}` is not marked as Special")
def validate_mvq_compliance(self):
"""Mandato 3 & 16: Verificar cumplimiento de MVQ en V2."""
@@ -116,7 +116,6 @@ class SafetyGuard:
def has_valid_toc(self, content: str) -> bool:
"""Checks if content has an explicit header or an implicit list-based TOC."""
if "## Table of Contents" in content: return True
# Check for V1 style: Numbered list of links at the top (after H1)
toc_pattern = r'^\d+\.\s+\[.*?\]\(#.*?\)'
matches = re.findall(toc_pattern, content, re.MULTILINE)
return len(matches) >= 3
@@ -153,7 +152,7 @@ class SafetyGuard:
for root, _, files in os.walk(V1_DIR):
for file in files:
if file.endswith(".md"):
if file in exempt_files: continue
if file in exempt_files or file == "index.md": continue
content = open(os.path.join(root, file), "r").read()
if not self.has_valid_toc(content) and len(re.findall(r'^## ', content, re.M)) > 2:
self.warnings.append(f"📍 **V1 TOC Missing**: `{file}` has many sections but no TOC")

View File

@@ -42,7 +42,7 @@ class V2VisionEngine:
"- KEEP >90% of technical resources (except for 'introduction.md' where only high-impact links are kept).\n"
"PHASE 2: SOPHISTICATED HIERARCHICAL CLASSIFICATION\n"
"- Identify TECHNICAL_HIERARCHY: A list of strings (max 10) representing Area > Topic > Subtopics.\n"
"- For 'introduction.md', set is_microservice: true if context matches.\n"
"- For 'introduction.md', identify links related to MICROSERVICES for extraction.\n"
"PHASE 3: KNOWLEDGE ASSIMILATION FLOW\n"
"- Order hierarchy to facilitate a structured learning journey.\n"
"PHASE 4: MANDATORY DESCRIPTIONS\n"
@@ -69,22 +69,23 @@ class V2VisionEngine:
def _load_inventory(self) -> Dict:
if os.path.exists(INVENTORY_PATH):
try:
with open(INVENTORY_PATH, "r") as f: return yaml.safe_load(f) or {}
try: return yaml.safe_load(open(INVENTORY_PATH, "r")) or {}
except: return {}
return {}
def _save_inventory(self):
os.makedirs(os.path.dirname(INVENTORY_PATH), exist_ok=True)
with open(INVENTORY_PATH, "w") as f:
yaml.dump(self.inventory, f, sort_keys=False, allow_unicode=True)
yaml.dump(self.inventory, open(INVENTORY_PATH, "w"), sort_keys=False, allow_unicode=True)
async def analyze_and_cluster(self):
log_event("STARTING V2 HIGH-DENSITY O'REILLY LIBRARY GENERATION", section_break=True)
# 0. Mandate Sync
# 0. Mandate Sync & Workspace Cleanup
try:
from src.mandate_ingestor import MandateIngestor
MandateIngestor().save_system_instructions()
if os.path.exists(V2_DIR):
for f in os.listdir(V2_DIR):
if f.endswith(".md"): os.remove(os.path.join(V2_DIR, f))
except: pass
all_v1_links, mosaic_html, videos_html = await self._gather_all_v1_content()
@@ -101,12 +102,11 @@ class V2VisionEngine:
log_event("[*] Phase 3: Recursive Hierarchy Construction...")
v2_data = await self._rebuild_structure(library_inventory)
log_event("[*] Phase 4: Generating Premium Portal Hubs (Comparison Tables)...")
log_event("[*] Phase 4: Generating Premium Portal Hubs...")
os.makedirs(V2_DIR, exist_ok=True)
await self._write_premium_files(v2_data, mosaic_html, videos_html)
await self._sync_enterprise_navigation(v2_data)
self._save_inventory()
log_event("V2 ELITE PORTAL GENERATED SUCCESSFULLY.")
# --- FINAL SAFETY AUDIT ---
try:
@@ -116,6 +116,8 @@ class V2VisionEngine:
with open("v2_safety_report.md", "w") as f: f.write(report)
except Exception as e:
log_event(f" [!] V2 Safety Audit Error: {e}")
log_event("V2 ELITE PORTAL GENERATED SUCCESSFULLY.")
async def _gather_all_v1_content(self):
all_links, mosaic_html, videos_html = [], "", ""
@@ -167,28 +169,28 @@ class V2VisionEngine:
async def _evaluate_and_score_resources(self, links: List[Dict]):
to_evaluate = []
project_registry = {} # {project_id: best_item}
project_registry = {}
force_eval = os.getenv("FORCE_EVAL", "false").lower() == "true"
special_files = [sa["file"] for sa in self.special_assets_rules.get("special_assets", [])]
for l in links:
item = l.copy()
norm_url = normalize_url(l["url"])
orig_file = l.get("original_file", "unknown.md")
# Identify Project Signature (Semantic Dedup)
# Mandate 27: VIP Status Inheritance
is_special = orig_file in special_files
item["is_special"] = is_special
# Project Identification
project_id = norm_url
if "github.com" in norm_url:
match = re.search(r'github\.com/([^/]+/[^/]+)', norm_url)
if match: project_id = match.group(1).lower()
# --- MANDATE 27: SPECIAL ASSET PROTECTION ---
is_special = orig_file in special_rules
item["is_special"] = is_special
# --- MANDATE 23: AUTHORITATIVE MERGE ---
if not force_eval and norm_url in self.inventory and "stars" in self.inventory[norm_url]:
cached = self.inventory[norm_url]
item.update(cached)
# Ensure is_special is preserved even if cache didn't have it
if is_special: item["is_special"] = True
if cached.get("hierarchy"):
@@ -196,17 +198,14 @@ class V2VisionEngine:
project_registry[project_id] = item
else:
existing = project_registry[project_id]
# Inheritance: If any version was special, the consolidated one is special
if item.get("is_special"): existing["is_special"] = True
is_current_root = "github.com" not in norm_url
if is_current_root or item.get("stars", 0) > existing.get("stars", 0):
item.setdefault("aliases", []).append(existing["url"])
# Preserve special status during overwrite
if existing.get("is_special"): item["is_special"] = True
project_registry[project_id] = item
else:
existing.setdefault("aliases", []).append(url)
existing.setdefault("aliases", []).append(l["url"])
continue
to_evaluate.append(item)
@@ -232,16 +231,16 @@ class V2VisionEngine:
"ai_summary": res.get("summary", ""), "language": res.get("language", "English"),
"resource_type": res.get("type", "Reference"), "complexity": res.get("complexity", "Intermediate"),
"hierarchy": res.get("hierarchy", ["General"]), "is_microservice": bool(res.get("is_microservice", False)),
"status": "online", "tag": self._calculate_tag(item)
"status": "online", "is_special": item.get("is_special", False)
}
item.update(eval_data)
self.inventory[norm_url] = eval_data
self.inventory[norm_url]["title"] = item["title"]
if p_id not in project_registry or item["stars"] > project_registry[p_id].get("stars", 0):
if p_id in project_registry and project_registry[p_id].get("is_special"): item["is_special"] = True
project_registry[p_id] = item
except:
for l in batch:
# Fallback registry injection
u = normalize_url(l["url"])
if u not in project_registry: project_registry[u] = l
await asyncio.sleep(0.3)
@@ -264,7 +263,7 @@ class V2VisionEngine:
cat_name = orig_file.replace(".md", "").replace("-", " ").title()
if item.get("is_microservice"): cat_name = "Microservices"; dim = "Architectural Foundations" if orig_file == "introduction.md" else dim
# --- MANDATE 27: SPECIAL ASSET PROTECTION ---
# Mandate 27: VIP PROTECTION
is_special = item.get("is_special", False) or orig_file in special_rules
if orig_file == "introduction.md" and item.get("stars", 0) < 4 and not item.get("is_microservice"): continue
@@ -312,26 +311,26 @@ class V2VisionEngine:
slug = dim.lower().replace(" ", "-").replace("&", "and").replace("(", "").replace(")", "")
index_md += f"- **[{dim}](./{slug}.md)**: {content['summary']}\n"
with open(os.path.join(V2_DIR, "index.md"), "w") as f: f.write(index_md)
# Helper functions for recursive rendering
def gen_toc(node, depth, base_slug):
toc = ""
for name, subnode in sorted(node.items()):
if name == "__links__": continue
clean_name = clean_toc_text(name)
slug = f"{base_slug}-{clean_name.lower().replace(' ', '-')}"
toc += f"{' ' * (depth * 4)}- [{clean_name}](#{slug})\n" + gen_toc(subnode, depth + 1, slug)
return toc
async def render_node(node, depth, base_slug, is_intro=False):
md = ""
for name, subnode in sorted(node.items()):
if name == "__links__": continue
clean_name = clean_toc_text(name)
slug = f"{base_slug}-{clean_name.lower().replace(' ', '-')}"
md += f"{'#' * min(6, depth + 2)} {clean_name}\n\n"
if depth == 1 and "__links__" in subnode: md += await self._generate_comparison_table(subnode["__links__"])
md += await render_node(subnode, depth + 1, slug, is_intro)
if "__links__" in node:
def gen_toc(node, depth, base_slug):
toc = ""
for name, subnode in sorted(node.items()):
if name == "__links__": continue
clean_name = clean_toc_text(name)
slug = f"{base_slug}-{clean_name.lower().replace(' ', '-')}"
toc += f"{' ' * (depth * 4)}- [{clean_name}](#{slug})\n" + gen_toc(subnode, depth + 1, slug)
return toc
async def render_node(node, depth, base_slug, is_intro=False):
md = ""
for name, subnode in sorted(node.items()):
if name == "__links__": continue
clean_name = clean_toc_text(name)
slug = f"{base_slug}-{clean_name.lower().replace(' ', '-')}"
md += f"{'#' * min(6, depth + 2)} {clean_name}\n\n"
if depth == 1 and "__links__" in subnode: md += await self._generate_comparison_table(subnode["__links__"])
md += await render_node(subnode, depth + 1, slug, is_intro)
if "__links__" in node:
for l in node["__links__"]:
is_gold = is_intro and l.get("stars", 0) >= 4
title = l['title'].replace("==", "")
@@ -342,26 +341,16 @@ async def render_node(node, depth, base_slug, is_intro=False):
year_prefix = f"**({l.get('year', 'N/A')})** "
gh_info = f" <span class='md-tag md-tag--info'>⭐ {l.get('gh_stars',0)}</span>" if l.get('gh_stars') else ""
icon = " 🎥" if l.get("is_video") else ""
lang = l.get("language", "English")
lang_tag = f" <span class='md-tag md-tag--warning'>[{lang.upper()} CONTENT]</span>" if lang.lower() != "english" else ""
comp = l.get("complexity", "Intermediate")
comp_tag = f" <span class='md-tag md-tag--critical'>[{comp.upper()} LEVEL]</span>" if comp.lower() in ["architect", "advanced"] else ""
level_tag = f" <span class='md-tag md-tag--critical'>[{comp.upper()} LEVEL]</span>" if comp.lower() in ["architect", "advanced"] else ""
res_type = l.get("resource_type", "Reference")
type_tag = f" <span class='md-tag md-tag--primary'>[{res_type.upper()}]</span>" if res_type.lower() in ["case study", "guide", "documentation"] else ""
rich = "".join([
f" <small>by **{l['author']}**</small>" if l.get("author") else "",
f" <span class='md-tag md-tag--info'>⏱️ {l['duration']}</span>" if l.get("duration") else "",
f" <span class='md-tag md-tag--info'>📖 {l['reading_time']}</span>" if l.get("reading_time") else ""
])
rich = "".join([f" <small>by **{l['author']}**</small>" if l.get("author") else "", f" <span class='md-tag md-tag--info'>⏱️ {l['duration']}</span>" if l.get("duration") else "", f" <span class='md-tag md-tag--info'>📖 {l['reading_time']}</span>" if l.get("reading_time") else ""])
tag = l.get("tag", "[COMMUNITY-TOOL]")
color = "success" if "STANDARD" in tag else "warning" if "EMERGING" in tag else "info"
md += f" - {year_prefix}[{title}]({l['url']}){icon}{gh_info}{lang_tag}{comp_tag}{type_tag}{rich} {'🌟'*l.get('stars',0)} <span class='md-tag md-tag--{color}'>{tag}</span>\n"
md += f" - {year_prefix}[{title}]({l['url']}){icon}{gh_info}{lang_tag}{level_tag}{type_tag}{rich} {'🌟'*l.get('stars',0)} <span class='md-tag md-tag--{color}'>{tag}</span>\n"
if l.get('ai_summary'): md += f"\n {l['ai_summary']}\n\n"
return md
@@ -369,21 +358,16 @@ async def render_node(node, depth, base_slug, is_intro=False):
if not content["categories"]: continue
slug = dim.lower().replace(" ", "-").replace("&", "and").replace("(", "").replace(")", "")
v2_page = f"{slug}.md"
# --- TRACK V2 LOCATIONS IN INVENTORY ---
def track_v2(node, page_path):
def track_v2(node, p):
if "__links__" in node:
for l in node["__links__"]:
nu = normalize_url(l["url"])
if nu in self.inventory:
locs = self.inventory[nu].get("v2_locations", [])
if page_path not in locs:
self.inventory[nu].setdefault("v2_locations", []).append(page_path)
if p not in locs: self.inventory[nu].setdefault("v2_locations", []).append(p)
for k, v in node.items():
if k != "__links__": track_v2(v, page_path)
for cat_topics in content["categories"].values(): track_v2(cat_topics, v2_page)
if k != "__links__": track_v2(v, p)
for ct in content["categories"].values(): track_v2(ct, v2_page)
md = f"# {dim}\n\n!!! info \"Architectural Context\"\n {content['summary']}\n\n## Table of Contents\n"
for cat, topics in content["categories"].items():
cat_slug = cat.lower().replace(" ", "-")
@@ -395,7 +379,7 @@ async def render_node(node, depth, base_slug, is_intro=False):
if cat == "Introduction":
md += "!!! quote \"Vision 2026\"\n The focus shifts to agentic autonomy and hardened security.\n\n### Ecosystem Map\n```mermaid\ngraph TD\n A[Foundations] --> B[AI & Intelligence]\n A --> C[Hardened Infra]\n B --> D[Agentic Curation]\n C --> E[Enterprise Stability]\n D --> F[Nubenetes Portal]\n E --> F\n```\n\n### Gateway Hub\n- 🚀 [Explore AI Dimensions](./ai-and-artificial-intelligence.md)\n- 📦 [Microservices Guide](./microservices.md)\n\n"
md += await render_node(topics, 0, cat_slug, is_intro=(cat=="Introduction"))
with open(os.path.join(V2_DIR, f"{slug}.md"), "w") as f: f.write(md)
with open(os.path.join(V2_DIR, v2_page), "w") as f: f.write(md)
async def _sync_enterprise_navigation(self, data: Dict[str, Dict]):
try: