Skip to content

Commit c77f11f

Browse files
committed
HIVE-28578: Concurrency issue in updateTableColumnStatistics
1 parent 937d100 commit c77f11f

File tree

1 file changed

+36
-30
lines changed
  • standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore

1 file changed

+36
-30
lines changed

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9517,6 +9517,9 @@ public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats
95179517
// So let's not use them anywhere unless absolutely necessary.
95189518
String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf);
95199519
MTable mTable = ensureGetMTable(catName, statsDesc.getDbName(), statsDesc.getTableName());
9520+
lockForUpdate("TBLS", "TBL_ID", Optional.of("\"TBL_ID\" = " + mTable.getId()));
9521+
// Get the newest version of mTable
9522+
pm.refresh(mTable);
95209523
Table table = convertToTable(mTable);
95219524
List<String> colNames = new ArrayList<>();
95229525
for (ColumnStatisticsObj statsObj : statsObjs) {
@@ -9605,23 +9608,28 @@ public Map<String, String> updatePartitionColumnStatistics(Table table, MTable m
96059608
List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
96069609
ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
96079610
String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf);
9608-
Partition partition = convertToPart(catName, statsDesc.getDbName(), statsDesc.getTableName(), getMPartition(
9609-
catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals, mTable), TxnUtils.isAcidTable(table));
9611+
MPartition mPartition = getMPartition(
9612+
catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals, mTable);
9613+
Partition partition = convertToPart(catName, statsDesc.getDbName(), statsDesc.getTableName(),
9614+
mPartition, TxnUtils.isAcidTable(table));
96109615
List<String> colNames = new ArrayList<>();
96119616

96129617
for(ColumnStatisticsObj statsObj : statsObjs) {
96139618
colNames.add(statsObj.getColName());
96149619
}
96159620

9616-
Map<String, MPartitionColumnStatistics> oldStats = getPartitionColStats(table, statsDesc
9617-
.getPartName(), colNames, colStats.getEngine());
9618-
9619-
MPartition mPartition = getMPartition(
9620-
catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals, mTable);
9621-
if (partition == null) {
9621+
List<Long> partitionIds = directSql.getPartitionFieldsViaSqlFilter(catName, statsDesc.getDbName(), statsDesc.getTableName(),
9622+
Arrays.asList("\"PART_ID\""), "\"PARTITIONS\".\"PART_NAME\" = ?",
9623+
Arrays.asList(Warehouse.makePartName(table.getPartitionKeys(), partVals)), Collections.emptyList(), -1);
9624+
if (partition == null || partitionIds.isEmpty()) {
96229625
throw new NoSuchObjectException("Partition for which stats is gathered doesn't exist.");
96239626
}
96249627

9628+
Map<String, MPartitionColumnStatistics> oldStats = getPartitionColStats(table, statsDesc
9629+
.getPartName(), colNames, colStats.getEngine());
9630+
lockForUpdate("PARTITIONS", "PART_ID", Optional.of("\"PART_ID\" = " + partitionIds.getFirst()));
9631+
pm.refresh(mPartition);
9632+
96259633
for (ColumnStatisticsObj statsObj : statsObjs) {
96269634
MPartitionColumnStatistics mStatsObj =
96279635
StatObjectConverter.convertToMPartitionColumnStatistics(mPartition, statsDesc, statsObj, colStats.getEngine());
@@ -11448,36 +11456,34 @@ public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, Stri
1144811456
return writeEventInfoList;
1144911457
}
1145011458

11451-
private void prepareQuotes() throws SQLException {
11452-
String s = dbType.getPrepareTxnStmt();
11453-
if (s != null) {
11454-
assert pm.currentTransaction().isActive();
11455-
JDOConnection jdoConn = pm.getDataStoreConnection();
11456-
try (Statement statement = ((Connection) jdoConn.getNativeConnection()).createStatement()) {
11457-
statement.execute(s);
11458-
} finally {
11459-
jdoConn.close();
11460-
}
11461-
}
11462-
}
11463-
11464-
private void lockNotificationSequenceForUpdate() throws MetaException {
11459+
private void lockForUpdate(String tableName, String column, Optional<String> rowFilter)
11460+
throws MetaException {
1146511461
if (sqlGenerator.getDbProduct().isDERBY() && directSql != null) {
1146611462
// Derby doesn't allow FOR UPDATE to lock the row being selected (See https://db.apache
1146711463
// .org/derby/docs/10.1/ref/rrefsqlj31783.html) . So lock the whole table. Since there's
1146811464
// only one row in the table, this shouldn't cause any performance degradation.
1146911465
new RetryingExecutor(conf, () -> {
11470-
directSql.lockDbTable("NOTIFICATION_SEQUENCE");
11466+
directSql.lockDbTable(tableName);
1147111467
}).run();
1147211468
} else {
11473-
String selectQuery = "select \"NEXT_EVENT_ID\" from \"NOTIFICATION_SEQUENCE\"";
11469+
String selectQuery = "select \"" + column + "\" from \"" + tableName + "\"" +
11470+
rowFilter.map(f -> " where " + f).orElse("");
1147411471
String lockingQuery = sqlGenerator.addForUpdateClause(selectQuery);
1147511472
new RetryingExecutor(conf, () -> {
11476-
prepareQuotes();
11477-
try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", lockingQuery))) {
11478-
query.setUnique(true);
11479-
// only need to execute it to get db Lock
11480-
query.execute();
11473+
String txnStmt = dbType.getPrepareTxnStmt();
11474+
List<String> statements = new ArrayList<>();
11475+
if (txnStmt != null) {
11476+
statements.add(txnStmt);
11477+
}
11478+
statements.add(lockingQuery);
11479+
assert pm.currentTransaction().isActive();
11480+
JDOConnection jdoConn = pm.getDataStoreConnection();
11481+
try (Statement statement = ((Connection) jdoConn.getNativeConnection()).createStatement()) {
11482+
for (String s : statements) {
11483+
statement.execute(s);
11484+
}
11485+
} finally {
11486+
jdoConn.close();
1148111487
}
1148211488
}).run();
1148311489
}
@@ -11545,7 +11551,7 @@ public void addNotificationEvent(NotificationEvent entry) throws MetaException {
1154511551
try {
1154611552
pm.flush();
1154711553
openTransaction();
11548-
lockNotificationSequenceForUpdate();
11554+
lockForUpdate("NOTIFICATION_SEQUENCE", "NEXT_EVENT_ID", Optional.empty());
1154911555
query = pm.newQuery(MNotificationNextId.class);
1155011556
Collection<MNotificationNextId> ids = (Collection) query.execute();
1155111557
MNotificationNextId mNotificationNextId = null;

0 commit comments

Comments
 (0)