Scrapy入门(3):编写pipelines.py

pipelines.py中的类

创建好scrapy项目(项目名称为test0)后,pipelines.py内会有一个默认的pipelines类,如下所示。

1
2
3
4
5
# 当spider.py将提取的item结果提交至pipelines.py后,
# 会默认首先调用Test0Pipeline类中的process_item方法对item数据进行处理。
class Test0Pipeline(object):
def process_item(self, item, spider):
return item

我们可以对类名进行修改,比如因为数据需要存储到Mysql数据库,我将类名改为class MysqlPipeline(object)
但修改类名的同时需要记得去修改settings.py中的设置,否则程序会报错。

1
2
3
4
# 修改前
ITEM_PIPELINES = {'test0.pipelines.Test0Pipeline': 300}
# 修改后
ITEM_PIPELINES = {'test0.pipelines.MysqlPipeline': 300}

item数据的转换及存储

数据转换

去除商品名称两边的空白符

1
item['itemName'] = item['itemName'].strip()

列表转为字符串

1
2
item['havaStockSize'] = ','.join(item['havaStockSize'])
item['noneStockSize'] = ','.join(item['noneStockSize'])

创建更新时间

1
2
import datetime
item['updateTime'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

商品图片上传至七牛云

需先申请好七牛云账号,代码可参考七牛帮助文档
安装七牛包

1
pip3.5 install qiniu

使用七牛包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# urlretrieve,用来下载图片至本地
from urllib.request import urlretrieve
# 引用七牛包
from qiniu import Auth, put_file
qiniu_access_key = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
qiniu_secret_key = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
# 构建鉴权对象
q = Auth(qiniu_access_key, qiniu_secret_key)
# 要上传的空间名称
bucket_name = 'xxxxxxxxxx'
# 图片在本地的存储地址,需要先使用下载至本地,再从本地上传到七牛云
imageDir = '../image/'
# 定义个一个上传商品展示图片的方法,参数包括产品ID-productID,展示图片的原http地址列表(多张图片)
def ImgListUpload(self, productID, imgList):
i = 0
newImgList = []
for imgURL in imgList:
# 使用产品ID-productID + 一个递增数字 构造图片的新名称
imgName = productID + "_" + str(i) + '.jpg'
localfile = imageDir + imgName
try:
# 下载图片至本地
urlretrieve(imgURL, localfile)
# 生成上传Token,可以指定过期时间等
token = q.upload_token(bucket_name, imgName, 3600)
# 上传
ret, info = put_file(token, imgName, localfile)
if ret['key'] == imgName:
newImgList.append(imgName)
# assert ret['key'] == imgName, 'image上传失败'
# assert ret['hash'] == etag(localfile)
except:
# 上传失败,将图片URL及构造的新图片名称记录日志
with open('imgError.txt', 'a') as f:
f.write(imgURL + "||" + imgName + "\n")
i += 1
# 返回结果:新的图片名称列表
# 如果使用的是七牛的public空间,直接用域名+图片名称组建URL地址即可访问图片
return newImgList

商品数据存储至Mysql

使用pymysql包将数据存储至Mysql.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import pymysql
import json
import codecs
conn = pymysql.connect(host=host,
port=port,
db=db,
user=fabbi_bi['user'],
password=password,
charset='utf8')
# 先在数据库内查询sourceProductId是否已存在,如果已存在,只需要更新价格和库存信息,否则将这个商品新增入库
# 商品入库或者信息被更新后,记录更新时间,更新状态.等全部商品处理完后将未做更新的商品视为下架商品
try:
cur = conn.cursor()
selectSql = "select id from item where sourceProductId='" + item['sourceProductId'] + "';"
cur.execute(selectSql)
sqlResult = cur.fetchone()
conn.commit()
# 商品新增入库
if sqlResult == None:
# 遍历字典item,对所有的值进行转义
for key in item:
item[key] = pymysql.escape_string(item[key])
# 拼接insert语句
column_name = '' # 列的字段
column_value = '' # 行字段
for key in item.keys():
if item[key] is None:
item[key] = ''
column_name = column_name + key + ','
column_value = column_value + "\"" + item[key] + "\","
cur.execute("INSERT INTO item(%s) VALUES(%s)" % (column_name[:-1], column_value[:-1]))
conn.commit()
# 商品已存在,更新商品的价格,库存,更新时间
else:
updateSql = "UPDATE `scrapyItem`.`item` SET `price` = %s,`havaStockSize` = %s,`noneStockSize` = %s,`updateTime` = %s WHERE `sourceProductId` = %s;"
cur.execute(updateSql, (item['price'],
item['havaStockSize'],
item['noneStockSize'],
item['updateTime'],
item['sourceProductId'])
)
conn.commit()
cur.close()
except:
with codecs.open('itemError.json', 'a', encoding='utf-8') as f:
f.write(json.dumps(dict(item), ensure_ascii=False) + "\n")
finally:
conn.close()


That’s all.
Happy writing!