@@ -255,6 +255,11 @@ public List<String> listTables(ConnectContext connectContext, String dbName) {
 public Table getTable(ConnectContext connectContext, String dbName, String tableName) throws StarRocksConnectorException {
     IcebergTableName icebergTableName = new IcebergTableName(dbName, tableName);
 
+    if (shouldOnlyReadCache(connectContext)) {
+        Table cachedTable = tables.getIfPresent(icebergTableName);
+        return cachedTable != null ? cachedTable : delegate.getTable(connectContext, dbName, tableName);
+    }
+
     if (ConnectContext.get() == null || ConnectContext.get().getCommand() == MysqlCommand.COM_QUERY) {
         tableLatestAccessTime.put(icebergTableName, System.currentTimeMillis());
     }
@@ -514,12 +519,19 @@ private void invalidateCache(IcebergTableName key) {
 
 @Override
 public StarRocksIcebergTableScan getTableScan(Table table, StarRocksIcebergTableScanContext scanContext) {
-    scanContext.setDataFileCache(dataFileCache);
-    scanContext.setDeleteFileCache(deleteFileCache);
+    boolean bypassCache = shouldOnlyReadCache(scanContext.getConnectContext());
     scanContext.setMetaFileCacheMap(metaFileCacheMap);
     scanContext.setDataFileCacheWithMetrics(icebergProperties.isIcebergManifestCacheWithColumnStatistics());
     scanContext.setEnableCacheDataFileIdentifierColumnMetrics(
             icebergProperties.enableCacheDataFileIdentifierColumnStatistics());
+    if (bypassCache) {
+        scanContext.setOnlyReadCache(true);
+        scanContext.setDataFileCache(null);
+        scanContext.setDeleteFileCache(null);
+    } else {
Comment on lines +527 to +531

P2: Keep caches attached when only-read mode is set

When onlyReadIcebergCache is true, this code sets both caches to null, which makes StarRocksIcebergTableScan.useCache() return false because it checks dataFileCache != null. That means MV refresh / INSERT-SELECT will bypass existing manifest caches entirely instead of reading from them, and will re-scan manifests every time even if they were already cached. If the intent is "read the cache but don't populate or refresh it," keep the caches attached and rely on onlyReadCache (which is already honored in planTaskWithCache) to skip puts; otherwise this change silently disables the cache in exactly the paths it was meant to serve.
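
A minimal sketch of that suggestion, reusing only the setters already shown in this diff (not a tested patch):

```java
// Sketch: keep both caches attached in only-read mode so
// StarRocksIcebergTableScan.useCache() still returns true, and rely on the
// onlyReadCache flag (already honored in planTaskWithCache) to skip cache
// puts/refreshes rather than nulling the caches out.
boolean onlyReadCache = shouldOnlyReadCache(scanContext.getConnectContext());
scanContext.setOnlyReadCache(onlyReadCache);
scanContext.setDataFileCache(dataFileCache);
scanContext.setDeleteFileCache(deleteFileCache);
```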

+        scanContext.setDataFileCache(dataFileCache);
+        scanContext.setDeleteFileCache(deleteFileCache);
+    }
 
     return delegate.getTableScan(table, scanContext);
 }
@@ -542,6 +554,12 @@ private Caffeine<Object, Object> newCacheBuilderWithMaximumSize(long expiresAfte
     return newCacheBuilder(expiresAfterWriteSec, refreshInterval).maximumSize(maximumSize);
 }
 
+// We still allow reads from caches, but skip populating or refreshing them when this flag is set.
+private boolean shouldOnlyReadCache(ConnectContext context) {
+    ConnectContext ctx = context != null ? context : ConnectContext.get();
+    return ctx != null && ctx.isOnlyReadIcebergCache();
+}
+
 public static class IcebergTableName {
     private final String dbName;
     private final String tableName;
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
@@ -239,6 +239,9 @@ public class ConnectContext {
 
     private final Map<String, PrepareStmtContext> preparedStmtCtxs = Maps.newHashMap();
 
+    // Control whether to read Iceberg caches without populating/updating them for the current execution.
+    private boolean onlyReadIcebergCache = false;
+
     private UUID sessionId;
 
     private String proxyHostName;
@@ -1283,6 +1286,14 @@ public void setQuerySource(QuerySource querySource) {
     this.querySource = querySource;
 }
 
+public boolean isOnlyReadIcebergCache() {
+    return onlyReadIcebergCache;
+}
+
+public void setOnlyReadIcebergCache(boolean onlyReadIcebergCache) {
+    this.onlyReadIcebergCache = onlyReadIcebergCache;
+}
+
 public void startAcceptQuery(ConnectProcessor connectProcessor) {
     mysqlChannel.startAcceptQuery(this, connectProcessor);
 }
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
@@ -789,6 +789,11 @@ public void execute() throws Exception {
     WarehouseIdleChecker.increaseRunningSQL(originWarehouseId);
 }
 
+final boolean originSkipIcebergCache = context.isOnlyReadIcebergCache();
+if (parsedStmt instanceof InsertStmt || parsedStmt instanceof CreateTableAsSelectStmt) {
+    context.setOnlyReadIcebergCache(true);
+}
+
 RecursiveCTEExecutor cteExecutor = null;
 try {
     context.getState().setIsQuery(context.isQueryStmt(parsedStmt));
@@ -1129,6 +1134,7 @@ public void execute() throws Exception {
 
 // process post-action after query is finished
 context.onQueryFinished();
+context.setOnlyReadIcebergCache(originSkipIcebergCache);
 if (cteExecutor != null) {
     cteExecutor.finalizeRecursiveCTE();
 }
@@ -184,6 +184,8 @@ public Constants.TaskRunState processTaskRun(TaskRunContext context) throws Exce
 connectContext.setQuerySource(com.starrocks.qe.QueryDetail.QuerySource.MV);
 final QueryMaterializationContext queryMVContext = new QueryMaterializationContext();
 connectContext.setQueryMVContext(queryMVContext);
+final boolean originSkipIcebergCache = connectContext.isOnlyReadIcebergCache();
+connectContext.setOnlyReadIcebergCache(true);
 try {
     // do refresh
     try (Timer ignored = Tracers.watchScope("MVRefreshDoWholeRefresh")) {
@@ -217,6 +219,7 @@ public Constants.TaskRunState processTaskRun(TaskRunContext context) throws Exce
 // reset query mv context to avoid affecting other tasks
 queryMVContext.clear();
 connectContext.setQueryMVContext(null);
+connectContext.setOnlyReadIcebergCache(originSkipIcebergCache);
 
 if (FeConstants.runningUnitTest) {
     runtimeProfile = new RuntimeProfile();
@@ -1481,7 +1481,11 @@ public static PredicateSplit getQuerySplitPredicate(OptimizerContext optimizerCo
 
 public static Optional<Table> getTable(BaseTableInfo baseTableInfo) {
     try {
-        return GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(new ConnectContext(), baseTableInfo);
+        ConnectContext ctx = ConnectContext.get();
+        if (ctx == null) {
+            ctx = new ConnectContext();
+        }
+        return GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(ctx, baseTableInfo);
     } catch (Exception e) {
         // For hive catalog, when meets NoSuchObjectException, we should return empty
         // msg: NoSuchObjectException: hive_db_8b48cd2f_4bfe_11f0_bc1a_00163e09349d.t1 table not found
@@ -1498,8 +1502,12 @@ public static Optional<Table> getTable(BaseTableInfo baseTableInfo) {
 
 public static Optional<Table> getTableWithIdentifier(BaseTableInfo baseTableInfo) {
     try {
+        ConnectContext ctx = ConnectContext.get();
+        if (ctx == null) {
+            ctx = new ConnectContext();
+        }
         return GlobalStateMgr.getCurrentState().getMetadataMgr()
-                .getTableWithIdentifier(new ConnectContext(), baseTableInfo);
+                .getTableWithIdentifier(ctx, baseTableInfo);
     } catch (Exception e) {
         // For hive catalog, when meets NoSuchObjectException, we should return empty
         // msg: NoSuchObjectException: hive_db_8b48cd2f_4bfe_11f0_bc1a_00163e09349d.t1 table not found