Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ public String toString() {
topicPartition, upstreamOffset, downstreamOffset);
}

static long downstreamOffsetAfterSync(long downstreamOffset) {
return downstreamOffset + 1;
}

long translateDownstream(long upstreamOffset) {
return upstreamOffset == this.upstreamOffset ? downstreamOffset : downstreamOffsetAfterSync(downstreamOffset);
}

ByteBuffer serializeValue() {
Struct struct = valueStruct();
ByteBuffer buffer = ByteBuffer.allocate(VALUE_SCHEMA.sizeOf(struct));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,13 @@ public OptionalLong translateDownstream(String group, TopicPartition sourceTopic
// | /
// vv
// target |-sg----r-----|
long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
long translatedDownstreamOffset = offsetSync.get().translateDownstream(upstreamOffset);
log.debug("translateDownstream({},{},{}): Translated {} (relative to {})",
group, sourceTopicPartition, upstreamOffset,
offsetSync.get().downstreamOffset() + upstreamStep,
translatedDownstreamOffset,
offsetSync.get()
);
return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
return OptionalLong.of(translatedDownstreamOffset);
} else {
log.debug("translateDownstream({},{},{}): Skipped (offset sync not found)",
group, sourceTopicPartition, upstreamOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,8 @@ static class PartitionState {
boolean update(long upstreamOffset, long downstreamOffset) {
// Emit an offset sync if any of the following conditions are true
boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L;
// the OffsetSync::translateDownstream method will translate this offset 1 past the last sync, so add 1.
// TODO: share common implementation to enforce this relationship
boolean translatedOffsetTooStale = downstreamOffset - (lastSyncDownstreamOffset + 1) >= maxOffsetLag;
// OffsetSync translates offsets after the last sync to one downstream offset past the sync.
boolean translatedOffsetTooStale = downstreamOffset - OffsetSync.downstreamOffsetAfterSync(lastSyncDownstreamOffset) >= maxOffsetLag;
boolean skippedUpstreamRecord = upstreamOffset - previousUpstreamOffset != 1L;
boolean truncatedDownstreamTopic = downstreamOffset < previousDownstreamOffset;
if (noPreviousSyncThisLifetime || translatedOffsetTooStale || skippedUpstreamRecord || truncatedDownstreamTopic) {
Expand Down
Loading