Scraping data with Scrapy items and pipelines, and uploading images to OSS

Contents
  1. Declaring the item classes
  2. Writing the spider
  3. Processing data in pipelines
  4. IwatchImgUploadPipeline
  5. IwatchDataMysqlPipeline
  6. Wrapping the database layer
  7. Deduplication with Redis (persistent storage)
  8. Logging

First, make sure you understand how Scrapy works under the hood. I won't explain that here; there are plenty of resources online.

Declaring the item classes

Once you know which fields you want to scrape, define them in items.py:

import scrapy


class IwatchImgItem(scrapy.Item):
    """
    Item definition for downloading images.
    In order: image URLs, image objects, image storage paths, and the id returned after the DB insert.
    """
    image_urls = scrapy.Field()
    images = scrapy.Field()
    image_paths = scrapy.Field()
    id = scrapy.Field()


class IwatchBrandItem(IwatchImgItem):
    """
    Brand fields.
    In order: Chinese brand name, English brand name, brand URL, brand image URL,
    brand trademark image URL, brand introduction.
    """
    brand_chname = scrapy.Field()
    brand_enname = scrapy.Field()
    brand_url = scrapy.Field()
    brand_img_url = scrapy.Field()
    brand_trademark_url = scrapy.Field()
    brand_introduction = scrapy.Field()


class IwatchSerieItem(IwatchImgItem):
    """
    Series fields (a brand has several series).
    In order: brand name, series name, series URL, series introduction, series image URL.
    """
    brand_chname = scrapy.Field()
    serie_name = scrapy.Field()
    serie_url = scrapy.Field()
    serie_introduction = scrapy.Field()
    serie_img_url = scrapy.Field()


class IwatchwatchItem(IwatchImgItem):
    """
    Watch (product) fields (a series has several watches).
    In order: watch name, watch URL, watch image URL, USD price, HKD price, EUR price,
    basic info, movement, exterior, functions.
    """
    iwatch_name = scrapy.Field()
    iwatch_url = scrapy.Field()
    iwatch_img_url = scrapy.Field()
    iwatch_price_yuan = scrapy.Field()
    iwatch_price_hkd = scrapy.Field()
    iwatch_price_euro = scrapy.Field()
    iwatch_base = scrapy.Field()
    iwatch_mechanism = scrapy.Field()
    iwatch_exterior = scrapy.Field()
    iwatch_fuction = scrapy.Field()

There are four item classes here. IwatchBrandItem, IwatchSerieItem, and IwatchwatchItem all inherit from IwatchImgItem, because all three need their images downloaded and uploaded to OSS. A short sketch of what that inheritance gives you is below.
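
A minimal sketch, not part of the project code, showing that a subclass such as IwatchBrandItem carries the inherited image fields alongside its own (the brand name and URL below are made-up example values):

from spider_iwatch.items import IwatchBrandItem

item = IwatchBrandItem()
item['brand_chname'] = '欧米茄'                            # field declared on IwatchBrandItem
item['image_urls'] = ['https://example.com/logo.jpg']      # field inherited from IwatchImgItem
print(sorted(item.fields.keys()))                          # lists both its own and the inherited fields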

Writing the spider

import scrapy
from spider_iwatch.items import IwatchBrandItem, IwatchSerieItem, IwatchwatchItem


class IwatchSpider(scrapy.Spider):
    name = 'iwatch'          # spider name, must be unique
    allowed_domains = ['']   # domains the spider is allowed to crawl
    start_urls = ['']        # the start URL (redacted here)

    def parse(self, response):
        div_list = response.xpath(".//div[@class='item_one']")
        for div in div_list:
            li_list = div.xpath(".//div[@class='list_main']//li")
            for li in li_list:
                # parsing details omitted (brand_chname and the brandItem fields are filled in here)
                brandItem = IwatchBrandItem()
                follow_serie_url = li.xpath(".//div[@class='item_btn']/a/@href").extract_first()
                yield scrapy.Request(follow_serie_url, callback=self.parse_serie,
                                     meta={'brand_chname': brand_chname}, priority=30)
                yield brandItem

    def parse_serie(self, response):
        # parsing details omitted (serieitem, an IwatchSerieItem, is built here)
        a_list = response.xpath(".//div[@class='c_wc_list']//a")
        for a in a_list:
            serie_url = a.xpath("./@href").extract_first().replace(".html", "_1.html")
            yield scrapy.Request(serie_url, callback=self.parse_iwatch_page_first)
            yield serieitem

    def parse_iwatch_page_first(self, response):
        li_list = response.xpath(".//div[@class='w_class_list']//li")
        for li in li_list:
            iwatch_url = li.xpath("./a/@href").extract_first()
            yield scrapy.Request(iwatch_url, callback=self.parse_iwatch)

        try:
            page = response.xpath(".//div[@class='manu']/a/text()")[-2].extract()
        except Exception:
            page = 1
        for i in range(1, int(page)):
            next_link = response.url.replace("_1.html", "_{}.html".format(str(i + 1)))
            yield scrapy.Request(next_link, callback=self.parse_iwatch_page_next)

    def parse_iwatch_page_next(self, response):
        li_list = response.xpath(".//div[@class='w_class_list']//li")
        for li in li_list:
            iwatch_url = li.xpath("./a/@href").extract_first()
            yield scrapy.Request(iwatch_url, callback=self.parse_iwatch)

    def parse_iwatch(self, response):
        watchitem = IwatchwatchItem()
        # parsing details omitted
        yield watchitem

Processing data in pipelines

pipelines.py:

import hashlib, logging
import os, oss2, requests

import scrapy
from scrapy.pipelines.images import ImagesPipeline
from spider_iwatch.items import IwatchBrandItem, IwatchSerieItem, IwatchwatchItem
from scrapy.exceptions import DropItem, NotConfigured
from functools import reduce
from spider_iwatch.common.iwatch_sql import DBHelper
from spider_iwatch.common import config

from spider_iwatch.utils.redis_filter.request_filter import RequestFilter


class IwatchDataMysqlPipeline(object):

    def __init__(self):
        self.rf = RequestFilter()

    def process_item(self, item, spider):

        # save brand info
        if isinstance(item, IwatchBrandItem):
            brand_id = DBHelper().get_brand_id(item['brand_chname'])
            if brand_id is False:
                image_paths = reduce(lambda x, y: x + ';' + y, item['image_paths'])
                value = (
                    item['brand_chname'], item['brand_enname'], item['brand_url'], item['brand_img_url'],
                    item['brand_trademark_url'], item['brand_introduction'], image_paths)
                result = DBHelper().insert_brand(value)
                if isinstance(result, Exception):
                    logging.warning("\nInsert failed, reason: {}\nData: {}".format(result, item))
                else:
                    item['id'] = result
            else:
                item['id'] = brand_id

        # save series info
        elif isinstance(item, IwatchSerieItem):
            brand_id = DBHelper().get_brand_id(item['brand_chname'])
            serie_id = DBHelper().get_serie_id(brand_id, item['serie_name'])
            if serie_id is False:
                image_paths = item['image_paths'][0]
                value = (brand_id, item['serie_name'], item['serie_url'], item['serie_introduction'],
                         item['serie_img_url'], image_paths)
                result = DBHelper().insert_serie(value)
                # log the failure so bad inserts can be inspected later
                if isinstance(result, Exception):
                    logging.warning("\nInsert failed, reason: {}\nData: {}".format(result, item))
                else:
                    item['id'] = result
            else:
                item['id'] = serie_id

        # save watch info
        else:
            brand_id = DBHelper().get_brand_id(item['iwatch_base']['iwatch_brand'])
            serie_id = DBHelper().get_serie_id(brand_id, item['iwatch_base']['iwatch_serie'])
            iwatch_id = DBHelper().get_iwatch_id(serie_id, item['iwatch_base']['iwatch_model'])
            if iwatch_id is False:
                image_paths = item['image_paths'][0]
                value = (serie_id, item['iwatch_name'], item['iwatch_url'], item['iwatch_img_url'], item['iwatch_price_yuan'],
                         item['iwatch_price_hkd'], item['iwatch_price_euro'], item['iwatch_base']['iwatch_model'],
                         item['iwatch_base']['iwatch_style'], item['iwatch_base']['iwatch_waterproof'], item['iwatch_base']['iwatch_ttm'],
                         item['iwatch_base']['iwatch_num'], item['iwatch_mechanism']['iwatch_mech_type'],
                         item['iwatch_mechanism']['iwatch_mech_model'], item['iwatch_mechanism']['iwatch_power'],
                         item['iwatch_mechanism']['iwatch_hole'], item['iwatch_mechanism']['iwatch_obser_certi'],
                         item['iwatch_mechanism']['iwatch_geneve'], item['iwatch_exterior']['iwatch_surface_diameter'],
                         item['iwatch_exterior']['iwatch_material'], item['iwatch_exterior']['iwatch_band_material'],
                         item['iwatch_exterior']['iwatch_dial_color'], item['iwatch_exterior']['iwatch_mirror_material'],
                         item['iwatch_exterior']['iwatch_bottom'], item['iwatch_exterior']['iwatch_diamonds'],
                         item['iwatch_exterior']['iwatch_thickness'], item['iwatch_exterior']['iwatch_shape'],
                         item['iwatch_exterior']['iwatch_clasp_material'], item['iwatch_exterior']['iwatch_band_color'],
                         item['iwatch_exterior']['iwatch_clasp_type'], item['iwatch_exterior']['iwatch_fritillaria'],
                         item['iwatch_exterior']['iwatch_noctilucent'], item['iwatch_fuction'], image_paths)
                result = DBHelper().insert_iwatch(value)
                # log the failure so bad inserts can be inspected later
                if isinstance(result, Exception):
                    logging.warning("\nInsert failed, reason: {}\nData: {}".format(result, item))
                else:
                    # mark the URL fingerprint in redis
                    self.rf.mark_url(item['iwatch_url'])
                    item['id'] = result
            else:
                item['id'] = iwatch_id
        return item


class IwatchImgUploadPipeline(object):
    '''Pipeline that uploads images to OSS'''

    def __init__(self):
        '''Create the OSS connection objects'''
        self.endpoint = config.OSS_ENDPOINT
        self.access_key_id = config.OSS_ACCESS_KEY_ID
        self.access_key_secret = config.OSS_ACCESS_KEY_SECRET
        self.bucket_name = config.OSS_IMG_BUCKET_NAME
        self.auth = oss2.Auth(self.access_key_id, self.access_key_secret)
        self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name)
        self.rf = RequestFilter()

    def process_item(self, item, spider):
        item['image_paths'] = []
        for image_url in item['image_urls']:
            image_name = self.calSha1(image_url)
            if self.rf.imgurl_is_exists(image_url):
                item['image_paths'].append("<oss url prefix>" + image_name)  # OSS URL prefix redacted
                print('This image URL has already been fetched')
            else:
                # handle broken links
                try:
                    image_input = requests.get(image_url, timeout=(4, 7))
                except Exception:
                    item['image_paths'].append("")
                    continue
                # upload to OSS
                result = self.bucket.put_object("image/" + image_name, image_input)
                if result.status == 200:
                    # print("upload succeeded")
                    item['image_paths'].append("<oss url prefix>" + image_name)
                    self.rf.mark_url(image_url)
        return item

    def calSha1(self, image_url):
        '''Hash the image_url'''
        sha1obj = hashlib.sha1()
        sha1obj.update(image_url.encode())
        hash = sha1obj.hexdigest()
        return hash + '.jpg'

Two pipelines are defined here: IwatchDataMysqlPipeline writes the data to MySQL, and IwatchImgUploadPipeline uploads the images to OSS.

Note: with pymysql, don't instantiate a single mysql_util in the constructor, reuse it for every operation, and close it in the destructor. If you do, then even when the data in the database changes, your queries will keep returning the same result until the spider process ends, no matter how many times you query. The correct approach is to create a new mysql_util for each SQL operation, as sketched below.
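
A minimal sketch of the two patterns (the pipeline class names here are illustrative; the project's own pipeline above follows the second pattern by calling DBHelper() per operation):

from spider_iwatch.common.iwatch_sql import DBHelper

# Anti-pattern: cache one helper/connection for the whole spider run and close it
# at teardown; repeated queries can keep returning a stale snapshot of the data.
class CachedConnPipeline(object):
    def __init__(self):
        self.db = DBHelper()                       # created once, reused forever

    def process_item(self, item, spider):
        item['id'] = self.db.get_brand_id(item['brand_chname'])
        return item

# Pattern used in this project: build a fresh DBHelper (and hence take a fresh
# pooled connection) for every SQL operation.
class FreshConnPipeline(object):
    def process_item(self, item, spider):
        item['id'] = DBHelper().get_brand_id(item['brand_chname'])
        return item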

IwatchImgUploadPipeline

Every item that comes back carries item['image_urls']. The pipeline loops over those URLs, fetches each one with requests.get, and uploads the response body straight to OSS. Note that I did not use ImagesPipeline: with ImagesPipeline the images are first downloaded to disk, so you would need the local path, upload from it, and then delete the local file, which is cumbersome. Instead I fetch the image stream and upload only the stream. If you need both to upload and to keep a local copy, ImagesPipeline is the better choice (a rough sketch of that variant is included below).

Each URL is also hashed; if the upload succeeds, the URL's fingerprint is saved to Redis for permanent storage.
item['image_paths'] is filled in so the image paths travel onward with the item.
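
For completeness, a rough sketch of the ImagesPipeline variant mentioned above (not used in this project). It relies on Scrapy's standard get_media_requests/item_completed hooks; the class name is illustrative, and IMAGES_STORE must be set in settings.py for it to work:

import scrapy
from scrapy.pipelines.images import ImagesPipeline
from scrapy.exceptions import DropItem


class IwatchLocalImagesPipeline(ImagesPipeline):   # illustrative name

    def get_media_requests(self, item, info):
        for image_url in item['image_urls']:
            yield scrapy.Request(image_url)

    def item_completed(self, results, item, info):
        # 'path' is relative to the IMAGES_STORE setting
        local_paths = [x['path'] for ok, x in results if ok]
        if not local_paths:
            raise DropItem("no images downloaded for %s" % item.get('iwatch_url'))
        item['image_paths'] = local_paths   # you would then upload these local files to OSS yourself
        return item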

IwatchDataMysqlPipeline

This pipeline stores the data. Since the spider yields three types of item, the pipeline has to check which type it received and handle each one accordingly.
To handle multiple item types in Scrapy, check with isinstance(item, ItemClass).
To handle multiple spiders in one pipeline, check with spider.name == "spider_name". A small sketch of both checks follows.
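
A minimal dispatch sketch combining both checks (the pipeline class name is illustrative; 'iwatch' is this project's spider name):

from spider_iwatch.items import IwatchBrandItem, IwatchSerieItem, IwatchwatchItem


class DispatchExamplePipeline(object):

    def process_item(self, item, spider):
        # skip items coming from spiders this pipeline does not care about
        if spider.name != 'iwatch':
            return item

        # branch on the concrete item class
        if isinstance(item, IwatchBrandItem):
            pass    # handle brand rows
        elif isinstance(item, IwatchSerieItem):
            pass    # handle series rows
        elif isinstance(item, IwatchwatchItem):
            pass    # handle watch rows
        return item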

Wrapping the database layer

conn_mysql.py

# -*- coding:utf-8 -*-
'''
MySQL connection helper
'''

import pymysql.cursors
from spider_iwatch.common import config
from DBUtils.PooledDB import PooledDB


class MysqlUtil(object):

    # connect to MySQL directly (the connection pool below is what is actually used)
    def connection(self):
        try:
            db = pymysql.connect(config.DB_HOST, config.DB_USER, config.DB_PWD, config.DB_NAME,
                                 charset=config.DB_CHARSET)
            cursor = db.cursor()
            return cursor
        except Exception:
            # failed to connect to the database
            return False

    """
    MySQL database object, responsible for producing connections. Connections are taken
    from a connection pool: conn = MysqlUtil.get_conn()
    Release a connection with conn.close() or del conn.
    """
    # connection pool object
    pool = None

    def __init__(self):
        # take a connection from the pool and create a cursor
        self.conn = MysqlUtil.get_conn()
        self.cursor = self.conn.cursor()

    @staticmethod
    def get_conn():
        """
        @summary: static method, takes a connection from the pool
        @return pymysql.connection
        """
        connargs = {"host": config.DB_HOST, "user": config.DB_USER, "passwd": config.DB_PWD, "db": config.DB_NAME,
                    "charset": config.DB_CHARSET}
        if MysqlUtil.pool is None:
            MysqlUtil.pool = PooledDB(creator=pymysql, mincached=config.DB_MIN_CACHED, maxcached=config.DB_MAX_CACHED,
                                      maxshared=config.DB_MAX_SHARED, maxconnections=config.DB_MAX_CONNECYIONS,
                                      blocking=config.DB_BLOCKING, maxusage=config.DB_MAX_USAGE,
                                      setsession=config.DB_SET_SESSION, **connargs)
        return MysqlUtil.pool.connection()

    def get_all(self, sql, param=None):
        """
        @summary: run a query and fetch all rows
        @param sql: the query; put conditions in the SQL and pass their values through [param]
        @param param: optional, condition values (tuple/list)
        @return: result list / False
        """
        if param is None:
            count = self.cursor.execute(sql)
        else:
            count = self.cursor.execute(sql, param)
        if count > 0:
            result = self.cursor.fetchall()
        else:
            result = False
        return result

    def get_one(self, sql, param=None):
        """
        @summary: run a query and fetch the first row
        @param sql: the query; put conditions in the SQL and pass their values through [param]
        @param param: optional, condition values (tuple/list)
        @return: result row / False
        """
        if param is None:
            count = self.cursor.execute(sql)
        else:
            count = self.cursor.execute(sql, param)
        if count > 0:
            result = self.cursor.fetchone()
        else:
            result = False
        return result

    def get_many(self, sql, num, param=None):
        """
        @summary: run a query and fetch num rows
        @param sql: the query; put conditions in the SQL and pass their values through [param]
        @param num: number of rows to fetch
        @param param: optional, condition values (tuple/list)
        @return: result list / False
        """
        if param is None:
            count = self.cursor.execute(sql)
        else:
            count = self.cursor.execute(sql, param)
        if count > 0:
            result = self.cursor.fetchmany(num)
        else:
            result = False
        return result

    def insert_one(self, sql, value):
        """
        @summary: insert one row
        @param sql: the INSERT statement with placeholders
        @param value: the row data, tuple/list
        @return: the id of the inserted row, or the exception on failure
        """
        try:
            self.cursor.execute(sql, value)
            self.conn.commit()
            return self.get_insert_id()
        except Exception as e:
            self.conn.rollback()
            return e

    def insert_many(self, sql, values):
        """
        @summary: insert multiple rows
        @param sql: the INSERT statement with placeholders
        @param values: the row data, tuple(tuple)/list[list]
        @return: count of affected rows
        """
        try:
            count = self.cursor.executemany(sql, values)
            self.conn.commit()
            return count
        except Exception:
            self.conn.rollback()
            return 0

    def get_insert_id(self):
        """
        Return the id generated by the last insert on this connection, or 0 if there is none.
        """
        self.cursor.execute("SELECT @@IDENTITY AS id")
        result = self.cursor.fetchall()
        return result[0][0]

    def query(self, sql, param=None):
        try:
            if param is None:
                count = self.cursor.execute(sql)
                self.conn.commit()
            else:
                count = self.cursor.execute(sql, param)
                self.conn.commit()
            return count
        except Exception:
            self.conn.rollback()
            return 0

    def update(self, sql, param=None):
        """
        @summary: update rows
        @param sql: SQL with placeholders, e.g. (%s,%s)
        @param param: the values to update, tuple/list
        @return: count of affected rows
        """
        return self.query(sql, param)

    def delete(self, sql, param=None):
        """
        @summary: delete rows
        @param sql: SQL with placeholders, e.g. (%s,%s)
        @param param: the condition values, tuple/list
        @return: count of affected rows
        """
        return self.query(sql, param)

    def begin(self):
        """
        @summary: start a transaction
        """
        self.conn.autocommit(0)

    def end(self, option='commit'):
        """
        @summary: end a transaction
        """
        if option == 'commit':
            self.conn.commit()
        else:
            self.conn.rollback()

    def dispose(self, is_end=1):
        """
        @summary: release the pooled connection
        """
        if is_end == 1:
            self.end('commit')
        else:
            self.end('rollback')
        self.cursor.close()
        self.conn.close()

    def execute_sql(self, sql):
        """
        @summary: run raw SQL, mainly INSERT/UPDATE/DELETE
        :param sql: the SQL statement to run
        :return: 1 on success, 0 on failure
        """
        try:
            self.cursor.execute(sql)
            self.conn.commit()
            return 1
        except Exception:
            self.conn.rollback()
            return 0

conn_mysql.py wraps the routine MySQL operations.
On top of it come the raw SQL statements that actually store the data.

iwatch_sql.py

"""
数据库相关操作sql原生语句
"""

from spider_iwatch.common import conn_mysql


class DBHelper(object):

def __init__(self):
self.mysql_util = conn_mysql.MysqlUtil()

def get_brand_id(self, brand_chname):
"""
@summary:根据品牌名字获取品牌id
:param brand_chname:品牌中文名字
:return brand_id:品牌id
"""

select_sql = "SELECT id FROM brand WHERE brand_chname = %s"
select_result = self.mysql_util.get_one(select_sql, (brand_chname,))
if select_result:
return select_result[0]
else:
return False

def insert_brand(self, value):
"""
@summary:插入品牌信息
:param value:品牌的所有参数
:return brand_id:品牌id
"""
insert_sql = "INSERT INTO brand(brand_chname,brand_enname, brand_url, brand_img_url, brand_trademark_url, brand_introduction, brand_image_paths) VALUES (%s,%s,%s,%s,%s,%s,%s)"
insert_result = self.mysql_util.insert_one(insert_sql, value)
return insert_result

def get_serie_id(self, brand_id, serie_name):
"""
@summary:根据品牌id,系列名字,获取系列id
:param brand_id serie_name:品牌id 系列名字
:return serie_id:系列id
"""
select_sql = "SELECT id FROM serie WHERE brand_id = %s AND serie_name = %s"
select_result = self.mysql_util.get_one(select_sql, (brand_id, serie_name))
if select_result:
return select_result[0]
else:
return False

def insert_serie(self, value):
"""
@summary:插入系列信息
:param value:系列的所有参数
:return serie_id:系列id
"""
insert_sql = "INSERT INTO serie(brand_id, serie_name, serie_url, serie_introduction, serie_img_url, serie_image_paths)VALUES (%s,%s,%s,%s,%s,%s)"
insert_result = self.mysql_util.insert_one(insert_sql, value)
return insert_result

def get_iwatch_id(self, serie_id, iwatch_model):
"""
@summary:根据系列id,手表型号,获取手表id
:param serie_id iwatch_model:系列id 手表型号
:return iwatch_id:手表id
"""
select_sql = "SELECT id FROM iwatch WHERE iwatch_serie_id = %s AND iwatch_model = %s"
select_result = self.mysql_util.get_one(select_sql, (serie_id, iwatch_model))
if select_result:
return select_result[0]
else:
return False

def insert_iwatch(self, value):
"""
@summary:插入手表信息
:param value:手表的所有参数
:return iwatch_id:手表id
"""
insert_sql = "INSERT INTO iwatch(iwatch_serie_id, iwatch_name, iwatch_url, iwatch_img_url, iwatch_price_yuan, " \
"iwatch_price_hkd, iwatch_price_euro, iwatch_model, iwatch_style, iwatch_waterproof, iwatch_ttm, " \
"iwatch_num, iwatch_mech_type, iwatch_mech_model, iwatch_power, iwatch_hole, iwatch_obser_certi, " \
"iwatch_geneve, iwatch_sur_dia, iwatch_material, iwatch_band_material, iwatch_dial_color, " \
"iwatch_mirror_material, iwatch_bottom, iwatch_diamonds, iwatch_thickness, iwatch_shape, " \
"iwatch_clasp_material, iwatch_band_color, iwatch_clasp_type, iwatch_fritillaria, " \
"iwatch_noctilucent,iwatch_fuction,iwatch_image_paths) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s," \
"%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
insert_result = self.mysql_util.insert_one(insert_sql, value)
return insert_result
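
A quick usage sketch showing the "look up first, insert if missing" flow that the MySQL pipeline follows; the brand name and the value tuple below are illustrative only:

from spider_iwatch.common.iwatch_sql import DBHelper

# get_brand_id returns False when the brand has not been stored yet
brand_id = DBHelper().get_brand_id('欧米茄')                 # illustrative brand name
if brand_id is False:
    # (chname, enname, url, img_url, trademark_url, introduction, image_paths)
    value = ('欧米茄', 'OMEGA', 'http://example.com/omega', 'http://example.com/omega.jpg',
             'http://example.com/omega_logo.jpg', 'introduction text ...', 'path1;path2')
    brand_id = DBHelper().insert_brand(value)               # new id on success, the exception object on failure
print(brand_id)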

Deduplication with Redis (persistent storage)

redis_filter.py

import hashlib

import redis
import six


class RedisFilter(object):
    '''Deduplication check backed by persistent storage in Redis'''

    def __init__(self,
                 hash_func_name="sha1",
                 redis_host="localhost",
                 redis_port=6379,
                 redis_db=0,
                 redis_key="filter"
                 ):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_key = redis_key

        # the hash function used for fingerprints
        self.hash_func = getattr(hashlib, hash_func_name)
        # the redis connection object
        pool = redis.ConnectionPool(host=self.redis_host, port=self.redis_port, db=self.redis_db)
        self.storage = redis.StrictRedis(connection_pool=pool)

    def _safe_data(self, data):
        '''
        :param data: the original data
        :return: the data as a binary string
        '''
        if six.PY3:
            if isinstance(data, bytes):
                return data
            elif isinstance(data, str):
                return data.encode()
            else:
                raise Exception("must provide a string")
        else:
            if isinstance(data, str):        # Python 2 str corresponds to Python 3 bytes
                return data
            elif isinstance(data, unicode):  # Python 2 unicode corresponds to Python 3 str
                return data.encode()
            else:
                raise Exception("must provide a string")

    def _get_hash_value(self, data):
        '''
        Return the hash value of the given data.
        :param data: the original data (a binary string)
        :return: the hash value
        '''
        hash_obj = self.hash_func()
        hash_obj.update(self._safe_data(data))
        hash_value = hash_obj.hexdigest()
        return hash_value

    def save(self, data):
        '''
        Compute the fingerprint of data and store it.
        :param data: the original data
        :return: the result of the store operation
        '''
        hash_value = self._get_hash_value(data)
        return self.storage.sadd(self.redis_key, hash_value)

    def is_exists(self, data):
        '''
        Check whether the fingerprint of the given data already exists.
        :param data: the original data
        :return: True or False
        '''
        hash_value = self._get_hash_value(data)
        return self.storage.sismember(self.redis_key, hash_value)

request_filter.py

import urllib.parse

from .redis_filter import RedisFilter


class RequestFilter(object):
    '''Request deduplication logic'''

    def __init__(self):
        self.r = RedisFilter()

    def request_is_exists(self, url):
        '''
        Check whether this request has already been handled.
        :return: True or False
        '''
        return self.r.is_exists(url)

    def imgurl_is_exists(self, image_url):
        return self.r.is_exists(image_url)

    def mark_request(self, request_obj):
        '''
        Mark a request object as handled.
        :param request_obj:
        :return: the result of the mark operation
        '''
        data = self._get_request_filter_data(request_obj)
        return self.r.save(data)

    def mark_url(self, url):
        return self.r.save(url)

    def _get_request_filter_data(self, request_obj):
        '''
        Build a dedup string from a request object by joining its url, method and body.
        :param request_obj:
        :return: the combined string
        '''
        # 1. url: normalize case for scheme and host, keep the rest as-is, sort the query parameters
        url = request_obj.url
        _ = urllib.parse.urlparse(url)
        url_without_query = _.scheme + "://" + _.hostname + _.path
        url_query = urllib.parse.parse_qsl(_.query)
        url_with_query = url_without_query + "?" + urllib.parse.urlencode(url_query)

        # 2. method: upper-case the HTTP method, e.g. "Get".upper()
        method = request_obj.method.upper()

        # 3. body: a dict has to be sorted to produce a stable string
        str_body = str(sorted(request_obj.body.items()))

        # combine url + method + body
        data = url_with_query + method + str_body
        return data

The method that matters here is mark_url; the others are kept because I need them in other projects, so you can ignore them.
How the deduplication works: before a request is made, the URL is hashed into a fingerprint and looked up in Redis. If the fingerprint is already there, the request is skipped; if not, the request goes ahead. Only after the request has succeeded and the data has been inserted into the database is the fingerprint saved to Redis. You can see this in pipelines.py:

# save the URL fingerprint to redis
self.rf.mark_url(item['iwatch_url'])

The URL is only marked in Redis once the database insert has succeeded.

middlewares.py

from scrapy.exceptions import IgnoreRequest

from spider_iwatch.utils.redis_filter.request_filter import RequestFilter


class SpiderIwatchDetailFilterMiddleware(object):
    """Use redis to filter out detail-page requests that have already been crawled"""

    def __init__(self):
        self.rf = RequestFilter()

    def process_request(self, request, spider):
        """Ignore requests that have already been handled; let new ones through"""
        url = request.url
        if self.rf.request_is_exists(url):
            raise IgnoreRequest

This middleware filters requests.
Note: only detail-page URLs should be stored in Redis. If you stored brand URLs as well, those brand pages would be skipped on the next run, so when a brand adds new watches, the next crawl would never pick them up.

settings.py

ITEM_PIPELINES = {
    # the image pipeline must run before the MySQL pipeline, since it fills in image_paths
    'spider_iwatch.pipelines.IwatchImgUploadPipeline': 300,
    'spider_iwatch.pipelines.IwatchDataMysqlPipeline': 400,
}

DOWNLOADER_MIDDLEWARES = {
    'spider_iwatch.middlewares.SpiderIwatchDetailFilterMiddleware': 543,
}

Make sure the pipelines and the middleware are enabled here, otherwise they have no effect.

Logging

Used to record exceptions, especially failed inserts.
settings.py

import datetime

to_day = datetime.datetime.now()
log_file_path = "log/scrapy_{}_{}_{}".format(to_day.year, to_day.month, to_day.day)

LOG_LEVEL = "WARNING"
LOG_FILE = log_file_path

The pipelines use logging to record failures:

import logging
logging.warning("\nInsert failed, reason: {}\nData: {}".format(result, item))

My logging setup is fairly basic; there is plenty of material online if you want to go further:

Python日志库logging总结-可能是目前为止将logging库总结的最好的一篇文章
Python中logging模块的基本用法
python中logging模块的一些简单用法


In the code above, images are fetched with requests, which blocks. aiohttp is a better fit (and if you call the OSS HTTP API directly instead of using the OSS SDK, aiohttp is a good choice there too), because aiohttp is asynchronous and its API is similar to requests. Python 3.5 added the async/await keywords, which make asynchronous code far more readable than callbacks; aiohttp is a library for asynchronous HTTP clients and servers, and asyncio provides single-threaded concurrent I/O.

A crawler written with requests is synchronous: parsing and storage only run after the page has finished downloading, so a slow download blocks everything. Speeding things up with multiprocessing or threading is another option.

aiohttp, by contrast, is asynchronous: roughly speaking, you kick off a download and do other work instead of sitting idle until it finishes, which greatly improves download throughput. A minimal sketch follows.
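
A minimal sketch of downloading several images concurrently with aiohttp; the URLs are placeholders, and error handling plus the OSS upload step are left out:

import asyncio
import aiohttp


async def fetch_image(session, url):
    # fetch one image and return its raw bytes, ready to upload
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        return await resp.read()


async def fetch_all(urls):
    # schedule all downloads on one session and wait for them together
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_image(session, url) for url in urls]
        return await asyncio.gather(*tasks)


if __name__ == "__main__":
    urls = ["https://example.com/a.jpg", "https://example.com/b.jpg"]
    images = asyncio.get_event_loop().run_until_complete(fetch_all(urls))
    print([len(img) for img in images])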

Scrapy itself is also asynchronous, driven by Twisted events. Never write blocking code inside it. Blocking code includes:

  • Accessing files, databases, or the web
  • Spawning new processes and handling their output, e.g. running shell commands
  • System-level operations, e.g. waiting on a system queue

Python学习之路37-使用asyncio包处理并发
第十八章 - 使用asyncio处理并发
Both are reading notes on Fluent Python; the details are in Chapter 18.