diff --git a/pom.xml b/pom.xml index 75cf72ff..0e0195f5 100644 --- a/pom.xml +++ b/pom.xml @@ -10,13 +10,13 @@ 1.0.0-SNAPSHOT start-db-common - start-db-test start-db-avatica-core start-db-calcite start-db-jdbc-driver start-db-cmd start-db-core start-db-server + start-db-test @@ -54,6 +54,7 @@ 1.4.0 0.10 2.0.6 + 4.0.0 diff --git a/start-db-core/pom.xml b/start-db-core/pom.xml index 19392023..8f65540f 100644 --- a/start-db-core/pom.xml +++ b/start-db-core/pom.xml @@ -17,6 +17,17 @@ + + com.google.protobuf + protobuf-java + 3.1.0 + + + com.google.guava + guava + 19.0 + + org.urbcomp start-db-calcite @@ -51,7 +62,7 @@ org.apache.hbase - hbase-client + hbase-shaded-client ${hbase.version} @@ -101,6 +112,39 @@ geomesa-utils_${scala.binary.version} ${geomesa.version} + + + + + + + org.apache.spark + spark-sql_${scala.binary.version} + 2.4.7 + + + org.locationtech.geomesa + geomesa-spark-sql_${scala.binary.version} + ${geomesa.version} + + + org.locationtech.geomesa + geomesa-hbase-spark-runtime-hbase2_${scala.binary.version} + ${geomesa.version} + + + org.apache.hadoop + hadoop-hdfs + + + + + + + com.esotericsoftware + kryo + ${kryo.version} + org.mybatis diff --git a/start-db-core/src/main/java/org/urbcomp/start/db/parser/Constant.java b/start-db-core/src/main/java/org/urbcomp/start/db/parser/Constant.java new file mode 100644 index 00000000..1f8bf4ad --- /dev/null +++ b/start-db-core/src/main/java/org/urbcomp/start/db/parser/Constant.java @@ -0,0 +1,31 @@ +/* + * Copyright 2022 ST-Lab + * + * This program is free software; you can redistribute it and/or modify it under the terms of the + * GNU General Public License version 2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without + * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ + +package org.urbcomp.start.db.parser; + +/** + * @author stan + * @date 2022/10/5 10:29 + */ +public class Constant { + + // specify name + public static final String HBASE_CATALOG = "hbase.catalog"; + public static final String HBASE_ZK = "hbase.zookeepers"; + public static final String SPARK_SERIALIZER = "spark.serializer"; + public static final String SPARK_KRYO = "spark.kryo.registrator"; + + // custom name + public static final String CATALOG = "root.default"; + public static final String ZK = "localhost:2181"; + public static final String SERIALIZER = "org.apache.spark.serializer.KryoSerializer"; + +} diff --git a/start-db-core/src/main/java/org/urbcomp/start/db/parser/SqlTableDriver.java b/start-db-core/src/main/java/org/urbcomp/start/db/parser/SqlTableDriver.java new file mode 100644 index 00000000..a40d19be --- /dev/null +++ b/start-db-core/src/main/java/org/urbcomp/start/db/parser/SqlTableDriver.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 ST-Lab + * + * This program is free software; you can redistribute it and/or modify it under the terms of the + * GNU General Public License version 2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without + * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ + +package org.urbcomp.start.db.parser; + +import org.antlr.v4.runtime.CharStream; +import org.antlr.v4.runtime.CharStreams; +import org.antlr.v4.runtime.CommonTokenStream; +import org.urbcomp.start.db.parser.parser.StartDBSqlLexer; +import org.urbcomp.start.db.parser.parser.StartDBSqlParser; + +import java.util.List; + +public class SqlTableDriver { + + public List apply(String sql) { + CharStream input = CharStreams.fromString(sql); + StartDBSqlLexer lexer = new StartDBSqlLexer(input); + CommonTokenStream tokenStream = new CommonTokenStream(lexer); + StartDBSqlParser parser = new StartDBSqlParser(tokenStream); + SqlVisitor visitor = new SqlVisitor(); + visitor.visit(parser.program()); + + return visitor.getTableList(); + } +} diff --git a/start-db-core/src/main/java/org/urbcomp/start/db/parser/SqlVisitor.java b/start-db-core/src/main/java/org/urbcomp/start/db/parser/SqlVisitor.java new file mode 100644 index 00000000..ac16aa0d --- /dev/null +++ b/start-db-core/src/main/java/org/urbcomp/start/db/parser/SqlVisitor.java @@ -0,0 +1,37 @@ +/* + * Copyright 2022 ST-Lab + * + * This program is free software; you can redistribute it and/or modify it under the terms of the + * GNU General Public License version 2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without + * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ + +package org.urbcomp.start.db.parser; + +import org.urbcomp.start.db.parser.parser.StartDBSqlBaseVisitor; +import org.urbcomp.start.db.parser.parser.StartDBSqlParser; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author stan + * @date 2022/10/5 10:15 + */ +public class SqlVisitor extends StartDBSqlBaseVisitor { + + List tableList = new ArrayList<>(); + + @Override + public Object visitFromTableClause(StartDBSqlParser.FromTableClauseContext ctx) { + tableList.add(ctx.getText()); + return null; + } + + public List getTableList() { + return tableList; + } +} diff --git a/start-db-core/src/main/scala/org/urbcomp/start/db/executor/SparkExecutor.scala b/start-db-core/src/main/scala/org/urbcomp/start/db/executor/SparkExecutor.scala new file mode 100644 index 00000000..f4748f14 --- /dev/null +++ b/start-db-core/src/main/scala/org/urbcomp/start/db/executor/SparkExecutor.scala @@ -0,0 +1,77 @@ +/* + * Copyright 2022 ST-Lab + * + * This program is free software; you can redistribute it and/or modify it under the terms of the + * GNU General Public License version 2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without + * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ + +package org.urbcomp.start.db.executor + +import org.apache.spark.sql.SparkSession + +import org.urbcomp.start.db.metadata.MetadataAccessUtil +import org.urbcomp.start.db.metadata.entity.Table +import org.urbcomp.start.db.parser.{Constant, SqlTableDriver} +import org.urbcomp.start.db.util.{MetadataUtil, SqlParam} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * @author stan + * @date 2022/10/5 10:27 + */ +class SparkExecutor { + + def execute(sql: String): String = { + val param = SqlParam.CACHE.get() + val dbName = param.getDbName + val userName = param.getUserName + val tableMap = new mutable.HashMap[String, String] + // 用户的上层表名 + val driver = new SqlTableDriver + val tableList = driver.apply(sql) + + tableList.asScala.foreach { i => + val table: Table = MetadataAccessUtil.getTable(userName, dbName, i) + val mapTableName: String = MetadataUtil.makeSchemaName(table.getId) + tableMap.put(i, mapTableName) + } + + val sparkSession: SparkSession = SparkSession + .builder() + .appName("start-db sql app") + .master("local[*]") + .getOrCreate() + + val params = Map(Constant.HBASE_CATALOG -> Constant.CATALOG, Constant.HBASE_ZK -> Constant.ZK) + + /** + * _1: 上层表名 + * _2: 下层表名 + */ + tableMap.foreach { i => + val userTableName = i._1 + val geomesaSftName = i._2 + sparkSession.read + .format("geomesa") + .options(params) + .option("geomesa.feature", geomesaSftName) + .load() + .createTempView(userTableName) + } + + val dataFrame = sparkSession.sql(sql) + // Tmp + dataFrame.show() + + // ToDO 写入路径的约定 + val path = "hdfs://start-db:balabal" + dataFrame.write.option("delimiter", "||").csv(path) + path + } +} diff --git a/start-db-core/src/test/scala/org/urbcomp/start/db/executor/SparkExecutorTest.scala b/start-db-core/src/test/scala/org/urbcomp/start/db/executor/SparkExecutorTest.scala new file mode 100644 index 00000000..61f19df7 --- /dev/null +++ b/start-db-core/src/test/scala/org/urbcomp/start/db/executor/SparkExecutorTest.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2022 ST-Lab + * + * This program is free software; you can redistribute it and/or modify it under the terms of the + * GNU General Public License version 2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without + * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ + +package org.urbcomp.start.db.executor + +import org.urbcomp.start.db.AbstractCalciteFunctionTest + +/** + * @author stan + * @date 2022/10/5 10:54 + */ +class SparkExecutorTest extends AbstractCalciteFunctionTest { + + ignore("read geomesa-hbase data") { + val executor = new SparkExecutor + executor.execute("select * from t_test") + } + + ignore("read geomesa-hbase data2") { + val executor = new SparkExecutor + val statement = connect.createStatement() + statement.execute("create table if not exists t_test_2 (a Integer, b String);") + statement.execute("insert into t_test_2 (a, b) values (1, 'unknown');") + executor.execute( + "select idx, ride_id from t_test left join t_test_2 on t_test.idx = t_test_2.a" + ); + } +}