Skip to content

Conversation

@dengzhhu653
Copy link
Member

@dengzhhu653 dengzhhu653 commented Oct 30, 2025

What changes were proposed in this pull request?

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

Tested on Postgres 17.2, MariaDB 10.3.39-MariaDB-1, MySQL 9.1.0-1.el9 and 5.7.44 and Oracle 23

@deniskuzZ
Copy link
Member

deniskuzZ commented Nov 12, 2025

It is still using pessimistic locking. how about

Transaction A: UPDATE version = version + 1 (starts at v=5)
Transaction B: UPDATE version = version + 1 (starts at v=5)

Database MVCC:
├─ Transaction A gets version 5, increments to 6, commits
├─ Transaction B sees old version 5 (MVCC snapshot)
├─ When B tries to commit:
│ ├─ Detects conflict (row changed by A)
│ ├─ updCount = 0 (WHERE clause fails - version is now 6, not 5)
│ └─ Returns null to signal conflict

 // ✅ OPTIMISTIC LOCKING: Read current version, increment, and prepare for atomic check
String currentVersionStr = table.getParameters().get(versionParamKey);
long currentVersion = (currentVersionStr != null ? Long.parseLong(currentVersionStr) : 0L);
long newVersion = currentVersion + 1;
newParams.put(versionParamKey, String.valueOf(newVersion));
        
oldt.setParameters(newParams);
        
 // ✅ Atomically increment version with conflict detection
// This UPDATE will fail if another transaction changed the version
int updCount = incrementTableVersionAtomic(mTable.getId(), versionParamKey, currentVersion, newVersion);
        
if (updCount == 0) {
   // Concurrent modification detected - retry
   LOG.debug("Table {}.{} was modified by another transaction (version {} changed), retrying...", dbname, name, currentVersion);
   throw new RetryingExecutor.RetryException(
              new MetaException("Optimistic lock failure - table version changed"));
}
        
LOG.debug("Successfully updated table {}.{} version: {} -> {}", dbname, name, currentVersion, newVersion);

.............

  private int incrementTableVersionAtomic(long tblId, String versionParamKey, 
      long expectedVersion, long newVersion) throws MetaException {
    
    try {
      // First, try to UPDATE with optimistic lock check
      String updateSQL = "UPDATE \"TABLE_PARAMS\" " +
          "SET \"PARAM_VALUE\" = '" + newVersion + "' " +
          "WHERE \"TBL_ID\" = " + tblId + 
          " AND \"PARAM_KEY\" = '" + versionParamKey + "' " +
          " AND \"PARAM_VALUE\" = '" + expectedVersion + "'";
      
      int updCount = executePlainSQLUpdate(updateSQL);
      
      if (updCount == 1) {
        // Success - version was incremented
        return 1;
      }

@dengzhhu653
Copy link
Member Author

Thank you @deniskuzZ for the comment.

 String updateSQL = "UPDATE \"TABLE_PARAMS\" " +   "SET \"PARAM_VALUE\" = '" + newVersion + "' " +
          "WHERE \"TBL_ID\" = " + tblId + " AND \"PARAM_KEY\" = '" + versionParamKey + "' " +
          AND \"PARAM_VALUE\" = '" + expectedVersion + "'";  // ✅ CHECK SNAPSHOT!

The result of this query seems important to the example, let's say there is a row(TBL_ID, PARAM_KEY, PARAM_VALUE) (1, hive.metastore.table.version, 1) on the table TABLE_PARAMS, if transaction A and B happens to execute the update(set hive.metastore.table.version = 2) at the same time, say if A takes the row lock, then B needs to wait for A committing or rollbacking to release the row before B is allowed to update this row, then B re-evaluates the where condition and see there is no row matched, then return 0.

If there are more transactions to update this row, then they are piled up to get a change to take over the row lock. In my opinion, this is similar to the s4u way I proposed in the old PR.

@dengzhhu653
Copy link
Member Author

I tried the similar update on MySQL, the black transaction is waiting until "Lock wait timeout exceeded",
Screenshot 2025-11-13 at 09 06 43

@deniskuzZ
Copy link
Member

deniskuzZ commented Nov 13, 2025

I tried the similar update on MySQL, the black transaction is waiting until "Lock wait timeout exceeded", Screenshot 2025-11-13 at 09 06 43

that is 100% true, however, MVCC is better because:

  • Faster claim (1ms vs 10ms)
  • Parallel claim attempts (database resolves conflicts)
  • Simpler code (no savepoints) Database uses MVCC to serialize at commit time
  • No locks held during work phase

PS: we already use MVCC in ObjectStore: updateParameterWithExpectedValue()

updated patch:

Subject: [PATCH] DRAFT
---
Index: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java	(revision c729ea19807c0c0ca6f1df4870fff49660e95a85)
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java	(date 1763028067727)
@@ -9171,21 +9171,39 @@
       int maxRetries = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_S4U_NOWAIT_MAX_RETRIES);
       long sleepInterval = MetastoreConf.getTimeVar(conf,
           ConfVars.METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS);
+      
+      final String versionParamKey = "hive.metastore.table.version";
+      
       Map<String, String> result = new RetryingExecutor<>(maxRetries, () -> {
-        Ref<Exception> exceptionRef = new Ref<>();
-        String savePoint = "uts_" + ThreadLocalRandom.current().nextInt(10000) + "_" + System.nanoTime();
-        setTransactionSavePoint(savePoint);
-        executePlainSQL(
-            sqlGenerator.addForUpdateNoWait("SELECT \"TBL_ID\" FROM \"TBLS\" WHERE \"TBL_ID\" = " + mTable.getId()),
-            exception -> {
-              rollbackTransactionToSavePoint(savePoint);
-              exceptionRef.t = exception;
-            });
-        if (exceptionRef.t != null) {
-          throw new RetryingExecutor.RetryException(exceptionRef.t);
-        }
         pm.refresh(mTable);
         Table table = convertToTable(mTable);
+        String dbname = table.getDbName();
+        String name = table.getTableName();
+        
+        // ✅ STEP 1: Read current version snapshot from TABLE_PARAMS
+        String expectedVersionStr = table.getParameters().get(versionParamKey);
+        if (expectedVersionStr == null) {
+          expectedVersionStr = "0";
+        }
+        long newVersion = Long.parseLong(expectedVersionStr) + 1;
+        String newVersionStr = String.valueOf(newVersion);
+        
+        // ✅ STEP 2: Atomically claim the version using existing MVCC API
+        // This uses UPDATE with WHERE clause to check snapshot hasn't changed
+        long affectedRows = updateParameterWithExpectedValue(table, versionParamKey, expectedVersionStr, newVersionStr);
+        
+        if (affectedRows != 1) {
+          // Version conflict - PARAM_VALUE changed since we read it (concurrent modification)
+          LOG.debug("Table {}.{} version conflict (expected={}), retrying...", dbname, name, expectedVersionStr);
+          throw new RetryingExecutor.RetryException(
+              new MetaException("The table has been modified. The parameter value for key '" + 
+                  versionParamKey + "' is different"));
+        }
+        
+        // ✅ STEP 3: Successfully claimed version - now do the work
+        LOG.debug("Claimed table {}.{} version {} -> {}, proceeding with stats update", 
+            dbname, name, expectedVersionStr, newVersion);
+        
         List<String> colNames = new ArrayList<>();
         for (ColumnStatisticsObj statsObj : statsObjs) {
           colNames.add(statsObj.getColName());
@@ -9201,17 +9219,14 @@
           MTableColumnStatistics mStatsObj = StatObjectConverter.convertToMTableColumnStatistics(mTable, statsDesc,
               statsObj, colStats.getEngine());
           writeMTableColumnStatistics(table, mStatsObj, oldStats.get(statsObj.getColName()));
-          // There is no need to add colname again, otherwise we will get duplicate colNames.
         }
 
         // TODO: (HIVE-20109) ideally the col stats stats should be in colstats, not in the table!
         // Set the table properties
-        // No need to check again if it exists.
-        String dbname = table.getDbName();
-        String name = table.getTableName();
         MTable oldt = mTable;
         Map<String, String> newParams = new HashMap<>(table.getParameters());
         StatsSetupConst.setColumnStatsState(newParams, colNames);
+        
         boolean isTxn = TxnUtils.isTransactionalTable(oldt.getParameters());
         if (isTxn) {
           if (!areTxnStatsSupported) {
@@ -9230,7 +9245,11 @@
             oldt.setWriteId(writeId);
           }
         }
+        
+        // ✅ STEP 4: Add the new version to params (already updated in DB via directSql)
+        newParams.put(versionParamKey, newVersionStr);
         oldt.setParameters(newParams);
+
         return newParams;
       }).onRetry(e -> e instanceof RetryingExecutor.RetryException)
         .commandName("updateTableColumnStatistics").sleepInterval(sleepInterval, interval ->

@deniskuzZ
Copy link
Member

deniskuzZ commented Nov 13, 2025

in any case, solution in this PR is acceptable and OK to merge, but please consider using MVCC (i.e. updateParameterWithExpectedValue) since it's already used (please see comment above).

Another question: since we’re adding RetryingExecutor here, what’s the role of RetryingMetaStoreClient? Doesn’t it already handle the same functionality?

@dengzhhu653
Copy link
Member Author

dengzhhu653 commented Nov 13, 2025

in any case, solution in this PR is acceptable and OK to merge, but please consider using MVCC (i.e. updateParameterWithExpectedValue) since it's already used (please see comment above).

Another question: since we’re adding RetryingExecutor here, what’s the role of RetryingMetaStoreClient? Doesn’t it already handle the same functionality?

RetryingMetaStoreClient basically retries the call on thrift lawyer exception, such as service shutdown(connection refused or timeout), or incompatible protocol(connect to an old HMS which thrift method hasn't been introduced).

RetryingExecutor doesn't need a tcp round-trip, it's more efficient than RetryingMetaStoreClient, i.e, no need to serialize or de-serialize the thrift message and convey the message over the connection.

IMO in the MVCC the transaction still holds the row lock in case of affected rows = 1, it might be rolled back in the middle of the transaction, so other updates need this transaction to commit or rollback to get the right version of this row(avoid to read dirty).

@deniskuzZ
Copy link
Member

deniskuzZ commented Nov 13, 2025

MVCC the transaction still holds the row lock in case of affected rows = 1, it might be rolled back in the middle of the transaction, so other updates need this transaction to commit or rollback to get the right version of this row(avoid to read dirty)

Performance Comparison (Updates Only)

Aspect MVCC + Retry S4U NOWAIT + Retry
Locking behavior Row-level lock acquired at commit; version checks allow other transactions to proceed Row-level lock attempted immediately; fails if locked
Conflict detection At commit; may do speculative work before detecting conflict Immediately at lock acquisition; no speculative work
Retries Only if affected row changed; may retry heavier work On immediate lock failure; retries are lightweight
Throughput under low contention High High
Throughput under high contention Medium (retries on conflicts; speculative work adds overhead) Lower (more frequent immediate failures, but less wasted work)
CPU overhead Higher under high contention due to speculative work and version checks Can be lower per transaction, but frequent retries increase CPU
Latency Slightly variable; may spike if many conflicts Immediate failures add retry latency

private boolean areTxnStatsSupported = false;
private PropertyStore propertyStore;

private static Striped<Lock> tablelocks;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this optimization? it can safe some resources when concurrency happens on HMS process level

Copy link
Member Author

@dengzhhu653 dengzhhu653 Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we can depend on a shared outer lock solely, if we worry about the CPU overhead it brought to the database, I can add it back.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another point is the retry is random but at least 30ms, and the row is located by the primary key, so I guess the CPU overhead might not be so high as we think

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably you are right

return selectStatement + " for update NOWAIT";
} else {
int selectLength = "select".length();
return selectStatement.trim().substring(0, selectLength) + " /*+ MAX_EXECUTION_TIME(300) */ " +
Copy link
Member

@deniskuzZ deniskuzZ Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how reliable is that? it won't take row-level lock

Copy link
Member Author

@dengzhhu653 dengzhhu653 Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it takes the row-level lock as well, and maximum wait time for the lock is 300ms, otherwise throws:

Caused by: java.sql.SQLException: Query execution was interrupted, maximum statement execution time exceeded

Checked MySQL 5.7, using mysql-connector-j-8.0.32.jar and mysql-connector-java-5.1.49.jar

Copy link
Member

@deniskuzZ deniskuzZ left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, pending tests

@deniskuzZ
Copy link
Member

deniskuzZ commented Nov 13, 2025

Since the retries are handled by server-side via RetryingExecutor, do we actually need S4U NOWAIT? I initially assumed that the client would be responsible for retrying.
Now #5929 makes sense.

@dengzhhu653
Copy link
Member Author

Since the retries are handled by server-side via RetryingExecutor, do we actually need S4U NOWAIT? I initially assumed that the client would be responsible for retrying. Now #5929 makes sense.

This is a more optimistic way compared to S4U, NOWAIT helps prevent the long hang for waiting for the lock, reduce the hot contention, personally I like this way.

@sonarqubecloud
Copy link

@dengzhhu653
Copy link
Member Author

Filed another jira HIVE-29316 to fix the similar issue on updatePartitionColumnStatistics

@dengzhhu653 dengzhhu653 merged commit a947925 into apache:master Nov 14, 2025
3 of 4 checks passed
@dengzhhu653
Copy link
Member Author

Thank you @deniskuzZ for the review!

@dengzhhu653 dengzhhu653 deleted the HIVE-28578-optimistic branch November 14, 2025 06:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants