diff --git a/src/main/java/io/opensergo/ConfigKind.java b/src/main/java/io/opensergo/ConfigKind.java index 72a8d01..31536cc 100644 --- a/src/main/java/io/opensergo/ConfigKind.java +++ b/src/main/java/io/opensergo/ConfigKind.java @@ -26,8 +26,7 @@ public enum ConfigKind { FAULT_TOLERANCE_RULE("fault-tolerance.opensergo.io/v1alpha1/FaultToleranceRule", "FaultToleranceRule"), RATE_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/RateLimitStrategy", "RateLimitStrategy"), THROTTLING_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ThrottlingStrategy", "ThrottlingStrategy"), - CONCURRENCY_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy", - "ConcurrencyLimitStrategy"), + CONCURRENCY_LIMIT_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy", "ConcurrencyLimitStrategy"), CIRCUIT_BREAKER_STRATEGY("fault-tolerance.opensergo.io/v1alpha1/CircuitBreakerStrategy", "CircuitBreakerStrategy"); private final String kindName; diff --git a/src/main/java/io/opensergo/OpenSergoClient.java b/src/main/java/io/opensergo/OpenSergoClient.java index 0f3de80..8849835 100644 --- a/src/main/java/io/opensergo/OpenSergoClient.java +++ b/src/main/java/io/opensergo/OpenSergoClient.java @@ -30,6 +30,8 @@ import io.opensergo.util.AssertUtils; import io.opensergo.util.IdentifierUtils; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -46,6 +48,7 @@ public class OpenSergoClient implements AutoCloseable { private final SubscribeRegistry subscribeRegistry; private AtomicInteger reqId; + protected volatile OpenSergoClientStatus status; public OpenSergoClient(String host, int port) { this.channel = ManagedChannelBuilder.forAddress(host, port) @@ -56,17 +59,67 @@ public OpenSergoClient(String host, int port) { this.configCache = new SubscribedConfigCache(); this.subscribeRegistry = new SubscribeRegistry(); this.reqId = new AtomicInteger(0); + status = OpenSergoClientStatus.INITIAL; } public void start() throws Exception { + OpenSergoLogger.info("OpensergoClient is starting..."); + + if (status == OpenSergoClientStatus.INITIAL) { + OpenSergoLogger.info("open keepavlive thread"); + Thread keepAliveThread = new Thread(this::keepAlive); + keepAliveThread.setName("thread-opensergo-keepalive-" + keepAliveThread.getId()); + keepAliveThread.setDaemon(true); + keepAliveThread.start(); + } + + status = OpenSergoClientStatus.STARTING; + this.requestAndResponseWriter = transportGrpcStub.withWaitForReady() - .subscribeConfig(new OpenSergoSubscribeClientObserver(configCache, subscribeRegistry)); + .subscribeConfig(new OpenSergoSubscribeClientObserver(this)); + + OpenSergoLogger.info("begin to subscribe config-data..."); + this.subscribeRegistry.getSubscriberKeysAll().forEach(subscribeKey -> { + this.subscribeConfig(subscribeKey); + }); + + OpenSergoLogger.info("openSergoClient is started"); + status = OpenSergoClientStatus.STARTED; + } + + private void keepAlive() { + // TODO change to event-based design, instead of for-loop. + for (;;) { + if (status == OpenSergoClientStatus.SHUTDOWN) { + return; + } + + try { + if (status == OpenSergoClientStatus.INTERRUPTED) { + OpenSergoLogger.info("try to restart openSergoClient..."); + this.start(); + } + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + } catch (InterruptedException e) { + OpenSergoLogger.error(e.toString(), e); + } catch (Exception e) { + try { + this.close(); + } catch (Exception ex) { + status = OpenSergoClientStatus.SHUTDOWN; + } + OpenSergoLogger.error("close OpenSergoClient because of " + e, e); + } + } } @Override public void close() throws Exception { requestAndResponseWriter.onCompleted(); + // stop the keepAliveThread + status = OpenSergoClientStatus.SHUTDOWN; + // gracefully drain the requests, then close the connection channel.shutdown(); } @@ -77,8 +130,8 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) { AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null"); if (requestAndResponseWriter == null) { - // TODO: return status that indicates not ready - throw new IllegalStateException("gRPC stream is not ready"); + OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready")); + status = OpenSergoClientStatus.INTERRUPTED; } SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder() .setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp()) @@ -106,8 +159,8 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri AssertUtils.assertNotNull(subscribeKey.getKind(), "kind cannot be null"); if (requestAndResponseWriter == null) { - // TODO: return status that indicates not ready - throw new IllegalStateException("gRPC stream is not ready"); + OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", new IllegalStateException("gRPC stream is not ready")); + status = OpenSergoClientStatus.INTERRUPTED; } SubscribeRequestTarget subTarget = SubscribeRequestTarget.newBuilder() .setNamespace(subscribeKey.getNamespace()).setApp(subscribeKey.getApp()) @@ -124,8 +177,7 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri // Register subscriber to local. if (subscriber != null) { subscribeRegistry.registerSubscriber(subscribeKey, subscriber); - OpenSergoLogger.info("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}", - subscribeKey, subscriber); + OpenSergoLogger.info("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}", subscribeKey, subscriber); } return true; @@ -135,4 +187,8 @@ public SubscribedConfigCache getConfigCache() { return configCache; } + public SubscribeRegistry getSubscribeRegistry() { + return subscribeRegistry; + } + } diff --git a/src/main/java/io/opensergo/OpenSergoClientStatus.java b/src/main/java/io/opensergo/OpenSergoClientStatus.java new file mode 100644 index 0000000..b0c4fa0 --- /dev/null +++ b/src/main/java/io/opensergo/OpenSergoClientStatus.java @@ -0,0 +1,30 @@ +/* + * Copyright 2022, OpenSergo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.opensergo; + +/** + * @author Jiangnan Jia + **/ +public enum OpenSergoClientStatus { + + /* initial*/ + INITIAL, + STARTING, + STARTED, + INTERRUPTED, + SHUTDOWN + +} diff --git a/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java b/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java index 1ea1b8d..ce908aa 100644 --- a/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java +++ b/src/main/java/io/opensergo/OpenSergoSubscribeClientObserver.java @@ -30,8 +30,6 @@ import io.opensergo.proto.transport.v1.SubscribeResponse; import io.opensergo.subscribe.OpenSergoConfigSubscriber; import io.opensergo.subscribe.SubscribeKey; -import io.opensergo.subscribe.SubscribeRegistry; -import io.opensergo.subscribe.SubscribedConfigCache; import io.opensergo.util.StringUtils; /** @@ -41,13 +39,10 @@ public class OpenSergoSubscribeClientObserver implements ClientResponseObserver< private ClientCallStreamObserver requestStream; - private final SubscribedConfigCache configCache; - private final SubscribeRegistry subscribeRegistry; + private OpenSergoClient openSergoClient; - public OpenSergoSubscribeClientObserver(SubscribedConfigCache configCache, - SubscribeRegistry subscribeRegistry) { - this.configCache = configCache; - this.subscribeRegistry = subscribeRegistry; + public OpenSergoSubscribeClientObserver(OpenSergoClient openSergoClient) { + this.openSergoClient = openSergoClient; } @Override @@ -58,7 +53,7 @@ public void beforeStart(ClientCallStreamObserver requestStream private LocalDataNotifyResult notifyDataChange(SubscribeKey subscribeKey, DataWithVersion dataWithVersion) throws Exception { long receivedVersion = dataWithVersion.getVersion(); - SubscribedData cachedData = configCache.getDataFor(subscribeKey); + SubscribedData cachedData = this.openSergoClient.getConfigCache().getDataFor(subscribeKey); if (cachedData != null && cachedData.getVersion() > receivedVersion) { // The upcoming data is out-dated, so we'll not resolve the push request. return new LocalDataNotifyResult().setCode(OpenSergoTransportConstants.CODE_ERROR_VERSION_OUTDATED); @@ -67,9 +62,9 @@ private LocalDataNotifyResult notifyDataChange(SubscribeKey subscribeKey, DataWi // Decode actual data from the raw "Any" data. List dataList = decodeActualData(subscribeKey.getKind().getKindName(), dataWithVersion.getDataList()); // Update to local config cache. - configCache.updateData(subscribeKey, dataList, receivedVersion); + this.openSergoClient.getConfigCache().updateData(subscribeKey, dataList, receivedVersion); - List subscribers = subscribeRegistry.getSubscribersOf(subscribeKey); + List subscribers = this.openSergoClient.getSubscribeRegistry().getSubscribersOf(subscribeKey); if (subscribers == null || subscribers.isEmpty()) { // no-subscriber is acceptable (just for cache-and-pull mode) return LocalDataNotifyResult.withSuccess(dataList); @@ -178,6 +173,11 @@ private List decodeActualData(String kind, List rawList) throws Exc @Override public void onError(Throwable t) { + // TODO add handles for different io.grpc.Status of Throwable from ClientCallStreamObserver + io.grpc.Status.Code errorCode = io.grpc.Status.fromThrowable(t).getCode(); + if(errorCode.equals(io.grpc.Status.UNAVAILABLE.getCode())) { + this.openSergoClient.status = OpenSergoClientStatus.INTERRUPTED; + } OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", t); } diff --git a/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java b/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java index ed66be0..ae124a8 100644 --- a/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java +++ b/src/main/java/io/opensergo/subscribe/SubscribeRegistry.java @@ -16,6 +16,7 @@ package io.opensergo.subscribe; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -33,9 +34,14 @@ public void registerSubscriber(SubscribeKey key, OpenSergoConfigSubscriber subsc AssertUtils.assertNotNull(key, "subscribeKey cannot be null"); AssertUtils.assertNotNull(subscriber, "subscriber cannot be null"); List list = subscriberMap.computeIfAbsent(key, v -> new CopyOnWriteArrayList<>()); + // TODO distinct the same OpenSergoConfigSubscriber list.add(subscriber); } + public Set getSubscriberKeysAll() { + return subscriberMap.keySet(); + } + public List getSubscribersOf(SubscribeKey key) { if (key == null) { return null;