Sci-Tech Innovation Enterprise Crawler and Alert Import
2025-07-06 18:38:41 # Technical notes

1. After each daily crawl, sync the scraped announcements and notices into the database.

2. Search that day's announcements for sci-tech innovation enterprise items; if any are found, send an alert notification (the alert itself is not written yet; for now this just completes the data).
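The note does not show how the daily run is triggered. A crontab entry pointing at the checker script is the simplest option; as a rough in-process alternative (purely an assumption, not part of the original scripts):

# Minimal in-process daily trigger (assumption: no cron available).
# run_pipeline stands for "crawl the three sources, then query and notify",
# i.e. the checker script's __main__ block wrapped in a function.
import time

def daily_loop(run_pipeline, interval_seconds=24 * 60 * 60):
    while True:
        run_pipeline()
        time.sleep(interval_seconds)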

Relevant records are found with fuzzy keyword matching, plus exclusion words to drop unrelated entries.
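The checker script below interpolates the keywords straight into the SQL string. Since the keywords are hard-coded this works, but a parameterized variant sidesteps any quoting pitfalls. A minimal sketch (hypothetical helper, not the query the script actually builds; table and column names assumed to match):

def build_filter(keywords, excluded):
    # Build a parameterized WHERE clause: fuzzy match on keywords, exclude noise words
    like = " OR ".join(["title LIKE %s"] * len(keywords))
    not_like = " AND ".join(["title NOT LIKE %s"] * len(excluded))
    sql = (
        "SELECT date, title, url FROM `sci_spider` "
        f"WHERE ({like}) AND ({not_like}) "
        "AND DATE(create_date) = DATE(NOW())"
    )
    params = [f"%{k}%" for k in keywords] + [f"%{k}%" for k in excluded]
    return sql, params

# usage: cursor.execute(*build_filter(keywords, not_contain_keywords))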

Check whether there are announcements/notices about sci-tech innovation enterprises

# Check whether today's data contains any sci-tech innovation enterprise announcements

import re
import time
import pymysql
import requests
from gxt_spider import get_industry
from kjt_spider import get_sci_kjt
from sdszf_spider import get_sci_sdszf
from jinja2 import Template
import json


def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def query_today_kc_enterprises():
    keywords = [
        "科技型中小企业",
        "高新技术企业",
        "众创空间",
        "科技领军企业",
        "技术先进型服务企业",
        "技术创新示范企业",
        "专精特新",
        "科技企业",
        "瞪羚",
        "独角兽",
        "科技小巨人企业",
        "小巨人"]
    not_contain_keywords = ["取消", "组织申报", "认定和复核", "申报", "补助名单", "绩效评价"]
    sql = build_sql_query(keywords, not_contain_keywords)

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()

            return {
                "total": len(results),
                "list": results
            }
    finally:
        connection.close()


def build_sql_query(keywords, not_contain_keywords):
    like_conditions = " OR ".join([f"title LIKE '%{keyword}%'" for keyword in keywords])
    not_like_conditions = " AND ".join([f"title NOT LIKE '%{not_contain_keyword}%'" for not_contain_keyword in not_contain_keywords])
    sql = f"""
    SELECT
        CASE type
            WHEN '1' THEN '山东省科学技术厅'
            WHEN '2' THEN '山东省工业和信息化厅'
            WHEN '3' THEN '山东省人民政府'
            ELSE '未知类型'
        END AS type_name, date, title, url
    FROM `sci_spider`
    WHERE ({like_conditions})
      AND ({not_like_conditions})
      AND DATE(create_date) = DATE(NOW())
    """
    return sql


def mail_sender(content):
    import smtplib
    from email.mime.text import MIMEText
    from email.header import Header
    # Third-party SMTP service
    mail_host = "smtp.163.com"  # SMTP server
    mail_user = "18631839859@163.com"  # account
    mail_pass = "GENGs7dM45TJDH6y"  # password / authorization code
    sender = '18631839859@163.com'
    receivers = ['wonder1999@126.com']  # recipients; a QQ mailbox or any other mailbox works

    # message = MIMEText(content, 'plain', 'utf-8')
    message = MIMEText(content, 'html', 'utf-8')
    message['From'] = Header("科技型中小企业通知", 'utf-8')
    message['To'] = Header("科技型中小企业", 'utf-8')

    subject = '科技型中小企业通知'
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtpObj = smtplib.SMTP()
        smtpObj.connect(mail_host, 25)  # 25 is the SMTP port
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, message.as_string())
        print("邮件发送成功")
    except smtplib.SMTPException:
        print("Error: 无法发送邮件")


def wx_web_hook(data):
    """
    Send a Markdown-formatted message through a WeCom (企业微信) webhook.
    :param data: dict with the notification data; must contain the keys 'total' and 'list'
    :return: None
    """
    # Webhook URL (replace the key with your own)
    webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=ef84945d-2247-4f09-ac0b-be7a6607c24e"

    # Build the Markdown content
    content = f"**找到 {data['total']} 条疑似符合条件的记录:**\n"
    for row in data['list']:
        content += (
            f"- [{row['title']}]({row['url']}) "
            f"<font color=\"comment\">{row['date']}</font> "
            f"<font color=\"warning\">{row['type_name']}</font>\n"
        )

    # Build the request payload
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "content": content
        }
    }
    # Send the request and handle the response
    try:
        response = requests.post(webhook_url, json=payload)
        response.raise_for_status()  # raise on HTTP errors
        result = response.json()

        if result.get("errcode") == 0:
            print("✅ 消息发送成功")
        else:
            print(f"❌ 消息发送失败: {result.get('errmsg')}")

    except requests.exceptions.RequestException as e:
        print(f"⚠️ 请求异常: {e}")


if __name__ == '__main__':
    get_industry(1, 2)
    get_sci_kjt(1, 1)
    get_sci_sdszf(1, 3)
    data = query_today_kc_enterprises()
    title = f"找到 {data['total']} 条疑似符合条件的记录:"
    for row in data['list']:
        print(row)

    if data['total'] > 0:
        wx_web_hook(data)
        # mail_sender('测试消息')
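One caveat with wx_web_hook: WeCom group-bot markdown payloads are length-limited (the bot docs cite a 4096-byte cap for markdown content), so a day with many hits can overflow a single message. A rough workaround is to send the rows in slices (the chunk size of 20 is an arbitrary assumption):

def wx_web_hook_chunked(data, rows_per_message=20):
    # Reuse wx_web_hook, but split the result list so no single payload grows too large
    rows = data['list']
    for start in range(0, len(rows), rows_per_message):
        chunk = {"total": data['total'], "list": rows[start:start + rows_per_message]}
        wx_web_hook(chunk)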

Crawler for the Department of Industry and Information Technology (工信厅)

import re
import time
import pymysql
import requests


# Database connection
def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    # Latest announcement date already stored for this source (type = '2')
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '2' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        return ''
    finally:
        connection.close()


def get_industry(page_num, type):
    url = f'http://gxt.shandong.gov.cn/col/col15201/index.html?uid=586830&pageNum={page_num}'

    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers)
            response.encoding = 'utf-8'
            response = response.text
            break
        except:
            print("请求失败,尝试睡眠一会(半小时)")
            sleep_time = 60 * 30
            time.sleep(sleep_time)
            print("睡眠结束,继续运行...")
            continue

    # Dates, detail-page links and titles scraped from the list page
    da = re.findall(r'<div class="bottom"> <span> (.*?) </span>', response)
    in_url = re.findall(r'target="_blank" href="(.*?)">', response)
    content = re.findall(r'<a title="(.*?)" target="_blank"', response)

    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + in_url[i])

    # Sanity check: this list page yields two link/title matches per date entry
    if len(da) * 2 != len(in_url) or len(da) * 2 != len(content):
        print("数据不完整,跳过插入")
        return

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default floor date

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
            INSERT INTO `my_database_test`.`sci_spider`
            (`title`, `url`, `date`, `type`, `create_date`)
            VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                # Only insert entries newer than the latest stored date
                if da[i][0:10] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], in_url[i], da[i][0:10], type))

            connection.commit()
            print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()


if __name__ == '__main__':
    get_industry(1, 2)
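One thing to watch in this spider (and the two below): the incremental filter compares the scraped date string against whatever find_new_date() returns. If the `date` column is a DATE/DATETIME type, pymysql hands back a datetime object and `str > date` raises a TypeError. Normalizing both sides to 'YYYY-MM-DD' strings keeps the comparison valid (a sketch, assuming that format):

import datetime

def to_date_str(value):
    # Normalize a DB value (datetime.date/datetime or str) to 'YYYY-MM-DD'
    if isinstance(value, (datetime.date, datetime.datetime)):
        return value.strftime('%Y-%m-%d')
    return str(value)[:10]

# e.g. new_date = to_date_str(find_new_date() or '1970-01-01')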

Crawler for the Department of Science and Technology (科技厅)

import re
import time
import pymysql
import requests


def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    # Latest announcement date already stored for this source (type = '1')
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '1' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        return ''
    finally:
        connection.close()


def get_sci_kjt(page_num, type):
    url = f'http://kjt.shandong.gov.cn/col/col13360/index.html?uid=85651&pageNum={page_num}'
    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers)
            response.encoding = 'utf-8'
            response = response.text
            break
        except:
            print("请求失败,尝试睡眠一会(半小时)")
            sleep_time = 60 * 30
            time.sleep(sleep_time)
            print("睡眠结束,继续运行...")
            continue

    # Dates, detail-page links and titles scraped from the list page
    da = re.findall(r'<span class="pull-right">(.*?)</span>', response)
    sci_url = re.findall(r'href="(.*?)" class="ellipsis-line-clamp">', response)
    content = re.findall(r'<s></s>(.*?)</a></li>', response)

    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + sci_url[i])

    if len(da) != len(sci_url) or len(da) != len(content):
        print("数据不完整,跳过插入")
        return

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default floor date

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
            INSERT INTO `my_database_test`.`sci_spider`
            (`title`, `url`, `date`, `type`, `create_date`)
            VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                # Only insert entries newer than the latest stored date
                if da[i] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], sci_url[i], da[i], type))

            connection.commit()
            print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()


if __name__ == '__main__':
    get_sci_kjt(1, 1)
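Each spider retries the list-page request indefinitely, sleeping half an hour per failure, and the request itself has no timeout. A bounded variant (a sketch; the retry count and the 60-second timeout are arbitrary assumptions) avoids hanging forever on a dead endpoint:

import time
import requests

def fetch_page(url, headers, max_retries=3, wait_seconds=60 * 30):
    # Return the decoded page text, or None after max_retries failed attempts
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.get(url, headers=headers, timeout=60)
            resp.encoding = 'utf-8'
            return resp.text
        except requests.exceptions.RequestException as exc:
            print(f"request failed ({attempt}/{max_retries}): {exc}")
            if attempt < max_retries:
                time.sleep(wait_seconds)
    return None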

Crawler for the Shandong Provincial People's Government (山东省人民政府)

import re
import time
import pymysql
import requests


def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    # Latest announcement date already stored for this source (type = '3')
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '3' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        return ''
    finally:
        connection.close()


def get_sci_sdszf(page_num, type):
    url = f'http://www.shandong.gov.cn/col/col94237/index.html?uid=633233&pageNum={page_num}'
    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers)
            response.encoding = 'utf-8'
            response = response.text
            break
        except:
            print("请求失败,尝试睡眠一会(半小时)")
            sleep_time = 60 * 30
            time.sleep(sleep_time)
            print("睡眠结束,继续运行...")
            continue

    # Extract dates
    da = re.findall(r'<span>\s*(\d{4}-\d{2}-\d{2})\s*</span>', response)
    # Extract links
    sci_url = re.findall(r'href="(.*?)"\s+target="_blank"\s+title="', response)
    # Extract titles (the title attribute)
    content = re.findall(r'\s+target="_blank"\s+title="(.*?)"', response)
    print(len(da), len(sci_url), len(content))

    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + sci_url[i])

    if len(da) != len(sci_url) or len(da) != len(content):
        print("数据不完整,跳过插入")
        return

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default floor date

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
            INSERT INTO `my_database_test`.`sci_spider`
            (`title`, `url`, `date`, `type`, `create_date`)
            VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                # Only insert entries newer than the latest stored date
                if da[i] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], sci_url[i], da[i], type))

            connection.commit()
            print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()


if __name__ == '__main__':
    get_sci_sdszf(1, 3)
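If find_new_date() falls back to the empty string (for example, when its query fails), the '1970-01-01' floor makes every entry on the page insert again, so duplicates can accumulate. A small existence check before the INSERT is one way to guard against that (a sketch, assuming there is no unique index on `url`):

def url_exists(cursor, url):
    # Skip rows whose url is already stored; guard in application code
    cursor.execute("SELECT 1 FROM `sci_spider` WHERE url = %s LIMIT 1", (url,))
    return cursor.fetchone() is not None

# inside the insert loop:
#     if da[i] > new_date and not url_exists(cursor, sci_url[i]):
#         cursor.execute(sql, (content[i], sci_url[i], da[i], type))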