|
|
|
import os
|
|
|
|
import os
|
|
|
|
import re
|
|
|
|
import threading
|
|
|
|
import time
|
|
|
|
|
|
|
|
import requests
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
# Excel workbook read/write support (openpyxl)
|
|
|
|
from openpyxl import load_workbook, Workbook
|
|
|
|
|
|
|
|
from Crawler import MobiePhoneCrawler
|
|
|
|
from config.config import cf, config_path
|
|
|
|
from config.log import writeInfo, writeError
|
|
|
|
|
|
|
|
|
|
|
|
# Crawler for phones on the review-center (CNMO) product listing
|
|
|
|
# http://product.cnmo.com/all/product.html
|
|
|
|
class CnmoCrawler(MobiePhoneCrawler):
    """Crawler for phone data on CNMO's product listing pages.

    Reads its runtime settings from the ``excel`` section of the project
    config, walks the paginated product list, scrapes each phone's detail
    parameters on a small pool of worker threads, buffers the records in
    ``self.mobie_list`` (inherited), periodically flushes them to a raw
    Excel report (``file1``), and finally writes a cleaned report
    (``file2``).
    """

    def __init__(self) -> None:
        super().__init__()
        # Worker threads of the current batch.
        self.threads = []
        # Serializes access to the shared record buffer / Excel writer.
        self.threadLock = threading.Lock()
        try:
            # Thread pool size.
            self.thread_count = int(cf.get('excel', 'thread_count'))
            # Flush to Excel every `data_size` buffered records.
            self.data_size = int(cf.get('excel', 'data_size'))
            # Output file paths: raw report and cleaned report.
            self.file1 = cf.get('excel', 'file1')
            self.file2 = cf.get('excel', 'file2')
            # Ordered list of column names to persist.
            self.param_name_list = cf.get('excel', 'param_name').split(',')
            # Number of leading columns that must be non-empty in clean data.
            self.param_required_index = int(cf.get('excel', 'param_required_index'))
            # Target number of records to collect.
            self.max_count = int(cf.get('excel', 'max_count'))
        except Exception as e:
            writeError("初始化参数失败,异常信息{0},请检查配置文件{1}的配置".format(e, config_path))
            raise
        # Remove output of any previous run so this run starts fresh.
        if os.path.exists(self.file1):
            os.remove(self.file1)
        if os.path.exists(self.file2):
            os.remove(self.file2)

    def get_page(self):
        """Walk the paginated product list, spawning one worker per phone.

        Returns when ``max_count`` records have been collected or the site
        reports no further page (empty ``.pnext`` href). Raises when a
        listing page cannot be fetched at all.
        """
        # First listing page.
        start_url = 'http://product.cnmo.com/all/product.html'
        # URL of the next page, filled in after each page is parsed.
        next_page_url = None
        while True:
            current_page_url = start_url if next_page_url is None else next_page_url
            writeInfo("开始解析列表页:{0}".format(current_page_url))
            res = self.get_req(current_page_url)
            # Only a 200 response carries a parsable listing page.
            if res is not None and res.status_code == 200:
                try:
                    writeInfo("列表页:{0}解析成功".format(current_page_url))
                    res_html = BeautifulSoup(self.uzipData(res.content), 'html.parser')
                    # One <li> per phone in the listing.
                    li_s = res_html.select("ul.all-con-con-ul.cf>li")
                    for li in li_s:
                        if len(self.mobie_list) > self.max_count:
                            return
                        p = li.select_one('p.red')
                        # Release date, e.g. "2019年08月", when present.
                        time_to_market = re.search(r'\d{4}年\d{2}月', p.text)
                        # Fetch the phone's detail parameters on a worker thread.
                        thread = myThread(self, 'http:{0}'.format(li.select_one('a.name')['href']),
                                          'http:{0}'.format(li.select_one('div.info>a:contains(参数)')['href']),
                                          上市时间=None if time_to_market is None else time_to_market.group())
                        thread.start()
                        # Pool full: wait for the whole batch before continuing.
                        if len(self.threads) == self.thread_count:
                            for t in self.threads:
                                t.join()
                            writeInfo("清空线程池")
                            self.threads.clear()
                        self.threads.append(thread)
                    # "Next page" link; an empty href marks the last page.
                    href = res_html.select_one(".pnext")["href"]
                    if len(href) == 0:
                        writeInfo('已经没有更多数据,爬虫程序将结束')
                        return
                    next_page_url = 'http:{0}'.format(href)
                except Exception as e:
                    writeError("解析列表页出现异常信息:{0}".format(e))
            else:
                raise Exception("列表页:{0}解析失败".format(current_page_url))

    def run(self):
        """Run the full pipeline: collect data, then clean the raw report."""
        try:
            self.get_page()
        except Exception:
            writeError("采集数据出现异常,开始清洗脏数据")
        writeInfo('采集数据完毕,开始清洗脏数据')
        self.clear_data()
        writeInfo('清洗脏数据完毕')

    # Locate the price label span by its Chinese text.
    def find_chinese(self, html, text):
        """Find the <span> containing *text*; if not found, retry with the
        gbk→latin-1 mojibake form used by pages served with a mismatched
        charset declaration."""
        span = html.select_one('span:contains({0})'.format(text))
        if span is None:
            return html.select_one('span:contains({0})'.format(text.encode('gbk').decode('iso8859-1')))
        return span

    def decode(self, text):
        """Undo mojibake: re-encode *text* with a likely-wrong charset and
        decode it as gbk; return *text* unchanged when no attempt works."""
        for wrong_charset in ('iso8859-1', 'iso8859-9'):
            try:
                return text.encode(wrong_charset).decode('gbk')
            except (UnicodeEncodeError, UnicodeDecodeError):
                pass
        return text

    def get_mobie(self, base_url, param_url, **kwargs):
        """Scrape one phone's rating and detail parameters and save them.

        base_url  -- the phone's overview page (source of the user rating)
        param_url -- the phone's parameter page
        kwargs    -- extra columns (e.g. 上市时间) merged into the record
        """
        # Accumulates one phone's parameters keyed by column name.
        param_dict = {}
        writeInfo("开始解析手机详情参数页{0}".format(param_url))

        # Overall user rating from the overview page.
        score_res = self.get_req(base_url)
        if score_res is not None and score_res.status_code == 200:
            score_res_html = BeautifulSoup(self.uzipData(score_res.content), 'html.parser')
            param_dict['网友综合评分'] = score_res_html.select_one('div.pro-comm-stars').find_next('span',
                                                                                             {'class': 'red'}).text
        mobie_res = self.get_req(param_url)

        # Only a 200 response carries a parsable parameter page.
        if mobie_res is not None and mobie_res.status_code == 200:
            try:
                mobie_res_html = BeautifulSoup(mobie_res.content, 'html.parser')
                phone_name = self.decode(mobie_res_html.select_one('#proName>a').text)
                param_dict['手机名称'] = phone_name
                writeInfo("开始解析手机\"{0}\"详细参数".format(phone_name))
                # Reference price: keep digits only.
                price = self.find_chinese(mobie_res_html, '参考价格')
                if price is not None and price.find_next() is not None:
                    digits = "".join(filter(str.isdigit, price.find_next().text))
                    if digits.isdigit():
                        param_dict['参考价格'] = digits
                # E-commerce price: lives in the text node next to the label.
                price = self.find_chinese(mobie_res_html, '电商报价')
                if price is not None and price.next_sibling is not None:
                    digits = "".join(filter(str.isdigit, price.next_sibling.strip()))
                    if digits.isdigit():
                        # BUG FIX: was `price.next_sibling().strip` (calls the
                        # text node, stores the unbound method) — it raised
                        # inside the broad except and dropped the whole record.
                        param_dict['电商报价'] = digits
                # Remaining parameters are stored as attributes on <p> tags.
                for param_name in mobie_res_html.select('div.right>p'):
                    param_dict[param_name['paramname']] = self.decode(param_name['paramvalue'])
                # `with` guarantees the lock is released even if save_mobie
                # raises (the manual acquire/release could leak the lock and
                # deadlock every other worker).
                with self.threadLock:
                    self.save_mobie(dict(param_dict, **kwargs))
            except Exception as e:
                writeError("解析手机出现异常信息:{0}".format(e))
        else:
            writeError("解析手机详情参数页{0}失败".format(param_url))

    def save_mobie(self, mobie, ingore=False):
        """Buffer one record and flush a full batch to Excel when due.

        With ``ingore=True`` (sic — name kept for callers), flush whatever
        partial batch remains instead of waiting for a full one. Callers in
        this class hold ``threadLock`` while invoking this method.
        """
        self.mobie_list.append(mobie)
        writeInfo("目前爬虫进度,爬取数据量/目标数据量={current}/{max_count}({num})".format(current=len(self.mobie_list),
                                                                             max_count=self.max_count,
                                                                             num="{:.2%}".format(
                                                                                 len(self.mobie_list) / self.max_count)))
        pending = len(self.mobie_list) % self.data_size
        if not ingore and pending == 0:
            self.save_excel(self.mobie_list[-self.data_size:])
        elif ingore and pending != 0:
            self.save_excel(self.mobie_list[-pending:])
        else:
            writeInfo('缓存数据不足{0}条或没有剩余数据,不需要写入'.format(self.data_size))

    def init_excel(self, file, max_index=None):
        """Create *file* with a header row: the first *max_index* configured
        columns when given, otherwise all of them."""
        wb = Workbook()
        ws = wb.active
        for index, param_name in enumerate(self.param_name_list):
            if max_index is None or index < max_index:
                ws.cell(row=1, column=index + 1, value=param_name)
        wb.save(file)
        wb.close()

    # Append a batch of records to the raw Excel report.
    def save_excel(self, data_list):
        """Append *data_list* (list of record dicts) below the existing rows
        of the raw report, creating it with a header on first use."""
        if not os.path.exists(self.file1):
            self.init_excel(self.file1)
        wb = load_workbook(self.file1)
        ws = wb.active
        max_row = ws.max_row
        for row_index, data in enumerate(data_list):
            for column_index, param_name in enumerate(self.param_name_list):
                # Missing columns are written as empty cells.
                ws.cell(row=max_row + row_index + 1, column=column_index + 1,
                        value=data.get(param_name))
        wb.save(self.file1)
        wb.close()

    # Clean dirty rows out of the raw report.
    def clear_data(self):
        """Copy rows from the raw report into the clean report, discarding
        any row whose required columns are empty or whose 参考价格 cell is a
        placeholder ('曝光'/'即将上市')."""
        source_wb = load_workbook(self.file1)
        source_ws = source_wb.active
        # Fresh clean report holding only the required columns.
        self.init_excel(self.file2, max_index=self.param_required_index)
        target_wb = load_workbook(self.file2)
        target_ws = target_wb.active
        # 1-based column of 参考价格, or None when not configured (hoisted
        # out of the loop; the original re-computed the index per cell).
        price_column = (self.param_name_list.index('参考价格') + 1
                        if '参考价格' in self.param_name_list else None)
        write_row = 2
        for current_row in range(2, source_ws.max_row + 1):
            for current_column in range(1, self.param_required_index + 1):
                val = source_ws.cell(row=current_row, column=current_column).value
                if val is None or len(val) == 0 or (
                        current_column == price_column and val in ['曝光', '即将上市']):
                    # Dirty row: blank the partially-written cells (the next
                    # good row reuses this write_row) and skip the rest.
                    for i in range(1, self.param_required_index + 1):
                        target_ws.cell(row=write_row, column=i, value='')
                    break
                else:
                    target_ws.cell(row=write_row, column=current_column, value=val)
                    # Whole row survived — advance to the next output row.
                    if current_column == self.param_required_index:
                        write_row += 1
        # Persist the cleaned report.
        target_wb.save(self.file2)
        target_wb.close()

    def get_req(self, url, max_retries=3, **kwargs):
        """GET *url* with a desktop User-Agent, retrying on any error.

        Sleeps 10s between attempts; returns the Response, or None when
        every attempt failed. Extra keyword arguments are merged into the
        request headers (existing behavior, kept for callers).
        """
        try:
            return requests.get(url, headers=dict({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
            },
                                                  **kwargs))
        except Exception as e:
            writeError(e)
            time.sleep(10)
            max_retries -= 1
            if max_retries > 0:
                # BUG FIX: the retry's response was discarded (no `return`),
                # so even a successful retry yielded None to the caller.
                return self.get_req(url, max_retries, **kwargs)
            return None
|
|
|
|
|
|
|
|
|
|
|
|
class myThread(threading.Thread):
    """Worker thread that scrapes one phone's detail parameters.

    The actual work is delegated to ``crawler.get_mobie``; any extra
    keyword arguments are forwarded to it untouched.
    """

    def __init__(self, crawler, base_url, param_url, **kwargs):
        super().__init__()
        # Stash everything the worker needs until start() is called.
        self.crawler = crawler
        self.base_url = base_url
        self.param_url = param_url
        self.kwargs = kwargs

    def run(self) -> None:
        """Executed on the worker thread once start() is called."""
        crawler = self.crawler
        crawler.get_mobie(self.base_url, self.param_url, **self.kwargs)
|