Skip to content

Commit

Permalink
create codec factory
Browse files Browse the repository at this point in the history
  • Loading branch information
tejaskriya committed Feb 5, 2025
1 parent e171bbe commit dba5d12
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.db.DBDefinition;
import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
import org.apache.hadoop.hdds.utils.db.StringCodec;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneConsts;
Expand Down Expand Up @@ -83,8 +80,6 @@ public class TestLDBCli {
private static final String KEY_TABLE = "keyTable";
private static final String BLOCK_DATA = "block_data";
public static final String PIPELINES = "pipelines";
public static final String DUMMY_DB = "anotherKeyTable";
public static final String ANOTHER_KEY_TABLE_NAME = "anotherKeyTable";
private static final ObjectMapper MAPPER = new ObjectMapper();
private OzoneConfiguration conf;
private DBStore dbStore;
Expand Down Expand Up @@ -415,40 +410,6 @@ void testSchemaCommand() throws IOException {
assertEquals("", stderr.toString());
}

@Test
void testCommandsWithDBDefOverride() throws IOException {
  // Create the table that backs the custom DBDefinition.
  prepareTable(DUMMY_DB, true);

  // value-schema with an explicit --schema override should describe the table.
  String[] valueSchemaArgs = {
      "--db", dbStore.getDbLocation().getAbsolutePath(),
      "--schema", DummyDBDefinition.class.getName(),
      "value-schema",
      "--column-family", ANOTHER_KEY_TABLE_NAME};
  int rc = cmd.execute(valueSchemaArgs);
  assertEquals(0, rc, stderr.toString());
  Matcher schemaMatcher = Pattern.compile(".*String.*String.*", Pattern.MULTILINE)
      .matcher(stdout.toString());
  assertTrue(schemaMatcher.find());
  assertEquals("", stderr.toString());

  // scan with the same override should print the seeded key/value pair.
  String[] scanArgs = {
      "--db", dbStore.getDbLocation().getAbsolutePath(),
      "--schema", DummyDBDefinition.class.getName(),
      "scan",
      "--column-family", ANOTHER_KEY_TABLE_NAME};
  rc = cmd.execute(scanArgs);
  assertEquals(0, rc, stderr.toString());
  Matcher scanMatcher = Pattern.compile(".*random-key.*random-value.*", Pattern.MULTILINE)
      .matcher(stdout.toString());
  assertTrue(scanMatcher.find());
  assertEquals("", stderr.toString());
}

/**
* Converts String input to a Map and compares to the given Map input.
* @param expected expected result Map
Expand Down Expand Up @@ -519,16 +480,6 @@ private void prepareTable(String tableName, boolean schemaV3)
.setPath(tempDir.toPath()).addTable(PIPELINES).build();
break;

case DUMMY_DB:
dbStore = DBStoreBuilder.newBuilder(new OzoneConfiguration())
.setName("another.db")
.setPath(tempDir.toPath())
.addTable(ANOTHER_KEY_TABLE_NAME)
.build();
dbStore.getTable(ANOTHER_KEY_TABLE_NAME, String.class, String.class)
.put("random-key", "random-value");
break;

default:
throw new IllegalArgumentException("Unsupported table: " + tableName);
}
Expand Down Expand Up @@ -563,25 +514,4 @@ private static Map<String, Object> toMap(Object obj) throws IOException {
return MAPPER.readValue(json, new TypeReference<Map<String, Object>>() { });
}

/**
 * Minimal DBDefinition used to exercise the --schema override path of the
 * ldb commands; defines a single String/String column family.
 */
public static class DummyDBDefinition extends DBDefinition.WithMap {

  public static final DBColumnFamilyDefinition<String, String> ANOTHER_KEY_TABLE =
      new DBColumnFamilyDefinition<>(ANOTHER_KEY_TABLE_NAME, StringCodec.get(), StringCodec.get());

  private static final Map<String, DBColumnFamilyDefinition<?, ?>> FAMILIES =
      DBColumnFamilyDefinition.newUnmodifiableMap(ANOTHER_KEY_TABLE);

  protected DummyDBDefinition() {
    super(FAMILIES);
  }

  @Override
  public String getName() {
    // Must match the file name used when the test DB is created.
    return "another.db";
  }

  @Override
  public String getLocationConfigKey() {
    // Not resolved via configuration in tests.
    return "";
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@

package org.apache.hadoop.ozone.debug;

import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -42,7 +39,6 @@

import com.amazonaws.services.kms.model.InvalidArnException;
import com.google.common.base.Preconditions;
import org.apache.ratis.util.function.CheckedSupplier;

import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_CONTAINER_KEY_DB;
Expand Down Expand Up @@ -102,56 +98,6 @@ public static DBDefinition getDefinition(Path dbPath,
return getDefinition(dbName);
}

/**
 * Builds the list of instantiation strategies for a user-supplied
 * DBDefinition class. The caller tries them in order until one succeeds.
 */
static List<CheckedSupplier<DBDefinition, Exception>> getFactories(
    Class<?> clazz, String dbPathString, ConfigurationSource config) {
  // (String, ConfigurationSource) constructor.
  CheckedSupplier<DBDefinition, Exception> pathAndConfigCtor = () -> {
    Constructor<?> ctor = clazz.getDeclaredConstructor(String.class, ConfigurationSource.class);
    ctor.setAccessible(true);
    return (DBDefinition) ctor.newInstance(dbPathString, config);
  };
  // (String) constructor.
  CheckedSupplier<DBDefinition, Exception> pathCtor = () -> {
    Constructor<?> ctor = clazz.getDeclaredConstructor(String.class);
    ctor.setAccessible(true);
    return (DBDefinition) ctor.newInstance(dbPathString);
  };
  // Static get() factory method.
  CheckedSupplier<DBDefinition, Exception> staticGetFactory = () -> {
    Method factoryMethod = clazz.getDeclaredMethod("get");
    factoryMethod.setAccessible(true);
    return (DBDefinition) factoryMethod.invoke(clazz);
  };
  // No-arg constructor.
  CheckedSupplier<DBDefinition, Exception> noArgCtor = () -> {
    Constructor<?> ctor = clazz.getDeclaredConstructor();
    ctor.setAccessible(true);
    return (DBDefinition) ctor.newInstance();
  };
  return Arrays.asList(pathAndConfigCtor, pathCtor, staticGetFactory, noArgCtor);
}

/**
 * Resolves the DBDefinition for a database, honoring a user-supplied
 * override class name.
 *
 * @param dbPath path of the RocksDB directory
 * @param config configuration passed to definition constructors that take one
 * @param overrideDBDef fully-qualified class name of a DBDefinition, or null
 *                      to fall back to {@link #getDefinition(Path, ConfigurationSource)}
 * @return the resolved definition (never null)
 * @throws IllegalArgumentException if the class cannot be loaded, does not
 *         implement DBDefinition, or cannot be instantiated
 */
public static DBDefinition getDefinition(Path dbPath, ConfigurationSource config, String overrideDBDef) {
  if (overrideDBDef == null) {
    return getDefinition(dbPath, config);
  }
  final Class<?> clazz;
  try {
    clazz = Class.forName(overrideDBDef);
  } catch (ClassNotFoundException e) {
    // Preserve the cause instead of printing to stderr and throwing a bare exception.
    throw new IllegalArgumentException("Class not found: " + overrideDBDef, e);
  }
  if (!DBDefinition.class.isAssignableFrom(clazz)) {
    throw new IllegalArgumentException("Class does not implement DBDefinition: " + overrideDBDef);
  }
  String dbPathString = dbPath.toAbsolutePath().toString();
  Exception lastFailure = null;
  for (CheckedSupplier<DBDefinition, Exception> factory : getFactories(clazz, dbPathString, config)) {
    try {
      return factory.get();
    } catch (Exception e) {
      // Expected for strategies the class does not support; try the next one.
      lastFailure = e;
    }
  }
  // Surface the last instantiation failure so the user can diagnose it.
  throw new IllegalArgumentException("Could not get instance of " + overrideDBDef, lastFailure);
}

private static DBDefinition getReconDBDefinition(String dbName) {
if (dbName.startsWith(RECON_CONTAINER_KEY_DB)) {
return new ReconDBDefinition(dbName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
import org.apache.hadoop.hdds.utils.db.DBDefinition;
import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
import org.apache.hadoop.hdds.utils.db.LongCodec;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
import org.apache.hadoop.hdds.utils.db.*;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
Expand All @@ -45,6 +41,7 @@
import org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition;
import org.apache.hadoop.ozone.debug.DBDefinitionFactory;
import org.apache.hadoop.ozone.debug.RocksDBUtils;
import org.apache.hadoop.ozone.utils.CodecFactory;
import org.apache.hadoop.ozone.utils.Filter;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
Expand Down Expand Up @@ -177,6 +174,14 @@ public class DBScanner implements Callable<Void> {
defaultValue = "0")
private long recordsPerFile;

@CommandLine.Option(names = {"--key-codec"},
description = "key codec of the table")
private String keyCodec;

@CommandLine.Option(names = {"--value-codec"},
description = "value codec of the table")
private String valueCodec;

private int fileSuffix = 0;
private long globalCount = 0;

Expand Down Expand Up @@ -562,6 +567,31 @@ private ColumnFamilyHandle getColumnFamilyHandle(
.orElse(null);
}

/**
 * Resolves the column family definition for {@code tableName}, either from
 * user-supplied --key-codec/--value-codec class names or from the known
 * DBDefinition matching the database path.
 *
 * @param dbPath path of the RocksDB directory
 * @return the definition, or null if the DB path is not recognized
 * @throws IllegalStateException if only one of the two codecs is given, or a
 *         given codec class cannot be loaded
 */
private DBColumnFamilyDefinition getColumnFamilyDefinition(String dbPath) {
  // The codecs must be supplied together: a column family needs both sides.
  if ((keyCodec == null) != (valueCodec == null)) {
    throw new IllegalStateException("Both key-codec and value-codec are needed");
  }
  if (keyCodec != null) {
    try {
      CodecFactory.MyCodec<?> codecForKey = CodecFactory.createCodec(keyCodec);
      CodecFactory.MyCodec<?> codecForValue = CodecFactory.createCodec(valueCodec);
      return new DBColumnFamilyDefinition(tableName, codecForKey, codecForValue);
    } catch (ClassNotFoundException ex) {
      // Chain the cause so the missing class is visible in the stack trace.
      throw new IllegalStateException("Incorrect keyCodec (" + keyCodec +
          ") or valueCodec (" + valueCodec + ") passed.", ex);
    }
  }

  // No override: resolve the definition from the DB path.
  DBDefinitionFactory.setDnDBSchemaVersion(dnDBSchemaVersion);
  DBDefinition dbDefinition = DBDefinitionFactory.getDefinition(
      Paths.get(dbPath), new OzoneConfiguration());
  if (dbDefinition == null) {
    err().println("Error: Incorrect DB Path");
    // Fixed: previously fell through and dereferenced the null definition (NPE).
    // Caller already handles a null return as "table not found".
    return null;
  }
  return dbDefinition.getColumnFamily(tableName);
}

/**
* Main table printing logic.
* User-provided args are not in the arg list. Those are instance variables
Expand All @@ -579,16 +609,8 @@ private boolean printTable(List<ColumnFamilyHandle> columnFamilyHandleList,
" number is -1 which is to dump entire table");
}
dbPath = removeTrailingSlashIfNeeded(dbPath);
DBDefinitionFactory.setDnDBSchemaVersion(dnDBSchemaVersion);
DBDefinition dbDefinition = DBDefinitionFactory.getDefinition(
Paths.get(dbPath), new OzoneConfiguration(), parent.getDbDefinition());
if (dbDefinition == null) {
err().println("Error: Incorrect DB Path");
return false;
}

final DBColumnFamilyDefinition<?, ?> columnFamilyDefinition =
dbDefinition.getColumnFamily(tableName);
final DBColumnFamilyDefinition<?, ?> columnFamilyDefinition = getColumnFamilyDefinition(dbPath);
if (columnFamilyDefinition == null) {
err().print("Error: Table with name '" + tableName + "' not found");
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,11 @@ public class RDBParser implements DebugSubcommand {
description = "Database File Path")
private String dbPath;

@CommandLine.Option(names = {"--schema"},
description = "DBDefinition of the database")
private String dbDefinition;

/** Returns the RocksDB directory path supplied via --db. */
public String getDbPath() {
return dbPath;
}

/** Sets the RocksDB directory path (used by tests and programmatic callers). */
public void setDbPath(String dbPath) {
this.dbPath = dbPath;
}

/** Returns the DBDefinition class name supplied via --schema, or null if not set. */
public String getDbDefinition() {
return dbDefinition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public Void call() throws Exception {

String dbPath = parent.getDbPath();
Map<String, Object> fields = new HashMap<>();
success = getValueFields(dbPath, fields, depth, tableName, dnDBSchemaVersion, parent.getDbDefinition());
success = getValueFields(dbPath, fields, depth, tableName, dnDBSchemaVersion);

out().println(JsonUtils.toJsonStringWithDefaultPrettyPrinter(fields));

Expand All @@ -100,11 +100,11 @@ public Void call() throws Exception {
}

public static boolean getValueFields(String dbPath, Map<String, Object> valueSchema, int d, String table,
String dnDBSchemaVersion, String dbDef) {
String dnDBSchemaVersion) {

dbPath = removeTrailingSlashIfNeeded(dbPath);
DBDefinitionFactory.setDnDBSchemaVersion(dnDBSchemaVersion);
DBDefinition dbDefinition = DBDefinitionFactory.getDefinition(Paths.get(dbPath), new OzoneConfiguration(), dbDef);
DBDefinition dbDefinition = DBDefinitionFactory.getDefinition(Paths.get(dbPath), new OzoneConfiguration());
if (dbDefinition == null) {
err().println("Error: Incorrect DB Path");
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.utils;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Creates a JSON-backed {@link Codec} for a given type class, letting the ldb
 * tooling read tables whose key/value classes are supplied on the command line.
 */
public final class CodecFactory {

  private static final Logger LOG = LoggerFactory.getLogger(CodecFactory.class);

  private CodecFactory() {
    // Utility class: no instances.
  }

  /**
   * A {@link Codec} that serializes and deserializes objects as JSON via
   * Jackson. Unknown JSON properties are ignored and nulls are omitted on
   * serialization.
   *
   * @param <T> the type handled by this codec
   */
  public static final class MyCodec<T> implements Codec<T> {

    // Shared, thread-safe mapper; configured once and cached as a constant.
    private static final ObjectMapper MAPPER = new ObjectMapper()
        .setSerializationInclusion(JsonInclude.Include.NON_NULL)
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    private final Class<T> typeClass;

    public MyCodec(Class<T> typeClass) {
      this.typeClass = typeClass;
    }

    @Override
    public Class<T> getTypeClass() {
      return typeClass;
    }

    @Override
    public byte[] toPersistedFormat(T object) throws IOException {
      return MAPPER.writeValueAsBytes(object);
    }

    @Override
    public T fromPersistedFormat(byte[] rawData) throws IOException {
      return MAPPER.readValue(rawData, typeClass);
    }

    @Override
    public T copyObject(T object) {
      // NOTE(review): returns the same instance rather than a copy; safe only
      // if callers treat decoded objects as read-only — confirm against the
      // Codec contract.
      return object;
    }
  }

  /**
   * Creates a codec for the given fully-qualified class name.
   *
   * @param className fully-qualified name of the type to (de)serialize
   * @return a JSON codec for that type
   * @throws ClassNotFoundException if the class cannot be loaded
   */
  @SuppressWarnings("unchecked") // Class.forName cannot be parameterized; caller chooses T.
  public static <T> MyCodec<T> createCodec(String className) throws ClassNotFoundException {
    // Was: LOG.info("tej creating codec for: " + ...) — leftover debug logging.
    LOG.debug("Creating codec for: {}", className);
    Class<T> clazz = (Class<T>) Class.forName(className);
    return new MyCodec<>(clazz);
  }
}
Loading

0 comments on commit dba5d12

Please sign in to comment.