[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --create --partitions 1 --replication-factor 3 --topic peter-test05
Error while executing topic command : Replication factor: 3 larger than available brokers: 2.
[2026-04-05 16:15:36,665] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
(kafka.admin.TopicCommand$)
[root@peter-kafka02 peter-test04-0]# systemctl status kafka-server
○ kafka-server.service - kafka-server
Loaded: loaded (/etc/systemd/system/kafka-server.service; disabled; preset: disabled)
Active: inactive (dead)
[root@peter-kafka02 peter-test04-0]# systemctl start kafka-server
[root@peter-kafka02 peter-test04-0]#
[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --create --partitions 1 --replication-factor 3 --topic peter-test05
Created topic peter-test05.
[root@peter-kafka02 peter-test04-0]# java -version
openjdk version "1.8.0_482"
OpenJDK Runtime Environment (build 1.8.0_482-b08)
OpenJDK 64-Bit Server VM (build 25.482-b08, mixed mode)
~/gitclone/kafka2/chapter5 $ docker cp ExactlyOnceProducer.jar becc008ababe:/root/
Successfully copied 12.6MB to becc008ababe:/root/
[root@peter-kafka02 ~]# ls -l
total 12304
-rw-r--r-- 1 ec2-user ec2-user 12583074 Mar 30 22:22 ExactlyOnceProducer.jar
-rw------- 1 root root 2506 Nov 20 2023 anaconda-ks.cfg
-rw-r--r-- 1 root root 133 Nov 20 2023 anaconda-post.log
-rw------- 1 root root 2067 Nov 20 2023 original-ks.cfg
[root@peter-kafka02 ~]# java -jar ExactlyOnceProducer.jar
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [peter-kafka01.foo.bar:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-peter-transaction-01
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = peter-transaction-01
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Instantiated a transactional producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1775373660140
[main] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Invoking InitProducerId for the first time in order to acquire a producer ID
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Cluster ID: aFOppqLIReSiX0uZr_tWJA
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Discovered transaction coordinator peter-kafka02.foo.bar:9092 (id: 2 rack: null)
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] ProducerId set to 3000 with epoch 0
Message sent successfully
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-peter-transaction-01 unregistered
[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server peter-kafka01.foo.bar:9092 --list
__transaction_state
peter-test04
peter-test05
[root@peter-kafka01 ~]# echo "exclude.internal.topics=false" > consumer.config
[root@peter-kafka01 ~]# cat consumer.config
exclude.internal.topics=false
[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server peter-kafka01.foo.bar:9092 --topic __transaction_state --consumer.config /root/consumer.config --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" --from-beginning
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=Empty, pendingState=None, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1775373661519)
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(peter-test05-0), txnStartTimestamp=1775373661693, txnLastUpdateTimestamp=1775373661693)
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=PrepareCommit, pendingState=None, topicPartitions=Set(peter-test05-0), txnStartTimestamp=1775373661693, txnLastUpdateTimestamp=1775373661781)
peter-transaction-01::TransactionMetadata(transactionalId=peter-transaction-01, producerId=3000, producerEpoch=0, txnTimeoutMs=60000, state=CompleteCommit, pendingState=None, topicPartitions=Set(), txnStartTimestamp=1775373661693, txnLastUpdateTimestamp=1775373661789)
[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-dump-log.sh --print-data-log --files /data/kafka-logs/peter-test05-0/00000000000000000000.log
Dumping /data/kafka-logs/peter-test05-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 3000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1775373661673 size: 120 magic: 2 compresscodec: NONE crc: 1337877055 isvalid: true
| offset: 0 CreateTime: 1775373661673 keysize: -1 valuesize: 52 sequence: 0 headerKeys: [] payload: Apache Kafka is a distributed streaming platform - 0
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 120 CreateTime: 1775373661811 size: 78 magic: 2 compresscodec: NONE crc: 1319622529 isvalid: true
| offset: 1 CreateTime: 1775373661811 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
[root@peter-kafka01 ~]# python3 --version
Python 3.9.25
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production $ docker cp pip_install_confluent-kafka.sh peter-kafka01:/root/
Successfully copied 2.05kB to peter-kafka01:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production $ docker cp pip_install_confluent-kafka.sh peter-kafka02:/root/
Successfully copied 2.05kB to peter-kafka02:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production $ docker cp pip_install_confluent-kafka.sh peter-kafka03:/root/
Successfully copied 2.05kB to peter-kafka03:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production $ cat pip_install_confluent-kafka.sh
#!/bin/bash
python3 -m venv venv6
source venv6/bin/activate
pip install confluent-kafka
[root@peter-kafka01 ~]# bash pip_install_confluent-kafka.sh
Collecting confluent-kafka
Downloading confluent_kafka-2.14.0-cp39-cp39-manylinux_2_28_x86_64.whl (4.0 MB)
|████████████████████████████████| 4.0 MB 8.8 MB/s
Collecting typing-extensions
Downloading typing_extensions-4.15.0-py3-none-any.whl (44 kB)
|████████████████████████████████| 44 kB 2.6 MB/s
Installing collected packages: typing-extensions, confluent-kafka
Successfully installed confluent-kafka-2.14.0 typing-extensions-4.15.0
WARNING: You are using pip version 21.2.3; however, version 26.0.1 is available.
You should consider upgrading via the '/root/venv6/bin/python3 -m pip install --upgrade pip' command.
[root@peter-kafka01 ~]# source venv6/bin/activate
(venv6) [root@peter-kafka01 ~]#
(venv6) [root@peter-kafka01 ~]# pip list
Package Version
----------------- -------
confluent-kafka 2.14.0
pip 21.2.3
setuptools 53.0.0
typing_extensions 4.15.0
WARNING: You are using pip version 21.2.3; however, version 26.0.1 is available.
You should consider upgrading via the '/root/venv6/bin/python3 -m pip install --upgrade pip' command.
[root@peter-kafka02 ~]# bash pip_install_confluent-kafka.sh
[root@peter-kafka02 ~]# source venv6/bin/activate
(venv6) [root@peter-kafka02 ~]#
[root@peter-kafka03 ~]# bash pip_install_confluent-kafka.sh
[root@peter-kafka03 ~]# source venv6/bin/activate
(venv6) [root@peter-kafka03 ~]#
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker cp consumer_standard.py peter-kafka01:/root/
Successfully copied 2.56kB to peter-kafka01:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker cp consumer_standard.py peter-kafka02:/root/
Successfully copied 2.56kB to peter-kafka02:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker cp consumer_standard.py peter-kafka03:/root/
Successfully copied 2.56kB to peter-kafka03:/root/
(venv6) [root@peter-kafka01 ~]# python consumer_standard.py
Consumer error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: peter-test06: Broker: Unknown topic or partition"}
(venv6) [root@peter-kafka02 ~]# python consumer_standard.py
Consumer error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: peter-test06: Broker: Unknown topic or partition"}
(venv6) [root@peter-kafka03 ~]# python consumer_standard.py
Consumer error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: peter-test06: Broker: Unknown topic or partition"}
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker cp producer.py peter-kafka01:/root/
Successfully copied 3.07kB to peter-kafka01:/root/
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker exec -it peter-kafka01 bash
[root@peter-kafka01 /]#
[root@peter-kafka01 ~]# source venv6/bin/activate
(venv6) [root@peter-kafka01 ~]# python producer.py
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
Message delivered to peter-test06 [0]
(venv6) [root@peter-kafka03 ~]# python consumer_standard.py
Consumer error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: peter-test06: Broker: Unknown topic or partition"}
Topic: peter-test06, Partition: 0, Offset: 0, Received message: Apache Kafka is a distributed streaming platform - 1
Topic: peter-test06, Partition: 0, Offset: 1, Received message: Apache Kafka is a distributed streaming platform - 2
Topic: peter-test06, Partition: 0, Offset: 2, Received message: Apache Kafka is a distributed streaming platform - 3
Topic: peter-test06, Partition: 0, Offset: 3, Received message: Apache Kafka is a distributed streaming platform - 4
Topic: peter-test06, Partition: 0, Offset: 4, Received message: Apache Kafka is a distributed streaming platform - 5
Topic: peter-test06, Partition: 0, Offset: 5, Received message: Apache Kafka is a distributed streaming platform - 6
Topic: peter-test06, Partition: 0, Offset: 6, Received message: Apache Kafka is a distributed streaming platform - 7
Topic: peter-test06, Partition: 0, Offset: 7, Received message: Apache Kafka is a distributed streaming platform - 8
Topic: peter-test06, Partition: 0, Offset: 8, Received message: Apache Kafka is a distributed streaming platform - 9
Topic: peter-test06, Partition: 0, Offset: 9, Received message: Apache Kafka is a distributed streaming platform - 10
[root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server peter-kafka01.foo.bar:9092 --group peter-consumer01 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
peter-consumer01 peter-test06 0 10 10 0 rdkafka-2f18f8ce-7b12-4cc0-8979-70befababfc6 /172.18.0.6 rdkafka
^CTraceback (most recent call last):
File "/root/consumer_standard.py", line 15, in <module>
msg = c.poll(1.0)
KeyboardInterrupt
(venv6) [root@peter-kafka03 ~]# hostname -i
172.18.0.6
(venv6) [root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server peter-kafka01.foo.bar:9092 --group peter-consumer01 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
peter-consumer01 peter-test06 0 10 10 0 rdkafka-2f18f8ce-7b12-4cc0-8979-70befababfc6 /172.18.0.6 rdkafka
(venv6) [root@peter-kafka01 ~]# hostname -i
172.18.0.2
(venv6) [root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server peter-kafka01.foo.bar:9092 --group peter-consumer01 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
peter-consumer01 peter-test06 0 10 10 0 rdkafka-3b13021c-5930-4775-a45e-95bb6ce788f7 /172.18.0.3 rdkafka
(venv6) [root@peter-kafka01 ~]# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server peter-kafka01.foo.bar:9092 --group peter-consumer01 --describe
~/gitclone/playground/reading/58_Practical_Kafka_From_Development_to_Production/kafka2/chapter6 $ docker cp ExactlyOnceConsumer.jar peter-kafka01:/root/
Successfully copied 12.6MB to peter-kafka01:/root/
[root@peter-kafka01 ~]# ls -l
total 12324
-rw-r--r-- 1 ec2-user ec2-user 12581452 Mar 30 22:22 ExactlyOnceConsumer.jar
-rw------- 1 root root 2506 Nov 20 2023 anaconda-ks.cfg
-rw-r--r-- 1 root root 133 Nov 20 2023 anaconda-post.log
-rw-r--r-- 1 root root 30 Apr 5 16:23 consumer.config
-rw-r--r-- 1 ec2-user ec2-user 786 Mar 30 22:22 consumer_standard.py
-rw------- 1 root root 2067 Nov 20 2023 original-ks.cfg
-rw-r--r-- 1 ec2-user ec2-user 88 Apr 5 16:32 pip_install_confluent-kafka.sh
-rw-r--r-- 1 root root 83 Apr 5 16:01 producer.config
-rw-r--r-- 1 ec2-user ec2-user 1278 Mar 30 22:22 producer.py
drwxr-xr-x 5 root root 4096 Apr 5 16:33 venv6
[root@peter-kafka01 ~]# java -jar ExactlyOnceConsumer.jar
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [peter-kafka01.foo.bar:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-peter-consumer-01-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = peter-consumer-01
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_committed
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1775375550930
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Subscribed to topic(s): peter-test05
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Cluster ID: aFOppqLIReSiX0uZr_tWJA
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Discovered group coordinator peter-kafka01.foo.bar:9092 (id: 2147483646 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Successfully joined group with generation Generation{generationId=1, memberId='consumer-peter-consumer-01-1-c6535e02-ae12-4324-9c74-6a2d69d02432', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Finished assignment for group at generation 1: {consumer-peter-consumer-01-1-c6535e02-ae12-4324-9c74-6a2d69d02432=Assignment(partitions=[peter-test05-0])}
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Successfully synced group in generation Generation{generationId=1, memberId='consumer-peter-consumer-01-1-c6535e02-ae12-4324-9c74-6a2d69d02432', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Notifying assignor about the new Assignment(partitions=[peter-test05-0])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Adding newly assigned partitions: peter-test05-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Found no committed offset for partition peter-test05-0
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-peter-consumer-01-1, groupId=peter-consumer-01] Resetting offset for partition peter-test05-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[peter-kafka02.foo.bar:9092 (id: 2 rack: null)], epoch=0}}.
Topic: peter-test05, Partition: 0, Offset: 0, Key: null, Value: Apache Kafka is a distributed streaming platform - 0
[root@peter-kafka02 ~]# java -jar ExactlyOnceProducer.jar
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [peter-kafka01.foo.bar:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-peter-transaction-01
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = peter-transaction-01
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Instantiated a transactional producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1775375727516
[main] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Invoking InitProducerId for the first time in order to acquire a producer ID
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Cluster ID: aFOppqLIReSiX0uZr_tWJA
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Discovered transaction coordinator peter-kafka02.foo.bar:9092 (id: 2 rack: null)
[kafka-producer-network-thread | producer-peter-transaction-01] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] ProducerId set to 3000 with epoch 1
Message sent successfully
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-peter-transaction-01, transactionalId=peter-transaction-01] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-peter-transaction-01 unregistered
[root@peter-kafka01 ~]# java -jar ExactlyOnceConsumer.jar
...
Topic: peter-test05, Partition: 0, Offset: 2, Key: null, Value: Apache Kafka is a distributed streaming platform - 0
[root@peter-kafka02 ~]# /usr/local/kafka/bin/kafka-dump-log.sh --print-data-log --files /data/kafka-logs/peter-test05-0/00000000000000000000.log
Dumping /data/kafka-logs/peter-test05-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 3000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1775373661673 size: 120 magic: 2 compresscodec: NONE crc: 1337877055 isvalid: true
| offset: 0 CreateTime: 1775373661673 keysize: -1 valuesize: 52 sequence: 0 headerKeys: [] payload: Apache Kafka is a distributed streaming platform - 0
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 120 CreateTime: 1775373661811 size: 78 magic: 2 compresscodec: NONE crc: 1319622529 isvalid: true
| offset: 1 CreateTime: 1775373661811 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: 0 lastSequence: 0 producerId: 3000 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 198 CreateTime: 1775375727897 size: 120 magic: 2 compresscodec: NONE crc: 1898446716 isvalid: true
| offset: 2 CreateTime: 1775375727897 keysize: -1 valuesize: 52 sequence: 0 headerKeys: [] payload: Apache Kafka is a distributed streaming platform - 0
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3000 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 318 CreateTime: 1775375727942 size: 78 magic: 2 compresscodec: NONE crc: 4201990516 isvalid: true
| offset: 3 CreateTime: 1775375727942 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0