Nowcoder Crawler

Crawl posts from Nowcoder (牛客网) to collect information of interest, such as job postings and internal-referral offers.
To pick up the latest posts every day, the system updates incrementally: previously seen posts are kept in a MySQL database for deduplication, an NLP classification model screens the content and drops irrelevant posts, each day's new records are sent out as an email notification, and a crontab job runs the whole pipeline automatically once a day. The resulting email looks like this:

Email screenshot

The MySQL table structure is as follows:

CREATE TABLE IF NOT EXISTS `newcoder_search`(
    `id` BIGINT NOT NULL,
    `title` VARCHAR(40),
    `content` text NOT NULL,
    `user` VARCHAR(40) NOT NULL,
    `url` VARCHAR(60) NOT NULL,
    `created_time` datetime NOT NULL,
    `edited_time` datetime NOT NULL,
    PRIMARY KEY ( `id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
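
If you prefer to create the table from Python rather than the MySQL shell, a minimal sketch with pymysql could look like this; the connection parameters are placeholders, not this project's real settings.

import pymysql

# Placeholder connection values; replace with your own MySQL settings.
db = pymysql.connect(host="localhost", user="root", passwd="your password",
                     database="your database", charset="utf8mb4", port=3306)
with db.cursor() as cursor:
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS `newcoder_search`(
            `id` BIGINT NOT NULL,
            `title` VARCHAR(40),
            `content` text NOT NULL,
            `user` VARCHAR(40) NOT NULL,
            `url` VARCHAR(60) NOT NULL,
            `created_time` datetime NOT NULL,
            `edited_time` datetime NOT NULL,
            PRIMARY KEY ( `id` )
        )ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """)
db.commit()
db.close()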

Python crawler

Fetching and parsing the data

import requests
import json
import time
import re

def _parse_newcoder_page(data, skip_words, start_date):
    assert data['success'] == True
    pattern = re.compile("|".join(skip_words))
    res = []
    for x in data['data']['records']:
        x = x['data']
        dic = {"user": x['userBrief']['nickname']}

        x = x['contentData'] if 'contentData' in x else x['momentData']
        dic['title'] = x['title']
        dic['content'] = x['content']
        dic['id'] = int(x['id'])
        dic['url'] = 'https://www.nowcoder.com/discuss/' + str(x['id'])

        if len(skip_words) > 0 and pattern.search(x['title'] + x['content']) != None:  # regex filtering on skip words
            continue

        createdTime = x['createdAt'] if 'createdAt' in x else x['createTime']
        dic['createTime'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(createdTime // 1000))
        dic['editTime'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(x['editTime'] // 1000))

        if dic['editTime'] < start_date:  # filter by edit time
            continue
        res.append(dic)

    return res


def get_newcoder_page(page = 1, keyword = "校招", skip_words = [], start_date = '2023'):
    header = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36",
        "content-type": "application/json"
    }
    data = {
        "type": "all",
        "query": keyword,
        "page": page,
        "tag": [],
        "order": "create"
    }
    x = requests.post('https://gw-c.nowcoder.com/api/sparta/pc/search', data = json.dumps(data), headers = header)
    data = _parse_newcoder_page(x.json(), skip_words, start_date)
    return data
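
Standalone usage is straightforward; a quick sketch (the keyword, skip word and start date below are just example values):

# Example call; keyword, skip_words and start_date are illustrative values.
posts = get_newcoder_page(page = 1, keyword = "补招", skip_words = ["求捞"], start_date = "2023-01-01")
for p in posts[:3]:
    print(p['editTime'], p['title'], p['url'])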

Writing to the database with deduplication

Write the data to the database, deduplicating on id: if the id does not exist yet, insert the row; if the id exists but editTime has changed, update it; otherwise the post is a duplicate and is skipped.

import pymysql

def upsert_to_db(data, host, user, passwd, database, charset, port):
    db = pymysql.connect(
        host=host,
        user=user,
        passwd=passwd,
        database=database,
        charset=charset,
        port=port
    )
    dic = {}  # id -> edited_time of rows already in the table
    try:
        cursor = db.cursor()
        sql = "select id, edited_time from newcoder_search where id in ({})".format(",".join([str(x['id']) for x in data]))
        cursor.execute(sql)
        exists = cursor.fetchall()
        dic = {x[0]: x[1].strftime("%Y-%m-%d %H:%M:%S") for x in exists}

        insert_data = [[x[k] for k in x] for x in data if x['id'] not in dic]
        update_data = [(x['editTime'], x['id']) for x in data if x['id'] in dic and dic[x['id']] != x['editTime']]
        sql = "INSERT INTO newcoder_search (user, title, content, id, url, created_time, edited_time) VALUES(%s, %s, %s, %s, %s, %s, %s)"
        cursor.executemany(sql, insert_data)
        sql = "update newcoder_search set edited_time = %s where id = %s"
        cursor.executemany(sql, update_data)
        db.commit()
    except Exception as e:
        print("db error: ", e)
    db.close()
    return [x for x in data if x['id'] not in dic], [x for x in data if x['id'] in dic and dic[x['id']] != x['editTime']]
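
As a design note, because id is the primary key the insert-or-update decision could also be pushed down to MySQL with INSERT ... ON DUPLICATE KEY UPDATE. The sketch below is an alternative under that assumption, not what upsert_to_db above does, and it loses the explicit insert/update split that the email step relies on.

# Alternative sketch: let MySQL decide between insert and update via the primary key.
def upsert_with_on_duplicate_key(db, data):
    sql = ("INSERT INTO newcoder_search (user, title, content, id, url, created_time, edited_time) "
           "VALUES (%s, %s, %s, %s, %s, %s, %s) "
           "ON DUPLICATE KEY UPDATE content = VALUES(content), edited_time = VALUES(edited_time)")
    with db.cursor() as cursor:
        cursor.executemany(sql, [[x[k] for k in x] for x in data])
    db.commit()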

Sending the email

Newly created posts and existing posts that were edited are sent as separate sections.

import smtplib
from email.mime.text import MIMEText

def _table_html_generate(data):
    s = '<table>'
    s += '<tr>' + "\n".join(["<th>" + x + '</th>' for x in data[0]]) + '</tr>'
    for d in data:
        s += '<tr>' + "\n".join(["<td>" + str(d[x]) + '</td>' for x in d]) + '</tr>'
    s += '</table>'
    return s

def send_email(insert_data, update_data, mail_host, mail_user, mail_pass, sender, receivers):
    msg = ''
    if len(insert_data) > 0:
        msg += '<h1>insert</h1></br>' + _table_html_generate(insert_data) + '</br></br>'
    if len(update_data) > 0:
        msg += '<h1>update</h1></br>' + _table_html_generate(update_data) + '</br></br>'
    if msg == '':
        msg = '<h1>今日无新增数据</h1></br>'

    message = MIMEText(msg, 'html', 'utf-8')
    message['Subject'] = '牛客网{}招聘信息'.format(time.strftime("%Y-%m-%d"))
    message['From'] = sender
    message['To'] = receivers[0]
    try:
        smtpObj = smtplib.SMTP_SSL(mail_host, 465)
        #smtpObj.connect(mail_host, 465)
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, message.as_string())
        smtpObj.quit()
        return True
    except smtplib.SMTPException as e:
        print('email send error: ', e)
        return False
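
For reference, a usage sketch; the SMTP host, account, authorization code and addresses below are all placeholders.

# All values here are placeholders, not real credentials.
ok = send_email(
    insert_data = [{"title": "补招内推", "url": "https://www.nowcoder.com/discuss/123"}],
    update_data = [],
    mail_host = "smtp.example.com",
    mail_user = "user@example.com",
    mail_pass = "auth code",
    sender = "user@example.com",
    receivers = ["receiver@example.com"])
print("mail sent:", ok)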

Putting it all together


def run(keywords, skip_words, db_config, mail_config = None):
    res = []
    for key in keywords:
        print(key, time.strftime("%Y-%m-%d %H:%M:%S"))
        for i in range(1, 11):
            print(i)
            page = get_newcoder_page(i, key, skip_words)
            if not page:
                break
            res.extend(page)
            time.sleep(1)

    result, ids = [], set()  # deduplicate by id
    for x in res:
        if x['id'] in ids:
            continue
        ids.add(x['id'])
        result.append(x)

    print("total num: ", len(result))
    x = upsert_to_db(result, **db_config)  # (insert_data, update_data)
    if mail_config:
        send_email(*x, **mail_config)

def main():
    # words to filter out
    skip_words = ['求捞', '泡池子', '池子了', '池子中', 'offer对比', '总结一下', '给个建议', '开奖群', '没消息', '有消息', '拉垮', '求一个', '求助', '池子的', '决赛圈', 'offer比较', '求捞', '补录面经', '捞捞', '收了我吧', 'offer选择', '有offer了', '想问一下', 'kpi吗', 'kpi面吗', 'kpi面吧']

    # search keywords
    keywords = ['补招', '补录']

    # database configuration
    db_config = {
        "host": "localhost",
        "user": "root",
        "passwd": "your password",
        "database": "your database",
        "charset": "utf8",
        "port": 3306  # replace with your MySQL port
    }

    # mail configuration
    mail_config = {
        "mail_host": "smtp server host",
        "mail_user": "your user name",
        "mail_pass": "password",  # password (an authorization code for some providers)
        "sender": "sender email",
        "receivers": ["receivers email"]
    }

    run(keywords, skip_words, db_config, mail_config)


if __name__ == "__main__":
    main()
    print("end")

Content filtering

The raw crawl contains not only job postings but also interview write-ups, discussion threads and other posts from job seekers, and we want to filter out the irrelevant ones.
The first version only filtered by keywords and regular expressions: the user supplies skip_words, and any post containing one of them is dropped. The filtering works as follows:

import re
skip_words = ['求捞', '泡池子', '兄弟们', '姐妹们', '家人们', '狗都\\w{0,2}去', '有推荐\\w{0,5}吗', '选offer', '交流一下', '该怎么办', '坐立不安', '辗转难眠', '哈哈哈', '求支招', '求经验', '抱大腿', '有没有\\w{0,3}懂', '诈骗', '毁约', '秋招历程', '求\\w{0,5}建议', '二战', '感觉有点悬', '写给\\w{0,10}同学', '好心人', '一脸懵逼', '纠结', '有推荐\\w{0,1}的', '如何准备', '帮\\w{0,1}选一下', '考研\\w{0,2}失败', '求指导', '开始了吗', '秋招总结', '校招总结', '还有机会吗', '池子了', '池子中', 'offer对比', '开奖群', '拉垮', '求一个', '求助', '池子的', '决赛圈', 'offer比较', '迷茫的人', '年度总结', '有没有友友', '救救孩子', '骂醒', '问\\w{0,2}大佬', '一般\\w{0,4}怎么找', '考研人', '求指导', '求捞', '补录面经', '捞捞', '收了我吧', 'offer选择', '想问一下', 'kpi\\w{0,1}吗', 'kpi\\w{0,1}吧']
s = "大佬们可以帮我看一下简历吗。 想参加春招,可以帮忙看一下简历吗 #如何看待2023届秋招# #简历# #春招提前批#...北京某末流211工商管理大类专业,没什么特别突出的经历,学校没有参加过大赛,"

pattern = re.compile("|".join(skip_words))
if pattern.search(s) != None:
print("filter out this data")

This approach is not very precise, though; some irrelevant posts still slip through. How can the filtering be made more accurate?
One option is to train an NLP classification model for the job, but that needs a lot of training data. I had already crawled more than 40,000 historical posts from Nowcoder, yet they would have to be labelled, and I did not feel like labelling them by hand, so the plan was shelved for a while. The code and historical data are open-sourced on GitHub: newcoder-crawler.
**[Update]**: In the end I could not resist trying it. I spent a week training a post-classification model for the filtering step; the full build process is described in 如何从零开始构建一个网络讨论帖分类模型? (how to build a discussion-post classification model from scratch), and the model is available as roberta4h512.zip.
After downloading, unzip it and place the roberta4h512 folder in the same directory as the crawler script. The inference code is as follows:

from transformers import AutoTokenizer, AutoModelForSequenceClassification

def _batch_generate(texts, model, tokenizer, id2label = {0: '招聘信息', 1: '经验贴', 2: '求助贴'}, max_length = 128):
    inputs = tokenizer(texts, return_tensors="pt", max_length=max_length, padding=True, truncation=True)
    outputs = model(**inputs).logits.argmax(-1).tolist()
    return [id2label[x] for x in outputs]

def model_predict(text_list, model = None, tokenizer = None, model_name = "roberta4h512", batch_size = 4):
    if not text_list:
        return []
    if not model:
        model = AutoModelForSequenceClassification.from_pretrained(model_name)
    if not tokenizer:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
    model.eval()
    result, start = [], 0
    while start < len(text_list):
        result.extend(_batch_generate(text_list[start : start + batch_size], model, tokenizer))
        start += batch_size
    return result
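
A quick usage sketch; the two texts are made-up examples and the predicted labels depend on the trained model:

# Example texts are invented; output follows the id2label mapping above.
texts = ["补录内推,岗位很多,欢迎投递简历", "秋招结束,写一篇面经总结分享给大家"]
print(model_predict(texts))  # e.g. ['招聘信息', '经验贴']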

Finally, adjust the run function slightly to add the model-based filtering. In addition, some people post the same information several times, so a content-based deduplication step is added as well. The modified run function looks like this:


def filter(data, unique_content, model = None, tokenizer = None):
    # model-based filtering plus deduplication on post content
    labels = model_predict([(str(x['title']) if x['title'] else "") + "\t" +
                            (str(x['content']) if x['content'] else "") for x in data], model, tokenizer)
    result = []
    for i, x in enumerate(data):
        if x['content'] in unique_content or labels[i] != "招聘信息":
            continue
        unique_content.add(x['content'])
        result.append(x)
    return result

def run(keywords, skip_words, db_config, mail_config = None):
    res = []
    for key in keywords:
        print(key, time.strftime("%Y-%m-%d %H:%M:%S"))
        for i in range(1, 21):
            print(i)
            page = get_newcoder_page(i, key, skip_words,
                                     start_date = time.strftime("%Y-%m-%d", time.localtime(time.time() - 15 * 24 * 60 * 60)))
            if not page:
                break
            res.extend(page)
            time.sleep(1)

    res.sort(key = lambda x: len(x['content']))
    result, ids = [], set()  # deduplicate by id
    for x in res:
        if x['id'] in ids:
            continue
        ids.add(x['id'])
        result.append(x)

    print("total num: ", len(result))
    #print(result)
    insert_data, update_data = upsert_to_db(result, **db_config)

    if mail_config:
        unique_content, shared_model, shared_tokenizer = set(), None, None
        insert_data = filter(insert_data, unique_content, shared_model, shared_tokenizer)
        update_data = filter(update_data, unique_content, shared_model, shared_tokenizer)
        send_email(insert_data, update_data, **mail_config)


Writing a shell script and automating it with crontab

The script is meant to run once a day. Running it by hand every time would be a poor experience, so it is best kept on a server with a crontab job that launches it automatically every day. The launch commands are wrapped in a shell script.
The shell script newcoder.sh contains:

source /root/anaconda3/bin/activate base
cd /root/chadqiu/crawler
python newcoder.py > server.log 2>&1

crontab configuration

crontab -l             # list existing scheduled jobs
crontab -e             # edit / add a scheduled job
service crond restart  # restart so the new configuration takes effect

The line added via crontab -e is as follows; it runs once a day at 18:30:

30 18 * * * bash /root/chadqiu/crawler/newcoder.sh

cron syntax rules
The five fields have the following meaning:

Minute   Hour   Day   Month   DayOfWeek   command
(minute, hour, day of month, month, day of week, then the command to run)

"*" matches any value in the field's range,
"/" means "every",
"-" specifies a range from one number to another,
"," separates several discrete values.
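
For example, a hypothetical entry using these symbols (not part of this project's setup) would run a script every 10 minutes, between 9:00 and 18:00, Monday to Friday:

*/10 9-18 * * 1-5 bash /path/to/some_script.sh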

Nacos configuration center

The main function above hard-codes quite a bit of configuration. The skip words and the list of recipient mailboxes in particular change often, and editing the code for every change is tedious, so a Nacos configuration center is introduced: the four configuration variables keywords, skip_words, db_config and mail_config are stored in Nacos as JSON and can then be changed dynamically, as shown in the screenshots below.
Screenshot: configuration entries added in Nacos
Screenshot: keyword configuration in Nacos
Screenshot: mail configuration in Nacos
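
For reference, the JSON stored under each Nacos data id might look roughly like this; the values simply mirror the placeholders from the main function above:

newcoder.crawler.keywords:    ["补招", "补录"]
newcoder.crawler.skip_words:  ["求捞", "泡池子", "offer对比"]
newcoder.crawler.db_config:   {"host": "localhost", "user": "root", "passwd": "your password",
                               "database": "your database", "charset": "utf8", "port": 3306}
newcoder.crawler.mail_config: {"mail_host": "smtp server host", "mail_user": "your user name",
                               "mail_pass": "password", "sender": "sender email",
                               "receivers": ["receivers email"]}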

The code needs only small changes: add a get_config function and adjust main accordingly:

import nacos
import json

def get_config(SERVER_ADDRESSES, NAMESPACE, GROUP):
    print(SERVER_ADDRESSES, NAMESPACE)
    client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)

    keywords = json.loads(client.get_config("newcoder.crawler.keywords", GROUP))
    skip_words = json.loads(client.get_config("newcoder.crawler.skip_words", GROUP))
    db_config = json.loads(client.get_config("newcoder.crawler.db_config", GROUP))
    mail_config = json.loads(client.get_config("newcoder.crawler.mail_config", GROUP))
    return keywords, skip_words, db_config, mail_config

def main():
    SERVER_ADDRESSES = "ip:port"
    NAMESPACE = "your namespace"
    GROUP = "your group"

    run(*get_config(SERVER_ADDRESSES, NAMESPACE, GROUP))

if __name__ == "__main__":
    main()
    print("end")

The complete code, combining the two versions above, is available on github.