import asyncio
import sys
import random
import time
import re
import logging
import socket
import base64
import hashlib
import aiohttp
import pymongo
import geoip2.database
from typing import List, Dict, Any
# Required for asyncio to work properly on Windows: force the selector-based
# event loop instead of the platform default.
# NOTE: asyncio's selector event loop on Windows does not support
# subprocesses, so subprocess-based checks (such as the ping test) cannot
# spawn a process while this policy is active.
if sys.platform == 'win32':
    import asyncio.windows_events
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
class ProxyIntelligentSystem:
    """Collect proxy addresses from public lists and probe each one.

    The instance holds the list of source URLs, a pool of User-Agent strings,
    an optional MongoDB client, a logger, and an optional GeoIP country
    database reader. All probe coroutines report failures as result
    dictionaries instead of raising.
    """

    # "ip:port" candidates; octet and port ranges are not validated.
    _PROXY_PATTERN = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{2,5}')
    # Runs of base64 alphabet characters with optional trailing padding.
    _BASE64_PATTERN = re.compile(r'([A-Za-z0-9+/]+={0,2})')
    # Round-trip time in ping output; accepts "time=12.3", "time=105", "time<1".
    _PING_TIME_PATTERN = re.compile(r'time[=<](\d+(?:\.\d+)?)')
    # The shortest "ip:port" text is 10 characters, which encodes to 16
    # base64 characters; anything shorter cannot hide a proxy entry.
    _MIN_BASE64_LENGTH = 16

    def __init__(self, mongodb_uri: str = None):
        """Set up sources, User-Agents, storage, logging and the GeoIP reader.

        Args:
            mongodb_uri: MongoDB connection URI. When omitted or empty, no
                client is created and ``self.mongo_client`` is ``None``.
        """
        # Proxy sources; the last two embed a timestamp / random number that
        # is fixed once, at construction time.
        self.proxy_sources = [
            "https://www.proxy-list.download/api/v1/get?type=http",
            "https://raw.githubusercontent.com/clarketm/proxy-list/master/proxy-list-raw.txt",
            "https://raw.githubusercontent.com/jetkai/proxy-list/main/online-proxies/txt/proxies-socks4.txt",
            "https://raw.githubusercontent.com/hookzof/socks5_list/master/proxy.txt",
            f"https://api.github.com/gists/proxy_sources?timestamp={int(time.time())}",
            f"https://pastebin.com/raw/proxy_list?unique={random.randint(1000, 9999)}"
        ]
        # User-Agent pool; one is picked at random per source request.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_15_7)",
            "Mozilla/5.0 (X11; Linux x86_64)",
            "Mozilla/5.0 (Windows NT 6.1; WOW64)",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
        ]
        # MongoDB connection (optional).
        self.mongo_client = pymongo.MongoClient(mongodb_uri) if mongodb_uri else None
        # Logging configuration: file plus console, DEBUG level.
        logging.basicConfig(
            level=logging.DEBUG,
            format='%(asctime)s - [%(levelname)s] %(message)s',
            handlers=[logging.FileHandler("proxy_intelligence.log"), logging.StreamHandler()]
        )
        self.logger = logging.getLogger(__name__)
        # GeoIP database; None when it could not be opened.
        self.geo_reader = self._load_geo_database()

    def _load_geo_database(self):
        """Open the GeoLite2 country database from the working directory.

        Returns:
            A ``geoip2.database.Reader``, or ``None`` if opening failed (the
            failure is logged as a warning).
        """
        try:
            return geoip2.database.Reader('./GeoLite2-Country.mmdb')
        except Exception as e:
            # Best effort: geolocation is optional, so any failure is tolerated.
            self.logger.warning(f"Coğrafi konum veritabanı hatası: {e}")
            return None

    async def fetch_proxy_source(self, session: "aiohttp.ClientSession", url: str) -> set:
        """Download one source and extract every "ip:port" entry from it.

        Entries are taken both from the plain response text and from any
        base64-encoded fragments inside it.

        Args:
            session: Open aiohttp session used for the request.
            url: Source URL to download.

        Returns:
            Set of "ip:port" strings; empty on a non-200 response or on any
            error (errors are logged, never raised).
        """
        try:
            headers = {
                "User-Agent": random.choice(self.user_agents),
                "Accept": "text/plain, application/json",
                "X-Requested-With": "XMLHttpRequest"
            }
            # Attach the GitHub token for GitHub / gist URLs.
            # NOTE(review): this is a substring match, so any URL containing
            # "gist" receives the token as well - confirm that is intended.
            if 'github.com' in url or 'gist' in url:
                headers['Authorization'] = f'token {self._generate_github_token()}'
            request_timeout = aiohttp.ClientTimeout(total=10)
            async with session.get(url, headers=headers, timeout=request_timeout) as response:
                if response.status != 200:
                    return set()
                content = await response.text()
                proxies = set(self._PROXY_PATTERN.findall(content))
                # Also pick up proxies hidden in base64-encoded content.
                proxies.update(self._decode_proxy_content(content))
                self.logger.debug(f"Çekilen proxyler: {proxies}")
                return proxies
        except Exception as e:
            self.logger.error(f"Kaynak çekme hatası {url}: {e}")
            return set()

    def _generate_github_token(self) -> str:
        """Return the GitHub API token sent in the Authorization header.

        The value is a hard-coded placeholder in this file.
        """
        return "YOUR_GITHUB_API_TOKEN"

    def _decode_proxy_content(self, content: str) -> set:
        """Extract "ip:port" entries from base64-encoded fragments of *content*.

        Args:
            content: Raw text of a proxy source.

        Returns:
            Set of "ip:port" strings found in successfully decoded fragments.
        """
        decoded_proxies = set()
        for candidate in self._BASE64_PATTERN.findall(content):
            # Fragments that are too short to contain a proxy, or whose length
            # is not a multiple of four, can never decode to a usable entry;
            # skipping them avoids one raised exception per ordinary word.
            if len(candidate) < self._MIN_BASE64_LENGTH or len(candidate) % 4:
                continue
            try:
                decoded = base64.b64decode(candidate).decode('utf-8')
            except ValueError as e:
                # binascii.Error and UnicodeDecodeError are both ValueErrors.
                self.logger.debug(f"Base64 çözümleme hatası: {e}")
                continue
            decoded_proxies.update(self._PROXY_PATTERN.findall(decoded))
        return decoded_proxies

    async def comprehensive_proxy_test(self, proxy: str) -> Dict[str, Any]:
        """Run the DNS, ping, connection and anonymity probes concurrently.

        Args:
            proxy: Proxy address in "ip:port" form.

        Returns:
            On success, a dict with the keys ``proxy``, ``hash`` (MD5 hex
            digest of the proxy string), ``location`` and ``tests``. If the
            address is malformed or anything raises, a dict with ``proxy``,
            ``status`` set to "inactive" and ``error``.
        """
        try:
            ip, port = proxy.split(':')
            int(port)  # validation only: a non-numeric port marks the proxy inactive
            # Run the probes in parallel.
            dns_result, ping_result, connection_result, anonymity_result = await asyncio.gather(
                self._dns_resolve_test(ip),
                self._ping_test(ip),
                self._connection_test(proxy),
                self._anonymity_check(proxy),
            )
            # Geographic location, when a database is available.
            location = self._get_ip_location(ip) if self.geo_reader else "Bilinmiyor"
            # Stable identifier for the proxy (MD5 digest, not a security measure).
            proxy_hash = hashlib.md5(proxy.encode()).hexdigest()
            return {
                "proxy": proxy,
                "hash": proxy_hash,
                "location": location,
                "tests": {
                    "dns_resolve": dns_result,
                    "ping": ping_result,
                    "connection": connection_result,
                    "anonymity": anonymity_result
                }
            }
        except Exception as e:
            return {"proxy": proxy, "status": "inactive", "error": str(e)}

    async def _dns_resolve_test(self, ip: str) -> Dict[str, Any]:
        """Resolve *ip* and measure how long the lookup takes.

        The blocking ``socket.gethostbyname`` call runs in the default
        executor so it does not stall the event loop.

        Returns:
            ``{"status": "success", "time": seconds, "resolved_ip": str}`` or
            ``{"status": "failed", "error": str}``.
        """
        try:
            loop = asyncio.get_running_loop()
            start = time.perf_counter()
            resolved_ip = await loop.run_in_executor(None, socket.gethostbyname, ip)
            return {"status": "success", "time": time.perf_counter() - start, "resolved_ip": resolved_ip}
        except Exception as e:
            return {"status": "failed", "error": str(e)}

    @staticmethod
    def _parse_ping_times(output: str) -> list:
        """Return the round-trip times found in ping *output* as floats."""
        return [float(value) for value in ProxyIntelligentSystem._PING_TIME_PATTERN.findall(output)]

    async def _ping_test(self, ip: str) -> Dict[str, Any]:
        """Send four echo requests to *ip* and average the round-trip time.

        The ``ping`` executable is started directly with an argument list (no
        shell), so the host string cannot inject shell syntax.

        Returns:
            ``{"status": "success" | "failed", "avg_time": float | None}``, or
            ``{"status": "failed", "error": str}`` if ping could not be run.
        """
        # A host starting with "-" would be parsed by ping as an option.
        if not ip or ip.startswith('-'):
            return {"status": "failed", "error": "invalid host"}
        # The echo count flag is "-n" on Windows and "-c" elsewhere.
        count_flag = "-n" if sys.platform == 'win32' else "-c"
        try:
            proc = await asyncio.create_subprocess_exec(
                "ping", count_flag, "4", ip,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            # Console output may not be UTF-8 (e.g. Windows code pages).
            ping_times = self._parse_ping_times(stdout.decode(errors="replace"))
            return {
                "status": "success" if ping_times else "failed",
                "avg_time": sum(ping_times) / len(ping_times) if ping_times else None
            }
        except Exception as e:
            return {"status": "failed", "error": str(e)}

    async def _connection_test(self, proxy: str) -> Dict[str, Any]:
        """Fetch https://httpbin.org/ip through *proxy* and time the request.

        Returns:
            ``{"status": "success" | "failed", "connection_time": seconds}``
            (success means HTTP 200), or ``{"status": "failed", "error": str}``.
        """
        try:
            request_timeout = aiohttp.ClientTimeout(total=5.0)
            async with aiohttp.ClientSession() as session:
                start = time.perf_counter()
                async with session.get('https://httpbin.org/ip', proxy=f'http://{proxy}',
                                       timeout=request_timeout) as response:
                    return {
                        "status": "success" if response.status == 200 else "failed",
                        "connection_time": time.perf_counter() - start
                    }
        except Exception as e:
            return {"status": "failed", "error": str(e)}

    async def _anonymity_check(self, proxy: str) -> Dict[str, Any]:
        """Report the origin IP that httpbin.org sees when going through *proxy*.

        Returns:
            ``{"status": "success", "proxy_ip": str}`` where ``proxy_ip`` is
            the ``origin`` field of the JSON reply ("Bilinmiyor" if absent),
            or ``{"status": "failed", "error": str}``.
        """
        try:
            request_timeout = aiohttp.ClientTimeout(total=5.0)
            async with aiohttp.ClientSession() as session:
                async with session.get('https://httpbin.org/ip', proxy=f'http://{proxy}',
                                       timeout=request_timeout) as response:
                    ip_info = await response.json()
                    return {"status": "success", "proxy_ip": ip_info.get('origin', 'Bilinmiyor')}
        except Exception as e:
            return {"status": "failed", "error": str(e)}

    def _get_ip_location(self, ip: str) -> str:
        """Look up the country name for *ip* in the GeoIP database.

        Returns:
            The country name, or "Bilinmiyor" when the lookup fails or the
            record carries no country name (failures are logged as errors).
        """
        try:
            response = self.geo_reader.country(ip)
            country_name = response.country.name if response else None
            # The name can be missing from a record; never return None.
            return country_name or "Bilinmiyor"
        except Exception as e:
            self.logger.error(f"Coğrafi konum hatası: {e}")
            return "Bilinmiyor"