Skip to content

Commit

Permalink
HIVE-28173: Fixed staging dir issues on materialized views on HDFS encrypted tables
Browse files Browse the repository at this point in the history
(Stephen Carlin reviewed by Krisztian Kasa, Denys Kuzmenko)
  • Loading branch information
scarlin-cloudera authored May 2, 2024
1 parent a02be2b commit 636b0d3
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.parse;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;

import org.apache.hadoop.hive.metastore.api.StorageDescriptor;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test ParseUtils to see if a staging dir is created.
*/
/**
 * Tests for {@code SemanticAnalyzer.getStagingDirectoryPathname(QB, HiveConf, Context)}:
 * verifies whether a staging directory is materialized on the file system
 * depending on HDFS encryption and the {@code isLoadingMaterializedView}
 * flag (HIVE-28173).
 */
public class TestParseUtilsStagingDir {
  protected static final Logger LOG =
      LoggerFactory.getLogger(TestParseUtilsStagingDir.class.getName());

  // Keystore file backing the HDFS encryption key provider; removed in teardown.
  private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks";

  private static WarehouseInstance primary;
  private static HiveConf conf;
  private static MiniDFSCluster miniDFSCluster;

  private static final String TABLE = "tbl";

  /**
   * Spins up a single-node MiniDFSCluster with a JCEKS-backed encryption key
   * provider, creates an encrypted warehouse, and starts a Hive session.
   */
  @BeforeClass
  public static void beforeClassSetup() throws Exception {
    // Allow the JCEKS key-provider classes through the JDK serialization filter.
    System.setProperty("jceks.key.serialFilter", "java.lang.Enum;java.security.KeyRep;" +
        "java.security.KeyRep$Type;javax.crypto.spec.SecretKeySpec;" +
        "org.apache.hadoop.crypto.key.JavaKeyStoreProvider$KeyMetadata;!*");
    conf = new HiveConf();
    conf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile);

    miniDFSCluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();

    DFSTestUtil.createKey("test_key", miniDFSCluster, conf);
    primary = new WarehouseInstance(LOG, miniDFSCluster, new HashMap<String, String>(),
        "test_key");
    // conf is already declared as HiveConf; no cast needed.
    SessionState.start(conf);
  }

  /**
   * Closes the session and warehouse and deletes the keystore. Null-safe so
   * a failure part-way through setup does not mask the original error.
   */
  @AfterClass
  public static void classLevelTearDown() throws IOException {
    if (SessionState.get() != null) {
      SessionState.get().close();
    }
    if (primary != null) {
      primary.close();
    }
    FileUtils.deleteQuietly(new File(jksFile));
  }

  /**
   * Test to make sure staging directory is not created when the
   * isLoadingMaterializedView flag is true and the file system
   * is encrypted.
   */
  @Test
  public void testGetStagingDirEncryptedWithMV() throws Exception {
    Table table = createTable(TABLE, primary.warehouseRoot);
    QB qb = createQB(table);
    Context ctx = new Context(conf);
    ctx.setIsLoadingMaterializedView(true);
    Path path = SemanticAnalyzer.getStagingDirectoryPathname(qb, conf, ctx);
    Assert.assertFalse(miniDFSCluster.getFileSystem().exists(path.getParent()));
  }

  /**
   * Test to make sure staging directory is created with no flags set when
   * the file system is encrypted.
   *
   * Note: This might not be the correct behavior, but this was the behavior
   * before HIVE-28173 was fixed. It is too risky to change behavior for
   * the default path.
   */
  @Test
  public void testGetStagingDirEncrypted() throws Exception {
    Table table = createTable(TABLE, primary.warehouseRoot);
    QB qb = createQB(table);
    Context ctx = new Context(conf);
    Path path = SemanticAnalyzer.getStagingDirectoryPathname(qb, conf, ctx);
    Assert.assertTrue(miniDFSCluster.getFileSystem().exists(path.getParent()));
  }

  /**
   * Test that no staging directory is created for default behavior where the
   * directory location is on the local file system.
   */
  @Test
  public void testGetStagingDir() throws Exception {
    Context ctx = new Context(conf);
    QB qb = new QB("", "", false);
    Path path = SemanticAnalyzer.getStagingDirectoryPathname(qb, conf, ctx);
    FileSystem fs = FileSystem.getLocal(conf);
    Assert.assertFalse(fs.exists(path.getParent()));
  }

  /** Builds a query block whose single source alias resolves to {@code table}. */
  private static QB createQB(Table table) {
    String tableName = table.getTTable().getTableName();
    QB qb = new QB("", "", false);
    qb.setTabAlias(tableName, tableName);
    qb.getMetaData().setSrcForAlias(tableName, table);
    return qb;
  }

  /** Creates a minimal ql-layer Table whose data lives at {@code location}. */
  private static Table createTable(String tableName, Path location) {
    Table table = new Table();
    table.setTTable(createTTableObject(tableName));
    table.setDataLocation(location);
    return table;
  }

  /** Creates a bare metastore Table thrift object with only name and an empty SD. */
  private static org.apache.hadoop.hive.metastore.api.Table createTTableObject(String tableName) {
    org.apache.hadoop.hive.metastore.api.Table tTable =
        new org.apache.hadoop.hive.metastore.api.Table();
    tTable.setSd(new StorageDescriptor());
    tTable.setTableName(tableName);
    return tTable;
  }
}
6 changes: 4 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ public Path getMRScratchDir(boolean mkDir) {
*
*/
public Path getMRScratchDir() {
  // Do not materialize the scratch dir when only explaining the query or
  // when parsing a materialized view definition (HIVE-28173): in both cases
  // the plan is never executed, so creating the directory would leak it.
  return getMRScratchDir(!isExplainSkipExecution() && !isLoadingMaterializedView());
}

/**
Expand Down Expand Up @@ -850,7 +850,9 @@ public boolean isResultCacheDir(Path destinationPath) {
}

public Path getMRTmpPath(URI uri) {
  // Skip creating the staging dir when explaining the query or loading a
  // materialized view definition (HIVE-28173): the path is only needed for
  // planning in those cases and would otherwise be left behind on disk.
  return new Path(getStagingDir(new Path(uri),
      !isExplainSkipExecution() && !isLoadingMaterializedView()),
      MR_PREFIX + nextPathId());
}

public Path getMRTmpPath(boolean mkDir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
Expand Down Expand Up @@ -227,7 +228,7 @@ public HiveRelOptMaterialization createMaterialization(HiveConf conf, Table mate
}
final CBOPlan plan;
try {
plan = ParseUtils.parseQuery(conf, viewQuery);
plan = ParseUtils.parseQuery(createContext(conf), viewQuery);
} catch (Exception e) {
LOG.warn("Materialized view " + materializedViewTable.getCompleteName() +
" ignored; error parsing original query; " + e);
Expand Down Expand Up @@ -362,6 +363,13 @@ public List<HiveRelOptMaterialization> getRewritingMaterializedViews(ASTNode ast
return materializedViewsCache.get(ast);
}

/**
 * Builds a Context configured for parsing a materialized view definition:
 * HDFS cleanup is enabled and the isLoadingMaterializedView flag is set.
 */
private Context createContext(HiveConf conf) {
  final Context context = new Context(conf);
  context.setHDFSCleanup(true);
  context.setIsLoadingMaterializedView(true);
  return context;
}

/** Returns true when no materialized views are currently cached. */
public boolean isEmpty() {
return materializedViewsCache.isEmpty();
}
Expand Down
6 changes: 2 additions & 4 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -534,12 +534,10 @@ public static String getKeywords(Set<String> excludes) {
return sb.toString();
}

public static CBOPlan parseQuery(HiveConf conf, String viewQuery)
public static CBOPlan parseQuery(Context ctx, String viewQuery)
throws SemanticException, ParseException {
final Context ctx = new Context(conf);
ctx.setIsLoadingMaterializedView(true);
final ASTNode ast = parse(viewQuery, ctx);
final CalcitePlanner analyzer = getAnalyzer(conf, ctx);
final CalcitePlanner analyzer = getAnalyzer((HiveConf) ctx.getConf(), ctx);
RelNode logicalPlan = analyzer.genLogicalPlan(ast);
return new CBOPlan(
ast, logicalPlan, analyzer.getMaterializationValidationResult().getSupportedRewriteAlgorithms());
Expand Down
20 changes: 10 additions & 10 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2647,7 +2647,7 @@ private void getMetaData(QB qb, ReadEntity parentInput)
} else {
// This is the only place where isQuery is set to true; it defaults to false.
qb.setIsQuery(true);
Path stagingPath = getStagingDirectoryPathname(qb);
Path stagingPath = getStagingDirectoryPathname(qb, conf, ctx);
fname = stagingPath.toString();
ctx.setResDir(stagingPath);
}
Expand Down Expand Up @@ -2715,7 +2715,7 @@ private void getMetaData(QB qb, ReadEntity parentInput)
* @return True if the path is encrypted; False if it is not encrypted
* @throws HiveException If an error occurs while checking for encryption
*/
private boolean isPathEncrypted(Path path) throws HiveException {
private static boolean isPathEncrypted(Path path, HiveConf conf) throws HiveException {

try {
HadoopShims.HdfsEncryptionShim hdfsEncryptionShim =
Expand All @@ -2740,7 +2740,7 @@ private boolean isPathEncrypted(Path path) throws HiveException {
* @return -1 if strength is weak; 0 if is equals; 1 if it is stronger
* @throws HiveException If an error occurs while comparing key strengths.
*/
private int comparePathKeyStrength(Path p1, Path p2) throws HiveException {
private static int comparePathKeyStrength(Path p1, Path p2, HiveConf conf) throws HiveException {
try {
HadoopShims.HdfsEncryptionShim hdfsEncryptionShim1 = SessionState.get().getHdfsEncryptionShim(p1.getFileSystem(conf), conf);
HadoopShims.HdfsEncryptionShim hdfsEncryptionShim2 = SessionState.get().getHdfsEncryptionShim(p2.getFileSystem(conf), conf);
Expand All @@ -2762,7 +2762,7 @@ private int comparePathKeyStrength(Path p1, Path p2) throws HiveException {
* @return True if the path is read-only; False otherwise.
* @throws HiveException If an error occurs while checking file permissions.
*/
private boolean isPathReadOnly(Path path) throws HiveException {
private static boolean isPathReadOnly(Path path) throws HiveException {
HiveConf conf = SessionState.get().getConf();
try {
FileSystem fs = path.getFileSystem(conf);
Expand Down Expand Up @@ -2791,7 +2791,7 @@ private boolean isPathReadOnly(Path path) throws HiveException {
* @return The strongest encrypted path. It may return NULL if there are not tables encrypted, or are not HDFS tables.
* @throws HiveException if an error occurred attempting to compare the encryption strength
*/
private Path getStrongestEncryptedTablePath(QB qb) throws HiveException {
private static Path getStrongestEncryptedTablePath(QB qb, HiveConf conf) throws HiveException {
List<String> tabAliases = new ArrayList<String>(qb.getTabAliases());
Path strongestPath = null;

Expand All @@ -2802,10 +2802,10 @@ private Path getStrongestEncryptedTablePath(QB qb) throws HiveException {
Path tablePath = tab.getDataLocation();
if (tablePath != null) {
if ("hdfs".equalsIgnoreCase(tablePath.toUri().getScheme())) {
if (isPathEncrypted(tablePath)) {
if (isPathEncrypted(tablePath, conf)) {
if (strongestPath == null) {
strongestPath = tablePath;
} else if (comparePathKeyStrength(tablePath, strongestPath) > 0) {
} else if (comparePathKeyStrength(tablePath, strongestPath, conf) > 0) {
strongestPath = tablePath;
}
}
Expand All @@ -2829,19 +2829,19 @@ private Path getStrongestEncryptedTablePath(QB qb) throws HiveException {
* @return The path to the staging directory.
* @throws HiveException If an error occurs while identifying the correct staging location.
*/
private Path getStagingDirectoryPathname(QB qb) throws HiveException {
static Path getStagingDirectoryPathname(QB qb, HiveConf conf, Context ctx) throws HiveException {
Path stagingPath = null, tablePath = null;

if (DFSUtilClient.isHDFSEncryptionEnabled(conf)) {
// Looks for the most encrypted table location
// It may return null if there are not tables encrypted, or are not part of HDFS
tablePath = getStrongestEncryptedTablePath(qb);
tablePath = getStrongestEncryptedTablePath(qb, conf);
}
if (tablePath != null) {
// At this point, tablePath is part of HDFS and it is encrypted
if (isPathReadOnly(tablePath)) {
Path tmpPath = ctx.getMRTmpPath();
if (comparePathKeyStrength(tablePath, tmpPath) < 0) {
if (comparePathKeyStrength(tablePath, tmpPath, conf) < 0) {
throw new HiveException("Read-only encrypted tables cannot be read " +
"if the scratch directory is not encrypted (or encryption is weak)");
} else {
Expand Down

0 comments on commit 636b0d3

Please sign in to comment.