Skip to content

Commit

Permalink
[Managed Iceberg] support BQMS catalog (#33511)
Browse files Browse the repository at this point in the history
* Add BQMS catalog

* trigger integration tests

* build fix

* use shaded jar

* shadowClosure

* use global timeout for tests

* define version in BeamModulePlugin

* address comments
  • Loading branch information
ahmedabu98 authored Jan 9, 2025
1 parent 4fc5c86 commit 6b3783f
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ class BeamModulePlugin implements Plugin<Project> {
def influxdb_version = "2.19"
def httpclient_version = "4.5.13"
def httpcore_version = "4.4.14"
def iceberg_bqms_catalog_version = "1.5.2-0.1.0"
def jackson_version = "2.15.4"
def jaxb_api_version = "2.3.3"
def jsr305_version = "3.0.2"
Expand Down Expand Up @@ -650,6 +651,10 @@ class BeamModulePlugin implements Plugin<Project> {

// Export Spark versions, so they are defined in a single place only
project.ext.spark3_version = spark3_version
// version for BigQueryMetastore catalog (used by sdks:java:io:iceberg:bqms)
// TODO: remove this and download the jar normally when the catalog gets
// open-sourced (https://github.com/apache/iceberg/pull/11039)
project.ext.iceberg_bqms_catalog_version = iceberg_bqms_catalog_version

// A map of maps containing common libraries used per language. To use:
// dependencies {
Expand Down
6 changes: 4 additions & 2 deletions sdks/java/io/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ dependencies {
// **** IcebergIO runtime dependencies ****
runtimeOnly library.java.hadoop_auth
runtimeOnly library.java.hadoop_client
// Needed when using GCS as the warehouse location.
// For writing to GCS
runtimeOnly library.java.bigdataoss_gcs_connector
// Needed for HiveCatalog
// HiveCatalog
runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2")
runtimeOnly project(path: ":sdks:java:io:iceberg:hive")
// BigQueryMetastoreCatalog (Java 11+)
runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")

runtimeOnly library.java.kafka_clients
runtimeOnly library.java.slf4j_jdk14
Expand Down
63 changes: 63 additions & 0 deletions sdks/java/io/iceberg/bqms/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
plugins {
id 'org.apache.beam.module'
}

applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.iceberg.bqms',
shadowClosure: {},
exportJavadoc: false,
publish: false, // it's an intermediate jar for io-expansion-service
validateShadowJar: false
)

def libDir = "$buildDir/libs"
def bqmsFileName = "iceberg-bqms-catalog-${iceberg_bqms_catalog_version}.jar"
task downloadBqmsJar(type: Copy) {
// TODO: remove this workaround and downlooad normally when the catalog gets open-sourced:
// (https://github.com/apache/iceberg/pull/11039)
def jarUrl = "https://storage.googleapis.com/spark-lib/bigquery/iceberg-bigquery-catalog-${iceberg_bqms_catalog_version}.jar"
def outputDir = file("$libDir")
outputDir.mkdirs()
def destFile = new File(outputDir, bqmsFileName)

if (!destFile.exists()) {
try {
ant.get(src: jarUrl, dest: destFile)
println "Successfully downloaded BQMS catalog jar: $destFile"
} catch (Exception e) {
println "Could not download $jarUrl: ${e.message}"
}
}
}

repositories {
flatDir {
dirs "$libDir"
}
}

compileJava.dependsOn downloadBqmsJar

dependencies {
implementation files("$libDir/$bqmsFileName")
}

description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: BigQuery Metastore"
ext.summary = "A copy of the BQMS catalog."
8 changes: 8 additions & 0 deletions sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ dependencies {
exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle"
}

// BigQueryMetastore catalog dep
testImplementation project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")

testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java")
Expand Down Expand Up @@ -136,6 +139,11 @@ task integrationTest(type: Test) {
outputs.upToDateWhen { false }

include '**/*IT.class'
// BQ metastore catalog doesn't support java 8
if (project.findProperty('testJavaVersion') == '8' ||
JavaVersion.current().equals(JavaVersion.VERSION_1_8)) {
exclude '**/BigQueryMetastoreCatalogIT.class'
}

maxParallelForks 4
classpath = sourceSets.test.runtimeClasspath
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg.catalog;

import java.io.IOException;
import java.util.Map;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
static final String DATASET = "managed_iceberg_bqms_tests_no_delete";
static final long SALT = System.nanoTime();

@Override
public String tableId() {
return DATASET + "." + testName.getMethodName() + "_" + SALT;
}

@Override
public Catalog createCatalog() {
return CatalogUtil.loadCatalog(
BQMS_CATALOG,
"bqms_" + catalogName,
ImmutableMap.<String, String>builder()
.put("gcp_project", options.getProject())
.put("gcp_location", "us-central1")
.put("warehouse", warehouse)
.build(),
new Configuration());
}

@Override
public void catalogCleanup() throws IOException {
for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) {
// only delete tables that were created in this test run
if (tableIdentifier.name().contains(String.valueOf(SALT))) {
catalog.dropTable(tableIdentifier);
}
}
}

@Override
public Map<String, Object> managedIcebergConfig(String tableId) {
return ImmutableMap.<String, Object>builder()
.put("table", tableId)
.put(
"catalog_properties",
ImmutableMap.<String, String>builder()
.put("gcp_project", options.getProject())
.put("gcp_location", "us-central1")
.put("warehouse", warehouse)
.put("catalog-impl", BQMS_CATALOG)
.build())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public Catalog createCatalog() {

@Override
public void catalogCleanup() throws Exception {
System.out.println("xxx CLEANING UP!");
if (hiveMetastoreExtension != null) {
hiveMetastoreExtension.cleanup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -145,7 +146,11 @@ public void setUp() throws Exception {

@After
public void cleanUp() throws Exception {
catalogCleanup();
try {
catalogCleanup();
} catch (Exception e) {
LOG.warn("Catalog cleanup failed.", e);
}

try {
GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
Expand All @@ -163,7 +168,7 @@ public void cleanUp() throws Exception {

gcsUtil.remove(filesToDelete);
} catch (Exception e) {
LOG.warn("Failed to clean up files.", e);
LOG.warn("Failed to clean up GCS files.", e);
}
}

Expand All @@ -173,6 +178,7 @@ public void cleanUp() throws Exception {
private static final String RANDOM = UUID.randomUUID().toString();
@Rule public TestPipeline pipeline = TestPipeline.create();
@Rule public TestName testName = new TestName();
@Rule public transient Timeout globalTimeout = Timeout.seconds(300);
private static final int NUM_SHARDS = 10;
private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogBaseIT.class);
private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
Expand Down
2 changes: 2 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,5 @@ include("sdks:java:extensions:combiners")
findProject(":sdks:java:extensions:combiners")?.name = "combiners"
include("sdks:java:io:iceberg:hive")
findProject(":sdks:java:io:iceberg:hive")?.name = "hive"
include("sdks:java:io:iceberg:bqms")
findProject(":sdks:java:io:iceberg:bqms")?.name = "bqms"

0 comments on commit 6b3783f

Please sign in to comment.