# mobilecrawler/main.py
import gzip
import io
import json
import os
import random
import re
import threading
import time
import zlib

import requests
import win32api
import win32con
from PIL import Image
from bs4 import BeautifulSoup
from openpyxl import load_workbook, Workbook
from pynput.mouse import Controller, Button
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.chrome import webdriver
from selenium.webdriver.chrome.options import Options

# Mobile phone crawler base class
from Crawler import MobilePhoneCrawler
from config.config import cf, config_path
from config.log import writeInfo, writeError

headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
'cookie': 'hng=CN%7Czh-CN%7CCNY%7C156; lid=tb84443556; enc=Di1rYCRWDQZC0UbccH38rIzuBg8LLFGKPSeQNu0fJ6Atw1lfF%2BtBE6Jm3vKtkZ%2FcJwoY%2FA2OAFq1CCgzrB0Wmg%3D%3D; t=2ce95276273d2f0fec4b735114efb9f0; uc3=id2=UonSf2s8K7H57A%3D%3D&nk2=F5RNYQezF9ZVJA%3D%3D&lg2=URm48syIIVrSKA%3D%3D&vt3=F8dByuPZuePp%2FK4exO4%3D; tracknick=tb84443556; uc4=nk4=0%40FY4Gtg6GE3gLVPH74U0sgDg9VVYt&id4=0%40UOE4tAnGHWIKt7PI5bS6f4noV%2Bbp; lgc=tb84443556; _tb_token_=e83ebbe73eeff; cookie2=170434fc3c4cae5b8d05c6ca3e035a7d; cna=3uL0FRGpQX4CAdoUCaGR7Dw2; UM_distinctid=16d0a0bd4f67e9-0657eeb9eeb382-5373e62-384000-16d0a0bd4f7aff; otherx=e%3D1%26p%3D*%26s%3D0%26c%3D0%26f%3D0%26g%3D0%26t%3D0; _med=dw:2560&dh:1440&pw:2560&ph:1440&ist:0; cq=ccp%3D1; swfstore=175036; res=scroll%3A1613*5900-client%3A1613*924-offset%3A1613*5900-screen%3A2560*1440; CNZZDATA1256793290=2086205104-1567833369-%7C1567844169; pnm_cku822=098%23E1hvKpvUvbpvUpCkvvvvvjiPRFS96jtjn2MWsjD2PmPp1jr8RLqwtjY8RFdUsjtbRpGCvvLMMQvvmphvLhbyi9mFecXPjV5vsEe4jVDQpGoHbdiQpznCwaZOecXPjV5vsEe4jVDQpYLhbdiQpzwBwaZOecEvjV5vsCkwjVDQpGFvbdiQpzvxwa7ivpvUphvhrpcsXvmEvpvVpyUUCE%2BfKphv8hCvCbyvvh89phvOJpvvpYYvpm9vvvCHtZCv8vvvvhcDphvOKpvvBHvCvpvVphhvvvvvRphvChCvvvm5vpvhphvhHv%3D%3D; isg=BHBwqfKM2QBI7oUELIuAZ-OhQT7CuVQDVfnti2rBAUuPJRHPEs91k6SXeW3gtQzb; l=cBO-RSocq17jyErCBOCZlurza77TvIRAguPzaNbMi_5QN1TQITQOkrDBxe96cjWdtj8B4JuaUMv9-etuigILNzGHtBUV.'
}
# Build a name/value cookie list from the raw cookie header
# (split on the first '=' only, since cookie values may themselves contain '=')
cookie_list = []
for cookie in headers['cookie'].split(';'):
    name, _, value = cookie.strip().partition('=')
    cookie_list.append({'name': name, 'value': value})
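
# The dicts in cookie_list match the shape Selenium's add_cookie() expects. A
# minimal, hypothetical helper (not part of the original flow) showing how the
# captured session could be replayed in a browser; note that add_cookie()
# requires the driver to already be on a page of the matching domain:
def load_cookies(driver):
    driver.get('https://www.tmall.com')
    for c in cookie_list:
        driver.add_cookie(c)
    driver.refresh()
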
# Tmall mobile phone crawler
# https://list.tmall.com/search_product.htm?q=%CA%D6%BB%FA&click_id=%CA%D6%BB%FA&from=mallfp..pc_1.7_hq&spm=875.7931836%2FB.a1z5h.8.66144265MD9ShM
class TmallCrawler(MobilePhoneCrawler):
    def __init__(self) -> None:
        super().__init__()
        # Shared HTTP session
        self.session = requests.Session()
        # QR-code login (disabled by default)
        # self.login()
        self.get_page()

    def login(self):
        """Log in to Taobao/Tmall via the QR-code scan flow."""
        # Request the login QR code
        login_url = 'https://qrlogin.taobao.com/qrcodelogin/generateQRCode4Login.do?from=tmall&appkey=00000000&umid_token=T4C16243DC287A311CA928E0D5EA177D443B864009178BBAA55A4CB86A4'
        writeInfo(login_url)
        login_res = self.session.get(login_url)
        res_content = login_res.content.decode()
        # The response is JSONP-style; extract the JSON object between the outermost braces
        res_json = json.loads(res_content[res_content.index("{"):res_content.rindex("}") + 1])
        writeInfo(json.dumps(res_json, indent=1))
        img_url = res_json["url"]
        img_res = self.session.get("http:%s" % img_url)
        if img_res.status_code == 200:
            img_name = 'login.png'
            # Save the QR-code image
            with open(img_name, 'wb') as file:
                file.write(img_res.content)
            # Show the QR-code image
            Image.open(img_name).show()
            win32api.MessageBox(0, "Please scan the QR code with the Taobao mobile app", "Reminder",
                                win32con.MB_ICONWARNING | win32con.MB_SYSTEMMODAL)
            while True:
                login_url = "https://qrlogin.taobao.com/qrcodelogin/qrcodeLoginCheck.do?lgToken={0}&defaulturl=https%3A%2F%2Fwww.tmall.com".format(
                    res_json['lgToken'])
                writeInfo("login_url:{0}".format(login_url))
                check_login_res = self.session.get(login_url)
                # Check the scan status
                if check_login_res.status_code == 200:
                    check_login_res_json = json.loads(check_login_res.content.decode())
                    writeInfo(json.dumps(check_login_res_json, indent=1))
                    if check_login_res_json['code'] == '10006':
                        # Scan succeeded
                        check_login_url = check_login_res_json['url']
                        writeInfo("check_login_url={0}".format(check_login_url))
                        login_res = self.session.get(check_login_url)
                        if login_res.status_code == 200:
                            # Follow the redirect to the identity check
                            login_res_html = BeautifulSoup(login_res.content, 'html.parser')
                            check_url = login_res_html.select_one("iframe")["src"]
                            writeInfo("check_url={0}".format(check_url))
                            # Identity verification ("Alibaba Group | identity verification" page)
                            check_login_res = self.session.get(check_url)
                            if check_login_res.status_code == 200:
                                check_login_res_content = check_login_res.content.decode()
                                verify_modes_url = re.search("http.*verify_modes.*=",
                                                             check_login_res_content).group() + '1'
                                verify_modes_res = self.session.get(verify_modes_url)
                                if verify_modes_res.status_code == 200:
                                    verify_modes_res_content = verify_modes_res.content.decode()
                                    # '你最近购买过什么商品' = "what did you buy recently" (image-captcha page)
                                    if '你最近购买过什么商品' in verify_modes_res_content:
                                        raise Exception("Image captcha triggered; simulated request failed")
                                    else:
                                        win32api.MessageBox(0, "Please tap the confirm button in the Taobao mobile app to log in",
                                                            "Reminder",
                                                            win32con.MB_ICONWARNING | win32con.MB_SYSTEMMODAL)
                                        # Poll for the confirmation from the Taobao mobile app
                                        htoken = re.search(r'htoken".*[a-zA-Z]', verify_modes_res_content).group()
                                        htoken = htoken[htoken.index(":") + 2:]
                                        while True:
                                            time.sleep(1)
                                            check_status_res = self.session.get(
                                                "https://passport.taobao.com/iv/onekey/check_status.do?htoken={0}".format(
                                                    htoken))
                                            if check_status_res.status_code == 200:
                                                check_status_res_json = json.loads(check_status_res.content.decode())
                                                if check_status_res_json['content']['code'] == '1':
                                                    login_safe_res = self.session.get(
                                                        check_status_res_json['content']['url'])
                                                    if login_safe_res.status_code == 200:
                                                        # login_safe_res_content=login_safe_res.content.decode(login_safe_res.apparent_encoding)
                                                        # login_safe_href=re.search("https.*pass.tmall.com.*\w",login_safe_res_content).group()
                                                        # index_res = self.session.get(login_safe_href)
                                                        writeInfo("Login succeeded")
                                                        break
                                                    else:
                                                        raise Exception("Simulated login request failed!!!")
                                                else:
                                                    writeInfo(json.dumps(check_status_res_json, indent=1))
                                            else:
                                                raise Exception("Simulated login request failed!!!")
                                        break
                                else:
                                    raise Exception("Simulated login request failed!!!")
                            else:
                                raise Exception("Simulated login request failed!!!")
                        else:
                            raise Exception("Simulated login request failed!!!")
                    elif check_login_res_json['code'] == '10004':
                        # QR code expired; fetch a fresh one and stop this poll loop
                        self.login()
                        return
                time.sleep(1)
        else:
            raise Exception("Failed to fetch the login QR-code image")

    def get_page(self):
        """Fetch and parse the paginated product-list pages."""
        # Product-list page address
        domain = "https://list.tmall.com/search_product.htm"
        url = '{0}?q=%CA%D6%BB%FA&type=p&vmarket=&spm=875.7931836%2FB.a2227oh.d100&xl=shouji_1&from=mallfp..pc_1_suggest'.format(
            domain)
        while True:
            # Fetch one page of results
            res = self.session.get(url, headers=headers)
            # Only handle HTTP 200 responses
            if res.status_code == 200:
                try:
                    # Parse the HTML with BeautifulSoup
                    res_html = BeautifulSoup(res.content, 'html.parser')
                    # Captcha detection
                    if 'security-X5' == res_html.select_one("title").text:
                        self.clickCaptcha(url)
                    # Current page marker
                    current_page = res_html.select_one("b[class=ui-page-cur]")
                    writeInfo("Parsing page {0}, url: {1}".format(current_page.text, url))
                    # Every product link in the result list
                    product_hrefs = res_html.select("#J_ItemList .productTitle>a")
                    for product_href in product_hrefs:
                        # Fetch the detail data behind each link
                        self.get_mobile("https:{0}".format(product_href['href']))
                        # Stop once the configured number of records is reached
                        if len(self.mobile_list) == self.max_count:
                            return
                    # Follow the "next page" link
                    url = "{0}{1}".format(domain, current_page.find_next_siblings()[0]['href'])
                except Exception as e:
                    writeError(e)
            else:
                writeError("Failed to fetch list page, url: %s, status code: %d" % (url, res.status_code))

    def clickCaptcha(self, url):
        """Solve the slider captcha at `url` in a real Chrome session."""
        try:
            chrome_options = Options()
            chrome_options.binary_location = cf.get('selenium', 'binary_location')
            # Allow running as root
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            # chrome_options.add_argument('--headless')
            # User data directory
            chrome_options.add_argument('--user-data-dir={0}'.format(cf.get('selenium', 'user_data_dir')))
            # Do not load images
            chrome_options.add_argument('blink-settings=imagesEnabled=false')
            # Disable GPU acceleration
            chrome_options.add_argument('--disable-gpu')
            # Start maximized
            chrome_options.add_argument('--start-maximized')
            # Full-screen mode
            chrome_options.add_argument('start-fullscreen')
            # Developer mode, so the browser is not flagged as Selenium automation
            chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
            driver = webdriver.WebDriver(options=chrome_options, service_log_path=r"I:\ChromeUpdater\selenium.log")
            # driver.set_window_rect(0, 0, 1024, 768)
            # Open the slider-captcha page
            driver.get(url)
            try:
                # Slider knob
                nc_1_n1z = driver.find_element_by_css_selector("#nc_1_n1z")
                # Slider track
                nc_1__scale_text = driver.find_element_by_css_selector("#nc_1__scale_text")
                mouse = Controller()
                # Move the OS cursor to the centre of the knob
                x = nc_1_n1z.rect['x'] + nc_1_n1z.rect['width'] / 2
                y = nc_1_n1z.rect['y'] + nc_1_n1z.rect['height'] / 2
                mouse.position = (x, y)
                time.sleep(0.5)
                mouse.press(Button.left)
                time.sleep(0.5)
                # pynput's move() takes a relative offset: drag by the track
                # width minus the knob width
                mouse.move(nc_1__scale_text.rect['width'] - nc_1_n1z.rect['width'], 0)
                time.sleep(0.5)
                mouse.release(Button.left)
                while True:
                    if len(driver.find_elements_by_css_selector(".errloading")) > 0:
                        # Slide rejected: retry in a fresh browser session
                        driver.quit()
                        self.clickCaptcha(url)
                        return
                    if len(driver.find_elements_by_css_selector("#nc_1_n1z")) == 0:
                        # Knob gone; assume the verification passed
                        break
                    # sub_slide_width = random.randint(30, 50)
                    # action.move_by_offset(sub_slide_width, 0).perform()  # move the slider
                    # start += sub_slide_width
                    time.sleep(random.randint(1, 10) / 10)
                # Capture the browser session's cookies
                # (note: local variable, shadows the module-level cookie_list)
                cookie_list = driver.get_cookies()
                # Close the browser
                driver.quit()
            except NoSuchElementException as e:
                writeError(e)
                driver.quit()
                self.clickCaptcha(url)
        except Exception as e:
            writeError(e)
            raise Exception("Simulated slider verification failed")

    def get_mobile(self, url, param_url=None, **kwargs):
        """Fetch and parse one phone's detail page at `url`."""
        res = self.session.get(url)
        if res.status_code == 200:
            res_html = BeautifulSoup(res.content, 'html.parser')
            # Captcha detection
            if 'security-X5' == res_html.select_one("title").text:
                self.clickCaptcha(url)
            # Does the phone have a spec table at all?
            if res_html.select_one("#J_Attrs") is None:
                writeInfo("Detail page %s has no spec table" % url)
            else:
                try:
                    # '规格参数' = the "specifications" table on the page
                    ths = res_html.select("table:contains('规格参数') tbody>tr:not([class='tm-tableAttrSub']) th")
                    # Store every spec row as a key/value pair
                    mobile_dict = {}
                    for th in ths:
                        if 'colspan' in th.attrs:
                            continue
                        key = str(th.text).strip()
                        value = str(th.next_sibling.text).strip()
                        mobile_dict[key] = value
                    # Append to the result list
                    self.mobile_list.append(mobile_dict)
                    writeInfo("Added phone: {0}".format(
                        str(res_html.select_one("div[class=tb-detail-hd]>h1").text).strip()))
                except Exception as e:
                    writeError(e)
        else:
            writeError("Phone url: %s, status code: %d" % (url, res.status_code))

    # Save one phone record
    def save_mobile(self, mobile):
        self.mobile_list.append(mobile)


# CNMO review-centre phone crawler
# http://product.cnmo.com/all/product.html
class CnmoCrawler(MobilePhoneCrawler):
    def __init__(self) -> None:
        super().__init__()
        self.threads = []
        self.threadLock = threading.Lock()
        try:
            # Thread pool size
            self.thread_count = int(cf.get('excel', 'thread_count'))
            # Flush to Excel once this many records are cached
            self.data_size = int(cf.get('excel', 'data_size'))
            # Output file paths
            self.file1 = cf.get('excel', 'file1')
            self.file2 = cf.get('excel', 'file2')
            # Names of the parameters (columns) to save
            self.param_name_list = cf.get('excel', 'param_name').split(',')
            # Number of leading columns that must be non-empty
            self.param_required_index = int(cf.get('excel', 'param_required_index'))
            # Number of records to collect
            self.max_count = int(cf.get('excel', 'max_count'))
        except Exception as e:
            writeError("Failed to initialise parameters, error: {0}; check the configuration file {1}".format(
                e, config_path))
            raise
        # Remove the output of the previous run
        if os.path.exists(self.file1):
            os.remove(self.file1)
        if os.path.exists(self.file2):
            os.remove(self.file2)
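
    # For reference, a hypothetical [excel] section matching the keys read
    # above (file names and values are illustrative, not from the original
    # repository):
    #
    #   [excel]
    #   thread_count = 8
    #   data_size = 50
    #   file1 = mobile_raw.xlsx
    #   file2 = mobile_clean.xlsx
    #   param_name = 手机名称,上市时间,参考价格,电商报价,网友综合评分
    #   param_required_index = 5
    #   max_count = 1000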

    def get_page(self):
        # Starting list page
        start_url = 'http://product.cnmo.com/all/product.html'
        # Next-page link
        next_page_url = None
        while True:
            current_page_url = start_url if next_page_url is None else next_page_url
            writeInfo("Parsing list page: {0}".format(current_page_url))
            # Fetch the page body
            res = self.get_req(current_page_url)
            # Only handle HTTP 200 responses
            if res is not None and res.status_code == 200:
                try:
                    writeInfo("List page {0} fetched".format(current_page_url))
                    res_html = BeautifulSoup(self.uzipData(res.content), 'html.parser')
                    # Parse the list entries
                    li_s = res_html.select("ul.all-con-con-ul.cf>li")
                    for li in li_s:
                        if len(self.mobile_list) > self.max_count:
                            return
                        p = li.select_one('p.red')
                        # Fetch the detail parameters on a worker thread;
                        # '上市时间' (release date, YYYYMM) is passed through as an extra column
                        time_to_market = re.search(r'\d{4}\d{2}', p.text)
                        thread = myThread(self, 'http:{0}'.format(li.select_one('a.name')['href']),
                                          'http:{0}'.format(li.select_one('div.info>a:contains(参数)')['href']),
                                          上市时间=None if time_to_market is None else time_to_market.group())
                        thread.start()
                        if len(self.threads) == self.thread_count:
                            for t in self.threads:
                                t.join()
                            writeInfo("Clearing thread pool")
                            self.threads.clear()
                        self.threads.append(thread)
                    # Next-page link
                    next_page_url = 'http:{0}'.format(res_html.select_one(".pnext")["href"])
                except Exception as e:
                    writeError("Error while parsing list page: {0}".format(e))
            else:
                raise Exception("Failed to fetch list page: {0}".format(current_page_url))

    def run(self):
        self.get_page()
        writeInfo('Collection finished; cleaning dirty data')
        self.clear_data()
        writeInfo('Dirty-data cleaning finished')

    def get_mobile(self, base_url, param_url, **kwargs):
        # Detailed parameters for one phone
        param_dict = {}
        writeInfo("Parsing phone detail parameter page {0}".format(param_url))
        # '网友综合评分' = aggregate user score, read from the overview page
        score_res = self.get_req(base_url)
        if score_res is not None and score_res.status_code == 200:
            score_res_html = BeautifulSoup(self.uzipData(score_res.content), 'html.parser')
            param_dict['网友综合评分'] = score_res_html.select_one('div.pro-comm-stars').find_next(
                'span', {'class': 'red'}).text
        mobile_res = self.get_req(param_url)
        # Only handle HTTP 200 responses
        if mobile_res is not None and mobile_res.status_code == 200:
            try:
                mobile_res_html = BeautifulSoup(self.uzipData(mobile_res.content), 'html.parser')
                phone_name = mobile_res_html.select_one('#proName>a').text
                # '手机名称' = phone name
                param_dict['手机名称'] = phone_name
                writeInfo("Parsing detailed parameters for {0}".format(phone_name))
                # '参考价格' = reference price
                param_dict['参考价格'] = mobile_res_html.select_one('span:contains(参考价格)').find_next().text
                # '电商报价' = e-commerce quote
                param_dict['电商报价'] = mobile_res_html.select_one('span:contains(电商报价)').next_sibling.strip()
                # Every named parameter on the page
                param_name_list = mobile_res_html.select('div.right>p')
                for param_name in param_name_list:
                    param_dict[param_name['paramname']] = param_name['paramvalue']
                # Serialise appends/flushes across worker threads
                self.threadLock.acquire()
                self.save_mobile(dict(param_dict, **kwargs))
                self.threadLock.release()
            except Exception as e:
                writeError("Error while parsing phone: {0}".format(e))
        else:
            writeError("Failed to fetch phone detail parameter page {0}".format(param_url))

    def save_mobile(self, mobile, ignore=False):
        # With ignore=False, flush every full batch of data_size records;
        # with ignore=True, flush whatever partial batch remains.
        self.mobile_list.append(mobile)
        writeInfo("{0} phones crawled so far".format(len(self.mobile_list)))
        if not ignore and len(self.mobile_list) % self.data_size == 0:
            self.save_excel(self.mobile_list[-self.data_size:])
        elif ignore and len(self.mobile_list) % self.data_size != 0:
            self.save_excel(self.mobile_list[-(len(self.mobile_list) % self.data_size):])
        else:
            writeInfo('Fewer than {0} records cached or nothing left over; skipping write'.format(self.data_size))

    def init_excel(self, file, max_index=None):
        # Create a new workbook whose first row holds the configured column names
        wb = Workbook()
        ws = wb.active
        for index, param_name in enumerate(self.param_name_list):
            if max_index is None or index < max_index:
                ws.cell(row=1, column=index + 1, value=param_name)
        wb.save(file)
        wb.close()

    # Append records to the Excel file
    def save_excel(self, data_list):
        # Initialise the header row if the file does not exist yet
        if not os.path.exists(self.file1):
            self.init_excel(self.file1)
        wb = load_workbook(self.file1)
        ws = wb.active
        # Append rows after the current last row
        max_row = ws.max_row
        for row_index, data in enumerate(data_list):
            for column_index, param_name in enumerate(self.param_name_list):
                ws.cell(row=max_row + row_index + 1, column=column_index + 1,
                        value=data[param_name] if param_name in data else None)
        wb.save(self.file1)
        wb.close()

    # Clean dirty rows into a second workbook
    def clear_data(self):
        # Source data
        source_wb = load_workbook(self.file1)
        source_ws = source_wb.active
        # New report holding the cleaned data
        self.init_excel(self.file2, max_index=self.param_required_index)
        target_wb = load_workbook(self.file2)
        target_ws = target_wb.active
        write_row = 2
        for current_row in range(2, source_ws.max_row + 1):
            for current_column in range(1, self.param_required_index + 1):
                val = source_ws.cell(row=current_row, column=current_column).value
                # Drop rows with an empty required cell, or whose status column (2)
                # is '曝光' ("leaked") or '即将上市' ("coming soon")
                if val is None or len(val) == 0 or (
                        current_column == 2 and (val == '曝光' or val == '即将上市')):
                    for i in range(1, self.param_required_index + 1):
                        target_ws.cell(row=write_row, column=i, value='')
                    break
                else:
                    target_ws.cell(row=write_row, column=current_column, value=val)
                    if current_column == self.param_required_index:
                        write_row += 1
        # Save the cleaned result
        target_wb.save(self.file2)

    def get_req(self, url, max_retries=3, **kwargs):
        """GET `url` with retries; extra kwargs are merged into the request headers."""
        try:
            return requests.get(url, headers=dict({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
            }, **kwargs))
        except Exception as e:
            writeError(e)
            time.sleep(10)
            # Retry until the retry budget is exhausted
            if max_retries > 1:
                return self.get_req(url, max_retries - 1, **kwargs)
            return None
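
# uzipData is inherited from MobilePhoneCrawler (defined in Crawler.py, not
# shown here). A minimal sketch of what such a decompressor might look like,
# assuming response bodies may arrive gzip-encoded, deflate-encoded, or plain
# (which would also explain the gzip/zlib/io imports at the top of this file):
def uzipDataSketch(data):
    try:
        # gzip-wrapped body
        return gzip.GzipFile(fileobj=io.BytesIO(data)).read()
    except OSError:
        pass
    try:
        # raw deflate body
        return zlib.decompress(data, -zlib.MAX_WBITS)
    except zlib.error:
        # already plain bytes
        return data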


# Worker thread: fetches one phone's detail parameters through the crawler
class myThread(threading.Thread):
    def __init__(self, crawler, base_url, param_url, **kwargs):
        threading.Thread.__init__(self)
        self.crawler = crawler
        self.base_url = base_url
        self.param_url = param_url
        self.kwargs = kwargs

    def run(self) -> None:
        self.crawler.get_mobile(self.base_url, self.param_url, **self.kwargs)
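
# No entry point appears in this excerpt; a minimal, hypothetical way to run
# the CNMO pipeline end to end:
if __name__ == '__main__':
    crawler = CnmoCrawler()
    crawler.run()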