diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java index 6e5b81ab11537..b6faf85824ac2 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.data; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.DataException; import java.nio.ByteBuffer; @@ -164,7 +165,9 @@ public String getString(String fieldName) { public byte[] getBytes(String fieldName) { Object bytes = getCheckType(fieldName, Schema.Type.BYTES); if (bytes instanceof ByteBuffer) - return ((ByteBuffer) bytes).array(); + // ByteBuffer.array() ignores position/limit/arrayOffset and throws + // for direct buffers; return the logical remaining bytes instead. + return Utils.toArray((ByteBuffer) bytes); return (byte[]) bytes; } @@ -240,12 +243,34 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; Struct struct = (Struct) o; return Objects.equals(schema, struct.schema) && - Arrays.deepEquals(values, struct.values); + Arrays.deepEquals(normalizedBytesValues(), struct.normalizedBytesValues()); } @Override public int hashCode() { - return Objects.hash(schema, Arrays.deepHashCode(values)); + return Objects.hash(schema, Arrays.deepHashCode(normalizedBytesValues())); + } + + /** + * Returns a copy of {@link #values} with top-level BYTES fields stored as + * {@code byte[]}, so that an equivalent value supplied as {@code byte[]} or + * {@code ByteBuffer} compares equal and hashes the same. The original + * {@link #values} array is left untouched; callers that observe a field via + * {@link #get(String)} still see whatever representation was put in. + */ + private Object[] normalizedBytesValues() { + Object[] normalized = null; + List fields = schema.fields(); + for (int i = 0; i < values.length; i++) { + Object value = values[i]; + if (!(value instanceof ByteBuffer)) continue; + if (i >= fields.size() || fields.get(i).schema().type() != Schema.Type.BYTES) continue; + if (normalized == null) { + normalized = values.clone(); + } + normalized[i] = Utils.toArray((ByteBuffer) value); + } + return normalized == null ? values : normalized; } private Field lookupField(String fieldName) { diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index bfdec2fcb9b65..04f67f1d4c86e 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -89,6 +90,75 @@ public void testFlatStruct() { struct.validate(); } + @Test + public void testGetBytesPreservesByteBufferRemainingBytes() { + ByteBuffer bytes = ByteBuffer.wrap(new byte[] {1, 2, 3, 4}); + bytes.position(1); + bytes.limit(3); + + Struct struct = new Struct(FLAT_STRUCT_SCHEMA) + .put("int8", (byte) 12) + .put("int16", (short) 12) + .put("int32", 12) + .put("int64", (long) 12) + .put("float32", 12.f) + .put("float64", 12.) + .put("boolean", true) + .put("string", "foobar") + .put("bytes", bytes); + + assertArrayEquals(new byte[] {2, 3}, struct.getBytes("bytes")); + } + + @Test + public void testGetBytesSupportsDirectByteBuffer() { + ByteBuffer bytes = ByteBuffer.allocateDirect(2); + bytes.put((byte) 1); + bytes.put((byte) 2); + bytes.flip(); + + Struct struct = new Struct(FLAT_STRUCT_SCHEMA) + .put("int8", (byte) 12) + .put("int16", (short) 12) + .put("int32", 12) + .put("int64", (long) 12) + .put("float32", 12.f) + .put("float64", 12.) + .put("boolean", true) + .put("string", "foobar") + .put("bytes", bytes); + + assertArrayEquals(new byte[] {1, 2}, struct.getBytes("bytes")); + } + + @Test + public void testEqualsAndHashCodeWithEquivalentByteArrayAndByteBufferValues() { + Struct byteArrayStruct = new Struct(FLAT_STRUCT_SCHEMA) + .put("int8", (byte) 12) + .put("int16", (short) 12) + .put("int32", 12) + .put("int64", (long) 12) + .put("float32", 12.f) + .put("float64", 12.) + .put("boolean", true) + .put("string", "foobar") + .put("bytes", "foobar".getBytes()); + + Struct byteBufferStruct = new Struct(FLAT_STRUCT_SCHEMA) + .put("int8", (byte) 12) + .put("int16", (short) 12) + .put("int32", 12) + .put("int64", (long) 12) + .put("float32", 12.f) + .put("float64", 12.) + .put("boolean", true) + .put("string", "foobar") + .put("bytes", ByteBuffer.wrap("foobar".getBytes())); + + assertEquals(byteArrayStruct, byteBufferStruct); + assertEquals(byteArrayStruct.hashCode(), byteBufferStruct.hashCode()); + } + @Test public void testComplexStruct() { List array = List.of((byte) 1, (byte) 2);