Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
### Changed
- Streaming mode now prefetches partitions in parallel using multiple threads, dramatically improving iteration performance (4-5x faster). This uses slightly more memory than the previous single-threaded approach, but is now comparable in speed to non-streaming mode. Thread count is automatically calculated based on `max_threads_per_query` and `thread_scale_factor` settings.

## [1.5.0] - 2025-10-14
### Added
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
rb_snowflake_client (1.4.0)
rb_snowflake_client (1.5.0)
bigdecimal (>= 3.0)
concurrent-ruby (>= 1.2)
connection_pool (>= 2.4)
Expand Down
2 changes: 1 addition & 1 deletion lib/ruby_snowflake/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def retrieve_result_set(query_start_time, query, response, streaming, query_time
retrieve_proc = ->(index) { retrieve_partition_data(statement_handle, index) }

if streaming
StreamingResultStrategy.result(json_body, retrieve_proc)
StreamingResultStrategy.result(json_body, retrieve_proc, prefetch_threads: num_threads)
elsif num_threads == 1
SingleThreadInMemoryStrategy.result(json_body, retrieve_proc)
else
Expand Down
5 changes: 3 additions & 2 deletions lib/ruby_snowflake/client/streaming_result_strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
module RubySnowflake
class Client
class StreamingResultStrategy
def self.result(statement_json_body, retreive_proc)
def self.result(statement_json_body, retreive_proc, prefetch_threads: 1)
partitions = statement_json_body["resultSetMetaData"]["partitionInfo"]

result = StreamingResult.new(
partitions.size,
statement_json_body["resultSetMetaData"]["rowType"],
retreive_proc
retreive_proc,
prefetch_threads: prefetch_threads
)
result[0] = statement_json_body["data"]

Expand Down
54 changes: 33 additions & 21 deletions lib/ruby_snowflake/streaming_result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,52 @@

module RubySnowflake
class StreamingResult < Result
def initialize(partition_count, row_type_data, retreive_proc)
def initialize(partition_count, row_type_data, retreive_proc, prefetch_threads: 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to raise an error if you tried to pass in a non positive number for prefetch_threads (I could reasonably see someone trying 0) which I think would cause us to never fetch data

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Added an ArgumentError in initialize if prefetch_threads isn't a positive integer — covers 0, negatives, and non-integer values. Added specs for it too.

raise ArgumentError, "prefetch_threads must be a positive integer, got: #{prefetch_threads}" unless prefetch_threads.is_a?(Integer) && prefetch_threads > 0

super(partition_count, row_type_data)
@retreive_proc = retreive_proc
@prefetch_threads = prefetch_threads
end

def each
return to_enum(:each) unless block_given?

thread_pool = Concurrent::FixedThreadPool.new 1
thread_pool = Concurrent::FixedThreadPool.new(@prefetch_threads)

begin
data.each_with_index do |_partition, index|
@prefetch_threads.times do |offset|
next_index = index + offset + 1
break if next_index >= data.size

data.each_with_index do |_partition, index|
next_index = [index+1, data.size-1].min
if data[next_index].nil? # prefetch
data[next_index] = Concurrent::Future.execute(executor: thread_pool) do
@retreive_proc.call(next_index)
if data[next_index].nil?
data[next_index] = Concurrent::Future.execute(executor: thread_pool) do
@retreive_proc.call(next_index)
end
end
end
end

if data[index].is_a? Concurrent::Future
data[index] = data[index].value # wait for it to finish
end
if data[index].is_a? Concurrent::Future
data[index] = data[index].value # wait for it to finish
end

data[index].each do |row|
yield wrap_row(row)
end
data[index].each do |row|
yield wrap_row(row)
end

# After iterating over the current partition, clear the data to release memory
data[index].clear
# After iterating over the current partition, clear the data to release memory
data[index].clear

# Reassign to a symbol so:
# - When looking at the list of partitions in `data` it is easier to detect
# - Will raise an exception if `data.each` is attempted to be called again
# - It won't trigger prefetch detection as `next_index`
data[index] = :finished
# Reassign to a symbol so:
# - When looking at the list of partitions in `data` it is easier to detect
# - Will raise an exception if `data.each` is attempted to be called again
# - It won't trigger prefetch detection as `next_index`
data[index] = :finished
end
ensure
thread_pool.shutdown
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooh, this is a good catch - we should have been doing that already

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

thread_pool.wait_for_termination(5)
end
end

Expand Down
193 changes: 193 additions & 0 deletions spec/ruby_snowflake/streaming_result_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# frozen_string_literal: true

require 'spec_helper'

# Specs for RubySnowflake::StreamingResult, exercising the prefetch_threads
# option: constructor validation, row iteration order, memory reclamation of
# consumed partitions, parallel prefetching, enumerator support, and thread
# pool shutdown (including the exception path).
RSpec.describe RubySnowflake::StreamingResult do
  let(:partition_count) { 5 }
  # Column metadata in the shape Snowflake's "rowType" metadata uses.
  let(:row_type_data) { [{ "name" => "id", "type" => "fixed" }, { "name" => "value", "type" => "text" }] }
  # Five canned partitions of two rows each (10 rows total).
  let(:partitions_data) do
    [
      [[1, "first"], [2, "second"]],
      [[3, "third"], [4, "fourth"]],
      [[5, "fifth"], [6, "sixth"]],
      [[7, "seventh"], [8, "eighth"]],
      [[9, "ninth"], [10, "tenth"]]
    ]
  end
  # Stands in for the network fetch: returns the canned partition by index.
  let(:retrieve_proc) { ->(index) { partitions_data[index] } }

  describe '#initialize' do
    context 'with default prefetch_threads' do
      subject { described_class.new(partition_count, row_type_data, retrieve_proc) }

      it 'initializes with 1 prefetch thread by default' do
        expect(subject.instance_variable_get(:@prefetch_threads)).to eq(1)
      end
    end

    context 'with custom prefetch_threads' do
      subject { described_class.new(partition_count, row_type_data, retrieve_proc, prefetch_threads: 4) }

      it 'initializes with specified prefetch threads' do
        expect(subject.instance_variable_get(:@prefetch_threads)).to eq(4)
      end
    end

    # A non-positive thread count would mean partitions are never fetched, so
    # the constructor rejects it up front.
    context 'with invalid prefetch_threads' do
      it 'raises ArgumentError for zero' do
        expect { described_class.new(partition_count, row_type_data, retrieve_proc, prefetch_threads: 0) }
          .to raise_error(ArgumentError, /prefetch_threads must be a positive integer/)
      end

      it 'raises ArgumentError for negative values' do
        expect { described_class.new(partition_count, row_type_data, retrieve_proc, prefetch_threads: -1) }
          .to raise_error(ArgumentError, /prefetch_threads must be a positive integer/)
      end
    end
  end

  describe '#each' do
    subject { described_class.new(partition_count, row_type_data, retrieve_proc, prefetch_threads: prefetch_threads) }

    before do
      # Populate first partition (as done in StreamingResultStrategy)
      subject[0] = partitions_data[0]
    end

    context 'with single thread (backward compatible behavior)' do
      let(:prefetch_threads) { 1 }

      it 'iterates through all rows correctly' do
        rows = []
        subject.each { |row| rows << [row["id"], row["value"]] }

        # Rows must arrive in partition order, partition by partition.
        expect(rows).to eq([
          [1, "first"], [2, "second"],
          [3, "third"], [4, "fourth"],
          [5, "fifth"], [6, "sixth"],
          [7, "seventh"], [8, "eighth"],
          [9, "ninth"], [10, "tenth"]
        ])
      end

      it 'clears processed partitions to save memory' do
        rows = []
        subject.each { |row| rows << row }

        # Check that partitions were cleared (marked as :finished)
        expect(subject.instance_variable_get(:@data)[0]).to eq(:finished)
        expect(subject.instance_variable_get(:@data)[1]).to eq(:finished)
      end

      it 'calls retrieve_proc for each partition' do
        call_count = 0
        instrumented_proc = lambda do |index|
          call_count += 1
          partitions_data[index]
        end

        result = described_class.new(partition_count, row_type_data, instrumented_proc, prefetch_threads: 1)
        result[0] = partitions_data[0]

        result.each { |row| row }

        # Should call for partitions 1-4 (partition 0 was pre-populated)
        expect(call_count).to eq(4)
      end
    end

    context 'with multiple threads' do
      let(:prefetch_threads) { 3 }

      it 'iterates through all rows correctly' do
        rows = []
        subject.each { |row| rows << [row["id"], row["value"]] }

        # Parallel prefetch must not change the observable row order.
        expect(rows).to eq([
          [1, "first"], [2, "second"],
          [3, "third"], [4, "fourth"],
          [5, "fifth"], [6, "sixth"],
          [7, "seventh"], [8, "eighth"],
          [9, "ninth"], [10, "tenth"]
        ])
      end

      it 'prefetches multiple partitions in parallel' do
        # Track concurrent fetches
        concurrent_fetches = []
        mutex = Mutex.new
        instrumented_proc = lambda do |index|
          mutex.synchronize { concurrent_fetches << index }
          sleep 0.01 # Simulate network latency
          partitions_data[index]
        end

        result = described_class.new(partition_count, row_type_data, instrumented_proc, prefetch_threads: 3)
        result[0] = partitions_data[0]

        result.each { |row| row }

        # With 3 threads, should prefetch indices 1, 2, 3 before processing them
        expect(concurrent_fetches).to include(1, 2, 3)
      end

      it 'properly shuts down thread pool' do
        # Capture the pool the implementation builds so we can inspect it
        # after iteration completes.
        thread_pool = nil
        allow(Concurrent::FixedThreadPool).to receive(:new).and_wrap_original do |method, *args|
          thread_pool = method.call(*args)
          thread_pool
        end

        subject.each { |row| row }

        expect(thread_pool).to be_shutdown
      end
    end

    # More prefetch threads than remaining partitions must not over-fetch or
    # index past the end of the partition list.
    context 'with more threads than partitions' do
      let(:prefetch_threads) { 10 }

      it 'handles gracefully without errors' do
        rows = []
        expect { subject.each { |row| rows << row } }.not_to raise_error

        expect(rows.length).to eq(10)
      end
    end

    context 'when returning an enumerator' do
      let(:prefetch_threads) { 2 }

      it 'returns an enumerator when no block given' do
        enumerator = subject.each

        expect(enumerator).to be_a(Enumerator)
        expect(enumerator.to_a.length).to eq(10)
      end
    end

    # The ensure branch must shut the pool down even when the caller's block
    # raises mid-iteration.
    context 'when an exception occurs during iteration' do
      let(:prefetch_threads) { 3 }

      it 'properly shuts down thread pool even on exception' do
        thread_pool = nil
        allow(Concurrent::FixedThreadPool).to receive(:new).and_wrap_original do |method, *args|
          thread_pool = method.call(*args)
          thread_pool
        end

        # Raise exception after processing 2 rows
        count = 0
        expect do
          subject.each do |row|
            count += 1
            raise StandardError, "Test error" if count == 2
          end
        end.to raise_error(StandardError, "Test error")

        # Thread pool should still be shut down
        expect(thread_pool).to be_shutdown
      end
    end
  end
end