1. Local Environment Setup
Installation
https://kafka.apache.org/downloads
Download from the official site and extract it to the /usr/local directory.
Configuration
zookeeper.properties:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.properties:
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
producer.properties:
bootstrap.servers=localhost:9092
compression.type=none
consumer.properties:
bootstrap.servers=localhost:9092
group.id=test-consumer-group
Startup
# Start / stop ZooKeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/zookeeper-server-stop.sh

# Start / stop the Kafka broker
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-stop.sh

# Create, list, and delete a topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic demo
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic demo

# Console producer / consumer for a quick manual test
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic demo
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
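If you would rather verify the broker from code than from the console scripts, a minimal smoke test with the plain Java client (the kafka-clients library that spring-kafka also pulls in) looks roughly like this; the class name SmokeTestProducer is illustrative, and the topic demo matches the one created above:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SmokeTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // broker started above
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // Send one record to the "demo" topic and block until the broker acknowledges it
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo", "hello kafka")).get();
            System.out.println("message sent");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Running the console consumer command from the list above should then print "hello kafka".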
2. Spring Integration
Add the dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.0</version>
</dependency>
Configuration file
spring.kafka.producer.acks=-1
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=4096
spring.kafka.producer.properties.linger.ms=0
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.consumer.group-id=test-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.properties.session.timeout.ms=120000
spring.kafka.consumer.properties.request.timeout.ms=180000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
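These Spring Boot keys are fed into the producer and consumer configuration maps by auto-configuration. If you ever need the same producer settings without relying on the property file, a rough sketch of the equivalent programmatic setup (the bean names and the localhost:9092 address are assumptions, not part of the original config) could look like this:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.ACKS_CONFIG, "-1");   // mirrors spring.kafka.producer.acks
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}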
Usage
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    private final String topic = "demo";

    public void send(String msg) {
        kafkaTemplate.send(topic, msg);
    }
}
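KafkaTemplate#send is asynchronous; in spring-kafka 2.9 it returns a ListenableFuture, so the outcome can be observed with a callback. A sketch of a variant of the send method above (it would live in the same DemoProducer class, and the log messages are illustrative):

public void sendWithCallback(String msg) {
    kafkaTemplate.send(topic, msg).addCallback(
            // Success: log where the record landed
            result -> System.out.println("sent to " + result.getRecordMetadata().topic()
                    + "-" + result.getRecordMetadata().partition()
                    + "@" + result.getRecordMetadata().offset()),
            // Failure: log the error instead of silently dropping it
            ex -> System.err.println("send failed: " + ex.getMessage()));
}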
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoConsumer {

    @KafkaListener(topics = {"demo"})
    public void receive(ConsumerRecord<?, ?> record) {
        System.out.println("Consumed: " + record.topic() + "-" + record.partition() + "-" + record.value());
    }
}
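To exercise the pair end to end, a small runner such as the following (a hypothetical class, not part of the original post) can push one message on startup; the consumer's log line should then appear shortly after:

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class DemoRunner implements CommandLineRunner {

    private final DemoProducer producer;

    public DemoRunner(DemoProducer producer) {
        this.producer = producer;
    }

    @Override
    public void run(String... args) {
        // Sends one test message; DemoConsumer should print it
        producer.send("hello from spring-kafka");
    }
}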