Skip to content

Commit 25e132d

Browse files
committed
fix resize processor
1 parent 9dcf52e commit 25e132d

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

src/query/pipeline/core/src/processors/resize_processor.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ impl Processor for ResizeProcessor {
129129
self.outputs[output_index]
130130
.port
131131
.push_data(self.inputs[input_index].port.pull_data().unwrap());
132-
self.inputs[input_index].status = PortStatus::Idle;
133132
self.outputs[output_index].status = PortStatus::Idle;
134133

135134
if self.inputs[input_index].port.is_finished() {
@@ -141,7 +140,12 @@ impl Processor for ResizeProcessor {
141140
continue;
142141
}
143142

144-
self.inputs[input_index].port.set_need_data();
143+
if !self.inputs[input_index].port.has_data() {
144+
self.inputs[input_index].status = PortStatus::Idle;
145+
self.inputs[input_index].port.set_need_data();
146+
} else {
147+
self.waiting_inputs.push_back(input_index);
148+
}
145149
}
146150

147151
if self.finished_outputs == self.outputs.len() {

0 commit comments

Comments
 (0)