198 lines
6.3 KiB
Python
198 lines
6.3 KiB
Python
import os
|
||
|
||
import requests
|
||
import time
|
||
from typing import Optional, Tuple, Dict, Any
|
||
|
||
|
||
class ProxyChecker:
|
||
def __init__(self, timeout: int = 5):
|
||
"""
|
||
初始化代理检查器
|
||
|
||
Args:
|
||
api_url: 获取代理的API地址
|
||
timeout: 请求超时时间(秒)
|
||
"""
|
||
self.timeout = timeout
|
||
|
||
def get_proxies_from_api(self) -> list:
|
||
"""
|
||
从API获取代理列表
|
||
|
||
Returns:
|
||
list: 代理列表,格式为 [{"ip": "x.x.x.x", "port": xxxx}, ...]
|
||
"""
|
||
try:
|
||
pconfig = {
|
||
'proxyUser': 'qwkpslims6im',
|
||
'proxyPass': 'z6wM0LnETJG3d3RN',
|
||
'proxyHost': 'us.911proxy.net',
|
||
'proxyPort': '2600'
|
||
}
|
||
url = "https://api.ip.cc/"
|
||
proxies = {
|
||
"http": "http://{}:{}@{}:{}".format(pconfig['proxyUser'], pconfig['proxyPass'], pconfig['proxyHost'],
|
||
pconfig['proxyPort']),
|
||
"https": "http://{}:{}@{}:{}".format(pconfig['proxyUser'], pconfig['proxyPass'], pconfig['proxyHost'],
|
||
pconfig['proxyPort'])
|
||
}
|
||
response = requests.get(url=url, proxies=proxies, timeout=self.timeout)
|
||
response.raise_for_status()
|
||
|
||
data = response.json()
|
||
# {'http': 'http://qwkpslims6im:z6wM0LnETJG3d3RN@us.911proxy.net:2600',
|
||
# 'https': 'http://qwkpslims6im:z6wM0LnETJG3d3RN@us.911proxy.net:2600'}
|
||
# {"ip": "176.117.106.153", "country_code": "TR", "city": "", "country": "Turkey", "province": "Istanbul",
|
||
# "zip_code": "34122", "timezone": "Europe/Istanbul", "latitude": 41.0082, "longitude": 28.9784,
|
||
# "asn": "AS202561", "asn_name": "High Speed Telekomunikasyon ve Hab. Hiz. Ltd. Sti.",
|
||
# "asn_type": "business"}
|
||
proxy_url = data['http']
|
||
|
||
scheme, rest = proxy_url.split("://")
|
||
auth_hostport = rest.split("@")[1]
|
||
host, port = auth_hostport.split(":")
|
||
|
||
return [{"ip": host, "port": port}]
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"获取代理失败: {e}")
|
||
return []
|
||
except ValueError as e:
|
||
print(f"解析JSON失败: {e}")
|
||
return []
|
||
|
||
def check_proxy(self, proxy: Dict[str, Any]) -> bool:
|
||
"""
|
||
检查单个代理是否有效
|
||
|
||
Args:
|
||
proxy: 代理字典,包含ip和port
|
||
|
||
Returns:
|
||
bool: 代理是否有效
|
||
"""
|
||
proxy_url = f"http://{proxy['ip']}:{proxy['port']}"
|
||
proxies = {
|
||
"http": proxy_url,
|
||
"https": proxy_url
|
||
}
|
||
|
||
test_urls = [
|
||
"https://www.facebook.com"
|
||
]
|
||
|
||
for test_url in test_urls:
|
||
try:
|
||
start_time = time.time()
|
||
response = requests.get(
|
||
test_url,
|
||
proxies=proxies,
|
||
timeout=self.timeout,
|
||
headers={
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
|
||
}
|
||
)
|
||
response_time = time.time() - start_time
|
||
|
||
if response.status_code == 200:
|
||
print(f"代理 {proxy_url} 有效,响应时间: {response_time:.2f}s,测试URL: {test_url}")
|
||
return True
|
||
|
||
except (requests.exceptions.RequestException, requests.exceptions.ProxyError):
|
||
continue
|
||
|
||
return False
|
||
|
||
def get_valid_proxy(self) -> Optional[Tuple[str, int]]:
|
||
"""
|
||
获取并验证代理,返回第一个有效的代理
|
||
|
||
Returns:
|
||
tuple: (ip, port) 或 None(如果没有有效代理)
|
||
"""
|
||
print("正在从API获取代理列表...")
|
||
# 本地开发模式
|
||
if os.getenv("dev"):
|
||
return "127.0.0.1", 1080
|
||
|
||
proxies = self.get_proxies_from_api()
|
||
|
||
if not proxies:
|
||
print("未获取到代理列表")
|
||
return None
|
||
|
||
print(f"获取到 {len(proxies)} 个代理,开始验证...")
|
||
|
||
for proxy in proxies:
|
||
print(f"正在验证代理: {proxy['ip']}:{proxy['port']}")
|
||
if self.check_proxy(proxy):
|
||
print(f"找到有效代理: {proxy['ip']}:{proxy['port']}")
|
||
return (proxy['ip'], proxy['port'])
|
||
|
||
print("所有代理均无效")
|
||
return None
|
||
|
||
def get_valid_proxy_dict(self) -> Optional[Dict[str, str]]:
|
||
"""
|
||
获取有效的代理字典格式
|
||
|
||
Returns:
|
||
dict: {"http": "http://ip:port", "https": "http://ip:port"} 或 None
|
||
"""
|
||
result = self.get_valid_proxy()
|
||
if result:
|
||
ip, port = result
|
||
proxy_url = f"http://{ip}:{port}"
|
||
return {
|
||
"http": proxy_url,
|
||
"https": proxy_url
|
||
}
|
||
return None
|
||
|
||
def get_valid_proxy_url(self) -> Optional[str]:
|
||
"""
|
||
获取有效的代理URL格式 (http://ip:port)
|
||
|
||
Returns:
|
||
str: "http://ip:port" 或 None
|
||
"""
|
||
result = self.get_valid_proxy()
|
||
if result:
|
||
ip, port = result
|
||
return f"http://{ip}:{port}"
|
||
return None
|
||
|
||
|
||
# 使用示例
|
||
if __name__ == "__main__":
|
||
# 你的API地址
|
||
API_URL = "你的API地址" # 替换为实际的API地址
|
||
|
||
checker = ProxyChecker(timeout=8)
|
||
|
||
# 获取有效代理(返回元组格式)
|
||
valid_proxy = checker.get_valid_proxy()
|
||
if valid_proxy:
|
||
ip, port = valid_proxy
|
||
print(f"\n最终选择的代理: {ip}:{port}")
|
||
|
||
# 使用代理示例
|
||
proxies = {
|
||
"http": f"http://{ip}:{port}",
|
||
"https": f"http://{ip}:{port}"
|
||
}
|
||
|
||
try:
|
||
response = requests.get("http://httpbin.org/ip", proxies=proxies, timeout=10)
|
||
print(f"使用代理请求测试: {response.text}")
|
||
except Exception as e:
|
||
print(f"测试请求失败: {e}")
|
||
else:
|
||
print("未找到有效代理")
|
||
|
||
# 或者直接获取代理字典格式
|
||
# proxy_dict = checker.get_valid_proxy_dict()
|
||
# if proxy_dict:
|
||
# print(f"代理字典: {proxy_dict}")
|