在电商数据整合、供应链分析等场景中,拉取 1688 平台 API 数据是常见需求,但传统方案需要采购服务器、配置运行环境、维护进程稳定性,不仅耗时费力,还会产生闲置资源浪费。Serverless 架构的出现,完美解决了这一痛点 —— 我们无需关心服务器的采购、运维与扩容,只需聚焦业务逻辑本身,按实际运行时长付费,还能借助云厂商自带的监控体系实现全链路可观测。本文将详细介绍如何基于 Serverless 架构,快速搭建、部署并监控 1688 API 数据拉取服务,附带完整可运行代码。
一、方案整体架构设计
本次方案选用 ** 阿里云函数计算(FC)** 作为 Serverless 运行载体(也可选用腾讯云 SCF、AWS Lambda 等),搭配 1688 开放平台 API、云数据库 RDS(存储拉取结果)、日志服务 SLS(日志收集)、云监控 CMS(指标监控),整体架构如下:
触发层:支持定时触发(按固定频率拉取数据)或 HTTP 触发(按需手动调用);
执行层:阿里云 FC 函数(运行 Python 代码,实现 1688 API 调用、数据清洗);
存储层:阿里云 RDS MySQL(存储结构化后的 1688 商品 / 供应商数据);
监控层:FC 自带监控 + SLS 日志查询 + CMS 告警(异常情况实时通知);
依赖层:通过 requirements.txt 管理 Python 第三方依赖(如 requests、pymysql)。
核心优势:无需管理服务器,函数按需启动运行,空闲时不产生任何费用;弹性扩容能力强,面对大批量数据拉取需求可自动分配资源;自带监控体系,降低运维成本。
二、前期准备工作
在开始编码与部署前,需完成以下准备步骤,确保服务能正常运行:
1. 1688 开放平台配置
注册并完成开发者认证(个人 / 企业均可);
获取应用密钥(Api Key、Api Secret);
申请目标 API 的调用权限(如「商品详情查询 API」「供应商信息查询 API」,需遵守 1688 平台调用配额限制);
生成 API 调用所需的 Access Token(参考 1688 开放平台文档,支持授权码模式或客户端模式)。
2. 阿里云资源配置
开通阿里云函数计算(FC);
开通阿里云 RDS MySQL,创建数据库(如1688_data_db)和数据表(如product_info,用于存储商品数据);
开通阿里云日志服务 SLS,为 FC 函数配置日志存储项目;
为 FC 函数配置权限:授予访问 RDS、SLS 的权限(通过 RAM 角色配置)。
3. 数据表创建 SQL
在 RDS MySQL 中执行以下 SQL,创建商品信息存储表:
三、核心代码实现(Python)
本次采用 Python 作为开发语言,实现 1688 API 数据拉取、数据清洗、RDS 存储的核心逻辑,代码结构清晰,可直接复用。
1. 依赖配置(requirements.txt)
创建requirements.txt文件,声明所需第三方依赖:
2. 核心业务代码(index.py)
import os
import json
import time
import requests
import pymysql
from datetime import datetime
# -------------------------- 配置信息(建议通过FC环境变量配置,避免硬编码) --------------------------
# 1688 API配置
APP_KEY = os.environ.get("APP_KEY") # 从FC环境变量获取App Key
APP_SECRET = os.environ.get("APP_SECRET") # 从FC环境变量获取App Secret
ACCESS_TOKEN = os.environ.get("ACCESS_TOKEN") # 从FC环境变量获取Access Token
1688_API_URL = "https://gw.open.1688.com/openapi/param2/1/cn.alibaba.open/offer.get/{}".format(APP_KEY) # 商品详情API
# RDS MySQL配置
RDS_HOST = os.environ.get("RDS_HOST") # RDS实例地址
RDS_PORT = int(os.environ.get("RDS_PORT", 3306)) # RDS端口
RDS_USER = os.environ.get("RDS_USER") # RDS用户名
RDS_PASSWORD = os.environ.get("RDS_PASSWORD") # RDS密码
RDS_DB = os.environ.get("RDS_DB", "1688_data_db") # 数据库名
# -------------------------- 工具函数:RDS数据库连接 --------------------------
def get_rds_connection():
"""
获取RDS MySQL数据库连接
"""
try:
conn = pymysql.connect(
host=RDS_HOST,
port=RDS_PORT,
user=RDS_USER,
password=RDS_PASSWORD,
database=RDS_DB,
charset="utf8mb4",
connect_timeout=10
)
return conn
except Exception as e:
print(f"[ERROR] 获取RDS连接失败:{str(e)}")
raise e
# -------------------------- 工具函数:1688 API签名(简化版,完整签名参考1688开放平台文档) --------------------------
def generate_1688_sign(params, app_secret):
"""
生成1688 API调用签名(简化版,实际生产环境需严格按照1688签名规则实现)
"""
# 按参数名升序排序
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 拼接字符串
sign_str = app_secret
for key, value in sorted_params:
sign_str += f"{key}{value}"
sign_str += app_secret
# MD5加密(实际需按1688要求编码)
import hashlib
sign = hashlib.md5(sign_str.encode("utf-8")).hexdigest().upper()
return sign
# -------------------------- 核心函数:拉取1688商品数据 --------------------------
def pull_1688_product_data(product_id):
"""
调用1688 API拉取单个商品数据
:param product_id: 1688商品ID
:return: 结构化商品数据
"""
try:
# 1. 构造API请求参数
params = {
"access_token": ACCESS_TOKEN,
"offerId": product_id,
"format": "json",
"timestamp": str(int(time.time() * 1000)),
"v": "2.0"
}
# 2. 生成签名
params["sign"] = generate_1688_sign(params, APP_SECRET)
# 3. 发送API请求
response = requests.get(1688_API_URL, params=params, timeout=30)
response.raise_for_status() # 抛出HTTP请求异常
api_result = response.json()
# 4. 校验API返回结果
if "error" in api_result:
print(f"[ERROR] 1688 API调用失败:{api_result['error']['msg']}(错误码:{api_result['error']['code']})")
raise Exception(f"1688 API错误:{api_result['error']['msg']}")
# 5. 数据结构化处理
product_info = api_result.get("result", {})
structured_data = {
"product_id": product_id,
"product_name": product_info.get("subject", "").strip(),
"supplier_name": product_info.get("seller", {}).get("companyName", "").strip(),
"price": float(product_info.get("price", 0) or 0),
"sales_volume": int(product_info.get("sales", {}).get("volume", 0) or 0)
}
print(f"[INFO] 成功拉取商品{product_id}数据:{structured_data}")
return structured_data
except Exception as e:
print(f"[ERROR] 拉取商品{product_id}数据失败:{str(e)}")
raise e
# -------------------------- 核心函数:存储数据到RDS --------------------------
def save_product_data_to_rds(product_data):
"""
将结构化商品数据存储到RDS MySQL
:param product_data: 结构化商品数据
"""
conn = None
cursor = None
try:
# 1. 获取数据库连接
conn = get_rds_connection()
cursor = conn.cursor()
# 2. 构造SQL(存在则更新,不存在则插入)
sql = """
INSERT INTO product_info (product_id, product_name, supplier_name, price, sales_volume)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
product_name = VALUES(product_name),
supplier_name = VALUES(supplier_name),
price = VALUES(price),
sales_volume = VALUES(sales_volume),
update_time = CURRENT_TIMESTAMP
"""
# 3. 执行SQL
cursor.execute(sql, (
product_data["product_id"],
product_data["product_name"],
product_data["supplier_name"],
product_data["price"],
product_data["sales_volume"]
))
conn.commit()
print(f"[INFO] 商品{product_data['product_id']}数据已成功存储/更新到RDS")
except Exception as e:
if conn:
conn.rollback()
print(f"[ERROR] 存储商品{product_data['product_id']}数据到RDS失败:{str(e)}")
raise e
finally:
# 4. 关闭连接
if cursor:
cursor.close()
if conn:
conn.close()
# -------------------------- FC函数入口 --------------------------
def handler(event, context):
"""
阿里云FC函数入口函数
:param event: 触发事件(定时触发/HTTP触发参数)
:param context: 函数运行上下文
:return: 执行结果
"""
try:
# 1. 解析触发参数(示例:从event中获取需要拉取的商品ID列表)
# 定时触发时,可直接配置固定商品ID列表;HTTP触发时,可从请求参数中获取
event_data = json.loads(event) if isinstance(event, str) else event
product_id_list = event_data.get("product_id_list", ["1234567890", "0987654321"]) # 替换为实际商品ID
# 2. 批量拉取并存储数据
success_count = 0
fail_count = 0
for product_id in product_id_list:
try:
product_data = pull_1688_product_data(product_id)
save_product_data_to_rds(product_data)
success_count += 1
# 避免API调用频率超限,添加短暂延时
time.sleep(0.5)
except Exception as e:
fail_count += 1
continue
# 3. 构造返回结果
result = {
"status": "success",
"message": f"数据拉取完成:成功{success_count}条,失败{fail_count}条",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"success_count": success_count,
"fail_count": fail_count
}
print(f"[INFO] 本次数据拉取任务执行完成:{result}")
return json.dumps(result, ensure_ascii=False)
except Exception as e:
error_result = {
"status": "fail",
"message": f"任务执行失败:{str(e)}",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
print(f"[ERROR] 本次数据拉取任务执行异常:{error_result}")
return json.dumps(error_result, ensure_ascii=False)代码关键说明
配置信息通过FC 环境变量获取:避免硬编码敏感信息(如 App Secret、RDS 密码),提高安全性,同时方便后续修改配置;
数据库连接池优化:通过get_rds_connection函数封装连接逻辑,确保每次调用后关闭连接,避免连接泄露;
幂等性处理:使用INSERT ... ON DUPLICATE KEY UPDATE语法,确保重复拉取同一商品数据时不会产生脏数据,只会更新最新信息;
异常处理:每个核心步骤都添加了异常捕获与日志输出,方便后续排查问题;
频率控制:添加time.sleep(0.5),避免触发 1688 API 调用频率限制。
四、Serverless 服务部署(阿里云 FC)
完成代码编写后,即可将服务部署到阿里云 FC,步骤如下:
1. 创建 FC 函数
登录阿里云 FC 控制台,选择目标地域,点击「创建函数」;
函数类型选择「空白函数」,填写函数名称(如1688_data_pull_service),运行时选择「Python 3.9」;
代码上传方式选择「上传文件夹」,将index.py和requirements.txt打包上传;
入口函数配置为「index.handler」(对应代码中的handler函数)。
2. 配置环境变量
在 FC 函数「配置」-「环境变量」中,添加以下配置(对应代码中的配置项):
| 环境变量名 |
取值说明 |
| APP_KEY |
1688 开放平台应用 App Key |
| APP_SECRET |
1688 开放平台应用 App Secret |
| ACCESS_TOKEN |
1688 API 调用 Access Token |
| RDS_HOST |
RDS MySQL 实例公网 / 内网地址 |
| RDS_PORT |
RDS MySQL 端口(默认 3306) |
| RDS_USER |
RDS MySQL 用户名 |
| RDS_PASSWORD |
RDS MySQL 密码 |
| RDS_DB |
数据库名(默认 1688_data_db) |
3. 配置触发方式
根据实际需求配置触发方式,支持两种常见模式:
4. 配置日志与监控
在 FC 函数「配置」-「日志配置」中,关联已创建的 SLS 日志项目与日志库,开启日志采集;
进入阿里云「云监控 CMS」控制台,为 FC 函数创建监控告警规则(如函数执行失败次数 > 0、运行时间 > 10 秒时,通过短信 / 邮件通知);
为 RDS 配置监控告警(如数据库连接数过高、存储不足时告警)。
五、服务测试与监控
1. 服务测试
在 FC 控制台点击「测试函数」,构造测试事件(JSON 格式)
2.点击「执行」,查看函数执行日志与返回结果,若返回status: success,说明服务运行正常;
3.登录 RDS 控制台,查询product_info表,验证数据是否已成功存储。
2. 监控与排查
日志查询:通过 SLS 控制台,按函数名称、时间范围查询运行日志,快速定位异常(如 API 调用失败、数据库连接超时);
指标监控:在 FC 控制台「监控」页面,查看函数调用次数、执行耗时、错误率等指标,掌握服务运行状态;
告警接收:当服务出现异常时,云监控会通过配置的渠道(短信 / 邮件)发送告警通知,及时介入处理。
六、方案优化与扩展
批量拉取优化:针对大批量商品 ID,采用异步批量处理或分段拉取,避免函数运行超时;
缓存优化:将 1688 API 返回的非实时数据缓存到 Redis(阿里云 Redis 云服务),减少 API 调用次数,降低成本;
数据可视化:将 RDS 中的数据同步到阿里云 DataV,搭建数据可视化大屏,直观展示商品销量、价格趋势等信息;
多 API 扩展:基于现有架构,快速扩展拉取 1688 供应商列表、订单数据等其他 API,只需修改业务逻辑代码;
权限精细化:通过 RAM 角色为 FC 函数配置最小权限,仅授予必要的 RDS、SLS 访问权限,提高服务安全性。
七、总结
基于 Serverless 架构的 1688 API 数据拉取服务,彻底摆脱了传统服务器运维的繁琐工作,实现了「按需运行、按量付费」的轻量化部署。通过本文的步骤,你可以在 1 小时内完成服务的搭建、部署与监控,核心代码可直接复用,后续只需根据业务需求调整商品 ID 列表与触发频率即可。
该方案不仅适用于 1688 API,还可迁移到淘宝、京东等其他开放平台 API 的数据拉取场景,具备极强的通用性与扩展性。在降本增效的同时,借助云厂商的监控体系,确保服务稳定可靠运行,为电商数据分析、供应链管理提供有力支撑。
核心要点回顾
方案核心:以阿里云 FC 为 Serverless 载体,搭配 RDS(存储)、SLS(日志)、CMS(监控),实现 1688 API 数据拉取的全流程闭环;
代码关键:通过环境变量管理敏感配置,用INSERT ... ON DUPLICATE KEY UPDATE保证数据幂等性,添加异常捕获与频率控制提升稳定性;
部署与监控:FC 支持定时 / HTTP 双触发,搭配 SLS 日志查询与 CMS 告警,实现服务可观测与异常快速响应;
优势体现:无需管理服务器、按需付费、弹性扩容,大幅降低运维成本与资源浪费。