Commit 2325c69

introduced DirectBufferPool to cope with nio.direct-buffers that can cause OOM when GC is lazy: caching them is much better than letting GC free the object chain
carlomedas committed Apr 10, 2015
1 parent 32b499e commit 2325c69
Showing 6 changed files with 152 additions and 14 deletions.
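The failure mode this commit addresses: a direct ByteBuffer's native memory is reclaimed only when the GC eventually collects the buffer object, so code that repeatedly allocates and drops direct buffers can hit the direct-memory limit long before a collection runs. A minimal sketch of that pattern, not part of this commit (sizes and flags are illustrative; with -XX:+DisableExplicitGC, common in Hadoop deployments, even the JDK's fallback System.gc() inside allocateDirect is suppressed):

import java.nio.ByteBuffer;

public class DirectOomDemo {
    public static void main(String[] args) {
        // e.g. java -XX:MaxDirectMemorySize=64m -XX:+DisableExplicitGC DirectOomDemo
        for (;;) {
            // Reserves 4 MB of native memory per iteration; the reference is
            // dropped immediately, but the native memory comes back only when
            // the GC collects the ByteBuffer. If the heap stays quiet, this
            // eventually throws java.lang.OutOfMemoryError: Direct buffer memory.
            ByteBuffer buf = ByteBuffer.allocateDirect(4 * 1024 * 1024);
            buf.put(0, (byte) 1);
        }
    }
}

Caching and reusing buffers, as the DirectBufferPool introduced below does, sidesteps the problem.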
2 changes: 1 addition & 1 deletion java/hadoop-4mc/pom.xml
@@ -14,7 +14,7 @@

 <groupId>com.hadoop.fourmc</groupId>
 <artifactId>hadoop-4mc</artifactId>
-<version>1.2.1</version>
+<version>1.3.0</version>
 <packaging>jar</packaging>

 <name>4mc</name>
@@ -335,6 +335,10 @@ public void close() throws IOException {
 decompressor.decompress(b, 0, b.length);
 }
 super.close();
+
+// force release direct buffers of decompressor
+((Lz4Decompressor) decompressor).releaseDirectBuffers();
+decompressor = null;
 }
 }

@@ -122,6 +122,10 @@ public void close() throws IOException {

 out.close();
 closed = true;
+
+// force release compressor and related direct buffers
+((Lz4Compressor) compressor).releaseDirectBuffers();
+compressor = null;
 }

 @Override
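The two close() methods above (decompressor and compressor streams) now return their direct buffers to the pool, so deterministic closing matters; relying on finalization would reintroduce the lazy-GC problem. A usage sketch, assuming the project's FourMcCodec and a local path (both illustrative, not taken from this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import com.hadoop.compression.fourmc.FourMcCodec;

public class WriteDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FourMcCodec codec = new FourMcCodec();
        codec.setConf(conf);

        FileSystem fs = FileSystem.getLocal(conf);
        // try-with-resources guarantees close(), which is now the point where
        // the compressor and its direct buffers are released back to the pool
        try (CompressionOutputStream out =
                 codec.createOutputStream(fs.create(new Path("/tmp/demo.4mc")))) {
            out.write("example payload".getBytes());
        }
    }
}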
@@ -33,6 +33,7 @@
 **/
 package com.hadoop.compression.fourmc;

+import com.hadoop.compression.fourmc.util.DirectBufferPool;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -114,18 +115,22 @@ private ByteBuffer realloc(ByteBuffer buf, int newSize) {
         buf.clear();
         return buf;
     }
-    try {
-        // Manually free the old buffer using undocumented unsafe APIs.
-        // If this fails, we'll drop the reference and hope GC finds it
-        // eventually.
-        Object cleaner = buf.getClass().getMethod("cleaner").invoke(buf);
-        cleaner.getClass().getMethod("clean").invoke(cleaner);
-    } catch (Exception e) {
-        // Perhaps a non-sun-derived JVM - contributions welcome
-        LOG.warn("Couldn't realloc bytebuffer", e);
-    }
+    DirectBufferPool.getInstance().release(buf);
 }
-return ByteBuffer.allocateDirect(newSize);
+return DirectBufferPool.getInstance().allocate(newSize);
+}
+
+// trying to get rid of java.lang.OutOfMemoryError: Direct Buffer Memory
+public void releaseDirectBuffers() {
+    if (compressedDirectBuf != null) {
+        DirectBufferPool.getInstance().release(compressedDirectBuf);
+        compressedDirectBuf = null;
+    }
+    if (uncompressedDirectBuf != null) {
+        DirectBufferPool.getInstance().release(uncompressedDirectBuf);
+        uncompressedDirectBuf = null;
+    }
 }

 private void init(int directBufferSize) {
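realloc above now hands the old buffer back to the pool instead of freeing it through the reflective cleaner hack, and the pool caches buffers keyed by their exact capacity. A sketch of the resulting reuse semantics (sizes are illustrative):

import java.nio.ByteBuffer;
import com.hadoop.compression.fourmc.util.DirectBufferPool;

public class PoolReuseDemo {
    public static void main(String[] args) {
        DirectBufferPool pool = DirectBufferPool.getInstance();

        ByteBuffer a = pool.allocate(64 * 1024);  // pool empty: fresh direct buffer
        pool.release(a);                          // cleared and cached under capacity 65536

        ByteBuffer b = pool.allocate(64 * 1024);  // same capacity: the cached instance
        System.out.println(b == a);               // true
        ByteBuffer c = pool.allocate(128 * 1024); // no 128 KB buffer cached: fresh allocation
        System.out.println(c == a);               // false
    }
}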
@@ -33,6 +33,7 @@
 **/
 package com.hadoop.compression.fourmc;

+import com.hadoop.compression.fourmc.util.DirectBufferPool;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.Decompressor;
@@ -97,8 +98,8 @@ public static boolean isNativeLoaded() {
 public Lz4Decompressor(int directBufferSize) {
     this.directBufferSize = directBufferSize;

-    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
-    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf = DirectBufferPool.getInstance().allocate(directBufferSize);
+    uncompressedDirectBuf = DirectBufferPool.getInstance().allocate(directBufferSize);
     uncompressedDirectBuf.position(directBufferSize);
 }

@@ -244,6 +245,18 @@ protected void finalize() {
     end();
 }

+// trying to get rid of java.lang.OutOfMemoryError: Direct Buffer Memory
+public void releaseDirectBuffers() {
+    if (compressedDirectBuf != null) {
+        DirectBufferPool.getInstance().release((ByteBuffer) compressedDirectBuf);
+        compressedDirectBuf = null;
+    }
+    if (uncompressedDirectBuf != null) {
+        DirectBufferPool.getInstance().release((ByteBuffer) uncompressedDirectBuf);
+        uncompressedDirectBuf = null;
+    }
+}
+
 /**
  * Note whether the current block being decompressed is actually
  * stored as uncompressed data. If it is, there is no need to
@@ -0,0 +1,112 @@
package com.hadoop.compression.fourmc.util;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;

/**
 * Pool for direct buffers, used by compressors and decompressors.
 */
public class DirectBufferPool {
    private static DirectBufferPool instance;

    static {
        instance = new DirectBufferPool();
    }

    public static DirectBufferPool getInstance() {
        return instance;
    }

    // -------------------------------------------------------------------------------

    public static class Stats {
        public long allocatedBuffers = 0;
        public long usedBuffers = 0;
        public long allocatedBytes = 0;
        public long usedBytes = 0;

        public Stats() {
        }

        public Stats(long allocatedBuffers, long usedBuffers, long allocatedBytes, long usedBytes) {
            this.allocatedBuffers = allocatedBuffers;
            this.usedBuffers = usedBuffers;
            this.allocatedBytes = allocatedBytes;
            this.usedBytes = usedBytes;
        }
    }

    // -------------------------------------------------------------------------------

    // cached buffers, keyed by their exact capacity
    private HashMap<Integer, List<ByteBuffer>> poolMap;
    private boolean enabled = true;
    private long allocatedBuffers = 0;
    private long usedBuffers = 0;
    private long allocatedBytes = 0;
    private long usedBytes = 0;

    private DirectBufferPool() {
        poolMap = new HashMap<Integer, List<ByteBuffer>>();
    }

    public synchronized void enable() {
        if (enabled) return;
        enabled = true;
        if (poolMap == null) {
            poolMap = new HashMap<Integer, List<ByteBuffer>>();
        }
    }

    public synchronized void disable() {
        enabled = false;
        allocatedBuffers = 0;
        usedBuffers = 0;
        allocatedBytes = 0;
        usedBytes = 0;
        if (poolMap != null) { // guard against a second disable() call
            poolMap.clear();
            poolMap = null;
        }
    }

    public synchronized boolean isEnabled() {
        return enabled;
    }

    public synchronized Stats getStats() {
        return new Stats(allocatedBuffers, usedBuffers, allocatedBytes, usedBytes);
    }

    public synchronized ByteBuffer allocate(int capacity) {
        if (!enabled) return ByteBuffer.allocateDirect(capacity);
        List<ByteBuffer> pool = poolMap.get(capacity);
        if (pool == null || pool.isEmpty()) {
            // no cached buffer of this capacity: allocate a fresh one
            ++allocatedBuffers;
            allocatedBytes += capacity;
            ++usedBuffers;
            usedBytes += capacity;
            return ByteBuffer.allocateDirect(capacity);
        }
        // reuse a cached buffer instead of allocating new direct memory
        ByteBuffer res = pool.remove(0);
        ++usedBuffers;
        usedBytes += res.capacity();
        return res;
    }

    public synchronized void release(ByteBuffer buff) {
        if (!enabled) return;
        List<ByteBuffer> pool = poolMap.get(buff.capacity());
        if (pool == null) {
            pool = new LinkedList<ByteBuffer>();
            poolMap.put(buff.capacity(), pool);
        }
        buff.clear();
        pool.add(buff);
        --usedBuffers;
        usedBytes -= buff.capacity();
    }
}
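The Stats accessor added above makes the pool observable; a short sketch of reading it (the printout is illustrative):

import com.hadoop.compression.fourmc.util.DirectBufferPool;

public class PoolStatsDemo {
    public static void main(String[] args) {
        DirectBufferPool pool = DirectBufferPool.getInstance();
        pool.release(pool.allocate(64 * 1024));

        DirectBufferPool.Stats s = pool.getStats();
        System.out.println(s.allocatedBuffers + " buffers / " + s.allocatedBytes
                + " bytes ever allocated; " + s.usedBuffers + " buffers / "
                + s.usedBytes + " bytes currently handed out");

        // pooling can also be switched off, e.g. while chasing leaks: buffers
        // then come straight from allocateDirect and release() is a no-op
        pool.disable();
    }
}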
