diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index 2d68214371ab1..ed3d024028cae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -17,8 +17,10 @@ package org.apache.kafka.streams.state.internals; import java.util.List; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; @@ -27,13 +29,12 @@ import org.apache.kafka.streams.state.KeyValueStore; import java.util.Iterator; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class InMemoryKeyValueStore implements KeyValueStore { private final String name; - private final ConcurrentNavigableMap map = new ConcurrentSkipListMap<>(); + private final NavigableMap map = new TreeMap<>(); private volatile boolean open = false; private long size = 0L; // SkipListMap#size is O(N) so we just do our best to track it @@ -71,12 +72,12 @@ public boolean isOpen() { } @Override - public byte[] get(final Bytes key) { + public synchronized byte[] get(final Bytes key) { return map.get(key); } @Override - public void put(final Bytes key, final byte[] value) { + public synchronized void put(final Bytes key, final byte[] value) { if (value == null) { size -= map.remove(key) == null ? 0 : 1; } else { @@ -85,7 +86,7 @@ public void put(final Bytes key, final byte[] value) { } @Override - public byte[] putIfAbsent(final Bytes key, final byte[] value) { + public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) { final byte[] originalValue = get(key); if (originalValue == null) { put(key, value); @@ -101,14 +102,14 @@ public void putAll(final List> entries) { } @Override - public byte[] delete(final Bytes key) { + public synchronized byte[] delete(final Bytes key) { final byte[] oldValue = map.remove(key); size -= oldValue == null ? 0 : 1; return oldValue; } @Override - public KeyValueIterator range(final Bytes from, final Bytes to) { + public synchronized KeyValueIterator range(final Bytes from, final Bytes to) { if (from.compareTo(to) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " @@ -119,14 +120,14 @@ public KeyValueIterator range(final Bytes from, final Bytes to) { return new DelegatingPeekingKeyValueIterator<>( name, - new InMemoryKeyValueIterator(map.subMap(from, true, to, true).entrySet().iterator())); + new InMemoryKeyValueIterator(map.subMap(from, true, to, true).keySet())); } @Override - public KeyValueIterator all() { + public synchronized KeyValueIterator all() { return new DelegatingPeekingKeyValueIterator<>( name, - new InMemoryKeyValueIterator(map.entrySet().iterator())); + new InMemoryKeyValueIterator(map.keySet())); } @Override @@ -146,11 +147,11 @@ public void close() { open = false; } - private static class InMemoryKeyValueIterator implements KeyValueIterator { - private final Iterator> iter; + private class InMemoryKeyValueIterator implements KeyValueIterator { + private final Iterator iter; - private InMemoryKeyValueIterator(final Iterator> iter) { - this.iter = iter; + private InMemoryKeyValueIterator(final Set keySet) { + this.iter = new TreeSet<>(keySet).iterator(); } @Override @@ -160,8 +161,8 @@ public boolean hasNext() { @Override public KeyValue next() { - final Map.Entry entry = iter.next(); - return new KeyValue<>(entry.getKey(), entry.getValue()); + final Bytes key = iter.next(); + return new KeyValue<>(key, map.get(key)); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index ec53dfa5d46cd..a1d0aab277052 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -17,7 +17,8 @@ package org.apache.kafka.streams.state.internals; import java.util.NavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.TreeMap; +import java.util.TreeSet; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -39,7 +40,7 @@ class NamedCache { private static final Logger log = LoggerFactory.getLogger(NamedCache.class); private final String name; - private final NavigableMap cache = new ConcurrentSkipListMap<>(); + private final NavigableMap cache = new TreeMap<>(); private final Set dirtyKeys = new LinkedHashSet<>(); private ThreadCache.DirtyEntryFlushListener listener; private LRUNode tail; @@ -270,12 +271,16 @@ public boolean isEmpty() { return cache.isEmpty(); } - synchronized Iterator> subMapIterator(final Bytes from, final Bytes to) { - return cache.subMap(from, true, to, true).entrySet().iterator(); + synchronized Iterator keyRange(final Bytes from, final Bytes to) { + return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true)); } - synchronized Iterator> allIterator() { - return cache.entrySet().iterator(); + private Iterator keySetIterator(final Set keySet) { + return new TreeSet<>(keySet).iterator(); + } + + synchronized Iterator allKeys() { + return keySetIterator(cache.navigableKeySet()); } synchronized LRUCacheEntry first() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index 116b1ef8b7d1a..7e3231e5f6ef7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.internals.NamedCache.LRUNode; import org.slf4j.Logger; import java.util.Collections; @@ -181,17 +180,17 @@ public LRUCacheEntry delete(final String namespace, final Bytes key) { public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) { final NamedCache cache = getCache(namespace); if (cache == null) { - return new MemoryLRUCacheBytesIterator(Collections.emptyIterator()); + return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics)); } - return new MemoryLRUCacheBytesIterator(cache.subMapIterator(from, to)); + return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache); } public MemoryLRUCacheBytesIterator all(final String namespace) { final NamedCache cache = getCache(namespace); if (cache == null) { - return new MemoryLRUCacheBytesIterator(Collections.emptyIterator()); + return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics)); } - return new MemoryLRUCacheBytesIterator(cache.allIterator()); + return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache); } public long size() { @@ -261,11 +260,13 @@ private synchronized NamedCache getOrCreateCache(final String name) { } static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator { - private final Iterator> underlying; + private final Iterator keys; + private final NamedCache cache; private KeyValue nextEntry; - MemoryLRUCacheBytesIterator(final Iterator> underlying) { - this.underlying = underlying; + MemoryLRUCacheBytesIterator(final Iterator keys, final NamedCache cache) { + this.keys = keys; + this.cache = cache; } public Bytes peekNextKey() { @@ -289,7 +290,7 @@ public boolean hasNext() { return true; } - while (underlying.hasNext() && nextEntry == null) { + while (keys.hasNext() && nextEntry == null) { internalNext(); } @@ -307,9 +308,8 @@ public KeyValue next() { } private void internalNext() { - final Map.Entry mapEntry = underlying.next(); - final Bytes cacheKey = mapEntry.getKey(); - final LRUCacheEntry entry = mapEntry.getValue().entry(); + final Bytes cacheKey = keys.next(); + final LRUCacheEntry entry = cache.get(cacheKey); if (entry == null) { return; }