KAFKA-20412: Fix prefixScan for KV-Store with headers#21971
KAFKA-20412: Fix prefixScan for KV-Store with headers#21971UladzislauBlok wants to merge 2 commits into
Conversation
| validateStoreOpen(); | ||
| final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().prefixScan(prefix, prefixKeySerializer); | ||
| final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix)); | ||
| final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, internalContext.headers(), prefix)); |
There was a problem hiding this comment.
I was thinking about this a little bit, and this change sounds generally ok to me.
But I would assume, that there is also other code (ie, other classes implementing prefixScan method that might need some updates, too?
Btw: we also just added CachingKeyValueStoreWithHeaders for which we might want to do this differently, ie, similar to what we did for put(key, null) -- ie, the change would not go into CachingKeyValueStoreWithHeaders but rather MeteredTimestampKeyValueStoreWithHeaders to modify the RecordContext set on the internalContext object. 🤔
|
We filed a new ticket to track the |
05cd69b to
f2eeb22
Compare
f543e18 to
6d0e750
Compare
|
@aliehsaeedii Hello |
Approach is similar to
MeteredTimestampedKeyValueStoreWithHeaders#delete(key)