Python | Kafka对接MySQL
连接kafka
安装kafka-python
pip install kafka-python
KafkaProducer
- 创建一个生产者对象
- 通过
producer.send()
生产消息1
2
3
4
5
6
7
8
9
10
11
12
13from kafka import KafkaProducer
from time import sleep
def start_producer():
producer = KafkaProducer(bootstrap_servers='ip:9092')
for i in range(0,100000):
msg = 'msg is ' + str(i)
producer.send('topic_name', msg.encode('utf-8'))
sleep(3)
if __name__ == '__main__':
start_producer()
KafkaConsumer
- 创建一个消费者对象
- 通过循环遍历获得消息
1 | from kafka import KafkaConsumer |
- auto_offset_reset
earliest
:从上一次未消费的位置开始读
latest
:从当前时刻开始读
连接MySQL
配置远程连接
登录数据库
mysql -u root -p
创建用户远程连接
GRANT ALL PRIVILEGES ON *.* TO 'user_name'@'%' IDENTIFIED BY 'password' WITH GRANT OPTION;
user_name
表示用户名
%
表示所有的ip都可以连接,也可以设置某个ip地址
运行连接
password
表示该权限所识别的密码命令立即生效
flush privileges;
修改MySQL配置文件
vim /etc/mysql/my.cnf
将 bind-address = 127.0.0.1修改为
bind-address = 0.0.0.0
/etc/init.d/mysql stop /etc/init.d/mysql start
如果配置出错,可以按照下面方法重开
IDEA连接MySQL数据库
- 选择右侧
Database
添加MySQL
连接
- 配置相关信息
代码
1 | import pymysql |
消费kafka数据存入MySQL
- 完成上述连接及配置
- 在update_data函数中写好insert语句
- 将
函数
放在消费消息
的循环
中实现实时从kafka消费消息并存入MySQL数据库中1
2
3
4
5
6
7
8
9
10def kafka2Mysql():
consumer = KafkaConsumer('topic_name',
bootstrap_servers=['ip1:9092','ip2:9092','ip3:9092'],
auto_offset_reset='earliest')
for msg in consumer:
msg=msg.value.decode()
update_data(msg,msg[0])
consumer.close()
if __name__ == '__main__':
kafka2Mysql()
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 SYolin's Blog!
评论