diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index a1c9be35bb8c..011f817a46c3 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -116,6 +116,10 @@
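+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-protobuf-history-plugin</artifactId>
+ </dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-tests</artifactId>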
diff --git a/pom.xml b/pom.xml
index 462fcd864d2e..1a48f12bf578 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1343,6 +1343,45 @@
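<artifactId>tez-common</artifactId>
<version>${tez.version}</version>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-protobuf-history-plugin</artifactId>
+ <version>${tez.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>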
diff --git a/ql/pom.xml b/ql/pom.xml
index 5340341b5132..5fc3185411b6 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -721,16 +721,13 @@
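<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-protobuf-history-plugin</artifactId>
+ <optional>true</optional>
+ </dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-mapreduce</artifactId>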
@@ -800,14 +797,6 @@
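<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>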
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
deleted file mode 100644
index 58cec7eacea3..000000000000
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.proto;
-
-import java.io.IOException;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.util.Clock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-/**
- * Class to create proto reader and writer for a date partitioned directory structure.
- *
- * @param <T> The proto message type.
- */
-public class DatePartitionedLogger<T extends MessageLite> {
- private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class);
- // Everyone has permission to write, but with sticky set so that delete is restricted.
- // This is required, since the path is same for all users and everyone writes into it.
- private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
-
- // Since the directories have broad permissions restrict the file read access.
- private static final FsPermission FILE_UMASK = FsPermission.createImmutable((short)0066);
-
- private final Parser<T> parser;
- private final Path basePath;
- private final Configuration conf;
- private final Clock clock;
-
- public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
- throws IOException {
- this.conf = new Configuration(conf);
- this.clock = clock;
- this.parser = parser;
- createDirIfNotExists(baseDir);
- this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir);
- FsPermission.setUMask(this.conf, FILE_UMASK);
- }
-
- private void createDirIfNotExists(Path path) throws IOException {
- FileSystem fileSystem = path.getFileSystem(conf);
- try {
- if (!fileSystem.exists(path)) {
- fileSystem.mkdirs(path);
- fileSystem.setPermission(path, DIR_PERMISSION);
- }
- } catch (IOException e) {
- // Ignore this exception, if there is a problem it'll fail when trying to read or write.
- LOG.warn("Error while trying to set permission: ", e);
- }
- }
-
- /**
- * Creates a writer for the given fileName, with date as today.
- */
- public ProtoMessageWriter<T> getWriter(String fileName) throws IOException {
- Path filePath = getPathForDate(getNow().toLocalDate(), fileName);
- return new ProtoMessageWriter<>(conf, filePath, parser);
- }
-
- /**
- * Creates a reader for the given filePath, no validation is done.
- */
- public ProtoMessageReader<T> getReader(Path filePath) throws IOException {
- return new ProtoMessageReader<>(conf, filePath, parser);
- }
-
- /**
- * Create a path for the given date and fileName. This can be used to create a reader.
- */
- public Path getPathForDate(LocalDate date, String fileName) throws IOException {
- Path path = new Path(basePath, getDirForDate(date));
- createDirIfNotExists(path);
- return new Path(path, fileName);
- }
-
- public Path getPathForSubdir(String dirName, String fileName) {
- return new Path(new Path(basePath, dirName), fileName);
- }
-
- /**
- * Extract the date from the directory name, this should be a directory created by this class.
- */
- public LocalDate getDateFromDir(String dirName) {
- if (!dirName.startsWith("date=")) {
- throw new IllegalArgumentException("Invalid directory: "+ dirName);
- }
- return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE);
- }
-
- /**
- * Returns the directory name for a given date.
- */
- public String getDirForDate(LocalDate date) {
- return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
- }
-
- /**
- * Find next available directory, after the given directory.
- */
- public String getNextDirectory(String currentDir) throws IOException {
- // Fast check, if the next day directory exists return it.
- String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
- FileSystem fileSystem = basePath.getFileSystem(conf);
- if (fileSystem.exists(new Path(basePath, nextDate))) {
- return nextDate;
- }
- // Have to scan the directory to find min date greater than currentDir.
- String dirName = null;
- for (FileStatus status : fileSystem.listStatus(basePath)) {
- String name = status.getPath().getName();
- // String comparison is good enough, since its of form date=yyyy-MM-dd
- if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) {
- dirName = name;
- }
- }
- return dirName;
- }
-
- /**
- * Returns new or changed files in the given directory. The offsets are used to find
- * changed files.
- */
- public List<FileStatus> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
- throws IOException {
- Path dirPath = new Path(basePath, subDir);
- FileSystem fileSystem = basePath.getFileSystem(conf);
- List<FileStatus> newFiles = new ArrayList<>();
- if (!fileSystem.exists(dirPath)) {
- return newFiles;
- }
- for (FileStatus status : fileSystem.listStatus(dirPath)) {
- String fileName = status.getPath().getName();
- Long offset = currentOffsets.get(fileName);
- // If the offset was never added or offset < fileSize.
- if (offset == null || offset < status.getLen()) {
- newFiles.add(status);
- }
- }
- return newFiles;
- }
-
- /**
- * Returns the current time, using the underlying clock in UTC time.
- */
- public LocalDateTime getNow() {
- // Use UTC date to ensure reader date is same on all timezones.
- return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC);
- }
-
- public Configuration getConfig() {
- return conf;
- }
-}
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
deleted file mode 100644
index b56f06673e35..000000000000
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.proto;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile.Reader;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-public class ProtoMessageReader<T extends MessageLite> implements Closeable {
- private final Path filePath;
- private final Reader reader;
- private final ProtoMessageWritable<T> writable;
-
- ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
- this.filePath = filePath;
- // The writer does not flush the length during hflush. Using length options lets us read
- // past length in the FileStatus but it will throw EOFException during a read instead
- // of returning null.
- this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE));
- this.writable = new ProtoMessageWritable<>(parser);
- }
-
- public Path getFilePath() {
- return filePath;
- }
-
- public void setOffset(long offset) throws IOException {
- reader.seek(offset);
- }
-
- public long getOffset() throws IOException {
- return reader.getPosition();
- }
-
- public T readEvent() throws IOException {
- if (!reader.next(NullWritable.get(), writable)) {
- return null;
- }
- return writable.getMessage();
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
-}
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
deleted file mode 100644
index fe7cb2e0a091..000000000000
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.proto;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import com.google.protobuf.ExtensionRegistryLite;
-import org.apache.hadoop.io.Writable;
-
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-public class ProtoMessageWritable<T extends MessageLite> implements Writable {
- private T message;
- private final Parser<T> parser;
- private DataOutputStream dos;
- private CodedOutputStream cos;
- private DataInputStream din;
- private CodedInputStream cin;
-
- ProtoMessageWritable(Parser<T> parser) {
- this.parser = parser;
- }
-
- public T getMessage() {
- return message;
- }
-
- public void setMessage(T message) {
- this.message = message;
- }
-
- private static class DataOutputStream extends OutputStream {
- DataOutput out;
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- }
-
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- out.write(b, off, len);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- if (dos == null) {
- dos = new DataOutputStream();
- cos = CodedOutputStream.newInstance(dos);
- }
- dos.out = out;
- cos.writeMessageNoTag(message);
- cos.flush();
- }
-
- private static class DataInputStream extends InputStream {
- DataInput in;
- @Override
- public int read() throws IOException {
- try {
- return in.readUnsignedByte();
- } catch (EOFException e) {
- return -1;
- }
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- if (din == null) {
- din = new DataInputStream();
- cin = CodedInputStream.newInstance(din);
- cin.setSizeLimit(Integer.MAX_VALUE);
- }
- din.in = in;
- message = cin.readMessage(parser, ExtensionRegistryLite.newInstance());
- }
-}
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
deleted file mode 100644
index 9c086ef0d713..000000000000
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.logging.proto;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Writer;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
-public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
- private final Path filePath;
- private final Writer writer;
- private final ProtoMessageWritable<T> writable;
-
- ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
- this.filePath = filePath;
- this.writer = SequenceFile.createWriter(
- conf,
- Writer.file(filePath),
- Writer.keyClass(NullWritable.class),
- Writer.valueClass(ProtoMessageWritable.class),
- Writer.compression(CompressionType.RECORD));
- this.writable = new ProtoMessageWritable<>(parser);
- }
-
- public Path getPath() {
- return filePath;
- }
-
- public long getOffset() throws IOException {
- return writer.getLength();
- }
-
- public void writeProto(T message) throws IOException {
- writable.setMessage(message);
- writer.append(NullWritable.get(), writable);
- }
-
- public void hflush() throws IOException {
- writer.hflush();
- }
-
- @Override
- public void close() throws IOException {
- writer.close();
- }
-}
diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java
deleted file mode 100644
index 23ed46062e07..000000000000
--- a/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Logger code copied from tez codebase, this should be removed when we swtich
- * to 0.9.2 tez version and we should depend on the tez libraries for this.
- */
-package org.apache.tez.dag.history.logging.proto;