关于

  1. 数据源

    点击:深圳市政府数据开放平台

    Untitled

  2. 需求

    • 每秒钟的车辆出站数
    • 实时更新当前时间出站总数
  3. 架构

    Jmeter + kafka + MySQL + flask + echarts

  4. 效果

  5. 项目网址
    点击:深圳ETC数据存储可视化

Jmeter

Jmeter请求数据

参考教程:Jmeter | HTTP请求

接口数据存入csv文件,利用HTTP请求从csv文件取数据

  1. 配置HTTP请求

    添加HTTP Request

    Untitled

添加参数param接收CSV文件中提取出的数据,值为data变量的值

  1. 配置csv文件

    添加 CSV Data Set Config

    添加Debug Sample:查看变量值

    Untitled

    • 自定义变量data,接收从csv文件中提取出的数据

    • \t 分隔符 保证每次请求获得一行数据

  2. 测试

    Untitled

    自定义变量test作为值传到HTTP的参数data中

    Untitled

Jmeter连接kafka

参考教程:Jmeter | 连接Kafka

  1. 配置java请求

    添加 Java Request : 连接kafka

    Untitled

    • kafka_message:将HTTP请求中的data参数作为变量

      如此可以实现,CSV中的数据以行为单位一条一条传入Kafka,

  2. 测试

    Untitled

发送数据

线程组中的循环次数设置为永远

可以在线程组最后添加constant timer组件,设置请求数据的间隔

在以上的配置基础上,就可以将csv中的所有数据发送到kafka中的指定主题和分区中,供后续消费使用。

Untitled

Kafka对接MySQL

准备工作

  1. 数据分析
    a. 备注全为“深圳入”,所以删掉
    b. 将车型分为两个属性存储 *型车——CX1,客/货——CX2
    c. 车牌信息只保留省份

  2. 建库建表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
create database etc;
use etc;
create table etc_all
(
XH int not null
primary key,
CP char(8) null comment '车牌',
CX1 varchar(10) null comment '车型',
CK varchar(10) null comment '出口',
CKSJ datetime null comment '出口时间',
RK varchar(10) null comment '入口',
RKSJ datetime null comment '入口时间',
CX2 varchar(10) null comment '车型2'
);

代码

  1. 连接MySQL
  2. 数据存入MySQL
  3. 连接kafka
  4. 在消费者的循环中添加存入MySQL的函数调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from kafka import KafkaConsumer
import pymysql
import time
import traceback


# -------- 1.连接MySQL -------------

def get_conn():
conn = pymysql.connect(host="******", # 本地ip或你的域名地址
user="******", # MySQL登录用户名
password="******", # MySQL登录密码
db="etc", # 储存数据的数据库名
charset="utf8")
# conn表示连接数据库,对数据的操作需要通过cursor来实现
cursor = conn.cursor()
return conn, cursor

def close_conn(conn, cursor):
if cursor:
cursor.close()
if conn:
conn.close()


# -------- 2.数据存入MySQL -------------

def update_data(result,num):

cursor = None
conn = None
try:
conn, cursor = get_conn()

# 插入数据
sql_insert = "insert into etc_all(XH, CP, CX1, CK, CKSJ, RK,RKSJ,CX2)" \
"values(%s,%s,%s,%s,%s,%s,%s,%s)"

# 设置“判断是否存在” 的SQL语句,不存在数据更新情况,所以如果有重复数据就选择不插入了
sql_ifExist="select XH from etc_all where XH=%s"

if not cursor.execute(sql_ifExist,num):
cursor.execute(sql_insert, result)
conn.commit()
print('%s数据更新完成!' % time.strftime("%Y-%m-%d %H:%M:%S"))
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)


# -------- 3.连接kafka -------------
# 将插入数据的函数调用,放在获取消息的循环中即可

def Kafka2Mysql():
consumer = KafkaConsumer('etc_all',bootstrap_servers=['ip1','ip2','ip3'],auto_offset_reset='earliest')
for m in consumer:
msg = m.value.decode('utf-8')
print(msg)
# 【output】
# 1,粤S88***,一型车(客),广东水朗D站,2020/12/22 0:00,广东龙景站,2020/12/21 23:39,深圳入

# -- 对每条消息进行处理 --
msg = msg.split(",") # 将接收的消息转换为列表类型
msg.pop(7) # 删除“深圳人”
msg[1] = msg[1][0] # 车牌只保留省份
msg.append(msg[2][4]) # 车类型(客/货)添加到列表尾部作为CX1属性列
msg[2] = msg[2][:3] # CX2属性列只保留"*型车"

print(msg)
# 【output】
# ['1', '粤', '一型车', '广东水朗D站', '2020/12/22 0:00', '广东龙景站', '2020/12/21 23:39', '客']

# -- 向MySQL插入数据--
update_data(msg, msg[0])

consumer.close()

if __name__ == '__main__':
Kafka2Mysql()

Mysql对接Flask

假设表中已经存有全部的数据,将数据全部取出,通过前端实现实时效果。

  1. 连接MySQL
  2. 根据需求创建函数获取数据
  3. 在flask中导入文件调用函数
  • 每秒出站数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    import pymysql
    import traceback


    # -------- 1.连接MySQL -------------

    def get_conn():
    conn = pymysql.connect(host="******", # 本地ip或你的域名地址
    user="******", # MySQL登录用户名
    password="******", # MySQL登录密码
    db="etc", # 储存数据的数据库名
    charset="utf8")
    # conn表示连接数据库,对数据的操作需要通过cursor来实现
    cursor = conn.cursor()
    return conn, cursor

    def close_conn(conn, cursor):
    if cursor:
    cursor.close()
    if conn:
    conn.close()

    # -------- 2.获取数据 -------------

    def cksj():
    cursor = None
    conn = None
    try:
    # --------- 获取数据 ----------------
    conn, cursor = get_conn()
    sql = '''
    select count(XH),CKSJ
    from main
    group by CKSJ
    order by CKSJ
    '''
    cursor.execute(sql)
    data = cursor.fetchall()

    print(data)
    # 【output】(部分)
    # ((26, datetime.datetime(2020, 12, 22, 0, 0)), (32, datetime.datetime(2020, 12, 22, 0, 1)), (36, datetime.datetime(2020, 12, 22, 0, 2)),...)

    # --------- 加工数据 ---------------
    List = [] # 初始化数据集
    for i in data:
    # 将每秒的数据封装成echarts可以识别的种类
    item = {'name': i[1].strftime('%Y-%m-%d %H:%M:%S'),
    'value': [i[1].strftime('%Y-%m-%d %H:%M:%S'), i[0]]}
    List.append(item)

    print(List)
    # 【output】(部分)
    # [{'name': '2020-12-22 00:00:00', 'value': ['2020-12-22 00:00:00', 26]}, {'name': '2020-12-22 00:01:00', 'value': ['2020-12-22 00:01:00', 32]},...]

    conn.commit()
    except:
    traceback.print_exc()
    finally:
    close_conn(conn, cursor)

    return List


    if __name__ == '__main__':
    cksj()
  • 截止当前时间总出站数
    通过flask调用cksj()获取返回的数据进行加工即可,详见下文

flask 获取数据

为每个需求所需的数据创建不同的路由接口,前端采用ajax()方法获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import json
from flask import Flask
from flask import render_template
from flask import jsonify

import Mysql2Flask # 导入python文件

app = Flask(__name__)


@app.route('/')
def main():
return render_template('main.html')

# 需求1:每秒出站数
@app.route('/c1')
def get_data():
List=Mysql2Flask.cksj()
print(List)
return jsonify(List)

# 需求2:截止当前总出站数
@app.route('/Sum')
def get_sum():
Sum=0
SumList=[]
List=Mysql2Flask.cksj()

for item in List:
Sum+=item["value"][1]
SumList.append(Sum) #保留每秒出站数量的累加结果,添加到列表中

print(SumList)
# 【output】(部分)
# [26, 58, 94, 132, 166, 194, 223, 257, 286, 309, 336, 369, 403, 447, 476, 514, 542, 567, 597, ...]

return json.dumps(SumList)

前端实现实时效果

  • back_to_front.js:利用ajax方法获取后台数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // back_to_front.js
    function get_data() {
    $.ajax({
    url: "/c1",
    dataType: "json",
    success: function(data) {
    ck=data;
    }
    });
    }

    function get_sum() {
    $.ajax({
    url:"/Sum",
    dataType: "json",
    traditional: true,
    success: function(data) {
    sum=data
    },
    error: function(xhr, type, errorThrown) {
    }
    })
    }

    get_data()
    get_sum()

    // 因为一次获取了全部数据,所以不需要设置获取后台数据的刷新时间
    // setInterval(get_sum, 1000)
    // setInterval(get_data, 1000)
  • cksj.js:初始化echarts实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    // cksj.js
    //初始化echarts实例
    var chartDom=document.getElementById('c2');
    var myChart= echarts.init(chartDom,'dark');
    var option_cksj;

    function randomData() {
    now= newDate(+now+oneDay);
    return {
    name:now.toString(),
    value: [
    // 初始化表中前20s的数据,设置值为10
    [[now.getFullYear(),now.getMonth()+1,now.getDate()].join('-'),[now.getHours(),now.getMinutes()].join(':')].join(' '),
    10
    ]
    };
    }

    // 初始化表中的前20条数据
    let data= [];
    let now= newDate(2020,11,21,23,40,0); // 因为数据起始是2020-11-22-00,所以初始化的数据起始为20s前
    let oneDay= 60 * 1000;

    // 将初始化的20条数据加入数据集中
    for(let i =0;i<20;i++){
    data.push(randomData())
    }

    option_cksj= {
    title: {
    text: '出站数量实时统计'
    },
    tooltip: {
    trigger: 'axis',
    formatter: function (params) {

    params = params[0];

    var date = newDate(params['name']);
    return (
    date.getDate() +
    '日' +
    date.getHours() +
    '点' +
    date.getMinutes() +
    ' 分: ' +
    params['value'][1]
    );
    },
    axisPointer: {
    animation: false
    }
    },
    xAxis: {
    type: 'time',
    splitLine: {
    show: false
    },
    formatter: function (data) {

    var t_date = newDate(data[0]['name']);

    return [t_date.getMonth() , t_date.getDate()].join('/') + " "

    + [t_date.getHours(), t_date.getMinutes()].join(':');

    }
    },
    yAxis: {
    type: 'value',
    boundaryGap: [0, '100%'],
    splitLine: {
    show: false
    }
    },
    series: [
    {
    name: 'Fake Data',
    type: 'line',
    showSymbol: false,
    data:data
    }
    ]
    };

    setInterval(function () {

    for (var i = 0; i < 1; i++) {
    data.shift() // 删除echarts数据集中的第一个数据
    data.push(ck[0]) // 在末尾添加后台传来的数据集中的第一条数据
    ck.shift() // 删除后台数据集中的第一条数据,实现下一秒的数据在第一位等待添加进echarts数据集
    }

    myChart.setOption({
    series: [
    {
    data:data
    }
    ]
    });
    }, 1000);

    option_cksj&&myChart.setOption(option_cksj);
  • main.html

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    <!DOCTYPE html>
    <html>
    <head>
    <!-- 刷新sum数据 -->
    <script>
    function all(){
    setInterval(function (){
    $("#a1").html(sum[0]); // 将id为a1的标签内容赋值为sum数组第一个元素
    sum.shift() // 删除数组第一个元素,使下一秒的元素处在第一位
    },1000);
    }
    </script>

    <meta charset="utf-8">
    <title>疫情监控</title>
    <link href="../static/css/main.css" rel="stylesheet"/>
    <script src="../static/js/echarts.js"></script>
    </head>
    <body onload="all()">
    <div id='title'>
    深圳ETC
    <div id='time'>时间</div>
    </div>
    <div id='c1'>
    <div class="num" id="a1"></div>
    <div class="num">*</div>
    <div class="num">*</div>
    <div class="num">*</div>
    <div class="txt">总出站数</div>
    <div class="txt">待定</div>
    <div class="txt">待定</div>
    <div class="txt">待定</div>
    </div>
    <div id='c2'>c2</div>
    <div id='l1'>l1</div>
    <div id='l2'>l2</div>
    <div id='r1'>r1</div>
    <div id='r2'>r2</div>

    <script src="../static/js/back_to_front.js"></script>
    <script src="../static/js/cksj.js"></script>

    </body>
    </html>

附 :后端实现实时效果

在后端实现了数据的实时传输,但静态数据集的限制使得前端的展示效果不能严格用“实时出站”解释

每次展示的时候,清空etc表的全部数据,运行jmeter脚本,数据会一条一条进入kafka存入MySQL然后被flask获取最后在前端展示

本质区别是从数据库获取数据的时间点

  • 前者是当全部数据存入MySQL后,对全部数据进行查询
  • 后者是实时消费kafka的数据,立即存入MySQL,并且立即进行查询

需求1:每秒出站数

  1. 需要修改SQL的查询语句,并且不需要循环对每条数据进行包装,只需要对查询到的一条数据包装即可

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    def cksj():
    cursor = None
    conn = None
    try:
    conn, cursor = get_conn()
    sql = '''
    select count(XH),CKSJ
    from etc
    group by CKSJ
    order by CKSJ
    desc
    limit 1;'''
    cursor.execute(sql)
    data=cursor.fetchall()

    print(data)
    # 【output】(某一时刻)
    # ((11, datetime.datetime(2020, 12, 22, 3, 15)),)

    List={'name': data[0][1].strftime('%Y-%m-%d %H:%M:%S'), 'value': [data[0][1].strftime('%Y-%m-%d %H:%M:%S'),data[0][0]]}

    print(List)
    # 【output】(某一时刻)
    # {'name': '2020-12-22 03:15:00', 'value': ['2020-12-22 03:15:00', 11]}

    conn.commit()
    except:
    traceback.print_exc()
    finally:
    close_conn(conn, cursor)

    return List
  2. back_to_front.js中封装了ajax方法的get_data()需要被定时刷新,不停地更新获取的数据,在文件末尾加上setInterval(get_data, 1000)

  3. cksj.js中的ck变量也不再是完整的数据集,而是每一秒刷新的一条实时数据

    cksj.jssetInterval中设置if判断,如果得到的新数据与上次有相同的出站时间,则这一秒进行的就是数量的刷新,如果是新的时间,就将echarts数据集刷新,添加新的数据

1
2
3
4
5
6
7
8
9
10
11
12
setInterval(function () {

for (var i = 0; i < 1; i++) {
if(data[data.length-1]["name"]==ck['name']){
data[data.length-1]['value']=ck['value']
}
else {
data.shift();
data.push(ck);
}

}

需求2:截止当前出站总数

  1. 不停地统计当前MySQL表中数据总量,就能反映出总出站数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    def getAll():
    cursor = None
    conn = None
    try:
    conn, cursor = get_conn()
    sql = '''
    select count(XH)
    from etc
    '''
    cursor.execute(sql)
    data = cursor.fetchall()[0][0]
    conn.commit()
    except:
    traceback.print_exc()
    finally:
    close_conn(conn, cursor)

    return data
  2. flask添加数据传递

    1
    2
    3
    4
    5
    @app.route('/Sum')
    def get_sum():
    Sum=Mysql2Flask.getAll()
    print(Sum)
    return jsonify(Sum)
  3. back_to_front.jsget_sum()同样需要被定时刷新,不停地更新获取的数据,在文件末尾加上setInterval(get_sum, 1000)