kafka4

Exactly-once semantics in Kafka: idempotent producer, transactional producer, and the confluent-kafka Python client

By widehyo
[root@peter-kafka01 ~]# vi producer.config
[root@peter-kafka01 ~]# cat !$
cat producer.config
enable.idempotence=true
max.in.flight.requests.per.connection=5
retries=5
[root@peter-kafka01 ~]# realpath !$
realpath producer.config
/root/producer.config
[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server peter-kafka01.foo.bar:9092 --topic peter-test04 --producer.config /root/producer.config 
org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
        at org.apache.kafka.clients.producer.ProducerConfig.maybeOverrideAcksAndRetries(ProducerConfig.java:459)
        at org.apache.kafka.clients.producer.ProducerConfig.postProcessParsedConfig(ProducerConfig.java:420)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:110)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
        at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:501)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:329)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
        at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)
        at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

[root@peter-kafka01 ~]# cat !$
cat producer.config
enable.idempotence=true
max.in.flight.requests.per.connection=5
retries=5
acks=all
[root@peter-kafka01 ~]# !?peter
/usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server peter-kafka01.foo.bar:9092 --topic peter-test04 --producer.config /root/producer.config 
>exactly once1
[2026-04-05 16:02:19,500] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {peter-test04=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2026-04-05 16:02:19,607] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {peter-test04=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>
[root@peter-kafka01 /]# ls /data/kafka-logs/
cleaner-offset-checkpoint  log-start-offset-checkpoint  meta.properties  recovery-point-offset-checkpoint  replication-offset-checkpoint

[root@peter-kafka02 /]# ls /data/kafka-logs/
cleaner-offset-checkpoint  log-start-offset-checkpoint  meta.properties  peter-test04-0  recovery-point-offset-checkpoint  replication-offset-checkpoint

[root@peter-kafka03 /]# ls /data/kafka-logs/
cleaner-offset-checkpoint  log-start-offset-checkpoint  meta.properties  recovery-point-offset-checkpoint  replication-offset-checkpoint


~/gitclone/kafka2/chapter2/ansible_playbook $ docker compose stop peter-kafka02
[+] Stopping 1/1
 ✔ Container peter-kafka02  Stopped                                                                                                                                                                                                        4.4s 
~/gitclone/kafka2/chapter2/ansible_playbook $ docker compose start peter-kafka02
[+] Running 1/1
 ✔ Container peter-kafka02  Started                                                                                                                                                                                                        0.2s 
~/gitclone/kafka2/chapter2/ansible_playbook $ !?exec?:p
docker compose exec peter-kafka02 bash
~/gitclone/kafka2/chapter2/ansible_playbook $ docker compose exec peter-kafka02 bash
[root@peter-kafka02 /]# cd /data/kafka-logs/peter-test04-0/
[root@peter-kafka02 peter-test04-0]# ls
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  00000000000000000001.snapshot  leader-epoch-checkpoint

[root@peter-kafka02 peter-test04-0]# /usr/local/kafka/bin/kafka-dump-log.sh --print-data-log --files /data/kafka-logs/peter-test04-0/00000000000000000001.snapshot 
Dumping /data/kafka-logs/peter-test04-0/00000000000000000001.snapshot
producerId: 0 producerEpoch: 0 coordinatorEpoch: -1 currentTxnFirstOffset: None firstSequence: 0 lastSequence: 0 lastOffset: 0 offsetDelta: 0 timestamp: 1775372539728
[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --create --partitions 1 --replication-factor 3 --topic peter-test05
Error while executing topic command : Replication factor: 3 larger than available brokers: 2.
[2026-04-05 16:15:36,665] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
 (kafka.admin.TopicCommand$)

[root@peter-kafka02 peter-test04-0]# systemctl status kafka-server
○ kafka-server.service - kafka-server
     Loaded: loaded (/etc/systemd/system/kafka-server.service; disabled; preset: disabled)
     Active: inactive (dead)

[root@peter-kafka02 peter-test04-0]# systemctl start kafka-server
[root@peter-kafka02 peter-test04-0]# 


[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --create --partitions 1 --replication-factor 3 --topic peter-test05
Created topic peter-test05.


[root@peter-kafka02 peter-test04-0]# java -version
openjdk version "1.8.0_482"
OpenJDK Runtime Environment (build 1.8.0_482-b08)
OpenJDK 64-Bit Server VM (build 25.482-b08, mixed mode)

~/gitclone/kafka2/chapter5 $ docker cp ExactlyOnceProducer.jar becc008ababe:/root/
Successfully copied 12.6MB to becc008ababe:/root/

[root@peter-kafka02 ~]# ls -l
total 12304
-rw-r--r-- 1 ec2-user ec2-user 12583074 Mar 30 22:22 ExactlyOnceProducer.jar
-rw------- 1 root     root         2506 Nov 20  2023 anaconda-ks.cfg
-rw-r--r-- 1 root     root          133 Nov 20  2023 anaconda-post.log
-rw------- 1 root     root         2067 Nov 20  2023 original-ks.cfg

[root@peter-kafka02 ~]# java -jar ExactlyOnceProducer.jar 
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
        acks = -1
        batch.size = 16384
        bootstrap.servers = [peter-kafka01.foo.bar:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-peter-transaction-01
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = true
        interceptor.classes = []
        internal.auto.downgrade.txn.commit = false
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 5
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = peter-transaction-01
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Instantiated a transactional producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1775373660140
[main] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Invoking InitProducerId for the first time in order to acquire a producer ID
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Cluster ID: aFOppqLIReSiX0uZr_tWJA
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Discovered transaction coordinator peter-kafka02.foo.bar:9092 (id: 2 rack: null)
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] ProducerId set to 3000 with epoch 0
Message sent successfully
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-peter-transaction-01 unregistered

[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --list
__transaction_state
peter-test04
peter-test05

[root@peter-kafka01 ~]# echo "exclude.internal.topics=false" > consumer.config
[root@peter-kafka01 ~]# cat consumer.config 
exclude.internal.topics=false

[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server peter-kafka01.foo.bar:9092 --topic __transaction_state --consumer.config /root/consumer.config --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" --from-beginning
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1775373661519)
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(peter-test05-0), txnStartTimestamp=1775373661693, txnLastUpdateTimestamp=1775373661693)
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=PrepareCommit, pendingState=None, topicPartitions=Set(peter-test05-0), txnStartTimestamp=1775373661693, txnLastUpdateTimestamp=1775373661781)
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=CompleteCommit, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1775373661693, txnLastUpdateTimestamp=1775373661789)


[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-dump-log.sh --print-data-log --files /data/kafka-logs/peter-test05-0/00000000000000000000.log 
Dumping /data/kafka-logs/peter-test05-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 3000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1775373661673 size: 120 magic: 2 compresscodec: NONE crc: 1337877055 isvalid: true
| offset: 0 CreateTime: 1775373661673 keysize: -1 valuesize: 52 sequence: 0 headerKeys: [] payload: Apache Kafka is a distributed streaming platform - 0
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 120 CreateTime: 1775373661811 size: 78 magic: 2 compresscodec: NONE crc: 1319622529 isvalid: true
| offset: 1 CreateTime: 1775373661811 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0

[root@peter-kafka01 ~]# python3 --version
Python 3.9.25



~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production $ docker cp pip_install_confluent-kafka.sh peter-kafka01:/root/
Successfully copied 2.05kB to peter-kafka01:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production $ docker cp pip_install_confluent-kafka.sh peter-kafka02:/root/
Successfully copied 2.05kB to peter-kafka02:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production $ docker cp pip_install_confluent-kafka.sh peter-kafka03:/root/
Successfully copied 2.05kB to peter-kafka03:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production $ cat pip_install_confluent-kafka.sh 
#!/bin/bash
python3 -m venv venv6
source venv6/bin/activate
pip install confluent-kafka


[root@peter-kafka01 ~]# bash pip_install_confluent-kafka.sh 
Collecting confluent-kafka
  Downloading confluent_kafka-2.14.0-cp39-cp39-manylinux_2_28_x86_64.whl (4.0 MB)
     |████████████████████████████████| 4.0 MB 8.8 MB/s 
Collecting typing-extensions
  Downloading typing_extensions-4.15.0-py3-none-any.whl (44 kB)
     |████████████████████████████████| 44 kB 2.6 MB/s 
Installing collected packages: typing-extensions, confluent-kafka
Successfully installed confluent-kafka-2.14.0 typing-extensions-4.15.0
WARNING: You are using pip version 21.2.3; however, version 26.0.1 is available.
You should consider upgrading via the '/root/venv6/bin/python3 -m pip install --upgrade pip' command.
[root@peter-kafka01 ~]# source venv6/bin/activate
(venv6) [root@peter-kafka01 ~]# 

(venv6) [root@peter-kafka01 ~]# pip list
Package           Version
----------------- -------
confluent-kafka   2.14.0
pip               21.2.3
setuptools        53.0.0
typing_extensions 4.15.0
WARNING: You are using pip version 21.2.3; however, version 26.0.1 is available.
You should consider upgrading via the '/root/venv6/bin/python3 -m pip install --upgrade pip' command.

[root@peter-kafka02 ~]# bash pip_install_confluent-kafka.sh 
[root@peter-kafka02 ~]# source venv6/bin/activate
(venv6) [root@peter-kafka02 ~]# 

[root@peter-kafka03 ~]# bash pip_install_confluent-kafka.sh
[root@peter-kafka03 ~]# source venv6/bin/activate
(venv6) [root@peter-kafka03 ~]# 

~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker cp consumer_standard.py peter-kafka01:/root/
Successfully copied 2.56kB to peter-kafka01:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker cp consumer_standard.py peter-kafka02:/root/
Successfully copied 2.56kB to peter-kafka02:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker cp consumer_standard.py peter-kafka03:/root/
Successfully copied 2.56kB to peter-kafka03:/root/

(venv6) [root@peter-kafka01 ~]# python consumer_standard.py 
Consumer error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: peter-test06: Broker: Unknown topic or partition"}

(venv6) [root@peter-kafka02 ~]# python consumer_standard.py 
Consumer error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: peter-test06: Broker: Unknown topic or partition"}

(venv6) [root@peter-kafka03 ~]# python consumer_standard.py 
Consumer error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: peter-test06: Broker: Unknown topic or partition"}

~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker cp producer.py peter-kafka01:/root/
Successfully copied 3.07kB to peter-kafka01:/root/

~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker exec -it peter-kafka01 bash
[root@peter-kafka01 /]# 

[root@peter-kafka01 ~]# source venv6/bin/activate
(venv6) [root@peter-kafka01 ~]# python producer.py 
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]


(venv6) [root@peter-kafka03 ~]# python consumer_standard.py 
Consumer error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: peter-test06: Broker: Unknown topic or partition"}
Topic: peter-test06, Partition: 0, Offset: 0, Received message: Apache Kafka is a distributed streaming platform - 1
Topic: peter-test06, Partition: 0, Offset: 1, Received message: Apache Kafka is a distributed streaming platform - 2
Topic: peter-test06, Partition: 0, Offset: 2, Received message: Apache Kafka is a distributed streaming platform - 3
Topic: peter-test06, Partition: 0, Offset: 3, Received message: Apache Kafka is a distributed streaming platform - 4
Topic: peter-test06, Partition: 0, Offset: 4, Received message: Apache Kafka is a distributed streaming platform - 5
Topic: peter-test06, Partition: 0, Offset: 5, Received message: Apache Kafka is a distributed streaming platform - 6
Topic: peter-test06, Partition: 0, Offset: 6, Received message: Apache Kafka is a distributed streaming platform - 7
Topic: peter-test06, Partition: 0, Offset: 7, Received message: Apache Kafka is a distributed streaming platform - 8
Topic: peter-test06, Partition: 0, Offset: 8, Received message: Apache Kafka is a distributed streaming platform - 9
Topic: peter-test06, Partition: 0, Offset: 9, Received message: Apache Kafka is a distributed streaming platform - 10


[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server peter-kafka01.foo.bar:9092 --group peter-consumer01 --describe

GROUP            TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
peter-consumer01 peter-test06    0          10              10              0               rdkafka-2f18f8ce-7b12-4cc0-8979-70befababfc6 /172.18.0.6     rdkafka


^CTraceback (most recent call last):
  File "/root/consumer_standard.py", line 15, in <module>
    msg = c.poll(1.0)
KeyboardInterrupt
(venv6) [root@peter-kafka03 ~]# hostname -i
172.18.0.6


(venv6) [root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server peter-kafka01.foo.bar:9092 --group peter-consumer01 --describe

GROUP            TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
peter-consumer01 peter-test06    0          10              10              0               rdkafka-2f18f8ce-7b12-4cc0-8979-70befababfc6 /172.18.0.6     rdkafka
(venv6) [root@peter-kafka01 ~]# hostname -i
172.18.0.2
(venv6) [root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server peter-kafka01.foo.bar:9092 --group peter-consumer01 --describe

GROUP            TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
peter-consumer01 peter-test06    0          10              10              0               rdkafka-3b13021c-5930-4775-a45e-95bb6ce788f7 /172.18.0.3     rdkafka
(venv6) [root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server peter-kafka01.foo.bar:9092 --group peter-consumer01 --describe

~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker cp ExactlyOnceConsumer.jar peter-kafka01:/root/
Successfully copied 12.6MB to peter-kafka01:/root/

[root@peter-kafka01 ~]# ls -l 
total 12324
-rw-r--r-- 1 ec2-user ec2-user 12581452 Mar 30 22:22 ExactlyOnceConsumer.jar
-rw------- 1 root     root         2506 Nov 20  2023 anaconda-ks.cfg
-rw-r--r-- 1 root     root          133 Nov 20  2023 anaconda-post.log
-rw-r--r-- 1 root     root           30 Apr  5 16:23 consumer.config
-rw-r--r-- 1 ec2-user ec2-user      786 Mar 30 22:22 consumer_standard.py
-rw------- 1 root     root         2067 Nov 20  2023 original-ks.cfg
-rw-r--r-- 1 ec2-user ec2-user       88 Apr  5 16:32 pip_install_confluent-kafka.sh
-rw-r--r-- 1 root     root           83 Apr  5 16:01 producer.config
-rw-r--r-- 1 ec2-user ec2-user     1278 Mar 30 22:22 producer.py
drwxr-xr-x 5 root     root         4096 Apr  5 16:33 venv6

[root@peter-kafka01 ~]# java -jar ExactlyOnceConsumer.jar 
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [peter-kafka01.foo.bar:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-peter-consumer-01-1
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = peter-consumer-01
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_committed
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1775375550930
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Subscribed to topic(s): peter-test05
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Cluster ID: aFOppqLIReSiX0uZr_tWJA
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Discovered group coordinator peter-kafka01.foo.bar:9092 (id: 2147483646 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Successfully joined group with generation Generation{generationId=1, memberId='consumer-peter-consumer-01-1-c6535e02-ae12-4324-9c74-6a2d69d02432', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Finished assignment for group at generation 1: {consumer-peter-consumer-01-1-c6535e02-ae12-4324-9c74-6a2d69d02432=Assignment(partitions=[peter-test05-0])}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Successfully synced group in generation Generation{generationId=1, memberId='consumer-peter-consumer-01-1-c6535e02-ae12-4324-9c74-6a2d69d02432', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Notifying assignor about the new Assignment(partitions=[peter-test05-0])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Adding newly assigned partitions: peter-test05-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Found no committed offset for partition peter-test05-0
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Resetting offset for partition peter-test05-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[peter-kafka02.foo.bar:9092 (id: 2 rack: null)], epoch=0}}.
Topic: peter-test05, Partition: 0, Offset: 0, Key: null, Value: Apache Kafka is a distributed streaming platform - 0


[root@peter-kafka02 ~]# java -jar ExactlyOnceProducer.jar 
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
        acks = -1
        batch.size = 16384
        bootstrap.servers = [peter-kafka01.foo.bar:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-peter-transaction-01
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = true
        interceptor.classes = []
        internal.auto.downgrade.txn.commit = false
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 5
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = peter-transaction-01
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Instantiated a transactional producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1775375727516
[main] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Invoking InitProducerId for the first time in order to acquire a producer I
D
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Cluster ID: aFOppqLIReSiX0uZr_tWJA
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Discovered transac
tion coordinator peter-kafka02.foo.bar:9092 (id: 2 rack: null)
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] ProducerId set to 
3000 with epoch 1
Message sent successfully
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-peter-transaction-01 unregistered



[root@peter-kafka01 ~]# java -jar ExactlyOnceConsumer.jar 
...
Topic: peter-test05, Partition: 0, Offset: 2, Key: null, Value: Apache Kafka is a distributed streaming platform - 0


[root@peter-kafka02 ~]# /usr/local/kafka/bin/kafka-dump-log.sh --print-data-log --files /data/kafka-logs/peter-test05-0/00000000000000000000.log 
Dumping /data/kafka-logs/peter-test05-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 3000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1775373661673 size: 120 magic: 2 compresscodec: NONE crc: 1337877055 isvalid: true
| offset: 0 CreateTime: 1775373661673 keysize: -1 valuesize: 52 sequence: 0 headerKeys: [] payload: Apache Kafka is a distributed streaming platform - 0
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 120 CreateTime: 1775373661811 size: 78 magic: 2 compresscodec: NONE crc: 1319622529 isvalid: true
| offset: 1 CreateTime: 1775373661811 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: 0 lastSequence: 0 producerId: 3000 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 198 CreateTime: 1775375727897 size: 120 magic: 2 compresscodec: NONE crc: 1898446716 isvalid: true
| offset: 2 CreateTime: 1775375727897 keysize: -1 valuesize: 52 sequence: 0 headerKeys: [] payload: Apache Kafka is a distributed streaming platform - 0
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3000 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 318 CreateTime: 1775375727942 size: 78 magic: 2 compresscodec: NONE crc: 4201990516 isvalid: true
| offset: 3 CreateTime: 1775375727942 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
[root@peter-kafka01 /]# awk 'length && !/^#/' /usr/local/kafka/config/log4j.properties 
log4j.rootLogger=INFO, stdout, kafkaAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.org.apache.zookeeper=INFO
log4j.logger.kafka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false
log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false
log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false
log4j.logger.state.change.logger=INFO, stateChangeAppender
log4j.additivity.state.change.logger=false
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false


[root@peter-kafka01 /]# vi !$
vi /usr/local/kafka/config/log4j.properties
[root@peter-kafka01 /]# !awk
awk 'length && !/^#/' /usr/local/kafka/config/log4j.properties 
log4j.logger.org.apache.zookeeper=INFO
log4j.logger.kafka=DEBUG
log4j.logger.org.apache.kafka=DEBUG
log4j.logger.kafka.request.logger=WARN, requestAppender


[root@peter-kafka01 /]# wc /usr/local/kafka/logs/server.log
  2676  41383 500384 /usr/local/kafka/logs/server.log
[root@peter-kafka01 /]# tail -n 40 !$
tail -n 40 /usr/local/kafka/logs/server.log
[2026-04-05 17:13:51,848] DEBUG [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=broker-1-fetcher-0, correlationId=35): org.apache.kafka.common.requests.FetchResponse@799918b8 (org.apache.kafka.clients.NetworkClient)
[2026-04-05 17:13:51,849] DEBUG [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Node 0 sent an incremental fetch response with throttleTimeMs = 2 for session 570064363 with 0 response partition(s), 52 implied partition(s) (org.apache.kafka.clients.FetchSessionHandler)
[root@peter-kafka01 /]# wc /usr/local/kafka/logs/server.log
  2676  41383 500384 /usr/local/kafka/logs/server.log
[root@peter-kafka01 /]# tail -n 40 !$
tail -n 40 /usr/local/kafka/logs/server.log
/usr/local/kafka/logs/server.log
[root@peter-kafka01 /]# cd !$:h
cd /usr/local/kafka/logs
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/ansible_playbook $ git diff docker-compose.yml
diff --git a/chapter2/ansible_playbook/docker-compose.yml b/chapter2/ansible_playbook/docker-compose.yml
index 75d07e5..3864ddf 100644
--- a/chapter2/ansible_playbook/docker-compose.yml
+++ b/chapter2/ansible_playbook/docker-compose.yml
@@ -66,6 +66,24 @@ services:
         aliases:
           - peter-kafka03.foo.bar
 
+  prometheus:
+    image: prom/prometheus
+    container_name: prometheus
+    ports:
+      - "9090:9090"
+    volumes:
+      - ./prometheus.yml:/etc/prometheus/prometheus.yml
+    networks:
+      kafka-net:
+
+  grafana:
+    image: grafana/grafana:7.3.7
+    container_name: grafana
+    ports:
+      - "3000:3000"
+    networks:
+      kafka-net:
+
 networks:
   kafka-net:
     driver: bridge
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/ansible_playbook $ cat prometheus.yml 
# prometheus config
global:
  scrape_interval:     5s
  evaluation_interval: 5s

scrape_configs:
  - job_name: 'peter-jmx-kafka'
    static_configs:
      - targets:
        - peter-kafka01.foo.bar:7071
        - peter-kafka02.foo.bar:7071
        - peter-kafka03.foo.bar:7071

  - job_name: 'peter-kafka-nodes'
    static_configs:
      - targets:
          - peter-kafka01.foo.bar:9100
          - peter-kafka02.foo.bar:9100
          - peter-kafka03.foo.bar:9100

  - job_name: 'peter-kafka-exporter'
    static_configs:
      - targets:
          - peter-kafka01.foo.bar:9308
          - peter-kafka02.foo.bar:9308
          - peter-kafka03.foo.bar:9308

[root@peter-kafka01 jmx]# dnf install net-tools
[root@peter-kafka01 logs]# netstat -tnlp | grep 9999
tcp6       0      0 :::9999                 :::*                    LISTEN      6259/java           

[root@peter-kafka01 logs]# mkdir -p /usr/local/jmx

~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter7 $ docker cp jmx_prometheus_httpserver-0.13.1-SNAPSHOT-jar-with-dependencies.jar peter-kafka01:/usr/local/jmx/
Successfully copied 362kB to peter-kafka01:/usr/local/jmx/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter7 $ docker cp jmx_prometheus_httpserver.yml peter-kafka01:/usr/local/jmx/
Successfully copied 2.05kB to peter-kafka01:/usr/local/jmx/


[root@peter-kafka01 system]# vi jmx-exporter.service
[root@peter-kafka01 system]# cat !$
cat jmx-exporter.service
[Unit]
Description=JMX Exporter for Kafka
After=kafka-server.target

[Service]
Type=simple
Restart=always
ExecStart=/usr/bin/java -jar /usr/local/jmx/jmx_prometheus_httpserver-0.13.1-SNAPSHOT-jar-with-dependencies.jar 7071 /usr/local/jmx/jmx_prometheus_httpserver.yml

[Install]
WantedBy=multi-user.target
[root@peter-kafka01 system]# systemctl daemon-reload
[root@peter-kafka01 system]# systemctl status jmx-exporter
○ jmx-exporter.service - JMX Exporter for Kafka
     Loaded: loaded (/etc/systemd/system/jmx-exporter.service; disabled; preset: disabled)
     Active: inactive (dead)
[root@peter-kafka01 system]# systemctl start !$
systemctl start jmx-exporter
[root@peter-kafka01 system]# !?status
systemctl status jmx-exporter
● jmx-exporter.service - JMX Exporter for Kafka
     Loaded: loaded (/etc/systemd/system/jmx-exporter.service; disabled; preset: disabled)
     Active: active (running) since Sun 2026-04-05 17:52:29 KST; 5s ago
   Main PID: 6813 (java)
      Tasks: 22 (limit: 7493)
     Memory: 20.9M (peak: 21.8M)
        CPU: 238ms
     CGroup: /system.slice/jmx-exporter.service
             └─6813 /usr/bin/java -jar /usr/local/jmx/jmx_prometheus_httpserver-0.13.1-SNAPSHOT-jar-with-dependencies.jar 7071 /usr/local/jmx/jmx_prometheus_httpserver.yml

Apr 05 17:52:29 peter-kafka01.foo.bar systemd[1]: Started JMX Exporter for Kafka.


[root@peter-kafka01 system]# curl http://localhost:7071/metrics
# HELP jmx_config_reload_success_total Number of times configuration have successfully been reloaded.
# TYPE jmx_config_reload_success_total counter
jmx_config_reload_success_total 0.0
# HELP jmx_config_reload_failure_total Number of times configuration have failed to be reloaded.
# TYPE jmx_config_reload_failure_total counter
jmx_config_reload_failure_total 0.0
# HELP kafka_server_socket_server_metrics_connection_creation_total The total number of new connections established (kafka.server<type=socket-server-metrics, listener=PLAINTEXT, networkProcessor=1><>connection-creation-total)
...

[root@peter-kafka01 system]# curl http://localhost:7071/metrics | wc
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  702k  100  702k    0     0   560k      0  0:00:01  0:00:01 --:--:--  560k
   7477   18908  719205

~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter7 $ docker cp jmx_prometheus_httpserver.yml peter-kafka02:/root/
Successfully copied 2.05kB to peter-kafka02:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter7 $ docker cp jmx_prometheus_httpserver.yml peter-kafka03:/root/
Successfully copied 2.05kB to peter-kafka03:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter7 $ docker cp jmx_prometheus_httpserver-0.13.1-SNAPSHOT-jar-with-dependencies.jar peter-kafka02:/root/
Successfully copied 362kB to peter-kafka02:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter7 $ docker cp jmx_prometheus_httpserver-0.13.1-SNAPSHOT-jar-with-dependencies.jar peter-kafka03:/root/
Successfully copied 362kB to peter-kafka03:/root/

~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/ansible_playbook $ cat register-jmx-exporter.sh 
#!/bin/bash
# Install and start the Prometheus JMX exporter as a systemd service.
# Expects the exporter jar and its config to have been copied to /root/
# beforehand (e.g. via `docker cp`). Metrics are served on port 7071.
# Must run as root.
mkdir -p /usr/local/jmx
cp /root/jmx_prometheus_httpserver-0.13.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/jmx
cp /root/jmx_prometheus_httpserver.yml /usr/local/jmx
# BUG FIX: the original heredoc began with a stray `cat jmx-exporter.service`
# line (pasted from console output), which ended up as line 1 of the unit
# file and made systemd log "Assignment outside of section. Ignoring."
# A unit file must start with a section header such as [Unit].
# The delimiter is quoted: the unit body is literal, nothing should expand.
cat > /etc/systemd/system/jmx-exporter.service <<'EOF'
[Unit]
Description=JMX Exporter for Kafka
# NOTE(review): kafka-server is normally a .service, not a .target —
# confirm kafka-server.target exists, otherwise use kafka-server.service.
After=kafka-server.target

[Service]
Type=simple
Restart=always
ExecStart=/usr/bin/java -jar /usr/local/jmx/jmx_prometheus_httpserver-0.13.1-SNAPSHOT-jar-with-dependencies.jar 7071 /usr/local/jmx/jmx_prometheus_httpserver.yml

[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl start jmx-exporter
systemctl status jmx-exporter
# Quick sanity check that the exporter answers; wc gives a rough metric count.
curl http://localhost:7071/metrics | wc
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/ansible_playbook $ docker cp register-jmx-exporter.sh peter-kafka02:/root/
Successfully copied 2.56kB to peter-kafka02:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/ansible_playbook $ docker cp register-jmx-exporter.sh peter-kafka03:/root/
Successfully copied 2.56kB to peter-kafka03:/root/


[root@peter-kafka02 ~]# bash register-jmx-exporter.sh 
● jmx-exporter.service - JMX Exporter for Kafka
     Loaded: loaded (/etc/systemd/system/jmx-exporter.service; disabled; preset: disabled)
     Active: active (running) since Sun 2026-04-05 18:47:56 KST; 15ms ago
   Main PID: 1477 (java)
      Tasks: 10 (limit: 7493)
     Memory: 4.7M (peak: 5.0M)
        CPU: 12ms
     CGroup: /system.slice/jmx-exporter.service
             └─1477 /usr/bin/java -jar /usr/local/jmx/jmx_prometheus_httpserver-0.13.1-SNAPSHOT-jar-with-dependencies.jar 7071 /usr/local/jmx/jmx_prometheus_httpserver.yml

Apr 05 18:47:56 peter-kafka02.foo.bar systemd[1]: Started JMX Exporter for Kafka.
Apr 05 18:47:56 peter-kafka02.foo.bar systemd[1]: /etc/systemd/system/jmx-exporter.service:1: Assignment outside of section. Ignoring.
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  723k  100  723k    0     0   315k      0  0:00:02  0:00:02 --:--:--  315k
   7637   19233  740704


[root@peter-kafka03 ~]# bash register-jmx-exporter.sh 
● jmx-exporter.service - JMX Exporter for Kafka
     Loaded: loaded (/etc/systemd/system/jmx-exporter.service; disabled; preset: disabled)
     Active: active (running) since Sun 2026-04-05 18:47:45 KST; 12ms ago
   Main PID: 1889 (java)
      Tasks: 2 (limit: 7493)
     Memory: 1.6M (peak: 1.7M)
        CPU: 7ms
     CGroup: /system.slice/jmx-exporter.service
             └─1889 /usr/bin/java -jar /usr/local/jmx/jmx_prometheus_httpserver-0.13.1-SNAPSHOT-jar-with-dependencies.jar 7071 /usr/local/jmx/jmx_prometheus_httpserver.yml

Apr 05 18:47:45 peter-kafka03.foo.bar systemd[1]: Started JMX Exporter for Kafka.
Apr 05 18:47:45 peter-kafka03.foo.bar systemd[1]: /etc/systemd/system/jmx-exporter.service:1: Assignment outside of section. Ignoring.
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  758k  100  758k    0     0   308k      0  0:00:02  0:00:02 --:--:--  308k
   7937   20540  776243

~/gitclone/kafka2/chapter2/ansible_playbook $ docker cp register-node-exporter.sh peter-kafka02:/root/
Successfully copied 2.56kB to peter-kafka02:/root/
~/gitclone/kafka2/chapter2/ansible_playbook $ docker cp register-node-exporter.sh peter-kafka03:/root/
Successfully copied 2.56kB to peter-kafka03:/root/
~/gitclone/kafka2/chapter2/ansible_playbook $ cat register-node-exporter.sh 
#!/bin/bash
# Install and start the Prometheus node exporter as a systemd service
# listening on the default port 9100. Must run as root.
# Fail fast: a failed download or extraction must not fall through to
# installing a broken symlink / starting a non-existent binary.
set -euo pipefail

readonly VERSION=1.0.1
# NOTE(review): this is the 32-bit (386) build; it runs on x86_64 hosts,
# but consider the linux-amd64 tarball for 64-bit machines.
readonly DIST="node_exporter-${VERSION}.linux-386"

dnf install -y wget
wget "https://github.com/prometheus/node_exporter/releases/download/v${VERSION}/${DIST}.tar.gz"
tar zxf "${DIST}.tar.gz" -C /usr/local/
# -sfn: replace any stale symlink so the script is safe to re-run.
ln -sfn "/usr/local/${DIST}" /usr/local/node_exporter

# Quoted delimiter: the unit body is literal, nothing should expand.
cat > /etc/systemd/system/node-exporter.service <<'EOF'
[Unit]
Description=Node Exporter
After=network-online.target

[Service]
Type=simple
ExecStart=/usr/local/node_exporter/node_exporter

[Install]
WantedBy=multi-user.target
EOF

# The script already runs as root; the original had a stray `sudo` on
# daemon-reload only, inconsistent with the other systemctl calls.
systemctl daemon-reload
systemctl start node-exporter
systemctl status node-exporter


[root@peter-kafka02 ~]# bash register-node-exporter.sh 
● node-exporter.service - Node Exporter
     Loaded: loaded (/etc/systemd/system/node-exporter.service; disabled; preset: disabled)
     Active: active (running) since Sun 2026-04-05 19:00:43 KST; 13ms ago
   Main PID: 1585 (node_exporter)
      Tasks: 6 (limit: 7493)
     Memory: 5.0M (peak: 5.3M)
        CPU: 13ms
     CGroup: /system.slice/node-exporter.service
             └─1585 /usr/local/node_exporter/node_exporter

Apr 05 19:00:43 peter-kafka02.foo.bar node_exporter[1585]: level=info ts=2026-04-05T10:00:43.806Z caller=node_exporter.go:112 collector=thermal_zone

[root@peter-kafka03 ~]# bash register-node-exporter.sh 
● node-exporter.service - Node Exporter
     Loaded: loaded (/etc/systemd/system/node-exporter.service; disabled; preset: disabled)
     Active: active (running) since Sun 2026-04-05 19:00:57 KST; 11ms ago
   Main PID: 1971 (node_exporter)
      Tasks: 5 (limit: 7493)
     Memory: 4.8M (peak: 4.8M)
        CPU: 11ms
     CGroup: /system.slice/node-exporter.service
             └─1971 /usr/local/node_exporter/node_exporter

Apr 05 19:00:57 peter-kafka03.foo.bar node_exporter[1971]: level=info ts=2026-04-05T10:00:57.780Z caller=node_exporter.go:112 collector=thermal_zone
~/gitclone/kafka2/chapter2/ansible_playbook $ cat docker-compose-busybox.yml 
services:
  busybox:
    image: alpine
    container_name: busybox
    networks:
      kafka-net:
    command: sleep infinity

networks:
  kafka-net:
    external: true
    name: ansible_playbook_kafka-net
~/gitclone/kafka2/chapter2/ansible_playbook $ docker compose -f docker-compose-busybox.yml up -d
...
 ✔ Container busybox  Started              0.4s 
/gitclone/kafka2/chapter2/ansible_playbook $ docker exec -it busybox sh
/ # apk add curl
( 1/10) Installing brotli-libs (1.2.0-r0)
( 2/10) Installing c-ares (1.34.6-r0)
( 3/10) Installing libunistring (1.4.1-r0)
( 4/10) Installing libidn2 (2.3.8-r0)
( 5/10) Installing nghttp2-libs (1.68.0-r0)
( 6/10) Installing nghttp3 (1.13.1-r0)
( 7/10) Installing libpsl (0.21.5-r3)
( 8/10) Installing zstd-libs (1.5.7-r2)
( 9/10) Installing libcurl (8.17.0-r1)
(10/10) Installing curl (8.17.0-r1)
Executing busybox-1.37.0-r30.trigger
OK: 13.2 MiB in 26 packages
/ # curl -s http://peter-kafka01:7071 | wc
     7477     18908    719374
/ # curl -s http://peter-kafka02:7071 | wc
     7637     19233    740771
/ # curl -s http://peter-kafka03:7071 | wc
     7937     20540    776333
/ # curl -s http://peter-kafka01:9100 | wc
        6        10       150
/ # curl -s http://peter-kafka02:9100 | wc
        6        10       150
/ # curl -s http://peter-kafka03:9100 | wc
        6        10       150
walk(if type == "object" and has("datasource") then .datasource = "Prometheus" else . end)
walk(if type == "object" and .type? == "timeseries" then .type = "graph" else . end)
Tags: tag