KAFKA-20685: Delete checkpoint file when EOS task completes restoration (4.2) #22548

Open
zoro30102000 wants to merge 1 commit into apache:4.2 from zoro30102000:KAFKA-20685-4.2

Conversation

@zoro30102000

Under exactly_once_v2, the default state updater writes an enforced checkpoint when a task finishes restoration (DefaultStateUpdater.maybeCompleteRestoration), and nothing removes that file when the task transitions to RUNNING. StreamTask.completeRestoration only skips writing its own checkpoint under EOS, which suggests the intent was that no checkpoint should exist past that point. The leftover file survives the whole processing session, so after an unclean crash during RUNNING the restarted instance finds it, initializes the stores from it, and skips the TaskCorruptedException wipe. That is unsafe because the local stores may hold uncommitted data: RocksDB runs with the WAL disabled and Streams does not flush on commit under EOS, so background memtable flushes can persist writes from a transaction that is later aborted, and the read_committed tail replay cannot remove them.
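To make the failure mode concrete, here is a minimal sketch of the restart decision as described above. It is a simplified model, not the Kafka Streams code: `RestartDecisionSketch`, `Action`, and `onEosRestart` are hypothetical names, and details such as whether the store directory is empty are ignored.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical model of the EOS restart decision; not the actual Streams implementation.
final class RestartDecisionSketch {
    enum Action { REUSE_LOCAL_STATE, WIPE_AND_RESTORE }

    // Under EOS, a checkpoint file found on restart is trusted and the stores
    // are initialized from it. Only a missing checkpoint leads to the wipe
    // (the TaskCorruptedException path in the description above).
    static Action onEosRestart(Path checkpointFile) {
        return Files.exists(checkpointFile)
            ? Action.REUSE_LOCAL_STATE
            : Action.WIPE_AND_RESTORE;
    }
}
```

With the leftover checkpoint in place, a crash during RUNNING takes the `REUSE_LOCAL_STATE` branch even though the store may contain writes from an aborted transaction.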

This change deletes the checkpoint file when an EOS task completes restoration, before the transition to RUNNING, which restores the invariant that no checkpoint exists during RUNNING under EOS. It mirrors what KAFKA-10362 did on the resume path and uses the same helper. The checkpoints written during restoration itself are untouched, so a crash during restoration still resumes from them. That is safe because restored bytes are committed changelog data.
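The shape of the change can be sketched roughly as follows. This is a self-contained illustration, not the patch itself: `EosTaskSketch` and its fields are hypothetical, and the real code goes through the existing Streams helper rather than calling `Files.deleteIfExists` directly.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for a task that moves from RESTORING to RUNNING.
final class EosTaskSketch {
    enum State { RESTORING, RUNNING }

    private final boolean eosEnabled;
    private final Path checkpointFile;
    private State state = State.RESTORING;

    EosTaskSketch(boolean eosEnabled, Path checkpointFile) {
        this.eosEnabled = eosEnabled;
        this.checkpointFile = checkpointFile;
    }

    void completeRestoration() {
        if (state != State.RESTORING) {
            throw new IllegalStateException("Expected RESTORING but was " + state);
        }
        if (eosEnabled) {
            // Delete before the transition so that no checkpoint exists while RUNNING.
            try {
                Files.deleteIfExists(checkpointFile);
            } catch (IOException e) {
                // Log and continue: a failed delete does not block the transition.
                System.err.println("Failed to delete checkpoint file " + checkpointFile + ": " + e);
            }
        }
        state = State.RUNNING;
    }

    State state() {
        return state;
    }
}
```

Under at_least_once (`eosEnabled == false` in this sketch) the file is left alone, which matches the intent that the delete only runs under EOS.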

Trunk and 4.3 are not affected: stores that manage their own offsets detect an unclean shutdown through the KIP-1035 status marker, and KAFKA-19712 moved the legacy checkpoint file handling into LegacyCheckpointingStateStore, which writes the file only under at_least_once and deletes it at init under EOS. This PR therefore targets the 4.2 branch.

Testing: three unit tests were added to StreamTaskTest next to the existing restoration checkpoint tests. The EOS test fails without the fix, the at_least_once test verifies that the delete does not run there, and the third test verifies that a failed delete does not prevent the transition to RUNNING. The full StreamTaskTest, checkstyle, and spotbugs pass on the streams module.

@zoro30102000

Author

One point for reviewers: the IOException handling on the delete follows the resume() precedent and logs the error without failing the task, so a failed delete leaves behavior exactly as it is today for that session. The stricter alternative would be to fail the task, or to throw TaskCorruptedException so the wipe happens immediately through the existing machinery. I kept the shape of the precedent, but I am happy to change it if you prefer to fail closed here.
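For comparison, the two options could look roughly like this. It is only a sketch to frame the discussion: `DeleteFailurePolicySketch`, `Policy`, and `TaskCorruptedStandIn` are hypothetical names, and the stand-in exception is not the real TaskCorruptedException.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical comparison of the two failure-handling options for the delete.
final class DeleteFailurePolicySketch {
    enum Policy { LOG_AND_CONTINUE, FAIL_CLOSED }

    // Stand-in for the exception that would trigger the wipe; an assumption for this sketch.
    static final class TaskCorruptedStandIn extends RuntimeException {
        TaskCorruptedStandIn(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Returns true if the file is gone afterwards, false if the delete failed
    // and the failure was only logged.
    static boolean deleteCheckpoint(Path checkpointFile, Policy policy) {
        try {
            Files.deleteIfExists(checkpointFile);
            return true;
        } catch (IOException e) {
            if (policy == Policy.FAIL_CLOSED) {
                // Stricter alternative: surface the failure so the task is wiped.
                throw new TaskCorruptedStandIn("Could not delete " + checkpointFile, e);
            }
            // Precedent shape: log the error and let the task continue.
            System.err.println("Failed to delete checkpoint file " + checkpointFile + ": " + e);
            return false;
        }
    }
}
```

The trade-off is that `LOG_AND_CONTINUE` keeps a failed delete from taking the task down, while `FAIL_CLOSED` never leaves a checkpoint behind during RUNNING.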

