@@ -161,24 +161,6 @@ def exec(
161161 and self .flat_kwargs
162162 and self .num_returns == 1
163163 ):
164- # self.data = RayWrapper.materialize(self.data)
165- # self.args = [
166- # RayWrapper.materialize(o) if isinstance(o, ray.ObjectRef) else o
167- # for o in self.args
168- # ]
169- # self.kwargs = {
170- # k: RayWrapper.materialize(o) if isinstance(o, ray.ObjectRef) else o
171- # for k, o in self.kwargs.items()
172- # }
173- # obj = _REMOTE_EXEC.exec_func(
174- # RayWrapper.materialize(self.func), self.data, self.args, self.kwargs
175- # )
176- # result, length, width, ip = (
177- # obj,
178- # len(obj) if hasattr(obj, "__len__") else 0,
179- # len(obj.columns) if hasattr(obj, "columns") else 0,
180- # "",
181- # )
182164 result , length , width , ip = remote_exec_func .remote (
183165 self .func , self .data , * self .args , ** self .kwargs
184166 )
@@ -191,13 +173,6 @@ def exec(
191173 self .subscribers += 2
192174 consumers , output = self ._deconstruct ()
193175
194- # assert not any(isinstance(o, ListOrTuple) for o in output)
195- # tmp = [
196- # RayWrapper.materialize(o) if isinstance(o, ray.ObjectRef) else o
197- # for o in output
198- # ]
199- # list(_REMOTE_EXEC.construct(tmp))
200-
201176 # The last result is the MetaList, so adding +1 here.
202177 num_returns = sum (c .num_returns for c in consumers ) + 1
203178 results = self ._remote_exec_chain (num_returns , * output )
@@ -336,7 +311,9 @@ def _deconstruct_chain(
336311 out_extend = output .extend
337312 while True :
338313 de .unsubscribe ()
339- if not de .has_result and (out_pos := getattr (de , "out_pos" , None )):
314+ if not (has_result := de .has_result ) and (
315+ out_pos := getattr (de , "out_pos" , None )
316+ ):
340317 out_append (_Tag .REF )
341318 out_append (out_pos )
342319 output [out_pos ] = out_pos
@@ -357,7 +334,7 @@ def _deconstruct_chain(
357334 )
358335 else :
359336 out_append (data )
360- if not de . has_result :
337+ if not has_result :
361338 stack .append (de )
362339 break
363340 else :
@@ -425,28 +402,24 @@ def _deconstruct_list(
425402 """
426403 for obj in lst :
427404 if isinstance (obj , DeferredExecution ):
428- if out_pos := getattr (obj , "out_pos" , None ):
405+ if obj .has_result :
406+ obj = obj .data
407+ elif out_pos := getattr (obj , "out_pos" , None ):
429408 obj .unsubscribe ()
430- if obj .has_result :
431- if isinstance (obj .data , ListOrTuple ):
432- out_append (_Tag .LIST )
433- yield cls ._deconstruct_list (
434- obj .data , output , stack , result_consumers , out_append
435- )
436- else :
437- out_append (obj .data )
438- else :
439- out_append (_Tag .REF )
440- out_append (out_pos )
441- output [out_pos ] = out_pos
442- if obj .subscribers == 0 :
443- output [out_pos + 1 ] = 0
444- result_consumers .remove (obj )
409+ out_append (_Tag .REF )
410+ out_append (out_pos )
411+ output [out_pos ] = out_pos
412+ if obj .subscribers == 0 :
413+ output [out_pos + 1 ] = 0
414+ result_consumers .remove (obj )
415+ continue
445416 else :
446417 out_append (_Tag .CHAIN )
447418 yield cls ._deconstruct_chain (obj , output , stack , result_consumers )
448419 out_append (_Tag .END )
449- elif isinstance (obj , ListOrTuple ):
420+ continue
421+
422+ if isinstance (obj , ListOrTuple ):
450423 out_append (_Tag .LIST )
451424 yield cls ._deconstruct_list (
452425 obj , output , stack , result_consumers , out_append
@@ -517,27 +490,13 @@ class DeferredGetItem(DeferredExecution):
517490 ----------
518491 data : ObjectRefOrDeType
519492 The object to get the item from.
520- idx : int
493+ index : int
521494 The item index.
522495 """
523496
524- def __init__ (self , data : ObjectRefOrDeType , idx : int ):
525- super ().__init__ (data , self ._remote_fn (), [idx ])
526- self .index = idx
527-
528- @_inherit_docstrings (DeferredExecution .exec )
529- def exec (self ) -> Tuple [ObjectRefType , "MetaList" , int ]:
530- if self .has_result :
531- return self .data , self .meta , self .meta_offset
532-
533- if not isinstance (self .data , DeferredExecution ) or self .data .num_returns == 1 :
534- return super ().exec ()
535-
536- # If `data` is a `DeferredExecution`, that returns multiple results,
537- # it's not required to execute `_remote_fn()`. We can only execute
538- # `data` and get the result by index.
539- self ._data_exec ()
540- return self .data , self .meta , self .meta_offset
497+ def __init__ (self , data : ObjectRefOrDeType , index : int ):
498+ super ().__init__ (data , self ._remote_fn (), [index ])
499+ self .index = index
541500
542501 @property
543502 @_inherit_docstrings (DeferredExecution .has_result )
@@ -550,16 +509,18 @@ def has_result(self):
550509 and self .data .has_result
551510 and self .data .num_returns != 1
552511 ):
553- self ._data_exec ()
512+ # If `data` is a `DeferredExecution`, that returns multiple results,
513+ # it's not required to execute `_remote_fn()`. We can only execute
514+ # `data` and get the result by index.
515+ self ._set_result (
516+ self .data .data [self .index ],
517+ self .data .meta ,
518+ self .data .meta_offset [self .index ],
519+ )
554520 return True
555521
556522 return False
557523
558- def _data_exec (self ):
559- """Execute the `data` task and get the result."""
560- obj , meta , offsets = self .data .exec ()
561- self ._set_result (obj [self .index ], meta , offsets [self .index ])
562-
563524 @classmethod
564525 def _remote_fn (cls ) -> ObjectRefType :
565526 """
@@ -592,7 +553,8 @@ def __init__(self, obj: Union[ray.ObjectID, ClientObjectRef, List]):
592553
593554 def materialize (self ):
594555 """Materialized the list, if required."""
595- self ._obj = RayWrapper .materialize (self ._obj )
556+ if not isinstance (self ._obj , list ):
557+ self ._obj = RayWrapper .materialize (self ._obj )
596558
597559 def __getitem__ (self , index ):
598560 """
@@ -632,14 +594,13 @@ class MetaListHook(MaterializationHook, DeferredGetItem):
632594 ----------
633595 meta : MetaList
634596 Non-materialized list to get the value from.
635- idx : int
597+ index : int
636598 The value index in the list.
637599 """
638600
639- def __init__ (self , meta : MetaList , idx : int ):
640- super ().__init__ (meta ._obj , idx )
601+ def __init__ (self , meta : MetaList , index : int ):
602+ super ().__init__ (meta ._obj , index )
641603 self .meta = meta
642- self .idx = idx
643604
644605 def pre_materialize (self ):
645606 """
@@ -650,7 +611,7 @@ def pre_materialize(self):
650611 object
651612 """
652613 obj = self .meta ._obj
653- return obj [self .idx ] if isinstance (obj , list ) else obj
614+ return obj [self .index ] if isinstance (obj , list ) else obj
654615
655616 def post_materialize (self , materialized ):
656617 """
@@ -665,7 +626,7 @@ def post_materialize(self, materialized):
665626 object
666627 """
667628 self .meta ._obj = materialized
668- return materialized [self .idx ]
629+ return materialized [self .index ]
669630
670631
671632class _Tag (Enum ): # noqa: PR01
0 commit comments