Skip to content

Commit

Permalink
[GOBBLIN-1938] preserve x bit in manifest file based copy (#3804)
Browse files Browse the repository at this point in the history
* preserve x bit in manifest file based copy
* fix project structure preventing running unit tests from intellij
* fix unit test
  • Loading branch information
arjun4084346 authored and Will-Lo committed Oct 27, 2023
1 parent 1a667ec commit 9d748a7
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
import org.apache.gobblin.util.commit.SetPermissionCommitStep;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -88,9 +93,14 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
+ "%s, you can specify multi locations split by '',", manifestPath.toString(), manifestReadFs.getUri().toString(),
ManifestBasedDatasetFinder.MANIFEST_LOCATION));
}

CopyManifest.CopyableUnitIterator manifests = null;
List<CopyEntity> copyEntities = Lists.newArrayList();
List<FileStatus> toDelete = Lists.newArrayList();
// map of paths and permissions sorted by depth of path, so that permissions can be set in order
Map<String, OwnerAndPermission> ancestorOwnerAndPermissions = new TreeMap<>(
(o1, o2) -> Long.compare(o2.chars().filter(ch -> ch == '/').count(), o1.chars().filter(ch -> ch == '/').count()));

try {
long startTime = System.currentTimeMillis();
manifests = CopyManifest.getReadIterator(this.manifestReadFs, this.manifestPath);
Expand Down Expand Up @@ -118,6 +128,13 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
CopyableFile copyableFile = copyableFileBuilder.build();
copyableFile.setFsDatasets(srcFs, targetFs);
copyEntities.add(copyableFile);

Path fromPath = srcFs.getFileStatus(fileToCopy).isDirectory() ? fileToCopy : fileToCopy.getParent();

ancestorOwnerAndPermissions.putAll(
CopyableFile.resolveReplicatedAncestorOwnerAndPermissionsRecursively(srcFs, fromPath,
new Path(commonFilesParent), configuration));

if (existOnTarget && srcFile.isFile()) {
// this is to match the existing publishing behavior where we won't rewrite the target when it already exists
// todo: Change the publish behavior to support overwrite destination file during rename, instead of relying on this delete step which is needed if we want to support task level publish
Expand All @@ -128,6 +145,12 @@ public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, Cop
toDelete.add(targetFs.getFileStatus(fileToCopy));
}
}

Properties props = new Properties();
props.setProperty(SetPermissionCommitStep.STOP_ON_ERROR_KEY, "true");
CommitStep setPermissionCommitStep = new SetPermissionCommitStep(targetFs, ancestorOwnerAndPermissions, props);
copyEntities.add(new PostPublishStep(datasetURN(), Maps.newHashMap(), setPermissionCommitStep, 1));

if (!toDelete.isEmpty()) {
//todo: add support sync for empty dir
CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,19 +255,19 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
prePublish.size(), postPublish.size()));

executeCommitSequence(prePublish);

if (hasCopyableFiles(datasetWorkUnitStates)) {
// Targets are always absolute, so we start moving from root (will skip any existing directories).
HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/"));
} else {
log.info(String.format("[%s] No copyable files in dataset. Proceeding to postpublish steps.", datasetAndPartition.identifier()));
}
executeCommitSequence(postPublish);

this.fs.delete(datasetWriterOutputPath, true);

long datasetOriginTimestamp = Long.MAX_VALUE;
long datasetUpstreamTimestamp = Long.MAX_VALUE;
Optional<String> fileSetRoot = Optional.<String>absent();
Optional<String> fileSetRoot = Optional.absent();

for (WorkUnitState wus : datasetWorkUnitStates) {
if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
Expand Down Expand Up @@ -300,6 +300,10 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
}
}

// execute post publish commit steps after preserving file attributes, because some post publish steps,
// e.g. SetPermissionCommitStep, need to set permissions
executeCommitSequence(postPublish);

// if there are no valid values for datasetOriginTimestamp and datasetUpstreamTimestamp, use
// something more readable
if (Long.MAX_VALUE == datasetOriginTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.data.management.dataset;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -28,22 +29,30 @@
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.ManifestBasedDataset;
import org.apache.gobblin.data.management.copy.ManifestBasedDatasetFinder;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.util.commit.SetPermissionCommitStep;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import com.google.common.io.Files;

import static org.mockito.Mockito.*;


public class ManifestBasedDatasetFinderTest {
private FileSystem localFs;
private File tmpDir;

public ManifestBasedDatasetFinderTest() throws IOException {
localFs = FileSystem.getLocal(new Configuration());
tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
}

@Test
Expand Down Expand Up @@ -81,7 +90,7 @@ public void testFindFiles() throws IOException, URISyntaxException {
Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
Mockito.when(manifestReadFs.getUri()).thenReturn(MANIFEST_READ_FS_URI);
Mockito.when(destFs.getUri()).thenReturn(DEST_FS_URI);
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(manifestPath));
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(new Path(tmpDir.toString())));
Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
Mockito.when(manifestReadFs.exists(any(Path.class))).thenReturn(true);
Mockito.when(manifestReadFs.getFileStatus(manifestPath)).thenReturn(localFs.getFileStatus(manifestPath));
Expand All @@ -96,7 +105,8 @@ public void testFindFiles() throws IOException, URISyntaxException {
new ManifestBasedDataset(sourceFs, manifestReadFs, manifestPath, props).getFileSetIterator(destFs, CopyConfiguration.builder(destFs, props).build());
Assert.assertTrue(fileSets.hasNext());
FileSet<CopyEntity> fileSet = fileSets.next();
Assert.assertEquals(fileSet.getFiles().size(), 2);
Assert.assertEquals(fileSet.getFiles().size(), 3); // 2 files to copy + 1 post publish step
Assert.assertTrue(((PostPublishStep)fileSet.getFiles().get(2)).getStep() instanceof SetPermissionCommitStep);
Mockito.verify(manifestReadFs, Mockito.times(1)).exists(manifestPath);
Mockito.verify(manifestReadFs, Mockito.times(1)).getFileStatus(manifestPath);
Mockito.verify(manifestReadFs, Mockito.times(1)).open(manifestPath);
Expand Down
2 changes: 1 addition & 1 deletion gobblin-iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ dependencies {
transitive = false
}
testCompile('org.apache.hadoop:hadoop-common:2.6.0')
testImplementation(testFixtures(project(":gobblin-completeness")))
testCompile project(":gobblin-completeness").sourceSets.test.output
testCompile project(path: ':gobblin-modules:gobblin-kafka-common', configuration: 'tests')
testCompile externalDependency.testng
testCompile externalDependency.mockito
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public static Path relativizePath(Path fullPath, Path pathPrefix) {
* @return true if possibleAncestor is an ancestor of fullPath.
*/
public static boolean isAncestor(Path possibleAncestor, Path fullPath) {
  // A null path has no ancestors, so it is rejected before any helper is invoked.
  if (fullPath == null) {
    return false;
  }

  Path relativized = relativizePath(fullPath, possibleAncestor);
  // possibleAncestor counts as an ancestor only when relativizing against it changed the path,
  // i.e. the result differs from what getPathWithoutSchemeAndAuthority(fullPath) returns.
  return !relativized.equals(getPathWithoutSchemeAndAuthority(fullPath));
}

Expand Down

0 comments on commit 9d748a7

Please sign in to comment.