1919package  xdsclient
2020
2121import  (
22+ 	"container/list" 
2223	"context" 
2324	"fmt" 
2425	"sync" 
@@ -28,7 +29,6 @@ import (
2829	igrpclog "google.golang.org/grpc/internal/grpclog" 
2930	"google.golang.org/grpc/internal/xds/clients" 
3031	"google.golang.org/grpc/internal/xds/clients/internal/backoff" 
31- 	"google.golang.org/grpc/internal/xds/clients/internal/buffer" 
3232	"google.golang.org/grpc/internal/xds/clients/internal/pretty" 
3333	"google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource" 
3434
@@ -104,7 +104,6 @@ type adsStreamImpl struct {
104104	// The following fields are initialized in the constructor and are not 
105105	// written to afterwards, and hence can be accessed without a mutex. 
106106	streamCh      chan  clients.Stream  // New ADS streams are pushed here. 
107- 	requestCh     * buffer.Unbounded    // Subscriptions and unsubscriptions are pushed here. 
108107	runnerDoneCh  chan  struct {}       // Notify completion of runner goroutine. 
109108	cancel        context.CancelFunc   // To cancel the context passed to the runner goroutine. 
110109	fc            * adsFlowControl      // Flow control for ADS stream. 
@@ -113,6 +112,8 @@ type adsStreamImpl struct {
113112	mu                 sync.Mutex 
114113	resourceTypeState  map [ResourceType ]* resourceTypeState  // Map of resource types to their state. 
115114	firstRequest       bool                                 // False after the first request is sent out. 
115+ 	queuedReqs         * list.List                           // Queued requests waiting to be sent. 
116+ 	queuedReqsExist    * sync.Cond                           // Condition variable for waiting on queued requests. 
116117}
117118
118119// adsStreamOpts contains the options for creating a new ADS Stream. 
@@ -137,11 +138,12 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
137138		watchExpiryTimeout : opts .watchExpiryTimeout ,
138139
139140		streamCh :          make (chan  clients.Stream , 1 ),
140- 		requestCh :         buffer .NewUnbounded (),
141141		runnerDoneCh :      make (chan  struct {}),
142142		fc :                newADSFlowControl (),
143143		resourceTypeState : make (map [ResourceType ]* resourceTypeState ),
144+ 		queuedReqs :        list .New (),
144145	}
146+ 	s .queuedReqsExist  =  sync .NewCond (& s .mu )
145147
146148	l  :=  grpclog .Component ("xds" )
147149	s .logger  =  igrpclog .NewPrefixLogger (l , opts .logPrefix + fmt .Sprintf ("[ads-stream %p] " , s ))
@@ -156,7 +158,10 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
156158func  (s  * adsStreamImpl ) Stop () {
157159	s .cancel ()
158160	s .fc .stop ()
159- 	s .requestCh .Close ()
161+ 	// Unblock the sender goroutine which might be blocked waiting for requests 
162+ 	// to be queued. It is allowed but not required to hold the lock when 
163+ 	// signalling. 
164+ 	s .queuedReqsExist .Signal ()
160165	<- s .runnerDoneCh 
161166	s .logger .Infof ("Shutdown ADS stream" )
162167}
@@ -185,8 +190,13 @@ func (s *adsStreamImpl) subscribe(typ ResourceType, name string) {
185190	// be started when a request for this resource is actually sent out. 
186191	state .subscribedResources [name ] =  & xdsresource.ResourceWatchState {State : xdsresource .ResourceWatchStateStarted }
187192
188- 	// Send a request for the resource type with updated subscriptions. 
189- 	s .requestCh .Put (request {typ : typ , resourceNames : resourceNames (state .subscribedResources )})
193+ 	// Queue a request for the resource type with updated subscriptions. 
194+ 	resourceNames  :=  resourceNames (state .subscribedResources )
195+ 	if  s .logger .V (2 ) {
196+ 		s .logger .Infof ("Queueing a request for resources %q of type %q" , resourceNames , typ .TypeName )
197+ 	}
198+ 	s .queuedReqs .PushBack (request {typ : typ , resourceNames : resourceNames })
199+ 	s .queuedReqsExist .Signal ()
190200}
191201
192202// unsubscribe cancels the subscription to the given resource. It is a no-op if 
@@ -215,8 +225,13 @@ func (s *adsStreamImpl) unsubscribe(typ ResourceType, name string) {
215225	}
216226	delete (state .subscribedResources , name )
217227
218- 	// Send a request for the resource type with updated subscriptions. 
219- 	s .requestCh .Put (request {typ : typ , resourceNames : resourceNames (state .subscribedResources )})
228+ 	// Queue a request for the resource type with updated subscriptions. 
229+ 	resourceNames  :=  resourceNames (state .subscribedResources )
230+ 	if  s .logger .V (2 ) {
231+ 		s .logger .Infof ("Queueing a request for resources %q of type %q" , resourceNames , typ .TypeName )
232+ 	}
233+ 	s .queuedReqs .PushBack (request {typ : typ , resourceNames : resourceNames })
234+ 	s .queuedReqsExist .Signal ()
220235}
221236
222237// runner is a long-running goroutine that handles the lifecycle of the ADS 
@@ -227,8 +242,6 @@ func (s *adsStreamImpl) unsubscribe(typ ResourceType, name string) {
227242func  (s  * adsStreamImpl ) runner (ctx  context.Context ) {
228243	defer  close (s .runnerDoneCh )
229244
230- 	go  s .send (ctx )
231- 
232245	runStreamWithBackoff  :=  func () error  {
233246		stream , err  :=  s .transport .NewStream (ctx , "/envoy.service.discovery.v3.AggregatedDiscoveryService/StreamAggregatedResources" )
234247		if  err  !=  nil  {
@@ -242,93 +255,95 @@ func (s *adsStreamImpl) runner(ctx context.Context) {
242255
243256		s .mu .Lock ()
244257		s .firstRequest  =  true 
258+ 		if  err  :=  s .sendExistingLocked (stream ); err  !=  nil  {
259+ 			s .logger .Warningf ("Failed to send existing resources on newly created stream: %v" , err )
260+ 			s .mu .Unlock ()
261+ 			return  nil 
262+ 		}
245263		s .mu .Unlock ()
246264
247- 		// Ensure that the most recently created stream is pushed on the 
248- 		// channel for the `send` goroutine to consume. 
249- 		select  {
250- 		case  <- s .streamCh :
251- 		default :
252- 		}
253- 		s .streamCh  <-  stream 
265+ 		// Spawn the sending goroutine that runs until the context is done, or 
266+ 		// writing to the stream fails. When the latter happens, the next 
267+ 		// iteration of the loop in the runner goroutine will spawn another 
268+ 		// sending goroutine. 
269+ 		sendDoneCh  :=  make (chan  struct {})
270+ 		recvDoneCh  :=  make (chan  struct {})
271+ 		go  func () {
272+ 			defer  close (sendDoneCh )
273+ 
274+ 			for  ctx .Err () ==  nil  {
275+ 				// Spawn a goroutine to wait for queued requests to be available 
276+ 				// for sending. This is required to exit the sending goroutine 
277+ 				// blocked on the condition variable when the receiving is done. 
278+ 				waitCh  :=  make (chan  struct {})
279+ 				go  func () {
280+ 					s .mu .Lock ()
281+ 					defer  s .mu .Unlock ()
282+ 
283+ 					// Wait on the condition variable only if there are no 
284+ 					// queued requests. A call to `Signal` will be a no-op if 
285+ 					// there are no blocked goroutines at that point in time. 
286+ 					// There could be newly queued requests that come in after 
287+ 					// we release the lock at the end of the loop, and signal 
288+ 					// before we get here. 
289+ 					if  s .queuedReqs .Len () ==  0  {
290+ 						s .queuedReqsExist .Wait ()
291+ 					}
292+ 					close (waitCh )
293+ 				}()
294+ 
295+ 				select  {
296+ 				case  <- waitCh :
297+ 					// Queued requests are now available, continue with sending. 
298+ 				case  <- recvDoneCh :
299+ 					// Receiving is done. Ensure that the goroutine waiting on 
300+ 					// the condition variable exits. 
301+ 					s .queuedReqsExist .Signal ()
302+ 					<- waitCh 
303+ 					return 
304+ 				}
305+ 
306+ 				// Iterate and consume the list of queued requests. 
307+ 				s .mu .Lock ()
308+ 				for  s .queuedReqs .Len () >  0  {
309+ 					elem  :=  s .queuedReqs .Front ()
310+ 					req  :=  elem .Value .(request )
311+ 					state  :=  s .resourceTypeState [req .typ ]
312+ 					if  err  :=  s .sendMessageLocked (stream , req .resourceNames , req .typ .TypeURL , state .version , state .nonce , nil ); err  !=  nil  {
313+ 						s .logger .Warningf ("Failed to send queued request for resources %q of type %q: %v" , req .resourceNames , req .typ .TypeName , err )
314+ 						s .mu .Unlock ()
315+ 						return 
316+ 					}
317+ 					s .queuedReqs .Remove (elem )
318+ 					s .startWatchTimersLocked (req .typ , req .resourceNames )
319+ 				}
320+ 				s .mu .Unlock ()
321+ 			}
322+ 		}()
254323
255324		// Backoff state is reset upon successful receipt of at least one 
256325		// message from the server. 
326+ 		err  =  nil 
257327		if  s .recv (stream ) {
258- 			return  backoff .ErrResetBackoff 
328+ 			err   =  backoff .ErrResetBackoff 
259329		}
260- 		return  nil 
261- 	}
262- 	backoff .RunF (ctx , runStreamWithBackoff , s .backoff )
263- }
264- 
265- // send is a long running goroutine that handles sending discovery requests for 
266- // two scenarios: 
267- // - a new subscription or unsubscription request is received 
268- // - a new stream is created after the previous one failed 
269- func  (s  * adsStreamImpl ) send (ctx  context.Context ) {
270- 	// Stores the most recent stream instance received on streamCh. 
271- 	var  stream  clients.Stream 
272- 	for  {
273- 		select  {
274- 		case  <- ctx .Done ():
275- 			return 
276- 		case  stream  =  <- s .streamCh :
277- 			if  err  :=  s .sendExisting (stream ); err  !=  nil  {
278- 				// Send failed, clear the current stream. Attempt to resend will 
279- 				// only be made after a new stream is created. 
280- 				stream  =  nil 
281- 				continue 
282- 			}
283- 		case  r , ok  :=  <- s .requestCh .Get ():
284- 			if  ! ok  {
285- 				return 
286- 			}
287- 			s .requestCh .Load ()
330+ 		close (recvDoneCh )
288331
289- 			req  :=  r .(request )
290- 			if  err  :=  s .sendNew (stream , req .typ , req .resourceNames ); err  !=  nil  {
291- 				stream  =  nil 
292- 				continue 
293- 			}
294- 		}
295- 	}
296- }
297- 
298- // sendNew attempts to send a discovery request based on a new subscription or 
299- // unsubscription. This method also starts the watch expiry timer for resources 
300- // that were sent in the request for the first time, i.e. their watch state is 
301- // `watchStateStarted`. 
302- func  (s  * adsStreamImpl ) sendNew (stream  clients.Stream , typ  ResourceType , names  []string ) error  {
303- 	s .mu .Lock ()
304- 	defer  s .mu .Unlock ()
305- 
306- 	// If there's no stream yet, skip the request. This request will be resent 
307- 	// when a new stream is created. If no stream is created, the watcher will 
308- 	// timeout (same as server not sending response back). 
309- 	if  stream  ==  nil  {
310- 		return  nil 
311- 	}
312- 
313- 	state  :=  s .resourceTypeState [typ ]
314- 	if  err  :=  s .sendMessageLocked (stream , names , typ .TypeURL , state .version , state .nonce , nil ); err  !=  nil  {
332+ 		<- sendDoneCh 
315333		return  err 
316334	}
317- 	s .startWatchTimersLocked (typ , names )
318- 	return  nil 
335+ 	backoff .RunF (ctx , runStreamWithBackoff , s .backoff )
319336}
320337
321- // sendExisting sends out discovery requests for existing resources when 
322- // recovering from a broken stream. 
338+ // sendExistingLocked sends out discovery requests for existing resources when 
339+ // recovering from a broken stream. The stream argument is guaranteed to be 
340+ // non-nil. 
323341// 
324- // The stream argument is guaranteed to be non-nil. 
325- func  (s  * adsStreamImpl ) sendExisting (stream  clients.Stream ) error  {
326- 	s .mu .Lock ()
327- 	defer  s .mu .Unlock ()
328- 
342+ 	// Caller needs to hold s.mu. 
343+ func  (s  * adsStreamImpl ) sendExistingLocked (stream  clients.Stream ) error  {
329344	// Clear any queued requests. Previously subscribed to resources will be 
330345	// resent below. 
331- 	s .requestCh . Reset ()
346+ 	s .queuedReqs . Init ()
332347
333348	for  typ , state  :=  range  s .resourceTypeState  {
334349		// Reset only the nonces map when the stream restarts. 
0 commit comments