
[BugFix] Fixed a bug in starrocks.filter.query and Spark pushdown filters merging#149

Open
monchickey wants to merge 2 commits into StarRocks:main from monchickey:main

Conversation

@monchickey

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Which issues does this PR fix:

There are currently no related issues.

Problem Summary (Required):

StarRocks version: 3.4.9
Spark version: 3.5.6
Spark Connector version: 1.1.3

Reproduction steps:

Create a test table:

CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE test.product_sales (
    sale_id     BIGINT,
    product_name VARCHAR(64),
    category     VARCHAR(32),
    price        DECIMAL(10,2),
    ts           DATETIME
  ) DUPLICATE KEY(sale_id)
  DISTRIBUTED BY HASH(sale_id) BUCKETS 4
  PROPERTIES("replication_num" = "1");

Write test data:

INSERT INTO test.product_sales VALUES
    (1, 'Apple iPhone 14', 'phone', 5999.00, '2024-01-01 10:00:00'),
    (2, 'Samsung Galaxy S23', 'phone', 5299.00, '2024-01-01 10:05:00'),
    (3, 'Xiaomi 13', 'phone', 3999.00, '2024-01-01 10:10:00'),
    (4, 'Apple MacBook Air', 'laptop', 8999.00, '2024-01-01 10:15:00'),
    (5, 'Dell XPS 13', 'laptop', 7999.00, '2024-01-01 10:20:00'),
    (6, 'Lenovo ThinkPad X1', 'laptop', 9999.00, '2024-01-01 10:25:00'),
    (7, 'Sony WH-1000XM5', 'headphone', 2999.00, '2024-01-01 10:30:00'),
    (8, 'Bose QC45', 'headphone', 2499.00, '2024-01-01 10:35:00');

Query test data:

select * from test.product_sales;
+---------+--------------------+-----------+---------+---------------------+
| sale_id | product_name       | category  | price   | ts                  |
+---------+--------------------+-----------+---------+---------------------+
|       3 | Xiaomi 13          | phone     | 3999.00 | 2024-01-01 10:10:00 |
|       6 | Lenovo ThinkPad X1 | laptop    | 9999.00 | 2024-01-01 10:25:00 |
|       5 | Dell XPS 13        | laptop    | 7999.00 | 2024-01-01 10:20:00 |
|       2 | Samsung Galaxy S23 | phone     | 5299.00 | 2024-01-01 10:05:00 |
|       7 | Sony WH-1000XM5    | headphone | 2999.00 | 2024-01-01 10:30:00 |
|       8 | Bose QC45          | headphone | 2499.00 | 2024-01-01 10:35:00 |
|       1 | Apple iPhone 14    | phone     | 5999.00 | 2024-01-01 10:00:00 |
|       4 | Apple MacBook Air  | laptop    | 8999.00 | 2024-01-01 10:15:00 |
+---------+--------------------+-----------+---------+---------------------+

We have prepared the following test cases:

    val spark = SparkSession.builder()
      .appName("StarRocks push down test")
      .master("local")
      .getOrCreate()

    val df = spark.read.format("starrocks")
      .option("starrocks.fe.http.url", "192.168.1.211:8030")
      .option("starrocks.fe.jdbc.url", "jdbc:mysql://192.168.1.211:9030")
      .option("starrocks.user", "root")
      .option("starrocks.password", "")
      .option("starrocks.table.identifier", s"test.product_sales")
      .option("starrocks.filter.query", "((sale_id = 1) OR (sale_id = 2) OR (sale_id = 5) OR (sale_id = 7) OR (sale_id = 8))")
      .load()

    val df2 = df.filter(col("category").startsWith("phone") || col("category").contains("laptop"))

    df2.explain(true)
    df2.show()

    spark.stop()

After execution, Spark reported the following error:

ERROR com.starrocks.connector.spark.sql.ScalaStarrocksRowRDD - Connect to StarRocks http://192.168.1.211:8030/api/test/product_sales/_query_plan failed.


Connect to http://192.168.1.211:8030/api/test/product_sales/_query_plan failed, status: HTTP/1.1 500 Internal Server Error, response entity: {"exception":"The Sql is invalid","status":500}.
com.starrocks.connector.spark.exception.ConnectedFailedException: Connect to http://192.168.1.211:8030/api/test/product_sales/_query_plan failed, status: HTTP/1.1 500 Internal Server Error, response entity: {"exception":"The Sql is invalid","status":500}.
	at com.starrocks.connector.spark.rest.RestService.send(RestService.java:154)
	at com.starrocks.connector.spark.rest.RestService.findPartitions(RestService.java:303)
	at com.starrocks.connector.spark.rdd.AbstractStarrocksRDD.starrocksPartitions$lzycompute(AbstractStarrocksRDD.scala:60)
	at com.starrocks.connector.spark.rdd.AbstractStarrocksRDD.starrocksPartitions(AbstractStarrocksRDD.scala:59)
	at com.starrocks.connector.spark.rdd.AbstractStarrocksRDD.getPartitions(AbstractStarrocksRDD.scala:37)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)

The StarRocks FE log output is as follows:

2025-12-11 17:55:17.848+08:00 INFO (nioEventLoopGroup-4-5|355) [TableQueryPlanAction.executeWithoutPassword():161] receive SQL statement [select `sale_id`,`product_name`,`category`,`price`,`ts` from `test`.`product_sales` where  and (((sale_id = 1) OR (sale_id = 2) OR (sale_id = 5) OR (sale_id = 7) OR (sale_id = 8)))] from external service [ user ['root'@'%']] for database [test] table [product_sales] warehouse [default_warehouse] 
2025-12-11 17:55:17.851+08:00 ERROR (nioEventLoopGroup-4-5|355) [TableQueryPlanAction.handleQuery():239] error occurred when optimizing queryId: 7f961648-d677-11f0-90d9-0242d0efef80
com.starrocks.sql.parser.ParsingException: Getting syntax error at line 1, column 91. Detail message: Unexpected input 'and', the most similar input is {a legal identifier}.

The error is caused by incorrect SQL concatenation in the connector:

    if (filters != null && filters.length > 0) {
      val userFilters = paramWithScan.get(ConfigurationOptions.STARROCKS_FILTER_QUERY)
        .filter(filters => filters.nonEmpty)
        .map(filters => " and (" + filters + ")")
        .getOrElse("")

The concatenation is not handled correctly when the filter clause compiled by DialectUtils.compileFilter is an empty string: the user filter is still prefixed with " and ", producing "where and (...)".

The following changes should be made:

.map(filters => if (filterWhereClause.nonEmpty) s" and ($filters)" else s"($filters)")
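The corrected rule can be shown as a minimal standalone sketch. The helper name `mergeUserFilter` and the inlined parameters are hypothetical, introduced only to make the branch testable outside the connector; the logic mirrors the one-line fix above: prefix " and " only when the compiled Spark filter clause is non-empty.

```scala
// Hypothetical standalone sketch of the fixed concatenation rule.
// filterWhereClause: the clause compiled from Spark pushdown filters (may be empty).
// userFilter: the value of starrocks.filter.query, if any.
def mergeUserFilter(filterWhereClause: String, userFilter: Option[String]): String = {
  val userPart = userFilter
    .filter(_.nonEmpty)
    // Only prepend " and " when there is already a compiled clause to join onto.
    .map(f => if (filterWhereClause.nonEmpty) s" and ($f)" else s"($f)")
    .getOrElse("")
  filterWhereClause + userPart
}

// Empty compiled clause: no dangling " and " is produced.
println(mergeUserFilter("", Some("sale_id = 1")))
// Both clauses present: joined with " and ".
println(mergeUserFilter("(price > 100)", Some("sale_id = 1")))
```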

After fixing the issue and re-executing the test cases, the FE logs are as follows:

2025-12-12 09:22:31.870+08:00 INFO (nioEventLoopGroup-4-11|4045) [TableQueryPlanAction.executeWithoutPassword():161] receive SQL statement [select sale_id,product_name,category,price,ts from `test`.`product_sales` where ((`category` LIKE 'phone%' ESCAPE '\') or (`category` LIKE '%laptop%' ESCAPE '\'))] from external service [ user ['root'@'%']] for database [test] table [product_sales] warehouse [default_warehouse] 
2025-12-12 09:22:31.872+08:00 ERROR (nioEventLoopGroup-4-11|4045) [TableQueryPlanAction.handleQuery():239] error occurred when optimizing queryId: 080c25ea-d6f9-11f0-90d9-0242d0efef80
com.starrocks.sql.parser.ParsingException: Getting syntax error at line 1, column 107. Detail message: No viable statement for input '((`category` LIKE 'phone%' ESCAPE'.

This is because StarRocks does not support MySQL's `LIKE ... ESCAPE ...` syntax. Furthermore, the conditions the user passed via starrocks.filter.query were overwritten by the compiled predicate clause.

  override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
    val (supported, _) = predicates.partition(dialect.compileExpression(_).isDefined)
    val predicateWhereClause = supported
      .flatMap(compilePredicate)
      .map(p => s"($p)")
      .mkString(" and ")
    // only for test
    predicateWhereClauseForTest = predicateWhereClause
    // pass filter column to BE
    config.setProperty(STARROCKS_FILTER_QUERY, predicateWhereClause)
    supportedPredicates = supported
    supported
  }

The method should return the unsupported predicates, so that Spark evaluates them itself instead of assuming they were pushed down. The modifications are as follows:

  override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
    val (supported, unSupported) = predicates.partition(dialect.compileExpression(_).isDefined)

    val predicateWhereClause = supported
      .flatMap(compilePredicate)
      .map(p => s"($p)")
      .mkString(" and ")
    // only for test
    predicateWhereClauseForTest = predicateWhereClause

    val userFilters = Option(config.getProperty(STARROCKS_FILTER_QUERY))
      .filter(_.nonEmpty)
      .map(f => s"($f)")

    val pushedFilters = userFilters.toSeq ++ Seq(predicateWhereClause).filter(_.nonEmpty)

    if (pushedFilters.nonEmpty) {
      // pass merged filters to BE
      config.setProperty(STARROCKS_FILTER_QUERY, pushedFilters.mkString(" and "))
    }

    supportedPredicates = supported
    unSupported
  }
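The merge performed above can be isolated into a small sketch for clarity. The helper name `mergeFilters` is hypothetical; it reproduces the rule from the fixed method: wrap the user's starrocks.filter.query and the compiled predicate clause in parentheses, drop whichever side is empty, and join the remainder with " and ".

```scala
// Hypothetical standalone sketch of the filter-merging rule in the fix.
// userFilter: the original starrocks.filter.query option, if set.
// predicateWhereClause: the clause compiled from supported Spark predicates.
def mergeFilters(userFilter: Option[String], predicateWhereClause: String): Option[String] = {
  val parts = userFilter.filter(_.nonEmpty).map(f => s"($f)").toSeq ++
    Seq(predicateWhereClause).filter(_.nonEmpty)
  // Returns None when there is nothing to push down at all.
  if (parts.nonEmpty) Some(parts.mkString(" and ")) else None
}

// Both sides present: user filter first, then compiled predicates.
println(mergeFilters(Some("sale_id = 1"), "(`category` LIKE 'phone%')"))
// Only the user filter present: it is passed through alone.
println(mergeFilters(Some("sale_id = 1"), ""))
```

Keeping the user filter first matches the FE log after the fix, where the starrocks.filter.query conditions precede the compiled LIKE clauses.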

The generated SQL is now correct:

2025-12-12 13:36:09.174+08:00 INFO (nioEventLoopGroup-4-35|5104) [TableQueryPlanAction.executeWithoutPassword():161] receive SQL statement [select sale_id,product_name,category,price,ts from `test`.`product_sales` where (((sale_id = 1) OR (sale_id = 2) OR (sale_id = 5) OR (sale_id = 7) OR (sale_id = 8))) and ((`category` LIKE 'phone%') or (`category` LIKE '%laptop%'))] from external service [ user ['root'@'%']] for database [test] table [product_sales] warehouse [default_warehouse]

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr will affect users' behaviors
  • This pr needs user documentation (for new or modified features or behaviors)
  • I have added documentation for my new feature or new function

Signed-off-by: monchickey <monchickey@gmail.com>
