
Commit 9f2168c

yangyx authored and flaming-archer committed
Improve the lifecycle management of cache
1 parent 9353f0c · commit 9f2168c

File tree: 6 files changed, +217 −29 lines

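The common thread across all six files: the connector keeps one HiveFileStatusCache per fully qualified table name (catalog.db.table), and this commit clears that cache on every operation that changes a table's files or identity (insert, insert overwrite, alter, rename, drop). A minimal, self-contained Scala model of the intended lifecycle — illustrative only, not the connector's actual implementation:

import scala.collection.concurrent.TrieMap

object FileStatusCacheModel {
  // one cache per qualified table name; each maps a directory to its listed leaf files
  private val caches = TrieMap.empty[String, TrieMap[String, Seq[String]]]

  def getOrCreate(qualifiedName: String): TrieMap[String, Seq[String]] =
    caches.getOrElseUpdate(qualifiedName, TrieMap.empty)

  // what invalidateTable amounts to: drop all listings so the next scan re-lists the filesystem
  def invalidateAll(qualifiedName: String): Unit =
    caches.get(qualifiedName).foreach(_.clear())

  def main(args: Array[String]): Unit = {
    val key = "hive.default.tbl_partition"
    getOrCreate(key).put("/warehouse/tbl_partition/city=ct", Seq("part-00000.orc"))
    invalidateAll(key) // e.g. after INSERT, ALTER TABLE, RENAME TO, or DROP TABLE
    assert(getOrCreate(key).isEmpty) // stale listing gone; the next query repopulates it
  }
}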

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala

Lines changed: 11 additions & 9 deletions
@@ -20,11 +20,9 @@ package org.apache.kyuubi.spark.connector.hive
 import java.lang.{Boolean => JBoolean, Long => JLong}
 import java.net.URI
 import java.util
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Try
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
@@ -45,10 +43,10 @@ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOBAL_TEMP_DATABASE}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
 import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf
-import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
+import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper, getStorageFormatAndProvider, toCatalogDatabase}
 import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
+import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache
 import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors}
 
 /**
@@ -388,7 +386,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
       case _: NoSuchTableException =>
         throw new NoSuchTableException(ident)
     }
-
+    invalidateTable(ident)
     loadTable(ident)
   }
 
@@ -401,9 +399,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
         ident.asTableIdentifier,
         ignoreIfNotExists = true,
         purge = true /* skip HDFS trash */ )
-      if (table.isInstanceOf[HiveTable]) {
-        table.asInstanceOf[HiveTable].fileIndex.refresh()
-      }
+      invalidateTable(ident)
       true
     } else {
       false
@@ -421,10 +417,16 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 
     // Load table to make sure the table exists
-    loadTable(oldIdent)
+    val table = loadTable(oldIdent)
     catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
+    invalidateTable(oldIdent)
   }
 
+  override def invalidateTable(ident: Identifier): Unit = {
+    val qualifiedName = s"$catalogName.${ident.namespace().mkString(".")}.${ident.name()}"
+    HiveFileStatusCache.getOrCreate(sparkSession, qualifiedName).invalidateAll()
+  }
+
   private def toOptions(properties: Map[String, String]): Map[String, String] = {
     properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
       case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value
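Context for the new override (a hedged sketch, not connector code): Spark's DataSource V2 TableCatalog declares invalidateTable(Identifier) with a do-nothing default body, so a catalog that caches file listings must override it to reach its own cache. The shape of such an override, with clearCache as a hypothetical stand-in for HiveFileStatusCache.getOrCreate(...).invalidateAll():

import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

trait FileStatusInvalidation extends TableCatalog {
  protected def clearCache(qualifiedName: String): Unit // hypothetical hook

  // TableCatalog#invalidateTable defaults to a no-op; caching catalogs override it
  override def invalidateTable(ident: Identifier): Unit = {
    val qualifiedName = s"${name()}.${ident.namespace().mkString(".")}.${ident.name()}"
    clearCache(qualifiedName)
  }
}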

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala

Lines changed: 2 additions & 2 deletions
@@ -48,8 +48,8 @@ class HiveCatalogFileIndex(
   private val partPathToBindHivePart: mutable.Map[PartitionPath, CatalogTablePartition] =
     mutable.Map()
 
-  private val fileStatusCache =
-    HiveFileStatusCache.getOrCreate(sparkSession, catalogTable.qualifiedName)
+  private val fileStatusCache = HiveFileStatusCache.getOrCreate(sparkSession,
+    hiveCatalog.name() + "." + catalogTable.qualifiedName)
 
   private val baseLocation: Option[URI] = table.storage.locationUri
 
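This change pins down the keying contract: the read path above must build exactly the string that HiveTableCatalog.invalidateTable builds, which holds as long as catalogTable.qualifiedName is Hive's db.table form. A sanity sketch with illustrative values (the real inputs are hiveCatalog.name(), ident.namespace(), ident.name(), and catalogTable.qualifiedName):

val catalogName = "hive"
val namespace = Array("default")
val tableName = "tbl_partition"
val qualifiedName = s"${namespace.mkString(".")}.$tableName" // Hive-style db.table

val readerKey = catalogName + "." + qualifiedName // HiveCatalogFileIndex side
val invalidationKey = s"$catalogName.${namespace.mkString(".")}.$tableName" // invalidateTable side
assert(readerKey == invalidationKey) // both are "hive.default.tbl_partition"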
extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala

Lines changed: 4 additions & 2 deletions
@@ -18,7 +18,6 @@
 package org.apache.kyuubi.spark.connector.hive.write
 
 import scala.util.control.NonFatal
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
@@ -32,8 +31,8 @@ import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTas
 import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.toSQLValue
 import org.apache.spark.sql.types.StringType
-
 import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, HiveTableCatalog, KyuubiHiveConnectorException}
+import org.apache.spark.sql.connector.catalog.Identifier
 
 class HiveBatchWrite(
     sparkSession: SparkSession,
@@ -69,6 +68,9 @@ class HiveBatchWrite(
 
     // un-cache this table.
     hiveTableCatalog.catalog.invalidateCachedTable(table.identifier)
+    hiveTableCatalog.invalidateTable(
+      Identifier.of(Array(table.identifier.database.getOrElse("")), table.identifier.table)
+    )
 
     val catalog = hiveTableCatalog.catalog
     if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
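The commit hook rebuilds a V2 Identifier from the write's Catalyst TableIdentifier before invalidating; note that a missing database falls back to an empty namespace segment, as in the diff above. A hedged sketch of the conversion using Spark's real types:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.connector.catalog.Identifier

def toV2Identifier(id: TableIdentifier): Identifier =
  Identifier.of(Array(id.database.getOrElse("")), id.table)

// TableIdentifier("tbl_partition", Some("default")) -> namespace ["default"], name "tbl_partition"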

extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala

Lines changed: 24 additions & 11 deletions
@@ -39,7 +39,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
 import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
-import org.apache.kyuubi.spark.connector.hive.read.HiveScan
+import org.apache.kyuubi.spark.connector.hive.read.{HiveFileStatusCache, HiveScan}
 
 class HiveCatalogSuite extends KyuubiHiveTest {
 
@@ -284,16 +284,29 @@ class HiveCatalogSuite extends KyuubiHiveTest {
   }
 
   test("invalidateTable") {
-    val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
-    // Hive v2 don't cache table
-    catalog.invalidateTable(testIdent)
-
-    val loaded = catalog.loadTable(testIdent)
-
-    assert(table.name == loaded.name)
-    assert(table.schema == loaded.schema)
-    assert(table.properties == loaded.properties)
-    catalog.dropTable(testIdent)
+    withSparkSession() { spark =>
+      val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
+      val qualifiedName = s"$catalogName.${testIdent.namespace().mkString(".")}.${testIdent.name()}"
+      val location = table.asInstanceOf[HiveTable].catalogTable.location
+
+      spark.sql(s"select * from $qualifiedName").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName)
+        .getLeafFiles(new Path(location)).isDefined)
+
+      catalog.invalidateTable(testIdent)
+      assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName)
+        .getLeafFiles(new Path(location)).isEmpty)
+
+      spark.sql(s"select * from $qualifiedName").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName)
+        .getLeafFiles(new Path(location)).isDefined)
+
+      val loaded = catalog.loadTable(testIdent)
+      assert(table.name == loaded.name)
+      assert(table.schema == loaded.schema)
+      assert(table.properties == loaded.properties)
+      catalog.dropTable(testIdent)
+    }
   }
 
   test("listNamespaces: fail if missing namespace") {

extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala

Lines changed: 175 additions & 5 deletions
@@ -17,14 +17,15 @@
 
 package org.apache.kyuubi.spark.connector.hive
 
+import com.google.common.collect.Maps
 import scala.concurrent.duration.DurationInt
-
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.scalatest.concurrent.Eventually.eventually
 import org.scalatest.concurrent.Futures.timeout
-
 import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class HiveFileStatusCacheSuite extends KyuubiHiveTest {
 
@@ -38,9 +39,9 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest {
     val files = (1 to 3).map(_ => new FileStatus())
 
     HiveFileStatusCache.resetForTesting()
-    val fileStatusCacheTabel1 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1")
+    val fileStatusCacheTabel1 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.cat1Table")
     fileStatusCacheTabel1.putLeafFiles(path, files.toArray)
-    val fileStatusCacheTabel2 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1")
+    val fileStatusCacheTabel2 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.cat1Table")
     val fileStatusCacheTabel3 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table2")
 
     // Exactly 3 files are cached.
@@ -83,4 +84,173 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest {
       SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
     }
   }
-}
+
+  private def newCatalog(): HiveTableCatalog = {
+    val catalog = new HiveTableCatalog
+    val properties = Maps.newHashMap[String, String]()
+    properties.put("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true")
+    properties.put("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
+    catalog.initialize(catalogName, new CaseInsensitiveStringMap(properties))
+    catalog
+  }
+
+  test("expire FileStatusCache when insert into") {
+    val dbName = "default"
+    val tbName = "tbl_partition"
+    val table = s"${catalogName}.${dbName}.${tbName}"
+
+    withTable(table) {
+      spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect()
+      val location = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), tbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+
+      spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      assert(spark.sql(s"select * from $table").count() === 5)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
+
+      spark.sql(s"insert into $table partition(city='ct') values(11),(21),(31),(41),(51)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      assert(spark.sql(s"select * from $table").count() === 10)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 2)
+    }
+  }
+
+  test("expire FileStatusCache when insert overwrite") {
+    val dbName = "default"
+    val tbName = "tbl_partition"
+    val table = s"${catalogName}.${dbName}.${tbName}"
+
+    withTable(table) {
+      spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect()
+      val location = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), tbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+
+      spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      assert(spark.sql(s"select * from $table").count() === 5)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
+
+      spark.sql(s"insert overwrite $table partition(city='ct') values(11),(21),(31),(41),(51)")
+        .collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      assert(spark.sql(s"select * from $table").count() === 5)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
+    }
+  }
+
+  test("expire FileStatusCache when alter Table") {
+    val dbName = "default"
+    val tbName = "tbl_partition"
+    val table = s"${catalogName}.${dbName}.${tbName}"
+
+    withTable(table) {
+      spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect()
+      val location = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), tbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+
+      spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
+      spark.sql(s"select * from $table").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
+
+      spark.sql(s"ALTER TABLE $table ADD COLUMNS (name string)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+    }
+  }
+
+  test("expire FileStatusCache when rename Table") {
+    val dbName = "default"
+    val oldTbName = "tbl_partition"
+    val newTbName = "tbl_partition_new"
+    val oldTable = s"${catalogName}.${dbName}.${oldTbName}"
+    val newTable = s"${catalogName}.${dbName}.${newTbName}"
+
+    withTable(newTable) {
+      spark.sql(s"create table ${oldTable} (age int)partitioned by(city string) stored as orc")
+        .collect()
+      spark.sql(s"insert into $oldTable partition(city='ct') values(10),(20),(30),(40),(50)")
+        .collect()
+      spark.sql(s"select * from $oldTable").collect()
+
+      val oldLocation = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), oldTbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+      assert(HiveFileStatusCache.getOrCreate(spark, oldTable)
+        .getLeafFiles(new Path(s"$oldLocation/city=ct")).get.length === 1)
+
+      spark.sql(s"DROP TABLE IF EXISTS ${newTable}").collect()
+      spark.sql(s"use ${catalogName}.${dbName}").collect()
+      spark.sql(s"ALTER TABLE $oldTbName RENAME TO $newTbName").collect()
+      val newLocation = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), newTbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+
+      assert(HiveFileStatusCache.getOrCreate(spark, oldTable)
+        .getLeafFiles(new Path(s"$oldLocation/city=ct"))
+        .isEmpty)
+
+      assert(HiveFileStatusCache.getOrCreate(spark, newTable)
+        .getLeafFiles(new Path(s"$newLocation/city=ct"))
+        .isEmpty)
+    }
+  }
+
+  test("FileStatusCache isolated between different catalogs with same database.table") {
+    val catalog1 = catalogName
+    val catalog2 = "hive2"
+    val dbName = "default"
+    val tbName = "tbl_partition"
+    val dbTableShortName = s"${dbName}.${tbName}"
+    val cat1Table = s"${catalog1}.${dbTableShortName}"
+    val cat2Table = s"${catalog2}.${dbTableShortName}"
+
+    withTable(cat1Table, cat2Table) {
+      spark.sql(s"CREATE TABLE IF NOT EXISTS $cat1Table (age int)partitioned by(city string)" +
+        s" stored as orc").collect()
+      val location = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), tbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+
+      spark.sql(s"use $catalog1").collect()
+      spark.sql(s"insert into $dbTableShortName partition(city='ct1') " +
+        s"values(11),(12),(13),(14),(15)").collect()
+      spark.sql(s"select * from $cat1Table where city='ct1'").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, cat1Table)
+        .getLeafFiles(new Path(s"$location/city=ct1"))
+        .get.length === 1)
+
+      spark.sql(s"use $catalog2").collect()
+      spark.sql(s"insert into $dbTableShortName partition(city='ct2') " +
+        s"values(21),(22),(23),(24),(25)").collect()
+      spark.sql(s"select * from $cat2Table where city='ct2'").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, cat2Table)
+        .getLeafFiles(new Path(s"$location/city=ct1")).isEmpty)
+      assert(HiveFileStatusCache.getOrCreate(spark, cat2Table)
+        .getLeafFiles(new Path(s"$location/city=ct2"))
+        .get.length === 1)
+    }
+  }
+}

extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@ abstract class KyuubiHiveTest extends QueryTest with Logging {
       .set("spark.ui.enabled", "false")
       .set("spark.sql.catalogImplementation", "hive")
       .set("spark.sql.catalog.hive", classOf[HiveTableCatalog].getName)
+      .set("spark.sql.catalog.hive2", classOf[HiveTableCatalog].getName)
       .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true")
      .set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
