diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java index 6e366573cfef1..3241e72a132dc 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java @@ -45,6 +45,14 @@ public String toString() { topicPartition, upstreamOffset, downstreamOffset); } + static long downstreamOffsetAfterSync(long downstreamOffset) { + return downstreamOffset + 1; + } + + long translateDownstream(long upstreamOffset) { + return upstreamOffset == this.upstreamOffset ? downstreamOffset : downstreamOffsetAfterSync(downstreamOffset); + } + ByteBuffer serializeValue() { Struct struct = valueStruct(); ByteBuffer buffer = ByteBuffer.allocate(VALUE_SCHEMA.sizeOf(struct)); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java index 635ab7327735e..0829a9adbac16 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java @@ -150,13 +150,13 @@ public OptionalLong translateDownstream(String group, TopicPartition sourceTopic // | / // vv // target |-sg----r-----| - long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1; + long translatedDownstreamOffset = offsetSync.get().translateDownstream(upstreamOffset); log.debug("translateDownstream({},{},{}): Translated {} (relative to {})", group, sourceTopicPartition, upstreamOffset, - offsetSync.get().downstreamOffset() + upstreamStep, + translatedDownstreamOffset, offsetSync.get() ); - return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep); + return OptionalLong.of(translatedDownstreamOffset); } else { log.debug("translateDownstream({},{},{}): Skipped (offset sync not found)", group, sourceTopicPartition, upstreamOffset); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java index 1a5ef6cc4583a..c696333ec8212 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java @@ -169,9 +169,8 @@ static class PartitionState { boolean update(long upstreamOffset, long downstreamOffset) { // Emit an offset sync if any of the following conditions are true boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L; - // the OffsetSync::translateDownstream method will translate this offset 1 past the last sync, so add 1. - // TODO: share common implementation to enforce this relationship - boolean translatedOffsetTooStale = downstreamOffset - (lastSyncDownstreamOffset + 1) >= maxOffsetLag; + // OffsetSync translates offsets after the last sync to one downstream offset past the sync. + boolean translatedOffsetTooStale = downstreamOffset - OffsetSync.downstreamOffsetAfterSync(lastSyncDownstreamOffset) >= maxOffsetLag; boolean skippedUpstreamRecord = upstreamOffset - previousUpstreamOffset != 1L; boolean truncatedDownstreamTopic = downstreamOffset < previousDownstreamOffset; if (noPreviousSyncThisLifetime || translatedOffsetTooStale || skippedUpstreamRecord || truncatedDownstreamTopic) {