Benchmarking Indexing formats: Javabin, JSON & CBOR (#70)
Introduces a "prepare-binary-format" option for indexing benchmarks: the indexing payload is pre-computed in the chosen binary format (and stored on disk) during an init phase, then used as-is for the actual indexing run.
---------

Co-authored-by: Ishan Chattopadhyaya <[email protected]>
Co-authored-by: Noble Paul <[email protected]>
3 people authored Jun 20, 2023
1 parent 3c0a013 commit 3dd6161
Showing 7 changed files with 343 additions and 33 deletions.
10 changes: 8 additions & 2 deletions pom.xml
@@ -65,13 +65,19 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.12.7</version>
<version>2.15.2</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
<version>2.15.2</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.7.1</version>
<version>2.15.2</version>
</dependency>

<dependency>
27 changes: 21 additions & 6 deletions src/main/java/org/apache/solr/benchmarks/BenchmarksMain.java
@@ -163,6 +163,8 @@ public static void runIndexingBenchmarks(List<IndexBenchmark> indexBenchmarks, SolrCloud solrCloud)
}
solrCloud.createCollection(setup, collectionName, configsetName);
}

indexInit(solrCloud.nodes.get(0).getBaseUrl(), collectionName, i, setup, benchmark);
long start = System.nanoTime();
index(solrCloud.nodes.get(0).getBaseUrl(), collectionName, i, setup, benchmark);
long end = System.nanoTime();
@@ -222,9 +224,15 @@ private static void printErrOutput(String qr, NamedList<Object> rsp) throws IOException {
}
}

static void index(String baseUrl, String collection, int threads, IndexBenchmark.Setup setup, IndexBenchmark benchmark) throws Exception {
public static void indexInit(String baseUrl, String collection, int threads, IndexBenchmark.Setup setup, IndexBenchmark benchmark) throws Exception {
index(true, baseUrl, collection, threads, setup, benchmark);
}
public static void index(String baseUrl, String collection, int threads, IndexBenchmark.Setup setup, IndexBenchmark benchmark) throws Exception {
index(false, baseUrl, collection, threads, setup, benchmark);
}
private static void index(boolean init, String baseUrl, String collection, int threads, IndexBenchmark.Setup setup, IndexBenchmark benchmark) throws Exception {
if (benchmark.fileFormat.equalsIgnoreCase("json")) {
indexJsonComplex(baseUrl, collection, threads, setup, benchmark);
indexJsonComplex(init, baseUrl, collection, threads, setup, benchmark);
} else if (benchmark.fileFormat.equalsIgnoreCase("tsv")) {
indexTSV(baseUrl, collection, threads, setup, benchmark);
}
@@ -263,7 +271,14 @@ static void indexTSV(String baseUrl, String collection, int threads, IndexBenchmark.Setup setup, IndexBenchmark benchmark) throws Exception {
client.close();
}

static void indexJsonComplex(String baseUrl, String collection, int threads, IndexBenchmark.Setup setup, IndexBenchmark benchmark) throws Exception {
static boolean isInitPhaseNeeded(IndexBenchmark benchmark) {
if (benchmark.prepareBinaryFormat != null) {
// We need an init phase to prepare the raw binary batch files to index in the final stage
return true;
} else return false;
}
static void indexJsonComplex(boolean init, String baseUrl, String collection, int threads, IndexBenchmark.Setup setup, IndexBenchmark benchmark) throws Exception {
if (init && !isInitPhaseNeeded(benchmark)) return; // no-op

long start = System.currentTimeMillis();
CloseableHttpClient httpClient = HttpClientUtil.createClient(null);
@@ -276,11 +291,11 @@ static void indexJsonComplex(String baseUrl, String collection, int threads, IndexBenchmark.Setup setup, IndexBenchmark benchmark) throws Exception {

for (Slice slice : coll.getSlices()) {
Replica leader = slice.getLeader();
shardVsLeader.put(slice.getName(), leader.getBaseUrl() + "/" + leader.getCoreName() + "/update/json/docs");
shardVsLeader.put(slice.getName(), leader.getBaseUrl() + "/" + leader.getCoreName());
}
File datasetFile = Util.resolveSuitePath(benchmark.datasetFile);
try (DocReader docReader = new FileDocReader(datasetFile, benchmark.maxDocs != null ? benchmark.maxDocs.longValue() : null, benchmark.offset)) {
try (IndexBatchSupplier indexBatchSupplier = new IndexBatchSupplier(docReader, benchmark, coll, httpClient, shardVsLeader)) {
try (IndexBatchSupplier indexBatchSupplier = new IndexBatchSupplier(init, docReader, benchmark, coll, httpClient, shardVsLeader)) {
ControlledExecutor controlledExecutor = new ControlledExecutor(
benchmark.name,
threads,
@@ -294,7 +309,7 @@ static void indexJsonComplex(String baseUrl, String collection, int threads, IndexBenchmark.Setup setup, IndexBenchmark benchmark) throws Exception {
client.commit(collection);
client.close();

log.info("Indexed " + indexBatchSupplier.getDocsIndexed() + " docs." + "time taken : " + ((System.currentTimeMillis() - start) / 1000));
log.info("Indexed " + indexBatchSupplier.getBatchesIndexed() + " docs." + "time taken : " + ((System.currentTimeMillis() - start) / 1000));
}
}
} finally {
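The shape of the change in runIndexingBenchmarks above is the point of the feature: the init pass runs the same batching pipeline before the timer starts, so the cost of converting batches to a binary format never pollutes the measured indexing time. A minimal sketch of the resulting call sequence, mirroring the lines added above (indexInit is a no-op unless "prepare-binary-format" is configured):

    // Untimed init pass: writes per-shard batch files under tmp/<benchmark-name>
    BenchmarksMain.indexInit(baseUrl, collectionName, threads, setup, benchmark);

    // Timed pass: the actual indexing run
    long start = System.nanoTime();
    BenchmarksMain.index(baseUrl, collectionName, threads, setup, benchmark);
    long end = System.nanoTime();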
src/main/java/org/apache/solr/benchmarks/beans/IndexBenchmark.java
@@ -15,6 +15,9 @@ public class IndexBenchmark extends BaseBenchmark {
@JsonProperty("file-format")
public String fileFormat;

@JsonProperty("prepare-binary-format")
public String prepareBinaryFormat = null;

@JsonProperty("setups")
public List<Setup> setups;

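These bindings surface in the benchmark suite's JSON configuration. A hypothetical fragment showing the new key next to the existing ones (only the "file-format" and "prepare-binary-format" keys and the values "javabin", "cbor" and "json" come from this commit; the surrounding structure and values are illustrative):

    {
      "name": "cloud-indexing",
      "dataset-file": "datasets/docs.json",
      "file-format": "json",
      "prepare-binary-format": "javabin",
      "setups": [ ... ]
    }

With "prepare-binary-format" unset (it defaults to null), the init phase is skipped and indexing behaves as before.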
src/main/java/org/apache/solr/benchmarks/indexing/IndexBatchSupplier.java
@@ -6,14 +6,17 @@
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.util.JsonRecordReader;
import org.eclipse.jgit.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.lang.invoke.MethodHandles;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

@@ -25,14 +28,14 @@ public class IndexBatchSupplier implements Supplier<Callable>, AutoCloseable {
private DocCollection docCollection;
private HttpClient httpClient;
private Map<String, String> shardVsLeader;
private BlockingQueue<UploadDocs> pendingBatches = new LinkedBlockingQueue<>(10); //at most 10 pending batches
private BlockingQueue<Callable> pendingBatches = new LinkedBlockingQueue<>(10); //at most 10 pending batches
private final boolean init;
private AtomicLong batchesIndexed = new AtomicLong();

private AtomicLong docsIndexed = new AtomicLong();

public IndexBatchSupplier(DocReader docReader, IndexBenchmark benchmark, DocCollection docCollection, HttpClient httpClient, Map<String, String> shardVsLeader) {
public IndexBatchSupplier(boolean init, DocReader docReader, IndexBenchmark benchmark, DocCollection docCollection, HttpClient httpClient, Map<String, String> shardVsLeader) {
this.benchmark = benchmark;
this.docCollection = docCollection;

this.init = init;
this.httpClient = httpClient;
this.shardVsLeader = shardVsLeader;

@@ -58,6 +61,7 @@ private Future<?> startWorker(DocReader docReader) {
JsonRecordReader rdr = JsonRecordReader.getInst("/", Collections.singletonList(benchmark.idField + ":/" + benchmark.idField));
IdParser idParser = new IdParser();
Map<String, List<String>> shardVsDocs = new HashMap<>();
Map<String, AtomicInteger> batchCounters = new ConcurrentHashMap<>();
try {
while (!exit && (inputDocs = docReader.readDocs(benchmark.batchSize)) != null) { //can read more than batch size, just use batch size as a sensible value
for (String inputDoc : inputDocs) {
@@ -67,19 +71,22 @@ private Future<?> startWorker(DocReader docReader) {
shardDocs.add(inputDoc);
if (shardDocs.size() >= benchmark.batchSize) {
shardVsDocs.remove(targetSlice.getName());

String batchFilename = computeBatchFilename(benchmark, batchCounters, targetSlice.getName());
//a shard has accumulated enough docs to be executed
UploadDocs uploadDocs = new UploadDocs(shardDocs, httpClient, shardVsLeader.get(targetSlice.getName()), docsIndexed);
while (!exit && !pendingBatches.offer(uploadDocs, 1, TimeUnit.SECONDS)) {
Callable docsBatchCallable = init ? new PrepareRawBinaryFiles(benchmark, batchFilename, shardDocs, shardVsLeader.get(targetSlice.getName())):
new UploadDocs(benchmark, batchFilename, shardDocs, httpClient, shardVsLeader.get(targetSlice.getName()), batchesIndexed);
while (!exit && !pendingBatches.offer(docsBatchCallable, 1, TimeUnit.SECONDS)) {
//try again
}
}
}
}
shardVsDocs.forEach((shard, docs) -> { //flush the remaining ones
try {
UploadDocs uploadDocs = new UploadDocs(docs, httpClient, shardVsLeader.get(shard), docsIndexed);
while (!exit && !pendingBatches.offer(uploadDocs, 1, TimeUnit.SECONDS)) {
String batchFilename = computeBatchFilename(benchmark, batchCounters, shard);
Callable docsBatchCallable = init ? new PrepareRawBinaryFiles(benchmark, batchFilename, docs, shardVsLeader.get(shard)):
new UploadDocs(benchmark, batchFilename, docs, httpClient, shardVsLeader.get(shard), batchesIndexed);
while (!exit && !pendingBatches.offer(docsBatchCallable, 1, TimeUnit.SECONDS)) {
//try again
}
} catch (InterruptedException e) {
@@ -98,10 +105,28 @@ private Future<?> startWorker(DocReader docReader) {
return workerFuture;
}

private String computeBatchFilename(IndexBenchmark benchmark, Map<String, AtomicInteger> batchCounters, String shard) {
String tmpDir = "tmp/" + benchmark.name;
try {
FileUtils.mkdirs(new File(tmpDir), true);
} catch (IOException e) {
log.error("Unable to create directory: " + tmpDir);
throw new RuntimeException("Unable to create directory " + tmpDir, e);
}
// one counter per shard; computeIfAbsent avoids the non-atomic get/put dance
int batchNumber = batchCounters.computeIfAbsent(shard, s -> new AtomicInteger(0)).incrementAndGet();
String batchFilename = docCollection.getName() + "_" + shard.replace(':', '_').replace('/', '_') + "_batch" + batchNumber + "." + benchmark.prepareBinaryFormat;
return tmpDir + "/" + batchFilename;
}

@Override
public Callable get() {
try {
UploadDocs batch = null;
Callable batch = null;
while ((batch = pendingBatches.poll(1, TimeUnit.SECONDS)) == null && !exit) {
}
if (batch == null) { //rare race condition can fill the queue even if above loop exits, just try it once last time...
@@ -121,7 +146,7 @@ public void close() throws Exception {
workerFuture.get(); //this could throw an exception if any batch callable (UploadDocs or PrepareRawBinaryFiles) failed with an unhandled exception
}

public long getDocsIndexed() {
return docsIndexed.get();
public long getBatchesIndexed() {
return batchesIndexed.get();
}
}
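UploadDocs, the callable used for the non-init runs, is one of the two changed files whose diff did not render on this page; its constructor now also takes the benchmark, the batch filename and the batchesIndexed counter. Presumably it posts the prepared payload to the shard leader with a content type matching the chosen format. A purely hypothetical sketch of such an upload, reusing the Apache HttpClient (org.apache.http.*) already passed around above; the /update endpoint and the content-type strings are assumptions, not taken from this diff:

    // Hypothetical: pick a content type for the prepared binary payload
    String fmt = benchmark.prepareBinaryFormat != null ? benchmark.prepareBinaryFormat : "json";
    String contentType;
    switch (fmt) {
      case "javabin": contentType = "application/javabin"; break;
      case "cbor":    contentType = "application/cbor";    break;
      default:        contentType = "application/json";    break;
    }
    HttpPost post = new HttpPost(leaderUrl + "/update");
    post.setEntity(new FileEntity(new File(batchFilename), ContentType.create(contentType)));
    HttpResponse rsp = httpClient.execute(post); // a non-2xx status would mean the batch failed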
94 changes: 94 additions & 0 deletions src/main/java/org/apache/solr/benchmarks/indexing/PrepareRawBinaryFiles.java
@@ -0,0 +1,94 @@
package org.apache.solr.benchmarks.indexing;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import com.fasterxml.jackson.dataformat.cbor.CBORGenerator;
import org.apache.commons.io.FileUtils;
import org.apache.solr.benchmarks.beans.IndexBenchmark;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;

/**
* This is executed during the init phase of the indexing tasks.
* Its primary use is to read all the accumulated documents (batches) and
* prepare binary files, as per {@link IndexBenchmark#prepareBinaryFormat}, to be used for
* the actual indexing task later.
*/
class PrepareRawBinaryFiles implements Callable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final List<String> docs;
final String leaderUrl;
final private IndexBenchmark benchmark;
final String batchFilename;

PrepareRawBinaryFiles(IndexBenchmark benchmark, String batchFilename, List<String> docs, String leaderUrl) {
this.docs = docs;
this.leaderUrl = leaderUrl;
this.benchmark = benchmark;
this.batchFilename = batchFilename;
log.debug("Batch file: "+batchFilename);
}

@Override
public Object call() throws IOException {
log.debug("INIT PHASE of INDEXING! Shard: "+leaderUrl + ", batch: "+batchFilename);
ObjectMapper mapper = new ObjectMapper(); // reuse one mapper instead of creating one per document
List<Map> parsedDocs = new ArrayList<>();
for (String doc : docs) parsedDocs.add(mapper.readValue(doc, Map.class));
byte[] jsonDocs = mapper.writeValueAsBytes(parsedDocs);
byte[] cborDocs = createCborReq(jsonDocs);
byte[] javabinDocs = createJavabinReq(jsonDocs);

byte[] binary;
switch(benchmark.prepareBinaryFormat) {
case "javabin": binary = javabinDocs; break;
case "cbor" : binary = cborDocs; break;
case "json" : binary = jsonDocs; break;
default: binary = jsonDocs; break;
}
FileUtils.writeByteArrayToFile(new File(batchFilename), binary);
log.info("Json size: " + jsonDocs.length + ", cbor size: " + cborDocs.length + ", javabin size: " + javabinDocs.length);
log.debug("Writing filename: " + batchFilename);
return null;
}

private byte[] createJavabinReq(byte[] b) throws IOException {
List l = (List) Utils.fromJSON(b);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
new JavaBinCodec().marshal(l.iterator(), baos);

return baos.toByteArray();
}

private byte[] createCborReq(byte[] jsonBytes) throws IOException {
ObjectMapper jsonMapper = new ObjectMapper(new JsonFactory());

// Parse the JSON payload into a tree
JsonNode jsonNode = jsonMapper.readTree(jsonBytes);

// Create a CBOR ObjectMapper, with string references enabled
ObjectMapper cborMapper = new ObjectMapper(CBORFactory.builder()
.enable(CBORGenerator.Feature.STRINGREF)
.build());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (JsonGenerator cborGenerator = cborMapper.createGenerator(baos)) {
cborGenerator.writeTree(jsonNode);
}
return baos.toByteArray();
}
}
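To sanity-check the size comparison that call() logs, the JSON-to-CBOR leg can be exercised standalone. A self-contained sketch, assuming jackson-databind and jackson-dataformat-cbor 2.15+ on the classpath (as pulled in by the pom.xml change above); the sample payload is made up:

    import java.nio.charset.StandardCharsets;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
    import com.fasterxml.jackson.dataformat.cbor.CBORGenerator;

    public class CborSizeDemo {
      public static void main(String[] args) throws Exception {
        // Two documents repeating the same string value, where CBOR's STRINGREF feature can help
        byte[] json = "[{\"id\":\"1\",\"cat\":\"books\"},{\"id\":\"2\",\"cat\":\"books\"}]"
            .getBytes(StandardCharsets.UTF_8);
        JsonNode tree = new ObjectMapper().readTree(json);
        ObjectMapper cborMapper = new ObjectMapper(
            CBORFactory.builder().enable(CBORGenerator.Feature.STRINGREF).build());
        byte[] cbor = cborMapper.writeValueAsBytes(tree);
        System.out.println("json=" + json.length + " bytes, cbor=" + cbor.length + " bytes");
      }
    }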
(Diffs for the remaining 2 changed files did not render and are not shown.)
