From 42673dcf1266810408cb4f60a5e7e2bfc34025c6 Mon Sep 17 00:00:00 2001 From: kun du Date: Wed, 12 Feb 2020 10:49:01 -0800 Subject: [PATCH] Implementation of recycling memory pool. (#72) --- .../common/memory/RecyclingMemoryPool.java | 114 ++++++++++++++++++ .../memory/RecyclingMemoryPoolTest.java | 110 +++++++++++++++++ .../scala/kafka/network/SocketServer.scala | 10 +- .../main/scala/kafka/server/KafkaConfig.scala | 10 ++ .../unit/kafka/server/KafkaConfigTest.scala | 2 + 5 files changed, 241 insertions(+), 5 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/memory/RecyclingMemoryPool.java create mode 100644 clients/src/test/java/org/apache/kafka/common/memory/RecyclingMemoryPoolTest.java diff --git a/clients/src/main/java/org/apache/kafka/common/memory/RecyclingMemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/RecyclingMemoryPool.java new file mode 100644 index 0000000000000..971f0be72972c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/memory/RecyclingMemoryPool.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.memory; + +import java.nio.ByteBuffer; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.kafka.common.metrics.Sensor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * An implementation of memory pool which recycles buffers of commonly used size. + * This memory pool is useful if most of the requested buffers' size are within close size range. + * In this case, instead of deallocate and reallocate the buffer, the memory pool will recycle the buffer for future use. + */ +public class RecyclingMemoryPool implements MemoryPool { + protected static final Logger log = LoggerFactory.getLogger(RecyclingMemoryPool.class); + protected final int cacheableBufferSizeUpperThreshold; + protected final int cacheableBufferSizeLowerThreshold; + protected final LinkedBlockingQueue bufferCache; + protected final Sensor requestSensor; + + public RecyclingMemoryPool(int cacheableBufferSize, int bufferCacheCapacity, Sensor requestSensor) { + if (bufferCacheCapacity <= 0 || cacheableBufferSize <= 0) { + throw new IllegalArgumentException(String.format("Must provide a positive cacheable buffer size and buffer cache " + + "capacity, provided %d and %d respectively.", cacheableBufferSize, bufferCacheCapacity)); + } + this.bufferCache = new LinkedBlockingQueue<>(bufferCacheCapacity); + for (int i = 0; i < bufferCacheCapacity; i++) { + bufferCache.offer(ByteBuffer.allocate(cacheableBufferSize)); + } + this.cacheableBufferSizeUpperThreshold = cacheableBufferSize; + this.cacheableBufferSizeLowerThreshold = cacheableBufferSize / 2; + this.requestSensor = requestSensor; + } + + @Override + public ByteBuffer tryAllocate(int sizeBytes) { + if (sizeBytes < 1) { + throw new IllegalArgumentException("requested size " + sizeBytes + "<=0"); + } + + ByteBuffer allocated = null; + if (sizeBytes > cacheableBufferSizeLowerThreshold && sizeBytes <= cacheableBufferSizeUpperThreshold) { + allocated = bufferCache.poll(); + } + if (allocated != null) { + allocated.limit(sizeBytes); + } else { + allocated = ByteBuffer.allocate(sizeBytes); + } + bufferToBeAllocated(allocated); + return allocated; + } + + @Override + public void release(ByteBuffer previouslyAllocated) { + if (previouslyAllocated == null) { + throw new IllegalArgumentException("provided null buffer"); + } + if (previouslyAllocated.capacity() == cacheableBufferSizeUpperThreshold) { + previouslyAllocated.clear(); + bufferCache.offer(previouslyAllocated); + } else { + bufferToBeReleased(previouslyAllocated); + } + } + + //allows subclasses to do their own bookkeeping (and validation) _before_ memory is returned to client code. + protected void bufferToBeAllocated(ByteBuffer justAllocated) { + try { + this.requestSensor.record(justAllocated.limit()); + } catch (Exception e) { + log.debug("failed to record size of allocated buffer"); + } + log.trace("allocated buffer of size {}", justAllocated.capacity()); + } + + //allows subclasses to do their own bookkeeping (and validation) _before_ memory is marked as reclaimed. + protected void bufferToBeReleased(ByteBuffer justReleased) { + log.trace("released buffer of size {}", justReleased.capacity()); + } + + @Override + public long size() { + return Long.MAX_VALUE; + } + + @Override + public long availableMemory() { + return Long.MAX_VALUE; + } + + @Override + public boolean isOutOfMemory() { + return false; + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/memory/RecyclingMemoryPoolTest.java b/clients/src/test/java/org/apache/kafka/common/memory/RecyclingMemoryPoolTest.java new file mode 100644 index 0000000000000..eec9e7e41668c --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/memory/RecyclingMemoryPoolTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.memory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.junit.Assert; +import org.junit.Test; + + +public class RecyclingMemoryPoolTest { + private static final int TWO_KILOBYTES = 2048; + private static final int CACHEABLE_BUFFER_SIZE = 1024; + private static final int BUFFER_CACHE_CAPACITY = 2; + private static final Sensor ALLOCATE_SENSOR = new Metrics().sensor("allocate_sensor"); + + @Test(expected = IllegalArgumentException.class) + public void testNegativeAllocation() { + RecyclingMemoryPool memoryPool = new RecyclingMemoryPool(CACHEABLE_BUFFER_SIZE, BUFFER_CACHE_CAPACITY, ALLOCATE_SENSOR); + memoryPool.tryAllocate(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void testZeroAllocation() { + RecyclingMemoryPool memoryPool = new RecyclingMemoryPool(CACHEABLE_BUFFER_SIZE, BUFFER_CACHE_CAPACITY, ALLOCATE_SENSOR); + memoryPool.tryAllocate(0); + } + + @Test(expected = IllegalArgumentException.class) + public void testNullRelease() { + RecyclingMemoryPool memoryPool = new RecyclingMemoryPool(CACHEABLE_BUFFER_SIZE, BUFFER_CACHE_CAPACITY, ALLOCATE_SENSOR); + memoryPool.release(null); + } + + @Test + public void testAllocation() { + RecyclingMemoryPool memoryPool = new RecyclingMemoryPool(CACHEABLE_BUFFER_SIZE, BUFFER_CACHE_CAPACITY, ALLOCATE_SENSOR); + ByteBuffer buffer1 = memoryPool.tryAllocate(TWO_KILOBYTES); + ByteBuffer buffer2 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE); + ByteBuffer buffer3 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE * 2 / 3); + ByteBuffer buffer4 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE); + + memoryPool.release(buffer1); + ByteBuffer reuse1 = memoryPool.tryAllocate(TWO_KILOBYTES); + // Compare the references + Assert.assertNotEquals(System.identityHashCode(reuse1), System.identityHashCode(buffer1)); + + memoryPool.release(buffer2); + memoryPool.release(buffer3); + memoryPool.release(buffer4); + ByteBuffer reuse2 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE); + ByteBuffer reuse3 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE * 2 / 3); + ByteBuffer reuse4 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE); + + Assert.assertEquals(System.identityHashCode(reuse2), System.identityHashCode(buffer2)); + Assert.assertEquals(System.identityHashCode(reuse3), System.identityHashCode(buffer3)); + Assert.assertNotEquals(System.identityHashCode(reuse4), System.identityHashCode(buffer4)); + } + + @Test + public void testMultiThreadAllocation() { + RecyclingMemoryPool memoryPool = new RecyclingMemoryPool(CACHEABLE_BUFFER_SIZE, BUFFER_CACHE_CAPACITY, ALLOCATE_SENSOR); + AtomicReference error = new AtomicReference<>(); + List processorThreads = new ArrayList<>(3); + for (int i = 0; i < 3; i++) { + processorThreads.add(new Thread(() -> { + try { + ByteBuffer buffer = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE); + Thread.sleep(1000); + memoryPool.release(buffer); + } catch (InterruptedException e) { + error.compareAndSet(null, e); + } + })); + } + processorThreads.forEach(t -> { + t.setDaemon(true); + t.start(); + }); + processorThreads.forEach(t -> { + try { + t.join(30000); + } catch (InterruptedException e) { + error.compareAndSet(null, e); + } + }); + + Assert.assertNull(error.get()); + Assert.assertEquals(memoryPool.bufferCache.size(), BUFFER_CACHE_CAPACITY); + } +} diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index d172c0fcc6208..b0e79c1dd712c 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -37,7 +37,7 @@ import kafka.server.{BrokerReconfigurable, KafkaConfig} import kafka.utils._ import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.{KafkaException, Reconfigurable} -import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool} +import org.apache.kafka.common.memory.{MemoryPool, RecyclingMemoryPool, SimpleMemoryPool} import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.Total import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing @@ -89,13 +89,13 @@ class SocketServer(val config: KafkaConfig, private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup) memoryPoolUsageSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName)) private val memoryPoolAllocationSensor = metrics.sensor("MemoryPoolAllocation") - private val memoryPoolMaxAllocateSizeMetricName = metrics.metricName("MemoryPoolMaxAllocateSize", "socket-server-metrics") - private val memoryPoolAllocateSizePercentilesMetricName = metrics.metricName("MemoryPoolAllocateSizePercentiles", "socket-server-metrics") - private val percentiles = (1 to 9).map( i => new Percentile(metrics.metricName("MemoryPoolAllocateSize%dPercentile".format(i * 10), "socket-server-metrics"), i * 10)) + private val memoryPoolMaxAllocateSizeMetricName = metrics.metricName("MemoryPoolMaxAllocateSize", MetricsGroup) memoryPoolAllocationSensor.add(memoryPoolMaxAllocateSizeMetricName, new Max()) + private val percentiles = (1 to 9).map( i => new Percentile(metrics.metricName("MemoryPoolAllocateSize%dPercentile".format(i * 10), MetricsGroup), i * 10)) // At current stage, we do not know the max decrypted request size, temporarily set it to 10MB. - memoryPoolAllocationSensor.add(memoryPoolAllocateSizePercentilesMetricName, new Percentiles(400, 0.0, 10485760, BucketSizing.CONSTANT, percentiles:_*)) + memoryPoolAllocationSensor.add(new Percentiles(400, 0.0, 10485760, BucketSizing.CONSTANT, percentiles:_*)) private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolUsageSensor, memoryPoolAllocationSensor) + else if (config.socketRequestCommonBytes > 0) new RecyclingMemoryPool(config.socketRequestCommonBytes, config.socketRequestBufferCacheSize, memoryPoolAllocationSensor) else MemoryPool.NONE // data-plane private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 0b40d7ac43476..faa15527d9689 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -81,6 +81,8 @@ object Defaults { val SocketSendBufferBytes: Int = 100 * 1024 val SocketReceiveBufferBytes: Int = 100 * 1024 val SocketRequestMaxBytes: Int = 100 * 1024 * 1024 + val SocketRequestCommonBytes: Int = -1 + val SocketRequestBufferCacheSize: Int = 0 val RequestMaxLocalTimeMs = Long.MaxValue val MaxConnectionsPerIp: Int = Int.MaxValue val MaxConnectionsPerIpOverrides: String = "" @@ -320,6 +322,8 @@ object KafkaConfig { val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes" val RequestMaxLocalTimeMsProp = "request.max.local.time.ms" val SocketRequestMaxBytesProp = "socket.request.max.bytes" + val SocketRequestCommonBytesProp = "socket.request.common.bytes" + val SocketRequestBufferCacheSizeProp = "socket.request.buffer.cache.size" val MaxConnectionsPerIpProp = "max.connections.per.ip" val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides" val MaxConnectionsProp = "max.connections" @@ -617,6 +621,8 @@ object KafkaConfig { "takes longer than this time the broker will kill itself as violating this timeout is a symptom of a more serious broker zombie state." + " It is useful to observe the RequestDequeuePollIntervalMs metric to find a suitable setting for this configuration." val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request" + val SocketRequestCommonBytesDoc = "The common size in bytes of a socket request" + val SocketRequestBufferCacheSizeDoc = "The maximal number of cache slot recycling memory pool will keep" val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address. This can be set to 0 if there are overrides " + s"configured using $MaxConnectionsPerIpOverridesProp property. New connections from the ip address are dropped if the limit is reached." val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. " + @@ -952,6 +958,8 @@ object KafkaConfig { .define(RequestMaxLocalTimeMsProp, LONG, Defaults.RequestMaxLocalTimeMs, atLeast(1), MEDIUM, RequestMaxLocalTimeMsDoc) .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc) .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc) + .define(SocketRequestCommonBytesProp, INT, Defaults.SocketRequestCommonBytes, MEDIUM, SocketRequestCommonBytesDoc) + .define(SocketRequestBufferCacheSizeProp, INT, Defaults.SocketRequestBufferCacheSize, atLeast(0), MEDIUM, SocketRequestBufferCacheSizeDoc) .define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, atLeast(0), MEDIUM, MaxConnectionsPerIpDoc) .define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc) .define(MaxConnectionsProp, INT, Defaults.MaxConnections, atLeast(0), MEDIUM, MaxConnectionsDoc) @@ -1259,6 +1267,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp) val socketReceiveBufferBytes = getInt(KafkaConfig.SocketReceiveBufferBytesProp) val socketRequestMaxBytes = getInt(KafkaConfig.SocketRequestMaxBytesProp) + val socketRequestCommonBytes = getInt(KafkaConfig.SocketRequestCommonBytesProp) + val socketRequestBufferCacheSize = getInt(KafkaConfig.SocketRequestBufferCacheSizeProp) val requestMaxLocalTimeMs = getLong(KafkaConfig.RequestMaxLocalTimeMsProp) val maxConnectionsPerIp = getInt(KafkaConfig.MaxConnectionsPerIpProp) val maxConnectionsPerIpOverrides: Map[String, Int] = diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2cae69a265476..2df4cef3a4c49 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -611,6 +611,8 @@ class KafkaConfigTest { case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.SocketRequestCommonBytesProp => + case KafkaConfig.SocketRequestBufferCacheSizeProp => case KafkaConfig.MaxConnectionsPerIpOverridesProp => assertPropertyInvalid(getBaseProperties(), name, "127.0.0.1:not_a_number") case KafkaConfig.ConnectionsMaxIdleMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")