File tree Expand file tree Collapse file tree 1 file changed +52
-0
lines changed Expand file tree Collapse file tree 1 file changed +52
-0
lines changed Original file line number Diff line number Diff line change
1
+ CREATE TABLE MyTable(
2
+ channel STRING,
3
+ pv INT,
4
+ xctime bigint,
5
+ CHARACTER_LENGTH(channel) as timeLeng,
6
+ WATERMARK FOR xctime AS withOffset(xctime,1000)
7
+ )WITH(
8
+ type='kafka09',
9
+ bootstrapServers='172.16.8.198:9092',
10
+ offsetReset='latest',
11
+ topic='nbTest1'
12
+ );
13
+ CREATE TABLE MyResult(
14
+ channel STRING,
15
+ pv INT
16
+ )WITH(
17
+ type='mysql',
18
+ url='jdbc:mysql://172.16.8.104:3306/test?charset=utf8',
19
+ userName='dtstack',
20
+ password='abc123',
21
+ tableName='pv'
22
+ );
23
+
24
+ create table sideTable(
25
+ channel String,
26
+ xccount int,
27
+ PRIMARY KEY(channel),
28
+ PERIOD FOR SYSTEM_TIME
29
+ )WITH(
30
+ type='mysql',
31
+ url='jdbc:mysql://172.16.8.104:3306/test?charset=utf8',
32
+ userName='dtstack',
33
+ password='abc123',
34
+ tableName='sidetest',
35
+ cache = 'LRU',
36
+ cacheTTLMs='10000'
37
+ );
38
+
39
+ insert
40
+ into
41
+ MyResult
42
+ select
43
+ a.channel,
44
+ b.xccount
45
+ from
46
+ MyTable a
47
+ join
48
+ sideTable b
49
+ on a.channel=b.channel
50
+ where
51
+ b.channel = 'xc'
52
+ and a.pv=10;
You can’t perform that action at this time.
0 commit comments