×

api开发 电商平台 数据挖掘

京东 API 商品数据异步采集方案:基于 Celery 的实时数据拉取实践

admin admin 发表于2025-11-24 14:03:08 浏览69 评论0

抢沙发发表评论

在电商数据分析、价格监控、竞品调研等场景中,实时获取京东商品数据至关重要。京东平台提供了丰富的 API 接口,但直接同步调用存在响应慢、并发受限、任务易阻塞等问题。本文将介绍一种基于 Celery 的异步采集方案,通过任务队列实现高并发、可扩展的商品数据拉取,确保系统稳定性与实时性。

一、方案背景与挑战

1. 业务需求

  • 实时获取商品详情(标题、价格、库存、销量等);

  • 支持批量查询,每秒请求量可达数百次;

  • 异常重试(网络波动、API 限流时自动重试);

  • 任务状态可追踪(提交、执行中、成功、失败)。

2. 传统同步方案的问题

  • 阻塞式调用:同步请求时,主线程需等待 API 响应,无法处理其他任务;

  • 并发限制:单线程并发量低,多线程易引发资源竞争;

  • 容错性差:一次请求失败可能导致整个任务流程中断;

  • 扩展性不足:无法动态增减计算资源应对流量波动。

3. 异步方案优势

  • 高并发:通过任务队列解耦请求与处理,支持海量任务并行执行;

  • 非阻塞:主线程提交任务后立即返回,不影响其他业务逻辑;

  • 容错性强:任务失败可自动重试,支持死信队列处理无法修复的异常;

  • 可扩展:通过增加 Worker 节点横向扩展处理能力。

二、技术选型

组件 作用 优势
Celery 分布式任务队列 轻量、支持多语言、丰富的任务调度功能
Redis Celery 后端(存储任务队列、结果) 高性能、支持持久化、分布式部署
Python 开发语言 简洁易读、丰富的第三方库
Requests HTTP 请求库(调用京东 API) 简单易用、支持超时设置、会话保持
Django/Flask Web 框架(可选,用于任务提交与管理) 快速开发、内置 Admin 后台

三、方案架构设计

1. 整体架构

[业务系统] → [任务提交API] → [Celery任务队列(Redis)] → [Celery Worker] → [京东API]
                                                              ↓
                                                  [任务结果存储(Redis/数据库)]

2. 核心流程

  1. 任务提交:业务系统通过 API 或代码将商品 ID 等参数提交给 Celery 任务队列;

  2. 任务调度:Celery Broker(Redis)将任务分发给空闲的 Worker 节点;

  3. 任务执行:Worker 调用京东 API 获取数据,处理异常(重试、死信队列);

  4. 结果存储:执行结果(成功 / 失败、商品数据)存储到 Redis 或数据库;

  5. 结果查询:业务系统通过任务 ID 查询执行状态与结果。

四、环境搭建

1. 依赖安装

pip install celery redis requests

2. Redis 配置

  • 安装 Redis 并启动,默认端口 6379;

  • 如需密码认证,修改 Redis 配置文件redis.conf

    conf

    requirepass your_password

五、代码实现

1. 京东 API 封装

首先封装京东 API 调用逻辑,处理签名、请求参数、响应解析等。

# jd_api.py
import requests
import time
import hashlib
import json

class JDAPI:
    def __init__(self, app_key, app_secret):
        self.app_key = app_key
        self.app_secret = app_secret
        self.base_url = "https://api.jd.com/routerjson"  # 京东API网关地址

    def _sign(self, params):
        """生成签名(京东API要求的签名算法)"""
        # 1. 按参数名ASCII升序排序
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        # 2. 拼接参数名与值
        sign_str = self.app_secret + "".join([f"{k}{v}" for k, v in sorted_params]) + self.app_secret
        # 3. MD5加密并转为大写
        return hashlib.md5(sign_str.encode("utf-8")).hexdigest().upper()

    def get_goods_info(self, goods_id):
        """获取商品详情(示例接口:根据商品ID查询商品信息)"""
        params = {
            "app_key": self.app_key,
            "method": "jd.union.open.goods.bigfield.get",  # 京东商品详情接口
            "format": "json",
            "v": "1.0",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "goodsId": goods_id,
        }
        # 生成签名
        params["sign"] = self._sign(params)
        
        try:
            # 发送请求(设置超时时间,避免无限等待)
            response = requests.get(self.base_url, params=params, timeout=10)
            response.raise_for_status()  # 抛出HTTP错误(如404、500)
            result = response.json()
            
            # 解析响应(根据京东API文档调整字段)
            if result.get("code") == 0:
                return result.get("data", {})
            else:
                print(f"京东API错误:{result.get('msg')}")
                return None
        except requests.exceptions.RequestException as e:
            print(f"请求京东API失败:{str(e)}")
            return None

2. Celery 任务配置

创建 Celery 实例,配置 Redis 作为 Broker 和 Backend。

# celery_app.py
from celery import Celery
from celery.schedules import crontab

# 初始化Celery
app = Celery(
    "jd_task",  # 任务名称
    broker="redis://:your_password@localhost:6379/0",  # Redis作为Broker(密码可选)
    backend="redis://:your_password@localhost:6379/1",  # Redis作为Backend(存储结果)
    include=["tasks"]  # 包含任务模块
)

# 配置任务参数
app.conf.update(
    task_serializer="json",  # 任务序列化方式
    result_serializer="json",  # 结果序列化方式
    accept_content=["json"],  # 接受的内容类型
    result_expires=3600,  # 结果过期时间(1小时)
    task_acks_late=True,  # 任务执行完成后再确认(避免Worker崩溃导致任务丢失)
    worker_prefetch_multiplier=1,  # 每个Worker每次预取1个任务(避免任务堆积)
    task_routes={
        "tasks.fetch_jd_goods": {"queue": "jd_goods_queue"}  # 自定义队列
    }
)

# 定时任务配置(可选,如定时拉取商品数据)
app.conf.beat_schedule = {
    "fetch-jd-goods-every-hour": {
        "task": "tasks.fetch_jd_goods",
        "schedule": crontab(minute=0, hour="*"),  # 每小时执行一次
        "args": ("100000000000",),  # 商品ID(可替换为批量ID)
    },
}

3. 异步任务定义

定义商品数据拉取任务,处理异常重试与结果存储。

# tasks.py
from celery_app import app
from jd_api import JDAPI
from celery.exceptions import MaxRetriesExceededError

# 初始化京东API客户端(替换为你的AppKey和AppSecret)
jd_api = JDAPI(app_key="your_app_key", app_secret="your_app_secret")

@app.task(bind=True, max_retries=3, retry_backoff=2, retry_jitter=True)
def fetch_jd_goods(self, goods_id):
    """
    异步拉取京东商品数据
    :param self: 任务实例(用于重试)
    :param goods_id: 商品ID
    :return: 商品数据(字典)
    """
    try:
        print(f"开始拉取商品ID:{goods_id}的数据")
        goods_data = jd_api.get_goods_info(goods_id)
        
        if not goods_data:
            # 数据获取失败,触发重试(最多3次,每次间隔2秒、4秒、8秒...)
            raise self.retry(exc=Exception(f"商品ID:{goods_id}数据获取失败"))
        
        # 存储商品数据(示例:打印数据,实际可存入数据库)
        print(f"商品ID:{goods_id}拉取成功,数据:{json.dumps(goods_data, ensure_ascii=False)}")
        return goods_data
    
    except MaxRetriesExceededError:
        # 达到最大重试次数,记录失败日志(可存入死信队列)
        print(f"商品ID:{goods_id}多次拉取失败,已放弃")
        return None

4. 任务提交与结果查询

通过代码或 API 提交任务,并查询执行结果。

# submit_task.py
from celery_app import app
from tasks import fetch_jd_goods
import time

# 方式1:提交单个任务
task_id = fetch_jd_goods.delay("100000000000").id  # 异步提交任务,返回任务ID
print(f"提交任务成功,任务ID:{task_id}")

# 查询任务结果(轮询方式,实际可通过WebSocket推送)
for _ in range(10):
    result = app.AsyncResult(task_id)
    if result.ready():
        print(f"任务执行状态:{result.status}")
        print(f"任务结果:{result.result}")
        break
    else:
        print("任务执行中...")
        time.sleep(1)

# 方式2:批量提交任务(使用group)
from celery import group

goods_ids = ["100000000000", "100000000001", "100000000002"]  # 批量商品ID
tasks = group(fetch_jd_goods.s(goods_id) for goods_id in goods_ids)
result = tasks.apply_async()

# 等待所有任务完成并获取结果
results = result.get()
print(f"批量任务结果:{results}")

5. 启动 Celery Worker

在终端启动 Celery Worker,监听任务队列:

# 启动单个Worker(监听自定义队列jd_goods_queue)
celery -A celery_app worker -Q jd_goods_queue --loglevel=info

# 启动多个Worker(提高并发处理能力)
celery -A celery_app worker -Q jd_goods_queue --loglevel=info --concurrency=4

# 启动定时任务Beat(如需定时执行任务)
celery -A celery_app beat --loglevel=info

六、关键优化与注意事项

1. 任务去重

避免重复提交相同任务(如同一商品 ID 短时间内多次拉取):

# 在任务提交时检查任务是否已存在
from celery.task.control import inspect

def is_task_exists(task_id):
    """检查任务是否已在队列中或正在执行"""
    i = inspect()
    # 正在执行的任务
    active_tasks = i.active()
    for worker, tasks in active_tasks.items():
        if any(task["id"] == task_id for task in tasks):
            return True
    # 等待中的任务
    reserved_tasks = i.reserved()
    for worker, tasks in reserved_tasks.items():
        if any(task["id"] == task_id for task in tasks):
            return True
    return False

2. 限流控制

京东 API 有请求频率限制(如每秒 100 次),需在 Worker 中添加限流:

# 使用ratelimit装饰器限制请求频率
from celery.decorators import ratelimit

@app.task(bind=True, max_retries=3)
@ratelimit(key="ip", rate="100/m", block=True)  # 每秒最多100次请求
def fetch_jd_goods(self, goods_id):
    # 任务逻辑...
    pass

3. 异常处理

  • 网络异常:通过retry机制自动重试;

  • API 限流:捕获 429 状态码,延迟重试;

  • 数据格式异常:解析响应时添加异常捕获,避免任务崩溃。

4. 监控与运维

  • 使用flower监控 Celery 任务(需安装flower库):

pip install flower
celery -A celery_app flower  # 启动监控界面,访问http://localhost:5555

定期清理过期任务结果(Redis 中):

# 在celery_app.py中添加定时清理任务
app.conf.beat_schedule["clean-expired-results"] = {
    "task": "celery.backend_cleanup",
    "schedule": crontab(hour=3, minute=0),  # 每天凌晨3点执行
}

七、总结

本文提出的基于 Celery 的京东 API 商品数据异步采集方案,通过任务队列解耦请求与处理,实现了高并发、高可用的数据拉取。该方案不仅适用于京东 API,也可扩展到其他第三方 API(如淘宝、拼多多)的数据采集场景。在实际应用中,可根据业务需求优化任务调度策略、增加数据存储层、完善监控告警机制,进一步提升系统的稳定性与可维护性。


少长咸集

群贤毕至

访客