You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
house-job/job_spider.py

151 lines
5.3 KiB

from gevent import monkey
from gevent.pool import Pool
import util
monkey.patch_all(select=False)
from requests.adapters import HTTPAdapter
from urllib3 import Retry
import time
import os
import logging
import requests
from queue import Queue
from bs4 import BeautifulSoup
def get_logger():
"""
创建日志实例
"""
formatter = logging.Formatter("%(asctime)s - %(message)s")
logger = logging.getLogger("monitor")
logger.setLevel(LOG_LEVEL)
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
HEADERS = {
"X-Requested-With": "XMLHttpRequest",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36"
"(KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36",
}
START_URL = (
'https://search.51job.com/list/030200,000000,0000,00,9,99,%2520,2,{}.html?'
'lang=c&stype=&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&'
'companysize=99&providesalary=99&lonlat=0%2C0&radius=-1&ord_field=0&confirmdate=9&'
'fromType=&dibiaoid=0&address=&line=&specialarea=00&from=&welfare='
)
LOG_LEVEL = logging.INFO # 日志等级
POOL_MAXSIZE = 8 # 线程池最大容量
logger = get_logger()
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
class JobSpider:
"""
Job 网站爬虫类
"""
job_dir = 'data'
def __init__(self):
self.count = 1 # 记录当前爬第几条数据
self.company = []
self.desc_url_queue = Queue() # 线程池队列
self.pool = Pool(POOL_MAXSIZE) # 线程池管理线程,最大协程数
# 获取信息
def job_spider(self):
"""
爬虫入口
"""
urls = [START_URL.format(p) for p in range(1, 200)] # #resultList > div:nth-child(53)
for url in urls:
logger.info("爬取链接:{}\n{}".format(url, urls.index(url) + 1))
html = session.get(url, headers=HEADERS).content
bs = BeautifulSoup(html, "lxml").find("div", class_="dw_table").find_all("div", class_="el")
for b in bs:
try:
href = b.find("a")["href"]
self.desc_url_queue.put(href) # 岗位详情链接加入队列
except Exception:
pass
# 打印队列长度,即多少条岗位详情 url
logger.info("队列长度为 {} ".format(self.desc_url_queue.qsize()))
def post_require(self):
"""
爬取职位描述
"""
while True:
# 从队列中取 url
url = self.desc_url_queue.get()
resp = session.get(url, headers=HEADERS)
if resp.status_code == 200:
logger.info("爬取链接:{}\n{} 条岗位详情".format(url, self.count))
html = resp.content
self.desc_url_queue.task_done()
self.count += 1
else:
self.desc_url_queue.put(url)
continue
try:
bs_tmp = BeautifulSoup(html, 'lxml').select(
'body > div.tCompanyPage > div.tCompany_center.clearfix > div.tHeader.tHjob > div > div.cn')[0]
bs_tmp1 = bs_tmp.select('h1')[0]
bs_tmp2 = bs_tmp.select('strong')[0]
bs_tmp3 = bs_tmp.select('p.cname > a.catn')[0]
bs_tmp4 = bs_tmp.select(' p.msg.ltype')[0].text.replace(u'\xa0', '').split('|')
if len(bs_tmp4) == 5:
with open(os.path.join(self.job_dir, "岗位信息.txt"), 'ab+') as f:
tmp = {"岗位": bs_tmp1.text, "公司": bs_tmp3['title'], "薪资": bs_tmp2.text, '位置': bs_tmp4[0],
'工作经验': bs_tmp4[1], '学历': bs_tmp4[2], '招聘人数': bs_tmp4[3], '发布时间': bs_tmp4[4]}
f.write((str(tmp) + '\n').encode('utf-8'))
bs = BeautifulSoup(html, "lxml").find("div", class_="bmsg job_msg inbox").text
s = bs.replace("微信", "").replace("分享", "").replace("邮件", "").replace("\t", "").strip()
with open(os.path.join(self.job_dir, "岗位描述.txt"), "a", encoding="utf-8") as f:
f.write(s)
except Exception as e:
logger.error(e)
logger.warning(url)
def execute_more_tasks(self, target):
"""
协程池接收请求任务,可以扩展把解析,存储耗时操作加入各自队列,效率最大化
:param target: 任务函数
:param count: 启动线程数量
"""
for i in range(POOL_MAXSIZE):
self.pool.apply_async(target)
def run(self):
if os.path.exists(self.job_dir):
util.clearDir(self.job_dir)
else:
os.mkdir(self.job_dir)
"""
多线程爬取数据
"""
self.job_spider()
self.execute_more_tasks(self.post_require)
self.desc_url_queue.join() # 主线程阻塞,等待队列清空
if __name__ == "__main__":
spider = JobSpider()
start = time.time()
spider.run()
logger.info("总耗时 {}".format(time.time() - start))