Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.data;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.DataException;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -164,7 +165,9 @@ public String getString(String fieldName) {
public byte[] getBytes(String fieldName) {
Object bytes = getCheckType(fieldName, Schema.Type.BYTES);
if (bytes instanceof ByteBuffer)
return ((ByteBuffer) bytes).array();
// ByteBuffer.array() ignores position/limit/arrayOffset and throws
// for direct buffers; return the logical remaining bytes instead.
return Utils.toArray((ByteBuffer) bytes);
return (byte[]) bytes;
}

Expand Down Expand Up @@ -240,12 +243,34 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Struct struct = (Struct) o;
return Objects.equals(schema, struct.schema) &&
Arrays.deepEquals(values, struct.values);
Arrays.deepEquals(normalizedBytesValues(), struct.normalizedBytesValues());
}

@Override
public int hashCode() {
return Objects.hash(schema, Arrays.deepHashCode(values));
return Objects.hash(schema, Arrays.deepHashCode(normalizedBytesValues()));
}

/**
* Returns a copy of {@link #values} with top-level BYTES fields stored as
* {@code byte[]}, so that an equivalent value supplied as {@code byte[]} or
* {@code ByteBuffer} compares equal and hashes the same. The original
* {@link #values} array is left untouched; callers that observe a field via
* {@link #get(String)} still see whatever representation was put in.
*/
private Object[] normalizedBytesValues() {
Object[] normalized = null;
List<Field> fields = schema.fields();
for (int i = 0; i < values.length; i++) {
Object value = values[i];
if (!(value instanceof ByteBuffer)) continue;
if (i >= fields.size() || fields.get(i).schema().type() != Schema.Type.BYTES) continue;
if (normalized == null) {
normalized = values.clone();
}
normalized[i] = Utils.toArray((ByteBuffer) value);
}
return normalized == null ? values : normalized;
}

private Field lookupField(String fieldName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -89,6 +90,75 @@ public void testFlatStruct() {
struct.validate();
}

@Test
public void testGetBytesPreservesByteBufferRemainingBytes() {
ByteBuffer bytes = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
bytes.position(1);
bytes.limit(3);

Struct struct = new Struct(FLAT_STRUCT_SCHEMA)
.put("int8", (byte) 12)
.put("int16", (short) 12)
.put("int32", 12)
.put("int64", (long) 12)
.put("float32", 12.f)
.put("float64", 12.)
.put("boolean", true)
.put("string", "foobar")
.put("bytes", bytes);

assertArrayEquals(new byte[] {2, 3}, struct.getBytes("bytes"));
}

@Test
public void testGetBytesSupportsDirectByteBuffer() {
ByteBuffer bytes = ByteBuffer.allocateDirect(2);
bytes.put((byte) 1);
bytes.put((byte) 2);
bytes.flip();

Struct struct = new Struct(FLAT_STRUCT_SCHEMA)
.put("int8", (byte) 12)
.put("int16", (short) 12)
.put("int32", 12)
.put("int64", (long) 12)
.put("float32", 12.f)
.put("float64", 12.)
.put("boolean", true)
.put("string", "foobar")
.put("bytes", bytes);

assertArrayEquals(new byte[] {1, 2}, struct.getBytes("bytes"));
}

@Test
public void testEqualsAndHashCodeWithEquivalentByteArrayAndByteBufferValues() {
Struct byteArrayStruct = new Struct(FLAT_STRUCT_SCHEMA)
.put("int8", (byte) 12)
.put("int16", (short) 12)
.put("int32", 12)
.put("int64", (long) 12)
.put("float32", 12.f)
.put("float64", 12.)
.put("boolean", true)
.put("string", "foobar")
.put("bytes", "foobar".getBytes());

Struct byteBufferStruct = new Struct(FLAT_STRUCT_SCHEMA)
.put("int8", (byte) 12)
.put("int16", (short) 12)
.put("int32", 12)
.put("int64", (long) 12)
.put("float32", 12.f)
.put("float64", 12.)
.put("boolean", true)
.put("string", "foobar")
.put("bytes", ByteBuffer.wrap("foobar".getBytes()));

assertEquals(byteArrayStruct, byteBufferStruct);
assertEquals(byteArrayStruct.hashCode(), byteBufferStruct.hashCode());
}

@Test
public void testComplexStruct() {
List<Byte> array = List.of((byte) 1, (byte) 2);
Expand Down