23
23
import io .opensergo .proto .transport .v1 .SubscribeOpType ;
24
24
import io .opensergo .proto .transport .v1 .SubscribeRequest ;
25
25
import io .opensergo .proto .transport .v1 .SubscribeRequestTarget ;
26
- import io .opensergo .subscribe .OpenSergoConfigSubscriber ;
27
26
import io .opensergo .subscribe .SubscribeKey ;
28
27
import io .opensergo .subscribe .SubscribeRegistry ;
29
28
import io .opensergo .subscribe .SubscribedConfigCache ;
30
29
import io .opensergo .util .AssertUtils ;
31
30
import io .opensergo .util .IdentifierUtils ;
32
31
32
+ import java .util .Optional ;
33
+ import java .util .concurrent .TimeUnit ;
33
34
import java .util .concurrent .atomic .AtomicInteger ;
34
35
35
36
/**
@@ -46,6 +47,7 @@ public class OpenSergoClient implements AutoCloseable {
46
47
private final SubscribeRegistry subscribeRegistry ;
47
48
48
49
private AtomicInteger reqId ;
50
+ protected static volatile OpensergoClientStatus status ;
49
51
50
52
public OpenSergoClient (String host , int port ) {
51
53
this .channel = ManagedChannelBuilder .forAddress (host , port )
@@ -56,17 +58,64 @@ public OpenSergoClient(String host, int port) {
56
58
this .configCache = new SubscribedConfigCache ();
57
59
this .subscribeRegistry = new SubscribeRegistry ();
58
60
this .reqId = new AtomicInteger (0 );
61
+ status = OpensergoClientStatus .INITIAL ;
62
+ }
63
+
64
+ public void registerSubscribeInfo (OpensergoClientSubscribeInfo subscribeInfo ) {
65
+ // Register subscriber to local.
66
+ if (Optional .of (subscribeInfo .getSubscriberList ()).isPresent () && subscribeInfo .getSubscriberList ().size () > 0 ) {
67
+ subscribeInfo .getSubscriberList ().forEach (subscriber -> {
68
+ this .subscribeRegistry .registerSubscriber (subscribeInfo .getSubscribeKey (), subscriber );
69
+ OpenSergoLogger .info ("OpenSergo subscribeinfo registered, subscribeKey={}, subscriber={}" , subscribeInfo .getSubscribeKey (), subscriber );
70
+ });
71
+ }
59
72
}
60
73
61
74
public void start () throws Exception {
75
+ OpenSergoLogger .info ("OpensergoClient is starting..." );
76
+
77
+ if (status == OpensergoClientStatus .INITIAL ) {
78
+ OpenSergoLogger .info ("open keepavlive thread" );
79
+ new Thread (this ::keepAlive ).start ();
80
+ }
81
+
82
+ status = OpensergoClientStatus .STARTING ;
83
+
62
84
this .requestAndResponseWriter = transportGrpcStub .withWaitForReady ()
63
85
.subscribeConfig (new OpenSergoSubscribeClientObserver (configCache , subscribeRegistry ));
86
+
87
+ OpenSergoLogger .info ("begin to subscribe config-data..." );
88
+ this .subscribeRegistry .getSubscriberKeysAll ().forEach (subscribeKey -> {
89
+ this .subscribeConfig (subscribeKey );
90
+ });
91
+
92
+ OpenSergoLogger .info ("openSergoClient is started" );
93
+ status = OpensergoClientStatus .STARTED ;
94
+ }
95
+
96
+ private void keepAlive () {
97
+ try {
98
+ if (status != OpensergoClientStatus .STARTING
99
+ && status != OpensergoClientStatus .STARTED
100
+ && status != OpensergoClientStatus .SHUTDOWN ) {
101
+ OpenSergoLogger .info ("try to restart openSergoClient..." );
102
+ this .start ();
103
+ }
104
+ Thread .sleep (TimeUnit .SECONDS .toMillis (10 ));
105
+ if ( status != OpensergoClientStatus .SHUTDOWN ) {
106
+ keepAlive ();
107
+ }
108
+ } catch (Exception e ) {
109
+ e .printStackTrace ();
110
+ }
64
111
}
65
112
66
113
@ Override
67
114
public void close () throws Exception {
68
115
requestAndResponseWriter .onCompleted ();
69
116
117
+ status = OpensergoClientStatus .SHUTDOWN ;
118
+
70
119
// gracefully drain the requests, then close the connection
71
120
channel .shutdown ();
72
121
}
@@ -77,8 +126,8 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
77
126
AssertUtils .assertNotNull (subscribeKey .getKind (), "kind cannot be null" );
78
127
79
128
if (requestAndResponseWriter == null ) {
80
- // TODO: return status that indicates not ready
81
- throw new IllegalStateException ( "gRPC stream is not ready" ) ;
129
+ OpenSergoLogger . error ( "Fatal error occurred on OpenSergo gRPC ClientObserver" , new IllegalStateException ( "gRPC stream is not ready" ));
130
+ status = OpensergoClientStatus . INTERRUPTED ;
82
131
}
83
132
SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
84
133
.setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
@@ -97,17 +146,13 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
97
146
}
98
147
99
148
public boolean subscribeConfig (SubscribeKey subscribeKey ) {
100
- return subscribeConfig (subscribeKey , null );
101
- }
102
-
103
- public boolean subscribeConfig (SubscribeKey subscribeKey , OpenSergoConfigSubscriber subscriber ) {
104
149
AssertUtils .assertNotNull (subscribeKey , "subscribeKey cannot be null" );
105
150
AssertUtils .assertNotNull (subscribeKey .getApp (), "app cannot be null" );
106
151
AssertUtils .assertNotNull (subscribeKey .getKind (), "kind cannot be null" );
107
152
108
153
if (requestAndResponseWriter == null ) {
109
- // TODO: return status that indicates not ready
110
- throw new IllegalStateException ( "gRPC stream is not ready" ) ;
154
+ OpenSergoLogger . error ( "Fatal error occurred on OpenSergo gRPC ClientObserver" , new IllegalStateException ( "gRPC stream is not ready" ));
155
+ status = OpensergoClientStatus . INTERRUPTED ;
111
156
}
112
157
SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
113
158
.setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
@@ -121,13 +166,6 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
121
166
// Send SubscribeRequest
122
167
requestAndResponseWriter .onNext (request );
123
168
124
- // Register subscriber to local.
125
- if (subscriber != null ) {
126
- subscribeRegistry .registerSubscriber (subscribeKey , subscriber );
127
- OpenSergoLogger .info ("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}" ,
128
- subscribeKey , subscriber );
129
- }
130
-
131
169
return true ;
132
170
}
133
171
0 commit comments