×

api开发 电商平台 数据挖掘

构建企业级 1688 数据管道:商品详情 API 的分布式采集与容错设计

admin admin 发表于2025-12-19 11:07:11 浏览15 评论0

抢沙发发表评论

在电商数据分析、供应链管理、竞品监控等场景中,1688 平台的商品详情数据是核心资产。但面对海量商品、高频采集需求以及 API 调用限制,传统的单进程采集方案极易出现性能瓶颈、数据丢失、服务不可用等问题。本文将从企业级视角,详解基于分布式架构的 1688 商品详情 API 采集管道设计,重点解决高并发、容错、可扩展等核心问题,并提供可落地的代码实现。

一、需求与挑战分析

1.1 核心需求

  • 支持百万级商品 ID 的分布式采集,单批次采集效率≥1000QPS;

  • 应对 1688 API 的限流机制(单 IP / 账号调用频率限制);

  • 保障采集过程的容错性:网络波动、API 返回异常、节点宕机不导致数据丢失;

  • 采集结果可追溯、可重试,最终数据一致性可达 99.99%;

  • 架构具备可扩展性,支持后续接入更多数据源(如商品销量、评价)。

1.2 关键挑战

  • API 限流:1688 开放平台对商品详情 API(alibaba.item.get)有严格的 QPS 限制,单应用账号通常为 10QPS;

  • 分布式一致性:多节点采集时需避免重复采集、漏采;

  • 容错恢复:网络超时、API 返回错误码(如403500)需自动重试,节点故障需无感切换;

  • 数据落地:采集结果需高效写入存储,同时支持实时查询与批量分析。

二、整体架构设计

本文设计的分布式采集管道采用 “任务分发 - 节点采集 - 结果汇总 - 容错补偿” 四层架构,核心组件包括:

组件 功能 技术选型
任务调度中心 拆分商品 ID 任务、分发至采集节点、监控任务状态 Redis(ZSet/Hash)+ 定时任务
分布式采集节点 消费任务、调用 API、处理异常、上报结果 Python(FastAPI + Celery)
限流控制层 按账号 / IP 维度限流,避免触发平台封禁 令牌桶算法 + 账号池管理
容错补偿层 失败任务重试、断点续传、数据校验 Redis(失败队列)+ 定时补偿任务
数据存储层 原始数据 + 结构化数据存储 MongoDB(原始 JSON)+ MySQL(结构化字段)

架构流程图如下:

plaintext

商品ID池 → 任务调度中心(拆分分片)→ 采集节点(多账号/IP)→ 限流控制 → 1688 API
                                                          ↓
失败任务 → 容错补偿层(重试/人工介入)→ 数据存储层
成功任务 → 结果汇总 → 数据存储层

三、核心技术实现

3.1 环境准备

  • 依赖库:requests(API 调用)、celery(分布式任务)、redis(任务存储)、pymongo(数据存储)、ratelimit(限流);

  • 前置条件:已在 1688 开放平台申请应用,获取appkeyappsecret,并开通alibaba.item.get API 权限;

  • 多账号 / IP 准备:搭建代理池或准备多个 1688 开发者账号,应对限流。

3.2 任务调度中心:分布式任务拆分与分发

采用 Redis 的 ZSet 实现任务分片,按商品 ID 哈希值分配至不同节点,避免重复消费;Hash 结构记录任务状态(待执行 / 执行中 / 成功 / 失败)。

import redis
import hashlib
from typing import List

# 初始化Redis连接
redis_client = redis.Redis(host="127.0.0.1", port=6379, db=0, password="your_redis_pwd")

class TaskScheduler:
    """任务调度中心:拆分商品ID任务,分发至采集节点"""
    def __init__(self, task_topic: str = "1688_item_task", node_count: int = 5):
        self.task_topic = task_topic
        self.node_count = node_count  # 采集节点数量

    def split_task(self, item_ids: List[str]):
        """拆分商品ID任务,按哈希分配至不同节点"""
        for item_id in item_ids:
            # 按商品ID哈希值分配节点,保证同一商品ID始终分配至同一节点
            node_id = int(hashlib.md5(item_id.encode()).hexdigest(), 16) % self.node_count
            # 任务存储:ZSet(key=节点ID,value=商品ID,score=时间戳)
            redis_client.zadd(
                f"{self.task_topic}_node_{node_id}",
                {item_id: float(time.time())}
            )
            # 记录任务初始状态:Hash(key=task_status,field=商品ID,value=waiting)
            redis_client.hset(f"{self.task_topic}_status", item_id, "waiting")
        print(f"成功拆分{len(item_ids)}个商品ID任务至{self.node_count}个采集节点")

    def get_node_task(self, node_id: int, limit: int = 1000) -> List[str]:
        """获取指定节点的待执行任务"""
        task_key = f"{self.task_topic}_node_{node_id}"
        # 按时间戳升序获取任务,避免重复消费(获取后删除)
        item_ids = redis_client.zrange(task_key, 0, limit-1)
        if item_ids:
            redis_client.zremrangebyrank(task_key, 0, limit-1)
            # 更新任务状态为running
            for item_id in item_ids:
                redis_client.hset(f"{self.task_topic}_status", item_id, "running")
        return [item_id.decode() for item_id in item_ids]

3.3 限流控制层:多账号 / IP 的令牌桶限流

针对 1688 API 的限流机制,实现多账号 / IP 的令牌桶限流,每个账号独立控制 QPS,避免触发平台封禁。

import time
import threading
from collections import defaultdict

class TokenBucket:
    """令牌桶限流:单账号/IP的QPS控制"""
    def __init__(self, capacity: int, rate: float):
        self.capacity = capacity  # 令牌桶容量(最大并发数)
        self.rate = rate  # 令牌生成速率(个/秒)
        self.tokens = capacity  # 当前令牌数
        self.last_refill_time = time.time()
        self.lock = threading.Lock()

    def refill(self):
        """补充令牌"""
        now = time.time()
        time_passed = now - self.last_refill_time
        new_tokens = time_passed * self.rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill_time = now

    def acquire(self, block: bool = True) -> bool:
        """获取令牌:block=True则阻塞等待,False则直接返回"""
        with self.lock:
            self.refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            if not block:
                return False
            # 阻塞等待令牌生成
            time.sleep(1/self.rate)
            self.refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

class APIRateLimiter:
    """多账号/IP的API限流管理器"""
    def __init__(self, account_pool: dict):
        """
        account_pool: 账号池,格式{"account1": {"appkey": "xxx", "appsecret": "xxx", "qps": 10}, ...}
        """
        self.account_pool = account_pool
        self.token_buckets = defaultdict(
            lambda account: TokenBucket(capacity=account["qps"], rate=account["qps"])
        )
        # 初始化每个账号的令牌桶
        for account, config in account_pool.items():
            self.token_buckets[account] = TokenBucket(capacity=config["qps"], rate=config["qps"])
        self.account_index = 0

    def get_available_account(self) -> dict:
        """获取可用的账号(令牌桶有令牌)"""
        account_list = list(self.account_pool.values())
        while True:
            account = account_list[self.account_index % len(account_list)]
            self.account_index += 1
            if self.token_buckets[account["appkey"]].acquire(block=False):
                return account
            # 所有账号都无令牌,短暂休眠后重试
            time.sleep(0.1)

3.4 分布式采集节点:API 调用与异常处理

采集节点基于 Celery 实现分布式部署,核心逻辑包括:获取节点任务、调用 API、处理异常、上报结果、失败任务入队。

import requests
import hmac
import base64
from celery import Celery
from pymongo import MongoClient

# 初始化Celery(分布式任务队列)
celery_app = Celery(
    "1688_crawler",
    broker="redis://:your_redis_pwd@127.0.0.1:6379/1",
    backend="redis://:your_redis_pwd@127.0.0.1:6379/2"
)

# 初始化MongoDB(存储原始数据)
mongo_client = MongoClient("mongodb://localhost:27017/")
mongo_db = mongo_client["1688_data"]
item_collection = mongo_db["item_detail"]

# 初始化MySQL(存储结构化数据,需提前创建表:CREATE TABLE item_detail (item_id VARCHAR(64) PRIMARY KEY, title VARCHAR(256), price DECIMAL(10,2), ...);)
import pymysql
mysql_conn = pymysql.connect(
    host="localhost",
    user="root",
    password="your_mysql_pwd",
    database="1688_data"
)

# 初始化限流管理器
ACCOUNT_POOL = {
    "account1": {"appkey": "your_appkey1", "appsecret": "your_appsecret1", "qps": 10},
    "account2": {"appkey": "your_appkey2", "appsecret": "your_appsecret2", "qps": 10},
    # 可扩展更多账号
}
rate_limiter = APIRateLimiter(ACCOUNT_POOL)

class ItemCrawler:
    """1688商品详情采集节点"""
    def __init__(self, node_id: int):
        self.node_id = node_id
        self.task_scheduler = TaskScheduler()
        self.fail_task_topic = "1688_item_fail_task"  # 失败任务队列

    def sign_request(self, params: dict, appsecret: str) -> str:
        """1688 API请求签名(按开放平台规则)"""
        # 排序参数
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        # 拼接签名字符串
        sign_str = "".join([f"{k}{v}" for k, v in sorted_params]) + appsecret
        # HMAC-SHA1加密并Base64编码
        sign = base64.b64encode(hmac.new(appsecret.encode(), sign_str.encode(), "sha1").digest()).decode()
        return sign

    def call_item_api(self, item_id: str) -> dict:
        """调用1688商品详情API"""
        # 获取可用账号
        account = rate_limiter.get_available_account()
        appkey = account["appkey"]
        appsecret = account["appsecret"]

        # 构造API请求参数
        params = {
            "method": "alibaba.item.get",
            "app_key": appkey,
            "format": "json",
            "v": "2.0",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            "item_id": item_id,
            "fields": "item_id,title,price,sku_info,category_id,seller_nick"
        }
        # 生成签名
        params["sign"] = self.sign_request(params, appsecret)

        # 调用API(支持代理IP)
        try:
            response = requests.get(
                "https://gw.api.1688.com/openapi/param2/1/alibaba.item.get/2.0",
                params=params,
                timeout=10,
                # proxies={"http": "http://proxy_ip:port", "https": "https://proxy_ip:port"}
            )
            response.raise_for_status()
            result = response.json()
            if "error_response" in result:
                raise Exception(f"API错误:{result['error_response']['msg']}")
            return result
        except Exception as e:
            # 失败任务入队
            redis_client.lpush(self.fail_task_topic, item_id)
            # 更新任务状态为failed
            redis_client.hset(f"{self.task_scheduler.task_topic}_status", item_id, "failed")
            raise Exception(f"采集商品{item_id}失败:{str(e)}")

    def save_data(self, item_data: dict):
        """保存采集结果:MongoDB(原始数据)+ MySQL(结构化数据)"""
        # 1. 写入MongoDB
        item_collection.update_one(
            {"item_id": item_data["item"]["item_id"]},
            {"$set": item_data},
            upsert=True
        )
        # 2. 写入MySQL
        item = item_data["item"]
        with mysql_conn.cursor() as cursor:
            sql = """
            INSERT INTO item_detail (item_id, title, price, category_id, seller_nick, create_time)
            VALUES (%s, %s, %s, %s, %s, NOW())
            ON DUPLICATE KEY UPDATE
            title=%s, price=%s, category_id=%s, seller_nick=%s, update_time=NOW()
            """
            cursor.execute(
                sql,
                (
                    item["item_id"], item["title"], item["price"], item["category_id"], item["seller_nick"],
                    item["title"], item["price"], item["category_id"], item["seller_nick"]
                )
            )
        mysql_conn.commit()

    @celery_app.task(bind=True, max_retries=3)
    def crawl_task(self, item_id: str):
        """Celery任务:采集单个商品详情"""
        try:
            # 调用API获取数据
            item_data = self.call_item_api(item_id)
            # 保存数据
            self.save_data(item_data)
            # 更新任务状态为success
            redis_client.hset(f"{self.task_scheduler.task_topic}_status", item_id, "success")
            return f"采集商品{item_id}成功"
        except Exception as e:
            # 重试机制:最多重试3次,每次间隔5秒
            self.retry(exc=e, countdown=5)

    def run(self):
        """启动采集节点:循环获取任务并执行"""
        while True:
            # 获取当前节点的待执行任务
            item_ids = self.task_scheduler.get_node_task(self.node_id, limit=100)
            if not item_ids:
                # 无任务时休眠5秒
                time.sleep(5)
                continue
            # 分布式执行采集任务
            for item_id in item_ids:
                self.crawl_task.delay(item_id)
            print(f"节点{self.node_id}已提交{len(item_ids)}个采集任务至Celery队列")

3.5 容错补偿层:失败任务重试与断点续传

针对失败任务(网络超时、API 错误、数据解析失败),实现定时重试机制;通过任务状态记录,支持断点续传。

class FaultTolerance:
    """容错补偿层:失败任务重试、断点续传"""
    def __init__(self, fail_task_topic: str = "1688_item_fail_task", task_topic: str = "1688_item_task"):
        self.fail_task_topic = fail_task_topic
        self.task_topic = task_topic
        self.task_scheduler = TaskScheduler(task_topic=task_topic)

    def retry_fail_task(self, max_retry_times: int = 3):
        """重试失败任务:最多重试3次"""
        while True:
            # 从失败队列获取任务
            item_id = redis_client.rpop(self.fail_task_topic)
            if not item_id:
                time.sleep(10)
                continue
            item_id = item_id.decode()
            # 获取当前重试次数
            retry_count = redis_client.hget(f"{self.task_topic}_retry", item_id)
            retry_count = int(retry_count) if retry_count else 0
            if retry_count >= max_retry_times:
                # 超过最大重试次数,标记为abandoned
                redis_client.hset(f"{self.task_topic}_status", item_id, "abandoned")
                print(f"商品{item_id}重试{max_retry_times}次失败,标记为废弃")
                continue
            # 递增重试次数
            redis_client.hset(f"{self.task_topic}_retry", item_id, retry_count + 1)
            # 重新分发任务至原节点
            node_id = int(hashlib.md5(item_id.encode()).hexdigest(), 16) % self.task_scheduler.node_count
            redis_client.zadd(
                f"{self.task_topic}_node_{node_id}",
                {item_id: float(time.time())}
            )
            # 更新任务状态为waiting
            redis_client.hset(f"{self.task_topic}_status", item_id, "waiting")
            print(f"商品{item_id}第{retry_count+1}次重试,已重新分发至节点{node_id}")

    def resume_task(self):
        """断点续传:恢复未完成的任务(running/waiting状态)"""
        # 获取所有未完成的任务
        task_status = redis_client.hgetall(f"{self.task_topic}_status")
        for item_id, status in task_status.items():
            item_id = item_id.decode()
            status = status.decode()
            if status in ["running", "waiting"]:
                # 重新分发任务至原节点
                node_id = int(hashlib.md5(item_id.encode()).hexdigest(), 16) % self.task_scheduler.node_count
                redis_client.zadd(
                    f"{self.task_topic}_node_{node_id}",
                    {item_id: float(time.time())}
                )
                print(f"恢复任务{item_id}(状态:{status})至节点{node_id}")
        print("断点续传完成")

四、部署与运行

4.1 启动步骤

  1. 启动 Redis、MongoDB、MySQL 服务;

  2. 启动 Celery Worker(分布式采集节点):

celery -A crawler_app worker --loglevel=info --concurrency=10 -n node_0@%h
celery -A crawler_app worker --loglevel=info --concurrency=10 -n node_1@%h
# 启动多个节点,对应node_id=0,1,2...

3.任务分发与采集:

if __name__ == "__main__":
    # 1. 初始化调度中心,拆分任务
    scheduler = TaskScheduler(node_count=5)
    # 模拟商品ID池(实际场景可从文件/数据库读取)
    item_ids = [f"123456789{i}" for i in range(100000)]
    scheduler.split_task(item_ids)

    # 2. 启动采集节点(每个节点独立运行)
    node_0 = ItemCrawler(node_id=0)
    # 启动节点0的采集循环(后台运行)
    import threading
    threading.Thread(target=node_0.run, daemon=True).start()

    # 3. 启动容错补偿层
    fault_tolerance = FaultTolerance()
    # 启动失败任务重试(后台运行)
    threading.Thread(target=fault_tolerance.retry_fail_task, daemon=True).start()
    # 断点续传(启动时执行一次)
    fault_tolerance.resume_task()

    # 保持主线程运行
    while True:
        time.sleep(3600)

4.2 监控与运维

  • 任务状态监控:通过 Redis Hash 1688_item_task_status 查询各商品 ID 的采集状态;

  • 失败任务排查:通过 Redis List 1688_item_fail_task 查看失败任务,结合日志分析原因;

  • 性能监控:通过 Celery Flower 监控任务执行情况,通过 Prometheus+Grafana 监控 Redis/MongoDB/MySQL 的性能指标;

  • 扩容策略:当采集效率不足时,增加 Celery Worker 节点数量,调整node_count参数即可。

五、架构优化与扩展

5.1 性能优化

  • 批量调用:针对 1688 批量商品详情 API(若有),实现批量任务采集,降低网络开销;

  • 异步写入:将数据写入操作异步化(如通过 Kafka),避免采集节点阻塞;

  • 缓存优化:缓存已采集的商品 ID,避免重复采集。

5.2 扩展方向

  • 多数据源接入:扩展采集商品销量、评价、店铺信息等数据;

  • 动态限流:根据 1688 API 返回的限流提示,动态调整令牌桶速率;

  • 容器化部署:基于 Docker+K8s 实现采集节点的弹性伸缩;

  • 数据治理:增加数据清洗、去重、标准化模块,提升数据质量。

六、总结

本文设计的企业级 1688 数据管道,通过分布式任务调度、多账号限流、容错补偿等机制,解决了海量商品详情采集的性能、容错、合规问题。核心亮点包括:

  1. 分布式架构:任务分片至多个节点,突破单进程性能瓶颈;

  2. 精细化限流:多账号令牌桶限流,避免触发平台 API 封禁;

  3. 全链路容错:失败任务自动重试、断点续传,保障数据完整性;

  4. 可扩展设计:支持节点扩容、多数据源接入、存储层替换。

该架构可直接落地至电商数据分析、供应链监控等场景,也可适配淘宝、京东等其他电商平台的 API 采集需求,具备较强的通用性和企业级可用性。


少长咸集

群贤毕至

访客