Skip to content

Commit

Permalink
Allow extension of functions in GobblinMCEPublisher and customization…
Browse files Browse the repository at this point in the history
… of fileList file metrics are calculated for (#3820)
  • Loading branch information
AndyJiang99 authored and Will-Lo committed Nov 7, 2023
1 parent 0b87af7 commit dab2806
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -83,7 +84,6 @@ public class GobblinMCEPublisher extends DataPublisher {
public static final String SERIALIZED_AUDIT_COUNT_MAP_KEY = "serializedAuditCountMap";

public GobblinMCEPublisher(State state) throws IOException {

this(state, GobblinMCEProducer.getGobblinMCEProducer(state));
}

Expand All @@ -96,7 +96,7 @@ public GobblinMCEPublisher(State state, GobblinMCEProducer producer) {
public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
// First aggregate the new files by partition
for (State state : states) {
Map<Path, Metrics> newFiles = computeFileMetrics(state);
Map<Path, Metrics> newFiles = computeFileMetrics(state, state.getPropAsList(NEW_FILES_LIST, ""));
Map<String, String> offsetRange = getPartitionOffsetRange(OFFSET_RANGE_KEY);
if (newFiles.isEmpty()) {
// There'll be only one dummy file here. This file is parsed for DB and table name calculation.
Expand All @@ -114,7 +114,7 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
}
}

private Map<String, String> getPartitionOffsetRange(String offsetKey) {
protected Map<String, String> getPartitionOffsetRange(String offsetKey) {
return state.getPropAsList(offsetKey)
.stream()
.collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s -> s.split(MAP_DELIMITER_KEY)[1]));
Expand All @@ -125,11 +125,11 @@ private Map<String, String> getPartitionOffsetRange(String offsetKey) {
* and calculate the hive spec for each datafile and submit the task to register that datafile
* @throws IOException
*/
private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
protected Map<Path, Metrics> computeFileMetrics(State state, List<String> fileList) throws IOException {
Map<Path, Metrics> newFiles = new HashMap<>();
NameMapping mapping = getNameMapping();
FileSystem fs = FileSystem.get(conf);
for (final String pathString : state.getPropAsList(NEW_FILES_LIST, "")) {
for (final String pathString : fileList) {
Path path = new Path(pathString);
LinkedList<FileStatus> fileStatuses = new LinkedList<>();
fileStatuses.add(fs.getFileStatus(path));
Expand All @@ -153,7 +153,7 @@ private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
* It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the DB and table name.
* @throws IOException
*/
private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
protected Map<Path, Metrics> computeDummyFile(State state) throws IOException {
Map<Path, Metrics> newFiles = new HashMap<>();
FileSystem fs = FileSystem.get(conf);
if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) {
Expand Down

0 comments on commit dab2806

Please sign in to comment.