feat: bypass Twikit challenge with ID and implement deep historical extraction from Wayback

This commit is contained in:
Nubenetes Bot
2026-05-10 21:49:18 +02:00
parent 3b426cfd02
commit b497002dad
2 changed files with 71 additions and 20 deletions

View File

@@ -144,26 +144,33 @@ class SocialDataExtractor:
async def fetch_links_since(self, since_date: datetime) -> list[dict]:
all_results = []
target_user_id = "1387348141150670850" # ID fijo de @nubenetes para bypass
# 1. Intentar Twikit Pro
self.log_audit("Twikit API", False, "Iniciando intento principal...")
# 1. Intentar Twikit Pro (ahora con ID directo)
self.log_audit("Twikit API", None, "Iniciando secuencia con bypass de ID...")
if await self._authenticate():
try:
user = await self.client.get_user_by_screen_name(self.target_account)
tweets = await self.client.get_user_tweets(user.id, 'Tweets')
# Bypass de get_user_by_screen_name para evitar el reto JS inicial
tweets = await self.client.get_user_tweets(target_user_id, 'Tweets')
if tweets:
for t in tweets:
tweet_date = t.created_at_datetime.astimezone(MADRID_TZ)
if tweet_date < since_date: break
txt = t.full_text if hasattr(t, 'full_text') else t.text
for u in self._extract_urls_from_text(txt):
if "x.com" not in u and "twitter.com" not in u:
all_results.append({"url": u, "context": txt, "timestamp": datetime.now(MADRID_TZ).isoformat()})
all_results.append({
"url": u, "context": txt,
"timestamp": tweet_date.isoformat()
})
if all_results:
self.log_audit("Twikit API", True, f"Extraídos {len(all_results)} enlaces directamente.")
self.log_audit("Twikit API", True, f"Extraídos {len(all_results)} enlaces recientes.")
return all_results
except Exception as e:
self.log_audit("Twikit API", False, f"Fallo en lectura de tweets: {str(e)[:50]}")
self.log_audit("Twikit API", False, f"Fallo (ID Bypass): {str(e)[:50]}")
# 2. Intentar RSS (Nuevo y muy resiliente)
# 2. Intentar RSS (Alta disponibilidad)
rss_links = await self._fetch_via_rss()
if rss_links: return rss_links
@@ -171,18 +178,48 @@ class SocialDataExtractor:
nitter_links = await self._fetch_via_nitter()
if nitter_links: return nitter_links
# 4. Intentar Wayback Machine
wayback_links = await self._fetch_via_wayback()
# 4. Intentar Wayback Machine (Histórico profundo)
wayback_links = await self._fetch_via_wayback_deep(since_date)
if wayback_links: return wayback_links
# 5. Intentar Guest Scrape (Último recurso)
# 5. Intentar Guest Scrape
guest_links = await self._fetch_via_guest_scrape()
if guest_links:
self.log_audit("Guest Scrape", True, f"Extraídos {len(guest_links)} enlaces.")
return guest_links
else:
self.log_audit("Estrategia Global", False, "Todos los métodos de evasión han sido agotados sin éxito.")
if guest_links: return guest_links
self.log_audit("Estrategia Global", False, "No se han podido recuperar enlaces de X.com en este ciclo.")
return []
async def _fetch_via_wayback_deep(self, since_date: datetime) -> list[dict]:
    """Recover outbound links from Wayback Machine captures of the profile.

    Queries the Wayback CDX index for captures of
    ``twitter.com/<target_account>`` taken on or after ``since_date``
    (the query collapses on the first 8 timestamp digits, i.e. the date),
    downloads up to three of them and collects every URL found in the
    archived HTML that does not point at X/Twitter, t.co or archive.org.

    Args:
        since_date: Lower bound of the search window. When it is
            timezone-aware it is converted to UTC for the CDX query and
            captures taken before it are ignored.

    Returns:
        A list of ``{"url", "context", "timestamp"}`` dicts, where
        ``timestamp`` is the capture time of the snapshot the link was
        found in (ISO 8601, Madrid time). Returns an empty list when
        nothing is found or when any error occurs; errors are reported
        through ``log_audit`` and never raised.
    """
    # Local import: only this method needs UTC, and the module-level
    # import block is not visible from here.
    from datetime import timezone

    excluded_hosts = ("x.com", "twitter.com", "t.co", "archive.org")
    max_snapshots = 3  # keep the number of page downloads small to avoid timeouts

    # CDX timestamps are expressed in UTC, so build the lower bound in UTC too.
    aware = since_date.tzinfo is not None
    since_utc = since_date.astimezone(timezone.utc) if aware else since_date
    from_ts = since_utc.strftime("%Y%m%d")
    self.log_audit("Wayback Deep", None, f"Buscando histórico desde {from_ts}...")
    cdx_url = f"https://web.archive.org/cdx/search/cdx?url=twitter.com/{self.target_account}&output=json&from={from_ts}&limit=10&collapse=timestamp:8"

    # Passing a bare number as ``timeout`` is deprecated in aiohttp.
    timeout = aiohttp.ClientTimeout(total=30)
    results = []
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(cdx_url, timeout=timeout) as resp:
                if resp.status != 200:
                    return []
                rows = await resp.json()

            # With output=json the first row is the column header.
            captures = []
            for row in rows[1:]:
                ts = row[1]
                try:
                    captured_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)
                except ValueError:
                    continue  # malformed timestamp: skip the row, keep the rest
                # ``from`` only has day granularity; drop same-day captures
                # that predate the requested horizon so it never moves back.
                if aware and captured_at < since_date:
                    continue
                captures.append((ts, captured_at))

            for ts, captured_at in captures[:max_snapshots]:
                snap_url = f"https://web.archive.org/web/{ts}/https://twitter.com/{self.target_account}"
                async with session.get(snap_url, timeout=timeout) as s_resp:
                    if s_resp.status != 200:
                        # Do not harvest links from Wayback error pages.
                        continue
                    html = await s_resp.text()
                # Stamp links with the capture time (not "now") so the
                # caller's "last processed" horizon reflects the data.
                stamp = captured_at.astimezone(MADRID_TZ).isoformat()
                # dict.fromkeys de-duplicates while keeping a stable order.
                for u in dict.fromkeys(self._extract_urls_from_text(html)):
                    if all(host not in u for host in excluded_hosts):
                        results.append({
                            "url": u, "context": f"Wayback Snapshot {ts}",
                            "timestamp": stamp,
                        })

        if results:
            self.log_audit("Wayback Deep", True, f"Recuperados {len(results)} enlaces históricos.")
            return results
    except Exception as e:
        # Best-effort fallback strategy: report the failure and let the
        # caller move on to the next extraction method.
        self.log_audit("Wayback Deep", False, str(e)[:50])
    return []
async def _fetch_via_guest_scrape(self) -> list[dict]:

View File

@@ -102,11 +102,25 @@ async def master_orchestrator():
stats["categories_updated"].add(category)
except: continue
# 6. Actualizar Estado de Tiempo
# 6. Actualizar Estado de Tiempo y Persistir en Repo
if raw_social:
new_horizon = max([datetime.fromisoformat(t["timestamp"]) for t in raw_social]) + timedelta(seconds=1)
with open(state_file, 'w') as f:
json.dump({"last_processed_tweet_date": new_horizon.isoformat()}, f)
try:
# Obtener el timestamp más reciente de los nuevos tweets
all_timestamps = [datetime.fromisoformat(t["timestamp"]) for t in raw_social]
new_horizon = max(all_timestamps) + timedelta(seconds=1)
state_data = {"last_processed_tweet_date": new_horizon.isoformat()}
new_state_json = json.dumps(state_data, indent=2)
# Guardar localmente
with open(state_file, 'w') as f:
f.write(new_state_json)
# Incluir en la subida a GitHub para "tener memoria"
file_updates[state_file] = new_state_json
print(f"[+] Memoria actualizada: Siguiente run desde {new_horizon.isoformat()}")
except Exception as e:
print(f"[!] Error actualizando memoria: {e}")
# 7. GitOps
if file_updates or x_diagnostics: