Skip to content

Commit 63d6b21

Browse files
Merge branch 'v1.4.1' of https://github.com/DTStack/flinkStreamSQL into v1.4.1
ysq
2 parents 27603ea + 06bc381 commit 63d6b21

File tree

6 files changed

+78
-9
lines changed

6 files changed

+78
-9
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
8787
* **confProp**
8888
* 描述:一些参数设置
8989
* 格式: json
90-
* 必选:
90+
* 必选:是 (如无参数填写空json即可)
9191
* 默认值:无
9292
* 可选参数:
9393
* sql.env.parallelism: 默认并行度设置

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql;
2222

2323
import com.dtstack.flink.sql.classloader.DtClassLoader;
24+
import com.dtstack.flink.sql.enums.ECacheType;
2425
import com.dtstack.flink.sql.parser.CreateFuncParser;
2526
import com.dtstack.flink.sql.parser.InsertSqlParser;
2627
import com.dtstack.flink.sql.side.SideSqlExec;
@@ -270,8 +271,9 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
270271
classPathSet.add( PluginUtil.getRemoteJarFilePath(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
271272
} else if(tableInfo instanceof SideTableInfo){
272273

274+
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
273275
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
274-
classPathSet.add(PluginUtil.getRemoteJarFilePath(tableInfo.getType(), SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
276+
classPathSet.add(PluginUtil.getRemoteSideJarFilePath(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, remoteSqlPluginPath));
275277
}else {
276278
throw new RuntimeException("not support table type:" + tableInfo.getType());
277279
}

core/src/main/java/com/dtstack/flink/sql/side/StreamSideFactory.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.classloader.DtClassLoader;
24+
import com.dtstack.flink.sql.enums.ECacheType;
2425
import com.dtstack.flink.sql.table.AbsSideTableParser;
2526
import com.dtstack.flink.sql.table.AbsTableParser;
2627
import com.dtstack.flink.sql.util.PluginUtil;
@@ -36,14 +37,11 @@ public class StreamSideFactory {
3637

3738
private static final String CURR_TYPE = "side";
3839

39-
private static final String SIDE_DIR_TMPL = "%s%sside";
40-
4140
public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType) throws Exception {
4241

43-
cacheType = cacheType == null ? "async" : cacheType;
44-
String sideDir = String.format(SIDE_DIR_TMPL, pluginType, cacheType);
42+
String sideOperator = ECacheType.ALL.name().equals(cacheType) ? "all" : "async";
4543
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
46-
String pluginJarPath = PluginUtil.getJarFileDirPath(sideDir, sqlRootDir);
44+
String pluginJarPath = PluginUtil.getSideJarFileDirPath(pluginType, sideOperator, "side", sqlRootDir);
4745

4846
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
4947
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,17 @@ public static String getJarFileDirPath(String type, String sqlRootDir){
6363
return jarPath;
6464
}
6565

66+
public static String getSideJarFileDirPath(String pluginType, String sideOperator, String tableType, String sqlRootDir) throws MalformedURLException {
67+
String dirName = sqlRootDir + SP + pluginType + sideOperator + tableType.toLowerCase();
68+
File jarFile = new File(dirName);
69+
70+
if(!jarFile.exists()){
71+
throw new RuntimeException(String.format("path %s not exists!!!", dirName));
72+
}
73+
74+
return dirName;
75+
}
76+
6677
public static String getGenerClassName(String pluginTypeName, String type) throws IOException {
6778
String pluginClassName = upperCaseFirstChar(pluginTypeName) + upperCaseFirstChar(type);
6879
return CLASS_PRE_STR + "." + type.toLowerCase() + "." + pluginTypeName + "." + pluginClassName;
@@ -94,12 +105,18 @@ public static Properties stringToProperties(String str) throws IOException{
94105
return properties;
95106
}
96107

97-
public static URL getRemoteJarFilePath(String pluginType, String tableType,String remoteSqlRootDir) throws MalformedURLException {
108+
public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir) throws MalformedURLException {
98109
String dirName = pluginType + tableType.toLowerCase();
99110
String jarName = String.format("%s-%s.jar", pluginType, tableType.toLowerCase());
100111
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
101112
}
102113

114+
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir) throws MalformedURLException {
115+
String dirName = pluginType + sideOperator + tableType.toLowerCase();
116+
String jarName = String.format("%s-%s-%s.jar", pluginType, sideOperator, tableType.toLowerCase());
117+
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
118+
}
119+
103120
public static String upperCaseFirstChar(String str){
104121
return str.substring(0, 1).toUpperCase() + str.substring(1);
105122
}

hbase/hbase-side/hbase-async-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
<configuration>
4444
<artifactSet>
4545
<excludes>
46-
46+
<exclude>org.slf4j:slf4j-log4j12</exclude>
4747
</excludes>
4848
</artifactSet>
4949
<filters>

launcher/job/mysqlsideSql.txt

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
CREATE TABLE MyTable(
2+
channel STRING,
3+
pv INT,
4+
xctime bigint,
5+
CHARACTER_LENGTH(channel) as timeLeng,
6+
WATERMARK FOR xctime AS withOffset(xctime,1000)
7+
)WITH(
8+
type='kafka09',
9+
bootstrapServers='172.16.8.198:9092',
10+
offsetReset='latest',
11+
topic='nbTest1'
12+
);
13+
CREATE TABLE MyResult(
14+
channel STRING,
15+
pv INT
16+
)WITH(
17+
type='mysql',
18+
url='jdbc:mysql://172.16.8.104:3306/test?charset=utf8',
19+
userName='dtstack',
20+
password='abc123',
21+
tableName='pv'
22+
);
23+
24+
create table sideTable(
25+
channel String,
26+
xccount int,
27+
PRIMARY KEY(channel),
28+
PERIOD FOR SYSTEM_TIME
29+
)WITH(
30+
type='mysql',
31+
url='jdbc:mysql://172.16.8.104:3306/test?charset=utf8',
32+
userName='dtstack',
33+
password='abc123',
34+
tableName='sidetest',
35+
cache = 'LRU',
36+
cacheTTLMs='10000'
37+
);
38+
39+
insert
40+
into
41+
MyResult
42+
select
43+
a.channel,
44+
b.xccount
45+
from
46+
MyTable a
47+
join
48+
sideTable b
49+
on a.channel=b.channel
50+
where
51+
b.channel = 'xc'
52+
and a.pv=10;

0 commit comments

Comments
 (0)