Kafka配置
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。
# 在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,
# 在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,
# 记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
acks: all
#如果该值大于零时,表示启用重试失败的发送次数
retries: 3
#每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
#这有助于提升客户端和服务器上的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
batch-size: 16384
#生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为33554432
buffer-memory: 33554432
#生产者生成的所有数据的压缩类型,此配置接受标准压缩编解码器('gzip','snappy','lz4'),
#它还接受'uncompressed'以及'producer',分别表示没有压缩以及保留生产者设置的原始压缩编解码器,
#不管是哪种方式的压缩,都和消费者无关,消费到的还是发送前的原始数据。
#默认值为producer
compression-type: gzip
client-id: esports-im-producer
consumer:
key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
#用于标识此使用者所属的使用者组的唯一字符串。
group-id: esports-im
client-id: esports-im-client-c
#如果为true,则消费者的偏移量将在后台定期提交,默认值为true
enable-auto-commit: false
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
auto-commit-interval: 1000
#当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量
#可选的值为latest, earliest, none
auto-offset-reset: latest
#一次调用poll()操作时返回的最大记录数,默认值为500
max-poll-records: 100
#如果没有足够的数据立即满足“fetch.min.bytes”给出的要求,服务器在回答获取请求之前将阻塞的最长时间(以毫秒为单位)
#默认值为500
fetch-max-wait: 500
#服务器应以字节为单位返回获取请求的最小数据量,默认值为1,对应的kafka的参数为fetch.min.bytes。
fetch-min-size: 1
# 每个consumer 都会根据 heartbeat.interval.ms 参数指定的时间周期性地向group coordinator发送 hearbeat,
# group coordinator会给各个consumer响应,若发生了 rebalance,各个consumer收到的响应中会包含 REBALANCE_IN_PROGRESS 标识,
# 这样各个consumer就知道已经发生了rebalance,同时 group coordinator也知道了各个consumer的存活情况。
heartbeatInterval: 3000ms
# 设置
properties:
# 该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB
max.partition.fetch.bytes: 1048576
# 如果consumer两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费 ,触发rebalance 。
max.poll.interval.ms: 300000
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
# group coordinator检测consumer发生崩溃所需的时间。一个consumer group里面的某个consumer挂掉了,最长需要 session.timeout.ms 秒检测出来。
# 它指定了一个阈值10秒,在这个阈值内如果group coordinator未收到consumer的任何消息(指心跳),那coordinator就认为consumer挂了。
session.timeout.ms: 15000
# 消费请求超时时间
request.timeout.ms: 15000
listener:
# ack-mode:当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
# record 每处理一条commit一次
# batch(默认) 每次poll的时候批量提交一次,频率取决于每次poll的调用频率
# time 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
# count 累积达到ackCount次的ack去commit
# count_time ackTime或ackCount哪个条件先满足,就commit
# manual listener负责ack,但是背后也是批量上去
# manual_immediate listener负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
#在侦听器容器中运行的线程数, 跟每个topic的partition一致
concurrency: 10
#轮询消费者时使用的超时(以毫秒为单位)
poll-timeout: 2000
#topic不存在时是否报错
#missing-topics-fatal: true
client-id: esports-im-client-l
type: batch
bootstrap-servers: kafka:9092,kafka:9092,kafka:9092
评论区