Which Confluent Schema ID?
kafkacat -u -b localhost:9092 -t azure -J| python3 -c "$(echo 'aW1wb3J0IHN5cwppbXBvcnQganNvbgpmb3IgbSBpbiBzeXMuc3RkaW46CiAgICBqID0ganNvbi5sb2FkcyhtKQogICAgcmkgPSBqWydwYXlsb2FkJ11bMTo1XS5lbmNvZGUoJ3V0Zi04JykKICAgIHNpID0gaW50LmZyb21fYnl0ZXMocmksICJiaWciKQogICAgcHJpbnQoZiJwYXJ0aXRpb246IHtqWydwYXJ0aXRpb24nXX0gb2Zmc2V0OiB7alsnb2Zmc2V0J119IHNjaGVtYSBpZDoge3NpfSIpCgo=' | base64 -d)"
Why?
Sometimes you just want to see the partition, offset and schema ID of each message as it is produced so that you can present the information concisely. Other approaches are the Confluent Control Center and the kafka-console-consumer commands, but their output can be hard to format or screenshot.
The command above can be copy-pasted into a terminal and has no dependencies other than kafkacat and python3.
How?
- Use kcat (kafkacat) to read raw bytes off the topic
- Disable output buffering with -u (useful for low-traffic topics)
- Output each message as JSON with -J (a sample record is sketched below)
- Pipe the JSON to a base64-encoded inline Python script
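For reference, each line emitted by kafkacat -J is a JSON envelope around one message. A minimal sketch of what the downstream script sees per line (the record below is illustrative and shows only the fields the script uses, not actual captured output):

import json

# Illustrative kcat -J record; the payload string begins with the Confluent
# wire-format header: a \u0000 magic byte followed by a 4-byte schema ID.
line = '{"topic": "azure", "partition": 0, "offset": 0, "payload": "\\u0000\\u0000\\u0000\\u0000\\u0003..."}'

record = json.loads(line)
print(record["partition"], record["offset"])      # 0 0
print(record["payload"][1:5].encode("utf-8"))     # b'\x00\x00\x00\x03', i.e. schema ID 3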
What's in the script?
import sys
import json
for m in sys.stdin:
    j = json.loads(m)
    ri = j['payload'][1:5].encode('utf-8')
    si = int.from_bytes(ri, "big")
    print(f"partition: {j['partition']} offset: {j['offset']} schema id: {si}")
Sample output
$ kafkacat -u -b localhost:9092 -t azure -J| python3 -c "$(echo 'aW1wb3J0IHN5cwppbXBvcnQganNvbgpmb3IgbSBpbiBzeXMuc3RkaW46CiAgICBqID0ganNvbi5sb2FkcyhtKQogICAgcmkgPSBqWydwYXlsb2FkJ11bMTo1XS5lbmNvZGUoJ3V0Zi04JykKICAgIHNpID0gaW50LmZyb21fYnl0ZXMocmksICJiaWciKQogICAgcHJpbnQoZiJwYXJ0aXRpb246IHtqWydwYXJ0aXRpb24nXX0gb2Zmc2V0OiB7alsnb2Zmc2V0J119IHNjaGVtYSBpZDoge3NpfSIpCgo=' | base64 -d)"
% Auto-selecting Consumer mode (use -P or -C to override)
partition: 0 offset: 0 schema id: 3
partition: 0 offset: 1 schema id: 3
% Reached end of topic azure [0] at offset 2
Notes
- j['payload'][1:5] extracts bytes 1-4 of the message, which hold the schema ID as a big-endian 32-bit integer. Byte 0 is a magic byte which is always \u0000 and must be ignored.
- How to generate the base64 in one line? Pipe the script through base64 -w0 (-w0 disables line wrapping).
- Can't use kafkacat?
  - Try messing with the console consumers + awk
  - Write a consumer app and read the schema ID straight from the raw bytes (see the sketch below)
  - Use kafkatool to inspect bytes 1-4 of the message manually
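If you go the consumer-app route, the schema ID can be read straight from the raw message bytes without kcat's JSON escaping. A minimal sketch of parsing the Confluent wire-format header (the raw bytes here are made up for illustration):

import struct

# Confluent wire format: 1 magic byte (always 0x00) + 4-byte big-endian schema ID,
# followed by the serialized record. These bytes are illustrative only.
raw = b"\x00\x00\x00\x00\x03" + b"<avro-encoded record bytes>"

magic, schema_id = struct.unpack(">bI", raw[:5])
assert magic == 0
print(f"schema id: {schema_id}")  # schema id: 3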