Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added replaceGcsFilesWithLocalFiles #33006

Merged
merged 7 commits into from
Nov 19, 2024
Merged

Conversation

liferoad
Copy link
Contributor

@liferoad liferoad commented Nov 2, 2024

Fixes #32531

Improve GCS file handling in DataflowRunner

The proposed changes are only limited to DataflowRunner and are kept simple. When a GCS file is detected in filesToStage, we first download this to a local temp file and replace the GCS file with this local temp file.

Changes

  • Added functionality to handle GCS files in replaceGcsFilesWithLocalFiles() method
  • Files with GCS paths (gs://) are now downloaded to local temp files before being staged
  • Maintains original file names when downloading from GCS
  • Handles both files with and without staging names specified
  • Adds logging for file downloads and replacements

Notes

  • Temp files are created with same names as original GCS files for consistency
  • Files are downloaded using FileSystems API for proper GCS integration
  • Original functionality for local files is preserved unchanged

Tests

  • Add a simple IT to validate the file is downloaded and staged later with a Dataflow job
image

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@liferoad liferoad marked this pull request as ready for review November 5, 2024 15:58
@liferoad liferoad requested review from Abacn and kennknowles November 5, 2024 15:58
Copy link
Contributor

github-actions bot commented Nov 5, 2024

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link
Contributor

@Abacn Abacn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This is a working solution.

Alternatively, I am wondering if the client could initiate gcs copy call from source path to the actual staging path when staging happens, this would eliminate the need to download actual file to local. Could be a separate task

ResourceId source = FileSystems.matchNewResource(filePath, false);
try (ReadableByteChannel reader = FileSystems.open(source);
FileOutputStream writer = new FileOutputStream(tempFile)) {
ByteStreams.copy(Channels.newInputStream(reader), writer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't totally understand the use of this method, but surely you should use Filesystems.copy method that does this:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our Filesystems.copy has too many layers. Here, we simply downloads the file to a given temp file, which is also removed with deleteOnExit. I think we do not need to use our internal tool since it is quite straightforward.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you implemented is just the same as copy but now we have two copies to maintain, really. The layers should be robust.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expect srcResourceIds and destResourceIds have the same scheme, but received gs, file.
java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds have the same scheme, but received gs, file.

Looks like our copy needs the same scheme. :)

@liferoad
Copy link
Contributor Author

Will close this PR and create a new one to do this when staging the GCS files.

@liferoad liferoad closed this Nov 15, 2024
@liferoad liferoad reopened this Nov 18, 2024
@liferoad
Copy link
Contributor Author

some offline discussions with @kennknowles. Adding this to the staging phase needs more time and more places to be changed since the current staging logic relies on local files (e.g., computing hash). So we decided to merge this PR for now. The improvements on the staging phase could be done later.

@liferoad liferoad merged commit c57553c into apache:master Nov 19, 2024
33 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature Request][Java]:filesToStage should support the gcs file path
3 participants