Skip to content

Commit

Permalink
HIVE-28062: Optimize get_partitions_by_names in direct sql (apache#5063
Browse files Browse the repository at this point in the history
…)(Wechar Yu, reviewed by Denys Kuzmenko, Butao Zhang)
  • Loading branch information
wecharyu authored Mar 27, 2024
1 parent 106c52e commit c26c25d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public static void beforeTest() throws Exception {
conf.setBoolVar(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL, true);
conf.setBoolVar(HiveConf.ConfVars.DYNAMIC_PARTITIONING, true);
conf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_ENABLED, false);
// Disable loading dynamic partitions from partition names in this test,
// because since HIVE-28062 get_partitions_by_names is also subject to the partition limit.
conf.setBoolVar(HiveConf.ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_SCAN_SPECIFIC_PARTITIONS, false);

miniHS2 = new MiniHS2.Builder().withConf(conf).build();
Map<String, String> overlayProps = new HashMap<String, String>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,7 @@ public enum ConfVars {
"This limits the number of partitions (whole partition objects) that can be requested " +
"from the metastore for a give table. MetaStore API methods using this are: \n" +
"get_partitions, \n" +
"get_partitions_by_names, \n" +
"get_partitions_with_auth, \n" +
"get_partitions_by_filter, \n" +
"get_partitions_spec_by_filter, \n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7466,7 +7466,7 @@ public GetPartitionsByNamesResult get_partitions_by_names_req(GetPartitionsByNam
return result;
}

public List<Partition> get_partitions_by_names(final String dbName, final String tblName,
private List<Partition> get_partitions_by_names(final String dbName, final String tblName,
boolean getColStats, String engine,
List<String> processorCapabilities, String processorId,
GetPartitionsArgs args) throws TException {
Expand All @@ -7478,14 +7478,14 @@ public List<Partition> get_partitions_by_names(final String dbName, final String
Table table = null;
Exception ex = null;
boolean success = false;
startTableFunction("get_partitions_by_names", parsedCatName, parsedDbName,
tblName);
startTableFunction("get_partitions_by_names", parsedCatName, parsedDbName, tblName);
try {
getMS().openTransaction();
authorizeTableForPartitionMetadata(parsedCatName, parsedDbName, tblName);

fireReadTablePreEvent(parsedCatName, parsedDbName, tblName);

checkLimitNumberOfPartitions(tblName, args.getPartNames().size(), -1);
ret = getMS().getPartitionsByNames(parsedCatName, parsedDbName, tblName, args);
ret = FilterUtils.filterPartitionsIfEnabled(isServerFilterEnabled, filterHook, ret);
table = getTable(parsedCatName, parsedDbName, tblName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,14 +739,7 @@ public List<Partition> getPartitionsViaPartNames(final String catName, final Str
return Batchable.runBatched(batchSize, partNames, new Batchable<String, Partition>() {
/**
 * Fetches the partitions for one batch of partition names.
 *
 * @param input the partition names of the current batch, as passed in by
 *              {@code Batchable.runBatched}
 * @return whatever {@code getPartitionsByNames} returns for this batch
 * @throws MetaException if the underlying lookup fails
 */
@Override
public List<Partition> run(List<String> input) throws MetaException {
  // Look up only the names of this batch. Handing the complete partNames list to the
  // helper would ignore the batch boundaries: the IN list would no longer be bounded by
  // the batch size, and (presumably, if runBatched concatenates per-batch results) every
  // partition would be returned once per batch.
  return getPartitionsByNames(catName, dbName, tblName, input, false, args);
}
});
}
Expand All @@ -770,8 +763,7 @@ public List<Partition> getPartitionsViaSqlFilter(String catName, String dbName,
return Batchable.runBatched(batchSize, partitionIds, new Batchable<Long, Partition>() {
/**
 * Loads the partitions for one batch of partition ids.
 *
 * NOTE(review): the two return statements below look like the pre-change and post-change
 * lines of a diff hunk rendered without +/- markers; only the call to
 * getPartitionsByPartitionIds appears to belong to the post-change code. Confirm against
 * the repository before treating this as compilable source.
 */
@Override
public List<Partition> run(List<Long> input) throws MetaException {
return getPartitionsFromPartitionIds(catName, dbName,
tableName, null, input, Collections.emptyList(), isAcidTable, args);
return getPartitionsByPartitionIds(catName, dbName, tableName, input, isAcidTable, args);
}
});
}
Expand Down Expand Up @@ -925,7 +917,7 @@ public List<Partition> getPartitions(String catName,
List<Partition> result = Batchable.runBatched(batchSize, partitionIds, new Batchable<Long, Partition>() {
/**
 * Loads the partitions for one batch of partition ids; the ACID flag is passed as false.
 *
 * NOTE(review): the two return statements below look like the pre-change and post-change
 * lines of a diff hunk rendered without +/- markers; only the call to
 * getPartitionsByPartitionIds appears to belong to the post-change code. Confirm against
 * the repository before treating this as compilable source.
 */
@Override
public List<Partition> run(List<Long> input) throws MetaException {
return getPartitionsFromPartitionIds(catName, dbName, tblName, null, input, Collections.emptyList(), false, args);
return getPartitionsByPartitionIds(catName, dbName, tblName, input, false, args);
}
});
return result;
Expand Down Expand Up @@ -1024,15 +1016,39 @@ public <T> List<T> getPartitionFieldsViaSqlFilter(
}
}


/** Should be called with the list short enough to not trip up Oracle/etc. */
private List<Partition> getPartitionsFromPartitionIds(String catName, String dbName, String tblName,
Boolean isView, List<Long> partIdList, List<String> projectionFields,
boolean isAcidTable, GetPartitionsArgs args) throws MetaException {
/**
 * Loads partitions of a single table by partition name with one direct-SQL query.
 *
 * <p>The partition names are embedded in the {@code in (...)} list as quoted literals,
 * while the table, database and catalog names are bound as query parameters. The rows are
 * ordered by {@code "PART_NAME"} ascending and handed to {@code getPartitionsByQuery} to
 * be turned into {@link Partition} objects.
 *
 * <p>Should be called with a list short enough to not trip up Oracle/etc.
 *
 * @param catName      catalog name, matched against {@code "CTLG_NAME"}
 * @param dbName       database name, matched against the database {@code "NAME"}
 * @param tblName      table name, matched against {@code "TBL_NAME"}
 * @param partNameList partition names to look up
 * @param isAcidTable  passed through to {@code getPartitionsByQuery}
 * @param args         passed through to {@code getPartitionsByQuery}
 * @return the partitions produced by {@code getPartitionsByQuery}; an empty list when
 *         {@code partNameList} is empty
 * @throws MetaException if the query fails
 */
private List<Partition> getPartitionsByNames(String catName, String dbName,
    String tblName, List<String> partNameList, boolean isAcidTable, GetPartitionsArgs args)
    throws MetaException {
  if (partNameList.isEmpty()) {
    // An empty list would render as "in ()", which most SQL dialects reject as a syntax
    // error; nothing can match anyway, so skip the round trip.
    return new ArrayList<>();
  }

  // Get most of the fields for the partNames provided.
  // Assume db and table names are the same for all partitions, as provided in arguments.
  // NOTE(review): the names are inlined as literals, so this relies on
  // DirectSqlUpdatePart.quoteString escaping embedded quotes correctly - confirm.
  String quotedPartNames = partNameList.stream()
      .map(DirectSqlUpdatePart::quoteString)
      .collect(Collectors.joining(","));

  String queryText =
      "select " + PARTITIONS + ".\"PART_ID\"," + SDS + ".\"SD_ID\"," + SDS + ".\"CD_ID\","
      + SERDES + ".\"SERDE_ID\"," + PARTITIONS + ".\"CREATE_TIME\"," + PARTITIONS
      + ".\"LAST_ACCESS_TIME\"," + SDS + ".\"INPUT_FORMAT\"," + SDS + ".\"IS_COMPRESSED\","
      + SDS + ".\"IS_STOREDASSUBDIRECTORIES\"," + SDS + ".\"LOCATION\"," + SDS
      + ".\"NUM_BUCKETS\"," + SDS + ".\"OUTPUT_FORMAT\"," + SERDES + ".\"NAME\","
      + SERDES + ".\"SLIB\"," + PARTITIONS + ".\"WRITE_ID\"" + " from " + PARTITIONS
      + " left outer join " + SDS + " on " + PARTITIONS + ".\"SD_ID\" = " + SDS + ".\"SD_ID\" "
      + " left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = " + SERDES + ".\"SERDE_ID\" "
      + " inner join " + TBLS + " on " + TBLS + ".\"TBL_ID\" = " + PARTITIONS + ".\"TBL_ID\" "
      + " inner join " + DBS + " on " + DBS + ".\"DB_ID\" = " + TBLS + ".\"DB_ID\" "
      + " where \"PART_NAME\" in (" + quotedPartNames + ") "
      + " and " + TBLS + ".\"TBL_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + DBS
      + ".\"CTLG_NAME\" = ? order by \"PART_NAME\" asc";

  // Parameter order must match the three '?' placeholders: table, database, catalog.
  Object[] params = new Object[]{tblName, dbName, catName};
  return getPartitionsByQuery(catName, dbName, tblName, queryText, params, isAcidTable, args);
}

int idStringWidth = (int)Math.ceil(Math.log10(partIdList.size())) + 1; // 1 for comma
int sbCapacity = partIdList.size() * idStringWidth;
/** Should be called with the list short enough to not trip up Oracle/etc. */
private List<Partition> getPartitionsByPartitionIds(String catName, String dbName, String tblName,
List<Long> partIdList, boolean isAcidTable, GetPartitionsArgs args) throws MetaException {
// Get most of the fields for the IDs provided.
// Assume db and table names are the same for all partitions, as provided in arguments.
String partIds = getIdListForIn(partIdList);
Expand All @@ -1047,17 +1063,24 @@ private List<Partition> getPartitionsFromPartitionIds(String catName, String dbN
+ ".\"SD_ID\" " + " left outer join " + SERDES + " on " + SDS + ".\"SERDE_ID\" = "
+ SERDES + ".\"SERDE_ID\" " + "where \"PART_ID\" in (" + partIds
+ ") order by \"PART_NAME\" asc";
return getPartitionsByQuery(catName, dbName, tblName, queryText, null, isAcidTable, args);
}

private List<Partition> getPartitionsByQuery(String catName, String dbName, String tblName,
String queryText, Object[] params, boolean isAcidTable, GetPartitionsArgs args)
throws MetaException {
boolean doTrace = LOG.isDebugEnabled();

// Read all the fields and create partitions, SDs and serdes.
TreeMap<Long, Partition> partitions = new TreeMap<Long, Partition>();
TreeMap<Long, StorageDescriptor> sds = new TreeMap<Long, StorageDescriptor>();
TreeMap<Long, SerDeInfo> serdes = new TreeMap<Long, SerDeInfo>();
TreeMap<Long, List<FieldSchema>> colss = new TreeMap<Long, List<FieldSchema>>();
// Keep order by name, consistent with JDO.
ArrayList<Partition> orderedResult = new ArrayList<Partition>(partIdList.size());
ArrayList<Partition> orderedResult;

// Prepare StringBuilder-s for "in (...)" lists to use in one-to-many queries.
StringBuilder sdSb = new StringBuilder(sbCapacity), serdeSb = new StringBuilder(sbCapacity);
StringBuilder sdSb = new StringBuilder(), serdeSb = new StringBuilder();
StringBuilder colsSb = new StringBuilder(7); // We expect that there's only one field schema.
tblName = tblName.toLowerCase();
dbName = dbName.toLowerCase();
Expand All @@ -1066,9 +1089,10 @@ private List<Partition> getPartitionsFromPartitionIds(String catName, String dbN

try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
long start = doTrace ? System.nanoTime() : 0;
List<Object[]> sqlResult = executeWithArray(query.getInnerQuery(), null, queryText);
List<Object[]> sqlResult = executeWithArray(query.getInnerQuery(), params, queryText);
long queryTime = doTrace ? System.nanoTime() : 0;
Deadline.checkTimeout();
orderedResult = new ArrayList<>(sqlResult.size());

for (Object[] fields : sqlResult) {
// Here comes the ugly part...
Expand Down Expand Up @@ -1161,6 +1185,7 @@ private List<Partition> getPartitionsFromPartitionIds(String catName, String dbN
}
MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, queryTime);
}
String partIds = getIdListForIn(partitions.keySet());
// Now get all the one-to-many things. Start with partitions.
MetastoreDirectSqlUtils
.setPartitionParametersWithFilter(PARTITION_PARAMS, convertMapNullsToEmptyStrings, pm,
Expand Down

0 comments on commit c26c25d

Please sign in to comment.