File tree Expand file tree Collapse file tree 1 file changed +7
-1
lines changed
database-commons/src/main/java/io/cdap/plugin/db/batch/source Expand file tree Collapse file tree 1 file changed +7
-1
lines changed Original file line number Diff line number Diff line change 63
63
import java .util .List ;
64
64
import java .util .Properties ;
65
65
import java .util .regex .Pattern ;
66
+ import java .util .stream .Collectors ;
66
67
import javax .annotation .Nullable ;
67
68
68
69
/**
@@ -266,7 +267,12 @@ public void prepareRun(BatchSourceContext context) throws Exception {
266
267
connectionConfigAccessor .setSchema (schemaStr );
267
268
}
268
269
LineageRecorder lineageRecorder = new LineageRecorder (context , sourceConfig .referenceName );
269
- lineageRecorder .createExternalDataset (sourceConfig .getSchema ());
270
+ Schema schema = sourceConfig .getSchema () == null ? schemaFromDB : sourceConfig .getSchema ();
271
+ lineageRecorder .createExternalDataset (schema );
272
+ if (schema != null && schema .getFields () != null ) {
273
+ lineageRecorder .recordRead ("Read" , "Read from database plugin" ,
274
+ schema .getFields ().stream ().map (Schema .Field ::getName ).collect (Collectors .toList ()));
275
+ }
270
276
context .setInput (Input .of (sourceConfig .referenceName , new SourceInputFormatProvider (
271
277
DataDrivenETLDBInputFormat .class , connectionConfigAccessor .getConfiguration ())));
272
278
}
You can’t perform that action at this time.
0 commit comments