From 0bc624ade3020826b4e7101b7ea4789be7e8585b Mon Sep 17 00:00:00 2001 From: Raghav Aggarwal Date: Fri, 29 Mar 2024 23:25:00 +0530 Subject: [PATCH] HIVE-28028: Remove duplicated proto reader/writer classes introduced in HIVE-19288 (#5079) (Raghav Aggarwal reviewed by Laszlo Bodor, Ayush Saxena) --- itests/hive-unit/pom.xml | 4 + pom.xml | 39 ++++ ql/pom.xml | 21 +- .../logging/proto/DatePartitionedLogger.java | 185 ------------------ .../logging/proto/ProtoMessageReader.java | 69 ------- .../logging/proto/ProtoMessageWritable.java | 102 ---------- .../logging/proto/ProtoMessageWriter.java | 71 ------- .../history/logging/proto/package-info.java | 23 --- 8 files changed, 48 insertions(+), 466 deletions(-) delete mode 100644 ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java delete mode 100644 ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java delete mode 100644 ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java delete mode 100644 ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java delete mode 100644 ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index a1c9be35bb8c..011f817a46c3 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -116,6 +116,10 @@ + + org.apache.tez + tez-protobuf-history-plugin + org.apache.tez tez-tests diff --git a/pom.xml b/pom.xml index 462fcd864d2e..1a48f12bf578 100644 --- a/pom.xml +++ b/pom.xml @@ -1343,6 +1343,45 @@ tez-common ${tez.version} + + org.apache.tez + tez-protobuf-history-plugin + ${tez.version} + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + + + org.apache.hadoop + hadoop-mapreduce-client-common + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-yarn-client + + + org.springframework spring-jdbc diff --git a/ql/pom.xml b/ql/pom.xml index 5340341b5132..5fc3185411b6 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -721,16 +721,13 @@ org.apache.hadoop hadoop-yarn-client - - org.slf4j - slf4j-log4j12 - - - commons-logging - commons-logging - + + org.apache.tez + tez-protobuf-history-plugin + true + org.apache.tez tez-mapreduce @@ -800,14 +797,6 @@ org.apache.hadoop hadoop-yarn-client - - org.slf4j - slf4j-log4j12 - - - commons-logging - commons-logging - diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java deleted file mode 100644 index 58cec7eacea3..000000000000 --- a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.logging.proto; - -import java.io.IOException; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.yarn.util.Clock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; - -/** - * Class to create proto reader and writer for a date partitioned directory structure. - * - * @param The proto message type. - */ -public class DatePartitionedLogger { - private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class); - // Everyone has permission to write, but with sticky set so that delete is restricted. - // This is required, since the path is same for all users and everyone writes into it. - private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777); - - // Since the directories have broad permissions restrict the file read access. - private static final FsPermission FILE_UMASK = FsPermission.createImmutable((short)0066); - - private final Parser parser; - private final Path basePath; - private final Configuration conf; - private final Clock clock; - - public DatePartitionedLogger(Parser parser, Path baseDir, Configuration conf, Clock clock) - throws IOException { - this.conf = new Configuration(conf); - this.clock = clock; - this.parser = parser; - createDirIfNotExists(baseDir); - this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir); - FsPermission.setUMask(this.conf, FILE_UMASK); - } - - private void createDirIfNotExists(Path path) throws IOException { - FileSystem fileSystem = path.getFileSystem(conf); - try { - if (!fileSystem.exists(path)) { - fileSystem.mkdirs(path); - fileSystem.setPermission(path, DIR_PERMISSION); - } - } catch (IOException e) { - // Ignore this exception, if there is a problem it'll fail when trying to read or write. - LOG.warn("Error while trying to set permission: ", e); - } - } - - /** - * Creates a writer for the given fileName, with date as today. - */ - public ProtoMessageWriter getWriter(String fileName) throws IOException { - Path filePath = getPathForDate(getNow().toLocalDate(), fileName); - return new ProtoMessageWriter<>(conf, filePath, parser); - } - - /** - * Creates a reader for the given filePath, no validation is done. - */ - public ProtoMessageReader getReader(Path filePath) throws IOException { - return new ProtoMessageReader<>(conf, filePath, parser); - } - - /** - * Create a path for the given date and fileName. This can be used to create a reader. - */ - public Path getPathForDate(LocalDate date, String fileName) throws IOException { - Path path = new Path(basePath, getDirForDate(date)); - createDirIfNotExists(path); - return new Path(path, fileName); - } - - public Path getPathForSubdir(String dirName, String fileName) { - return new Path(new Path(basePath, dirName), fileName); - } - - /** - * Extract the date from the directory name, this should be a directory created by this class. - */ - public LocalDate getDateFromDir(String dirName) { - if (!dirName.startsWith("date=")) { - throw new IllegalArgumentException("Invalid directory: "+ dirName); - } - return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE); - } - - /** - * Returns the directory name for a given date. - */ - public String getDirForDate(LocalDate date) { - return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date); - } - - /** - * Find next available directory, after the given directory. - */ - public String getNextDirectory(String currentDir) throws IOException { - // Fast check, if the next day directory exists return it. - String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1)); - FileSystem fileSystem = basePath.getFileSystem(conf); - if (fileSystem.exists(new Path(basePath, nextDate))) { - return nextDate; - } - // Have to scan the directory to find min date greater than currentDir. - String dirName = null; - for (FileStatus status : fileSystem.listStatus(basePath)) { - String name = status.getPath().getName(); - // String comparison is good enough, since its of form date=yyyy-MM-dd - if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) { - dirName = name; - } - } - return dirName; - } - - /** - * Returns new or changed files in the given directory. The offsets are used to find - * changed files. - */ - public List scanForChangedFiles(String subDir, Map currentOffsets) - throws IOException { - Path dirPath = new Path(basePath, subDir); - FileSystem fileSystem = basePath.getFileSystem(conf); - List newFiles = new ArrayList<>(); - if (!fileSystem.exists(dirPath)) { - return newFiles; - } - for (FileStatus status : fileSystem.listStatus(dirPath)) { - String fileName = status.getPath().getName(); - Long offset = currentOffsets.get(fileName); - // If the offset was never added or offset < fileSize. - if (offset == null || offset < status.getLen()) { - newFiles.add(status); - } - } - return newFiles; - } - - /** - * Returns the current time, using the underlying clock in UTC time. - */ - public LocalDateTime getNow() { - // Use UTC date to ensure reader date is same on all timezones. - return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC); - } - - public Configuration getConfig() { - return conf; - } -} diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java deleted file mode 100644 index b56f06673e35..000000000000 --- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.logging.proto; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile.Reader; - -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; - -public class ProtoMessageReader implements Closeable { - private final Path filePath; - private final Reader reader; - private final ProtoMessageWritable writable; - - ProtoMessageReader(Configuration conf, Path filePath, Parser parser) throws IOException { - this.filePath = filePath; - // The writer does not flush the length during hflush. Using length options lets us read - // past length in the FileStatus but it will throw EOFException during a read instead - // of returning null. - this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE)); - this.writable = new ProtoMessageWritable<>(parser); - } - - public Path getFilePath() { - return filePath; - } - - public void setOffset(long offset) throws IOException { - reader.seek(offset); - } - - public long getOffset() throws IOException { - return reader.getPosition(); - } - - public T readEvent() throws IOException { - if (!reader.next(NullWritable.get(), writable)) { - return null; - } - return writable.getMessage(); - } - - @Override - public void close() throws IOException { - reader.close(); - } -} diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java deleted file mode 100644 index fe7cb2e0a091..000000000000 --- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.logging.proto; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import com.google.protobuf.ExtensionRegistryLite; -import org.apache.hadoop.io.Writable; - -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; - -public class ProtoMessageWritable implements Writable { - private T message; - private final Parser parser; - private DataOutputStream dos; - private CodedOutputStream cos; - private DataInputStream din; - private CodedInputStream cin; - - ProtoMessageWritable(Parser parser) { - this.parser = parser; - } - - public T getMessage() { - return message; - } - - public void setMessage(T message) { - this.message = message; - } - - private static class DataOutputStream extends OutputStream { - DataOutput out; - @Override - public void write(int b) throws IOException { - out.write(b); - } - - @Override - public void write(byte b[], int off, int len) throws IOException { - out.write(b, off, len); - } - } - - @Override - public void write(DataOutput out) throws IOException { - if (dos == null) { - dos = new DataOutputStream(); - cos = CodedOutputStream.newInstance(dos); - } - dos.out = out; - cos.writeMessageNoTag(message); - cos.flush(); - } - - private static class DataInputStream extends InputStream { - DataInput in; - @Override - public int read() throws IOException { - try { - return in.readUnsignedByte(); - } catch (EOFException e) { - return -1; - } - } - } - - @Override - public void readFields(DataInput in) throws IOException { - if (din == null) { - din = new DataInputStream(); - cin = CodedInputStream.newInstance(din); - cin.setSizeLimit(Integer.MAX_VALUE); - } - din.in = in; - message = cin.readMessage(parser, ExtensionRegistryLite.newInstance()); - } -} diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java deleted file mode 100644 index 9c086ef0d713..000000000000 --- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.history.logging.proto; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.SequenceFile.Writer; - -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; - -public class ProtoMessageWriter implements Closeable { - private final Path filePath; - private final Writer writer; - private final ProtoMessageWritable writable; - - ProtoMessageWriter(Configuration conf, Path filePath, Parser parser) throws IOException { - this.filePath = filePath; - this.writer = SequenceFile.createWriter( - conf, - Writer.file(filePath), - Writer.keyClass(NullWritable.class), - Writer.valueClass(ProtoMessageWritable.class), - Writer.compression(CompressionType.RECORD)); - this.writable = new ProtoMessageWritable<>(parser); - } - - public Path getPath() { - return filePath; - } - - public long getOffset() throws IOException { - return writer.getLength(); - } - - public void writeProto(T message) throws IOException { - writable.setMessage(message); - writer.append(NullWritable.get(), writable); - } - - public void hflush() throws IOException { - writer.hflush(); - } - - @Override - public void close() throws IOException { - writer.close(); - } -} diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java deleted file mode 100644 index 23ed46062e07..000000000000 --- a/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Logger code copied from tez codebase, this should be removed when we swtich - * to 0.9.2 tez version and we should depend on the tez libraries for this. - */ -package org.apache.tez.dag.history.logging.proto;