30
30
import io .opensergo .util .AssertUtils ;
31
31
import io .opensergo .util .IdentifierUtils ;
32
32
33
+ import java .util .Optional ;
34
+ import java .util .concurrent .TimeUnit ;
33
35
import java .util .concurrent .atomic .AtomicInteger ;
34
36
35
37
/**
@@ -46,6 +48,7 @@ public class OpenSergoClient implements AutoCloseable {
46
48
private final SubscribeRegistry subscribeRegistry ;
47
49
48
50
private AtomicInteger reqId ;
51
+ protected volatile OpenSergoClientStatus status ;
49
52
50
53
public OpenSergoClient (String host , int port ) {
51
54
this .channel = ManagedChannelBuilder .forAddress (host , port )
@@ -56,17 +59,72 @@ public OpenSergoClient(String host, int port) {
56
59
this .configCache = new SubscribedConfigCache ();
57
60
this .subscribeRegistry = new SubscribeRegistry ();
58
61
this .reqId = new AtomicInteger (0 );
62
+ status = OpenSergoClientStatus .INITIAL ;
63
+ }
64
+
65
+ public void registerSubscribeInfo (OpenSergoClientSubscribeInfo subscribeInfo ) {
66
+ // Register subscriber to local.
67
+ if (Optional .of (subscribeInfo .getSubscriberList ()).isPresent () && subscribeInfo .getSubscriberList ().size () > 0 ) {
68
+ subscribeInfo .getSubscriberList ().forEach (subscriber -> {
69
+ this .subscribeRegistry .registerSubscriber (subscribeInfo .getSubscribeKey (), subscriber );
70
+ OpenSergoLogger .info ("OpenSergo subscribeinfo registered, subscribeKey={}, subscriber={}" , subscribeInfo .getSubscribeKey (), subscriber );
71
+
72
+ if (requestAndResponseWriter != null && this .status == OpenSergoClientStatus .STARTED ) {
73
+ this .subscribeConfig (subscribeInfo .getSubscribeKey ());
74
+ }
75
+ });
76
+ }
59
77
}
60
78
61
79
public void start () throws Exception {
80
+ OpenSergoLogger .info ("OpensergoClient is starting..." );
81
+
82
+ if (status == OpenSergoClientStatus .INITIAL ) {
83
+ OpenSergoLogger .info ("open keepavlive thread" );
84
+ Thread keepAliveThread = new Thread (this ::keepAlive );
85
+ keepAliveThread .setName ("thread-opensergo-keepalive-" + keepAliveThread .getId ());
86
+ keepAliveThread .setDaemon (true );
87
+ keepAliveThread .start ();
88
+ }
89
+
90
+ status = OpenSergoClientStatus .STARTING ;
91
+
62
92
this .requestAndResponseWriter = transportGrpcStub .withWaitForReady ()
63
- .subscribeConfig (new OpenSergoSubscribeClientObserver (configCache , subscribeRegistry ));
93
+ .subscribeConfig (new OpenSergoSubscribeClientObserver (this ));
94
+
95
+ OpenSergoLogger .info ("begin to subscribe config-data..." );
96
+ this .subscribeRegistry .getSubscriberKeysAll ().forEach (subscribeKey -> {
97
+ this .subscribeConfig (subscribeKey );
98
+ });
99
+
100
+ OpenSergoLogger .info ("openSergoClient is started" );
101
+ status = OpenSergoClientStatus .STARTED ;
102
+ }
103
+
104
+ private void keepAlive () {
105
+ try {
106
+ if (status != OpenSergoClientStatus .STARTING
107
+ && status != OpenSergoClientStatus .STARTED
108
+ && status != OpenSergoClientStatus .SHUTDOWN ) {
109
+ OpenSergoLogger .info ("try to restart openSergoClient..." );
110
+ this .start ();
111
+ }
112
+ Thread .sleep (TimeUnit .SECONDS .toMillis (10 ));
113
+ if ( status != OpenSergoClientStatus .SHUTDOWN ) {
114
+ keepAlive ();
115
+ }
116
+ } catch (Exception e ) {
117
+ e .printStackTrace ();
118
+ }
64
119
}
65
120
66
121
@ Override
67
122
public void close () throws Exception {
68
123
requestAndResponseWriter .onCompleted ();
69
124
125
+ // stop the keepAliveThread
126
+ status = OpenSergoClientStatus .SHUTDOWN ;
127
+
70
128
// gracefully drain the requests, then close the connection
71
129
channel .shutdown ();
72
130
}
@@ -77,8 +135,8 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
77
135
AssertUtils .assertNotNull (subscribeKey .getKind (), "kind cannot be null" );
78
136
79
137
if (requestAndResponseWriter == null ) {
80
- // TODO: return status that indicates not ready
81
- throw new IllegalStateException ( "gRPC stream is not ready" ) ;
138
+ OpenSergoLogger . error ( "Fatal error occurred on OpenSergo gRPC ClientObserver" , new IllegalStateException ( "gRPC stream is not ready" ));
139
+ status = OpenSergoClientStatus . INTERRUPTED ;
82
140
}
83
141
SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
84
142
.setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
@@ -106,8 +164,8 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
106
164
AssertUtils .assertNotNull (subscribeKey .getKind (), "kind cannot be null" );
107
165
108
166
if (requestAndResponseWriter == null ) {
109
- // TODO: return status that indicates not ready
110
- throw new IllegalStateException ( "gRPC stream is not ready" ) ;
167
+ OpenSergoLogger . error ( "Fatal error occurred on OpenSergo gRPC ClientObserver" , new IllegalStateException ( "gRPC stream is not ready" ));
168
+ status = OpenSergoClientStatus . INTERRUPTED ;
111
169
}
112
170
SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
113
171
.setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
@@ -121,18 +179,15 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
121
179
// Send SubscribeRequest
122
180
requestAndResponseWriter .onNext (request );
123
181
124
- // Register subscriber to local.
125
- if (subscriber != null ) {
126
- subscribeRegistry .registerSubscriber (subscribeKey , subscriber );
127
- OpenSergoLogger .info ("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}" ,
128
- subscribeKey , subscriber );
129
- }
130
-
131
182
return true ;
132
183
}
133
184
134
185
public SubscribedConfigCache getConfigCache () {
135
186
return configCache ;
136
187
}
137
188
189
+ public SubscribeRegistry getSubscribeRegistry () {
190
+ return subscribeRegistry ;
191
+ }
192
+
138
193
}
0 commit comments