from gevent import monkey
from gevent.pool import Pool
import util

# The patch call deliberately sits between the imports: it has to run before
# the networking libraries below are imported so they pick up the patched
# standard library (see the gevent monkey-patching documentation).
monkey.patch_all(select=False)

from requests.adapters import HTTPAdapter
from urllib3 import Retry
import time
import os
import logging
import requests
from queue import Queue
from bs4 import BeautifulSoup

LOG_LEVEL = logging.INFO  # log level used by get_logger()
POOL_MAXSIZE = 8  # maximum number of greenlets in the pool
REQUEST_TIMEOUT = 10  # seconds to wait for any single HTTP request
MAX_DETAIL_RETRIES = 3  # how often a failing detail URL is re-queued

HEADERS = {
    "X-Requested-With": "XMLHttpRequest",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36",
}

# Search-result URL template; the single "{}" placeholder is the page number.
START_URL = (
    'https://search.51job.com/list/030200,000000,0000,00,9,99,%2520,2,{}.html?'
    'lang=c&stype=&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&'
    'companysize=99&providesalary=99&lonlat=0%2C0&radius=-1&ord_field=0&confirmdate=9&'
    'fromType=&dibiaoid=0&address=&line=&specialarea=00&from=&welfare='
)


def get_logger():
    """Return the "monitor" logger, configured to write to the console.

    The stream handler is attached only when the logger has no handler yet,
    so calling this function repeatedly does not duplicate log lines.
    """
    logger = logging.getLogger("monitor")
    logger.setLevel(LOG_LEVEL)
    if not logger.handlers:
        ch = logging.StreamHandler()
        ch.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
        logger.addHandler(ch)
    return logger


logger = get_logger()

# One shared session; connection errors are retried by the mounted adapter.
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)


class JobSpider:
    """Crawler for the job site: collects detail-page links, then scrapes them."""

    job_dir = 'data'  # directory the output text files are written to

    def __init__(self):
        self.count = 1  # ordinal of the detail page currently being fetched
        self.company = []
        self.desc_url_queue = Queue()  # detail-page URLs waiting to be scraped
        self.pool = Pool(POOL_MAXSIZE)  # gevent pool running the workers
        self.retry_counts = {}  # url -> number of failed fetch attempts

    def job_spider(self):
        """Walk search-result pages 1..199 and queue every job-detail link.

        A page that cannot be fetched, or that lacks the result table, is
        logged and skipped so one bad page does not abort the whole crawl.
        """
        pages = (START_URL.format(p) for p in range(1, 200))
        for page_no, url in enumerate(pages, start=1):
            logger.info("爬取链接:%s\n第 %s 页", url, page_no)
            try:
                html = session.get(
                    url, headers=HEADERS, timeout=REQUEST_TIMEOUT
                ).content
            except requests.RequestException as e:
                logger.error(e)
                logger.warning(url)
                continue

            table = BeautifulSoup(html, "lxml").find("div", class_="dw_table")
            if table is None:
                # No result table on this page; nothing to queue.
                logger.warning(url)
                continue

            for row in table.find_all("div", class_="el"):
                link = row.find("a")
                href = link.get("href") if link is not None else None
                if href:
                    # Queue the job-detail link for the workers.
                    self.desc_url_queue.put(href)

        # Report how many detail URLs were collected.
        logger.info("队列长度为 %s", self.desc_url_queue.qsize())

    def post_require(self):
        """Worker loop: take detail URLs off the queue and scrape each one.

        ``task_done()`` is called exactly once per ``get()`` and only after
        the item has been fully processed, so ``desc_url_queue.join()``
        returns once all records are written and is never left waiting on an
        unbalanced counter. The loop never returns on its own.
        """
        while True:
            url = self.desc_url_queue.get()
            try:
                html = self._fetch_detail(url)
                if html is not None:
                    self._save_detail(url, html)
            finally:
                self.desc_url_queue.task_done()

    def _fetch_detail(self, url):
        """Fetch one detail page; return its body, or None after re-queueing.

        A failed request or non-200 response is re-queued until it has failed
        ``MAX_DETAIL_RETRIES`` times; after that the URL is logged and dropped.
        """
        try:
            resp = session.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
        except requests.RequestException as e:
            logger.error(e)
            resp = None

        if resp is not None and resp.status_code == 200:
            logger.info("爬取链接:%s\n第 %s 条岗位详情", url, self.count)
            self.count += 1
            return resp.content

        attempts = self.retry_counts.get(url, 0) + 1
        self.retry_counts[url] = attempts
        if attempts <= MAX_DETAIL_RETRIES:
            # Re-queued before the caller's task_done(), so the queue's
            # unfinished count never drops to zero while a retry is pending.
            self.desc_url_queue.put(url)
        else:
            logger.warning(url)
        return None

    def _save_detail(self, url, html):
        """Parse one detail page and append its data to the output files.

        The summary record is written to "岗位信息.txt" only when the info line
        splits into exactly five fields; the description text is appended to
        "岗位描述.txt". Any parsing or I/O error is logged together with the URL
        and swallowed so the worker keeps running.
        """
        try:
            soup = BeautifulSoup(html, 'lxml')
            header = soup.select(
                'body > div.tCompanyPage > div.tCompany_center.clearfix > div.tHeader.tHjob > div > div.cn')[0]
            title = header.select('h1')[0]
            salary = header.select('strong')[0]
            company = header.select('p.cname > a.catn')[0]
            # Info line looks like "a | b | c | d | e" padded with NBSPs.
            fields = header.select(' p.msg.ltype')[0].text.replace(u'\xa0', '').split('|')
            if len(fields) == 5:
                record = {"岗位": title.text, "公司": company['title'], "薪资": salary.text,
                          '位置': fields[0], '工作经验': fields[1], '学历': fields[2],
                          '招聘人数': fields[3], '发布时间': fields[4]}
                with open(os.path.join(self.job_dir, "岗位信息.txt"), 'ab') as f:
                    f.write((str(record) + '\n').encode('utf-8'))

            desc = soup.find("div", class_="bmsg job_msg inbox").text
            # Strip the share-widget labels and tabs from the description.
            cleaned = desc.replace("微信", "").replace("分享", "").replace("邮件", "").replace("\t", "").strip()
            with open(os.path.join(self.job_dir, "岗位描述.txt"), "a", encoding="utf-8") as f:
                f.write(cleaned)
        except Exception as e:
            # Worker boundary: log and carry on with the next URL.
            logger.error(e)
            logger.warning(url)

    def execute_more_tasks(self, target):
        """Schedule ``target`` on the pool ``POOL_MAXSIZE`` times.

        :param target: zero-argument task function to run in each greenlet
        """
        for _ in range(POOL_MAXSIZE):
            self.pool.apply_async(target)

    def run(self):
        """Prepare the output directory, collect links, then scrape them.

        Blocks until every queued detail URL has been processed.
        """
        if os.path.exists(self.job_dir):
            # NOTE(review): util.clearDir is a project helper not visible
            # here; presumably it empties the directory - confirm.
            util.clearDir(self.job_dir)
        else:
            os.mkdir(self.job_dir)

        self.job_spider()
        self.execute_more_tasks(self.post_require)
        self.desc_url_queue.join()  # block until the queue has been drained


if __name__ == "__main__":
    spider = JobSpider()
    start = time.time()
    spider.run()
    logger.info("总耗时 {} 秒".format(time.time() - start))