实战 | 深圳ETC数据存储可视化
关于
数据源
点击:深圳市政府数据开放平台
需求
- 每秒钟的车辆出站数
- 实时更新当前时间出站总数
架构
Jmeter + kafka + MySQL + flask + echarts
效果
项目网址
点击:深圳ETC数据存储可视化
Jmeter
Jmeter请求数据
参考教程:Jmeter | HTTP请求
接口数据存入csv文件,利用HTTP请求从csv文件取数据
配置HTTP请求
添加HTTP Request
添加参数param接收CSV文件中提取出的数据,值为data变量的值
配置csv文件
添加
CSV Data Set Config
添加
Debug Sample
:查看变量值自定义变量
data
,接收从csv文件中提取出的数据\t
分隔符 保证每次请求获得一行数据
测试
自定义变量test作为值传到HTTP的参数data中
Jmeter连接kafka
参考教程:Jmeter | 连接Kafka
配置java请求
添加
Java Request
: 连接kafkakafka_message
:将HTTP请求中的data参数作为变量如此可以实现,CSV中的数据以行为单位一条一条传入Kafka,
测试
发送数据
将线程组
中的循环次数
设置为永远
可以在线程组
最后添加constant timer
组件,设置请求数据的间隔
在以上的配置基础上,就可以将csv中的所有数据发送到kafka中的指定主题和分区中,供后续消费使用。
Kafka对接MySQL
准备工作
数据分析
a. 备注全为“深圳入”,所以删掉
b. 将车型分为两个属性存储 *型车——CX1,客/货——CX2
c. 车牌信息只保留省份建库建表
1 | create database etc; |
代码
- 连接MySQL
- 数据存入MySQL
- 连接kafka
- 在消费者的循环中添加存入MySQL的函数调用
1 | from kafka import KafkaConsumer |
Mysql对接Flask
假设表中已经存有全部的数据,将数据全部取出,通过前端实现实时效果。
- 连接MySQL
- 根据需求创建函数获取数据
- 在flask中导入文件调用函数
每秒出站数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66import pymysql
import traceback
# -------- 1.连接MySQL -------------
def get_conn():
conn = pymysql.connect(host="******", # 本地ip或你的域名地址
user="******", # MySQL登录用户名
password="******", # MySQL登录密码
db="etc", # 储存数据的数据库名
charset="utf8")
# conn表示连接数据库,对数据的操作需要通过cursor来实现
cursor = conn.cursor()
return conn, cursor
def close_conn(conn, cursor):
if cursor:
cursor.close()
if conn:
conn.close()
# -------- 2.获取数据 -------------
def cksj():
cursor = None
conn = None
try:
# --------- 获取数据 ----------------
conn, cursor = get_conn()
sql = '''
select count(XH),CKSJ
from main
group by CKSJ
order by CKSJ
'''
cursor.execute(sql)
data = cursor.fetchall()
print(data)
# 【output】(部分)
# ((26, datetime.datetime(2020, 12, 22, 0, 0)), (32, datetime.datetime(2020, 12, 22, 0, 1)), (36, datetime.datetime(2020, 12, 22, 0, 2)),...)
# --------- 加工数据 ---------------
List = [] # 初始化数据集
for i in data:
# 将每秒的数据封装成echarts可以识别的种类
item = {'name': i[1].strftime('%Y-%m-%d %H:%M:%S'),
'value': [i[1].strftime('%Y-%m-%d %H:%M:%S'), i[0]]}
List.append(item)
print(List)
# 【output】(部分)
# [{'name': '2020-12-22 00:00:00', 'value': ['2020-12-22 00:00:00', 26]}, {'name': '2020-12-22 00:01:00', 'value': ['2020-12-22 00:01:00', 32]},...]
conn.commit()
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)
return List
if __name__ == '__main__':
cksj()- 截止当前时间总出站数
通过flask调用cksj()
获取返回的数据进行加工即可,详见下文
flask 获取数据
为每个需求所需的数据创建不同的路由接口,前端采用ajax()方法获取数据
1 | import json |
前端实现实时效果
back_to_front.js
:利用ajax方法获取后台数据1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30// back_to_front.js
function get_data() {
$.ajax({
url: "/c1",
dataType: "json",
success: function(data) {
ck=data;
}
});
}
function get_sum() {
$.ajax({
url:"/Sum",
dataType: "json",
traditional: true,
success: function(data) {
sum=data
},
error: function(xhr, type, errorThrown) {
}
})
}
get_data()
get_sum()
// 因为一次获取了全部数据,所以不需要设置获取后台数据的刷新时间
// setInterval(get_sum, 1000)
// setInterval(get_data, 1000)cksj.js
:初始化echarts实例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103// cksj.js
//初始化echarts实例
var chartDom=document.getElementById('c2');
var myChart= echarts.init(chartDom,'dark');
var option_cksj;
function randomData() {
now= newDate(+now+oneDay);
return {
name:now.toString(),
value: [
// 初始化表中前20s的数据,设置值为10
[[now.getFullYear(),now.getMonth()+1,now.getDate()].join('-'),[now.getHours(),now.getMinutes()].join(':')].join(' '),
10
]
};
}
// 初始化表中的前20条数据
let data= [];
let now= newDate(2020,11,21,23,40,0); // 因为数据起始是2020-11-22-00,所以初始化的数据起始为20s前
let oneDay= 60 * 1000;
// 将初始化的20条数据加入数据集中
for(let i =0;i<20;i++){
data.push(randomData())
}
option_cksj= {
title: {
text: '出站数量实时统计'
},
tooltip: {
trigger: 'axis',
formatter: function (params) {
params = params[0];
var date = newDate(params['name']);
return (
date.getDate() +
'日' +
date.getHours() +
'点' +
date.getMinutes() +
' 分: ' +
params['value'][1]
);
},
axisPointer: {
animation: false
}
},
xAxis: {
type: 'time',
splitLine: {
show: false
},
formatter: function (data) {
var t_date = newDate(data[0]['name']);
return [t_date.getMonth() , t_date.getDate()].join('/') + " "
+ [t_date.getHours(), t_date.getMinutes()].join(':');
}
},
yAxis: {
type: 'value',
boundaryGap: [0, '100%'],
splitLine: {
show: false
}
},
series: [
{
name: 'Fake Data',
type: 'line',
showSymbol: false,
data:data
}
]
};
setInterval(function () {
for (var i = 0; i < 1; i++) {
data.shift() // 删除echarts数据集中的第一个数据
data.push(ck[0]) // 在末尾添加后台传来的数据集中的第一条数据
ck.shift() // 删除后台数据集中的第一条数据,实现下一秒的数据在第一位等待添加进echarts数据集
}
myChart.setOption({
series: [
{
data:data
}
]
});
}, 1000);
option_cksj&&myChart.setOption(option_cksj);main.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<html>
<head>
<!-- 刷新sum数据 -->
<script>
function all(){
setInterval(function (){
$("#a1").html(sum[0]); // 将id为a1的标签内容赋值为sum数组第一个元素
sum.shift() // 删除数组第一个元素,使下一秒的元素处在第一位
},1000);
}
</script>
<meta charset="utf-8">
<title>疫情监控</title>
<link href="../static/css/main.css" rel="stylesheet"/>
<script src="../static/js/echarts.js"></script>
</head>
<body onload="all()">
<div id='title'>
深圳ETC
<div id='time'>时间</div>
</div>
<div id='c1'>
<div class="num" id="a1"></div>
<div class="num">*</div>
<div class="num">*</div>
<div class="num">*</div>
<div class="txt">总出站数</div>
<div class="txt">待定</div>
<div class="txt">待定</div>
<div class="txt">待定</div>
</div>
<div id='c2'>c2</div>
<div id='l1'>l1</div>
<div id='l2'>l2</div>
<div id='r1'>r1</div>
<div id='r2'>r2</div>
<script src="../static/js/back_to_front.js"></script>
<script src="../static/js/cksj.js"></script>
</body>
</html>
附 :后端实现实时效果
在后端实现了数据的实时传输,但静态数据集的限制使得前端的展示效果不能严格用“实时出站”解释
每次展示的时候,清空etc表的全部数据,运行jmeter脚本,数据会一条一条进入kafka存入MySQL然后被flask获取最后在前端展示
本质区别是从数据库获取数据的时间点
- 前者是当全部数据存入MySQL后,对全部数据进行查询
- 后者是实时消费kafka的数据,立即存入MySQL,并且立即进行查询
需求1:每秒出站数
需要修改SQL的查询语句,并且不需要循环对每条数据进行包装,只需要对查询到的一条数据包装即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32def cksj():
cursor = None
conn = None
try:
conn, cursor = get_conn()
sql = '''
select count(XH),CKSJ
from etc
group by CKSJ
order by CKSJ
desc
limit 1;'''
cursor.execute(sql)
data=cursor.fetchall()
print(data)
# 【output】(某一时刻)
# ((11, datetime.datetime(2020, 12, 22, 3, 15)),)
List={'name': data[0][1].strftime('%Y-%m-%d %H:%M:%S'), 'value': [data[0][1].strftime('%Y-%m-%d %H:%M:%S'),data[0][0]]}
print(List)
# 【output】(某一时刻)
# {'name': '2020-12-22 03:15:00', 'value': ['2020-12-22 03:15:00', 11]}
conn.commit()
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)
return List在
back_to_front.js
中封装了ajax方法的get_data()
需要被定时刷新,不停地更新获取的数据,在文件末尾加上setInterval(get_data, 1000)
cksj.js
中的ck
变量也不再是完整的数据集,而是每一秒刷新的一条实时数据在
cksj.js
的setInterval
中设置if判断,如果得到的新数据与上次有相同的出站时间,则这一秒进行的就是数量的刷新,如果是新的时间,就将echarts数据集刷新,添加新的数据
1
2
3
4
5
6
7
8
9
10
11
12
setInterval(function () {
for (var i = 0; i < 1; i++) {
if(data[data.length-1]["name"]==ck['name']){
data[data.length-1]['value']=ck['value']
}
else {
data.shift();
data.push(ck);
}
}
需求2:截止当前出站总数
- 不停地统计当前MySQL表中数据总量,就能反映出总出站数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18def getAll():
cursor = None
conn = None
try:
conn, cursor = get_conn()
sql = '''
select count(XH)
from etc
'''
cursor.execute(sql)
data = cursor.fetchall()[0][0]
conn.commit()
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)
return data flask添加数据传递
1
2
3
4
5
def get_sum():
Sum=Mysql2Flask.getAll()
print(Sum)
return jsonify(Sum)- 在
back_to_front.js
中get_sum()
同样需要被定时刷新,不停地更新获取的数据,在文件末尾加上setInterval(get_sum, 1000)