diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java new file mode 100644 index 000000000000..f32e4d88f42b --- /dev/null +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.prism; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import org.apache.beam.sdk.util.ReleaseInfo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; + +/** + * Locates a Prism executable based on a user's default operating system and architecture + * environment or a {@link PrismPipelineOptions#getPrismLocation()} override. Handles the download, + * unzip, {@link PosixFilePermissions}, as needed. For {@link #GITHUB_DOWNLOAD_PREFIX} sources, + * additionally performs a SHA512 verification. + */ +class PrismLocator { + static final String OS_NAME_PROPERTY = "os.name"; + static final String ARCH_PROPERTY = "os.arch"; + static final String USER_HOME_PROPERTY = "user.home"; + + private static final String ZIP_EXT = "zip"; + private static final ReleaseInfo RELEASE_INFO = ReleaseInfo.getReleaseInfo(); + private static final String PRISM_BIN_PATH = ".apache_beam/cache/prism/bin"; + private static final Set PERMS = + PosixFilePermissions.fromString("rwxr-xr-x"); + private static final String GITHUB_DOWNLOAD_PREFIX = + "https://github.com/apache/beam/releases/download"; + private static final String GITHUB_TAG_PREFIX = "https://github.com/apache/beam/releases/tag"; + + private final PrismPipelineOptions options; + + PrismLocator(PrismPipelineOptions options) { + this.options = options; + } + + /** + * Downloads and prepares a Prism executable for use with the {@link PrismRunner}. The returned + * {@link String} is the absolute path to the Prism executable. + */ + String resolve() throws IOException { + + String from = + String.format("%s/v%s/%s.zip", GITHUB_DOWNLOAD_PREFIX, getSDKVersion(), buildFileName()); + + if (!Strings.isNullOrEmpty(options.getPrismLocation())) { + checkArgument( + !options.getPrismLocation().startsWith(GITHUB_TAG_PREFIX), + "Provided --prismLocation URL is not an Apache Beam Github " + + "Release page URL or download URL: ", + from); + + from = options.getPrismLocation(); + } + + String fromFileName = getNameWithoutExtension(from); + Path to = Paths.get(userHome(), PRISM_BIN_PATH, fromFileName); + + if (Files.exists(to)) { + return to.toString(); + } + + createDirectoryIfNeeded(to); + + if (from.startsWith("http")) { + String result = resolve(new URL(from), to); + checkState(Files.exists(to), "Resolved location does not exist: %s", result); + return result; + } + + String result = resolve(Paths.get(from), to); + checkState(Files.exists(to), "Resolved location does not exist: %s", result); + return result; + } + + static Path prismBinDirectory() { + return Paths.get(userHome(), PRISM_BIN_PATH); + } + + private String resolve(URL from, Path to) throws IOException { + BiConsumer downloadFn = PrismLocator::download; + if (from.getPath().endsWith(ZIP_EXT)) { + downloadFn = PrismLocator::unzip; + } + downloadFn.accept(from, to); + + Files.setPosixFilePermissions(to, PERMS); + + return to.toString(); + } + + private String resolve(Path from, Path to) throws IOException { + + BiConsumer copyFn = PrismLocator::copy; + if (from.endsWith(ZIP_EXT)) { + copyFn = PrismLocator::unzip; + } + + copyFn.accept(from.toUri().toURL().openStream(), to); + ByteStreams.copy(from.toUri().toURL().openStream(), Files.newOutputStream(to)); + Files.setPosixFilePermissions(to, PERMS); + + return to.toString(); + } + + String buildFileName() { + String version = getSDKVersion(); + return String.format("apache_beam-v%s-prism-%s-%s", version, os(), arch()); + } + + private static void unzip(URL from, Path to) { + try { + unzip(from.openStream(), to); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static void unzip(InputStream from, Path to) { + try (OutputStream out = Files.newOutputStream(to)) { + ZipInputStream zis = new ZipInputStream(from); + for (ZipEntry entry = zis.getNextEntry(); entry != null; entry = zis.getNextEntry()) { + InputStream in = ByteStreams.limit(zis, entry.getSize()); + ByteStreams.copy(in, out); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static void copy(InputStream from, Path to) { + try { + ByteStreams.copy(from, Files.newOutputStream(to)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static void download(URL from, Path to) { + try { + ByteStreams.copy(from.openStream(), Files.newOutputStream(to)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static String getNameWithoutExtension(String path) { + return org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files + .getNameWithoutExtension(path); + } + + private String getSDKVersion() { + if (Strings.isNullOrEmpty(options.getPrismVersionOverride())) { + return RELEASE_INFO.getSdkVersion(); + } + return options.getPrismVersionOverride(); + } + + private static String os() { + String result = mustGetPropertyAsLowerCase(OS_NAME_PROPERTY); + if (result.contains("mac")) { + return "darwin"; + } + return result; + } + + private static String arch() { + String result = mustGetPropertyAsLowerCase(ARCH_PROPERTY); + if (result.contains("aarch")) { + return "arm64"; + } + return result; + } + + private static String userHome() { + return mustGetPropertyAsLowerCase(USER_HOME_PROPERTY); + } + + private static String mustGetPropertyAsLowerCase(String name) { + return checkStateNotNull(System.getProperty(name), "System property: " + name + " not set") + .toLowerCase(); + } + + private static void createDirectoryIfNeeded(Path path) throws IOException { + Path parent = path.getParent(); + if (parent == null) { + return; + } + Files.createDirectories(parent); + } +} diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismLocatorTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismLocatorTest.java new file mode 100644 index 000000000000..982a8bfd657c --- /dev/null +++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismLocatorTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.prism; + +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.runners.prism.PrismLocator.prismBinDirectory; +import static org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest; +import static org.junit.Assert.assertThrows; + +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link PrismLocator}. */ +@RunWith(JUnit4.class) +public class PrismLocatorTest { + + private static final Path DESTINATION_DIRECTORY = prismBinDirectory(); + + @Before + public void setup() throws IOException { + if (Files.exists(DESTINATION_DIRECTORY)) { + Files.walkFileTree( + DESTINATION_DIRECTORY, + new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) + throws IOException { + Files.delete(file); + return FileVisitResult.CONTINUE; + } + }); + + Files.delete(DESTINATION_DIRECTORY); + } + } + + @Test + public void givenVersionOverride_thenResolves() throws IOException { + assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse(); + PrismPipelineOptions options = options(); + options.setPrismVersionOverride("2.57.0"); + PrismLocator underTest = new PrismLocator(options); + String got = underTest.resolve(); + assertThat(got).contains(DESTINATION_DIRECTORY.toString()); + assertThat(got).contains("2.57.0"); + Path gotPath = Paths.get(got); + assertThat(Files.exists(gotPath)).isTrue(); + } + + @Test + public void givenHttpPrismLocationOption_thenResolves() throws IOException { + assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse(); + PrismPipelineOptions options = options(); + options.setPrismLocation( + "https://github.com/apache/beam/releases/download/v2.57.0/apache_beam-v2.57.0-prism-darwin-arm64.zip"); + PrismLocator underTest = new PrismLocator(options); + String got = underTest.resolve(); + assertThat(got).contains(DESTINATION_DIRECTORY.toString()); + Path gotPath = Paths.get(got); + assertThat(Files.exists(gotPath)).isTrue(); + } + + @Test + public void givenFilePrismLocationOption_thenResolves() throws IOException { + assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse(); + PrismPipelineOptions options = options(); + options.setPrismLocation(getLocalPrismBuildOrIgnoreTest()); + PrismLocator underTest = new PrismLocator(options); + String got = underTest.resolve(); + assertThat(got).contains(DESTINATION_DIRECTORY.toString()); + Path gotPath = Paths.get(got); + assertThat(Files.exists(gotPath)).isTrue(); + } + + @Test + public void givenGithubTagPrismLocationOption_thenThrows() { + PrismPipelineOptions options = options(); + options.setPrismLocation( + "https://github.com/apache/beam/releases/tag/v2.57.0/apache_beam-v2.57.0-prism-darwin-amd64.zip"); + PrismLocator underTest = new PrismLocator(options); + IllegalArgumentException error = + assertThrows(IllegalArgumentException.class, underTest::resolve); + assertThat(error.getMessage()) + .contains( + "Provided --prismLocation URL is not an Apache Beam Github Release page URL or download URL"); + } + + @Test + public void givenPrismLocation404_thenThrows() { + PrismPipelineOptions options = options(); + options.setPrismLocation("https://example.com/i/dont/exist.zip"); + PrismLocator underTest = new PrismLocator(options); + RuntimeException error = assertThrows(RuntimeException.class, underTest::resolve); + assertThat(error.getMessage()).contains("NotFoundException"); + } + + private static PrismPipelineOptions options() { + return PipelineOptionsFactory.create().as(PrismPipelineOptions.class); + } +}