From f85ab74267dc2c7e608574e64b1a71c004d51a6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BD=98=E5=95=9F=E5=8D=8E?= <1029559041@qq.com> Date: Mon, 23 Sep 2019 22:57:37 +0800 Subject: [PATCH] init --- main.py | 283 +------------------------------------------------------- test.py | 9 +- 2 files changed, 2 insertions(+), 290 deletions(-) diff --git a/main.py b/main.py index 40027e9..89325cd 100644 --- a/main.py +++ b/main.py @@ -1,298 +1,17 @@ -import gzip -import json import os -import random +import os import re import threading import time -import zlib - -import io -from typing import Optional, Callable, Any, Iterable, Mapping import requests -import win32api -import win32con -from PIL import Image from bs4 import BeautifulSoup # 手机实体类 from openpyxl import load_workbook, Workbook -from pynput.mouse import Controller, Button -from selenium.common.exceptions import NoSuchElementException -from selenium.webdriver.chrome import webdriver -from selenium.webdriver.chrome.options import Options -from urllib3.exceptions import HeaderParsingError from Crawler import MobilePhoneCrawler from config.config import cf, config_path from config.log import writeInfo, writeError -from bs4 import BeautifulSoup -import re - -headers = { - 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36', - 'cookie': 'hng=CN%7Czh-CN%7CCNY%7C156; lid=tb84443556; enc=Di1rYCRWDQZC0UbccH38rIzuBg8LLFGKPSeQNu0fJ6Atw1lfF%2BtBE6Jm3vKtkZ%2FcJwoY%2FA2OAFq1CCgzrB0Wmg%3D%3D; t=2ce95276273d2f0fec4b735114efb9f0; uc3=id2=UonSf2s8K7H57A%3D%3D&nk2=F5RNYQezF9ZVJA%3D%3D&lg2=URm48syIIVrSKA%3D%3D&vt3=F8dByuPZuePp%2FK4exO4%3D; tracknick=tb84443556; uc4=nk4=0%40FY4Gtg6GE3gLVPH74U0sgDg9VVYt&id4=0%40UOE4tAnGHWIKt7PI5bS6f4noV%2Bbp; lgc=tb84443556; _tb_token_=e83ebbe73eeff; cookie2=170434fc3c4cae5b8d05c6ca3e035a7d; cna=3uL0FRGpQX4CAdoUCaGR7Dw2; UM_distinctid=16d0a0bd4f67e9-0657eeb9eeb382-5373e62-384000-16d0a0bd4f7aff; otherx=e%3D1%26p%3D*%26s%3D0%26c%3D0%26f%3D0%26g%3D0%26t%3D0; _med=dw:2560&dh:1440&pw:2560&ph:1440&ist:0; cq=ccp%3D1; swfstore=175036; res=scroll%3A1613*5900-client%3A1613*924-offset%3A1613*5900-screen%3A2560*1440; CNZZDATA1256793290=2086205104-1567833369-%7C1567844169; pnm_cku822=098%23E1hvKpvUvbpvUpCkvvvvvjiPRFS96jtjn2MWsjD2PmPp1jr8RLqwtjY8RFdUsjtbRpGCvvLMMQvvmphvLhbyi9mFecXPjV5vsEe4jVDQpGoHbdiQpznCwaZOecXPjV5vsEe4jVDQpYLhbdiQpzwBwaZOecEvjV5vsCkwjVDQpGFvbdiQpzvxwa7ivpvUphvhrpcsXvmEvpvVpyUUCE%2BfKphv8hCvCbyvvh89phvOJpvvpYYvpm9vvvCHtZCv8vvvvhcDphvOKpvvBHvCvpvVphhvvvvvRphvChCvvvm5vpvhphvhHv%3D%3D; isg=BHBwqfKM2QBI7oUELIuAZ-OhQT7CuVQDVfnti2rBAUuPJRHPEs91k6SXeW3gtQzb; l=cBO-RSocq17jyErCBOCZlurza77TvIRAguPzaNbMi_5QN1TQITQOkrDBxe96cjWdtj8B4JuaUMv9-etuigILNzGHtBUV.' -} - -# 获取字典cookie -cookies = headers['cookie'].split(';') -cookie_list = [] -for cookie in cookies: - cookie_list.append({'name': cookie.split('=')[0], 'value': cookie.split('=')[1]}) - - -# 天猫手机爬虫 -# https://list.tmall.com/search_product.htm?q=%CA%D6%BB%FA&click_id=%CA%D6%BB%FA&from=mallfp..pc_1.7_hq&spm=875.7931836%2FB.a1z5h.8.66144265MD9ShM -class TmallCrawler(MobilePhoneCrawler): - - def __init__(self) -> None: - super().__init__() - # 手机实体数据 - self.session = requests.Session() - # 登录 - # self.login() - self.get_page() - - ''' - 登录 - ''' - - def login(self): - # 获取验证码图片 - login_url = 'https://qrlogin.taobao.com/qrcodelogin/generateQRCode4Login.do?from=tmall&appkey=00000000&umid_token=T4C16243DC287A311CA928E0D5EA177D443B864009178BBAA55A4CB86A4' - writeInfo(login_url) - login_res = self.session.get(login_url) - res_content = login_res.content.decode() - res_json = json.loads(res_content[res_content.index("{"):res_content.index("}") + 1]) - writeInfo(json.dumps(res_json, indent=1)) - img_url = res_json["url"] - img_res = self.session.get("http:%s" % img_url) - if img_res.status_code == 200: - img_name = 'login.png' - # 保存二维码图片 - with open(img_name, 'wb') as file: - file.write(img_res.content) - # 打开二维码图片 - Image.open(img_name).show() - win32api.MessageBox(0, "请打开手机淘宝扫描二维码", "提醒", win32con.MB_ICONWARNING | win32con.MB_SYSTEMMODAL) - while True: - login_url = "https://qrlogin.taobao.com/qrcodelogin/qrcodeLoginCheck.do?lgToken={0}&defaulturl=https%3A%2F%2Fwww.tmall.com".format( - res_json['lgToken']) - writeInfo("login_url:{0}".format(login_url)) - check_login_res = self.session.get(login_url) - # 检查扫码结果 - if check_login_res.status_code == 200: - check_login_res_json = json.loads(check_login_res.content.decode()) - writeInfo(json.dumps(check_login_res_json, indent=1)) - if check_login_res_json['code'] == '10006': - # 扫码成功 - check_login_url = check_login_res_json['url'] - writeInfo("check_login_url={0}".format(check_login_url)) - login_res = self.session.get(check_login_url) - if login_res.status_code == 200: - # 重定向登陆身份验证 - login_res_html = BeautifulSoup(login_res.content, 'html.parser') - check_url = login_res_html.select_one("iframe")["src"] - writeInfo("check_url={0}".format(check_url)) - # 登录身份验证 - check_login_res = self.session.get(check_url) - if check_login_res.status_code == 200: - check_login_res_content = check_login_res.content.decode() - # 阿里巴巴集团 | 身份验证 - verify_modes_url = re.search("http.*verify_modes.*=", - check_login_res_content).group() + '1' - verify_modes_res = self.session.get(verify_modes_url) - if verify_modes_res.status_code == 200: - verify_modes_res_content = verify_modes_res.content.decode() - if '你最近购买过什么商品' in verify_modes_res_content: - raise Exception("触发图片验证,模拟请求失败") - else: - win32api.MessageBox(0, "请在手机淘宝上点击确认按钮登录", "提醒", - win32con.MB_ICONWARNING | win32con.MB_SYSTEMMODAL) - # 检测手机淘宝确认状态 - htoken = re.search("htoken\".*[a-zA-Z]", verify_modes_res_content).group() - htoken = htoken[htoken.index(":") + 2:] - while True: - time.sleep(1) - check_status_res = self.session.get( - "https://passport.taobao.com/iv/onekey/check_status.do?htoken={0}".format( - htoken)) - if check_status_res.status_code == 200: - check_status_res_json = json.loads(check_status_res.content.decode()) - if check_status_res_json['content']['code'] == '1': - login_safe_res = self.session.get( - check_status_res_json['content']['url']) - if login_safe_res.status_code == 200: - # login_safe_res_content=login_safe_res.content.decode(login_safe_res.apparent_encoding) - # login_safe_href=re.search("https.*pass.tmall.com.*\w",login_safe_res_content).group() - # index_res = self.session.get(login_safe_href) - writeInfo("登录成功") - break - else: - raise Exception("模拟登陆请求失败!!!") - else: - writeInfo(json.dumps(check_status_res_json, indent=1)) - else: - raise Exception("模拟登陆请求失败!!!") - break - else: - raise Exception("模拟登陆请求失败!!!") - else: - raise Exception("模拟登陆请求失败!!!") - else: - raise Exception("模拟登陆请求失败!!!") - elif check_login_res_json['code'] == '10004': - self.login() - time.sleep(1) - else: - raise Exception("获取登陆二维码图片失败") - - ''' - 获取分页数据 - url:分页url - ''' - - def get_page(self): - # 商品列表页地址 - domain = "https://list.tmall.com/search_product.htm" - url = '{0}?q=%CA%D6%BB%FA&type=p&vmarket=&spm=875.7931836%2FB.a2227oh.d100&xl=shouji_1&from=mallfp..pc_1_suggest'.format( - domain) - while True: - # 获取分页响应数据 - res = self.session.get(url, headers=headers) - # 判断响应状态码200才做处理 - if res.status_code == 200: - try: - # 使用BeautifulSoup解析html - res_html = BeautifulSoup(res.content, 'html.parser') - # 验证码检测 - if 'security-X5' == res_html.select_one("title").text: - self.clickCaptcha(url) - # 获取当前页 - current_page = res_html.select_one("b[class=ui-page-cur]") - writeInfo("开始解析第{0}页的数据,url:{1}".format(current_page.text, url)) - # 获取商品列表里的每个超链接 - product_hrefs = res_html.select("#J_ItemList .productTitle>a") - for product_href in product_hrefs: - # 轮询超链接获取商品详情数据 - self.get_mobile("https:{0}".format(product_href['href'])) - # 超过指定数据量结束循环 - if len(self.mobile_list) == self.max_count: - break - except Exception as e: - writeError(e) - else: - writeError("获取分页信息失败,url:%s响应状态码:%d" % (url, res.status_code)) - url = "{0}{1}".format(domain, current_page.find_next_siblings()[0]['href']) - - ''' - 滑动认证 - res_html:滑动验证码页面源代码 - url:滑动验证码页面url - ''' - - def clickCaptcha(self, url): - try: - chrome_options = Options() - chrome_options.binary_location = cf.get('selenium', 'binary_location') - # 以root权限运行 - chrome_options.add_argument('--no-sandbox') - chrome_options.add_argument('--disable-dev-shm-usage') - # chrome_options.add_argument('--headless') - # 设置用户数据路径 - chrome_options.add_argument('--user-data-dir={0}'.format(cf.get('selenium', 'user_data_dir'))) - # 不加载图片 - chrome_options.add_argument('blink-settings=imagesEnabled=false') - # 禁用gpu加速 - chrome_options.add_argument('--disable-gpu') - # 最大化 - chrome_options.add_argument('--start-maximized') - # 全屏模式 - chrome_options.add_argument('start-fullscreen') - # 设置为开发者模式,防止被识别出来使用了Selenium - chrome_options.add_experimental_option('excludeSwitches', ['enable-automation']) - driver = webdriver.WebDriver(options=chrome_options, service_log_path="I:\ChromeUpdater\selenium.log") - # driver.set_window_rect(0,0,1024,768) - # 访问滑动验证页面 - driver.get(url) - try: - # 获取滑块 - nc_1_n1z = driver.find_element_by_css_selector("#nc_1_n1z") - # 获取滑动条 - nc_1__scale_text = driver.find_element_by_css_selector("#nc_1__scale_text") - # 滑块坐标中心 - mouse = Controller() - # 移动到滑块坐标中心 - x = nc_1_n1z.rect['x'] + nc_1_n1z.rect['width'] / 2 - y = nc_1_n1z.rect['y'] + nc_1_n1z.rect['height'] / 2 - mouse.position = (x, y) - time.sleep(0.5) - mouse.press(Button.left) - time.sleep(0.5) - mouse.move(x + nc_1__scale_text.rect['width'] - nc_1_n1z.rect['width'], y) - time.sleep(0.5) - mouse.release(Button.left) - while True: - if len(driver.find_elements_by_css_selector(".errloading")) > 0: - driver.quit() - self.clickCaptcha(url) - break - else: - pass - # sub_slide_width = random.randint(30, 50) - # action.move_by_offset(sub_slide_width, 0).perform() # 移动滑块 - # start += sub_slide_width - time.sleep(random.randint(1, 10) / 10) - cookie_list = driver.get_cookies() - # 关闭浏览器 - driver.quit() - except NoSuchElementException as e: - writeError(e) - driver.quit() - self.clickCaptcha(url) - except Exception as e: - writeError(e) - raise Exception("模拟滑动验证失败") - - ''' - 获取手机详情数据 - url:手机链接 - ''' - - def get_mobile(self, url, param_url=None, **kwargs): - res = self.session.get(url) - if res.status_code == 200: - res_html = BeautifulSoup(res.content, 'html.parser') - # 验证码检测 - if 'security-X5' == res_html.select_one("title").text: - self.clickCaptcha(url) - # 获取手机规格参数 - # 判断手机是否有规格参数 - if res_html.select_one("#J_Attrs") is None: - writeInfo("手机详情url:%s没有规格参数" % url) - else: - try: - ths = res_html.select("table:contains('规格参数') tbody>tr:not([class='tm-tableAttrSub']) th") - # 轮询规格参数表格里的每一行参数 - mobile_dict = {} - for th in ths: - if 'colspan' in th.attrs: - continue - # 字典存储规格参数 - key = str(th.text).strip() - value = str(th.next_sibling.text).strip() - mobile_dict[key] = value - # 存放到列表里 - self.mobile_list.append(mobile_dict) - writeInfo("添加手机:{0}信息".format(str(res_html.select_one("div[class=tb-detail-hd]>h1").text).strip())) - except Exception as e: - writeError(e) - else: - writeError("手机url:%s响应状态码:%d" % (url, res.status_code)) - - # 保存手机数据 - def save_mobile(self, mobile): - self.mobile_list.append(mobile) # 评测中心手机爬虫 diff --git a/test.py b/test.py index fbd571a..06260f5 100644 --- a/test.py +++ b/test.py @@ -1,14 +1,7 @@ +from main import CnmoCrawler -import requests - -from main import TmallCrawler, CnmoCrawler - -def abc(): - pass if __name__ == '__main__': - # 天猫爬虫测试 - # TmallCrawler().get_page() # 评测中心爬虫测试 CnmoCrawler().run() # print(int(300/100)) \ No newline at end of file