欢迎光临散文网 会员登陆 & 注册

[学习笔记][kafka] Python使用kafka消息队列

2023-02-12 19:40 作者:扣丁船  | 我要投稿

之前做过的项目使用的都是rabbitmq,最近项目中需要使用kafka,现将安装和简单使用python代码做一个小结。

在本地开发时,使用docker安装kafka及其依赖。(以下操作基于Ubuntu操作系统,以及默认已经安装docker。)

实际本地开发时,在windows上使用wsl linux子系统,比较方便调试,且将容器设置为自动重启之后,每次在命令中键入bash回车,即可进入子系统,如果为Ubuntu, 在命令行中输入以下命令,启动docker。

service docker start

如果容器创建时使用了参数--restart=always,docker容器将随着docker启动而自动启动。


一、Docker安装kafka

1. 创建网络

使用以下命令创建一个名叫app-tier网络,该网络作用为将zookeeper(kafka依赖zookeeper运行)和Kafka置于同一网络中。

docker network create app-tier --driver bridge

参数解释:

--driver: 网络驱动方式,这里选择bridge桥接模式。具体桥接模式是啥,这里不再展开。


2. 安装zookeeper

docker安装命令:

docker run --name zookeeper-server --restart=always --privileged=true -d \

    --network app-tier \

    -e ALLOW_ANONYMOUS_LOGIN=yes \

    bitnami/zookeeper:latest

以上命令,将创建一个名为zookeeper-server的服务。

参数解释:

1) --name zookeeper-server: 容器名称配置,这里配置为zookeeper-server。

2) --restart=always:跟随docker自动重启。

3) --privileged=true: 容器内部权限,可选true和false。

这里用我自己的话理解就是,在容器内部有一个root用户,但是这个用户权限在true和false下有区别。

若为true,则容器内部的root拥有真正的root权限,比如docker中安装了一个linux系统,则其可以执行mount将分区挂载到其中的一个文件夹上。同时还可以在容器内部再启动docker,也就是可以继续套娃。

4)-d: 容器在后台运行。

5)--network app-tier:这里配置的是容器使用的网络,可以配置host, 此时使用的网络与宿主机的网络相同,好处是性能比较高,坏处是不安全。这里使用的是桥接模式创建的网络,因此安全性较好。

6) -e: 该参数是用来向容器传递环境变量的。ALLOW_ANONYMOUS_LOGIN=yes 表明zookeeper会读取该环境变量,从字面意思也可知,该参数配置的为是否可以让匿名用户访问服务,说白了也就是游客也可以访问。

7)bitnami/zookeeper:latest 这里配置该zookeeper容器依据什么版本的镜像创建,bitnami是镜像源头,zookeeper:latest 表示使用最新版本的zookeeper,该配置会自动拉取镜像,而不必在创建容器之前手动拉取。如果手动拉取,则使用docker pull命令即可。


3. 安装kafka

docker run -d --name kafka-server --network app-tier -p 9092:9092 \

    -e ALLOW_PLAINTEXT_LISTENER=yes \

    -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \

    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \

    bitnami/kafka:latest

通过以上命令即可安装一个kafka服务。

具体参数解释,可以参看安装zookeeper时的,-p是端口映射,这里9092:9092,前一个9092为物理机的端口,后一个9092为容器暴露的端口。组合起来则是,容器内部的9092端口,可以通过物理的机的9092访问,言下之意就是,也可以配置为9093:9092,即通过物理机的9093端口访问kafka。

这里有几个通过环境变量的配置解释一下:

1) ALLOW_PLAINTEXT_LISTENER=yes表示允许使用 PLAINTEXT 监听器。

2)KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181这里配置连接zookeeper服务,2181是zookeeper的默认端口。这里顺便说一下,kafka之所以依赖zookeeper,是因为zooKeeper是一个给分布式系统提供协调服务的工具, 借助zookeeper,kafka就能够在生产者和消费者互不知道状态的情况下,建立起订阅关系,做到负载均衡。

通过以下命令来验证kafka的安装。

docker logs -f kafka


4. 图形界面

消息队列的图形界面,能够使很多方面的操作变得便捷,比如rabbitmq自带图形界面,可以很方便的看到消息生产和消费的积压情况,也可以对消息做直接的增删等操作。

kafka没有自带的图形界面,可以使用一些三方的,可选的比较多,这里选择kafka-map。docker安装命令如下:

docker run -d --name kafka-map \

    --network app-tier \

    -p 9001:8080 \

    -v /opt/kafka-map/data:/usr/local/kafka-map/data \

    -e DEFAULT_USERNAME=admin \

    -e DEFAULT_PASSWORD=admin \

    --restart always dushixiang/kafka-map:latest

简单介绍参数,DEFAULT_USERNAME和DEFAULT_PASSWORD配置默认的用户和密码。

使用9001作为在物理机上的暴露端口。通过以下地址访问(本地访问)。

http://localhost:9001/

使用admin:admin账号密码登录之后是这样的。

kafka-map界面


二、Python中使用kafka

至于在python中的使用,我在gitee上放了example,地址如下:

仓库地址:

其中也有参考文档。

使用步骤:

1) 安装项目依赖。

pip install -r requirements.txt

2)启动consumer

python consumer.py

3) 生产消息

python producer.py

如果需要更改消息,可以在producer.py文件中自定义,在消费者consumer.py启动进程控制台查看消息。


三、结语

本篇主要目的是记录kafka服务搭建过程和在python中的基础使用,一些技术细节和问题,在以后的工作中,还将继续补充完善。

本篇关键点:

  1. kafka这个消息中间件的安装依赖zookeeper;

  2. 配置kafka-map作为其图形界面;

  3. 在python中简单验证,启动消费者监听消息,生产者发送消息;

  4. 模式为发布订阅模式。

本篇完。


[学习笔记][kafka] Python使用kafka消息队列的评论 (共 条)

分享到微博请遵守国家法律