在电商数据分析、竞品监控和价格追踪等场景中,实时获取商品详情页数据具有重要价值。本文将详细介绍如何搭建一个基于京东 API 接口的商品详情页实时数据采集系统,包括系统架构设计、接口集成开发、数据处理及可视化展示,并提供完整的代码实现。
一、系统架构设计
京东商品数据采集系统主要由以下模块组成:
API 请求模块:负责与京东平台接口交互,发送请求并获取原始数据
数据解析模块:对 API 返回的 JSON 数据进行解析和清洗
数据存储模块:将处理后的数据存储到数据库(本文使用 MySQL)
定时任务模块:实现周期性数据采集功能
可视化展示模块:通过 Web 页面展示采集的商品数据
系统架构采用分层设计,各模块之间低耦合,便于维护和扩展。
二、京东 API 接口准备
1. 京东api接入流程
注册开发者账号
获取
apikey和apisecret(此为接口调用凭证)申请商品详情相关 API 接口的调用权限
了解接口文档,明确请求参数、返回格式和调用限制
2. 核心接口说明
本文使用京东的商品详情查询接口:
接口名称:
功能:获取京东商品的详细信息,包括标题、价格、图片、规格等
请求方式:HTTPS POST
接口地址:
三、系统开发实现
1. 环境准备
Python 3.8+
依赖库:
requests(网络请求)、pymysql(数据库操作)、schedule(定时任务)、flask(Web 展示)MySQL 5.7+
安装依赖库:
pip install requests pymysql schedule flask python-dotenv
2. 配置文件
创建.env配置文件存储敏感信息:
# 京东API配置 JD_APPKEY=你的appkey JD_APPSECRET=你的appsecret JD_API_URL=https://api.jd.com/routerjson # 数据库配置 DB_HOST=localhost DB_PORT=3306 DB_USER=root DB_PASSWORD=你的密码 DB_NAME=jd_goods
3. 数据库设计
创建商品数据表:
CREATE DATABASE IF NOT EXISTS jd_goods; USE jd_goods; CREATE TABLE IF NOT EXISTS goods_info ( id INT AUTO_INCREMENT PRIMARY KEY, goods_id VARCHAR(50) NOT NULL COMMENT '商品ID', title VARCHAR(255) COMMENT '商品标题', price DECIMAL(10,2) COMMENT '商品价格', original_price DECIMAL(10,2) COMMENT '原价', main_image VARCHAR(255) COMMENT '主图URL', brand VARCHAR(100) COMMENT '品牌', shop_name VARCHAR(100) COMMENT '店铺名称', stock INT COMMENT '库存', create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY uk_goods_id (goods_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='京东商品信息表';
4. 核心代码实现
(1)API 请求模块
import time
import hashlib
import requests
from dotenv import load_dotenv
import os
# 加载环境变量
load_dotenv()
class JdApiClient:
def __init__(self):
self.appkey = os.getenv('JD_APPKEY')
self.appsecret = os.getenv('JD_APPSECRET')
self.api_url = os.getenv('JD_API_URL')
def _generate_sign(self, params):
"""生成签名"""
sorted_params = sorted(params.items(), key=lambda x: x[0])
sign_str = self.appsecret
for k, v in sorted_params:
sign_str += f"{k}{v}"
sign_str += self.appsecret
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
def get_goods_detail(self, goods_id):
"""获取商品详情"""
params = {
'method': 'jd.union.open.goods.detail.query',
'app_key': self.appkey,
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
'format': 'json',
'v': '1.0',
'param_json': f'{{"skuIds":["{goods_id}"]}}'
}
# 生成签名
params['sign'] = self._generate_sign(params)
try:
response = requests.post(self.api_url, data=params, timeout=10)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"API请求错误: {str(e)}")
return None(2)数据解析模块
class DataParser:
@staticmethod
def parse_goods_detail(api_response):
"""解析商品详情数据"""
if not api_response or 'error_response' in api_response:
print(f"API返回错误: {api_response.get('error_response', {}).get('msg', '未知错误')}")
return None
try:
# 解析API返回的嵌套结构
result = api_response.get('jd_union_open_goods_detail_query_response', {}) \
.get('result', {})
if not result:
return None
# 提取需要的字段
goods_data = result.get('data', [])[0] if result.get('data') else {}
parsed_data = {
'goods_id': goods_data.get('skuId'),
'title': goods_data.get('skuName'),
'price': float(goods_data.get('price', 0)),
'original_price': float(goods_data.get('originalPrice', 0)),
'main_image': goods_data.get('imageUrl'),
'brand': goods_data.get('brandName'),
'shop_name': goods_data.get('shopName'),
'stock': int(goods_data.get('stock', 0))
}
return parsed_data
except Exception as e:
print(f"数据解析错误: {str(e)}")
return None(3)数据存储模块
import pymysql
from pymysql.cursors import DictCursor
class DatabaseHandler:
def __init__(self):
self.db_config = {
'host': os.getenv('DB_HOST'),
'port': int(os.getenv('DB_PORT')),
'user': os.getenv('DB_USER'),
'password': os.getenv('DB_PASSWORD'),
'db': os.getenv('DB_NAME'),
'charset': 'utf8mb4'
}
self.connection = None
def connect(self):
"""连接数据库"""
try:
self.connection = pymysql.connect(
**self.db_config,
cursorclass=DictCursor
)
return True
except Exception as e:
print(f"数据库连接错误: {str(e)}")
self.connection = None
return False
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
self.connection = None
def save_goods_info(self, goods_data):
"""保存商品信息,存在则更新"""
if not goods_data or not self.connect():
return False
try:
with self.connection.cursor() as cursor:
# 检查商品是否已存在
sql = "SELECT id FROM goods_info WHERE goods_id = %s"
cursor.execute(sql, (goods_data['goods_id'],))
exists = cursor.fetchone()
if exists:
# 更新数据
update_sql = """
UPDATE goods_info
SET title = %s, price = %s, original_price = %s,
main_image = %s, brand = %s, shop_name = %s, stock = %s
WHERE goods_id = %s
"""
cursor.execute(update_sql, (
goods_data['title'], goods_data['price'], goods_data['original_price'],
goods_data['main_image'], goods_data['brand'], goods_data['shop_name'],
goods_data['stock'], goods_data['goods_id']
))
else:
# 插入新数据
insert_sql = """
INSERT INTO goods_info
(goods_id, title, price, original_price, main_image, brand, shop_name, stock)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_sql, (
goods_data['goods_id'], goods_data['title'], goods_data['price'],
goods_data['original_price'], goods_data['main_image'], goods_data['brand'],
goods_data['shop_name'], goods_data['stock']
))
self.connection.commit()
return True
except Exception as e:
print(f"数据保存错误: {str(e)}")
self.connection.rollback()
return False
finally:
self.close()
def get_goods_list(self, limit=100):
"""获取商品列表"""
if not self.connect():
return []
try:
with self.connection.cursor() as cursor:
sql = "SELECT * FROM goods_info ORDER BY update_time DESC LIMIT %s"
cursor.execute(sql, (limit,))
return cursor.fetchall()
except Exception as e:
print(f"获取商品列表错误: {str(e)}")
return []
finally:
self.close()(4)定时任务模块
import schedule
import time as t
class Scheduler:
def __init__(self, goods_ids, interval=60):
self.goods_ids = goods_ids # 需要监控的商品ID列表
self.interval = interval # 采集间隔(分钟)
self.api_client = JdApiClient()
self.parser = DataParser()
self.db_handler = DatabaseHandler()
def fetch_and_save_goods(self, goods_id):
"""采集并保存单个商品数据"""
print(f"开始采集商品 {goods_id} 数据...")
response = self.api_client.get_goods_detail(goods_id)
if not response:
return False
parsed_data = self.parser.parse_goods_detail(response)
if not parsed_data:
return False
return self.db_handler.save_goods_info(parsed_data)
def job(self):
"""定时任务执行函数"""
print(f"===== 开始执行定时任务 {t.strftime('%Y-%m-%d %H:%M:%S')} =====")
for goods_id in self.goods_ids:
self.fetch_and_save_goods(goods_id)
print(f"===== 定时任务执行完成 {t.strftime('%Y-%m-%d %H:%M:%S')} =====")
def start(self):
"""启动定时任务"""
# 立即执行一次
self.job()
# 设置定时任务
schedule.every(self.interval).minutes.do(self.job)
print(f"定时任务已启动,每{self.interval}分钟执行一次")
# 循环执行任务
while True:
schedule.run_pending()
t.sleep(1)(5)Web 可视化模块
from flask import Flask, render_template
app = Flask(__name__)
db_handler = DatabaseHandler()
@app.route('/')
def index():
"""商品列表页面"""
goods_list = db_handler.get_goods_list(20)
return render_template('index.html', goods_list=goods_list)
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)创建templates/index.html模板文件:
<!DOCTYPE html>
<html>
<head>
<title>京东商品监控系统</title>
<meta charset="utf-8">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css">
</head>
<body>
<div class="container mt-4">
<h1 class="mb-4">京东商品监控列表</h1>
<div class="row">
{% for goods in goods_list %}
<div class="col-md-4 mb-4">
<div class="card h-100">
<img src="{{ goods.main_image }}" class="card-img-top" alt="{{ goods.title }}" style="height: 200px; object-fit: contain;">
<div class="card-body">
<h5 class="card-title" style="height: 50px; overflow: hidden;">{{ goods.title }}</h5>
<p class="card-text">
<span class="text-danger fs-5">¥{{ goods.price }}</span>
{% if goods.original_price > goods.price %}
<span class="text-secondary text-decoration-line-through ms-2">¥{{ goods.original_price }}</span>
{% endif %}
</p>
<p class="card-text">品牌: {{ goods.brand or '未知' }}</p>
<p class="card-text">店铺: {{ goods.shop_name or '未知' }}</p>
<p class="card-text">库存: {{ goods.stock }}件</p>
</div>
<div class="card-footer">
<small class="text-muted">最后更新: {{ goods.update_time }}</small>
</div>
</div>
</div>
{% else %}
<div class="col-12">
<p class="text-center">暂无商品数据</p>
</div>
{% endfor %}
</div>
</div>
</body>
</html>(6)主程序入口
import threading
def main():
# 需要监控的京东商品ID列表(可从数据库或配置文件读取)
goods_ids = [
"100012345678", # 示例商品ID,需替换为实际ID
"100009876543",
"100005678901"
]
# 启动定时采集任务(作为后台线程)
scheduler = Scheduler(goods_ids, interval=30) # 每30分钟采集一次
scheduler_thread = threading.Thread(target=scheduler.start, daemon=True)
scheduler_thread.start()
# 启动Web服务
print("启动Web服务,访问 http://localhost:5000 查看商品数据")
app.run(debug=False, host='0.0.0.0', port=5000)
if __name__ == "__main__":
main()四、系统运行与演示
准备工作:
替换代码中的
appkey和appsecret为实际申请的值配置正确的数据库连接信息
替换
goods_ids列表为实际需要监控的京东商品 ID启动系统:
python main.py
3.系统演示:
系统启动后会立即采集一次商品数据
之后每 30 分钟(可配置)自动更新一次数据
访问
http://localhost:5000可查看商品监控列表,包括商品图片、价格、品牌、库存等信息
五、系统扩展建议
增加异常处理:完善 API 调用频率限制处理、网络中断重试机制
数据增量存储:保存商品价格历史变化,支持价格趋势分析
告警功能:当商品价格低于阈值或库存不足时发送通知(邮件 / 短信)
分布式部署:对于大量商品监控,可采用分布式架构提高采集效率
用户认证:为 Web 界面增加用户登录功能,保障数据安全
通过本文介绍的系统,您可以快速搭建一个京东商品详情页实时数据采集平台,实现对商品信息的持续监控和分析,为电商运营决策提供数据支持。系统设计具有良好的可扩展性,可根据实际需求进行功能扩展和性能优化。