1919
2020import  java .io .IOException ;
2121import  java .net .URI ;
22+ import  java .util .Collection ;
2223import  java .util .Collections ;
2324import  java .util .List ;
2425import  java .util .Map ;
2526import  java .util .Properties ;
2627
2728import  org .apache .commons .lang3 .StringUtils ;
28- import  org .apache .gobblin .runtime .spec_catalog .FlowCatalog ;
29- import  org .apache .gobblin .service .modules .orchestration .UserQuotaManager ;
30- import  org .apache .gobblin .util .reflection .GobblinConstructorUtils ;
3129import  org .quartz .CronExpression ;
3230import  org .slf4j .Logger ;
3331import  org .slf4j .LoggerFactory ;
4846import  org .apache .gobblin .configuration .State ;
4947import  org .apache .gobblin .instrumented .Instrumented ;
5048import  org .apache .gobblin .metrics .MetricContext ;
49+ import  org .apache .gobblin .metrics .ServiceMetricNames ;
5150import  org .apache .gobblin .metrics .Tag ;
5251import  org .apache .gobblin .runtime .api .FlowSpec ;
5352import  org .apache .gobblin .runtime .api .JobSpec ;
5857import  org .apache .gobblin .runtime .job_catalog .FSJobCatalog ;
5958import  org .apache .gobblin .runtime .job_spec .ResolvedJobSpec ;
6059import  org .apache .gobblin .runtime .spec_catalog .AddSpecResponse ;
60+ import  org .apache .gobblin .runtime .spec_catalog .FlowCatalog ;
6161import  org .apache .gobblin .service .ServiceConfigKeys ;
62- import  org .apache .gobblin .metrics .ServiceMetricNames ;
6362import  org .apache .gobblin .service .modules .flowgraph .Dag ;
63+ import  org .apache .gobblin .service .modules .orchestration .UserQuotaManager ;
6464import  org .apache .gobblin .service .modules .spec .JobExecutionPlan ;
6565import  org .apache .gobblin .util .ConfigUtils ;
6666import  org .apache .gobblin .util .PropertiesUtils ;
67+ import  org .apache .gobblin .util .reflection .GobblinConstructorUtils ;
6768
6869
6970// Provide base implementation for constructing multi-hops route. 
@@ -73,8 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
7374  // Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected 
7475  // to these data structures. 
7576  @ Getter 
76-   @ Setter 
77-   protected  final  Map <URI , TopologySpec > topologySpecMap ;
77+   protected  final  Map <URI , TopologySpec > topologySpecMap  = Maps .newConcurrentMap ();
7878
7979  protected  final  Config  config ;
8080  protected  final  Logger  log ;
@@ -97,35 +97,13 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
9797
9898  private  Optional <UserQuotaManager > userQuotaManager ;
9999
100-   public  BaseFlowToJobSpecCompiler (Config  config ){
101-     this (config ,true );
102-   }
103- 
104-   public  BaseFlowToJobSpecCompiler (Config  config , boolean  instrumentationEnabled ){
105-     this (config , Optional .<Logger >absent (),  true );
106-   }
107- 
108-   public  BaseFlowToJobSpecCompiler (Config  config , Optional <Logger > log ){
109-     this (config , log ,true );
110-   }
111- 
112-   public  BaseFlowToJobSpecCompiler (Config  config , Optional <Logger > log , boolean  instrumentationEnabled ){
113-     this .log  = log .isPresent () ? log .get () : LoggerFactory .getLogger (getClass ());
114-     if  (instrumentationEnabled ) {
115-       this .metricContext  = Instrumented .getMetricContext (ConfigUtils .configToState (config ), IdentityFlowToJobSpecCompiler .class );
116-       this .flowCompilationSuccessFulMeter  = Optional .of (this .metricContext .meter (ServiceMetricNames .FLOW_COMPILATION_SUCCESSFUL_METER ));
117-       this .flowCompilationFailedMeter  = Optional .of (this .metricContext .meter (ServiceMetricNames .FLOW_COMPILATION_FAILED_METER ));
118-       this .flowCompilationTimer  = Optional .<Timer >of (this .metricContext .timer (ServiceMetricNames .FLOW_COMPILATION_TIMER ));
119-       this .dataAuthorizationTimer  = Optional .<Timer >of (this .metricContext .timer (ServiceMetricNames .DATA_AUTHORIZATION_TIMER ));
120-     }
121-     else  {
122-       this .metricContext  = null ;
123-       this .flowCompilationSuccessFulMeter  = Optional .absent ();
124-       this .flowCompilationFailedMeter  = Optional .absent ();
125-       this .flowCompilationTimer  = Optional .absent ();
126-       this .dataAuthorizationTimer  = Optional .absent ();
127-     }
128- 
100+   public  BaseFlowToJobSpecCompiler (Config  config , Collection <TopologySpec > topologySpecSet ){
101+     this .log  = LoggerFactory .getLogger (getClass ());
102+     this .metricContext  = Instrumented .getMetricContext (ConfigUtils .configToState (config ), IdentityFlowToJobSpecCompiler .class );
103+     this .flowCompilationSuccessFulMeter  = Optional .of (this .metricContext .meter (ServiceMetricNames .FLOW_COMPILATION_SUCCESSFUL_METER ));
104+     this .flowCompilationFailedMeter  = Optional .of (this .metricContext .meter (ServiceMetricNames .FLOW_COMPILATION_FAILED_METER ));
105+     this .flowCompilationTimer  = Optional .<Timer >of (this .metricContext .timer (ServiceMetricNames .FLOW_COMPILATION_TIMER ));
106+     this .dataAuthorizationTimer  = Optional .<Timer >of (this .metricContext .timer (ServiceMetricNames .DATA_AUTHORIZATION_TIMER ));
129107    this .warmStandbyEnabled  = ConfigUtils .getBoolean (config , ServiceConfigKeys .GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY , false );
130108    if  (this .warmStandbyEnabled ) {
131109      userQuotaManager  = Optional .of (GobblinConstructorUtils .invokeConstructor (UserQuotaManager .class ,
@@ -134,10 +112,12 @@ public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean in
134112      userQuotaManager  = Optional .absent ();
135113    }
136114
137-     this .topologySpecMap  = Maps .newConcurrentMap ();
115+     topologySpecSet .forEach (this ::onAddTopologySpec );
116+ 
117+ 
138118    this .config  = config ;
139119
140-     /***  
120+     /* 
141121     * ETL-5996 
142122     * For multi-tenancy, the following needs to be added: 
143123     * 1. Change singular templateCatalog to Map<URI, JobCatalogWithTemplates> to support multiple templateCatalogs 
@@ -219,8 +199,6 @@ private  AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
219199  public  AddSpecResponse  onAddSpec (Spec  addedSpec ) {
220200    if  (addedSpec  instanceof  FlowSpec ) {
221201      return  onAddFlowSpec ((FlowSpec ) addedSpec );
222-     } else  if  (addedSpec  instanceof  TopologySpec ) {
223-       return  onAddTopologySpec ( (TopologySpec ) addedSpec );
224202    }
225203    return  new  AddSpecResponse (null );
226204  }
0 commit comments