diff --git a/.github/workflows/critical_asset_monitor.yml b/.github/workflows/critical_asset_monitor.yml new file mode 100644 index 00000000..90e48b2f --- /dev/null +++ b/.github/workflows/critical_asset_monitor.yml @@ -0,0 +1,77 @@ +name: Nubenetes Critical Asset Monitor + +on: + schedule: + - cron: '0 8 * * *' # Daily at 08:00 UTC + workflow_dispatch: + +permissions: + contents: write + pull-requests: write + +jobs: + monitor-critical: + runs-on: ubuntu-latest + steps: + - name: Repository Synchronization + uses: actions/checkout@v4 + with: + ref: develop + + - name: Python Environment Provisioning + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Dependencies Installation + run: | + python -m pip install --upgrade pip + pip install --no-cache-dir httpx PyYAML PyGithub beautifulsoup4 + + - name: Critical Health Audit Execution + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + PYTHONPATH: ${{ github.workspace }} + PYTHONUNBUFFERED: 1 + run: | + python -c " + import asyncio, httpx, yaml, os + from datetime import datetime + from src.gitops_manager import RepositoryController + + async def check(): + with open('data/inventory.yaml', 'r') as f: + inv = yaml.safe_load(f) or {} + + critical_links = [u for u, d in inv.items() if d.get('tag') in ['[DE FACTO STANDARD]', '[ENTERPRISE-STABLE]']] + print(f'[*] Auditing {len(critical_links)} critical assets...') + + dead = [] + async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client: + for url in critical_links: + try: + resp = await client.get(url) + if resp.status_code == 404: + dead.append(url) + print(f' [!] CRITICAL DOWN: {url}') + except: pass + + if dead: + # Flag in inventory + for url in dead: + inv[url]['status'] = 'offline' + inv[url]['last_offline'] = datetime.now().isoformat() + + with open('data/inventory.yaml', 'w') as f: + yaml.dump(inv, f, sort_keys=False, allow_unicode=True) + + # Create PR/Issue + git = RepositoryController(os.environ['GH_TOKEN'], 'nubenetes/awesome-kubernetes') + body = '### 🚨 Critical Assets Offline\n\n' + '\n'.join([f'- {u}' for u in dead]) + git.apply_multi_file_changes({'data/inventory.yaml': yaml.dump(inv)}, {'removals': len(dead)}) + print('[OK] Inventory updated and PR created.') + else: + print('[OK] All critical assets are online.') + + asyncio.run(check()) + " diff --git a/.github/workflows/intelligent_link_cleaner.yml b/.github/workflows/intelligent_link_cleaner.yml index 7624a2cb..9224a130 100644 --- a/.github/workflows/intelligent_link_cleaner.yml +++ b/.github/workflows/intelligent_link_cleaner.yml @@ -2,7 +2,7 @@ name: Nubenetes Intelligent Link Cleaner & Dedup on: schedule: - - cron: '0 0 1 * *' + - cron: '0 0 1 */3 *' # Quarterly: 1st of Jan, Apr, Jul, Oct workflow_dispatch: inputs: force_full_check: diff --git a/README.md b/README.md index e324e184..c4291ad4 100644 --- a/README.md +++ b/README.md @@ -426,6 +426,8 @@ graph TD ### 7.6. Strategic Benefits - **Linguistic Diversity and Global Access**: AI agents automatically detect the source language. **V1 Archive** preserves descriptions in the resource's native language (e.g., Spanish) to respect original context, while the **V2 Portal** provides professional English summaries and explicit language tagging (e.g., `[SPANISH CONTENT]`) for global accessibility. - **Universal English Curation**: All high-level reasoning and synthesis are curated into professional technical English, maintaining Nubenetes as a truly global resource. +- **Semantic Conflict Resolution**: AI identifies multiple URLs pointing to the same technical project (e.g., repository vs. landing page) and automatically consolidates them into a single canonical reference. +- **Critical Asset Monitoring**: While the exhaustive health check runs every **3 months**, high-priority assets ([DE FACTO STANDARD]) are monitored daily to ensure zero downtime for essential industry tools. - **Canonical Deduplication**: Automatically merges duplicate resources (stripping UTM/trackers), ensuring a clean and precise inventory. - **The Agentic Pulse**: A dynamic trending section on the V2 home page that highlights the freshest high-impact resources. - **Zero Redundancy**: Links already analyzed by Gemini are never re-evaluated unless forced. diff --git a/data/curation_sources.yaml b/data/curation_sources.yaml index e4d55dbf..375527d9 100644 --- a/data/curation_sources.yaml +++ b/data/curation_sources.yaml @@ -47,9 +47,17 @@ sources: - "ApacheSpark" - "snowflakedb" - - topic: "Infrastructure as Code & GitOps" + - topic: \"Infrastructure as Code & GitOps\" accounts: - - "HashiCorp" - - "PulumiCorp" - - "ArgoProj" - - "fluxcd" + - \"HashiCorp\" + - \"PulumiCorp\" + - \"ArgoProj\" + - \"fluxcd\" + feeds: + - \"https://netflixtechblog.com/feed\" + - \"https://engineering.atlassian.com/feed\" + - \"https://blog.cloudflare.com/rss/\" + - \"https://medium.com/feed/uber-engineering\" + - \"https://engineering.fb.com/feed/\" + - \"https://aws.amazon.com/blogs/aws/feed/\" + - \"https://cloud.google.com/blog/rss/\" diff --git a/src/ingestion_rss.py b/src/ingestion_rss.py new file mode 100644 index 00000000..b96227ff --- /dev/null +++ b/src/ingestion_rss.py @@ -0,0 +1,55 @@ +import feedparser +import asyncio +import re +from datetime import datetime +from typing import List, Dict, Optional +from src.logger import log_event +from src.config import MADRID_TZ + +class RSSDataExtractor: + def __init__(self): + self.audit_trail = [] + + def log_audit(self, method: str, success: Optional[bool], msg: str): + icons = {True: "✅ SUCCESS", False: "❌ FAILURE", None: "⚡ ATTEMPT"} + entry = f"**RSS ({method})** - {icons.get(success, 'ℹ️ INFO')}: {msg}" + self.audit_trail.append(entry) + log_event(entry) + + async def fetch_links_since(self, since_date: datetime, feeds: List[str]) -> List[Dict]: + all_articles = [] + for url in feeds: + self.log_audit("Discovery", None, f"Parsing feed: {url}") + try: + # Use a thread for feedparser as it's blocking + feed = await asyncio.to_thread(feedparser.parse, url) + + if feed.bozo: + self.log_audit("Parsing", False, f"Malformed feed: {url}") + continue + + for entry in feed.entries: + # Parse published date + published = None + if hasattr(entry, 'published_parsed') and entry.published_parsed: + published = datetime(*entry.published_parsed[:6]).replace(tzinfo=MADRID_TZ) + elif hasattr(entry, 'updated_parsed') and entry.updated_parsed: + published = datetime(*entry.updated_parsed[:6]).replace(tzinfo=MADRID_TZ) + + if not published: + published = datetime.now(MADRID_TZ) + + if published >= since_date: + all_articles.append({ + "url": entry.link, + "title": entry.title, + "text": entry.get("summary", "")[:500], + "timestamp": published.isoformat(), + "source_type": f"RSS Feed ({feed.feed.get('title', 'Unknown')})" + }) + + self.log_audit("Parsing", True, f"Extracted {len(feed.entries)} entries from {url}") + except Exception as e: + self.log_audit("Error", False, f"Feed error {url}: {str(e)[:50]}") + + return all_articles diff --git a/src/intelligent_health_checker.py b/src/intelligent_health_checker.py index a0339235..2bbc7012 100644 --- a/src/intelligent_health_checker.py +++ b/src/intelligent_health_checker.py @@ -412,8 +412,58 @@ class IntelligentLinkCleaner: except Exception as e: log_event(f" [!] Batch enrichment error: {e}") + async def run_semantic_deduplication(self): + """ + SEMANTIC DEDUPLICATION ENGINE: Identifies multiple URLs pointing to the same technical project + (e.g., project.github.io vs github.com/user/project) and consolidates them. + """ + log_event("RUNNING SEMANTIC DEDUPLICATION (AI CONFLICT RESOLUTION)...", section_break=True) + + # 1. Identify potential conflicts via GitHub Metadata + gh_projects = {} # {repo_path: [urls]} + for url, meta in self.inventory.items(): + if "github.com" in url: + match = re.search(r'github\.com/([^/]+/[^/]+)', url) + if match: + repo = match.group(1).lower().replace(".git", "") + if repo not in gh_projects: gh_projects[repo] = [] + gh_projects[repo].append(url) + + # Cross-reference with GitHub Pages + if ".github.io" in url: + match = re.search(r'https?://([^.]+)\.github\.io/([^/]+)', url) + if match: + user, project = match.groups() + repo = f"{user}/{project}".lower() + if repo not in gh_projects: gh_projects[repo] = [] + gh_projects[repo].append(url) + + # 2. Consolidate conflicts + consolidations = 0 + for repo, urls in gh_projects.items(): + if len(set(urls)) > 1: + # We have a conflict. Prefer the GitHub Repository URL as Canonical + repo_url = f"https://github.com/{repo}" + others = [u for u in set(urls) if normalize_url(u) != normalize_url(repo_url)] + + for duplicate in others: + if duplicate in self.link_registry: + # Mark for consolidation + self.dead_links[duplicate] = (f"CANONICAL:{repo_url}", f"Semantic duplicate of {repo}") + consolidations += 1 + log_event(f" [♻️] Semantic Merge: {duplicate} -> {repo_url}") + + if consolidations > 0: + log_event(f" [OK] Identified {consolidations} semantic conflicts for consolidation.") + else: + log_event(" [OK] No semantic duplicates found.") + async def apply_changes(self): log_event("APPLYING INTELLIGENT CLEANING & PR GENERATION...", section_break=True) + + # Run Semantic Dedup before applying + await self.run_semantic_deduplication() + file_updates = {} def track(file, op, url, reason, cat=None): if file not in self.detailed_stats["by_file"]: self.detailed_stats["by_file"][file] = {"removed": 0, "modified": 0, "created": 0} diff --git a/src/main.py b/src/main.py index 877f3c50..0a9f1d26 100644 --- a/src/main.py +++ b/src/main.py @@ -80,6 +80,7 @@ async def master_orchestrator(): # 2. Load Multi-source Accounts with Topic Filtering accounts_to_scan = [] + feeds_to_scan = [] sources_file = "data/curation_sources.yaml" # Topic Inclusion Flags (from Env) @@ -106,9 +107,14 @@ async def master_orchestrator(): for acc in topic_data.get("accounts", []): if acc.lower() not in exclude_list: all_accounts.add(acc) + for feed in topic_data.get("feeds", []): + feeds_to_scan.append(feed) + if all_accounts: accounts_to_scan = list(all_accounts) log_event(f"[*] Multi-source loaded: {len(accounts_to_scan)} accounts from enabled topics.") + if feeds_to_scan: + log_event(f"[*] RSS Feeds loaded: {len(feeds_to_scan)} technical blogs.") except Exception as e: log_event(f"[!] Error loading sources: {e}") @@ -119,16 +125,27 @@ async def master_orchestrator(): # 3. Multi-source Ingestion backup_file = os.getenv("BACKUP_FILE") x_audit_trail = [] + raw_social = [] + if backup_file and os.path.exists(backup_file): from src.ingestion_backup import BackupDataExtractor extractor = BackupDataExtractor(backup_file) raw_social = await extractor.fetch_links() x_audit_trail = extractor.audit_trail else: + # A. X.com Extraction strategy = os.getenv("EXTRACTION_STRATEGY", "search") twitter_client = SocialDataExtractor() raw_social = await twitter_client.fetch_links_since(since_date, until_date=until_date, strategy=strategy, accounts=accounts_to_scan) x_audit_trail = twitter_client.audit_trail + + # B. RSS Extraction + if feeds_to_scan: + from src.ingestion_rss import RSSDataExtractor + rss_client = RSSDataExtractor() + raw_rss = await rss_client.fetch_links_since(since_date, feeds_to_scan) + raw_social.extend(raw_rss) + x_audit_trail.extend(rss_client.audit_trail) trending = [] if not is_historical and not backup_file: