mobilecrawler/main.py

import os
import re
import threading
import time

import requests
from bs4 import BeautifulSoup
from openpyxl import load_workbook, Workbook

from Crawler import MobiePhoneCrawler  # base phone-crawler class
from config.config import cf, config_path
from config.log import writeInfo, writeError


# Crawler for the review-center phone listings at
# http://product.cnmo.com/all/product.html
class CnmoCrawler(MobiePhoneCrawler):
    def __init__(self) -> None:
        super().__init__()
        self.threads = []
        self.threadLock = threading.Lock()
        try:
            # thread pool size
            self.thread_count = int(cf.get('excel', 'thread_count'))
            # flush to excel once this many rows are buffered
            self.data_size = int(cf.get('excel', 'data_size'))
            # output file paths (raw and cleaned data)
            self.file1 = cf.get('excel', 'file1')
            self.file2 = cf.get('excel', 'file2')
            # list of parameter (column) names to save
            self.param_name_list = cf.get('excel', 'param_name').split(',')
            # number of leading parameters that must be non-empty
            self.param_required_index = int(cf.get('excel', 'param_required_index'))
            # target number of records to collect
            self.max_count = int(cf.get('excel', 'max_count'))
        except Exception as e:
            writeError("Failed to initialise parameters, exception: {0}; "
                       "please check the configuration file {1}".format(e, config_path))
            raise
        # remove output files left over from the previous run
        if os.path.exists(self.file1):
            os.remove(self.file1)
        if os.path.exists(self.file2):
            os.remove(self.file2)
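
    # A minimal sketch of the [excel] config section this constructor reads;
    # the option names match the cf.get calls above, but the values are
    # illustrative assumptions, not the project's real configuration:
    #
    #   [excel]
    #   thread_count = 8
    #   data_size = 50
    #   file1 = data/raw.xlsx
    #   file2 = data/clean.xlsx
    #   param_name = 手机名称,参考价格,电商报价,上市时间,网友综合评分
    #   param_required_index = 3
    #   max_count = 1000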

    def get_page(self):
        # first listing page
        start_url = 'http://product.cnmo.com/all/product.html'
        # link to the next page, filled in at the end of each iteration
        next_page_url = None
        while True:
            current_page_url = start_url if next_page_url is None else next_page_url
            writeInfo("Parsing listing page: {0}".format(current_page_url))
            res = self.get_req(current_page_url)
            # status code 200 means a normal response
            if res is not None and res.status_code == 200:
                try:
                    writeInfo("Listing page {0} fetched successfully".format(current_page_url))
                    res_html = BeautifulSoup(self.uzipData(res.content), 'html.parser')
                    # parse the phone entries on this page
                    li_s = res_html.select("ul.all-con-con-ul.cf>li")
                    for li in li_s:
                        if len(self.mobie_list) >= self.max_count:
                            return
                        p = li.select_one('p.red')
                        # release date: six consecutive digits such as "201909" (may be absent)
                        time_to_market = re.search(r'\d{4}\d{2}', p.text)
                        # fetch the phone's detail parameters on a worker thread;
                        # '参数' is the link text for "specifications", and the
                        # '上市时间' (release date) key must match the configured param_name list
                        thread = myThread(self, 'http:{0}'.format(li.select_one('a.name')['href']),
                                          'http:{0}'.format(li.select_one('div.info>a:contains(参数)')['href']),
                                          上市时间=None if time_to_market is None else time_to_market.group())
                        thread.start()
                        if len(self.threads) == self.thread_count:
                            for t in self.threads:
                                t.join()
                            writeInfo("Thread pool drained")
                            self.threads.clear()
                        self.threads.append(thread)
                    # follow the "next page" link; an empty href marks the last page
                    href = res_html.select_one(".pnext")["href"]
                    if len(href) == 0:
                        writeInfo('No more data, the crawler will stop')
                        return
                    else:
                        next_page_url = 'http:{0}'.format(href)
                except Exception as e:
                    writeError("Exception while parsing listing page: {0}".format(e))
            else:
                raise Exception("Failed to fetch listing page: {0}".format(current_page_url))

    def run(self):
        try:
            self.get_page()
        except Exception as e:
            writeError("Exception while collecting data: {0}".format(e))
        # flush any rows still buffered below the batch size before cleaning
        remainder = len(self.mobie_list) % self.data_size
        if remainder:
            self.save_excel(self.mobie_list[-remainder:])
        writeInfo('Collection finished, cleaning dirty data')
        self.clear_data()
        writeInfo('Dirty data cleaned')

    # locate the <span> whose text contains the given price label, trying the
    # raw text first and its gbk/iso8859-1 mojibake spelling second
    def find_chinese(self, html, text):
        span = html.select_one('span:contains({0})'.format(text))
        if span is None:
            return html.select_one('span:contains({0})'.format(text.encode('gbk').decode('iso8859-1')))
        return span

    # undo mojibake: the page is GBK-encoded but parts of it get decoded as
    # latin-1, so re-encode and decode to recover the Chinese text
    def decode(self, text):
        encodings = ['iso8859-1', 'iso8859-9']
        decoding = 'gbk'
        for encoding in encodings:
            try:
                return text.encode(encoding).decode(decoding)
            except (UnicodeEncodeError, UnicodeDecodeError):
                pass
        return text
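
    # Illustration (sample strings assumed, not taken from the site): GBK
    # bytes mis-decoded as latin-1 turn '参考价格' into '²Î¿¼¼Û¸ñ', and
    # '²Î¿¼¼Û¸ñ'.encode('iso8859-1').decode('gbk') recovers '参考价格';
    # text that is already correct raises UnicodeEncodeError in the loop
    # above and comes back unchanged.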

    def get_mobie(self, base_url, param_url, **kwargs):
        # detailed phone parameters, keyed by the Chinese labels used on the
        # page; the keys must match the configured param_name list
        param_dict = {}
        writeInfo("Parsing phone detail page {0}".format(param_url))
        # overall user rating from the product page
        score_res = self.get_req(base_url)
        if score_res is not None and score_res.status_code == 200:
            score_res_html = BeautifulSoup(self.uzipData(score_res.content), 'html.parser')
            stars = score_res_html.select_one('div.pro-comm-stars')
            if stars is not None:
                param_dict['网友综合评分'] = stars.find_next('span', {'class': 'red'}).text
        mobie_res = self.get_req(param_url)
        # status code 200 means a normal response
        if mobie_res is not None and mobie_res.status_code == 200:
            try:
                mobie_res_html = BeautifulSoup(mobie_res.content, 'html.parser')
                phone_name = self.decode(mobie_res_html.select_one('#proName>a').text)
                param_dict['手机名称'] = phone_name
                writeInfo("Parsing detailed parameters of \"{0}\"".format(phone_name))
                # reference price ('参考价格'): keep only the digits
                price = self.find_chinese(mobie_res_html, '参考价格')
                if price is not None and price.find_next() is not None and "".join(
                        filter(str.isdigit, price.find_next().text)).isdigit():
                    param_dict['参考价格'] = "".join(filter(str.isdigit, price.find_next().text))
                # e-commerce quote ('电商报价'): keep only the digits
                price = self.find_chinese(mobie_res_html, '电商报价')
                if price is not None and price.next_sibling is not None and "".join(
                        filter(str.isdigit, price.next_sibling.strip())).isdigit():
                    param_dict['电商报价'] = "".join(filter(str.isdigit, price.next_sibling.strip()))
                # the remaining parameters are exposed as paramname/paramvalue attributes
                param_name_list = mobie_res_html.select('div.right>p')
                for param_name in param_name_list:
                    param_dict[param_name['paramname']] = self.decode(param_name['paramvalue'])
                # serialise saves across worker threads
                with self.threadLock:
                    self.save_mobie(dict(param_dict, **kwargs))
            except Exception as e:
                writeError("Exception while parsing phone: {0}".format(e))
        else:
            writeError("Failed to fetch phone detail page {0}".format(param_url))

    def save_mobie(self, mobie, ignore=False):
        self.mobie_list.append(mobie)
        writeInfo("Crawler progress: collected/target = {current}/{max_count} ({num})".format(
            current=len(self.mobie_list),
            max_count=self.max_count,
            num="{:.2%}".format(len(self.mobie_list) / self.max_count)))
        if not ignore and len(self.mobie_list) % self.data_size == 0:
            # a full batch is buffered: append it to the workbook
            self.save_excel(self.mobie_list[-self.data_size:])
        elif ignore and len(self.mobie_list) % self.data_size != 0:
            # final flush: write out whatever is left below the batch size
            self.save_excel(self.mobie_list[-(len(self.mobie_list) % self.data_size):])
        else:
            writeInfo('Fewer than {0} rows buffered and nothing left over, skipping write'.format(self.data_size))

    def init_excel(self, file, max_index=None):
        wb = Workbook()
        ws = wb.active
        for index, param_name in enumerate(self.param_name_list):
            if max_index is None or index < max_index:
                ws.cell(row=1, column=index + 1, value=param_name)
        wb.save(file)
        wb.close()

    # append a batch of rows to the raw excel file
    def save_excel(self, data_list):
        # initialise the header row if the file does not exist yet
        if not os.path.exists(self.file1):
            self.init_excel(self.file1)
        wb = load_workbook(self.file1)
        ws = wb.active
        # write below the last used row
        max_row = ws.max_row
        for row_index, data in enumerate(data_list):
            for column_index, param_name in enumerate(self.param_name_list):
                ws.cell(row=max_row + row_index + 1, column=column_index + 1,
                        value=data[param_name] if param_name in data else None)
        wb.save(self.file1)
        wb.close()

    # clean dirty data: copy rows from the raw workbook to a new one, dropping
    # rows whose required columns are empty or whose reference-price column
    # holds a placeholder ('曝光' = rumoured, '即将上市' = upcoming) instead of a number
    def clear_data(self):
        # source data
        source_wb = load_workbook(self.file1)
        source_ws = source_wb.active
        # new workbook for the cleaned report
        self.init_excel(self.file2, max_index=self.param_required_index)
        target_wb = load_workbook(self.file2)
        target_ws = target_wb.active
        write_row = 2
        for current_row in range(2, source_ws.max_row + 1):
            for current_column in range(1, self.param_required_index + 1):
                val = source_ws.cell(row=current_row, column=current_column).value
                if val is None or len(str(val)) == 0 or (
                        '参考价格' in self.param_name_list and
                        current_column == self.param_name_list.index('参考价格') + 1 and
                        val in ['曝光', '即将上市']):
                    # blank out the partially written row and move to the next source row
                    for i in range(1, self.param_required_index + 1):
                        target_ws.cell(row=write_row, column=i, value='')
                    break
                else:
                    target_ws.cell(row=write_row, column=current_column, value=val)
                    if current_column == self.param_required_index:
                        write_row += 1
        # save the cleaned result
        target_wb.save(self.file2)
        target_wb.close()

    # GET with a browser User-Agent; retries up to max_retries times with a
    # 10-second pause between attempts, and any extra kwargs become headers
    def get_req(self, url, max_retries=3, **kwargs):
        try:
            return requests.get(url, headers=dict({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                              '(KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
            }, **kwargs))
        except Exception as e:
            writeError(e)
            time.sleep(10)
            max_retries -= 1
            if max_retries > 0:
                # return the retried response instead of discarding it
                return self.get_req(url, max_retries, **kwargs)
            return None


# worker thread that fetches and saves one phone's detail parameters
class myThread(threading.Thread):
    def __init__(self, crawler, base_url, param_url, **kwargs):
        threading.Thread.__init__(self)
        self.crawler = crawler
        self.base_url = base_url
        self.param_url = param_url
        self.kwargs = kwargs

    def run(self) -> None:
        self.crawler.get_mobie(self.base_url, self.param_url, **self.kwargs)
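

# A minimal entry-point sketch (an assumption for illustration; the original
# file does not show how the crawler is launched):
if __name__ == '__main__':
    CnmoCrawler().run()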