| """ | |
| OSINT engine for username and person search. | |
| """ | |
| from typing import Dict, List, Any, Optional | |
| import asyncio | |
| import json | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import whois | |
| from holehe.core import * | |
| from geopy.geocoders import Nominatim | |
| from tenacity import retry, stop_after_attempt, wait_exponential | |
| from duckduckgo_search import DDGS | |
class OSINTEngine:
    def __init__(self):
        self.geolocator = Nominatim(user_agent="ise_search")
        # Load the holehe check modules once; get_functions() extracts the
        # per-site check coroutines from them in search_username().
        self.holehe_modules = import_submodules("holehe.modules")
        # Platforms whose profile pages follow https://<platform>/<username>
        self.known_platforms = [
            "twitter.com", "facebook.com", "instagram.com", "linkedin.com",
            "github.com", "youtube.com", "reddit.com", "pinterest.com",
            "medium.com", "tumblr.com", "flickr.com", "vimeo.com"
        ]

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for a username across platforms."""
        results = {
            "found": [],
            "not_found": [],
            "error": []
        }

        # Manual platform check: probe the profile URL with a HEAD request.
        # A 200 status is only a heuristic -- some platforms answer 200 for
        # missing profiles or block unauthenticated HEAD requests. requests
        # is also blocking, so this loop does not yield to the event loop.
        for platform in self.known_platforms:
            try:
                url = f"https://{platform}/{username}"
                response = requests.head(url, timeout=5, allow_redirects=True)
                if response.status_code == 200:
                    results["found"].append({
                        "platform": platform.split(".")[0].title(),
                        "url": url
                    })
                else:
                    results["not_found"].append(platform.split(".")[0].title())
            except Exception as e:
                results["error"].append({
                    "platform": platform,
                    "error": str(e)
                })

        # Run the holehe checks. holehe's per-site coroutines expect an email
        # address and the signature check(email, client, out), where client is
        # an httpx.AsyncClient and out is a list the coroutine appends result
        # dicts to -- so these checks only apply when `username` is an email.
        try:
            check_functions = get_functions(self.holehe_modules)
            async with httpx.AsyncClient() as client:
                for check_func in check_functions:
                    try:
                        out: List[Dict[str, Any]] = []
                        await check_func(username, client, out)
                        for entry in out:
                            if entry.get("exists"):
                                results["found"].append({
                                    "platform": entry.get("name", ""),
                                    "email_recovery": entry.get("emailrecovery", "")
                                })
                    except Exception as e:
                        print(f"Error in holehe module {check_func.__name__}: {e}")
        except Exception as e:
            print(f"Error running holehe: {e}")

        return results

    async def search_person(self, name: str, location: Optional[str] = None, age: Optional[int] = None) -> Dict[str, Any]:
        """Search for information about a person."""
        results = {
            "basic_info": {},
            "social_profiles": [],
            "locations": [],
            "possible_relatives": [],
            "error": None
        }

        try:
            # Geocode the location if provided (Nominatim is a blocking call)
            if location:
                try:
                    loc = self.geolocator.geocode(location)
                    if loc:
                        results["locations"].append({
                            "address": loc.address,
                            "latitude": loc.latitude,
                            "longitude": loc.longitude
                        })
                except Exception as e:
                    print(f"Error geocoding location: {e}")

            # Build the search query
            search_query = name
            if location:
                search_query += f" {location}"
            if age:
                search_query += f" {age} years old"

            # Use DuckDuckGo for the initial search; DDGS.text() yields dicts
            # with "title", "href", and "body" keys.
            with DDGS() as ddgs:
                search_results = list(ddgs.text(search_query, max_results=10))

            for result in search_results:
                try:
                    url = result["href"]
                    # Keep results that point at a known social platform
                    if any(platform in url.lower() for platform in self.known_platforms):
                        platform = next(p for p in self.known_platforms if p in url.lower())
                        results["social_profiles"].append({
                            "platform": platform.split(".")[0].title(),
                            "url": url,
                            "title": result.get("title", "")
                        })
                except Exception as e:
                    print(f"Error processing search result: {e}")
        except Exception as e:
            results["error"] = str(e)

        return results

    async def domain_lookup(self, domain: str) -> Dict[str, Any]:
        """Perform WHOIS lookup for a domain."""
        try:
            w = whois.whois(domain)
            return {
                "domain_name": w.domain_name,
                "registrar": w.registrar,
                "creation_date": w.creation_date,
                "expiration_date": w.expiration_date,
                "name_servers": w.name_servers,
                "status": w.status,
                "emails": w.emails,
                "dnssec": w.dnssec,
                "name": w.name,
                "org": w.org,
                "address": w.address,
                "city": w.city,
                "state": w.state,
                "zipcode": w.zipcode,
                "country": w.country
            }
        except Exception as e:
            return {"error": str(e)}

    async def analyze_social_profile(self, url: str) -> Dict[str, Any]:
        """Analyze a social media profile."""
        results = {
            "profile_info": {},
            "recent_activity": [],
            "connections": [],
            "error": None
        }

        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            # Extract basic profile info
            results["profile_info"]["title"] = soup.title.string if soup.title else None

            # Extract Open Graph metadata ("prop" avoids shadowing the
            # built-in property)
            for meta in soup.find_all("meta"):
                prop = meta.get("property", "")
                content = meta.get("content", "")
                if "og:title" in prop:
                    results["profile_info"]["og_title"] = content
                elif "og:description" in prop:
                    results["profile_info"]["og_description"] = content
                elif "og:image" in prop:
                    results["profile_info"]["og_image"] = content
        except Exception as e:
            results["error"] = str(e)

        return results
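

# A minimal usage sketch, assuming the module is run directly. The username,
# domain, and name below are placeholders; every call needs network access
# and the third-party dependencies imported above.
async def _demo() -> None:
    engine = OSINTEngine()

    # Sweep the known platforms (plus holehe's email-based checks)
    username_results = await engine.search_username("example_user")
    print("Found profiles:", username_results["found"])

    # WHOIS lookup for a domain
    domain_info = await engine.domain_lookup("example.com")
    print("Registrar:", domain_info.get("registrar"))

    # Person search scoped to a location
    person = await engine.search_person("Jane Doe", location="Berlin")
    print("Social profiles:", person["social_profiles"])


if __name__ == "__main__":
    asyncio.run(_demo())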