Skip to content

Commit

Permalink
[java] Fix avro logical-types conversions for BQ storage (#33422)
Browse files Browse the repository at this point in the history
* [java] Fix avro logical-types conversions for BQ storage

Most of the Avro logical-type to BigQuery conversions are broken.

Add support for both joda and java time to ensure compatibility with
older avro versions

* expected int raw type for time-millis

* Remove unused qualifier

* Fix avro numeric conversion

* Add support for parametrized NUMERIC and BIGNUMERIC
  • Loading branch information
RustedBones authored Jan 27, 2025
1 parent 2b3d328 commit 72102b5
Show file tree
Hide file tree
Showing 2 changed files with 304 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Days;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Bytes;

/**
* Utility methods for converting Avro {@link GenericRecord} objects to dynamic protocol message,
* for use with the Storage write API.
*/
public class AvroGenericRecordToStorageApiProto {

private static final org.joda.time.LocalDate EPOCH_DATE = new org.joda.time.LocalDate(1970, 1, 1);

static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
.put(Schema.Type.INT, TableFieldSchema.Type.INT64)
Expand All @@ -67,14 +68,37 @@ public class AvroGenericRecordToStorageApiProto {
.build();

// A map of supported Avro logical types to the BigQuery table field type.
// Keys are the logical type names as returned by LogicalType#getName(); the argument passed to
// LogicalTypes.decimal(1) is only there to obtain an instance whose name can be read.
static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
ImmutableMap.<String, TableFieldSchema.Type>builder()
.put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
.put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC)
.put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP)
.put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP)
.put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
.build();
/**
 * Resolves the BigQuery field type that corresponds to an Avro logical type.
 *
 * <p>Decimals are mapped to {@code NUMERIC} when they have at most 9 fractional digits and at
 * most 29 integer digits ({@code precision - scale}); any wider decimal is mapped to {@code
 * BIGNUMERIC}.
 *
 * @param logicalType the Avro logical type to look up, matched by its name
 * @return the matching field type, or {@link Optional#empty()} when the logical type name is not
 *     one of the supported ones
 */
static Optional<TableFieldSchema.Type> logicalTypes(LogicalType logicalType) {
  final TableFieldSchema.Type mapped;
  switch (logicalType.getName()) {
    case "date":
      mapped = TableFieldSchema.Type.DATE;
      break;
    case "time-micros":
    case "time-millis":
      mapped = TableFieldSchema.Type.TIME;
      break;
    case "decimal":
      {
        LogicalTypes.Decimal dec = (LogicalTypes.Decimal) logicalType;
        int fractionalDigits = dec.getScale();
        int integerDigits = dec.getPrecision() - fractionalDigits;
        boolean fitsNumeric = fractionalDigits <= 9 && integerDigits <= 29;
        mapped = fitsNumeric ? TableFieldSchema.Type.NUMERIC : TableFieldSchema.Type.BIGNUMERIC;
        break;
      }
    case "timestamp-micros":
    case "timestamp-millis":
      mapped = TableFieldSchema.Type.TIMESTAMP;
      break;
    case "local-timestamp-micros":
    case "local-timestamp-millis":
      mapped = TableFieldSchema.Type.DATETIME;
      break;
    case "uuid":
      mapped = TableFieldSchema.Type.STRING;
      break;
    default:
      // unknown logical type: let the caller fall back to the underlying primitive type
      return Optional.empty();
  }
  return Optional.of(mapped);
}

static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
ImmutableMap.<Schema.Type, Function<Object, Object>>builder()
Expand All @@ -92,16 +116,15 @@ public class AvroGenericRecordToStorageApiProto {
// A map of supported logical types to their encoding functions.
// Keys are Avro logical type names. Each encoder is handed the logical type and the value to
// encode; only the decimal encoder uses the logical type, the others ignore it.
static final Map<String, BiFunction<LogicalType, Object, Object>> LOGICAL_TYPE_ENCODERS =
ImmutableMap.<String, BiFunction<LogicalType, Object, Object>>builder()
.put(LogicalTypes.date().getName(), (logicalType, value) -> convertDate(value))
.put(
LogicalTypes.decimal(1).getName(), AvroGenericRecordToStorageApiProto::convertDecimal)
.put(
LogicalTypes.timestampMicros().getName(),
(logicalType, value) -> convertTimestamp(value, true))
.put(
LogicalTypes.timestampMillis().getName(),
(logicalType, value) -> convertTimestamp(value, false))
.put(LogicalTypes.uuid().getName(), (logicalType, value) -> convertUUID(value))
.put("date", (logicalType, value) -> convertDate(value))
// the boolean flag tells the converter whether a raw value is in micros (true) or millis (false)
.put("time-micros", (logicalType, value) -> convertTime(value, true))
.put("time-millis", (logicalType, value) -> convertTime(value, false))
.put("decimal", AvroGenericRecordToStorageApiProto::convertDecimal)
.put("timestamp-micros", (logicalType, value) -> convertTimestamp(value, true))
.put("timestamp-millis", (logicalType, value) -> convertTimestamp(value, false))
.put("local-timestamp-micros", (logicalType, value) -> convertDateTime(value, true))
.put("local-timestamp-millis", (logicalType, value) -> convertDateTime(value, false))
.put("uuid", (logicalType, value) -> convertUUID(value))
.build();

static String convertUUID(Object value) {
Expand All @@ -115,34 +138,96 @@ static String convertUUID(Object value) {
}

// Converts a timestamp logical-type value to a Long epoch offset. Accepts a joda
// ReadableInstant, a java.time.Instant, or the raw Long; `micros` states whether a raw Long is
// expressed in microseconds (true) or milliseconds (false).
static Long convertTimestamp(Object value, boolean micros) {
if (value instanceof ReadableInstant) {
return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
if (value instanceof org.joda.time.ReadableInstant) {
// joda instants carry millisecond precision: scale to microseconds
return ((org.joda.time.ReadableInstant) value).getMillis() * 1_000L;
} else if (value instanceof java.time.Instant) {
java.time.Instant instant = (java.time.Instant) value;
long seconds = instant.getEpochSecond();
int nanos = instant.getNano();

// Both branches compute seconds * 1_000_000 + nanos / 1_000 (microseconds, despite the local
// name `ms`). For a negative second count with a positive nano part the product is taken on
// (seconds + 1) and the remainder is subtracted instead, presumably so that the exact-math
// product does not overflow for instants whose final microsecond value still fits in a long.
if (seconds < 0 && nanos > 0) {
long ms = Math.multiplyExact(seconds + 1, 1_000_000L);
long adjustment = (nanos / 1_000L) - 1_000_000L;
return Math.addExact(ms, adjustment);
} else {
long ms = Math.multiplyExact(seconds, 1_000_000L);
return Math.addExact(ms, nanos / 1_000L);
}
} else {
Preconditions.checkArgument(
value instanceof Long, "Expecting a value as Long type (millis).");
return (Long) value;
value instanceof Long, "Expecting a value as Long type (timestamp).");
// raw long: kept as is when already in microseconds, otherwise scaled from milliseconds
return (micros ? 1 : 1_000L) * ((Long) value);
}
}

// Converts a date logical-type value to a number of days counted from 1970-01-01. Accepts a
// joda LocalDate, a java.time.LocalDate, or the raw Integer day count.
static Integer convertDate(Object value) {
if (value instanceof ReadableInstant) {
return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
if (value instanceof org.joda.time.LocalDate) {
// EPOCH_DATE is the joda LocalDate 1970-01-01, so this is the signed day offset from the epoch
return org.joda.time.Days.daysBetween(EPOCH_DATE, (org.joda.time.LocalDate) value).getDays();
} else if (value instanceof java.time.LocalDate) {
// NOTE(review): toEpochDay() returns a long and the cast narrows it without a range check;
// consider Math.toIntExact if out-of-range dates should fail loudly.
return (int) ((java.time.LocalDate) value).toEpochDay();
} else {
Preconditions.checkArgument(
value instanceof Integer, "Expecting a value as Integer type (days).");
return (Integer) value;
}
}

/**
 * Converts a time-of-day logical-type value to microseconds since midnight.
 *
 * <p>Date-time objects (joda or java.time {@code LocalTime}) are converted regardless of {@code
 * micros}; the flag only selects how a raw value is interpreted.
 *
 * @param value a joda {@code LocalTime}, a {@code java.time.LocalTime}, a {@code Long} of
 *     microseconds (when {@code micros} is true) or an {@code Integer} of milliseconds (when
 *     {@code micros} is false)
 * @param micros whether a raw value is expressed in microseconds rather than milliseconds
 * @return the time of day in microseconds
 * @throws IllegalArgumentException if a raw value does not have the expected boxed type
 */
static Long convertTime(Object value, boolean micros) {
  if (value instanceof org.joda.time.LocalTime) {
    // joda only offers millisecond precision; widen before scaling to microseconds
    long millisOfDay = ((org.joda.time.LocalTime) value).getMillisOfDay();
    return millisOfDay * 1_000L;
  }
  if (value instanceof java.time.LocalTime) {
    long nanosOfDay = ((java.time.LocalTime) value).toNanoOfDay();
    return nanosOfDay / 1_000L;
  }
  if (!micros) {
    // time-millis is carried as an int in Avro
    Preconditions.checkArgument(
        value instanceof Integer, "Expecting a value as Integer type (time).");
    int rawMillis = (Integer) value;
    return rawMillis * 1_000L;
  }
  Preconditions.checkArgument(value instanceof Long, "Expecting a value as Long type (time).");
  return (Long) value;
}

/**
 * Converts a local-timestamp logical-type value to a {@code Long}, reading the local date-time
 * as if it were expressed in UTC.
 *
 * @param value a joda {@code LocalDateTime}, a {@code java.time.LocalDateTime}, or the raw
 *     {@code Long}
 * @param micros whether a raw {@code Long} is expressed in microseconds rather than milliseconds
 * @return for joda and raw inputs, the value scaled to microseconds; for {@code
 *     java.time.LocalDateTime}, whatever {@code convertTimestamp} returns for the UTC instant
 * @throws IllegalArgumentException if the value is none of the supported types
 */
static Long convertDateTime(Object value, boolean micros) {
  if (value instanceof org.joda.time.LocalDateTime) {
    // Not expected in practice: local-timestamp was introduced after joda was deprecated.
    // Handled anyway so that every converter treats joda and java.time alike.
    org.joda.time.LocalDateTime jodaLocal = (org.joda.time.LocalDateTime) value;
    long epochMillis = jodaLocal.toDateTime(org.joda.time.DateTimeZone.UTC).getMillis();
    return epochMillis * 1_000L;
  }
  if (value instanceof java.time.LocalDateTime) {
    java.time.LocalDateTime javaLocal = (java.time.LocalDateTime) value;
    return convertTimestamp(javaLocal.toInstant(java.time.ZoneOffset.UTC), micros);
  }
  Preconditions.checkArgument(
      value instanceof Long, "Expecting a value as Long type (local-timestamp).");
  long raw = (Long) value;
  long factor = micros ? 1L : 1_000L;
  return factor * raw;
}

// Converts a decimal logical-type value to the ByteString sent to BigQuery. Accepts either a
// BigDecimal or the ByteBuffer holding the Avro-encoded decimal.
static ByteString convertDecimal(LogicalType logicalType, Object value) {
ByteBuffer byteBuffer = (ByteBuffer) value;
BigDecimal bigDecimal =
new Conversions.DecimalConversion()
.fromBytes(
byteBuffer.duplicate(),
Schema.create(Schema.Type.NULL), // dummy schema, not used
logicalType);
return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
ByteBuffer byteBuffer;
if (value instanceof BigDecimal) {
// BigDecimalByteStringEncoder does not support parametrized NUMERIC/BIGNUMERIC
// so the value is first encoded to Avro decimal bytes using the logical type
byteBuffer =
new Conversions.DecimalConversion()
.toBytes(
(BigDecimal) value,
Schema.create(Schema.Type.NULL), // dummy schema, not used
logicalType);
} else {
Preconditions.checkArgument(
value instanceof ByteBuffer, "Expecting a value as ByteBuffer type (decimal).");
byteBuffer = (ByteBuffer) value;
}
byte[] bytes = new byte[byteBuffer.remaining()];
// read through a duplicate so the position of the caller's buffer is left untouched
byteBuffer.duplicate().get(bytes);
// Reverse the byte order in place. Avro stores decimals as big-endian two's complement;
// presumably BigQuery expects the little-endian form here -- TODO confirm.
Bytes.reverse(bytes);
return ByteString.copyFrom(bytes);
}

static ByteString convertBytes(Object value) {
Expand Down Expand Up @@ -223,7 +308,7 @@ public static DynamicMessage messageFromGenericRecord(
return builder.build();
}

private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) {
@Nullable Schema schema = field.schema();
Preconditions.checkNotNull(schema, "Unexpected null schema!");
if (StorageApiCDC.COLUMNS.contains(field.name())) {
Expand Down Expand Up @@ -292,17 +377,34 @@ private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field)
break;
default:
elementType = TypeWithNullability.create(schema).getType();
Optional<LogicalType> logicalType =
Optional.ofNullable(LogicalTypes.fromSchema(elementType));
@Nullable
TableFieldSchema.Type primitiveType =
Optional.ofNullable(LogicalTypes.fromSchema(elementType))
.map(logicalType -> LOGICAL_TYPES.get(logicalType.getName()))
logicalType
.flatMap(AvroGenericRecordToStorageApiProto::logicalTypes)
.orElse(PRIMITIVE_TYPES.get(elementType.getType()));
if (primitiveType == null) {
throw new RuntimeException("Unsupported type " + elementType.getType());
}
// a scalar will be required by default, if defined as part of union then
// caller will set nullability requirements
builder = builder.setType(primitiveType);
// parametrized types
if (logicalType.isPresent() && logicalType.get().getName().equals("decimal")) {
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType.get();
int precision = decimal.getPrecision();
int scale = decimal.getScale();
if (!(precision == 38 && scale == 9) // NUMERIC
&& !(precision == 77 && scale == 38) // BIGNUMERIC
) {
// parametrized type
builder = builder.setPrecision(precision);
if (scale != 0) {
builder = builder.setScale(scale);
}
}
}
}
if (builder.getMode() != TableFieldSchema.Mode.REPEATED) {
if (TypeWithNullability.create(schema).isNullable()) {
Expand Down
Loading

0 comments on commit 72102b5

Please sign in to comment.