master
潘啟华 5 years ago
parent 0031d4f9d3
commit f85ab74267
  1. 283
      main.py
  2. 9
      test.py

@ -1,298 +1,17 @@
import gzip
import json
import os import os
import random import os
import re import re
import threading import threading
import time import time
import zlib
import io
from typing import Optional, Callable, Any, Iterable, Mapping
import requests import requests
import win32api
import win32con
from PIL import Image
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
# 手机实体类 # 手机实体类
from openpyxl import load_workbook, Workbook from openpyxl import load_workbook, Workbook
from pynput.mouse import Controller, Button
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.chrome import webdriver
from selenium.webdriver.chrome.options import Options
from urllib3.exceptions import HeaderParsingError
from Crawler import MobilePhoneCrawler from Crawler import MobilePhoneCrawler
from config.config import cf, config_path from config.config import cf, config_path
from config.log import writeInfo, writeError from config.log import writeInfo, writeError
from bs4 import BeautifulSoup
import re
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
'cookie': 'hng=CN%7Czh-CN%7CCNY%7C156; lid=tb84443556; enc=Di1rYCRWDQZC0UbccH38rIzuBg8LLFGKPSeQNu0fJ6Atw1lfF%2BtBE6Jm3vKtkZ%2FcJwoY%2FA2OAFq1CCgzrB0Wmg%3D%3D; t=2ce95276273d2f0fec4b735114efb9f0; uc3=id2=UonSf2s8K7H57A%3D%3D&nk2=F5RNYQezF9ZVJA%3D%3D&lg2=URm48syIIVrSKA%3D%3D&vt3=F8dByuPZuePp%2FK4exO4%3D; tracknick=tb84443556; uc4=nk4=0%40FY4Gtg6GE3gLVPH74U0sgDg9VVYt&id4=0%40UOE4tAnGHWIKt7PI5bS6f4noV%2Bbp; lgc=tb84443556; _tb_token_=e83ebbe73eeff; cookie2=170434fc3c4cae5b8d05c6ca3e035a7d; cna=3uL0FRGpQX4CAdoUCaGR7Dw2; UM_distinctid=16d0a0bd4f67e9-0657eeb9eeb382-5373e62-384000-16d0a0bd4f7aff; otherx=e%3D1%26p%3D*%26s%3D0%26c%3D0%26f%3D0%26g%3D0%26t%3D0; _med=dw:2560&dh:1440&pw:2560&ph:1440&ist:0; cq=ccp%3D1; swfstore=175036; res=scroll%3A1613*5900-client%3A1613*924-offset%3A1613*5900-screen%3A2560*1440; CNZZDATA1256793290=2086205104-1567833369-%7C1567844169; pnm_cku822=098%23E1hvKpvUvbpvUpCkvvvvvjiPRFS96jtjn2MWsjD2PmPp1jr8RLqwtjY8RFdUsjtbRpGCvvLMMQvvmphvLhbyi9mFecXPjV5vsEe4jVDQpGoHbdiQpznCwaZOecXPjV5vsEe4jVDQpYLhbdiQpzwBwaZOecEvjV5vsCkwjVDQpGFvbdiQpzvxwa7ivpvUphvhrpcsXvmEvpvVpyUUCE%2BfKphv8hCvCbyvvh89phvOJpvvpYYvpm9vvvCHtZCv8vvvvhcDphvOKpvvBHvCvpvVphhvvvvvRphvChCvvvm5vpvhphvhHv%3D%3D; isg=BHBwqfKM2QBI7oUELIuAZ-OhQT7CuVQDVfnti2rBAUuPJRHPEs91k6SXeW3gtQzb; l=cBO-RSocq17jyErCBOCZlurza77TvIRAguPzaNbMi_5QN1TQITQOkrDBxe96cjWdtj8B4JuaUMv9-etuigILNzGHtBUV.'
}
# 获取字典cookie
cookies = headers['cookie'].split(';')
cookie_list = []
for cookie in cookies:
cookie_list.append({'name': cookie.split('=')[0], 'value': cookie.split('=')[1]})
# 天猫手机爬虫
# https://list.tmall.com/search_product.htm?q=%CA%D6%BB%FA&click_id=%CA%D6%BB%FA&from=mallfp..pc_1.7_hq&spm=875.7931836%2FB.a1z5h.8.66144265MD9ShM
class TmallCrawler(MobilePhoneCrawler):
    """Crawler that collects phone specification tables from Tmall search results.

    Constructing an instance immediately starts crawling, because
    ``__init__`` calls :meth:`get_page`.
    """

    # <title> of the page returned when slider verification is demanded.
    CAPTCHA_TITLE = 'security-X5'
    # Upper bound, in seconds, on waiting for the slider result in one attempt.
    CAPTCHA_WAIT_SECONDS = 30

    def __init__(self) -> None:
        super().__init__()
        # One HTTP session is shared by every request so cookies persist.
        self.session = requests.Session()
        # QR-code login (self.login()) is intentionally not invoked here.
        self.get_page()

    def login(self):
        """Log in through the Taobao QR-code flow.

        Downloads the QR code, shows it to the user, then polls the scan
        status once per second. When the scan succeeds the identity check is
        completed by :meth:`_confirm_login`. When the QR code is reported with
        code ``'10004'`` the whole flow is restarted.

        :raises Exception: if the QR image cannot be downloaded or any step
            of the identity check returns a non-200 status.
        """
        qr_url = 'https://qrlogin.taobao.com/qrcodelogin/generateQRCode4Login.do?from=tmall&appkey=00000000&umid_token=T4C16243DC287A311CA928E0D5EA177D443B864009178BBAA55A4CB86A4'
        writeInfo(qr_url)
        qr_text = self.session.get(qr_url).content.decode()
        # Only the span from the first "{" to the first "}" is parsed as JSON.
        qr_json = json.loads(qr_text[qr_text.index("{"):qr_text.index("}") + 1])
        writeInfo(json.dumps(qr_json, indent=1))

        img_res = self.session.get("http:%s" % qr_json["url"])
        if img_res.status_code != 200:
            raise Exception("获取登陆二维码图片失败")

        # Save the QR image and open it so the user can scan it.
        img_name = 'login.png'
        with open(img_name, 'wb') as file:
            file.write(img_res.content)
        Image.open(img_name).show()
        win32api.MessageBox(0, "请打开手机淘宝扫描二维码", "提醒", win32con.MB_ICONWARNING | win32con.MB_SYSTEMMODAL)

        poll_url = "https://qrlogin.taobao.com/qrcodelogin/qrcodeLoginCheck.do?lgToken={0}&defaulturl=https%3A%2F%2Fwww.tmall.com".format(
            qr_json['lgToken'])
        while True:
            writeInfo("login_url:{0}".format(poll_url))
            poll_res = self.session.get(poll_url)
            if poll_res.status_code == 200:
                poll_json = json.loads(poll_res.content.decode())
                writeInfo(json.dumps(poll_json, indent=1))
                if poll_json['code'] == '10006':
                    # Scan succeeded.
                    self._confirm_login(poll_json['url'])
                    return
                if poll_json['code'] == '10004':
                    # Restart with a fresh QR code and stop polling this
                    # token; previously the loop kept polling the old token
                    # after the nested login had already finished.
                    return self.login()
            time.sleep(1)

    def _confirm_login(self, check_login_url):
        """Follow the post-scan redirects and wait for confirmation on the phone.

        :param check_login_url: URL returned by the scan-status endpoint.
        :raises Exception: on any non-200 response, or when the site asks a
            purchase-history question instead of a one-tap confirmation.
        """
        writeInfo("check_login_url={0}".format(check_login_url))
        login_res = self.session.get(check_login_url)
        if login_res.status_code != 200:
            raise Exception("模拟登陆请求失败!!!")

        # The identity-check page is embedded in an iframe.
        login_res_html = BeautifulSoup(login_res.content, 'html.parser')
        check_url = login_res_html.select_one("iframe")["src"]
        writeInfo("check_url={0}".format(check_url))
        check_res = self.session.get(check_url)
        if check_res.status_code != 200:
            raise Exception("模拟登陆请求失败!!!")

        verify_modes_url = re.search("http.*verify_modes.*=",
                                     check_res.content.decode()).group() + '1'
        verify_modes_res = self.session.get(verify_modes_url)
        if verify_modes_res.status_code != 200:
            raise Exception("模拟登陆请求失败!!!")

        verify_content = verify_modes_res.content.decode()
        if '你最近购买过什么商品' in verify_content:
            raise Exception("触发图片验证,模拟请求失败")

        win32api.MessageBox(0, "请在手机淘宝上点击确认按钮登录", "提醒",
                            win32con.MB_ICONWARNING | win32con.MB_SYSTEMMODAL)
        htoken = re.search("htoken\".*[a-zA-Z]", verify_content).group()
        # Drop everything up to and including the ':' and the character after it.
        htoken = htoken[htoken.index(":") + 2:]
        self._wait_for_phone_confirmation(htoken)

    def _wait_for_phone_confirmation(self, htoken):
        """Poll once per second until the login is confirmed on the phone.

        :param htoken: token extracted from the identity-check page.
        :raises Exception: on any non-200 response.
        """
        while True:
            time.sleep(1)
            status_res = self.session.get(
                "https://passport.taobao.com/iv/onekey/check_status.do?htoken={0}".format(htoken))
            if status_res.status_code != 200:
                raise Exception("模拟登陆请求失败!!!")
            status_json = json.loads(status_res.content.decode())
            if status_json['content']['code'] != '1':
                # Not confirmed yet: log the status and poll again.
                writeInfo(json.dumps(status_json, indent=1))
                continue
            safe_res = self.session.get(status_json['content']['url'])
            if safe_res.status_code != 200:
                raise Exception("模拟登陆请求失败!!!")
            writeInfo("登录成功")
            return

    @classmethod
    def _is_captcha_page(cls, soup):
        """Return True when the parsed page's <title> is the captcha title."""
        title = soup.select_one("title")
        return title is not None and title.text == cls.CAPTCHA_TITLE

    def _fetch_page(self, url, status_error_format, **request_kwargs):
        """GET ``url`` and parse it, solving a slider captcha once if needed.

        :param url: page to download.
        :param status_error_format: ``%``-format taking ``(url, status_code)``,
            logged when the response status is not 200.
        :param request_kwargs: extra keyword arguments for ``session.get``.
        :return: the parsed page, or ``None`` when the status was not 200. If
            the page is still a captcha page after one solve attempt it is
            returned as-is.
        """
        def download():
            res = self.session.get(url, **request_kwargs)
            if res.status_code != 200:
                writeError(status_error_format % (url, res.status_code))
                return None
            return BeautifulSoup(res.content, 'html.parser')

        soup = download()
        if soup is not None and self._is_captcha_page(soup):
            self.clickCaptcha(url)
            # The first response was the captcha page, so request again.
            soup = download()
        return soup

    def get_page(self):
        """Walk the search-result pages and collect every listed phone.

        Stops when ``self.max_count`` phones have been collected, when a page
        cannot be downloaded or has no current-page marker, or when there is
        no link to a following page.
        """
        domain = "https://list.tmall.com/search_product.htm"
        url = '{0}?q=%CA%D6%BB%FA&type=p&vmarket=&spm=875.7931836%2FB.a2227oh.d100&xl=shouji_1&from=mallfp..pc_1_suggest'.format(
            domain)
        while True:
            res_html = self._fetch_page(url, "获取分页信息失败,url:%s响应状态码:%d", headers=headers)
            if res_html is None:
                return
            current_page = None
            try:
                current_page = res_html.select_one("b[class=ui-page-cur]")
                writeInfo("开始解析第{0}页的数据,url:{1}".format(current_page.text, url))
                for product_href in res_html.select("#J_ItemList .productTitle>a"):
                    self.get_mobile("https:{0}".format(product_href['href']))
                    # ">=" rather than "==" so an overshoot cannot skip the stop.
                    if len(self.mobile_list) >= self.max_count:
                        return
            except Exception as e:
                # Best effort: log, then still try to move on to the next page.
                writeError(e)
            if current_page is None:
                # Without the marker the next page cannot be located.
                return
            following = current_page.find_next_siblings()
            next_href = following[0].get('href') if following else None
            if not next_href:
                # Last page reached.
                return
            url = "{0}{1}".format(domain, next_href)

    def _new_captcha_driver(self):
        """Start the Chrome instance used for slider verification."""
        chrome_options = Options()
        chrome_options.binary_location = cf.get('selenium', 'binary_location')
        # Flags for running as root.
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        # Reuse a configured browser profile directory.
        chrome_options.add_argument('--user-data-dir={0}'.format(cf.get('selenium', 'user_data_dir')))
        # Do not load images.
        chrome_options.add_argument('blink-settings=imagesEnabled=false')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--start-maximized')
        chrome_options.add_argument('start-fullscreen')
        # Hide the automation switch so Selenium is harder to detect.
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        # Raw string: same value as before, without invalid "\C" / "\s" escapes.
        return webdriver.WebDriver(options=chrome_options,
                                   service_log_path=r"I:\ChromeUpdater\selenium.log")

    def _drag_slider(self, driver):
        """Drag the slider knob across its track with the real mouse.

        :param driver: browser already showing the verification page.
        :return: True when verification appears to have passed, False when
            the page reported an error or nothing changed in time.
        :raises NoSuchElementException: when the knob or track is missing.
        """
        knob = driver.find_element_by_css_selector("#nc_1_n1z")
        track = driver.find_element_by_css_selector("#nc_1__scale_text")
        mouse = Controller()
        # NOTE(review): rect values come from the page; this assumes they
        # match screen coordinates (full-screen window) - confirm.
        x = knob.rect['x'] + knob.rect['width'] / 2
        y = knob.rect['y'] + knob.rect['height'] / 2
        mouse.position = (x, y)
        time.sleep(0.5)
        mouse.press(Button.left)
        time.sleep(0.5)
        # Controller.move() takes a RELATIVE offset, so pass only the drag
        # distance instead of an absolute target position.
        mouse.move(track.rect['width'] - knob.rect['width'], 0)
        time.sleep(0.5)
        mouse.release(Button.left)

        deadline = time.time() + self.CAPTCHA_WAIT_SECONDS
        while time.time() < deadline:
            if driver.find_elements_by_css_selector(".errloading"):
                return False
            # Assumes the browser leaves the captcha-titled page once the
            # slide is accepted - TODO confirm against the live site.
            if driver.title != self.CAPTCHA_TITLE:
                return True
            time.sleep(random.randint(1, 10) / 10)
        return False

    def clickCaptcha(self, url, max_attempts=3):
        """Solve the slider verification shown at ``url`` in a real browser.

        On success the browser's cookies are copied into ``self.session`` so
        later HTTP requests carry them.

        :param url: URL of the page that shows the slider.
        :param max_attempts: how many fresh browser attempts to make; this
            replaces the previous unbounded recursion.
        :raises Exception: when verification fails on every attempt or an
            unexpected error occurs.
        """
        for _ in range(max_attempts):
            driver = None
            try:
                driver = self._new_captcha_driver()
                driver.get(url)
                if self._drag_slider(driver):
                    # Read cookies BEFORE the browser is closed.
                    for browser_cookie in driver.get_cookies():
                        self.session.cookies.set(
                            browser_cookie['name'], browser_cookie['value'],
                            domain=browser_cookie.get('domain', ''),
                            path=browser_cookie.get('path', '/'))
                    return
            except NoSuchElementException as e:
                # Slider not present: log and retry with a new browser.
                writeError(e)
            except Exception as e:
                writeError(e)
                raise Exception("模拟滑动验证失败") from e
            finally:
                # Always close the browser, also on errors.
                if driver is not None:
                    driver.quit()
        raise Exception("模拟滑动验证失败")

    def get_mobile(self, url, param_url=None, **kwargs):
        """Download one phone detail page and store its specification table.

        :param url: URL of the phone detail page.
        :param param_url: unused here; kept for interface compatibility.
        :param kwargs: unused here; kept for interface compatibility.
        """
        res_html = self._fetch_page(url, "手机url:%s响应状态码:%d")
        if res_html is None:
            return
        if res_html.select_one("#J_Attrs") is None:
            writeInfo("手机详情url:%s没有规格参数" % url)
            return
        try:
            ths = res_html.select("table:contains('规格参数') tbody>tr:not([class='tm-tableAttrSub']) th")
            # Header cells with a colspan are skipped; every other header
            # cell is a key whose value is the text of the next sibling node.
            mobile_dict = {
                str(th.text).strip(): str(th.next_sibling.text).strip()
                for th in ths
                if 'colspan' not in th.attrs
            }
            self.save_mobile(mobile_dict)
            writeInfo("添加手机:{0}信息".format(str(res_html.select_one("div[class=tb-detail-hd]>h1").text).strip()))
        except Exception as e:
            writeError(e)

    def save_mobile(self, mobile):
        """Append one phone's data to ``self.mobile_list``."""
        self.mobile_list.append(mobile)
# 评测中心手机爬虫 # 评测中心手机爬虫

@ -1,14 +1,7 @@
from main import CnmoCrawler
import requests
from main import TmallCrawler, CnmoCrawler
def abc():
    """Placeholder that does nothing and returns ``None``."""
    return None
if __name__ == '__main__':
    # Tmall crawler test (disabled).
    # TmallCrawler().get_page()
    # Cnmo review-center crawler test.
    CnmoCrawler().run()
    # print(int(300/100))
Loading…
Cancel
Save