×

api开发 电商平台 数据挖掘

商品详情页实时数据采集系统搭建与演示:京东 API 接口集成开发

admin admin 发表于2025-11-14 11:45:24 浏览105 评论0

抢沙发发表评论

在电商数据分析、竞品监控和价格追踪等场景中,实时获取商品详情页数据具有重要价值。本文将详细介绍如何搭建一个基于京东 API 接口的商品详情页实时数据采集系统,包括系统架构设计、接口集成开发、数据处理及可视化展示,并提供完整的代码实现。

一、系统架构设计

京东商品数据采集系统主要由以下模块组成:

  1. API 请求模块:负责与京东平台接口交互,发送请求并获取原始数据

  2. 数据解析模块:对 API 返回的 JSON 数据进行解析和清洗

  3. 数据存储模块:将处理后的数据存储到数据库(本文使用 MySQL)

  4. 定时任务模块:实现周期性数据采集功能

  5. 可视化展示模块:通过 Web 页面展示采集的商品数据

系统架构采用分层设计,各模块之间低耦合,便于维护和扩展。

二、京东 API 接口准备

1. 京东api接入流程

  • 注册开发者账号

  • 获取apikeyapisecret(此为接口调用凭证)

  • 申请商品详情相关 API 接口的调用权限

  • 了解接口文档,明确请求参数、返回格式和调用限制

2. 核心接口说明

本文使用京东的商品详情查询接口:

  • 接口名称:

  • 功能:获取京东商品的详细信息,包括标题、价格、图片、规格等

  • 请求方式:HTTPS POST

  • 接口地址:

三、系统开发实现

1. 环境准备

  • Python 3.8+

  • 依赖库:requests(网络请求)、pymysql(数据库操作)、schedule(定时任务)、flask(Web 展示)

  • MySQL 5.7+

安装依赖库:

pip install requests pymysql schedule flask python-dotenv

2. 配置文件

创建.env配置文件存储敏感信息:

# 京东API配置
JD_APPKEY=你的appkey
JD_APPSECRET=你的appsecret
JD_API_URL=https://api.jd.com/routerjson

# 数据库配置
DB_HOST=localhost
DB_PORT=3306
DB_USER=root
DB_PASSWORD=你的密码
DB_NAME=jd_goods

3. 数据库设计

创建商品数据表:

CREATE DATABASE IF NOT EXISTS jd_goods;
USE jd_goods;

CREATE TABLE IF NOT EXISTS goods_info (
    id INT AUTO_INCREMENT PRIMARY KEY,
    goods_id VARCHAR(50) NOT NULL COMMENT '商品ID',
    title VARCHAR(255) COMMENT '商品标题',
    price DECIMAL(10,2) COMMENT '商品价格',
    original_price DECIMAL(10,2) COMMENT '原价',
    main_image VARCHAR(255) COMMENT '主图URL',
    brand VARCHAR(100) COMMENT '品牌',
    shop_name VARCHAR(100) COMMENT '店铺名称',
    stock INT COMMENT '库存',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    UNIQUE KEY uk_goods_id (goods_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='京东商品信息表';

4. 核心代码实现

(1)API 请求模块

import time
import hashlib
import requests
from dotenv import load_dotenv
import os

# 加载环境变量
load_dotenv()

class JdApiClient:
    def __init__(self):
        self.appkey = os.getenv('JD_APPKEY')
        self.appsecret = os.getenv('JD_APPSECRET')
        self.api_url = os.getenv('JD_API_URL')
        
    def _generate_sign(self, params):
        """生成签名"""
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        sign_str = self.appsecret
        for k, v in sorted_params:
            sign_str += f"{k}{v}"
        sign_str += self.appsecret
        return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
    
    def get_goods_detail(self, goods_id):
        """获取商品详情"""
        params = {
            'method': 'jd.union.open.goods.detail.query',
            'app_key': self.appkey,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'format': 'json',
            'v': '1.0',
            'param_json': f'{{"skuIds":["{goods_id}"]}}'
        }
        
        # 生成签名
        params['sign'] = self._generate_sign(params)
        
        try:
            response = requests.post(self.api_url, data=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"API请求错误: {str(e)}")
            return None

(2)数据解析模块

class DataParser:
    @staticmethod
    def parse_goods_detail(api_response):
        """解析商品详情数据"""
        if not api_response or 'error_response' in api_response:
            print(f"API返回错误: {api_response.get('error_response', {}).get('msg', '未知错误')}")
            return None
            
        try:
            # 解析API返回的嵌套结构
            result = api_response.get('jd_union_open_goods_detail_query_response', {}) \
                               .get('result', {})
                               
            if not result:
                return None
                
            # 提取需要的字段
            goods_data = result.get('data', [])[0] if result.get('data') else {}
            
            parsed_data = {
                'goods_id': goods_data.get('skuId'),
                'title': goods_data.get('skuName'),
                'price': float(goods_data.get('price', 0)),
                'original_price': float(goods_data.get('originalPrice', 0)),
                'main_image': goods_data.get('imageUrl'),
                'brand': goods_data.get('brandName'),
                'shop_name': goods_data.get('shopName'),
                'stock': int(goods_data.get('stock', 0))
            }
            
            return parsed_data
        except Exception as e:
            print(f"数据解析错误: {str(e)}")
            return None

(3)数据存储模块

import pymysql
from pymysql.cursors import DictCursor

class DatabaseHandler:
    def __init__(self):
        self.db_config = {
            'host': os.getenv('DB_HOST'),
            'port': int(os.getenv('DB_PORT')),
            'user': os.getenv('DB_USER'),
            'password': os.getenv('DB_PASSWORD'),
            'db': os.getenv('DB_NAME'),
            'charset': 'utf8mb4'
        }
        self.connection = None
        
    def connect(self):
        """连接数据库"""
        try:
            self.connection = pymysql.connect(
                **self.db_config,
                cursorclass=DictCursor
            )
            return True
        except Exception as e:
            print(f"数据库连接错误: {str(e)}")
            self.connection = None
            return False
            
    def close(self):
        """关闭数据库连接"""
        if self.connection:
            self.connection.close()
            self.connection = None
            
    def save_goods_info(self, goods_data):
        """保存商品信息,存在则更新"""
        if not goods_data or not self.connect():
            return False
            
        try:
            with self.connection.cursor() as cursor:
                # 检查商品是否已存在
                sql = "SELECT id FROM goods_info WHERE goods_id = %s"
                cursor.execute(sql, (goods_data['goods_id'],))
                exists = cursor.fetchone()
                
                if exists:
                    # 更新数据
                    update_sql = """
                    UPDATE goods_info 
                    SET title = %s, price = %s, original_price = %s, 
                        main_image = %s, brand = %s, shop_name = %s, stock = %s
                    WHERE goods_id = %s
                    """
                    cursor.execute(update_sql, (
                        goods_data['title'], goods_data['price'], goods_data['original_price'],
                        goods_data['main_image'], goods_data['brand'], goods_data['shop_name'],
                        goods_data['stock'], goods_data['goods_id']
                    ))
                else:
                    # 插入新数据
                    insert_sql = """
                    INSERT INTO goods_info 
                    (goods_id, title, price, original_price, main_image, brand, shop_name, stock)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    """
                    cursor.execute(insert_sql, (
                        goods_data['goods_id'], goods_data['title'], goods_data['price'],
                        goods_data['original_price'], goods_data['main_image'], goods_data['brand'],
                        goods_data['shop_name'], goods_data['stock']
                    ))
                
                self.connection.commit()
                return True
        except Exception as e:
            print(f"数据保存错误: {str(e)}")
            self.connection.rollback()
            return False
        finally:
            self.close()
            
    def get_goods_list(self, limit=100):
        """获取商品列表"""
        if not self.connect():
            return []
            
        try:
            with self.connection.cursor() as cursor:
                sql = "SELECT * FROM goods_info ORDER BY update_time DESC LIMIT %s"
                cursor.execute(sql, (limit,))
                return cursor.fetchall()
        except Exception as e:
            print(f"获取商品列表错误: {str(e)}")
            return []
        finally:
            self.close()

(4)定时任务模块

import schedule
import time as t

class Scheduler:
    def __init__(self, goods_ids, interval=60):
        self.goods_ids = goods_ids  # 需要监控的商品ID列表
        self.interval = interval    # 采集间隔(分钟)
        self.api_client = JdApiClient()
        self.parser = DataParser()
        self.db_handler = DatabaseHandler()
        
    def fetch_and_save_goods(self, goods_id):
        """采集并保存单个商品数据"""
        print(f"开始采集商品 {goods_id} 数据...")
        response = self.api_client.get_goods_detail(goods_id)
        if not response:
            return False
            
        parsed_data = self.parser.parse_goods_detail(response)
        if not parsed_data:
            return False
            
        return self.db_handler.save_goods_info(parsed_data)
        
    def job(self):
        """定时任务执行函数"""
        print(f"===== 开始执行定时任务 {t.strftime('%Y-%m-%d %H:%M:%S')} =====")
        for goods_id in self.goods_ids:
            self.fetch_and_save_goods(goods_id)
        print(f"===== 定时任务执行完成 {t.strftime('%Y-%m-%d %H:%M:%S')} =====")
        
    def start(self):
        """启动定时任务"""
        # 立即执行一次
        self.job()
        # 设置定时任务
        schedule.every(self.interval).minutes.do(self.job)
        print(f"定时任务已启动,每{self.interval}分钟执行一次")
        
        # 循环执行任务
        while True:
            schedule.run_pending()
            t.sleep(1)

(5)Web 可视化模块

from flask import Flask, render_template

app = Flask(__name__)
db_handler = DatabaseHandler()

@app.route('/')
def index():
    """商品列表页面"""
    goods_list = db_handler.get_goods_list(20)
    return render_template('index.html', goods_list=goods_list)

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

创建templates/index.html模板文件:

<!DOCTYPE html>
<html>
<head>
    <title>京东商品监控系统</title>
    <meta charset="utf-8">
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css">
</head>
<body>
    <div class="container mt-4">
        <h1 class="mb-4">京东商品监控列表</h1>
        <div class="row">
            {% for goods in goods_list %}
            <div class="col-md-4 mb-4">
                <div class="card h-100">
                    <img src="{{ goods.main_image }}" class="card-img-top" alt="{{ goods.title }}" style="height: 200px; object-fit: contain;">
                    <div class="card-body">
                        <h5 class="card-title" style="height: 50px; overflow: hidden;">{{ goods.title }}</h5>
                        <p class="card-text">
                            <span class="text-danger fs-5">¥{{ goods.price }}</span>
                            {% if goods.original_price > goods.price %}
                            <span class="text-secondary text-decoration-line-through ms-2">¥{{ goods.original_price }}</span>
                            {% endif %}
                        </p>
                        <p class="card-text">品牌: {{ goods.brand or '未知' }}</p>
                        <p class="card-text">店铺: {{ goods.shop_name or '未知' }}</p>
                        <p class="card-text">库存: {{ goods.stock }}件</p>
                    </div>
                    <div class="card-footer">
                        <small class="text-muted">最后更新: {{ goods.update_time }}</small>
                    </div>
                </div>
            </div>
            {% else %}
            <div class="col-12">
                <p class="text-center">暂无商品数据</p>
            </div>
            {% endfor %}
        </div>
    </div>
</body>
</html>

(6)主程序入口

import threading

def main():
    # 需要监控的京东商品ID列表(可从数据库或配置文件读取)
    goods_ids = [
        "100012345678",  # 示例商品ID,需替换为实际ID
        "100009876543",
        "100005678901"
    ]
    
    # 启动定时采集任务(作为后台线程)
    scheduler = Scheduler(goods_ids, interval=30)  # 每30分钟采集一次
    scheduler_thread = threading.Thread(target=scheduler.start, daemon=True)
    scheduler_thread.start()
    
    # 启动Web服务
    print("启动Web服务,访问 http://localhost:5000 查看商品数据")
    app.run(debug=False, host='0.0.0.0', port=5000)

if __name__ == "__main__":
    main()

四、系统运行与演示

  1. 准备工作

    • 替换代码中的appkeyappsecret为实际申请的值

    • 配置正确的数据库连接信息

    • 替换goods_ids列表为实际需要监控的京东商品 ID

  2. 启动系统

python main.py

3.系统演示

  • 系统启动后会立即采集一次商品数据

  • 之后每 30 分钟(可配置)自动更新一次数据

  • 访问http://localhost:5000可查看商品监控列表,包括商品图片、价格、品牌、库存等信息

五、系统扩展建议

  1. 增加异常处理:完善 API 调用频率限制处理、网络中断重试机制

  2. 数据增量存储:保存商品价格历史变化,支持价格趋势分析

  3. 告警功能:当商品价格低于阈值或库存不足时发送通知(邮件 / 短信)

  4. 分布式部署:对于大量商品监控,可采用分布式架构提高采集效率

  5. 用户认证:为 Web 界面增加用户登录功能,保障数据安全

通过本文介绍的系统,您可以快速搭建一个京东商品详情页实时数据采集平台,实现对商品信息的持续监控和分析,为电商运营决策提供数据支持。系统设计具有良好的可扩展性,可根据实际需求进行功能扩展和性能优化。


少长咸集

群贤毕至

访客