From 517bc8c4be2e4721d6904bb60da08f7ae7642ee5 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Mon, 11 Jan 2021 14:57:54 +0100 Subject: [PATCH] Take operator doesn't dispose upstream observable If there is an active polling observable upstream it will keep polling as long as it isn't disposed. By disposing when we reach the desired amount of items, the possible upstream polling is also stopped. --- src/Operator/TakeOperator.php | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Operator/TakeOperator.php b/src/Operator/TakeOperator.php index f9585e96..a4b68fe7 100644 --- a/src/Operator/TakeOperator.php +++ b/src/Operator/TakeOperator.php @@ -27,12 +27,15 @@ public function __invoke(ObservableInterface $observable, ObserverInterface $obs $remaining = $this->count; $callbackObserver = new CallbackObserver( - function ($nextValue) use ($observer, &$remaining) { + function ($nextValue) use ($observer, &$remaining, &$disposable) { if ($remaining > 0) { $remaining--; $observer->onNext($nextValue); if ($remaining === 0) { $observer->onCompleted(); + if ($disposable instanceof DisposableInterface) { + $disposable->dispose(); + } } } }, @@ -40,6 +43,7 @@ function ($nextValue) use ($observer, &$remaining) { [$observer, 'onCompleted'] ); - return $observable->subscribe($callbackObserver); + $disposable = $observable->subscribe($callbackObserver); + return $disposable; } }