Skip to content

Commit

Permalink
Implementation of recycling memory pool. (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
kun du authored Feb 12, 2020
1 parent 5069ba4 commit 42673dc
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.memory;

import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.common.metrics.Sensor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* An implementation of memory pool which recycles buffers of commonly used size.
* This memory pool is useful if most of the requested buffers' size are within close size range.
* In this case, instead of deallocate and reallocate the buffer, the memory pool will recycle the buffer for future use.
*/
public class RecyclingMemoryPool implements MemoryPool {
protected static final Logger log = LoggerFactory.getLogger(RecyclingMemoryPool.class);
protected final int cacheableBufferSizeUpperThreshold;
protected final int cacheableBufferSizeLowerThreshold;
protected final LinkedBlockingQueue<ByteBuffer> bufferCache;
protected final Sensor requestSensor;

public RecyclingMemoryPool(int cacheableBufferSize, int bufferCacheCapacity, Sensor requestSensor) {
if (bufferCacheCapacity <= 0 || cacheableBufferSize <= 0) {
throw new IllegalArgumentException(String.format("Must provide a positive cacheable buffer size and buffer cache " +
"capacity, provided %d and %d respectively.", cacheableBufferSize, bufferCacheCapacity));
}
this.bufferCache = new LinkedBlockingQueue<>(bufferCacheCapacity);
for (int i = 0; i < bufferCacheCapacity; i++) {
bufferCache.offer(ByteBuffer.allocate(cacheableBufferSize));
}
this.cacheableBufferSizeUpperThreshold = cacheableBufferSize;
this.cacheableBufferSizeLowerThreshold = cacheableBufferSize / 2;
this.requestSensor = requestSensor;
}

@Override
public ByteBuffer tryAllocate(int sizeBytes) {
if (sizeBytes < 1) {
throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");
}

ByteBuffer allocated = null;
if (sizeBytes > cacheableBufferSizeLowerThreshold && sizeBytes <= cacheableBufferSizeUpperThreshold) {
allocated = bufferCache.poll();
}
if (allocated != null) {
allocated.limit(sizeBytes);
} else {
allocated = ByteBuffer.allocate(sizeBytes);
}
bufferToBeAllocated(allocated);
return allocated;
}

@Override
public void release(ByteBuffer previouslyAllocated) {
if (previouslyAllocated == null) {
throw new IllegalArgumentException("provided null buffer");
}
if (previouslyAllocated.capacity() == cacheableBufferSizeUpperThreshold) {
previouslyAllocated.clear();
bufferCache.offer(previouslyAllocated);
} else {
bufferToBeReleased(previouslyAllocated);
}
}

//allows subclasses to do their own bookkeeping (and validation) _before_ memory is returned to client code.
protected void bufferToBeAllocated(ByteBuffer justAllocated) {
try {
this.requestSensor.record(justAllocated.limit());
} catch (Exception e) {
log.debug("failed to record size of allocated buffer");
}
log.trace("allocated buffer of size {}", justAllocated.capacity());
}

//allows subclasses to do their own bookkeeping (and validation) _before_ memory is marked as reclaimed.
protected void bufferToBeReleased(ByteBuffer justReleased) {
log.trace("released buffer of size {}", justReleased.capacity());
}

@Override
public long size() {
return Long.MAX_VALUE;
}

@Override
public long availableMemory() {
return Long.MAX_VALUE;
}

@Override
public boolean isOutOfMemory() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.memory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.junit.Assert;
import org.junit.Test;


public class RecyclingMemoryPoolTest {
private static final int TWO_KILOBYTES = 2048;
private static final int CACHEABLE_BUFFER_SIZE = 1024;
private static final int BUFFER_CACHE_CAPACITY = 2;
private static final Sensor ALLOCATE_SENSOR = new Metrics().sensor("allocate_sensor");

@Test(expected = IllegalArgumentException.class)
public void testNegativeAllocation() {
RecyclingMemoryPool memoryPool = new RecyclingMemoryPool(CACHEABLE_BUFFER_SIZE, BUFFER_CACHE_CAPACITY, ALLOCATE_SENSOR);
memoryPool.tryAllocate(-1);
}

@Test(expected = IllegalArgumentException.class)
public void testZeroAllocation() {
RecyclingMemoryPool memoryPool = new RecyclingMemoryPool(CACHEABLE_BUFFER_SIZE, BUFFER_CACHE_CAPACITY, ALLOCATE_SENSOR);
memoryPool.tryAllocate(0);
}

@Test(expected = IllegalArgumentException.class)
public void testNullRelease() {
RecyclingMemoryPool memoryPool = new RecyclingMemoryPool(CACHEABLE_BUFFER_SIZE, BUFFER_CACHE_CAPACITY, ALLOCATE_SENSOR);
memoryPool.release(null);
}

@Test
public void testAllocation() {
RecyclingMemoryPool memoryPool = new RecyclingMemoryPool(CACHEABLE_BUFFER_SIZE, BUFFER_CACHE_CAPACITY, ALLOCATE_SENSOR);
ByteBuffer buffer1 = memoryPool.tryAllocate(TWO_KILOBYTES);
ByteBuffer buffer2 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE);
ByteBuffer buffer3 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE * 2 / 3);
ByteBuffer buffer4 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE);

memoryPool.release(buffer1);
ByteBuffer reuse1 = memoryPool.tryAllocate(TWO_KILOBYTES);
// Compare the references
Assert.assertNotEquals(System.identityHashCode(reuse1), System.identityHashCode(buffer1));

memoryPool.release(buffer2);
memoryPool.release(buffer3);
memoryPool.release(buffer4);
ByteBuffer reuse2 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE);
ByteBuffer reuse3 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE * 2 / 3);
ByteBuffer reuse4 = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE);

Assert.assertEquals(System.identityHashCode(reuse2), System.identityHashCode(buffer2));
Assert.assertEquals(System.identityHashCode(reuse3), System.identityHashCode(buffer3));
Assert.assertNotEquals(System.identityHashCode(reuse4), System.identityHashCode(buffer4));
}

@Test
public void testMultiThreadAllocation() {
RecyclingMemoryPool memoryPool = new RecyclingMemoryPool(CACHEABLE_BUFFER_SIZE, BUFFER_CACHE_CAPACITY, ALLOCATE_SENSOR);
AtomicReference<Throwable> error = new AtomicReference<>();
List<Thread> processorThreads = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
processorThreads.add(new Thread(() -> {
try {
ByteBuffer buffer = memoryPool.tryAllocate(CACHEABLE_BUFFER_SIZE);
Thread.sleep(1000);
memoryPool.release(buffer);
} catch (InterruptedException e) {
error.compareAndSet(null, e);
}
}));
}
processorThreads.forEach(t -> {
t.setDaemon(true);
t.start();
});
processorThreads.forEach(t -> {
try {
t.join(30000);
} catch (InterruptedException e) {
error.compareAndSet(null, e);
}
});

Assert.assertNull(error.get());
Assert.assertEquals(memoryPool.bufferCache.size(), BUFFER_CACHE_CAPACITY);
}
}
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.{KafkaException, Reconfigurable}
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.memory.{MemoryPool, RecyclingMemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.Total
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
Expand Down Expand Up @@ -89,13 +89,13 @@ class SocketServer(val config: KafkaConfig,
private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
memoryPoolUsageSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
private val memoryPoolAllocationSensor = metrics.sensor("MemoryPoolAllocation")
private val memoryPoolMaxAllocateSizeMetricName = metrics.metricName("MemoryPoolMaxAllocateSize", "socket-server-metrics")
private val memoryPoolAllocateSizePercentilesMetricName = metrics.metricName("MemoryPoolAllocateSizePercentiles", "socket-server-metrics")
private val percentiles = (1 to 9).map( i => new Percentile(metrics.metricName("MemoryPoolAllocateSize%dPercentile".format(i * 10), "socket-server-metrics"), i * 10))
private val memoryPoolMaxAllocateSizeMetricName = metrics.metricName("MemoryPoolMaxAllocateSize", MetricsGroup)
memoryPoolAllocationSensor.add(memoryPoolMaxAllocateSizeMetricName, new Max())
private val percentiles = (1 to 9).map( i => new Percentile(metrics.metricName("MemoryPoolAllocateSize%dPercentile".format(i * 10), MetricsGroup), i * 10))
// At current stage, we do not know the max decrypted request size, temporarily set it to 10MB.
memoryPoolAllocationSensor.add(memoryPoolAllocateSizePercentilesMetricName, new Percentiles(400, 0.0, 10485760, BucketSizing.CONSTANT, percentiles:_*))
memoryPoolAllocationSensor.add(new Percentiles(400, 0.0, 10485760, BucketSizing.CONSTANT, percentiles:_*))
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolUsageSensor, memoryPoolAllocationSensor)
else if (config.socketRequestCommonBytes > 0) new RecyclingMemoryPool(config.socketRequestCommonBytes, config.socketRequestBufferCacheSize, memoryPoolAllocationSensor)
else MemoryPool.NONE
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ object Defaults {
val SocketSendBufferBytes: Int = 100 * 1024
val SocketReceiveBufferBytes: Int = 100 * 1024
val SocketRequestMaxBytes: Int = 100 * 1024 * 1024
val SocketRequestCommonBytes: Int = -1
val SocketRequestBufferCacheSize: Int = 0
val RequestMaxLocalTimeMs = Long.MaxValue
val MaxConnectionsPerIp: Int = Int.MaxValue
val MaxConnectionsPerIpOverrides: String = ""
Expand Down Expand Up @@ -320,6 +322,8 @@ object KafkaConfig {
val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
val RequestMaxLocalTimeMsProp = "request.max.local.time.ms"
val SocketRequestMaxBytesProp = "socket.request.max.bytes"
val SocketRequestCommonBytesProp = "socket.request.common.bytes"
val SocketRequestBufferCacheSizeProp = "socket.request.buffer.cache.size"
val MaxConnectionsPerIpProp = "max.connections.per.ip"
val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
val MaxConnectionsProp = "max.connections"
Expand Down Expand Up @@ -617,6 +621,8 @@ object KafkaConfig {
"takes longer than this time the broker will kill itself as violating this timeout is a symptom of a more serious broker zombie state." +
" It is useful to observe the RequestDequeuePollIntervalMs metric to find a suitable setting for this configuration."
val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request"
val SocketRequestCommonBytesDoc = "The common size in bytes of a socket request"
val SocketRequestBufferCacheSizeDoc = "The maximal number of cache slot recycling memory pool will keep"
val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address. This can be set to 0 if there are overrides " +
s"configured using $MaxConnectionsPerIpOverridesProp property. New connections from the ip address are dropped if the limit is reached."
val MaxConnectionsPerIpOverridesDoc = "A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. " +
Expand Down Expand Up @@ -952,6 +958,8 @@ object KafkaConfig {
.define(RequestMaxLocalTimeMsProp, LONG, Defaults.RequestMaxLocalTimeMs, atLeast(1), MEDIUM, RequestMaxLocalTimeMsDoc)
.define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc)
.define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
.define(SocketRequestCommonBytesProp, INT, Defaults.SocketRequestCommonBytes, MEDIUM, SocketRequestCommonBytesDoc)
.define(SocketRequestBufferCacheSizeProp, INT, Defaults.SocketRequestBufferCacheSize, atLeast(0), MEDIUM, SocketRequestBufferCacheSizeDoc)
.define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, atLeast(0), MEDIUM, MaxConnectionsPerIpDoc)
.define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc)
.define(MaxConnectionsProp, INT, Defaults.MaxConnections, atLeast(0), MEDIUM, MaxConnectionsDoc)
Expand Down Expand Up @@ -1259,6 +1267,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp)
val socketReceiveBufferBytes = getInt(KafkaConfig.SocketReceiveBufferBytesProp)
val socketRequestMaxBytes = getInt(KafkaConfig.SocketRequestMaxBytesProp)
val socketRequestCommonBytes = getInt(KafkaConfig.SocketRequestCommonBytesProp)
val socketRequestBufferCacheSize = getInt(KafkaConfig.SocketRequestBufferCacheSizeProp)
val requestMaxLocalTimeMs = getLong(KafkaConfig.RequestMaxLocalTimeMsProp)
val maxConnectionsPerIp = getInt(KafkaConfig.MaxConnectionsPerIpProp)
val maxConnectionsPerIpOverrides: Map[String, Int] =
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,8 @@ class KafkaConfigTest {
case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.SocketRequestCommonBytesProp =>
case KafkaConfig.SocketRequestBufferCacheSizeProp =>
case KafkaConfig.MaxConnectionsPerIpOverridesProp =>
assertPropertyInvalid(getBaseProperties(), name, "127.0.0.1:not_a_number")
case KafkaConfig.ConnectionsMaxIdleMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
Expand Down

0 comments on commit 42673dc

Please sign in to comment.