feat(ai): implement semantic deduplication, RSS ingestion, and critical asset monitoring

This commit is contained in:
Nubenetes Bot
2026-05-16 23:14:36 +02:00
parent 1608840ee4
commit 3819d586b4
7 changed files with 215 additions and 6 deletions

View File

@@ -0,0 +1,77 @@
name: Nubenetes Critical Asset Monitor
on:
schedule:
- cron: '0 8 * * *' # Daily at 08:00 UTC
workflow_dispatch:
permissions:
contents: write
pull-requests: write
jobs:
monitor-critical:
runs-on: ubuntu-latest
steps:
- name: Repository Synchronization
uses: actions/checkout@v4
with:
ref: develop
- name: Python Environment Provisioning
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Dependencies Installation
run: |
python -m pip install --upgrade pip
pip install --no-cache-dir httpx PyYAML PyGithub beautifulsoup4
- name: Critical Health Audit Execution
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
PYTHONPATH: ${{ github.workspace }}
PYTHONUNBUFFERED: 1
run: |
python -c "
import asyncio, httpx, yaml, os
from datetime import datetime
from src.gitops_manager import RepositoryController
async def check():
with open('data/inventory.yaml', 'r') as f:
inv = yaml.safe_load(f) or {}
critical_links = [u for u, d in inv.items() if d.get('tag') in ['[DE FACTO STANDARD]', '[ENTERPRISE-STABLE]']]
print(f'[*] Auditing {len(critical_links)} critical assets...')
dead = []
async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
for url in critical_links:
try:
resp = await client.get(url)
if resp.status_code == 404:
dead.append(url)
print(f' [!] CRITICAL DOWN: {url}')
except: pass
if dead:
# Flag in inventory
for url in dead:
inv[url]['status'] = 'offline'
inv[url]['last_offline'] = datetime.now().isoformat()
with open('data/inventory.yaml', 'w') as f:
yaml.dump(inv, f, sort_keys=False, allow_unicode=True)
# Create PR/Issue
git = RepositoryController(os.environ['GH_TOKEN'], 'nubenetes/awesome-kubernetes')
body = '### 🚨 Critical Assets Offline\n\n' + '\n'.join([f'- {u}' for u in dead])
git.apply_multi_file_changes({'data/inventory.yaml': yaml.dump(inv)}, {'removals': len(dead)})
print('[OK] Inventory updated and PR created.')
else:
print('[OK] All critical assets are online.')
asyncio.run(check())
"

View File

@@ -2,7 +2,7 @@ name: Nubenetes Intelligent Link Cleaner & Dedup
on:
schedule:
- cron: '0 0 1 * *'
- cron: '0 0 1 */3 *' # Quarterly: 1st of Jan, Apr, Jul, Oct
workflow_dispatch:
inputs:
force_full_check:

View File

@@ -426,6 +426,8 @@ graph TD
### 7.6. Strategic Benefits
- **Linguistic Diversity and Global Access**: AI agents automatically detect the source language. **V1 Archive** preserves descriptions in the resource's native language (e.g., Spanish) to respect original context, while the **V2 Portal** provides professional English summaries and explicit language tagging (e.g., `[SPANISH CONTENT]`) for global accessibility.
- **Universal English Curation**: All high-level reasoning and synthesis are curated into professional technical English, maintaining Nubenetes as a truly global resource.
- **Semantic Conflict Resolution**: AI identifies multiple URLs pointing to the same technical project (e.g., repository vs. landing page) and automatically consolidates them into a single canonical reference.
- **Critical Asset Monitoring**: While the exhaustive health check runs every **3 months**, high-priority assets ([DE FACTO STANDARD]) are monitored daily to ensure zero downtime for essential industry tools.
- **Canonical Deduplication**: Automatically merges duplicate resources (stripping UTM/trackers), ensuring a clean and precise inventory.
- **The Agentic Pulse**: A dynamic trending section on the V2 home page that highlights the freshest high-impact resources.
- **Zero Redundancy**: Links already analyzed by Gemini are never re-evaluated unless forced.

View File

@@ -47,9 +47,17 @@ sources:
- "ApacheSpark"
- "snowflakedb"
- topic: "Infrastructure as Code & GitOps"
- topic: \"Infrastructure as Code & GitOps\"
accounts:
- "HashiCorp"
- "PulumiCorp"
- "ArgoProj"
- "fluxcd"
- \"HashiCorp\"
- \"PulumiCorp\"
- \"ArgoProj\"
- \"fluxcd\"
feeds:
- \"https://netflixtechblog.com/feed\"
- \"https://engineering.atlassian.com/feed\"
- \"https://blog.cloudflare.com/rss/\"
- \"https://medium.com/feed/uber-engineering\"
- \"https://engineering.fb.com/feed/\"
- \"https://aws.amazon.com/blogs/aws/feed/\"
- \"https://cloud.google.com/blog/rss/\"

55
src/ingestion_rss.py Normal file
View File

@@ -0,0 +1,55 @@
import feedparser
import asyncio
import re
from datetime import datetime
from typing import List, Dict, Optional
from src.logger import log_event
from src.config import MADRID_TZ
class RSSDataExtractor:
def __init__(self):
self.audit_trail = []
def log_audit(self, method: str, success: Optional[bool], msg: str):
icons = {True: "✅ SUCCESS", False: "❌ FAILURE", None: "⚡ ATTEMPT"}
entry = f"**RSS ({method})** - {icons.get(success, ' INFO')}: {msg}"
self.audit_trail.append(entry)
log_event(entry)
async def fetch_links_since(self, since_date: datetime, feeds: List[str]) -> List[Dict]:
all_articles = []
for url in feeds:
self.log_audit("Discovery", None, f"Parsing feed: {url}")
try:
# Use a thread for feedparser as it's blocking
feed = await asyncio.to_thread(feedparser.parse, url)
if feed.bozo:
self.log_audit("Parsing", False, f"Malformed feed: {url}")
continue
for entry in feed.entries:
# Parse published date
published = None
if hasattr(entry, 'published_parsed') and entry.published_parsed:
published = datetime(*entry.published_parsed[:6]).replace(tzinfo=MADRID_TZ)
elif hasattr(entry, 'updated_parsed') and entry.updated_parsed:
published = datetime(*entry.updated_parsed[:6]).replace(tzinfo=MADRID_TZ)
if not published:
published = datetime.now(MADRID_TZ)
if published >= since_date:
all_articles.append({
"url": entry.link,
"title": entry.title,
"text": entry.get("summary", "")[:500],
"timestamp": published.isoformat(),
"source_type": f"RSS Feed ({feed.feed.get('title', 'Unknown')})"
})
self.log_audit("Parsing", True, f"Extracted {len(feed.entries)} entries from {url}")
except Exception as e:
self.log_audit("Error", False, f"Feed error {url}: {str(e)[:50]}")
return all_articles

View File

@@ -412,8 +412,58 @@ class IntelligentLinkCleaner:
except Exception as e:
log_event(f" [!] Batch enrichment error: {e}")
async def run_semantic_deduplication(self):
"""
SEMANTIC DEDUPLICATION ENGINE: Identifies multiple URLs pointing to the same technical project
(e.g., project.github.io vs github.com/user/project) and consolidates them.
"""
log_event("RUNNING SEMANTIC DEDUPLICATION (AI CONFLICT RESOLUTION)...", section_break=True)
# 1. Identify potential conflicts via GitHub Metadata
gh_projects = {} # {repo_path: [urls]}
for url, meta in self.inventory.items():
if "github.com" in url:
match = re.search(r'github\.com/([^/]+/[^/]+)', url)
if match:
repo = match.group(1).lower().replace(".git", "")
if repo not in gh_projects: gh_projects[repo] = []
gh_projects[repo].append(url)
# Cross-reference with GitHub Pages
if ".github.io" in url:
match = re.search(r'https?://([^.]+)\.github\.io/([^/]+)', url)
if match:
user, project = match.groups()
repo = f"{user}/{project}".lower()
if repo not in gh_projects: gh_projects[repo] = []
gh_projects[repo].append(url)
# 2. Consolidate conflicts
consolidations = 0
for repo, urls in gh_projects.items():
if len(set(urls)) > 1:
# We have a conflict. Prefer the GitHub Repository URL as Canonical
repo_url = f"https://github.com/{repo}"
others = [u for u in set(urls) if normalize_url(u) != normalize_url(repo_url)]
for duplicate in others:
if duplicate in self.link_registry:
# Mark for consolidation
self.dead_links[duplicate] = (f"CANONICAL:{repo_url}", f"Semantic duplicate of {repo}")
consolidations += 1
log_event(f" [♻️] Semantic Merge: {duplicate} -> {repo_url}")
if consolidations > 0:
log_event(f" [OK] Identified {consolidations} semantic conflicts for consolidation.")
else:
log_event(" [OK] No semantic duplicates found.")
async def apply_changes(self):
log_event("APPLYING INTELLIGENT CLEANING & PR GENERATION...", section_break=True)
# Run Semantic Dedup before applying
await self.run_semantic_deduplication()
file_updates = {}
def track(file, op, url, reason, cat=None):
if file not in self.detailed_stats["by_file"]: self.detailed_stats["by_file"][file] = {"removed": 0, "modified": 0, "created": 0}

View File

@@ -80,6 +80,7 @@ async def master_orchestrator():
# 2. Load Multi-source Accounts with Topic Filtering
accounts_to_scan = []
feeds_to_scan = []
sources_file = "data/curation_sources.yaml"
# Topic Inclusion Flags (from Env)
@@ -106,9 +107,14 @@ async def master_orchestrator():
for acc in topic_data.get("accounts", []):
if acc.lower() not in exclude_list:
all_accounts.add(acc)
for feed in topic_data.get("feeds", []):
feeds_to_scan.append(feed)
if all_accounts:
accounts_to_scan = list(all_accounts)
log_event(f"[*] Multi-source loaded: {len(accounts_to_scan)} accounts from enabled topics.")
if feeds_to_scan:
log_event(f"[*] RSS Feeds loaded: {len(feeds_to_scan)} technical blogs.")
except Exception as e:
log_event(f"[!] Error loading sources: {e}")
@@ -119,16 +125,27 @@ async def master_orchestrator():
# 3. Multi-source Ingestion
backup_file = os.getenv("BACKUP_FILE")
x_audit_trail = []
raw_social = []
if backup_file and os.path.exists(backup_file):
from src.ingestion_backup import BackupDataExtractor
extractor = BackupDataExtractor(backup_file)
raw_social = await extractor.fetch_links()
x_audit_trail = extractor.audit_trail
else:
# A. X.com Extraction
strategy = os.getenv("EXTRACTION_STRATEGY", "search")
twitter_client = SocialDataExtractor()
raw_social = await twitter_client.fetch_links_since(since_date, until_date=until_date, strategy=strategy, accounts=accounts_to_scan)
x_audit_trail = twitter_client.audit_trail
# B. RSS Extraction
if feeds_to_scan:
from src.ingestion_rss import RSSDataExtractor
rss_client = RSSDataExtractor()
raw_rss = await rss_client.fetch_links_since(since_date, feeds_to_scan)
raw_social.extend(raw_rss)
x_audit_trail.extend(rss_client.audit_trail)
trending = []
if not is_historical and not backup_file: