55import com .uber .m3 .tally .Scope ;
66import com .uber .m3 .tally .StatsReporter ;
77import com .uber .m3 .util .Duration ;
8- import io .grpc .netty .shaded .io .grpc .netty .GrpcSslContexts ;
9- import io .grpc .netty .shaded .io .netty .handler .ssl .SslContext ;
10- import io .grpc .netty .shaded .io .netty .handler .ssl .SslContextBuilder ;
118import io .micrometer .prometheus .PrometheusConfig ;
129import io .micrometer .prometheus .PrometheusMeterRegistry ;
1310import io .temporal .client .WorkflowClient ;
1411import io .temporal .client .WorkflowClientOptions ;
1512import io .temporal .common .reporter .MicrometerClientStatsReporter ;
1613import io .temporal .serviceclient .WorkflowServiceStubs ;
1714import io .temporal .serviceclient .WorkflowServiceStubsOptions ;
18- import java .io .FileInputStream ;
19- import java .io .InputStream ;
2015import java .io .OutputStream ;
2116import java .net .InetSocketAddress ;
2217import java .nio .charset .StandardCharsets ;
2318
2419public final class TemporalConnection {
2520 private TemporalConnection () {}
2621
27- // Read from environment (docker-compose env_file: .env)
28- public static final String NAMESPACE = env ("TEMPORAL_NAMESPACE" , "<namespace>.<account-id>" );
29- public static final String ADDRESS =
30- env ("TEMPORAL_ADDRESS" , "<namespace>.<account-id>.tmprl.cloud:7233" );
31- public static final String CERT =
32- env ("TEMPORAL_CERT" , "path/to/client.pem" );
33- public static final String KEY =
34- env ("TEMPORAL_KEY" , "path/to/client.key" );
22+ // Required: MUST be set in env. No defaults.
23+ public static final String NAMESPACE = envRequired ("TEMPORAL_NAMESPACE" );
24+ public static final String ADDRESS = envRequired ("TEMPORAL_ADDRESS" );
3525 public static final String TASK_QUEUE = env ("TASK_QUEUE" , "openmetrics-task-queue" );
26+
3627 private static final int METRICS_PORT = envInt ("METRICS_PORT" , 9464 );
3728 private static final int METRICS_REPORT_SECONDS = envInt ("METRICS_REPORT_SECONDS" , 10 );
3829
3930 private static volatile WorkflowClient CLIENT ;
40- private static volatile WorkflowServiceStubs SERVICE ;
41-
42- private static volatile boolean METRICS_STARTED = false ;
43- private static volatile PrometheusMeterRegistry PROM_REGISTRY ;
31+ private static volatile PrometheusMeterRegistry PROM ;
32+ private static volatile boolean METRICS_STARTED ;
4433
4534 public static WorkflowClient client () {
4635 if (CLIENT != null ) return CLIENT ;
4736 synchronized (TemporalConnection .class ) {
4837 if (CLIENT != null ) return CLIENT ;
4938
50- SERVICE = serviceStubs ();
39+ String apiKey = envRequired ("TEMPORAL_API_KEY" );
40+
41+ // Validation
42+ validate ();
43+ System .out .println ("TemporalConnection: ADDRESS=" + ADDRESS );
44+ System .out .println ("TemporalConnection: NAMESPACE=" + NAMESPACE );
45+
46+ Scope scope = metricsScope ();
47+
48+ WorkflowServiceStubs service =
49+ WorkflowServiceStubs .newServiceStubs (
50+ WorkflowServiceStubsOptions .newBuilder ()
51+ .setTarget (ADDRESS )
52+ .setEnableHttps (true )
53+ .addApiKey (() -> apiKey )
54+ .setMetricsScope (scope )
55+ .build ());
56+
5157 CLIENT =
5258 WorkflowClient .newInstance (
53- SERVICE , WorkflowClientOptions .newBuilder ().setNamespace (NAMESPACE ).build ());
59+ service , WorkflowClientOptions .newBuilder ().setNamespace (NAMESPACE ).build ());
60+
5461 return CLIENT ;
5562 }
5663 }
5764
58- // create service stubs used by worker + starter
59- private static WorkflowServiceStubs serviceStubs () {
60- try (InputStream clientCert = new FileInputStream (CERT );
61- InputStream clientKey = new FileInputStream (KEY )) {
62-
63- SslContext sslContext =
64- GrpcSslContexts .configure (SslContextBuilder .forClient ().keyManager (clientCert , clientKey ))
65- .build ();
66-
67- Scope metricsScope = metricsScope (); // ✅ tally scope that writes into Prometheus registry
68-
69- WorkflowServiceStubsOptions options =
70- WorkflowServiceStubsOptions .newBuilder ()
71- .setTarget (ADDRESS )
72- .setSslContext (sslContext )
73- .setMetricsScope (metricsScope ) // ✅ Temporal SDK emits metrics here
74- .build ();
75-
76- return WorkflowServiceStubs .newServiceStubs (options );
77- } catch (Exception e ) {
78- throw new RuntimeException ("Failed to create Temporal TLS connection" , e );
65+ private static void validate () {
66+ if (NAMESPACE .isBlank ()) {
67+ throw new IllegalStateException ("TEMPORAL_NAMESPACE must be set (non-blank)." );
68+ }
69+ if (ADDRESS .isBlank ()) {
70+ throw new IllegalStateException ("TEMPORAL_ADDRESS must be set (non-blank)." );
7971 }
8072 }
8173
8274 private static Scope metricsScope () {
8375 synchronized (TemporalConnection .class ) {
84- if (PROM_REGISTRY == null ) {
85- PROM_REGISTRY = new PrometheusMeterRegistry (PrometheusConfig .DEFAULT );
86- }
87-
88- StatsReporter reporter = new MicrometerClientStatsReporter (PROM_REGISTRY );
76+ if (PROM == null ) PROM = new PrometheusMeterRegistry (PrometheusConfig .DEFAULT );
8977
78+ StatsReporter reporter = new MicrometerClientStatsReporter (PROM );
9079 Scope scope =
9180 new RootScopeBuilder ()
9281 .reporter (reporter )
9382 .reportEvery (Duration .ofSeconds (METRICS_REPORT_SECONDS ));
9483
9584 if (!METRICS_STARTED ) {
9685 METRICS_STARTED = true ;
97- startMetricsHttpServer (PROM_REGISTRY );
86+ startMetricsHttpServer (PROM );
9887 }
99-
10088 return scope ;
10189 }
10290 }
@@ -116,17 +104,24 @@ private static void startMetricsHttpServer(PrometheusMeterRegistry registry) {
116104 os .write (body );
117105 }
118106 });
119- server .setExecutor (null );
120107 server .start ();
121- System .out .println ("Worker metrics exposed at http://0.0.0.0:" + METRICS_PORT + "/metrics" );
108+ System .out .println ("Worker metrics at http://0.0.0.0:" + METRICS_PORT + "/metrics" );
122109 } catch (Exception e ) {
123110 throw new RuntimeException ("Failed to start /metrics endpoint" , e );
124111 }
125112 }
126113
127114 private static String env (String key , String def ) {
128115 String v = System .getenv (key );
129- return (v == null || v .isBlank ()) ? def : v ;
116+ return (v == null || v .isBlank ()) ? def : v .trim ();
117+ }
118+
119+ private static String envRequired (String key ) {
120+ String v = System .getenv (key );
121+ if (v == null || v .isBlank ()) {
122+ throw new IllegalStateException ("Missing required env var: " + key );
123+ }
124+ return v .trim ();
130125 }
131126
132127 private static int envInt (String key , int def ) {
0 commit comments