Connecting to Kafka

Install kafka-python

pip install kafka-python

KafkaProducer

  1. Create a producer object
  2. Produce messages with producer.send()
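    from time import sleep

    from kafka import KafkaProducer


    def start_producer():
        # Replace 'ip' with the address of a Kafka broker
        producer = KafkaProducer(bootstrap_servers='ip:9092')
        for i in range(0, 100000):
            msg = 'msg is ' + str(i)
            # Kafka message values are bytes, so encode the string first
            producer.send('topic_name', msg.encode('utf-8'))
            sleep(3)
        # send() is asynchronous; flush() waits until buffered messages are sent
        producer.flush()
        producer.close()


    if __name__ == '__main__':
        start_producer()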
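kafka-python's KafkaProducer can also take a value_serializer callable that is applied to every value passed to send(), so the encoding step does not have to be repeated at each call. A minimal sketch, assuming JSON-encoded messages; the JSON format and the helper names serialize_value, deserialize_value, make_producer and send_messages are illustrative, not part of the original:

```python
import json


def serialize_value(value):
    """Encode a Python object as UTF-8 JSON bytes (hypothetical message format)."""
    return json.dumps(value, ensure_ascii=False).encode("utf-8")


def deserialize_value(raw):
    """Inverse of serialize_value, e.g. for KafkaConsumer(value_deserializer=...)."""
    return json.loads(raw.decode("utf-8"))


def make_producer(bootstrap_servers):
    # Imported here so this module still loads where kafka-python is not installed
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=bootstrap_servers,
                         value_serializer=serialize_value)


def send_messages(producer, topic, count):
    for i in range(count):
        # value_serializer turns this dict into bytes before it is sent
        producer.send(topic, {"seq": i, "text": "msg is %d" % i})
    producer.flush()  # block until every buffered message has been sent
```

For example, send_messages(make_producer('ip:9092'), 'topic_name', 10) would send ten JSON messages; a consumer could pass value_deserializer=deserialize_value to get the dicts back.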

KafkaConsumer

  1. Create a consumer object
  2. Iterate over the consumer to receive messages
    import time

    from kafka import KafkaConsumer


    def start_consumer():
        consumer = KafkaConsumer('topic_name',
                                 bootstrap_servers=['ip1:9092', 'ip2:9092', 'ip3:9092'],
                                 auto_offset_reset='earliest')
        # Iterating blocks and yields messages as they arrive
        for msg in consumer:
            print(msg)
            print("topic = %s" % msg.topic)  # the topic name is a str
            print("partition = %d" % msg.partition)
            print("offset = %d" % msg.offset)
            print("value = %s" % msg.value.decode())  # bytes to str
            print("timestamp = %d" % msg.timestamp)  # milliseconds since the epoch
            print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(msg.timestamp / 1000)))
        consumer.close()


    if __name__ == '__main__':
        start_consumer()
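  • auto_offset_reset: where to start reading when the consumer has no valid committed offset
    earliest: start from the oldest message still stored in the topic
    latest: start from messages produced from now on (the kafka-python default)
    The consumer above sets no group_id, so kafka-python commits no offsets and 'earliest' re-reads the topic from the beginning on every run.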
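The rule above can be modeled as a small function: a committed offset wins if it is still valid, and auto_offset_reset only decides the starting point when there is none. This is a simplified sketch of the behavior, not kafka-python's internal code; the function names and the group name 'demo-group' are hypothetical. make_consumer shows consumer settings (all real KafkaConsumer parameters) under which committed offsets exist in the first place:

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Simplified model of where a consumer starts reading one partition.

    committed: offset committed by the consumer group, or None if there is none
    log_start / log_end: oldest retained offset and the next offset to be written
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed  # resume where the group left off
    if auto_offset_reset == "earliest":
        return log_start  # oldest message still stored
    if auto_offset_reset == "latest":
        return log_end    # only messages produced from now on
    raise ValueError("no valid committed offset and no reset policy")


def make_consumer(topic, bootstrap_servers, group_id="demo-group"):
    # Imported here so this module still loads where kafka-python is not installed
    from kafka import KafkaConsumer
    return KafkaConsumer(topic,
                         bootstrap_servers=bootstrap_servers,
                         group_id=group_id,             # enables offset commits
                         auto_offset_reset="earliest",
                         enable_auto_commit=True)       # commit consumed offsets periodically
```

With a group_id, the first run starts at the earliest message and later runs resume from the committed offset instead of re-reading the whole topic.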

Connecting to MySQL

Configuring remote access

  1. Log in to the database

     mysql -u root -p
    
  2. Create a user that can connect remotely

     GRANT ALL PRIVILEGES ON *.* TO 'user_name'@'%' IDENTIFIED BY 'password' WITH GRANT OPTION;
    

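    user_name is the user name
    % allows connections from any IP address; you can also specify a single IP address to allow
    password is the password for this account
    This GRANT ... IDENTIFIED BY form works on MySQL 5.7 and earlier. MySQL 8.0 no longer accepts IDENTIFIED BY in GRANT, so there you create the user first with CREATE USER 'user_name'@'%' IDENTIFIED BY 'password'; and then run GRANT ALL PRIVILEGES ON *.* TO 'user_name'@'%' WITH GRANT OPTION;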

  3. Apply the changes immediately

     flush privileges;  
    
  4. Edit the MySQL configuration file

     vim /etc/mysql/my.cnf
    

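    Change bind-address = 127.0.0.1 to bind-address = 0.0.0.0, then restart MySQL (on newer Ubuntu packages this setting is usually in /etc/mysql/mysql.conf.d/mysqld.cnf rather than my.cnf):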

     /etc/init.d/mysql stop
     /etc/init.d/mysql start
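After changing bind-address and restarting, one quick way to check from another machine that MySQL is reachable is a plain TCP connection to port 3306 (MySQL's default port). A minimal sketch using only the standard library; port_is_open is a hypothetical helper, and a successful connection only shows that the port is open, not that the user's credentials work:

```python
import socket


def port_is_open(host, port=3306, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection resolves the host and connects; the socket is closed on exit
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # refused, timed out, unreachable, or the name could not be resolved
        return False
```

For example, port_is_open('your_server_ip') should return True once the server accepts remote connections; if it returns False, check bind-address and any firewall on the server.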
    

If something goes wrong during configuration, you can start over as follows:

Uninstalling and reinstalling MySQL on Ubuntu
  1. Remove MySQL

     sudo apt-get remove mysql-*
    
  2. Purge leftover configuration data

    dpkg -l |grep ^rc|awk '{print $2}' |sudo xargs dpkg -P
    

    A dialog will pop up; choose Yes.

  3. Install MySQL

     sudo apt-get install mysql-client mysql-server
    

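    During installation you will be prompted to set a root password; if you did not purge the leftover data when uninstalling, no prompt appears.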

  4. Check, start, or stop the service

     sudo service mysql status
     sudo service mysql start
     sudo service mysql stop
    

Reference: https://www.cnblogs.com/duolamengxiong/p/13650684.html

Connecting to MySQL from IntelliJ IDEA

  1. Open the Database tool window on the right and add a MySQL data source
  2. Fill in the connection details

Code

    import time
    import traceback

    import pymysql


    def get_conn():
        conn = pymysql.connect(host="176.32.34.231",  # IP address or domain name of the MySQL server
                               user="root",           # MySQL login user name
                               password="123456",     # MySQL login password
                               db="test",             # database that stores the data
                               charset="utf8")
        # conn is the database connection; data operations go through a cursor
        cursor = conn.cursor()
        return conn, cursor


    def close_conn(conn, cursor):
        if cursor:
            cursor.close()
        if conn:
            conn.close()


    def update_data(result, num):
        """Insert one row (result: 8 values) unless a row with XH == num already exists."""
        cursor = None
        conn = None
        try:
            conn, cursor = get_conn()
            sql_insert = "insert into etc_all(XH, CP, CX1, CK, CKSJ, RK, RKSJ, CX2) " \
                         "values(%s, %s, %s, %s, %s, %s, %s, %s)"
            sql_ifExist = "select XH from etc_all where XH=%s"
            # execute() returns the number of rows found, so 0 means XH is new
            if not cursor.execute(sql_ifExist, (num,)):
                cursor.execute(sql_insert, result)
                conn.commit()
                print('%s data update complete!' % time.strftime("%Y-%m-%d %H:%M:%S"))
        except Exception:
            traceback.print_exc()
        finally:
            close_conn(conn, cursor)
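update_data first SELECTs by XH and only then INSERTs, so two consumers handling the same record at the same moment could both pass the check. A common alternative is to make XH a primary (or unique) key and let the database drop duplicates in a single statement; on MySQL that is INSERT IGNORE (or INSERT ... ON DUPLICATE KEY UPDATE). The sketch below uses Python's built-in sqlite3 purely so it runs without a MySQL server: SQLite spells the statement INSERT OR IGNORE and uses ? placeholders where pymysql uses %s. The TEXT column types and the helper names create_table and insert_if_absent are assumptions:

```python
import sqlite3

COLUMNS = ("XH", "CP", "CX1", "CK", "CKSJ", "RK", "RKSJ", "CX2")


def create_table(conn):
    # XH is a primary key, so the database itself rejects duplicate XH values
    others = ", ".join("%s TEXT" % c for c in COLUMNS[1:])
    conn.execute("CREATE TABLE IF NOT EXISTS etc_all (XH TEXT PRIMARY KEY, %s)" % others)


def insert_if_absent(conn, row):
    """Insert one 8-value row; return True if inserted, False if XH already existed."""
    sql = "INSERT OR IGNORE INTO etc_all (%s) VALUES (%s)" % (
        ", ".join(COLUMNS), ", ".join("?" * len(COLUMNS)))
    cur = conn.execute(sql, row)
    conn.commit()
    return cur.rowcount == 1  # 0 rows changed means the row was ignored
```

The check and the insert become one atomic statement, so no separate SELECT is needed.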

Consuming Kafka data into MySQL

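  1. Complete the connection setup and configuration above
  2. Write the insert statement in the update_data function
  3. Call the function inside the message-consuming loop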
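    from kafka import KafkaConsumer

    # update_data (with get_conn/close_conn) is the function defined above


    def kafka2Mysql():
        consumer = KafkaConsumer('topic_name',
                                 bootstrap_servers=['ip1:9092', 'ip2:9092', 'ip3:9092'],
                                 auto_offset_reset='earliest')
        for msg in consumer:
            # Assumption: each message value is a comma-separated record whose
            # 8 fields are in etc_all column order (XH first); adjust to your format
            fields = msg.value.decode().split(',')
            update_data(fields, fields[0])
        consumer.close()


    if __name__ == '__main__':
        kafka2Mysql()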
This consumes messages from Kafka in real time and stores them in the MySQL database.
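If the consumer commits offsets automatically, a message can be marked as consumed before it has been stored. One common pattern is to disable auto-commit and commit only after each record has been handled (at-least-once delivery). A sketch under that assumption: consume_into_db, parse and store are hypothetical names, the consumer is assumed to be a KafkaConsumer created with a group_id and enable_auto_commit=False, and store must raise on failure (unlike update_data, which catches and prints exceptions) so that a failed insert is never committed:

```python
def consume_into_db(consumer, parse, store):
    """Handle each record, then commit its offset (at-least-once delivery).

    consumer: iterable of records with .value, plus a commit() method
              (e.g. KafkaConsumer(..., group_id=..., enable_auto_commit=False))
    parse:    turns record.value into a row, raising ValueError if malformed
    store:    writes the row to the database and raises if that fails
    """
    stored = skipped = 0
    for record in consumer:
        try:
            row = parse(record.value)
        except ValueError:
            skipped += 1  # malformed message: skip it rather than stop the loop
        else:
            store(row)    # if this raises, the commit below is never reached
            stored += 1
        consumer.commit()  # commit only after the record has been handled
    return stored, skipped
```

With a real KafkaConsumer the loop runs indefinitely unless consumer_timeout_ms is set, so the returned counts are mostly useful in tests. Committing after every message is simple but slow; committing in batches is a common refinement.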