perf(cleaner): parallelize content fetch and drift detection, improve logging transparency

This commit is contained in:
Nubenetes Bot
2026-05-18 16:05:06 +02:00
parent 341545543d
commit 5c7a390bef

View File

@@ -95,17 +95,18 @@ class IntelligentLinkCleaner:
elif (datetime.now().timestamp() - entry.get("last_checked", 0)) > (86400 * 21):
to_check.append(u)
log_event(f"[*] Queue: {len(to_check)} links prioritized for validation.")
total_to_check = len(to_check)
log_event(f"[*] Queue: {total_to_check} links prioritized for validation.")
# 2. Parallel Network Checks
BATCH_SIZE = 20
# 2. Parallel Network Checks (Optimized with Mandate 22 & 33)
BATCH_SIZE = 15 # Slightly smaller batch to accommodate deep fetch
check_results = {}
for i in range(0, len(to_check), BATCH_SIZE):
for i in range(0, total_to_check, BATCH_SIZE):
batch = to_check[i:i+BATCH_SIZE]
tasks = [self._check_url_logic(url) for url in batch]
results = await asyncio.gather(*tasks)
for url, res in zip(batch, results): check_results[url] = res
if i % 100 == 0: log_event(f" [>] Progress: {i}/{len(to_check)} checked...")
if i % 45 == 0: log_event(f" [>] Progress: [{i}/{total_to_check}] links validated...")
# 2.5. UNIVERSAL AI RESCUE
to_rescue = [u for u, res in check_results.items() if not res[0] or res[1] == "generic_redirect_loss"]
@@ -142,42 +143,14 @@ class IntelligentLinkCleaner:
except: pass
# 2.8. Finalize Status
log_event("FINALIZING STATUS AND METRICS...", section_break=True)
for url, (alive, reason, final) in check_results.items():
nu = normalize_url(url); entry = self.inventory.get(nu, {})
score = entry.get("health_score", 100)
score = (score * 0.8) + (100 if alive else 0) * 0.2
entry["health_score"] = round(score, 1); entry["last_checked"] = datetime.now().timestamp()
# --- MANDATE 33: LICENSE & COMPLIANCE GUARD ---
if alive and "github.com" in url:
try:
from src.agentic_curator import _get_github_activity
gh_meta = await _get_github_activity(url)
if gh_meta.get("gh_license"):
old_lic = entry.get("gh_license", "N/A")
new_lic = gh_meta["gh_license"]
if old_lic != "N/A" and old_lic != new_lic:
if any(x in new_lic.upper() for x in ["BSL", "SSPL", "PROPRIETARY"]):
log_event(f" [⚖️] LICENSE ALERT: {url} changed to {new_lic}")
entry["status"] = "review_required"
entry["stars"] = max(entry.get("stars", 1) - 1, 1)
entry["gh_license"] = new_lic
except: pass
# --- MANDATE 22: SEMANTIC DRIFT (SHA256) ---
if alive:
try:
from src.agentic_curator import _deep_fetch_content
text, _ = await _deep_fetch_content(url if not final else final)
if text:
new_hash = hashlib.sha256(text.encode()).hexdigest()
old_hash = entry.get("content_hash", "N/A")
if old_hash != "N/A" and new_hash != old_hash:
log_event(f" [!] DRIFT DETECTED: {url}")
entry["needs_ai_refresh"] = True
entry["content_hash"] = new_hash
except: pass
# Mandate 22 & 33 were handled in _check_url_logic to avoid sequential bottlenecks
is_important = any(occ.get("is_important") for occ in self.link_registry.get(nu, []))
if entry.get("stars", 0) >= 3: is_important = True
@@ -212,35 +185,56 @@ class IntelligentLinkCleaner:
async def _check_url_logic(self, url: str) -> Tuple[bool, str, Optional[str]]:
headers = {"User-Agent": "Mozilla/5.0", "Accept-Language": "en-US,en;q=0.5"}
parked = ["buy this domain", "parked free", "domain is for sale"]
nu = normalize_url(url); entry = self.inventory.get(nu, {})
try:
async with httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=12) as client:
resp = await client.get(url)
if resp.status_code < 400:
text = resp.text.lower()
final_url = str(resp.url)
# Mandate 34: Prevent multiple trailing slashes using centralized utility
from src.gemini_utils import sanitize_trailing_slashes
final_url = sanitize_trailing_slashes(final_url)
if any(kw in text for kw in parked): return False, "parked", None
# Mandate 34: Explicit detection of redundant slashes or single slash policy
# Mandate 22: Content Drift detection (SHA256) - Integrated here for speed
new_hash = hashlib.sha256(resp.text.encode()).hexdigest()
old_hash = entry.get("content_hash", "N/A")
if old_hash != "N/A" and new_hash != old_hash:
log_event(f" [!] DRIFT DETECTED: {url}")
entry["needs_ai_refresh"] = True
entry["content_hash"] = new_hash
# Mandate 33: License Guard for GitHub
if "github.com" in url:
try:
from src.agentic_curator import _get_github_activity
gh_meta = await _get_github_activity(url)
if gh_meta.get("gh_license"):
old_lic = entry.get("gh_license", "N/A")
new_lic = gh_meta["gh_license"]
if old_lic != "N/A" and old_lic != new_lic:
if any(x in new_lic.upper() for x in ["BSL", "SSPL", "PROPRIETARY"]):
log_event(f" [⚖️] LICENSE ALERT: {url} -> {new_lic}")
entry["status"] = "review_required"
entry["gh_license"] = new_lic
except: pass
if final_url != url:
u_p = url.split("://")[-1].rstrip("/"); f_p = final_url.split("://")[-1].rstrip("/")
# If it's just a slash/redundancy fix, we mark it as 'healed' or 'normalized'
if u_p == f_p:
return True, "normalized_slashes", final_url
# Generic redirect loss protection
if u_p == f_p: return True, "normalized_slashes", final_url
if u_p.count("/") >= 3 and (f_p.count("/") <= 2 or any(kw in f_p for kw in ["/about", "/products", "/home"])):
return False, "generic_redirect_loss", None
return True, "OK", final_url if final_url != url else None
if resp.status_code in [404, 410]:
if "github.com" in url:
if "/master/" in url:
h = url.replace("/master/", "/main/")
try:
if (await client.get(h)).status_code < 200: return True, "healed", h
if (await client.get(h)).status_code < 400: return True, "healed", h
except: pass
m = re.search(r'(https?://github\.com/[^/]+/[^/]+)', url)
if m and (await client.get(m.group(1))).status_code < 400: return True, "consolidated", m.group(1)
@@ -261,17 +255,12 @@ class IntelligentLinkCleaner:
if url in line:
if fallback and fallback.startswith("CANONICAL:"):
fallback_url = fallback.replace("CANONICAL:", "")
# Mandate 34: Robust replacement to avoid path/ path// recursion
# We replace exactly the URL within Markdown link syntax or bounded by whitespace
line_updated = line.replace(f"({url})", f"({fallback_url})")
if line_updated == line: # Fallback if not in parens
if line_updated == line:
line_updated = re.sub(rf'({re.escape(url)})(?=[)\s]|$)', fallback_url, line)
# Final safety check: if line still has // after our intended clean URL
line_updated = line_updated.replace(f"{fallback_url}/", fallback_url)
file_updates[path][i] = line_updated
else:
# Delete dead link line
file_updates[path][i] = None
final_payload = {p: "".join([l for l in lines if l is not None]) for p, lines in file_updates.items()}