Skip to content

Commit

Permalink
Locate and download Prism binary (#31796)
Browse files Browse the repository at this point in the history
* Stage PrismRunner implementation and dependencies

* Locate and download Prism binary

* Sync with head

* Remove redundant check

* Remove sha verification; delete files in test setup

* Remove destination dir; check exists

* Add tests for 404 and tag prefix
  • Loading branch information
damondouglas authored Jul 10, 2024
1 parent 7c0cf39 commit 8d5c3b5
Show file tree
Hide file tree
Showing 2 changed files with 346 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.prism;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;

/**
* Locates a Prism executable based on a user's default operating system and architecture
* environment or a {@link PrismPipelineOptions#getPrismLocation()} override. Handles the download,
* unzip, {@link PosixFilePermissions}, as needed. For {@link #GITHUB_DOWNLOAD_PREFIX} sources,
* additionally performs a SHA512 verification.
*/
class PrismLocator {
static final String OS_NAME_PROPERTY = "os.name";
static final String ARCH_PROPERTY = "os.arch";
static final String USER_HOME_PROPERTY = "user.home";

private static final String ZIP_EXT = "zip";
private static final ReleaseInfo RELEASE_INFO = ReleaseInfo.getReleaseInfo();
private static final String PRISM_BIN_PATH = ".apache_beam/cache/prism/bin";
private static final Set<PosixFilePermission> PERMS =
PosixFilePermissions.fromString("rwxr-xr-x");
private static final String GITHUB_DOWNLOAD_PREFIX =
"https://github.com/apache/beam/releases/download";
private static final String GITHUB_TAG_PREFIX = "https://github.com/apache/beam/releases/tag";

private final PrismPipelineOptions options;

PrismLocator(PrismPipelineOptions options) {
this.options = options;
}

/**
* Downloads and prepares a Prism executable for use with the {@link PrismRunner}. The returned
* {@link String} is the absolute path to the Prism executable.
*/
String resolve() throws IOException {

String from =
String.format("%s/v%s/%s.zip", GITHUB_DOWNLOAD_PREFIX, getSDKVersion(), buildFileName());

if (!Strings.isNullOrEmpty(options.getPrismLocation())) {
checkArgument(
!options.getPrismLocation().startsWith(GITHUB_TAG_PREFIX),
"Provided --prismLocation URL is not an Apache Beam Github "
+ "Release page URL or download URL: ",
from);

from = options.getPrismLocation();
}

String fromFileName = getNameWithoutExtension(from);
Path to = Paths.get(userHome(), PRISM_BIN_PATH, fromFileName);

if (Files.exists(to)) {
return to.toString();
}

createDirectoryIfNeeded(to);

if (from.startsWith("http")) {
String result = resolve(new URL(from), to);
checkState(Files.exists(to), "Resolved location does not exist: %s", result);
return result;
}

String result = resolve(Paths.get(from), to);
checkState(Files.exists(to), "Resolved location does not exist: %s", result);
return result;
}

static Path prismBinDirectory() {
return Paths.get(userHome(), PRISM_BIN_PATH);
}

private String resolve(URL from, Path to) throws IOException {
BiConsumer<URL, Path> downloadFn = PrismLocator::download;
if (from.getPath().endsWith(ZIP_EXT)) {
downloadFn = PrismLocator::unzip;
}
downloadFn.accept(from, to);

Files.setPosixFilePermissions(to, PERMS);

return to.toString();
}

private String resolve(Path from, Path to) throws IOException {

BiConsumer<InputStream, Path> copyFn = PrismLocator::copy;
if (from.endsWith(ZIP_EXT)) {
copyFn = PrismLocator::unzip;
}

copyFn.accept(from.toUri().toURL().openStream(), to);
ByteStreams.copy(from.toUri().toURL().openStream(), Files.newOutputStream(to));
Files.setPosixFilePermissions(to, PERMS);

return to.toString();
}

String buildFileName() {
String version = getSDKVersion();
return String.format("apache_beam-v%s-prism-%s-%s", version, os(), arch());
}

private static void unzip(URL from, Path to) {
try {
unzip(from.openStream(), to);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static void unzip(InputStream from, Path to) {
try (OutputStream out = Files.newOutputStream(to)) {
ZipInputStream zis = new ZipInputStream(from);
for (ZipEntry entry = zis.getNextEntry(); entry != null; entry = zis.getNextEntry()) {
InputStream in = ByteStreams.limit(zis, entry.getSize());
ByteStreams.copy(in, out);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static void copy(InputStream from, Path to) {
try {
ByteStreams.copy(from, Files.newOutputStream(to));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static void download(URL from, Path to) {
try {
ByteStreams.copy(from.openStream(), Files.newOutputStream(to));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static String getNameWithoutExtension(String path) {
return org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files
.getNameWithoutExtension(path);
}

private String getSDKVersion() {
if (Strings.isNullOrEmpty(options.getPrismVersionOverride())) {
return RELEASE_INFO.getSdkVersion();
}
return options.getPrismVersionOverride();
}

private static String os() {
String result = mustGetPropertyAsLowerCase(OS_NAME_PROPERTY);
if (result.contains("mac")) {
return "darwin";
}
return result;
}

private static String arch() {
String result = mustGetPropertyAsLowerCase(ARCH_PROPERTY);
if (result.contains("aarch")) {
return "arm64";
}
return result;
}

private static String userHome() {
return mustGetPropertyAsLowerCase(USER_HOME_PROPERTY);
}

private static String mustGetPropertyAsLowerCase(String name) {
return checkStateNotNull(System.getProperty(name), "System property: " + name + " not set")
.toLowerCase();
}

private static void createDirectoryIfNeeded(Path path) throws IOException {
Path parent = path.getParent();
if (parent == null) {
return;
}
Files.createDirectories(parent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.prism;

import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.runners.prism.PrismLocator.prismBinDirectory;
import static org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest;
import static org.junit.Assert.assertThrows;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link PrismLocator}. */
@RunWith(JUnit4.class)
public class PrismLocatorTest {

private static final Path DESTINATION_DIRECTORY = prismBinDirectory();

@Before
public void setup() throws IOException {
if (Files.exists(DESTINATION_DIRECTORY)) {
Files.walkFileTree(
DESTINATION_DIRECTORY,
new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
});

Files.delete(DESTINATION_DIRECTORY);
}
}

@Test
public void givenVersionOverride_thenResolves() throws IOException {
assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse();
PrismPipelineOptions options = options();
options.setPrismVersionOverride("2.57.0");
PrismLocator underTest = new PrismLocator(options);
String got = underTest.resolve();
assertThat(got).contains(DESTINATION_DIRECTORY.toString());
assertThat(got).contains("2.57.0");
Path gotPath = Paths.get(got);
assertThat(Files.exists(gotPath)).isTrue();
}

@Test
public void givenHttpPrismLocationOption_thenResolves() throws IOException {
assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse();
PrismPipelineOptions options = options();
options.setPrismLocation(
"https://github.com/apache/beam/releases/download/v2.57.0/apache_beam-v2.57.0-prism-darwin-arm64.zip");
PrismLocator underTest = new PrismLocator(options);
String got = underTest.resolve();
assertThat(got).contains(DESTINATION_DIRECTORY.toString());
Path gotPath = Paths.get(got);
assertThat(Files.exists(gotPath)).isTrue();
}

@Test
public void givenFilePrismLocationOption_thenResolves() throws IOException {
assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse();
PrismPipelineOptions options = options();
options.setPrismLocation(getLocalPrismBuildOrIgnoreTest());
PrismLocator underTest = new PrismLocator(options);
String got = underTest.resolve();
assertThat(got).contains(DESTINATION_DIRECTORY.toString());
Path gotPath = Paths.get(got);
assertThat(Files.exists(gotPath)).isTrue();
}

@Test
public void givenGithubTagPrismLocationOption_thenThrows() {
PrismPipelineOptions options = options();
options.setPrismLocation(
"https://github.com/apache/beam/releases/tag/v2.57.0/apache_beam-v2.57.0-prism-darwin-amd64.zip");
PrismLocator underTest = new PrismLocator(options);
IllegalArgumentException error =
assertThrows(IllegalArgumentException.class, underTest::resolve);
assertThat(error.getMessage())
.contains(
"Provided --prismLocation URL is not an Apache Beam Github Release page URL or download URL");
}

@Test
public void givenPrismLocation404_thenThrows() {
PrismPipelineOptions options = options();
options.setPrismLocation("https://example.com/i/dont/exist.zip");
PrismLocator underTest = new PrismLocator(options);
RuntimeException error = assertThrows(RuntimeException.class, underTest::resolve);
assertThat(error.getMessage()).contains("NotFoundException");
}

private static PrismPipelineOptions options() {
return PipelineOptionsFactory.create().as(PrismPipelineOptions.class);
}
}

0 comments on commit 8d5c3b5

Please sign in to comment.