Skip to content

Commit a1e244c

Browse files
committed
Merge branch '1.8.0_dev_optimizeFormatWithKafka' into 'v1.8.0_dev'
1.8.0 dev optimize format with kafka 1. 对所有kafka插件的序列化和反序列化做了统筹管理和优化 2. 对kafka的重复代码进行重构 3. 抽象kafka-base作为所有kafka插件的基础 代码的优化改动较大 See merge request !180
2 parents 51f9d55 + d91ac50 commit a1e244c

File tree

108 files changed

+1590
-5168
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

108 files changed

+1590
-5168
lines changed

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import com.datastax.driver.core.SocketOptions;
4949
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
5050
import com.datastax.driver.core.policies.RetryPolicy;
51-
import com.dtstack.flink.sql.sink.MetricOutputFormat;
51+
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
5252
import org.apache.flink.api.common.typeinfo.TypeInformation;
5353
import org.apache.flink.api.java.tuple.Tuple;
5454
import org.apache.flink.api.java.tuple.Tuple2;
@@ -69,7 +69,7 @@
6969
* @see Tuple
7070
* @see DriverManager
7171
*/
72-
public class CassandraOutputFormat extends MetricOutputFormat {
72+
public class CassandraOutputFormat extends DtRichOutputFormat {
7373
private static final long serialVersionUID = -7994311331389155692L;
7474

7575
private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.console;
2020

21-
import com.dtstack.flink.sql.sink.MetricOutputFormat;
21+
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
2222
import com.dtstack.flink.sql.sink.console.table.TablePrintUtil;
2323
import org.apache.flink.api.common.typeinfo.TypeInformation;
2424
import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,7 +37,7 @@
3737
*
3838
* @author xuqianjin
3939
*/
40-
public class ConsoleOutputFormat extends MetricOutputFormat {
40+
public class ConsoleOutputFormat extends DtRichOutputFormat {
4141

4242
private static final Logger LOG = LoggerFactory.getLogger(ConsoleOutputFormat.class);
4343

core/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,24 @@
116116
<version>${flink.version}</version>
117117
</dependency>
118118

119+
<dependency>
120+
<groupId>org.apache.flink</groupId>
121+
<artifactId>flink-avro</artifactId>
122+
<version>${flink.version}</version>
123+
</dependency>
124+
125+
<dependency>
126+
<groupId>org.apache.flink</groupId>
127+
<artifactId>flink-csv</artifactId>
128+
<version>${flink.version}</version>
129+
</dependency>
130+
131+
<dependency>
132+
<groupId>org.apache.flink</groupId>
133+
<artifactId>flink-json</artifactId>
134+
<version>${flink.version}</version>
135+
</dependency>
136+
119137

120138
</dependencies>
121139

core/src/main/java/com/dtstack/flink/sql/source/AbsDeserialization.java renamed to core/src/main/java/com/dtstack/flink/sql/format/DeserializationMetricWrapper.java

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,45 +16,51 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.dtstack.flink.sql.source;
19+
package com.dtstack.flink.sql.format;
2020

2121
import com.dtstack.flink.sql.metric.MetricConstant;
2222
import org.apache.flink.api.common.functions.RuntimeContext;
2323
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
24+
import org.apache.flink.api.common.serialization.DeserializationSchema;
25+
import org.apache.flink.api.common.typeinfo.TypeInformation;
2426
import org.apache.flink.metrics.Counter;
2527
import org.apache.flink.metrics.Meter;
2628
import org.apache.flink.metrics.MeterView;
2729
import org.apache.flink.types.Row;
2830
import org.slf4j.Logger;
2931
import org.slf4j.LoggerFactory;
3032

33+
import java.io.IOException;
34+
3135
/**
32-
* add metric for source, customer Deserialization which want add metric need to extends this abs class
33-
* Date: 2018/10/19
34-
* Company: www.dtstack.com
35-
*
36-
* @author xuchao
36+
* add metric for source
37+
* <p>
38+
* company: www.dtstack.com
39+
* author: toutian
40+
* create: 2019/12/24
3741
*/
42+
public class DeserializationMetricWrapper extends AbstractDeserializationSchema<Row> {
3843

39-
public abstract class AbsDeserialization<T> extends AbstractDeserializationSchema<T> {
40-
private static final Logger LOG = LoggerFactory.getLogger(AbsDeserialization.class);
41-
42-
private static final long serialVersionUID = 2176278128811784415L;
44+
private static final Logger LOG = LoggerFactory.getLogger(DeserializationMetricWrapper.class);
4345

4446
private static int dataPrintFrequency = 1000;
4547

46-
protected JsonDataParser jsonDataParser;
48+
private DeserializationSchema<Row> deserializationSchema;
4749

4850
private transient RuntimeContext runtimeContext;
4951

5052
protected transient Counter dirtyDataCounter;
5153

52-
//tps ransactions Per Second
54+
/**
55+
 * tps Transactions Per Second
56+
*/
5357
protected transient Counter numInRecord;
5458

5559
protected transient Meter numInRate;
5660

57-
//rps Record Per Second: deserialize data and out record num
61+
/**
62+
 * rps Records Per Second: number of records successfully deserialized and emitted
63+
*/
5864
protected transient Counter numInResolveRecord;
5965

6066
protected transient Meter numInResolveRate;
@@ -63,37 +69,37 @@ public abstract class AbsDeserialization<T> extends AbstractDeserializationSchem
6369

6470
protected transient Meter numInBytesRate;
6571

66-
public RuntimeContext getRuntimeContext() {
67-
return runtimeContext;
68-
}
69-
70-
public void setRuntimeContext(RuntimeContext runtimeContext) {
71-
this.runtimeContext = runtimeContext;
72+
public DeserializationMetricWrapper(TypeInformation<Row> typeInfo, DeserializationSchema<Row> deserializationSchema) {
73+
super(typeInfo);
74+
this.deserializationSchema = deserializationSchema;
7275
}
7376

74-
public void initMetric(){
77+
public void initMetric() {
7578
dirtyDataCounter = runtimeContext.getMetricGroup().counter(MetricConstant.DT_DIRTY_DATA_COUNTER);
7679

7780
numInRecord = runtimeContext.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_IN_COUNTER);
78-
numInRate = runtimeContext.getMetricGroup().meter( MetricConstant.DT_NUM_RECORDS_IN_RATE, new MeterView(numInRecord, 20));
81+
numInRate = runtimeContext.getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_IN_RATE, new MeterView(numInRecord, 20));
7982

8083
numInBytes = runtimeContext.getMetricGroup().counter(MetricConstant.DT_NUM_BYTES_IN_COUNTER);
81-
numInBytesRate = runtimeContext.getMetricGroup().meter(MetricConstant.DT_NUM_BYTES_IN_RATE , new MeterView(numInBytes, 20));
84+
numInBytesRate = runtimeContext.getMetricGroup().meter(MetricConstant.DT_NUM_BYTES_IN_RATE, new MeterView(numInBytes, 20));
8285

8386
numInResolveRecord = runtimeContext.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_RESOVED_IN_COUNTER);
8487
numInResolveRate = runtimeContext.getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_RESOVED_IN_RATE, new MeterView(numInResolveRecord, 20));
8588
}
8689

87-
protected T parseSourceData (byte[] message) {
90+
@Override
91+
public Row deserialize(byte[] message) throws IOException {
8892
try {
8993
if (numInRecord.getCount() % dataPrintFrequency == 0) {
9094
LOG.info("receive source data:" + new String(message, "UTF-8"));
9195
}
9296
numInRecord.inc();
9397
numInBytes.inc(message.length);
94-
Row row = jsonDataParser.parseData(message);
98+
beforeDeserialize();
99+
Row row = deserializationSchema.deserialize(message);
100+
afterDeserialize();
95101
numInResolveRecord.inc();
96-
return (T) row;
102+
return row;
97103
} catch (Exception e) {
98104
//add metric of dirty data
99105
if (dirtyDataCounter.getCount() % dataPrintFrequency == 0) {
@@ -105,5 +111,17 @@ protected T parseSourceData (byte[] message) {
105111
}
106112
}
107113

114+
protected void beforeDeserialize() throws IOException {
115+
}
108116

117+
protected void afterDeserialize() throws IOException {
118+
}
119+
120+
public RuntimeContext getRuntimeContext() {
121+
return runtimeContext;
122+
}
123+
124+
public void setRuntimeContext(RuntimeContext runtimeContext) {
125+
this.runtimeContext = runtimeContext;
126+
}
109127
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.format;
20+
21+
/**
22+
* indicate source table input data format type
23+
* company: www.dtstack.com
24+
* author: toutian
25+
* create: 2019/12/24
26+
*/
27+
public enum FormatType {
28+
//Indicates that the data is in nest json format(default)
29+
DT_NEST,
30+
//Indicates that the data is in json format
31+
JSON,
32+
//Indicates that the data is in avro format
33+
AVRO,
34+
//Indicates that the data is in csv format
35+
CSV
36+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.format;
20+
21+
import com.dtstack.flink.sql.metric.MetricConstant;
22+
import org.apache.flink.api.common.functions.RuntimeContext;
23+
import org.apache.flink.api.common.serialization.SerializationSchema;
24+
import org.apache.flink.metrics.Counter;
25+
import org.apache.flink.metrics.Meter;
26+
import org.apache.flink.metrics.MeterView;
27+
import org.apache.flink.types.Row;
28+
29+
30+
/**
31+
* add metric for source
32+
* <p>
33+
* company: www.dtstack.com
34+
* author: toutian
35+
* create: 2019/12/24
36+
*/
37+
public class SerializationMetricWrapper implements SerializationSchema<Row> {
38+
39+
private SerializationSchema<Row> serializationSchema;
40+
41+
private transient RuntimeContext runtimeContext;
42+
43+
protected transient Counter dtNumRecordsOut;
44+
45+
protected transient Meter dtNumRecordsOutRate;
46+
47+
48+
public SerializationMetricWrapper(SerializationSchema<Row> serializationSchema) {
49+
this.serializationSchema = serializationSchema;
50+
}
51+
52+
public void initMetric() {
53+
dtNumRecordsOut = runtimeContext.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
54+
dtNumRecordsOutRate = runtimeContext.getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(dtNumRecordsOut, 20));
55+
}
56+
57+
@Override
58+
public byte[] serialize(Row element) {
59+
beforeSerialize();
60+
byte[] row = serializationSchema.serialize(element);
61+
afterSerialize();
62+
return row;
63+
}
64+
65+
protected void beforeSerialize() {
66+
}
67+
68+
protected void afterSerialize() {
69+
dtNumRecordsOut.inc();
70+
}
71+
72+
public RuntimeContext getRuntimeContext() {
73+
return runtimeContext;
74+
}
75+
76+
public void setRuntimeContext(RuntimeContext runtimeContext) {
77+
this.runtimeContext = runtimeContext;
78+
}
79+
80+
}

core/src/main/java/com/dtstack/flink/sql/source/JsonDataParser.java renamed to core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.dtstack.flink.sql.source;
19+
package com.dtstack.flink.sql.format.dtnest;
2020

2121
import com.dtstack.flink.sql.table.TableInfo;
2222
import com.google.common.base.Strings;
2323
import com.google.common.collect.Maps;
24+
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
2425
import org.apache.flink.api.common.typeinfo.TypeInformation;
2526
import org.apache.flink.api.common.typeinfo.Types;
2627
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -32,7 +33,6 @@
3233
import org.apache.flink.types.Row;
3334

3435
import java.io.IOException;
35-
import java.io.Serializable;
3636
import java.sql.Date;
3737
import java.sql.Time;
3838
import java.sql.Timestamp;
@@ -41,13 +41,14 @@
4141
import java.util.Map;
4242

4343
/**
44-
* source data parse to json format
44+
* source data parse to json format
4545
*
4646
* Date: 2019/12/12
4747
* Company: www.dtstack.com
48+
*
4849
* @author maqi
4950
*/
50-
public class JsonDataParser implements Serializable {
51+
public class DtNestRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
5152

5253
private final ObjectMapper objectMapper = new ObjectMapper();
5354

@@ -58,17 +59,17 @@ public class JsonDataParser implements Serializable {
5859
private final TypeInformation<?>[] fieldTypes;
5960
private List<TableInfo.FieldExtraInfo> fieldExtraInfos;
6061

61-
public JsonDataParser(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos) {
62+
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos) {
6263
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
6364
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
6465
this.rowAndFieldMapping = rowAndFieldMapping;
6566
this.fieldExtraInfos = fieldExtraInfos;
6667
}
6768

68-
69-
public Row parseData(byte[] data) throws IOException {
70-
JsonNode root = objectMapper.readTree(data);
71-
parseTree(root, null);
69+
@Override
70+
public Row deserialize(byte[] message) throws IOException {
71+
JsonNode root = objectMapper.readTree(message);
72+
this.parseTree(root, null);
7273
Row row = new Row(fieldNames.length);
7374

7475
try {
@@ -176,4 +177,6 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
176177
}
177178
}
178179
}
180+
181+
179182
}

0 commit comments

Comments
 (0)