From 2b16505b9cbc8acdcbb3cd16558b5530720c8dee Mon Sep 17 00:00:00 2001 From: Iceber Gu Date: Fri, 10 Dec 2021 16:08:23 +0800 Subject: [PATCH 1/5] reference from client-go/tools/cache/reflector.go --- .../clustersynchro/informer/reflector.go | 605 ++++++++++++++++++ 1 file changed, 605 insertions(+) create mode 100644 pkg/synchromanager/clustersynchro/informer/reflector.go diff --git a/pkg/synchromanager/clustersynchro/informer/reflector.go b/pkg/synchromanager/clustersynchro/informer/reflector.go new file mode 100644 index 000000000..07b8a62c9 --- /dev/null +++ b/pkg/synchromanager/clustersynchro/informer/reflector.go @@ -0,0 +1,605 @@ +/* +Reference from + https://github.com/kubernetes/kubernetes/blob/b695d79d4f967c403a96986f1750a35eb75e75f1/staging/src/k8s.io/client-go/tools/cache/reflector.go +*/ + +package informer + +import ( + "context" + "errors" + "fmt" + "io" + "math/rand" + "reflect" + "sync" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/naming" + utilnet "k8s.io/apimachinery/pkg/util/net" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/pager" + "k8s.io/klog/v2" + "k8s.io/utils/trace" +) + +const defaultExpectedTypeName = "" + +// Reflector watches a specified resource and causes all changes to be reflected in the given store. +type Reflector struct { + // name identifies this reflector. By default it will be a file:line if possible. + name string + + // The name of the type we expect to place in the store. The name + // will be the stringification of expectedGVK if provided, and the + // stringification of expectedType otherwise. It is for display + // only, and should not be used for parsing or comparison. + expectedTypeName string + // An example object of the type we expect to place in the store. + // Only the type needs to be right, except that when that is + // `unstructured.Unstructured` the object's `"apiVersion"` and + // `"kind"` must also be right. + expectedType reflect.Type + // The GVK of the object we expect to place in the store if unstructured. + expectedGVK *schema.GroupVersionKind + // The destination to sync up with the watch source + store cache.Store + // listerWatcher is used to perform lists and watches. + listerWatcher cache.ListerWatcher + + // backoff manages backoff of ListWatch + backoffManager wait.BackoffManager + // initConnBackoffManager manages backoff the initial connection with the Watch calll of ListAndWatch. + initConnBackoffManager wait.BackoffManager + + resyncPeriod time.Duration + // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked + ShouldResync func() bool + // clock allows tests to manipulate time + clock clock.Clock + // paginatedResult defines whether pagination should be forced for list calls. + // It is set based on the result of the initial list call. 
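+	// A paginated response to the initial list (ResourceVersion="0") implies that
+	// the watch cache is disabled on the server side, so subsequent relists keep
+	// the pager's default page size instead of forcing a single unpaginated list.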
+ paginatedResult bool + // lastSyncResourceVersion is the resource version token last + // observed when doing a sync with the underlying store + // it is thread safe, but not synchronized with the underlying store + lastSyncResourceVersion string + // isLastSyncResourceVersionUnavailable is true if the previous list or watch request with + // lastSyncResourceVersion failed with an "expired" or "too large resource version" error. + isLastSyncResourceVersionUnavailable bool + // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion + lastSyncResourceVersionMutex sync.RWMutex + // WatchListPageSize is the requested chunk size of initial and resync watch lists. + // If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data + // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0") + // it will turn off pagination to allow serving them from watch cache. + // NOTE: It should be used carefully as paginated lists are always served directly from + // etcd, which is significantly less efficient and may lead to serious performance and + // scalability problems. + WatchListPageSize int64 + // Called whenever the ListAndWatch drops the connection with an error. + watchErrorHandler WatchErrorHandler +} + +// ResourceVersionUpdater is an interface that allows store implementation to +// track the current resource version of the reflector. This is especially +// important if storage bookmarks are enabled. +type ResourceVersionUpdater interface { + // UpdateResourceVersion is called each time current resource version of the reflector + // is updated. + UpdateResourceVersion(resourceVersion string) +} + +// The WatchErrorHandler is called whenever ListAndWatch drops the +// connection with an error. After calling this handler, the informer +// will backoff and retry. +// +// The default implementation looks at the error type and tries to log +// the error message at an appropriate level. +// +// Implementations of this handler may display the error message in other +// ways. Implementations should return quickly - any expensive processing +// should be offloaded. +type WatchErrorHandler func(r *Reflector, err error) + +// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler +func DefaultWatchErrorHandler(r *Reflector, err error) { + switch { + case isExpiredError(err): + // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already + // has a semantic that it returns data at least as fresh as provided RV. + // So first try to LIST with setting RV to resource version of last observed object. + klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) + case err == io.EOF: + // watch closed normally + case err == io.ErrUnexpectedEOF: + klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err) + default: + utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err)) + } +} + +var ( + // We try to spread the load on apiserver by setting timeouts for + // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. 
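+	//
+	// With the default of 5 minutes below, each watch request is therefore given
+	// a server-side timeout drawn uniformly from [5m, 10m), computed in
+	// ListAndWatch as:
+	//
+	//	timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))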
+ minWatchTimeout = 5 * time.Minute +) + +// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector +// The indexer is configured to key on namespace +func NewNamespaceKeyedIndexerAndReflector(lw cache.ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer cache.Indexer, reflector *Reflector) { + indexer = cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + reflector = NewReflector(lw, expectedType, indexer, resyncPeriod) + return indexer, reflector +} + +// NewReflector creates a new Reflector object which will keep the +// given store up to date with the server's contents for the given +// resource. Reflector promises to only put things in the store that +// have the type of expectedType, unless expectedType is nil. If +// resyncPeriod is non-zero, then the reflector will periodically +// consult its ShouldResync function to determine whether to invoke +// the Store's Resync operation; `ShouldResync==nil` means always +// "yes". This enables you to use reflectors to periodically process +// everything as well as incrementally processing the things that +// change. +func NewReflector(lw cache.ListerWatcher, expectedType interface{}, store cache.Store, resyncPeriod time.Duration) *Reflector { + return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) +} + +// NewNamedReflector same as NewReflector, but with a specified name for logging +func NewNamedReflector(name string, lw cache.ListerWatcher, expectedType interface{}, store cache.Store, resyncPeriod time.Duration) *Reflector { + realClock := &clock.RealClock{} + r := &Reflector{ + name: name, + listerWatcher: lw, + store: store, + // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when + // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is + // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. + backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), + initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), + resyncPeriod: resyncPeriod, + clock: realClock, + watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), + } + r.setExpectedType(expectedType) + return r +} + +func (r *Reflector) setExpectedType(expectedType interface{}) { + r.expectedType = reflect.TypeOf(expectedType) + if r.expectedType == nil { + r.expectedTypeName = defaultExpectedTypeName + return + } + + r.expectedTypeName = r.expectedType.String() + + if obj, ok := expectedType.(*unstructured.Unstructured); ok { + // Use gvk to check that watch event objects are of the desired type. + gvk := obj.GroupVersionKind() + if gvk.Empty() { + klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name) + return + } + r.expectedGVK = &gvk + r.expectedTypeName = gvk.String() + } +} + +// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common +// call chains to NewReflector, so they'd be low entropy names for reflectors +var internalPackages = []string{"client-go/tools/cache/"} + +// Run repeatedly uses the reflector's ListAndWatch to fetch all the +// objects and subsequent deltas. 
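+// A minimal usage sketch (lw and store stand in for an existing
+// cache.ListerWatcher and cache.Store; the Pod example type is illustrative):
+//
+//	r := NewReflector(lw, &v1.Pod{}, store, 0)
+//	stopCh := make(chan struct{})
+//	go r.Run(stopCh)
+//	// ... close(stopCh) when the reflector should terminate.
+//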
+// Run will exit when stopCh is closed. +func (r *Reflector) Run(stopCh <-chan struct{}) { + klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) + wait.BackoffUntil(func() { + if err := r.ListAndWatch(stopCh); err != nil { + r.watchErrorHandler(r, err) + } + }, r.backoffManager, true, stopCh) + klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) +} + +var ( + // nothing will ever be sent down this channel + neverExitWatch <-chan time.Time = make(chan time.Time) + + // Used to indicate that watching stopped because of a signal from the stop + // channel passed in from a client of the reflector. + errorStopRequested = errors.New("Stop requested") +) + +// resyncChan returns a channel which will receive something when a resync is +// required, and a cleanup function. +func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { + if r.resyncPeriod == 0 { + return neverExitWatch, func() bool { return false } + } + // The cleanup function is required: imagine the scenario where watches + // always fail so we end up listing frequently. Then, if we don't + // manually stop the timer, we could end up with many timers active + // concurrently. + t := r.clock.NewTimer(r.resyncPeriod) + return t.C(), t.Stop +} + +// ListAndWatch first lists all items and get the resource version at the moment of call, +// and then use the resource version to watch. +// It returns error if ListAndWatch didn't even try to initialize watch. +func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { + klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) + var resourceVersion string + + options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} + + if err := func() error { + initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name}) + defer initTrace.LogIfLong(10 * time.Second) + var list runtime.Object + var paginatedResult bool + var err error + listCh := make(chan struct{}, 1) + panicCh := make(chan interface{}, 1) + go func() { + defer func() { + if r := recover(); r != nil { + panicCh <- r + } + }() + // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first + // list request will return the full response. + pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { + return r.listerWatcher.List(opts) + })) + switch { + case r.WatchListPageSize != 0: + pager.PageSize = r.WatchListPageSize + case r.paginatedResult: + // We got a paginated result initially. Assume this resource and server honor + // paging requests (i.e. watch cache is probably disabled) and leave the default + // pager size set. + case options.ResourceVersion != "" && options.ResourceVersion != "0": + // User didn't explicitly request pagination. + // + // With ResourceVersion != "", we have a possibility to list from watch cache, + // but we do that (for ResourceVersion != "0") only if Limit is unset. + // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly + // switch off pagination to force listing from watch cache (if enabled). + // With the existing semantic of RV (result is at least as fresh as provided RV), + // this is correct and doesn't lead to going back in time. + // + // We also don't turn off pagination for ResourceVersion="0", since watch cache + // is ignoring Limit in that case anyway, and if watch cache is not enabled + // we don't introduce regression. 
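+				//
+				// Example: a relist at a concrete RV such as "12345" is sent with
+				// Limit unset and so stays eligible for the watch cache, while
+				// RV="" and RV="0" lists keep the pager's default page size.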
+ pager.PageSize = 0 + } + + list, paginatedResult, err = pager.List(context.Background(), options) + if isExpiredError(err) || isTooLargeResourceVersionError(err) { + r.setIsLastSyncResourceVersionUnavailable(true) + // Retry immediately if the resource version used to list is unavailable. + // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on + // continuation pages, but the pager might not be enabled, the full list might fail because the + // resource version it is listing at is expired or the cache may not yet be synced to the provided + // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure + // the reflector makes forward progress. + list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) + } + close(listCh) + }() + select { + case <-stopCh: + return nil + case r := <-panicCh: + panic(r) + case <-listCh: + } + if err != nil { + return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err) + } + + // We check if the list was paginated and if so set the paginatedResult based on that. + // However, we want to do that only for the initial list (which is the only case + // when we set ResourceVersion="0"). The reasoning behind it is that later, in some + // situations we may force listing directly from etcd (by setting ResourceVersion="") + // which will return paginated result, even if watch cache is enabled. However, in + // that case, we still want to prefer sending requests to watch cache if possible. + // + // Paginated result returned for request with ResourceVersion="0" mean that watch + // cache is disabled and there are a lot of objects of a given type. In such case, + // there is no need to prefer listing from watch cache. 
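+		//
+		// Summary of the cases above:
+		//   RV="0", single-chunk response -> watch cache likely enabled, keep preferring it
+		//   RV="0", paginated response    -> watch cache disabled, keep paginating (set below)
+		//   RV="",  paginated response    -> forced quorum list, says nothing about the cache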
+ if options.ResourceVersion == "0" && paginatedResult { + r.paginatedResult = true + } + + r.setIsLastSyncResourceVersionUnavailable(false) // list was successful + initTrace.Step("Objects listed") + listMetaInterface, err := meta.ListAccessor(list) + if err != nil { + return fmt.Errorf("unable to understand list result %#v: %v", list, err) + } + resourceVersion = listMetaInterface.GetResourceVersion() + initTrace.Step("Resource version extracted") + items, err := meta.ExtractList(list) + if err != nil { + return fmt.Errorf("unable to understand list result %#v (%v)", list, err) + } + initTrace.Step("Objects extracted") + if err := r.syncWith(items, resourceVersion); err != nil { + return fmt.Errorf("unable to sync list result: %v", err) + } + initTrace.Step("SyncWith done") + r.setLastSyncResourceVersion(resourceVersion) + initTrace.Step("Resource version updated") + return nil + }(); err != nil { + return err + } + + resyncerrc := make(chan error, 1) + cancelCh := make(chan struct{}) + defer close(cancelCh) + go func() { + resyncCh, cleanup := r.resyncChan() + defer func() { + cleanup() // Call the last one written into cleanup + }() + for { + select { + case <-resyncCh: + case <-stopCh: + return + case <-cancelCh: + return + } + if r.ShouldResync == nil || r.ShouldResync() { + klog.V(4).Infof("%s: forcing resync", r.name) + if err := r.store.Resync(); err != nil { + resyncerrc <- err + return + } + } + cleanup() + resyncCh, cleanup = r.resyncChan() + } + }() + + for { + // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors + select { + case <-stopCh: + return nil + default: + } + + timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) + options = metav1.ListOptions{ + ResourceVersion: resourceVersion, + // We want to avoid situations of hanging watchers. Stop any wachers that do not + // receive any events within the timeout window. + TimeoutSeconds: &timeoutSeconds, + // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks. + // Reflector doesn't assume bookmarks are returned at all (if the server do not support + // watch bookmarks, it will ignore this field). + AllowWatchBookmarks: true, + } + + // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent + start := r.clock.Now() + w, err := r.listerWatcher.Watch(options) + if err != nil { + // If this is "connection refused" error, it means that most likely apiserver is not responsive. + // It doesn't make sense to re-list all objects because most likely we will be able to restart + // watch where we ended. + // If that's the case begin exponentially backing off and resend watch request. + // Do the same for "429" errors. + if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) { + <-r.initConnBackoffManager.Backoff().C() + continue + } + return err + } + + if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { + if err != errorStopRequested { + switch { + case isExpiredError(err): + // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already + // has a semantic that it returns data at least as fresh as provided RV. + // So first try to LIST with setting RV to resource version of last observed object. 
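+					// Concretely: the next ListAndWatch lists at lastSyncResourceVersion,
+					// and only if that list also fails as expired does relistResourceVersion()
+					// fall back to a quorum list with ResourceVersion="".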
+ klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) + case apierrors.IsTooManyRequests(err): + klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName) + <-r.initConnBackoffManager.Backoff().C() + continue + default: + klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) + } + } + return nil + } + } +} + +// syncWith replaces the store's items with the given list. +func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { + found := make([]interface{}, 0, len(items)) + for _, item := range items { + found = append(found, item) + } + return r.store.Replace(found, resourceVersion) +} + +// watchHandler watches w and keeps *resourceVersion up to date. +func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { + eventCount := 0 + + // Stopping the watcher should be idempotent and if we return from this function there's no way + // we're coming back in with the same watch interface. + defer w.Stop() + +loop: + for { + select { + case <-stopCh: + return errorStopRequested + case err := <-errc: + return err + case event, ok := <-w.ResultChan(): + if !ok { + break loop + } + if event.Type == watch.Error { + return apierrors.FromObject(event.Object) + } + if r.expectedType != nil { + if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { + utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) + continue + } + } + if r.expectedGVK != nil { + if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a { + utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a)) + continue + } + } + meta, err := meta.Accessor(event.Object) + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) + continue + } + newResourceVersion := meta.GetResourceVersion() + switch event.Type { + case watch.Added: + err := r.store.Add(event.Object) + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) + } + case watch.Modified: + err := r.store.Update(event.Object) + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) + } + case watch.Deleted: + // TODO: Will any consumers need access to the "last known + // state", which is passed in event.Object? If so, may need + // to change this. 
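+				// Note: deletes observed by a live watch carry the object's final
+				// state in event.Object; the DeletedFinalStateUnknown wrapper only
+				// appears when a re-list detects a missed delete, not here.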
+ err := r.store.Delete(event.Object) + if err != nil { + utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) + } + case watch.Bookmark: + // A `Bookmark` means watch has synced here, just update the resourceVersion + default: + utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) + } + *resourceVersion = newResourceVersion + r.setLastSyncResourceVersion(newResourceVersion) + if rvu, ok := r.store.(ResourceVersionUpdater); ok { + rvu.UpdateResourceVersion(newResourceVersion) + } + eventCount++ + } + } + + watchDuration := r.clock.Since(start) + if watchDuration < 1*time.Second && eventCount == 0 { + return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name) + } + klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount) + return nil +} + +// LastSyncResourceVersion is the resource version observed when last sync with the underlying store +// The value returned is not synchronized with access to the underlying store and is not thread-safe +func (r *Reflector) LastSyncResourceVersion() string { + r.lastSyncResourceVersionMutex.RLock() + defer r.lastSyncResourceVersionMutex.RUnlock() + return r.lastSyncResourceVersion +} + +func (r *Reflector) setLastSyncResourceVersion(v string) { + r.lastSyncResourceVersionMutex.Lock() + defer r.lastSyncResourceVersionMutex.Unlock() + r.lastSyncResourceVersion = v +} + +// relistResourceVersion determines the resource version the reflector should list or relist from. +// Returns either the lastSyncResourceVersion so that this reflector will relist with a resource +// versions no older than has already been observed in relist results or watch events, or, if the last relist resulted +// in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in +// etcd via a quorum read. +func (r *Reflector) relistResourceVersion() string { + r.lastSyncResourceVersionMutex.RLock() + defer r.lastSyncResourceVersionMutex.RUnlock() + + if r.isLastSyncResourceVersionUnavailable { + // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache + // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector + // to the latest available ResourceVersion, using a consistent read from etcd. + return "" + } + if r.lastSyncResourceVersion == "" { + // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to + // be served from the watch cache if it is enabled. + return "0" + } + return r.lastSyncResourceVersion +} + +// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned +// "expired" or "too large resource version" error. +func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) { + r.lastSyncResourceVersionMutex.Lock() + defer r.lastSyncResourceVersionMutex.Unlock() + r.isLastSyncResourceVersionUnavailable = isUnavailable +} + +func isExpiredError(err error) bool { + // In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and + // apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent + // and always returns apierrors.StatusReasonExpired. 
For backward compatibility we can only remove the apierrors.IsGone + // check when we fully drop support for Kubernetes 1.17 servers from reflectors. + return apierrors.IsResourceExpired(err) || apierrors.IsGone(err) +} + +func isTooLargeResourceVersionError(err error) bool { + if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) { + return true + } + // In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to + // metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource + // version is larger than the largest currently available resource version. To ensure backward + // compatibility with these server versions we also need to detect the error based on the content + // of the error message field. + if !apierrors.IsTimeout(err) { + return false + } + apierr, ok := err.(apierrors.APIStatus) + if !ok || apierr == nil || apierr.Status().Details == nil { + return false + } + for _, cause := range apierr.Status().Details.Causes { + // Matches the message returned by api server 1.17.0-1.18.5 for this error condition + if cause.Message == "Too large resource version" { + return true + } + } + return false +} From 68449b1c3b9868c28dad4c08687b5118d67fa46b Mon Sep 17 00:00:00 2001 From: Iceber Gu Date: Fri, 10 Dec 2021 23:03:57 +0800 Subject: [PATCH 2/5] update go.mod and vendor --- go.mod | 1 + vendor/modules.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/go.mod b/go.mod index b7766e38d..4b25afdde 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( k8s.io/component-base v0.22.4 k8s.io/klog/v2 v2.9.0 k8s.io/kubernetes v1.22.4 + k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a sigs.k8s.io/controller-runtime v0.10.3 sigs.k8s.io/controller-tools v0.7.0 ) diff --git a/vendor/modules.txt b/vendor/modules.txt index 5703c8c80..f9753be6d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1095,6 +1095,7 @@ k8s.io/kubernetes/pkg/printers/storage k8s.io/kubernetes/pkg/util/node k8s.io/kubernetes/pkg/util/parsers # k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a +## explicit k8s.io/utils/buffer k8s.io/utils/integer k8s.io/utils/internal/third_party/forked/golang/golang-lru From 256d9196a7f6001990b73d338967f637e7e169ab Mon Sep 17 00:00:00 2001 From: Iceber Gu Date: Fri, 10 Dec 2021 23:04:18 +0800 Subject: [PATCH 3/5] change reflector to watch-relist --- .../clustersynchro/informer/reflector.go | 235 ++++++++++-------- 1 file changed, 129 insertions(+), 106 deletions(-) diff --git a/pkg/synchromanager/clustersynchro/informer/reflector.go b/pkg/synchromanager/clustersynchro/informer/reflector.go index 07b8a62c9..131128557 100644 --- a/pkg/synchromanager/clustersynchro/informer/reflector.go +++ b/pkg/synchromanager/clustersynchro/informer/reflector.go @@ -1,6 +1,10 @@ /* Reference from https://github.com/kubernetes/kubernetes/blob/b695d79d4f967c403a96986f1750a35eb75e75f1/staging/src/k8s.io/client-go/tools/cache/reflector.go + +Changes: + * add isLastSyncResourceVersionExpired. If lastSyncResourceVersion is expired, set true. + * change reflector.ListAndWatch to reflector.WatchAndRelist */ package informer @@ -89,6 +93,8 @@ type Reflector struct { WatchListPageSize int64 // Called whenever the ListAndWatch drops the connection with an error. 
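+	// isLastSyncResourceVersionExpired (added below) records that the previous
+	// watch ended with an "expired" (HTTP 410 Gone) error, which forces the next
+	// WatchAndRelist iteration to relist before watching again.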
watchErrorHandler WatchErrorHandler + + isLastSyncResourceVersionExpired bool } // ResourceVersionUpdater is an interface that allows store implementation to @@ -208,7 +214,7 @@ var internalPackages = []string{"client-go/tools/cache/"} func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) wait.BackoffUntil(func() { - if err := r.ListAndWatch(stopCh); err != nil { + if err := r.WatchAndRelist(stopCh); err != nil { r.watchErrorHandler(r, err) } }, r.backoffManager, true, stopCh) @@ -238,117 +244,23 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { return t.C(), t.Stop } -// ListAndWatch first lists all items and get the resource version at the moment of call, -// and then use the resource version to watch. +// WatchAndRelist use the resource version to watch, if relistResourceVersion is "", "0" +// or the lastSyncResourceVersion is expired, call relist() to get all items. // It returns error if ListAndWatch didn't even try to initialize watch. -func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { +func (r *Reflector) WatchAndRelist(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) - var resourceVersion string - - options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} - - if err := func() error { - initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name}) - defer initTrace.LogIfLong(10 * time.Second) - var list runtime.Object - var paginatedResult bool - var err error - listCh := make(chan struct{}, 1) - panicCh := make(chan interface{}, 1) - go func() { - defer func() { - if r := recover(); r != nil { - panicCh <- r - } - }() - // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first - // list request will return the full response. - pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { - return r.listerWatcher.List(opts) - })) - switch { - case r.WatchListPageSize != 0: - pager.PageSize = r.WatchListPageSize - case r.paginatedResult: - // We got a paginated result initially. Assume this resource and server honor - // paging requests (i.e. watch cache is probably disabled) and leave the default - // pager size set. - case options.ResourceVersion != "" && options.ResourceVersion != "0": - // User didn't explicitly request pagination. - // - // With ResourceVersion != "", we have a possibility to list from watch cache, - // but we do that (for ResourceVersion != "0") only if Limit is unset. - // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly - // switch off pagination to force listing from watch cache (if enabled). - // With the existing semantic of RV (result is at least as fresh as provided RV), - // this is correct and doesn't lead to going back in time. - // - // We also don't turn off pagination for ResourceVersion="0", since watch cache - // is ignoring Limit in that case anyway, and if watch cache is not enabled - // we don't introduce regression. 
- pager.PageSize = 0 - } + relistResourceVersion := r.relistResourceVersion() + if relistResourceVersion == "" || relistResourceVersion == "0" || r.isLastSyncResourceVersionExpired { + if err := r.relist(stopCh); err != nil { + return err + } + r.isLastSyncResourceVersionExpired = false - list, paginatedResult, err = pager.List(context.Background(), options) - if isExpiredError(err) || isTooLargeResourceVersionError(err) { - r.setIsLastSyncResourceVersionUnavailable(true) - // Retry immediately if the resource version used to list is unavailable. - // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on - // continuation pages, but the pager might not be enabled, the full list might fail because the - // resource version it is listing at is expired or the cache may not yet be synced to the provided - // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure - // the reflector makes forward progress. - list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) - } - close(listCh) - }() select { case <-stopCh: return nil - case r := <-panicCh: - panic(r) - case <-listCh: - } - if err != nil { - return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err) - } - - // We check if the list was paginated and if so set the paginatedResult based on that. - // However, we want to do that only for the initial list (which is the only case - // when we set ResourceVersion="0"). The reasoning behind it is that later, in some - // situations we may force listing directly from etcd (by setting ResourceVersion="") - // which will return paginated result, even if watch cache is enabled. However, in - // that case, we still want to prefer sending requests to watch cache if possible. - // - // Paginated result returned for request with ResourceVersion="0" mean that watch - // cache is disabled and there are a lot of objects of a given type. In such case, - // there is no need to prefer listing from watch cache. 
- if options.ResourceVersion == "0" && paginatedResult { - r.paginatedResult = true - } - - r.setIsLastSyncResourceVersionUnavailable(false) // list was successful - initTrace.Step("Objects listed") - listMetaInterface, err := meta.ListAccessor(list) - if err != nil { - return fmt.Errorf("unable to understand list result %#v: %v", list, err) - } - resourceVersion = listMetaInterface.GetResourceVersion() - initTrace.Step("Resource version extracted") - items, err := meta.ExtractList(list) - if err != nil { - return fmt.Errorf("unable to understand list result %#v (%v)", list, err) - } - initTrace.Step("Objects extracted") - if err := r.syncWith(items, resourceVersion); err != nil { - return fmt.Errorf("unable to sync list result: %v", err) + default: } - initTrace.Step("SyncWith done") - r.setLastSyncResourceVersion(resourceVersion) - initTrace.Step("Resource version updated") - return nil - }(); err != nil { - return err } resyncerrc := make(chan error, 1) @@ -379,6 +291,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } }() + resourceVersion := r.LastSyncResourceVersion() for { // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors select { @@ -388,7 +301,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) - options = metav1.ListOptions{ + options := metav1.ListOptions{ ResourceVersion: resourceVersion, // We want to avoid situations of hanging watchers. Stop any wachers that do not // receive any events within the timeout window. @@ -412,6 +325,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { <-r.initConnBackoffManager.Backoff().C() continue } + + if isExpiredError(err) { + r.isLastSyncResourceVersionExpired = true + } return err } @@ -423,6 +340,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) + r.isLastSyncResourceVersionExpired = true case apierrors.IsTooManyRequests(err): klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName) <-r.initConnBackoffManager.Backoff().C() @@ -436,6 +354,111 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } } +func (r *Reflector) relist(stopCh <-chan struct{}) error { + klog.V(2).Infof("Relist %v from %s", r.expectedTypeName, r.name) + options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} + initTrace := trace.New("Reflector Relist", trace.Field{Key: "name", Value: r.name}) + defer initTrace.LogIfLong(10 * time.Second) + + var list runtime.Object + var paginatedResult bool + var err error + listCh := make(chan struct{}, 1) + panicCh := make(chan interface{}, 1) + go func() { + defer func() { + if r := recover(); r != nil { + panicCh <- r + } + }() + // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first + // list request will return the full response. + pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { + return r.listerWatcher.List(opts) + })) + switch { + case r.WatchListPageSize != 0: + pager.PageSize = r.WatchListPageSize + case r.paginatedResult: + // We got a paginated result initially. Assume this resource and server honor + // paging requests (i.e. 
watch cache is probably disabled) and leave the default
+			// pager size set.
+		case options.ResourceVersion != "" && options.ResourceVersion != "0":
+			// User didn't explicitly request pagination.
+			//
+			// With ResourceVersion != "", we have a possibility to list from watch cache,
+			// but we do that (for ResourceVersion != "0") only if Limit is unset.
+			// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
+			// switch off pagination to force listing from watch cache (if enabled).
+			// With the existing semantic of RV (result is at least as fresh as provided RV),
+			// this is correct and doesn't lead to going back in time.
+			//
+			// We also don't turn off pagination for ResourceVersion="0", since watch cache
+			// is ignoring Limit in that case anyway, and if watch cache is not enabled
+			// we don't introduce regression.
+			pager.PageSize = 0
+		}
+
+		list, paginatedResult, err = pager.List(context.Background(), options)
+		if isExpiredError(err) || isTooLargeResourceVersionError(err) {
+			r.setIsLastSyncResourceVersionUnavailable(true)
+			// Retry immediately if the resource version used to list is unavailable.
+			// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
+			// continuation pages, but the pager might not be enabled, the full list might fail because the
+			// resource version it is listing at is expired or the cache may not yet be synced to the provided
+			// resource version. So we need to fall back to resourceVersion="" in all cases to recover and ensure
+			// the reflector makes forward progress.
+			list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
+		}
+		close(listCh)
+	}()
+	select {
+	case <-stopCh:
+		return nil
+	case r := <-panicCh:
+		panic(r)
+	case <-listCh:
+	}
+	if err != nil {
+		return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
+	}
+
+	// We check if the list was paginated and if so set the paginatedResult based on that.
+	// However, we want to do that only for the initial list (which is the only case
+	// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
+	// situations we may force listing directly from etcd (by setting ResourceVersion="")
+	// which will return a paginated result, even if watch cache is enabled. However, in
+	// that case, we still want to prefer sending requests to watch cache if possible.
+	//
+	// A paginated result returned for a request with ResourceVersion="0" means that watch
+	// cache is disabled and there are a lot of objects of a given type. In such a case,
+	// there is no need to prefer listing from watch cache.
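+	//
+	// relist() is reached from WatchAndRelist in three situations: the very first
+	// run (lastSyncResourceVersion is still ""), a previous list that failed with
+	// an unavailable resource version, or a watch that ended with an "expired"
+	// error (isLastSyncResourceVersionExpired).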
+ if options.ResourceVersion == "0" && paginatedResult { + r.paginatedResult = true + } + + r.setIsLastSyncResourceVersionUnavailable(false) // list was successful + initTrace.Step("Objects listed") + listMetaInterface, err := meta.ListAccessor(list) + if err != nil { + return fmt.Errorf("unable to understand list result %#v: %v", list, err) + } + resourceVersion := listMetaInterface.GetResourceVersion() + initTrace.Step("Resource version extracted") + items, err := meta.ExtractList(list) + if err != nil { + return fmt.Errorf("unable to understand list result %#v (%v)", list, err) + } + initTrace.Step("Objects extracted") + if err := r.syncWith(items, resourceVersion); err != nil { + return fmt.Errorf("unable to sync list result: %v", err) + } + initTrace.Step("SyncWith done") + r.setLastSyncResourceVersion(resourceVersion) + initTrace.Step("Resource version updated") + return nil +} + // syncWith replaces the store's items with the given list. func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { found := make([]interface{}, 0, len(items)) From a5acce89fa53996a949677182b0a1c75649ce89f Mon Sep 17 00:00:00 2001 From: Iceber Gu Date: Fri, 10 Dec 2021 23:06:34 +0800 Subject: [PATCH 4/5] sync resource with the last resource version --- .../informer/named_controller.go | 47 ++++++++++++--- .../informer/resourceversion_informer.go | 15 +++-- .../informer/resourceversion_storage.go | 60 +++++++++++++++++-- .../clustersynchro/resource_synchro.go | 15 +++-- 4 files changed, 113 insertions(+), 24 deletions(-) diff --git a/pkg/synchromanager/clustersynchro/informer/named_controller.go b/pkg/synchromanager/clustersynchro/informer/named_controller.go index fb785148d..608081fc7 100644 --- a/pkg/synchromanager/clustersynchro/informer/named_controller.go +++ b/pkg/synchromanager/clustersynchro/informer/named_controller.go @@ -14,24 +14,34 @@ type controller struct { config cache.Config reflectorMutex sync.RWMutex - reflector *cache.Reflector - queue cache.Queue + reflector *Reflector + + lastResourceVersion string } -func NewNamedController(name string, config *cache.Config) cache.Controller { +func NewNamedController(name string, config *cache.Config) *controller { return &controller{ name: name, config: *config, } } +func (c *controller) SetLastResourceVersion(lastResourceVersion string) { + c.reflectorMutex.Lock() + defer c.reflectorMutex.Unlock() + if c.reflector != nil { + panic("controller is running, connot set last resource version") + } + c.lastResourceVersion = lastResourceVersion +} + func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() go func() { <-stopCh c.config.Queue.Close() }() - r := cache.NewNamedReflector( + r := NewNamedReflector( c.name, c.config.ListerWatcher, c.config.ObjectType, @@ -42,6 +52,9 @@ func (c *controller) Run(stopCh <-chan struct{}) { r.WatchListPageSize = c.config.WatchListPageSize c.reflectorMutex.Lock() + if c.lastResourceVersion != "" { + r.lastSyncResourceVersion = c.lastResourceVersion + } c.reflector = r c.reflectorMutex.Unlock() @@ -52,6 +65,27 @@ func (c *controller) Run(stopCh <-chan struct{}) { wg.Wait() } +/* +func (c *controller) setLastResourceVersionForReflector(reflector *cache.Reflector) { + if c.resourceVersionGetter == nil { + return + } + + rv := c.resourceVersionGetter.LastResourceVersion() + if rv == "" || rv == "0" { + return + } + rvValue := reflect.ValueOf(rv) + + field := reflect.ValueOf(reflector).Elem().FieldByName("lastSyncResourceVersion") + value := reflect.NewAt(field.Type(), 
unsafe.Pointer(field.UnsafeAddr())).Elem() + if value.Kind() != rvValue.Kind() { + panic(fmt.Sprintf("reflector.lastSyncResourceVersion's value kind is %v", value.Kind())) + } + value.Set(rvValue) +} +*/ + func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process)) @@ -71,10 +105,7 @@ func (c *controller) HasSynced() bool { c.reflectorMutex.RLock() defer c.reflectorMutex.RUnlock() - if c.queue == nil { - return false - } - return c.queue.HasSynced() + return c.config.Queue.HasSynced() } func (c *controller) LastSyncResourceVersion() string { diff --git a/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go b/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go index 4c92aecc0..59c744ad6 100644 --- a/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go +++ b/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go @@ -7,7 +7,7 @@ import ( ) type ResourceVersionInformer interface { - Run(stopCh <-chan struct{}) + Run(withLastResourceVersion bool, stopCh <-chan struct{}) HasSynced() bool } @@ -15,7 +15,7 @@ type resourceVersionInformer struct { name string storage *ResourceVersionStorage handler ResourceEventHandler - controller cache.Controller + controller *controller listerWatcher cache.ListerWatcher } @@ -24,7 +24,6 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re panic("name is required") } - // storage: NewResourceVersionStorage(cache.DeletionHandlingMetaNamespaceKeyFunc), informer := &resourceVersionInformer{ name: name, listerWatcher: lw, @@ -38,7 +37,7 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re RetryOnError: false, Process: func(obj interface{}) error { deltas := obj.(cache.Deltas) - return informer.HandleDeltas(deltas) + return informer.handleDeltas(deltas) }, Queue: cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{ KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc, @@ -54,11 +53,15 @@ func (informer *resourceVersionInformer) HasSynced() bool { return informer.controller.HasSynced() } -func (informer *resourceVersionInformer) Run(stopCh <-chan struct{}) { +func (informer *resourceVersionInformer) Run(withLastResourceVersion bool, stopCh <-chan struct{}) { + // TODO(iceber): It can only be run once and an error is reported if it is run a second time + if withLastResourceVersion { + informer.controller.SetLastResourceVersion(informer.storage.LastResourceVersion()) + } informer.controller.Run(stopCh) } -func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas) error { +func (informer *resourceVersionInformer) handleDeltas(deltas cache.Deltas) error { for _, d := range deltas { switch d.Type { case cache.Replaced, cache.Added, cache.Updated: diff --git a/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go b/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go index 77960e9a7..923aee577 100644 --- a/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go +++ b/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go @@ -1,23 +1,35 @@ package informer import ( + "strconv" + "sync/atomic" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/client-go/tools/cache" ) type ResourceVersionStorage struct { keyFunc cache.KeyFunc - cacheStorage cache.ThreadSafeStore + lastResourceVersion *uint64 + cacheStorage cache.ThreadSafeStore } var _ cache.KeyListerGetter = 
&ResourceVersionStorage{} func NewResourceVersionStorage(keyFunc cache.KeyFunc) *ResourceVersionStorage { - return &ResourceVersionStorage{ - cacheStorage: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), - keyFunc: keyFunc, + var lastResourceVersion uint64 + storage := &ResourceVersionStorage{ + keyFunc: keyFunc, + lastResourceVersion: &lastResourceVersion, + cacheStorage: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), } + return storage +} + +func (c *ResourceVersionStorage) LastResourceVersion() string { + return strconv.FormatUint(atomic.LoadUint64(c.lastResourceVersion), 10) } func (c *ResourceVersionStorage) Add(obj interface{}) error { @@ -25,12 +37,19 @@ func (c *ResourceVersionStorage) Add(obj interface{}) error { if err != nil { return cache.KeyError{Obj: obj, Err: err} } + accessor, err := meta.Accessor(obj) if err != nil { return err } - c.cacheStorage.Add(key, accessor.GetResourceVersion()) + resourceversion := accessor.GetResourceVersion() + rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion) + if err != nil { + return err + } + atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv) + c.cacheStorage.Add(key, resourceversion) return nil } @@ -39,12 +58,19 @@ func (c *ResourceVersionStorage) Update(obj interface{}) error { if err != nil { return cache.KeyError{Obj: obj, Err: err} } + accessor, err := meta.Accessor(obj) if err != nil { return err } - c.cacheStorage.Update(key, accessor.GetResourceVersion()) + resourceversion := accessor.GetResourceVersion() + rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion) + if err != nil { + return err + } + atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv) + c.cacheStorage.Update(key, resourceversion) return nil } @@ -54,6 +80,15 @@ func (c *ResourceVersionStorage) Delete(obj interface{}) error { return cache.KeyError{Obj: obj, Err: err} } + if accessor, err := meta.Accessor(obj); err == nil { + resourceversion := accessor.GetResourceVersion() + rv, err := etcd3.Versioner.ParseResourceVersion(resourceversion) + if err != nil { + return err + } + atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv) + } + c.cacheStorage.Delete(key) return nil } @@ -80,6 +115,19 @@ func (c *ResourceVersionStorage) GetByKey(key string) (item interface{}, exists } func (c *ResourceVersionStorage) Replace(versions map[string]interface{}) error { + var lastResourceVersion uint64 + for _, version := range versions { + rv, err := etcd3.Versioner.ParseResourceVersion(version.(string)) + if err != nil { + // TODO(iceber): handle err + continue + } + + if rv > lastResourceVersion { + lastResourceVersion = rv + } + } + atomic.StoreUint64(c.lastResourceVersion, lastResourceVersion) c.cacheStorage.Replace(versions, "") return nil } diff --git a/pkg/synchromanager/clustersynchro/resource_synchro.go b/pkg/synchromanager/clustersynchro/resource_synchro.go index 4aa16d2f4..3cc41d1a4 100644 --- a/pkg/synchromanager/clustersynchro/resource_synchro.go +++ b/pkg/synchromanager/clustersynchro/resource_synchro.go @@ -29,9 +29,10 @@ type ResourceSynchro struct { cluster string storageResource schema.GroupResource - queue queue.EventQueue - listerWatcher cache.ListerWatcher - cache *informer.ResourceVersionStorage + queue queue.EventQueue + listerWatcher cache.ListerWatcher + cache *informer.ResourceVersionStorage + syncWithLastResourceVersion bool memoryVersion schema.GroupVersion convertor runtime.ObjectConvertor @@ -72,6 +73,9 @@ func 
newResourceSynchro(cluster string, lw cache.ListerWatcher, rvcache *informe } close(synchro.stoped) + // TODO(iceber): add feature gate + synchro.syncWithLastResourceVersion = true + status := clustersv1alpha1.ClusterResourceSyncCondition{ Status: clustersv1alpha1.SyncStatusPending, LastTransitionTime: metav1.Now().Rfc3339Copy(), @@ -141,7 +145,10 @@ func (synchro *ResourceSynchro) Run(stopCh <-chan struct{}) { synchro.cache, &unstructured.Unstructured{}, synchro, - ).Run(informerStopCh) + ).Run(synchro.syncWithLastResourceVersion, informerStopCh) + + // next run informer with last resource version + synchro.syncWithLastResourceVersion = true status = clustersv1alpha1.ClusterResourceSyncCondition{ Status: clustersv1alpha1.SyncStatusStop, From f0fb6d180f2899678e738c253602a21e962a9d84 Mon Sep 17 00:00:00 2001 From: Iceber Gu Date: Tue, 14 Dec 2021 17:58:48 +0800 Subject: [PATCH 5/5] set the gvk of the unstructured example object --- pkg/synchromanager/clustersynchro/cluster_synchro.go | 5 ++++- .../clustersynchro/resource_synchro.go | 12 +++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/pkg/synchromanager/clustersynchro/cluster_synchro.go b/pkg/synchromanager/clustersynchro/cluster_synchro.go index 533b5a8e7..5f8e53bff 100644 --- a/pkg/synchromanager/clustersynchro/cluster_synchro.go +++ b/pkg/synchromanager/clustersynchro/cluster_synchro.go @@ -153,6 +153,7 @@ func (s *ClusterSynchro) initWithResourceVersions(resourceversions map[schema.Gr } type syncConfig struct { + kind string syncResource schema.GroupVersionResource storageResource schema.GroupVersionResource convertor runtime.ObjectConvertor @@ -258,6 +259,7 @@ func (s *ClusterSynchro) SetResources(clusterResources []clustersv1alpha1.Cluste storageResource := storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version) if _, ok := configs[storageResource]; !ok { config := &syncConfig{ + kind: info.Kind, syncResource: syncResource, storageResource: storageResource, storageConfig: storageConfig, @@ -340,7 +342,8 @@ func (s *ClusterSynchro) SetResources(clusterResources []clustersv1alpha1.Cluste s.resourceVersionCaches[gvr] = resourceVersionCache } - synchro := newResourceSynchro(s.name, + syncKind := config.syncResource.GroupVersion().WithKind(config.kind) + synchro := newResourceSynchro(s.name, syncKind, s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource), resourceVersionCache, config.convertor, diff --git a/pkg/synchromanager/clustersynchro/resource_synchro.go b/pkg/synchromanager/clustersynchro/resource_synchro.go index 3cc41d1a4..dca96953f 100644 --- a/pkg/synchromanager/clustersynchro/resource_synchro.go +++ b/pkg/synchromanager/clustersynchro/resource_synchro.go @@ -26,7 +26,9 @@ import ( ) type ResourceSynchro struct { - cluster string + cluster string + + syncKind schema.GroupVersionKind storageResource schema.GroupResource queue queue.EventQueue @@ -49,12 +51,13 @@ type ResourceSynchro struct { closed chan struct{} } -func newResourceSynchro(cluster string, lw cache.ListerWatcher, rvcache *informer.ResourceVersionStorage, +func newResourceSynchro(cluster string, syncKind schema.GroupVersionKind, lw cache.ListerWatcher, rvcache *informer.ResourceVersionStorage, convertor runtime.ObjectConvertor, storage storage.ResourceStorage, ) *ResourceSynchro { ctx, cancel := context.WithCancel(context.Background()) synchro := &ResourceSynchro{ cluster: cluster, + syncKind: syncKind, storageResource: storage.GetStorageConfig().StorageGroupResource, listerWatcher: lw, 
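The hunk below puts this syncKind to use: the informer's example object becomes an
*unstructured.Unstructured carrying the resource's GroupVersionKind, so the
reflector can validate watch events by GVK (see setExpectedType in patch 1);
with an empty GVK, expectedGVK stays nil and event objects are not checked.
An illustrative sketch (the GVK values here are assumptions, not taken from
these patches):

	exampleObj := &unstructured.Unstructured{}
	exampleObj.SetGroupVersionKind(schema.GroupVersionKind{
		Group: "apps", Version: "v1", Kind: "Deployment",
	})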
@@ -139,11 +142,14 @@ func (synchro *ResourceSynchro) Run(stopCh <-chan struct{}) { } synchro.status.Store(status) + exampleObj := &unstructured.Unstructured{} + exampleObj.SetGroupVersionKind(synchro.syncKind) + informer.NewResourceVersionInformer( synchro.cluster, synchro.listerWatcher, synchro.cache, - &unstructured.Unstructured{}, + exampleObj, synchro, ).Run(synchro.syncWithLastResourceVersion, informerStopCh)
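How the pieces fit together after this series, as a minimal wiring sketch: lw and
handler stand in for a real cache.ListerWatcher and informer.ResourceEventHandler,
and the Deployment GVK is illustrative; none of these names come from the patches
themselves.

	storage := informer.NewResourceVersionStorage(cache.DeletionHandlingMetaNamespaceKeyFunc)

	exampleObj := &unstructured.Unstructured{}
	exampleObj.SetGroupVersionKind(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"})

	i := informer.NewResourceVersionInformer("cluster-1", lw, storage, exampleObj, handler)

	stopCh := make(chan struct{})
	// First run: relist everything, since the storage holds no resource versions yet.
	go i.Run(false, stopCh)

	// After a restart, the informer can instead resume from the highest resource
	// version the storage recorded, skipping the initial relist:
	//   i.Run(true, stopCh)

One caveat worth noting: ResourceVersionStorage updates lastResourceVersion via
atomic.CompareAndSwapUint64(c.lastResourceVersion, *c.lastResourceVersion, rv),
which overwrites with the most recent event's version rather than keeping a
maximum, and the non-atomic read of *c.lastResourceVersion leaves a small race
window. A monotonic variant would look like this (a possible alternative, not
what the patch does):

	for {
		old := atomic.LoadUint64(c.lastResourceVersion)
		if rv <= old || atomic.CompareAndSwapUint64(c.lastResourceVersion, old, rv) {
			break
		}
	}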