侧边栏壁纸
博主头像
Terry

『LESSON 5』

  • 累计撰写 90 篇文章
  • 累计创建 21 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

SpringBoot Kafka的一些配置

Terry
2023-01-21 / 0 评论 / 0 点赞 / 100 阅读 / 1,460 字 / 正在检测是否收录...

Kafka配置

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。
      # 在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
      #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,
      # 在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
      #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,
      # 记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
      acks: all
      #如果该值大于零时,表示启用重试失败的发送次数
      retries: 3
      #每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
      #这有助于提升客户端和服务器上的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
      batch-size: 16384
      #生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为33554432
      buffer-memory: 33554432
      #生产者生成的所有数据的压缩类型,此配置接受标准压缩编解码器('gzip','snappy','lz4'),
      #它还接受'uncompressed'以及'producer',分别表示没有压缩以及保留生产者设置的原始压缩编解码器,
      #不管是哪种方式的压缩,都和消费者无关,消费到的还是发送前的原始数据。
      #默认值为producer
      compression-type: gzip
      client-id: esports-im-producer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      #用于标识此使用者所属的使用者组的唯一字符串。
      group-id: esports-im
      client-id: esports-im-client-c
      #如果为true,则消费者的偏移量将在后台定期提交,默认值为true
      enable-auto-commit: false
      #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
      auto-commit-interval: 1000
      #当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量
      #可选的值为latest, earliest, none
      auto-offset-reset: latest
      #一次调用poll()操作时返回的最大记录数,默认值为500
      max-poll-records: 100
      #如果没有足够的数据立即满足“fetch.min.bytes”给出的要求,服务器在回答获取请求之前将阻塞的最长时间(以毫秒为单位)
      #默认值为500
      fetch-max-wait: 500
      #服务器应以字节为单位返回获取请求的最小数据量,默认值为1,对应的kafka的参数为fetch.min.bytes。
      fetch-min-size: 1
      # 每个consumer 都会根据 heartbeat.interval.ms 参数指定的时间周期性地向group coordinator发送 hearbeat,
      # group coordinator会给各个consumer响应,若发生了 rebalance,各个consumer收到的响应中会包含 REBALANCE_IN_PROGRESS 标识,
      # 这样各个consumer就知道已经发生了rebalance,同时 group coordinator也知道了各个consumer的存活情况。
      heartbeatInterval: 3000ms
      # 设置
      properties:
        # 该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB
        max.partition.fetch.bytes: 1048576
        # 如果consumer两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费 ,触发rebalance 。
        max.poll.interval.ms: 300000
        # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
        # group coordinator检测consumer发生崩溃所需的时间。一个consumer group里面的某个consumer挂掉了,最长需要 session.timeout.ms 秒检测出来。
        # 它指定了一个阈值10秒,在这个阈值内如果group coordinator未收到consumer的任何消息(指心跳),那coordinator就认为consumer挂了。
        session.timeout.ms: 15000
        # 消费请求超时时间
        request.timeout.ms: 15000
    listener:
      # ack-mode:当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
      # record 每处理一条commit一次
      # batch(默认) 每次poll的时候批量提交一次,频率取决于每次poll的调用频率
      # time 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
      # count 累积达到ackCount次的ack去commit
      # count_time ackTime或ackCount哪个条件先满足,就commit
      # manual listener负责ack,但是背后也是批量上去
      # manual_immediate listener负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      #在侦听器容器中运行的线程数, 跟每个topic的partition一致
      concurrency: 10
      #轮询消费者时使用的超时(以毫秒为单位)
      poll-timeout: 2000
      #topic不存在时是否报错
      #missing-topics-fatal: true
      client-id: esports-im-client-l
      type: batch
    bootstrap-servers: kafka:9092,kafka:9092,kafka:9092
0

评论区