Skip to content

Commit 6e5aebc

Browse files
committed
Merge branch 'feat_sink_dirty_log' into 'v1.8.0_dev'
优化 output format 的第一条数据输入和第一条脏数据输出 优化 output format 的第一条数据输入和第一条脏数据输出 See merge request !195
2 parents a1e244c + 83daea8 commit 6e5aebc

File tree

7 files changed

+65
-41
lines changed

7 files changed

+65
-41
lines changed

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
193193
try {
194194
if (retract) {
195195
insertWrite(row);
196-
outRecords.inc();
197196
} else {
198197
//do nothing
199198
}
@@ -204,14 +203,24 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
204203

205204
private void insertWrite(Row row) {
206205
try {
206+
207+
if(outRecords.getCount() % ROW_PRINT_FREQUENCY == 0){
208+
LOG.info("Receive data : {}", row);
209+
}
210+
207211
String cql = buildSql(row);
208212
if (cql != null) {
209213
ResultSet resultSet = session.execute(cql);
210214
resultSet.wasApplied();
215+
outRecords.inc();
211216
}
212217
} catch (Exception e) {
218+
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0){
219+
LOG.error("record insert failed ..", row.toString().substring(0, 100));
220+
LOG.error("", e);
221+
}
222+
213223
outDirtyRecords.inc();
214-
LOG.error("[upsert] is error:" + e.getMessage());
215224
}
216225
}
217226

core/src/main/java/com/dtstack/flink/sql/outputformat/DtRichOutputFormat.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,22 @@
2929
* extend RichOutputFormat with metric 'dtNumRecordsOut', 'dtNumDirtyRecordsOut', 'dtNumRecordsOutRate'
3030
* Created by sishu.yss on 2018/11/28.
3131
*/
32-
public abstract class DtRichOutputFormat extends RichOutputFormat<Tuple2>{
32+
public abstract class DtRichOutputFormat extends RichOutputFormat<Tuple2>{
3333

34-
protected transient Counter outRecords;
34+
protected transient Counter outRecords;
3535

36-
protected transient Counter outDirtyRecords;
36+
protected transient Counter outDirtyRecords;
3737

38-
protected transient Meter outRecordsRate;
38+
protected transient Meter outRecordsRate;
3939

40-
public void initMetric() {
40+
protected static int ROW_PRINT_FREQUENCY = 1000;
41+
42+
protected static int DIRTY_PRINT_FREQUENCY = 1000;
43+
44+
public void initMetric() {
4145
outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
4246
outDirtyRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_DIRTY_RECORDS_OUT);
4347
outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
44-
}
48+
}
4549

4650
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,6 @@ public class HbaseOutputFormat extends DtRichOutputFormat {
6666

6767
public final SimpleDateFormat ROWKEY_DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss");
6868

69-
private static int rowLenth = 1000;
70-
private static int dirtyDataPrintFrequency = 1000;
71-
72-
7369
@Override
7470
public void configure(Configuration parameters) {
7571
LOG.warn("---configure---");
@@ -126,14 +122,16 @@ public void writeRecord(Tuple2 tuple2) {
126122
try {
127123
table.put(put);
128124
} catch (IOException e) {
129-
outDirtyRecords.inc();
130-
if (outDirtyRecords.getCount() % dirtyDataPrintFrequency == 0 || LOG.isDebugEnabled()) {
131-
LOG.error("record insert failed ..", record.toString());
125+
126+
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
127+
LOG.error("record insert failed,dirty record num:{}, current row:{}", outDirtyRecords.getCount(), record.toString());
132128
LOG.error("", e);
133129
}
130+
131+
outDirtyRecords.inc();
134132
}
135133

136-
if (outRecords.getCount() % rowLenth == 0) {
134+
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
137135
LOG.info(record.toString());
138136
}
139137
outRecords.inc();

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduOutputFormat.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,24 +103,40 @@ public void writeRecord(Tuple2 record) throws IOException {
103103
if (!retract) {
104104
return;
105105
}
106+
106107
Row row = tupleTrans.getField(1);
108+
107109
if (row.getArity() != fieldNames.length) {
110+
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0) {
111+
LOG.error("record insert failed ..", row.toString());
112+
LOG.error("cause by row.getArity() != fieldNames.length");
113+
}
114+
115+
outDirtyRecords.inc();
108116
return;
109117
}
110118

111119
Operation operation = toOperation(writeMode, row);
112120
AsyncKuduSession session = client.newSession();
113121

114122
try {
123+
124+
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
125+
LOG.info("Receive data : {}", row);
126+
}
127+
115128
session.apply(operation);
116129
session.close();
117130
outRecords.inc();
118131
} catch (KuduException e) {
132+
133+
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0){
134+
LOG.error("record insert failed ..", row.toString().substring(0, 100));
135+
LOG.error("", e);
136+
}
137+
119138
outDirtyRecords.inc();
120-
LOG.error("record insert failed ..", row.toString().substring(0, 100));
121-
LOG.error("", e);
122139
}
123-
124140
}
125141

126142
@Override

mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoOutputFormat.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,6 @@ public class MongoOutputFormat extends DtRichOutputFormat {
6363

6464
private static String PK = "_ID";
6565

66-
private static int rowLenth = 1000;
67-
68-
public final SimpleDateFormat ROWKEY_DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss");
69-
7066
@Override
7167
public void configure(Configuration parameters) {
7268
}
@@ -97,6 +93,7 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
9793
for (int i = 0; i < fieldNames.length; i++) {
9894
doc.append(fieldNames[i], record.getField(i));
9995
}
96+
10097
if (doc.containsKey(PK)) {
10198
Document updateValue = new Document();
10299
Document filter = new Document(PK.toLowerCase(), new ObjectId(doc.getString(PK)));
@@ -110,7 +107,7 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
110107
dbCollection.insertOne(doc);
111108
}
112109

113-
if (outRecords.getCount()%rowLenth == 0){
110+
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0){
114111
LOG.info(record.toString());
115112
}
116113
outRecords.inc();

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,6 @@ public class RetractJDBCOutputFormat extends DtRichOutputFormat {
5252

5353
private static final Logger LOG = LoggerFactory.getLogger(RetractJDBCOutputFormat.class);
5454

55-
private static int dirtyDataPrintFrequency = 1000;
56-
57-
private static int receiveDataPrintFrequency = 1000;
58-
5955
private String username;
6056
private String password;
6157
private String drivername;
@@ -171,10 +167,12 @@ public void writeRecord(Tuple2 tuple2) {
171167
}
172168

173169
if (retract) {
174-
outRecords.inc();
175-
if (outRecords.getCount() % receiveDataPrintFrequency == 0) {
170+
171+
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
176172
LOG.info("Receive data : {}", row);
177173
}
174+
175+
outRecords.inc();
178176
insertWrite(row);
179177
} else {
180178
//do nothing
@@ -207,11 +205,13 @@ private void writeSingleRecord(Row row) {
207205
upload.executeUpdate();
208206
dbConn.commit();
209207
} catch (SQLException e) {
210-
outDirtyRecords.inc();
211-
if (outDirtyRecords.getCount() % dirtyDataPrintFrequency == 0 || LOG.isDebugEnabled()) {
212-
LOG.error("record insert failed ..{}", row.toString());
208+
209+
if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
210+
LOG.error("record insert failed,dirty record num:{}, current row:{}", outDirtyRecords.getCount(), row.toString());
213211
LOG.error("", e);
214212
}
213+
214+
outDirtyRecords.inc();
215215
}
216216
}
217217

redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ public class RedisOutputFormat extends DtRichOutputFormat {
6868

6969
private GenericObjectPoolConfig poolConfig;
7070

71-
private static int rowLenth = 1000;
72-
7371
private RedisOutputFormat(){
7472
}
7573
@Override
@@ -113,8 +111,8 @@ private void establishConnection() {
113111
if (timeout == 0){
114112
timeout = 10000;
115113
}
116-
if (database == null)
117-
{
114+
115+
if (database == null) {
118116
database = "0";
119117
}
120118

@@ -142,13 +140,13 @@ public void writeRecord(Tuple2 record) throws IOException {
142140
if (!retract) {
143141
return;
144142
}
143+
145144
Row row = tupleTrans.getField(1);
146145
if (row.getArity() != fieldNames.length) {
147146
return;
148147
}
149148

150149
HashMap<String, Integer> map = new HashMap<>();
151-
152150
for (String primaryKey : primaryKeys){
153151
for (int i=0; i<fieldNames.length; i++){
154152
if (fieldNames[i].equals(primaryKey)){
@@ -166,8 +164,6 @@ public void writeRecord(Tuple2 record) throws IOException {
166164
}
167165

168166
String perKey = String.join(":", kvList);
169-
170-
171167
for (int i = 0; i < fieldNames.length; i++) {
172168
StringBuilder key = new StringBuilder();
173169
key.append(tableName).append(":").append(perKey).append(":").append(fieldNames[i]);
@@ -177,12 +173,14 @@ public void writeRecord(Tuple2 record) throws IOException {
177173
if (field != null) {
178174
value = field.toString();
179175
}
176+
180177
jedis.set(key.toString(), value);
181178
}
182179

183-
if (outRecords.getCount()%rowLenth == 0){
180+
if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0){
184181
LOG.info(record.toString());
185182
}
183+
186184
outRecords.inc();
187185
}
188186

@@ -191,9 +189,11 @@ public void close() throws IOException {
191189
if (jedisSentinelPool != null) {
192190
jedisSentinelPool.close();
193191
}
192+
194193
if (pool != null) {
195194
pool.close();
196195
}
196+
197197
if (jedis != null){
198198
if (jedis instanceof Closeable){
199199
((Closeable) jedis).close();

0 commit comments

Comments
 (0)