"""Thesis crawler: collects search results and PDFs, stores them in MySQL,
and maintains a word list plus a per-document TF-IDF table."""
import os
import re
import time
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Tuple
from urllib.parse import unquote

import jieba
import pymysql as pymysql
import requests
from Scripts import pdf2txt
from bs4 import BeautifulSoup
from jieba import posseg
from lxml import etree
from requests.cookies import RequestsCookieJar

from config.config import cf
from config.log import writeInfo, writeError


# MySQL database access
class MysqlDB:
    """Thin helper that opens a fresh connection for every statement."""

    # Open a connection
    def connect(self):
        """Return a new pymysql connection built from the ``mysql`` config section."""
        mysql = 'mysql'
        host = cf.get(mysql, 'host')
        user = cf.get(mysql, 'user')
        passwd = cf.get(mysql, 'passwd')
        db = cf.get(mysql, 'db')
        port = int(cf.get(mysql, 'port'))
        charset = cf.get(mysql, 'charset')
        return pymysql.connect(host=host, user=user, passwd=passwd, db=db,
                               port=port, charset=charset)

    # Execute a data-modifying statement
    def modify(self, sql, params: Tuple = ()):
        """Execute ``sql`` and commit.

        A non-empty tuple whose first element is itself a tuple is treated as
        a batch and run with ``executemany``; anything else goes to ``execute``.

        Returns the first result row of ``select LAST_INSERT_ID()``, or
        ``None`` when that query yields no row or when an error occurred
        (errors are logged with ``writeError``, not raised).
        """
        connection = self.connect()
        try:
            with connection.cursor() as cursor:
                is_batch = (isinstance(params, tuple) and len(params) > 0
                            and isinstance(params[0], tuple))
                if is_batch:
                    cursor.executemany(sql, params)
                else:
                    cursor.execute(sql, params)
                # Start from None so that an empty LAST_INSERT_ID() result no
                # longer raises UnboundLocalError after the commit.
                last_id = None
                if cursor.execute(''' select LAST_INSERT_ID() ''') > 0:
                    last_id = cursor.fetchall()[0]
                # The connection is not autocommit by default, so the
                # transaction must be committed explicitly.
                connection.commit()
                return last_id
        except Exception as e:
            writeError(e)
        finally:
            connection.close()

    # Execute a query
    def query(self, sql, params=()):
        """Run ``sql`` and return all rows.

        Returns ``None`` when an error occurred (it is logged, not raised).
        """
        connection = self.connect()
        try:
            with connection.cursor() as cursor:
                cursor.execute(sql, params)
                return cursor.fetchall()
        except Exception as e:
            writeError(e)
        finally:
            connection.close()


# Names of the positional arguments of the page's downLoadPermissions(...) call.
_DOWNLOAD_PARAM_KEYS = ("page_cnt", "language", "resourceType", "source",
                        "resourceId", "resourceTitle", "isoa")


def _parse_download_params(on_click):
    """Extract the download arguments from an ``onclick`` attribute.

    Returns a dict keyed by ``_DOWNLOAD_PARAM_KEYS``, or ``None`` when the
    attribute does not have the expected shape.
    """
    prefix = 'downLoadPermissions'
    suffix = ",'0'"
    found = re.findall('{prefix}.*{suffix}'.format(prefix=prefix, suffix=suffix), on_click)
    if not found:
        return None
    # Skip the prefix and the character that follows it, then split the
    # argument list on ",'".
    raw_values = found[0][len(prefix) + 1:].split(",'")
    if len(raw_values) != len(_DOWNLOAD_PARAM_KEYS):
        return None
    return {key: value.replace("'", "")
            for key, value in zip(_DOWNLOAD_PARAM_KEYS, raw_values)}


def parse(content):
    """Parse one search-result page.

    Returns a list of dicts, one per result whose download arguments could be
    extracted; each dict merges those arguments with the paper's metadata
    (title, resultResouceType, author, school, year, tag, summary).
    """
    res_html = BeautifulSoup(content, "html.parser")
    params_list = []
    # One 'div.ResultCont' element per paper.
    for result in res_html.select('div.ResultCont'):
        # Paper title
        title = str(result.select_one('div.title>a:nth-child(3)').text).strip()
        # Degree granted
        resultResouceType = str(result.select_one('span.resultResouceType').text).strip()
        # Author
        # NOTE(review): this selector looks truncated in the source under
        # review; it is kept verbatim - confirm against the page markup.
        author = str(result.select_one('>a').text).strip()
        # School: the text that follows the second comment node in div.Source
        source = etree.HTML(result.select_one('div.Source').prettify()).xpath('//comment()[2]')
        # ``tail`` is None when nothing follows the comment node.
        school = (source[0].tail or '').strip() if len(source) > 0 else ''
        # Year
        year = str(result.select_one('span.blockspan').text).strip()
        # Keywords, comma separated
        tag = ",".join(a.text for a in result.select('div.Keyword>a'))
        # Abstract
        summary_node = result.select_one('div.summary')
        summary = summary_node.text if summary_node else ''
        info = {
            "title": title,
            "resultResouceType": resultResouceType,
            "author": author,
            "school": school,
            "year": "".join(filter(str.isdigit, year)),
            "tag": tag,
            "summary": summary,
        }
        writeInfo('正在获取论文《{title}》的真实下载地址'.format(title=title))
        onClick = result.select_one('a.result_opera_down')['onclick']
        # Download parameters
        params_obj = _parse_download_params(onClick)
        if params_obj is None:
            writeError('匹配下载参数失败')
        else:
            params_list.append({**params_obj, **info})
    return params_list


# NOTE(review): base_url and the cookie domains below are empty strings in the
# source under review - confirm the intended values.
base_url = ''
profession = "计算机软件与理论"
keyword = f'(专业%3A"{profession}")'
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36",
}
db = MysqlDB()
session = requests.Session()
cookies = RequestsCookieJar()
# NOTE(review): session credentials are hard-coded here; consider loading them
# from configuration instead of keeping them in source control.
cookies.set('wengine_vpn_ticketlibcon_bupt_edu_cn', '85f585f32fdf27ca', path='/', domain='')
cookies.set('remember_token', 'yeysYCGGzNIrcDDUNXOVyNXODUZzJpROJgLmsObLAXrsmibvEqcwqzRkDYlODYWA', path='/',
            domain='')
session.cookies.update(cookies)
pdf_dir = 'pdf'
html_dir = 'html'
executor = ThreadPoolExecutor(max_workers=2)


class Word:
    """A word with its part-of-speech flag.

    Equality and hashing use ``word`` only, so two entries with the same text
    but different flags are the same set member.
    """

    def __init__(self, word, flag):
        self.word = word
        self.flag = flag

    def __eq__(self, other: object) -> bool:
        if isinstance(other, self.__class__):
            return self.word == other.word
        else:
            return False

    def __hash__(self) -> int:
        return hash(self.word)


# Update the word list
def split_word():
    """Segment every stored paper and insert nouns not yet in ``sys_word``.

    Rebuilds the TF-IDF table through ``create_doc_vector`` when at least one
    new word was inserted.
    """
    jieba.enable_paddle()
    first_rows = db.query('select min(id) from sys_paper')
    last_rows = db.query('select max(id) from sys_paper')
    start = first_rows[0][0] if first_rows else None
    end = last_rows[0][0] if last_rows else None
    if start is None or end is None:
        # Empty table (min/max are NULL) or a failed query: nothing to scan.
        writeInfo('没有发现新词汇,不需要更新词库')
        return
    result = db.query('select word,flag from sys_word') or ()
    filter_word = set(Word(_[0], _[1]) for _ in result)
    new_word = set()
    count = 0
    # Everything except CJK ideographs and ASCII letters is stripped from a word.
    non_word_chars = re.compile(u"([^\u4e00-\u9fa5a-zA-Z])")
    for paper_id in range(start, end + 1):
        rows = db.query('select txt_content from sys_paper where id=%s', (paper_id,))
        # Ids are not necessarily contiguous, and a failed query returns None.
        if not rows or not rows[0][0]:
            continue
        for word, flag in posseg.cut(rows[0][0], use_paddle=True):
            if flag != 'n':
                continue
            word = non_word_chars.sub("", word)
            w = Word(word, flag)
            if len(word) > 0 and w not in filter_word:
                new_word.add(w)
            # NOTE(review): the original nesting of this counter was lost with
            # the file's indentation; it is assumed to count noun candidates.
            count = count + 1
    writeInfo(f'从{count}个词语中过滤出{len(new_word)}个新词汇')
    if len(new_word) > 0:
        words = tuple((_.word, _.flag) for _ in new_word)
        db.modify('insert into sys_word(word,flag) values (%s,%s)', words)
        create_doc_vector()
    else:
        writeInfo('没有发现新词汇,不需要更新词库')


table_name = 'sys_tfidf'


def create_doc_vector():
    """Drop and rebuild the TF-IDF table, one row per paper.

    For every (paper, word) pair the SQL derives ``tf`` from the number of
    occurrences of the word in ``txt_content`` and ``idf`` as
    ``log(paper count / document frequency) + 1``; the ``tf * idf`` products
    are concatenated per paper, ordered by word.
    """
    start = time.time()
    writeInfo('开始计算文档向量')
    db.modify(f'drop table if exists {table_name}')
    # NOTE(review): GROUP_CONCAT output is capped by the server's
    # group_concat_max_len setting - verify it is large enough for the
    # vocabulary size.
    db.modify(f'''
        create table {table_name}
        (
            id bigint NOT NULL AUTO_INCREMENT,
            tfidf longtext not null,
            primary key (id)
        ) as
        select id, group_concat(tf * idf order by word) as tfidf
        from (select f.word,
                     df,
                     f.idf,
                     round((LENGTH(txt_content) - LENGTH(REPLACE(txt_content, word, ''))) / LENGTH(word)) AS tf,
                     id
              from sys_paper,
                   (select word,
                           sum(if(locate(word, txt_content) > 0, 1, 0)) as df,
                           log((select count(*) from sys_paper) / sum(if(locate(word, txt_content) > 0, 1, 0))) + 1 as idf
                    from sys_paper,
                         sys_word
                    group by word) as f) as f
        group by id
    ''')
    writeInfo(f'计算文档向量花费{round(time.time()-start)}s')


# Document vector comparison
def compare_doc_vector(ids=None):
    """Pair every row whose id is in ``ids`` with every row whose id is not.

    Currently only prints which pairs would be compared. Does nothing when
    ``ids`` is ``None`` or empty.
    """
    if ids is None:
        return
    ids = tuple(ids)
    if not ids:
        # "in ()" is not valid SQL, so an empty selection is a no-op.
        return
    # Bind the ids as parameters rather than splicing them into the SQL text;
    # this also accepts integer ids, which ",".join() rejected.
    placeholders = ",".join(["%s"] * len(ids))
    result1 = db.query(f'select * from {table_name} where id in ({placeholders})', ids) or ()
    result2 = db.query(f'select * from {table_name} where id not in ({placeholders})', ids) or ()
    for id1, tfidf1 in result1:
        for id2, tfidf2 in result2:
            print(f'id={id1}和id={id2}比较')


# Convert the file format and save to the database
def save(des, res, params):
    """Store a downloaded PDF, convert it to HTML and insert the paper row.

    :param des: the ``Content-Disposition`` header split on ``;``; element 1
        is expected to look like ``name=value``.
    :param res: the HTTP response whose ``content`` is the PDF.
    :param params: the paper metadata dict produced by ``parse``.
    :raises ValueError: if the header carries no usable file name.
    """
    des = des[1].split('=')
    file_name = unquote(des[1], 'utf-8').replace('"', '')
    # The name comes from a response header: keep only its last path
    # component so it cannot point outside the output directories.
    file_name = os.path.basename(file_name.replace('\\', '/'))
    if not file_name:
        raise ValueError('empty file name in Content-Disposition header')
    # exist_ok avoids the check-then-create race between the worker threads.
    os.makedirs(pdf_dir, exist_ok=True)
    pdf_file = f'{pdf_dir}/{file_name}'
    writeInfo(f'{params["title"]} PDF文件大小{len(res.content)}字节')
    with open(pdf_file, 'wb') as file:
        file.write(res.content)
    os.makedirs(html_dir, exist_ok=True)
    # Swap only the extension; str.replace("pdf", "html") also rewrote any
    # "pdf" appearing inside the name itself.
    html_file = f'{html_dir}/{os.path.splitext(file_name)[0]}.html'
    writeInfo(f'{params["title"]} BEGIN PDF转HTML')
    pdf2txt.main(['-o', html_file, '-Y', 'exact', pdf_file])
    writeInfo(f'{params["title"]} END PDF转HTML')
    with open(html_file, 'rb') as file:
        html_content = file.read()
    parse_html = BeautifulSoup(html_content, "html.parser")
    # Plain text with newlines and spaces removed.
    txt_content = parse_html.text.replace('\n', '').replace(' ', '')
    info = {
        "title": params['title'],
        "type": params['resultResouceType'],
        "author": params['author'],
        "profession": profession,
        "school": params['school'],
        "year": params['year'],
        "summary": params['summary'],
        "tag": params['tag'],
        "pdf_content": res.content,
        "html_content": html_content,
        "txt_content": txt_content,
        "create_time": time.time(),
    }
    writeInfo(f'{params["title"]} 插入数据库')
    db.modify(
        'insert into sys_paper (author, create_time, pdf_content,html_content,txt_content, profession, school, summary, tag, title, type, year) value(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
        (
            info['author'], info['create_time'], info['pdf_content'],
            info['html_content'], info['txt_content'], info['profession'],
            info['school'], info['summary'], info['tag'], info['title'],
            info['type'], info['year'],
        ))


def _log_save_failure(future):
    """Log an exception raised inside a submitted ``save`` task."""
    error = future.exception()
    if error is not None:
        writeError(error)


def _download_paper(params):
    """Resolve the real download URL of one paper and queue it for saving."""
    params["base_url"] = base_url
    url = '{base_url}/search/{page_cnt}&language={language}&resourceType={resourceType}&source={source}&resourceId={resourceId}&resourceTitle={resourceTitle}&isoa={isoa}&type={resourceType}&first=null'.format(
        **params)
    res = session.get(url, headers=headers)
    # NOTE(review): the substring literal is empty in the source under review,
    # so this membership test always passes - confirm the intended marker.
    if not (res.status_code == 200 and '' in res.url):
        writeError('error code={code}'.format(code=res.status_code))
        return
    downloadIframe = BeautifulSoup(res.content, "html.parser").select_one('#downloadIframe')
    if not downloadIframe:
        writeError("无法获取真实下载地址")
        return
    res = session.get(downloadIframe["src"])
    if not (res.status_code == 200 and 'download.ashx' in res.url):
        writeError("无法获取文档真实下载地址")
        return
    writeInfo("成功获取真实下载地址{path}".format(path=res.url))
    res = session.get(res.url, headers=headers, stream=True)
    # .get() keeps a response without these headers on the error path
    # instead of raising KeyError.
    if not (res.status_code == 200 and 'pdf' in res.headers.get('Content-Type', '')):
        writeError("无法获取文档信息")
        return
    des = res.headers.get('Content-Disposition', '').split(';')
    if len(des) == 2 and len(des[1].split('=')) == 2:
        # Without the callback an exception raised in save() would be lost
        # together with the discarded future.
        executor.submit(save, des, res, params).add_done_callback(_log_save_failure)
    else:
        writeError("非法响应类型")


# Wanfang platform paper collection
def run():
    """Walk result pages 1-99 and queue every downloadable paper for saving."""
    for page in range(1, 100):
        res = session.get(
            f"{base_url}/search/{page}&pageSize=20&searchWord={keyword}&isTriggerTag=",
            headers=headers)
        if res.status_code != 200:
            writeError('error code={code}'.format(code=res.status_code))
            continue
        for params in parse(res.content):
            try:
                _download_paper(params)
            except requests.RequestException as e:
                # One failed download should not abort the whole crawl.
                writeError(e)