# File: awesome-kubernetes/src/gemini_utils.py
# (333 lines, 16 KiB, Python)

import httpx
import asyncio
import random
import json
import re
import os
import time
from datetime import datetime
from typing import Dict, Any, List, Optional
from src.config import GEMINI_API_KEYS, GEMINI_API_VERSION, GEMINI_API_KEYS_DATA
from src.logger import log_event
# Global state for rate limiting and discovery (shared by every caller in this module)
# Index into GEMINI_API_KEYS where key rotation starts; call_gemini_with_retry
# updates it to the index of the key that last returned HTTP 200.
CURRENT_KEY_INDEX = 0
# Cache filled by discover_optimal_models(); an empty list means discovery has not run yet.
DISCOVERED_MODELS = []
# time.time() value before which a new attempt round sleeps; pushed forward on HTTP 429.
GLOBAL_COOLDOWN_UNTIL = 0
# {model_name: time.time() value until which the model is skipped after a 429}
THROTTLED_MODELS = {}
# Held for the whole retry loop of call_gemini_with_retry.
GLOBAL_AI_SEMAPHORE = asyncio.Semaphore(5) # Max 5 concurrent calls globally
class GeminiSessionTracker:
    """Collect per-session Gemini usage statistics and render them as Markdown.

    The tracker counts successful calls per model, calls and 429/404 errors
    per API key, prompt/completion tokens reported by the API, and cache hits
    that were served without an API call.
    """

    def __init__(self):
        self.model_usage = {}  # {model_name: count of HTTP 200 responses}
        # One entry per configured key; "type" and "label" are read from
        # GEMINI_API_KEYS_DATA at the same index as the key in GEMINI_API_KEYS.
        self.key_stats = {
            i: {
                "calls": 0,
                "429s": 0,
                "404s": 0,
                "type": GEMINI_API_KEYS_DATA[i]["type"],
                "label": GEMINI_API_KEYS_DATA[i]["label"],
            }
            for i in range(len(GEMINI_API_KEYS))
        }
        self.discovery_log = []
        self.start_time = datetime.now()
        self.total_throttles = 0
        self.total_tokens_prompt = 0
        self.total_tokens_completion = 0
        self.cache_hits = 0
        self.est_tokens_saved = 0

    def track_cache_hit(self, est_tokens: int = 1500):
        """Record one cache hit and add ``est_tokens`` to the estimated savings."""
        self.cache_hits += 1
        self.est_tokens_saved += est_tokens

    def track_call(self, key_idx: int, model: str, status: int, usage: Optional[Dict] = None):
        """Record the outcome of one API attempt.

        Args:
            key_idx: Index of the API key used (must exist in ``key_stats``).
            model: Model name the request was sent to.
            status: HTTP status code of the response (callers pass 0 when the
                request raised before a response was available).
            usage: Optional ``usageMetadata`` mapping from the response; only
                ``promptTokenCount`` and ``candidatesTokenCount`` are read.
        """
        if status == 200:
            self.model_usage[model] = self.model_usage.get(model, 0) + 1
            if usage:
                self.total_tokens_prompt += usage.get("promptTokenCount", 0)
                self.total_tokens_completion += usage.get("candidatesTokenCount", 0)
        # Every attempt counts against the key, whatever its outcome.
        key_entry = self.key_stats[key_idx]
        key_entry["calls"] += 1
        if status == 429:
            key_entry["429s"] += 1
            self.total_throttles += 1
        if status == 404:
            key_entry["404s"] += 1

    @staticmethod
    def _usage_bar(calls: int) -> str:
        """Return a text bar: one filled cell per 5 calls, capped at 10 cells.

        The previous revision multiplied an empty string, so the bar was
        always blank; a placeholder cell is shown when no cell is filled.
        """
        filled = min(calls // 5, 10)
        return "█" * filled if filled > 0 else "░"

    def get_intelligence_report(self) -> str:
        """Build the Markdown observability report for the current session."""
        parts = [
            "\n### 🧠 AI Intelligence & Observability Report\n\n",
            "#### 🤖 Model Selection Logic (Dynamic)\n",
            "Execution started with discovery of top models based on May 2026 hierarchy.\n\n",
            "| Model Used | Successful Calls | Hierarchy Logic |\n| :--- | :---: | :--- |\n",
        ]

        # Most used models first.
        usage_items = sorted(self.model_usage.items(), key=lambda x: x[1], reverse=True)
        for model, count in usage_items:
            logic = "Elite/Pro (Complex Reasoning)" if "pro" in model else "Flash/Lite (High Speed)"
            parts.append(f"| `{model}` | **{count}** | {logic} |\n")
        if not self.model_usage:
            parts.append("| No AI calls | 0 | N/A |\n")

        parts.append("\n#### 🔑 API Infrastructure & Quota Management\n")
        parts.append("| Key Index | Type | Provider Label | Usage | Errors (429/404) |\n| :--- | :--- | :--- | :---: | :---: |\n")
        for idx, stats in self.key_stats.items():
            usage_bar = self._usage_bar(stats["calls"])
            parts.append(
                f"| Key {idx+1} | `{stats['type']}` | {stats['label']} | {usage_bar} ({stats['calls']}) | {stats['429s']} / {stats['404s']} |\n"
            )

        parts.append("\n#### 📊 Consumption and Efficiency Metrics (2026 Units)\n")
        parts.append(f"- **Total Prompt Tokens**: {self.total_tokens_prompt:,}\n")
        parts.append(f"- **Total Completion Tokens**: {self.total_tokens_completion:,}\n")

        # Cache-First Metrics: hits relative to hits plus successful API calls.
        total_lookups = self.cache_hits + sum(self.model_usage.values())
        hit_ratio = (self.cache_hits / total_lookups * 100) if total_lookups > 0 else 0
        parts.append(f"- **Database-First Cache Hits**: **{self.cache_hits}** ({hit_ratio:.1f}% hit ratio)\n")
        parts.append(f"- **Estimated Tokens Saved**: ~{self.est_tokens_saved:,} (Zero-API cost)\n")

        if self.total_tokens_prompt > 0:
            efficiency = self.total_tokens_completion / self.total_tokens_prompt * 100
        else:
            efficiency = 0
        parts.append(f"- **Execution Efficiency**: {efficiency:.1f}% (Completion/Prompt)\n")

        status_msg = f"{len(DISCOVERED_MODELS)} models verified."
        if self.total_throttles > 0:
            status_msg += f" **Adaptive Tiering active ({self.total_throttles} throttles managed).**"
        parts.append(f"\n*Status: {status_msg} System auto-adopted newest versions found.*")
        return "".join(parts)
# Module-level tracker instance; call_gemini_with_retry records every API attempt on it.
SESSION_TRACKER = GeminiSessionTracker()
async def discover_optimal_models():
    """Return the Gemini models that support ``generateContent``, best first.

    The model list endpoint is queried once per configured API key and the
    union of the results is ranked by a name-based score. The outcome is
    cached in the module-level ``DISCOVERED_MODELS`` list, so the network is
    only hit on the first call. When no key yields any model, a fixed list of
    default model names is cached and returned instead.

    Returns:
        list[str]: Model names without the ``models/`` prefix, highest score first.
    """
    global DISCOVERED_MODELS
    if DISCOVERED_MODELS:
        return DISCOVERED_MODELS

    log_event("[*] Starting AI Model Auto-Discovery...", section_break=True)

    all_supported = []
    seen = set()
    # One client is enough for all keys; the key travels in the query string.
    async with httpx.AsyncClient() as client:
        for key in GEMINI_API_KEYS:
            url = f"https://generativelanguage.googleapis.com/v1beta/models?key={key}"
            try:
                resp = await client.get(url, timeout=10)
                if resp.status_code == 200:
                    for m in resp.json().get("models", []):
                        name = m.get("name", "").replace("models/", "")
                        if not name or name in seen:
                            continue
                        if "generateContent" in m.get("supportedGenerationMethods", []):
                            seen.add(name)
                            all_supported.append(name)
                elif resp.status_code == 429:
                    log_event(" [!] Discovery Key is rate-limited (429). Skipping.")
            except Exception as exc:
                # Discovery is best-effort: a failing key must not abort the
                # others. `Exception` (not a bare except) lets task
                # cancellation propagate. Only the exception type is logged so
                # the key embedded in the URL can never end up in the log.
                log_event(f" [!] Discovery request failed ({type(exc).__name__}). Trying next key.")

    if not all_supported:
        log_event(" [!] Discovery failed. Falling back to safe defaults.")
        DISCOVERED_MODELS = ["gemini-1.5-flash-latest", "gemini-1.5-flash", "gemini-1.5-pro"]
        return DISCOVERED_MODELS

    def score_model(name: str) -> float:
        """Rank a model name: newer version and higher tier score higher."""
        score = 0.0
        version_match = re.search(r'(\d+\.\d+)', name)
        if version_match:
            # The pattern only matches "<digits>.<digits>", which float() always parses.
            score += float(version_match.group(1)) * 50
        # Only the first matching tier counts (e.g. "-flash-lite" scores as flash).
        if "-ultra" in name:
            score += 100
        elif "-pro" in name:
            score += 50
        elif "-flash" in name:
            score += 25
        elif "-lite" in name:
            score += 10
        if "-latest" in name:
            score += 5
        # "exp" also covers names containing "experimental".
        if "exp" in name:
            score -= 15
        return score

    DISCOVERED_MODELS = sorted(all_supported, key=score_model, reverse=True)
    log_event(f" [+] Discovered {len(DISCOVERED_MODELS)} suitable models.")
    log_event(f" [+] Top Tier AI: {', '.join(DISCOVERED_MODELS[:3])}")
    return DISCOVERED_MODELS
class GeminiDiagnostics:
    """Keep a log of Gemini attempts and format it as a plain-text trace."""

    def __init__(self):
        self.attempts = []

    def add_attempt(self, model: str, status: int, error: str = None, response_text: str = None):
        """Append one attempt; the response body is truncated to 200 characters."""
        preview = None
        if response_text:
            preview = response_text[:200]
        entry = {
            "model": model,
            "status": status,
            "error": error,
            "response_preview": preview,
        }
        self.attempts.append(entry)

    def get_report(self) -> str:
        """Return the header line followed by one numbered line per attempt."""
        lines = ["DIAGNÓSTICO GEMINI:\n"]
        for number, attempt in enumerate(self.attempts, start=1):
            line = f" {number}. [{attempt['model']}] Status: {attempt['status']}"
            if attempt['error']:
                line += f" | Error: {attempt['error']}"
            if attempt['response_preview']:
                line += f" | Resp: {attempt['response_preview']}"
            lines.append(line + "\n")
        return "".join(lines)
async def resolve_url(url: str) -> str:
    """Follow redirects of a (possibly shortened) URL and return the final one.

    The first request is always made. Further hops (at most 5 in total) are
    only attempted while the current host is one of the known link
    shorteners. Any request failure stops resolution and the last URL reached
    is used. The result is passed through ``sanitize_trailing_slashes``.

    Args:
        url: URL to resolve. Empty or non-string values are returned unchanged
            without any network access.

    Returns:
        The resolved URL with trailing slashes/question marks removed.
    """
    if not url or not isinstance(url, str):
        return url

    shorteners = {
        't.co', 'bit.ly', 'buff.ly', 'goo.gl', 'tinyurl.com', 't.ly',
        'rb.gy', 'is.gd', 'drp.li', 't.me', 'lnkd.in',
    }
    final_url = url
    max_hops = 5
    current_hop = 0
    async with httpx.AsyncClient(follow_redirects=True, timeout=8) as client:
        while current_hop < max_hops:
            try:
                current_domain = final_url.split("//")[-1].split("/")[0].lower()
                if current_hop > 0 and current_domain not in shorteners:
                    break
                resp = await client.head(final_url, timeout=5)
                new_url = str(resp.url)
                if new_url == final_url:
                    break
                final_url, current_hop = new_url, current_hop + 1
            except Exception:
                # Best effort: keep the last URL reached. `Exception` instead
                # of a bare except so task cancellation is not swallowed.
                break
    # Mandate 34: Prevent multiple trailing slashes using centralized utility
    return sanitize_trailing_slashes(final_url)
def clean_toc_text(text: str) -> str:
    """
    Make a title or TOC entry safe for use as plain technical text.

    Ampersands become the word "and", characters outside the Basic
    Multilingual Plane (where most emojis live) are dropped, and anything
    that is not a word character, whitespace, hyphen or dot is removed.
    Leading and trailing whitespace is trimmed; falsy input yields "".
    """
    if not text:
        return ""
    cleaned = text.replace("&", "and")
    # Applied in order: astral-plane characters first, then remaining symbols.
    for pattern in (r'[\U00010000-\U0010ffff]', r'[^\w\s\-.]'):
        cleaned = re.sub(pattern, '', cleaned)
    return cleaned.strip()
def sanitize_trailing_slashes(url: str) -> str:
    """
    Mandate 34: Enforces a ZERO trailing slash policy.

    Removes ALL trailing slashes and question marks from the end of the URL,
    in any combination (e.g. ``.../path/?`` becomes ``.../path``).
    Does NOT collapse slashes in the middle of the URL (to avoid breaking
    protocol or deep links). Values that are empty or contain no ``://`` are
    returned unchanged.
    """
    if not url or '://' not in url:
        return url
    # A single rstrip over both characters is required: stripping '/' and then
    # '?' one after the other leaves the slash behind in ".../path/?".
    return url.rstrip('/?')
def normalize_url(url: str) -> str:
    """
    Return a canonical form of ``url`` for duplicate detection.

    Steps, in order:
      * trailing slashes / question marks are removed;
      * the fragment is dropped unless it is a line anchor (``#L123``);
      * tracking query parameters (``utm_*``, ``s``, ``t``, ``ref``,
        ``fbclid`` with a non-empty value) are removed while the remaining
        parameters and their ``?``/``&`` separators stay intact;
      * the scheme is forced to ``https`` and the host is lower-cased; the
        path and query keep their case because they may be case-sensitive.

    Falsy input yields "".
    """
    if not url:
        return ""
    # 0. Mandate 34: Cleanup redundant slashes first
    url = sanitize_trailing_slashes(url)

    # 1. Split off the fragment, keeping it only when it is a line anchor (#L123)
    fragment = ""
    if "#" in url:
        url, fragment = url.split("#", 1)
        if not re.match(r'^L\d+', fragment):
            fragment = ""

    # 2. Drop social tracking parameters (UTM, etc.). Filtering parameter by
    #    parameter keeps the "?" in place when the first parameter is removed,
    #    so "/p?utm_source=x&id=5" becomes "/p?id=5" and not "/p&id=5".
    base, has_query, query = url.partition("?")
    if has_query:
        kept = [
            param
            for param in query.split("&")
            if not re.fullmatch(r'utm_[^&]+|(?:s|t|ref|fbclid)=[^&]+', param)
        ]
        url = f"{base}?{'&'.join(kept)}" if any(kept) else base

    # Mandate 34: Remove all trailing slashes and question marks for internal
    # canonical comparison (one rstrip so mixed runs like "/?" are fully removed).
    url = url.rstrip("/?")

    # 3. Normalize scheme and host (case-insensitive). The host stops at "/"
    #    or "?" so a query on a path-less URL is not lower-cased with it.
    match = re.match(r'^(https?://)([^/?]+)(.*)', url, re.IGNORECASE)
    if match:
        _scheme, domain, rest = match.groups()
        url = f"https://{domain.lower()}{rest}"

    return f"{url}#{fragment}" if fragment else url
def is_fuzzy_duplicate(url_a: str, url_b: str) -> bool:
    """Return True when both URLs have the same form after normalize_url."""
    canonical_a = normalize_url(url_a)
    canonical_b = normalize_url(url_b)
    return canonical_a == canonical_b
async def call_gemini_with_retry(prompt: str, response_format: str = "json", max_retries: int = 3, prefer_flash: bool = False, use_grounding: bool = False):
    """Send ``prompt`` to Gemini, rotating keys and models until one answers.

    For every round (``max_retries + 1`` in total) each API key is tried,
    starting at ``CURRENT_KEY_INDEX``, and for each key every discovered model
    that is not currently throttled. HTTP 429 throttles the model, sets a
    short global cooldown and backs off before trying the next model; 5xx
    moves on to the next model; 404, other errors and exceptions move on to
    the next key.

    Args:
        prompt: Text sent as the single content part.
        response_format: ``"json"`` to extract and parse a JSON object/array
            from the reply; any other value returns the raw text.
        max_retries: Number of additional rounds after the first one.
        prefer_flash: Try models whose name contains "flash" or "lite" first.
        use_grounding: Add the ``google_search_retrieval`` tool to the request.

    Returns:
        The parsed JSON value (its first element when the top-level value is
        a non-empty list) for ``response_format == "json"``, otherwise the
        reply text.

    Raises:
        ValueError: No API keys are configured.
        Exception: Every round was exhausted; the message contains the
            per-attempt diagnostics report.
    """
    global CURRENT_KEY_INDEX, GLOBAL_COOLDOWN_UNTIL
    if not GEMINI_API_KEYS:
        raise ValueError("No GEMINI_API_KEYS configured.")

    # Per-call bookkeeping. These names were used below without ever being
    # bound, which failed with NameError/UnboundLocalError on the first
    # non-successful attempt.
    diagnostics = GeminiDiagnostics()
    consecutive_429s = 0
    # NOTE(review): the intended base delay (seconds) is not visible in this
    # file; 2.0 is an assumption — confirm against the desired backoff policy.
    base_wait_time = 2.0

    models_pool = await discover_optimal_models()
    # 1. Smart Re-ordering (stable sort keeps the discovery ranking within each group)
    if prefer_flash:
        models = sorted(models_pool, key=lambda m: 0 if "flash" in m or "lite" in m else 1)
    else:
        models = models_pool
    total_keys = len(GEMINI_API_KEYS)

    async with GLOBAL_AI_SEMAPHORE:
        for attempt_round in range(max_retries + 1):
            now = time.time()
            if now < GLOBAL_COOLDOWN_UNTIL:
                await asyncio.sleep(GLOBAL_COOLDOWN_UNTIL - now)

            for key_offset in range(total_keys):
                current_idx = (CURRENT_KEY_INDEX + key_offset) % total_keys
                api_key = GEMINI_API_KEYS[current_idx]
                async with httpx.AsyncClient() as client:
                    for model in models:
                        if THROTTLED_MODELS.get(model, 0) > time.time():
                            continue
                        full_model_name = f"models/{model}"
                        api_url = f"https://generativelanguage.googleapis.com/{GEMINI_API_VERSION}/{full_model_name}:generateContent?key={api_key}"
                        try:
                            # --- TOOL ENABLING (MCP-LIKE GROUNDING) ---
                            payload = {
                                "contents": [{"parts": [{"text": prompt}]}],
                                "tools": [{"google_search_retrieval": {}}] if use_grounding else []
                            }
                            response = await client.post(api_url, json=payload, timeout=50)

                            # Error bodies are not always JSON; treat those as empty.
                            try:
                                resp_json = response.json()
                            except ValueError:
                                resp_json = {}
                            if not isinstance(resp_json, dict):
                                resp_json = {}

                            usage = resp_json.get("usageMetadata", {})
                            SESSION_TRACKER.track_call(current_idx, model, response.status_code, usage)

                            if response.status_code == 200:
                                # Remember the working key so the next call starts with it.
                                CURRENT_KEY_INDEX = current_idx
                                if not resp_json.get('candidates'):
                                    diagnostics.add_attempt(model, 200, "No candidates")
                                    break
                                try:
                                    text_resp = resp_json['candidates'][0]['content']['parts'][0]['text']
                                except (KeyError, IndexError, TypeError):
                                    # Candidate without text; handled here so the
                                    # call is not tracked a second time by the
                                    # generic handler below.
                                    diagnostics.add_attempt(model, 200, "Malformed candidate")
                                    break

                                if response_format != "json":
                                    return text_resp

                                match = re.search(r'\{.*\}|\[.*\]', text_resp, re.DOTALL)
                                if match:
                                    try:
                                        data = json.loads(match.group(0))
                                    except ValueError:
                                        pass  # fall through to the upgrade / failure path
                                    else:
                                        return data[0] if isinstance(data, list) and len(data) > 0 else data

                                # QUALITY UPGRADE: If flash failed parsing, don't give up on the key, try a Pro model
                                if ("flash" in model or "lite" in model) and any("pro" in m for m in models):
                                    diagnostics.add_attempt(model, 200, "Flash JSON error - Upgrading to Pro...")
                                    continue
                                diagnostics.add_attempt(model, 200, "JSON not found")
                                break

                            elif response.status_code == 429:
                                consecutive_429s += 1
                                diagnostics.add_attempt(model, 429, "Rate limited")
                                # 2. ADAPTIVE TIERING: Mark this specific model as throttled
                                throttle_duration = 30 if "pro" in model else 15
                                THROTTLED_MODELS[model] = time.time() + throttle_duration
                                # 3. GLOBAL THROTTLING: Slow down entire engine
                                GLOBAL_COOLDOWN_UNTIL = time.time() + 3.0
                                wait = base_wait_time * (1.8 ** (consecutive_429s - 1)) + random.uniform(1.0, 2.0)
                                log_event(f" [!] API 429 on `{model}` (Key {current_idx+1}). Tiering down & backing off {wait:.1f}s...")
                                await asyncio.sleep(wait)
                                # Continue to next model in current key (likely Flash)
                                continue

                            elif response.status_code == 404:
                                diagnostics.add_attempt(model, 404, "Not Found")
                                break

                            elif response.status_code in [500, 503, 504]:
                                diagnostics.add_attempt(model, response.status_code, "Server Error")
                                continue

                            else:
                                diagnostics.add_attempt(model, response.status_code, "API Error", response.text)
                                break

                        except Exception as e:
                            # Transport or unexpected failure: record it and move to the next key.
                            SESSION_TRACKER.track_call(current_idx, model, 0, {})
                            diagnostics.add_attempt(model, 0, str(e))
                            break

            if attempt_round < max_retries:
                wait_round = base_wait_time * (2 ** attempt_round)
                log_event(f" [!] Exhausted tier options in round {attempt_round+1}. Cooling down {wait_round}s...")
                await asyncio.sleep(wait_round)

    raise Exception(f"Critical Gemini failure after adaptive tiering.\n{diagnostics.get_report()}")