Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
<version>1.0.0-SNAPSHOT</version>
<modules>
<module>start-db-common</module>
<module>start-db-test</module>
<module>start-db-avatica-core</module>
<module>start-db-calcite</module>
<module>start-db-jdbc-driver</module>
<module>start-db-cmd</module>
<module>start-db-core</module>
<module>start-db-server</module>
<module>start-db-test</module>
</modules>

<properties>
Expand Down Expand Up @@ -54,6 +54,7 @@
<org.jgrapht.version>1.4.0</org.jgrapht.version>
<rtree.version>0.10</rtree.version>
<fastjson.version>2.0.6</fastjson.version>
<kryo.version>4.0.0</kryo.version>
</properties>

<dependencyManagement>
Expand Down
46 changes: 45 additions & 1 deletion start-db-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@
</properties>

<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>

<dependency>
<groupId>org.urbcomp</groupId>
<artifactId>start-db-calcite</artifactId>
Expand Down Expand Up @@ -51,7 +62,7 @@

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<artifactId>hbase-shaded-client</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -101,6 +112,39 @@
<artifactId>geomesa-utils_${scala.binary.version}</artifactId>
<version>${geomesa.version}</version>
</dependency>
<!--/ <dependency>-->
<!-- <groupId>org.locationtech.geomesa</groupId>-->
<!-- <artifactId>geomesa-spark_${scala.binary.version}</artifactId>-->
<!-- <version>${geomesa.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>2.4.7</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-spark-sql_${scala.binary.version}</artifactId>
<version>${geomesa.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-hbase-spark-runtime-hbase2_${scala.binary.version}</artifactId>
<version>${geomesa.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- https://mvnrepository.com/artifact/com.esotericsoftware/kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>${kryo.version}</version>
</dependency>
<!-- Mybatis Core -->
<dependency>
<groupId>org.mybatis</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2022 ST-Lab
*
* This program is free software; you can redistribute it and/or modify it under the terms of the
* GNU General Public License version 2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/

package org.urbcomp.start.db.parser;

/**
* @author stan
* @date 2022/10/5 10:29
*/
public class Constant {

// specify name
public static final String HBASE_CATALOG = "hbase.catalog";
public static final String HBASE_ZK = "hbase.zookeepers";
public static final String SPARK_SERIALIZER = "spark.serializer";
public static final String SPARK_KRYO = "spark.kryo.registrator";

// custom name
public static final String CATALOG = "root.default";
public static final String ZK = "localhost:2181";
public static final String SERIALIZER = "org.apache.spark.serializer.KryoSerializer";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2022 ST-Lab
*
* This program is free software; you can redistribute it and/or modify it under the terms of the
* GNU General Public License version 2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/

package org.urbcomp.start.db.parser;

import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.urbcomp.start.db.parser.parser.StartDBSqlLexer;
import org.urbcomp.start.db.parser.parser.StartDBSqlParser;

import java.util.List;

public class SqlTableDriver {

public List<String> apply(String sql) {
CharStream input = CharStreams.fromString(sql);
StartDBSqlLexer lexer = new StartDBSqlLexer(input);
CommonTokenStream tokenStream = new CommonTokenStream(lexer);
StartDBSqlParser parser = new StartDBSqlParser(tokenStream);
SqlVisitor<String> visitor = new SqlVisitor<String>();
visitor.visit(parser.program());

return visitor.getTableList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2022 ST-Lab
*
* This program is free software; you can redistribute it and/or modify it under the terms of the
* GNU General Public License version 2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/

package org.urbcomp.start.db.parser;

import org.urbcomp.start.db.parser.parser.StartDBSqlBaseVisitor;
import org.urbcomp.start.db.parser.parser.StartDBSqlParser;

import java.util.ArrayList;
import java.util.List;

/**
* @author stan
* @date 2022/10/5 10:15
*/
public class SqlVisitor<Object> extends StartDBSqlBaseVisitor<Object> {

List<String> tableList = new ArrayList<>();

@Override
public Object visitFromTableClause(StartDBSqlParser.FromTableClauseContext ctx) {
tableList.add(ctx.getText());
return null;
}

public List<String> getTableList() {
return tableList;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2022 ST-Lab
*
* This program is free software; you can redistribute it and/or modify it under the terms of the
* GNU General Public License version 2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/

package org.urbcomp.start.db.executor

import org.apache.spark.sql.SparkSession

import org.urbcomp.start.db.metadata.MetadataAccessUtil
import org.urbcomp.start.db.metadata.entity.Table
import org.urbcomp.start.db.parser.{Constant, SqlTableDriver}
import org.urbcomp.start.db.util.{MetadataUtil, SqlParam}

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
* @author stan
* @date 2022/10/5 10:27
*/
class SparkExecutor {

def execute(sql: String): String = {
val param = SqlParam.CACHE.get()
val dbName = param.getDbName
val userName = param.getUserName
val tableMap = new mutable.HashMap[String, String]
// 用户的上层表名
val driver = new SqlTableDriver
val tableList = driver.apply(sql)

tableList.asScala.foreach { i =>
val table: Table = MetadataAccessUtil.getTable(userName, dbName, i)
val mapTableName: String = MetadataUtil.makeSchemaName(table.getId)
tableMap.put(i, mapTableName)
}

val sparkSession: SparkSession = SparkSession
.builder()
.appName("start-db sql app")
.master("local[*]")
.getOrCreate()

val params = Map(Constant.HBASE_CATALOG -> Constant.CATALOG, Constant.HBASE_ZK -> Constant.ZK)

/**
* _1: 上层表名
* _2: 下层表名
*/
tableMap.foreach { i =>
val userTableName = i._1
val geomesaSftName = i._2
sparkSession.read
.format("geomesa")
.options(params)
.option("geomesa.feature", geomesaSftName)
.load()
.createTempView(userTableName)
}

val dataFrame = sparkSession.sql(sql)
// Tmp
dataFrame.show()

// ToDO 写入路径的约定
val path = "hdfs://start-db:balabal"
dataFrame.write.option("delimiter", "||").csv(path)
path
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022 ST-Lab
*
* This program is free software; you can redistribute it and/or modify it under the terms of the
* GNU General Public License version 2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*/

package org.urbcomp.start.db.executor

import org.urbcomp.start.db.AbstractCalciteFunctionTest

/**
* @author stan
* @date 2022/10/5 10:54
*/
class SparkExecutorTest extends AbstractCalciteFunctionTest {

ignore("read geomesa-hbase data") {
val executor = new SparkExecutor
executor.execute("select * from t_test")
}

ignore("read geomesa-hbase data2") {
val executor = new SparkExecutor
val statement = connect.createStatement()
statement.execute("create table if not exists t_test_2 (a Integer, b String);")
statement.execute("insert into t_test_2 (a, b) values (1, 'unknown');")
executor.execute(
"select idx, ride_id from t_test left join t_test_2 on t_test.idx = t_test_2.a"
);
}
}