KAFKA-20656: Struct honors ByteBuffer remaining bytes #22536
Open
BK202503 wants to merge 1 commit into
Kafka Connect BYTES values may be either `byte[]` or `ByteBuffer`, and `ConnectSchema` recommends `ByteBuffer` because plain arrays do not implement content-based `equals()`/`hashCode()`. `Struct` did not honor that contract:

- `getBytes(...)` called `ByteBuffer.array()`, returning the whole backing array instead of the logical remaining bytes and throwing `UnsupportedOperationException` for direct buffers.
- `equals` and `hashCode` compared raw stored objects, so two structs holding the same logical BYTES value supplied as `byte[]` and `ByteBuffer` did not compare equal.

`getBytes` now goes through `Utils.toArray(ByteBuffer)`, which copies the buffer's remaining bytes and supports direct buffers (the same approach already used by `Values.convertToBytes` in this module). `equals`/`hashCode` now normalize top-level BYTES fields stored as `ByteBuffer` into `byte[]` (without mutating the underlying `values` array) before delegating to `Arrays.deepEquals`/`deepHashCode`, so the representation a caller used to `put(...)` no longer leaks into struct identity.

Added three regression tests in `StructTest`:

- `testGetBytesPreservesByteBufferRemainingBytes`: sliced `ByteBuffer` returns only the logical bytes.
- `testGetBytesSupportsDirectByteBuffer`: a direct buffer is serialized instead of throwing.
- `testEqualsAndHashCodeWithEquivalentByteArrayAndByteBufferValues`: a `byte[]`-valued struct and a `ByteBuffer`-valued struct with the same content are `.equals` and share a `hashCode`.

The existing `testFlatStruct` and `testEqualsAndHashCodeWithByteArrayValue` still pass on JDK 17.

Signed-off-by: BK202503 <199436087+BK202503@users.noreply.github.com>
JIRA: KAFKA-20656
What
Kafka Connect BYTES values may be either `byte[]` or `ByteBuffer`, and `ConnectSchema` recommends `ByteBuffer` because plain arrays do not implement content-based `equals()`/`hashCode()`. `Struct` did not honor that contract:

- `getBytes(...)` called `ByteBuffer.array()`, returning the entire backing array instead of the logical remaining bytes, and throwing `UnsupportedOperationException` for direct buffers.
- `equals` and `hashCode` delegated straight to `Arrays.deepEquals`/`deepHashCode` over the raw `values` array, so two structs holding the same logical BYTES value supplied as `byte[]` vs. `ByteBuffer` were not equal.

Changes
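The `getBytes` problem this PR addresses can be reproduced with the JDK alone. The sketch below is illustrative, not the PR's code: `RemainingBytesSketch` and `remainingBytes` are hypothetical names standing in for the role the description assigns to `Utils.toArray(ByteBuffer)`, namely copying only the bytes between the buffer's position and limit.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical demo class; not part of Kafka.
class RemainingBytesSketch {

    // Assumed stand-in for a "copy the remaining bytes" helper.
    // duplicate() shares the content but has its own position,
    // so the caller's buffer is not advanced by the read.
    static byte[] remainingBytes(ByteBuffer buffer) {
        byte[] out = new byte[buffer.remaining()];
        buffer.duplicate().get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer whole = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5});
        whole.position(1);
        whole.limit(4);
        ByteBuffer slice = whole.slice(); // logical content: 2, 3, 4

        // array() exposes the full backing array, not the logical slice.
        System.out.println(Arrays.toString(slice.array()));
        // Copying the remaining bytes yields only the logical content.
        System.out.println(Arrays.toString(remainingBytes(slice)));

        ByteBuffer direct = ByteBuffer.allocateDirect(3);
        direct.put(new byte[] {7, 8, 9});
        direct.flip();

        // A direct buffer has no accessible backing array, so array()
        // would throw UnsupportedOperationException here.
        System.out.println(direct.hasArray());
        // The copy-based helper works for heap and direct buffers alike.
        System.out.println(Arrays.toString(remainingBytes(direct)));
    }
}
```

Reading through `duplicate()` is a choice made for this sketch so that extracting the bytes leaves the original buffer's position untouched; whether the real helper does the same is not stated in this description.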
- `Struct.getBytes(String)` now goes through `Utils.toArray(ByteBuffer)`. That copies the buffer's remaining bytes and works for direct buffers, the same approach `Values.convertToBytes` already uses elsewhere in this module.
- `Struct.equals(Object)` and `Struct.hashCode()` now compare/hash a `normalizedBytesValues()` view of the struct that copies top-level BYTES fields stored as `ByteBuffer` into `byte[]`. The underlying `values` array is not mutated, so `get(String)` callers still see whatever representation was put in. Non-BYTES fields go through `Arrays.deepEquals`/`deepHashCode` unchanged.

The normalization is scoped to top-level BYTES fields, matching the reporter's reproducer. Nested BYTES inside `ARRAY`/`MAP`/`STRUCT` fields keep the previous behavior in this PR.

Tests
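The equality scenario can also be sketched outside Kafka. The code below is an assumption-laden illustration, not the PR's implementation: `BytesEqualitySketch`, `normalized`, `logicallyEqual`, and `logicalHash` are hypothetical names, and the sketch decides what to normalize by runtime type, whereas the actual `normalizedBytesValues()` is not shown in this description.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical demo class; not part of Kafka.
class BytesEqualitySketch {

    // Returns a shallow copy in which each top-level ByteBuffer is replaced
    // by a byte[] holding its remaining bytes. The input array is not mutated.
    static Object[] normalized(Object[] values) {
        Object[] copy = values.clone();
        for (int i = 0; i < copy.length; i++) {
            if (copy[i] instanceof ByteBuffer) {
                ByteBuffer view = ((ByteBuffer) copy[i]).duplicate();
                byte[] bytes = new byte[view.remaining()];
                view.get(bytes);
                copy[i] = bytes;
            }
        }
        return copy;
    }

    static boolean logicallyEqual(Object[] a, Object[] b) {
        return Arrays.deepEquals(normalized(a), normalized(b));
    }

    static int logicalHash(Object[] values) {
        return Arrays.deepHashCode(normalized(values));
    }

    public static void main(String[] args) {
        Object[] asArray = {"id-1", new byte[] {1, 2, 3}};
        Object[] asBuffer = {"id-1", ByteBuffer.wrap(new byte[] {1, 2, 3})};

        // Raw comparison: a byte[] never equals a ByteBuffer.
        System.out.println(Arrays.deepEquals(asArray, asBuffer));
        // Normalized comparison treats both representations alike.
        System.out.println(logicallyEqual(asArray, asBuffer));
        System.out.println(logicalHash(asArray) == logicalHash(asBuffer));
        // The caller's array still holds the original ByteBuffer.
        System.out.println(asBuffer[1] instanceof ByteBuffer);
    }
}
```

Normalizing into a copy rather than in place mirrors the stated goal that callers of `get(String)` keep seeing the representation they stored.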
Added three regression tests in `StructTest` that fail against the previous implementation and pass with this change:

- `testGetBytesPreservesByteBufferRemainingBytes`: a sliced `ByteBuffer` returns only the logical bytes.
- `testGetBytesSupportsDirectByteBuffer`: a direct buffer serializes instead of throwing.
- `testEqualsAndHashCodeWithEquivalentByteArrayAndByteBufferValues`: a `byte[]`-valued struct and a `ByteBuffer`-valued struct with the same logical content are `.equals` and share a `hashCode`.

Validation
All `StructTest` tests pass on JDK 17, including the three new tests, the existing `testFlatStruct`, and the existing `testEqualsAndHashCodeWithByteArrayValue` (which exercises the unchanged `byte[]`-only equality path).

Scope
This PR completes the four-ticket `ByteBuffer.array()` cluster for Kafka Connect BYTES values; the other three are split into independent PRs to keep each reviewable:

- (`JsonConverter`) KAFKA-20657: JsonConverter BYTES uses ByteBuffer remaining bytes #22533
- (`Cast` SMT) KAFKA-20658: Cast SMT honors ByteBuffer remaining bytes #22534

Committer Checklist