Confluent S3 Sink Connector EOS
Confluent's S3 Sink Connector is an easy way to use Kafka Connect to dump data into the AWS S3 object store.
A useful feature of the connector is its ability to support Exactly Once Semantics (EOS).
S3 and EOS
EOS is notoriously hard to get right with S3 because S3 objects are themselves immutable - it's impossible to append to them as you would a regular file. An object either exists in its entirety or it doesn't exist at all.
Key difficulties around EOS in S3 are:
- Late messages - Messages are late if they show up some time after they were produced (an hour, a day, etc.). There are many reasons for this, such as intermittent connectivity or upstream outages. If an S3 object has already been written, we cannot just add messages to it, because the non-late data it needs is gone from the connector buffer. If we were to cache the data in the connector, memory usage could be huge. If we try to read the data back from the topic, the other messages that should be in the file may have already expired from the topic. Reconstructing the file based on what's already on S3 would be slow, and presumably some other system has already consumed the data, so not only would the write operation not be idempotent, it would likely be useless too.
- Out of order messages - Similar to late messages, except that the messages aren't late as such, they're just jumbled up. This is less of a problem with a sink connector since the messages are already ordered within the source topic. The connector does not attempt to sort them.
- Duplicate messages - To avoid duplicating messages, we must only read messages after the last committed offset (via consumer groups).
For the gory details consult the docs: https://docs.confluent.io/platform/current/clients/consumer.html#consumer-groups
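As a rough illustration of the duplicate-message point, here is a toy Python model of a sink that always resumes from the committed offset. It is not connector code: `ToySink`, `poll_and_write` and the `crash_before_commit` flag are hypothetical names made up for this sketch. It follows Kafka's convention that the committed offset is the position of the next record to read.

```python
class ToySink:
    """Toy sink: writes a batch to `files`, then commits the next offset to read."""

    def __init__(self, log):
        self.log = log        # stand-in for one topic partition, indexed by offset
        self.committed = 0    # next offset to read (Kafka convention)
        self.files = []       # each "file" is the list of offsets it contains

    def poll_and_write(self, batch_size, crash_before_commit=False):
        start = self.committed  # always resume from the committed position
        batch = list(range(start, min(start + batch_size, len(self.log))))
        if not batch:
            return
        self.files.append(batch)      # the "upload"
        if crash_before_commit:
            return                    # upload happened, commit never did
        self.committed = batch[-1] + 1


sink = ToySink(log=["a", "b", "c", "d", "e", "f"])
sink.poll_and_write(3)                            # writes offsets 0-2, commits 3
sink.poll_and_write(3, crash_before_commit=True)  # writes offsets 3-5, commit lost
sink.poll_and_write(3)                            # after "restart": reads 3-5 again

print(sink.files)
```

Because the commit for offsets 3-5 was lost, the restart reads them again and they end up written twice. Reading from the committed offset prevents duplicates only when the write and the commit succeed or fail together.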
You can't have your cake and eat it
Unfortunately, the reality of physics is you have a choice to make when you use this connector. Message delivery can be EITHER:
- Every X seconds
- Exactly once
You CANNOT have both! Even though both cakes are delicious and everyone likes cakes.
Worked example
Why can you only have one? Here’s the reality of each choice. Times below are expressed as timestamps in milliseconds since the Unix epoch (January 1, 1970).
Every X seconds
# x = 3 seconds
rotate.schedule.interval.ms=3000
Here's an example of the connector working on a quiet topic. There is a gap in incoming data and rotate.schedule.interval.ms forces a flush:
Wall clock time | Message time | Offset | Action | Description |
---|---|---|---|---|
1692368054000 | 1692368054000 | 100 | buffer | consume topic |
1692368054200 | 1692368054200 | 101 | buffer | consume topic |
1692368054600 | 1692368054600 | 102 | buffer | consume topic |
1692368057000 | n/a | n/a | flush | wall clock caused flush |
1692368057200 | 1692368057200 | 103 | buffer | consume topic |
1692368057400 | 1692368057400 | 104 | buffer | consume topic |
But what if we have a very busy topic? We might instead have something more like this:
Wall clock time | Message time | Offset | Action | Description |
---|---|---|---|---|
1692368054000 | 1692368054000 | 100 | buffer | consume topic |
… | … | … | buffer | consume topic |
1692368054600 | 1692368054600 | 9000 | buffer | consume topic |
1692368057000 | n/a | n/a | flush | wall clock caused flush to start |
1692368057001 | 1692368057001 | 9001 | buffer | consume topic |
1692368057002 | 1692368057002 | 9002 | buffer | consume topic |
1692368057003 | 1692368057003 | 9003 | buffer | consume topic |
1692368057010 | n/a | n/a | n/a | S3 upload complete. Commit offset |
1692368057211 | 1692368057200 | 9004 | buffer | consume topic |
1692368057412 | 1692368057400 | 9005 | buffer | consume topic |
In this example, messages 9001-9003 will be duplicated over two consecutive files. This is because the messages arrive while the file is being written, so they are inserted into both the file currently being written and the buffer for the next file.
If we receive data while the S3 upload is in progress there will be duplicates, because the offset can't be committed until the upload is complete. Several other conditions can cause the same behaviour, such as restarting the connector.
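The busy-topic table can be replayed with a small Python sketch. This is a toy model of the behaviour described above, not the connector's implementation: the function name `files_from_scheduled_rotation` and its parameters are invented for illustration, and the rows elided with "…" in the table are simply left out.

```python
def files_from_scheduled_rotation(records, flush_start, upload_done):
    """records: list of (wall_clock_ms, offset). Returns (first_file, second_file)."""
    first_file, second_file = [], []
    for wall_clock, offset in records:
        if wall_clock < flush_start:
            first_file.append(offset)
        elif wall_clock < upload_done:
            # Arrived mid-upload: the offset has not been committed yet, so in
            # this model the record lands in both files.
            first_file.append(offset)
            second_file.append(offset)
        else:
            second_file.append(offset)
    return first_file, second_file


records = [
    (1692368054000, 100),
    (1692368054600, 9000),
    (1692368057001, 9001),
    (1692368057002, 9002),
    (1692368057003, 9003),
    (1692368057211, 9004),
    (1692368057412, 9005),
]

first_file, second_file = files_from_scheduled_rotation(
    records, flush_start=1692368057000, upload_done=1692368057010
)
duplicates = sorted(set(first_file) & set(second_file))
print(duplicates)  # [9001, 9002, 9003]
```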
Exactly once
rotate.interval.ms=3000
Wall clock time | Message time | Offset | Action | Description |
---|---|---|---|---|
1692368054000 | 1692368054000 | 100 | buffer | consume topic |
1692368054200 | 1692368054200 | 101 | buffer | consume topic |
1692368054600 | 1692368054600 | 102 | buffer | consume topic |
1692368057200 | 1692368057200 | 103 | flush | rotate.interval.ms exceeded |
1692368057400 | 1692368057400 | 104 | buffer | consume topic |
One problem with this approach is that if no further data arrives, the buffer will NEVER be flushed (offset 104 is “stuck”). For most customers this isn't an issue as the flow of data never stops. If EOS is required, “heartbeat” messages produced into the source topic are an option, but these present their own problems: they would need to be removed or ignored by whatever system reads from S3, and complexity increases as the number of watched topics and partitions grows, making this an ugly workaround.
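The "stuck" buffer can be seen in a minimal Python sketch of record-time rotation. It is a toy model under two assumptions that do not come from the connector source: rotation fires when a record's timestamp is at least `rotate_interval_ms` past the first timestamp in the buffer, and the record that triggers the rotation opens the next file. The function name `rotate_on_record_time` is hypothetical.

```python
def rotate_on_record_time(records, rotate_interval_ms):
    """records: list of (timestamp_ms, offset). Returns (flushed_files, still_buffered)."""
    files, buffer, base_ts = [], [], None
    for ts, offset in records:
        if buffer and ts - base_ts >= rotate_interval_ms:
            files.append(buffer)  # a flush only ever happens when a record arrives
            buffer = []
        if not buffer:
            base_ts = ts          # first record of a new file sets the base time
        buffer.append(offset)
    return files, buffer


records = [
    (1692368054000, 100),
    (1692368054200, 101),
    (1692368054600, 102),
    (1692368057200, 103),
    (1692368057400, 104),
]

flushed, still_buffered = rotate_on_record_time(records, rotate_interval_ms=3000)
print(flushed)         # [[100, 101, 102]]
print(still_buffered)  # [103, 104]
```

Under these assumptions, whatever is left in `still_buffered` stays there until a later record arrives with a timestamp far enough ahead to trigger the next rotation.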
Why can't we rotate the file while using EOS like we do on a schedule, by having rotate.interval.ms trigger on a timer instead of data?
Wall clock time | Message time | Offset | Action | Description |
---|---|---|---|---|
1692368054000 | 1692368054000 | 100 | buffer | consume topic |
1692368054200 | 1692368054200 | 101 | buffer | consume topic |
1692368054600 | 1692368054600 | 102 | buffer | consume topic |
1692368057000 | n/a | n/a | flush | rotate.interval.ms triggered by timer |
1692368057200 | 1692368057200 | 103 | ? | this data should have been in the previous file |
Files uploaded to S3 with EOS have deterministic filenames. A given filename MUST contain a deterministic set of data. To put it another way, a given message is destined to reach a particular filename no matter what.
If we rotate a file according to a schedule and new data subsequently arrives that should have been in the previous file, we now have an error condition which is impossible to reconcile without updating the original file. Not only would this be very difficult to do because the required data may no longer be available, doing so would almost certainly break downstream systems.
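To make the determinism argument concrete, the following Python sketch compares the two rotation triggers when one record (offset 102) is delayed in transit. Everything here is an illustrative assumption rather than connector code: the helper names, the `orders` topic, and the `topic+partition+start offset` key layout are hypothetical, chosen only so that a file name is derived from its first offset.

```python
def key_for(start_offset, topic="orders", partition=0):
    # Hypothetical naming scheme: the name depends only on the first offset.
    return f"{topic}+{partition}+{start_offset:010d}.json"


def rotate_on_record_time(records, interval_ms):
    """records: (record_ts, arrival_wall_clock, offset). Only record_ts is used."""
    files, buffer, base_ts = {}, [], None
    for record_ts, _arrival, offset in records:
        if buffer and record_ts - base_ts >= interval_ms:
            files[key_for(buffer[0])] = buffer
            buffer = []
        if not buffer:
            base_ts = record_ts
        buffer.append(offset)
    return files  # records still buffered are not written yet


def rotate_on_timer(records, interval_ms, started_at):
    """Flush whatever is buffered each time the wall clock passes a deadline."""
    files, buffer, deadline = {}, [], started_at + interval_ms
    for _record_ts, arrival, offset in records:
        while arrival >= deadline:
            if buffer:
                files[key_for(buffer[0])] = buffer
                buffer = []
            deadline += interval_ms
        buffer.append(offset)
    return files


T = 1692368054000
on_time = [(T, T, 100), (T + 200, T + 200, 101), (T + 600, T + 600, 102),
           (T + 3200, T + 3200, 103), (T + 3400, T + 3400, 104)]
# Same records, but offset 102 reaches the connector 2500 ms late.
delayed = [(T, T, 100), (T + 200, T + 200, 101), (T + 600, T + 3100, 102),
           (T + 3200, T + 3200, 103), (T + 3400, T + 3400, 104)]

key = key_for(100)
by_record_time = (rotate_on_record_time(on_time, 3000)[key],
                  rotate_on_record_time(delayed, 3000)[key])
by_timer = (rotate_on_timer(on_time, 3000, started_at=T)[key],
            rotate_on_timer(delayed, 3000, started_at=T)[key])

print(by_record_time)  # ([100, 101, 102], [100, 101, 102])
print(by_timer)        # ([100, 101, 102], [100, 101])
```

With record-time rotation the same key holds the same offsets in both runs. With the timer, the file contents depend on when records happened to arrive, so the same key can end up with different data.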
TLDR/Why can't I have both?
Since the flush when using EOS depends on receiving new data, we cannot guarantee that new data will arrive in time to trigger the flush logic, so it's impossible to say with certainty that data will be written to S3 according to a schedule.
If data must be produced to S3 according to some schedule then rotate.schedule.interval.ms is the way to do this, but you will get duplicates under certain conditions and therefore lose the EOS guarantee.
Get out of jail card
One trick that will work when you need EOS and rapid S3 updates is setting flush.size=0 to force a file on S3 to be written for every message. This might work if your needs are modest, but if you're handling, say, shopping transactions for a chain of supermarkets, you would see drastically reduced connector throughput, increased lag, and a lot of files created on S3, e.g. one per purchase per customer per store.
Wouldn't double buffering fix this?
At first glance, I appear to have described a bug that would be fixed by double buffering, and in most cases it would. After all, this is a solution your graphics drivers have used for decades to get smooth updates. Most != all, however…
The buffer swap itself isn't a problem, since Java atomics can do this for us. Instead, the problem lies with the time-based partitioning strategy. Since Kafka has incredible throughput and users love to pump insane amounts of data into Kafka, it's possible for messages to arrive faster than the time granularity used around the rotation boundary can resolve, which would cause the duplicate message problem above despite our mitigation.
For this reason, there's no double buffering in the connector.
Alternatives to S3
While S3 is an obvious go-to technology, it's not the only choice. Presumably if files are being written to S3, something else needs to then read those files and process them. Depending on what this something does, there are likely better options. Some examples:
- Hook up the something directly to Kafka and enjoy near-realtime event streaming, using the Kafka client's built-in EOS
- Use Connect to write to a database such as AWS Aurora. In this case you have the database's ACID guarantees
- Use some other connector
Understanding the overall goal of the something system will let you pick the right technology and avoid shifting problems from one area to another. For example, using a database would be one way to eliminate duplicates, but if you're selecting the last 3 seconds' worth of data from the database then the problems above have simply been shifted into the application code.
Good luck!