Skip to content

Commit

Permalink
[590] Changes in xtable-core for Hudi Catalog Sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Vamsi committed Feb 11, 2025
1 parent e79a2d3 commit c37ebb5
Show file tree
Hide file tree
Showing 20 changed files with 2,427 additions and 233 deletions.
6 changes: 6 additions & 0 deletions xtable-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-java-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-sync-common</artifactId>
<version>${hudi.version}</version>
<scope>provided</scope>
</dependency>

<!-- Iceberg dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.xtable.model.catalog.CatalogTableIdentifier;

/**
* Defines operations for managing partitions in an external catalog.
*
* <p>This interface provides methods to perform CRUD (Create, Read, Update, Delete) operations on
* partitions associated with a table in an external catalog system.
*/
public interface CatalogPartitionSyncOperations {

/**
* Retrieves all partitions associated with the specified table.
*
* @param tableIdentifier an object identifying the table whose partitions are to be fetched.
* @return a list of {@link Partition} objects representing all partitions of the specified table.
*/
List<Partition> getAllPartitions(CatalogTableIdentifier tableIdentifier);

/**
* Adds new partitions to the specified table in the catalog.
*
* @param tableIdentifier an object identifying the table where partitions are to be added.
* @param partitionsToAdd a list of partitions to be added to the table.
*/
void addPartitionsToTable(
CatalogTableIdentifier tableIdentifier, List<Partition> partitionsToAdd);

/**
* Updates the specified partitions for a table in the catalog.
*
* @param tableIdentifier an object identifying the table whose partitions are to be updated.
* @param changedPartitions a list of partitions to be updated in the table.
*/
void updatePartitionsToTable(
CatalogTableIdentifier tableIdentifier, List<Partition> changedPartitions);

/**
* Removes the specified partitions from a table in the catalog.
*
* @param tableIdentifier an object identifying the table from which partitions are to be dropped.
* @param partitionsToDrop a list of partitions to be removed from the table.
*/
void dropPartitions(CatalogTableIdentifier tableIdentifier, List<Partition> partitionsToDrop);

/**
* Retrieves the properties indicating the last synchronization state for the given table.
*
* <p>This method provides a default implementation that returns an empty map. Implementations of
* this interface can override it to fetch the actual last synced properties from a catalog.
*
* @param tableIdentifier the identifier of the table whose last synced properties are to be
* fetched.
* @param keysToRetrieve a list of keys representing the specific properties to retrieve.
* @return a map of key-value pairs representing the last synchronization properties.
*/
default Map<String, String> getTableProperties(
CatalogTableIdentifier tableIdentifier, List<String> keysToRetrieve) {
return new HashMap<>();
}

/**
* Updates the properties indicating the last synchronization state for the given table.
*
* <p>This method provides a default implementation that performs no operation. Implementations of
* this interface can override it to update the last synced properties in a catalog.
*
* @param tableIdentifier the identifier of the table whose last synced properties are to be
* updated.
* @param propertiesToUpdate a map of key-value pairs representing the updated properties.
*/
default void updateTableProperties(
CatalogTableIdentifier tableIdentifier, Map<String, String> propertiesToUpdate) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,20 @@

package org.apache.xtable.catalog;

import java.util.Optional;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.sync.common.model.PartitionValueExtractor;

import org.apache.xtable.hudi.HudiPartitionSyncTool;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.reflection.ReflectionUtils;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CatalogUtils {
Expand All @@ -35,4 +44,24 @@ public static HierarchicalTableIdentifier toHierarchicalTableIdentifier(
throw new IllegalArgumentException(
"Invalid tableIdentifier implementation: " + tableIdentifier.getClass().getName());
}

public static Optional<PartitionSyncTool> getPartitionSyncTool(
String tableFormat,
String partitionValueExtractorClass,
CatalogPartitionSyncOperations catalogPartitionSyncOperations,
Configuration configuration) {

if (partitionValueExtractorClass.isEmpty()) {
return Optional.empty();
}

if (tableFormat.equals(TableFormat.HUDI)) {
PartitionValueExtractor partitionValueExtractor =
ReflectionUtils.createInstanceOfClass(partitionValueExtractorClass);
return Optional.of(
new HudiPartitionSyncTool(
catalogPartitionSyncOperations, partitionValueExtractor, configuration));
}
return Optional.empty();
}
}
48 changes: 48 additions & 0 deletions xtable-core/src/main/java/org/apache/xtable/catalog/Partition.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import java.util.List;

import lombok.Getter;

/**
* This class is designed to encapsulate a set of partition values and the corresponding storage
* location where the data for this partition is stored.
*/
@Getter
public class Partition {

/**
* A list of values defining this partition. For example, these values might correspond to
* partition keys in a dataset (e.g., year, month, day).
*/
private final List<String> values;

/**
* The storage location associated with this partition. Typically, this would be a path in a file
* system or object store.
*/
private final String storageLocation;

public Partition(List<String> values, String storageLocation) {
this.values = values;
this.storageLocation = storageLocation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

/** Partition Event captures any partition that needs to be added or updated. */
public class PartitionEvent {

public enum PartitionEventType {
ADD,
UPDATE,
DROP
}

public PartitionEventType eventType;
public String storagePartition;

PartitionEvent(PartitionEventType eventType, String storagePartition) {
this.eventType = eventType;
this.storagePartition = storagePartition;
}

public static PartitionEvent newPartitionAddEvent(String storagePartition) {
return new PartitionEvent(PartitionEventType.ADD, storagePartition);
}

public static PartitionEvent newPartitionUpdateEvent(String storagePartition) {
return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
}

public static PartitionEvent newPartitionDropEvent(String storagePartition) {
return new PartitionEvent(PartitionEventType.DROP, storagePartition);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;

/**
* Defines methods to synchronize all partitions from the storage to the catalog. Implementations of
* this interface will handle the logic for syncing partitions, including detecting partition
* changes and updating the catalog accordingly.
*/
public interface PartitionSyncTool {

/**
* Syncs all partitions on storage to the catalog.
*
* @param oneTable The object representing the table whose partitions are being synced. This
* object contains necessary details to perform the sync operation.
* @param tableIdentifier The table in the catalog.
* @return {@code true} if one or more partition(s) are changed in the catalog; {@code false}
* otherwise.
*/
public boolean syncPartitions(InternalTable oneTable, CatalogTableIdentifier tableIdentifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.schema.SparkSchemaExtractor;
import org.apache.xtable.spi.sync.ConversionTarget;

public class DeltaConversionTarget implements ConversionTarget {
Expand Down Expand Up @@ -247,7 +248,7 @@ private void addColumn(StructField field) {

private void setLatestSchema(InternalSchema schema) {
this.latestSchemaInternal = schema;
this.latestSchema = schemaExtractor.fromInternalSchema(schema);
this.latestSchema = SparkSchemaExtractor.getInstance().fromInternalSchema(schema);
}

private void commitTransaction() {
Expand Down
Loading

0 comments on commit c37ebb5

Please sign in to comment.