Improve StreamingResult iteration performance with parallel prefetching #157

douglas wants to merge 4 commits into rinsed-org:master
Conversation
…reamingResult

Improves streaming result iteration performance by 4-5x through parallel partition fetching while maintaining memory efficiency.

StreamingResult hardcoded a single-threaded pool, causing partitions to be fetched sequentially. For large result sets (100k+ rows), this led to slow iteration times (~25s for 300k rows) despite efficient memory usage.

- Add prefetch_threads parameter to StreamingResult#initialize (default: 1)
- Update #each to prefetch N partitions in parallel (N = prefetch_threads)
- Automatically calculate the optimal thread count using the existing number_of_threads_to_use logic
- Ensure proper thread pool shutdown after iteration

Real-world measurements with 300k rows (64 partitions):

| Threads | Iteration Time | Memory Growth | Speedup |
|------------|---------------|---------------|-------------|
| 1 (before) | 25-27s | 37 MB | baseline |
| 4 | 6-7s | 35 MB | 4x faster |
| 8 | 5-6s | 24-36 MB | 4.5x faster |

Total time is now comparable to non-streaming approaches (7s vs 6.4s) while maintaining 50-70% memory savings.

- Default behavior unchanged (prefetch_threads: 1)
- All existing tests pass
- No breaking changes to the API
- Uses the same thread calculation as ThreadedInMemoryStrategy (number_of_threads_to_use)
- Respects max_threads_per_query and thread_scale_factor settings
- Maintains memory efficiency by still clearing processed partitions
- Properly shuts down the thread pool to prevent resource leaks

Added a comprehensive spec with 10 test cases covering:

- Single thread backward compatibility
- Multi-threaded prefetching
- Thread pool cleanup
- Edge cases (more threads than partitions)
- Enumerator support
2636b15 to fc23b78
Wraps iteration logic in begin/ensure to guarantee thread pool shutdown even when exceptions occur during iteration. Prevents resource leaks. Also adds test to verify thread pool cleanup on exception.
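As a minimal plain-Ruby sketch of that begin/ensure pattern (the pool object here is a stand-in, not the gem's actual thread pool):

```ruby
# Stand-in pool that records whether shutdown was requested.
class FakePool
  attr_reader :shutdown_called

  def shutdown
    @shutdown_called = true
  end
end

# Iteration wrapped so cleanup runs on normal completion AND on exceptions.
def each_row(rows, pool)
  rows.each { |row| yield row }
ensure
  pool.shutdown
end
```

Because `ensure` runs whether or not the block raises, the pool is shut down even when the caller's block blows up mid-iteration.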
fc23b78 to 3f18852
Thanks Douglas, I'll try to take a detailed look into this in the next week or so - seems like a good idea to me!
reidnimz left a comment
This is a great change! There's a couple of small items, but nothing major.
Can you also update the Unreleased section of the changelog with these changes? We should mention there that streaming mode will now use more memory but will be much, much faster.
```ruby
  end
ensure
  # Ensure thread pool is properly shut down even if an exception occurs
  thread_pool.shutdown
```
ooh, this is a good catch - we should have been doing that already
```diff
 module RubySnowflake
   class StreamingResult < Result
-    def initialize(partition_count, row_type_data, retreive_proc)
+    def initialize(partition_count, row_type_data, retreive_proc, prefetch_threads: 1)
```
I think we need to raise an error if you try to pass a non-positive number for prefetch_threads (I could reasonably see someone trying 0), which I think would cause us to never fetch data.
Good call. Added an ArgumentError in initialize if prefetch_threads isn't a positive integer — covers 0, negatives, and non-integer values. Added specs for it too.
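A sketch of what that validation might look like (hypothetical; the gem's real initializer takes more arguments than shown here):

```ruby
# Hypothetical sketch of the prefetch_threads guard described above.
class StreamingResultSketch
  attr_reader :prefetch_threads

  def initialize(partition_count, prefetch_threads: 1)
    unless prefetch_threads.is_a?(Integer) && prefetch_threads.positive?
      raise ArgumentError,
            "prefetch_threads must be a positive integer, got #{prefetch_threads.inspect}"
    end
    @partition_count = partition_count
    @prefetch_threads = prefetch_threads
  end
end
```

The integer check rejects floats and strings as well as zero and negatives, which matches the "covers 0, negatives, and non-integer values" description.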
```ruby
ensure
  # Ensure thread pool is properly shut down even if an exception occurs
  thread_pool.shutdown
  thread_pool.wait_for_termination
```
Let's put a timeout parameter on this, or it will wait indefinitely. Since we're already handling an exception, maybe something short like 5 seconds is appropriate?
Done! Added wait_for_termination(5) — 5 seconds should be plenty for in-flight fetches to wrap up during exception handling.
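The semantics of a timed wait can be illustrated in plain Ruby (the PR uses concurrent-ruby's pool API; this hypothetical helper just shows a join bounded by a shared deadline):

```ruby
# Wait up to `timeout` seconds total for all threads to finish.
# Returns true if every thread terminated in time, false otherwise.
def wait_for_termination(threads, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  threads.all? do |t|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    remaining = 0 if remaining.negative?
    # Thread#join(limit) returns the thread if it finished, nil on timeout.
    !t.join(remaining).nil?
  end
end
```

The key point is that the timeout bounds the total wait, so a stuck in-flight fetch can no longer hang the caller indefinitely.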
lib/ruby_snowflake/client.rb
```ruby
if streaming
  StreamingResultStrategy.result(json_body, retrieve_proc)
  # Use same thread calculation logic for streaming to enable parallel prefetching
  # This dramatically improves iteration performance while maintaining memory efficiency
```
Claude Code has a tendency to insert comments like this that seem very important and relevant in the current context but turn out to be redundant later. If you don't mind cleaning these up unless there's something surprising or tricky the reader should keep in mind, that'll keep it matching the rest of the codebase. I think I have about 6 lines in my ~/.claude/CLAUDE.md file to try to keep it from doing this (and sometimes it even works!)
Ha, fair enough — cleaned up the redundant comments in both client.rb and streaming_result.rb. Left the existing ones that were already there (partition clearing explanation) since those seem genuinely useful.
- Add 5-second timeout to wait_for_termination to avoid indefinite hangs
- Validate prefetch_threads is a positive integer in initialize
- Remove redundant AI-generated comments
- Update CHANGELOG with streaming performance improvement
All feedback addressed in 80498ec — added the CHANGELOG entry under Unreleased as well. Thanks for the review!
Overview
This PR dramatically improves `StreamingResult` iteration performance (4-5x faster) through parallel partition prefetching while maintaining memory efficiency.

The idea to use a thread pool came from a performance profiling session at work, while replacing ODBC with the REST API for large Snowflake queries.
While comparing memory usage between the two approaches (ODBC loads everything into memory vs. REST API streaming), I noticed ODBC's iteration was dramatically faster (~0.4s vs ~25s for 300k rows) despite the REST API's superior memory efficiency.
Investigating the `StreamingResult` implementation revealed it hardcoded a single-threaded pool for partition fetching, meaning partitions were fetched sequentially.

Since the gem already had `ThreadedInMemoryStrategy` with configurable thread pools for non-streaming queries and calculated optimal thread counts via `number_of_threads_to_use`, it seemed natural to apply the same parallel-fetching approach to streaming.

A quick prototype showed 4-5x faster iteration while maintaining the memory benefits, achieving the best of both worlds: ODBC-like speed with streaming's efficiency.
Important: I used Claude Code to generate most of this PR, but I reviewed the code to ensure I'm not introducing any stupid errors or AI slop.

Problem
`StreamingResult` currently hardcodes a single-threaded pool for prefetching partitions. This causes sequential partition fetching, leading to slow iteration for large result sets.
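A toy simulation of the difference (the 0.02s sleep stands in for the per-partition HTTP fetch; nothing here is the gem's real code):

```ruby
# Fetch `count` fake partitions with `workers` threads pulling from a queue.
# With workers = 1 this degenerates to the sequential behavior described above:
# total time is the SUM of fetch latencies instead of roughly the max.
def fetch_partitions(count, workers)
  queue = Queue.new
  count.times { |i| queue << i }
  results = Queue.new
  threads = Array.new(workers) do
    Thread.new do
      loop do
        idx = begin
          queue.pop(true) # non-blocking pop; raises ThreadError when empty
        rescue ThreadError
          break
        end
        sleep 0.02 # simulated network latency per partition
        results << idx
      end
    end
  end
  threads.each(&:join)
  Array.new(results.size) { results.pop }.sort
end
```

With 8 partitions, one worker takes about 0.16s of simulated latency while four workers take about 0.04s, which is the same shape as the 25s-to-6s improvement reported below.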
Solution
Added a configurable `prefetch_threads` parameter that:

- Defaults to 1, preserving existing behavior
- Is calculated automatically via the existing `number_of_threads_to_use` logic

Changes
- `StreamingResult#initialize`: Added `prefetch_threads:` parameter (default: 1)
- `StreamingResult#each`: Updated to prefetch N partitions in parallel
- `StreamingResultStrategy.result`: Passes through `prefetch_threads`
- `Client#retrieve_result_set`: Uses the same thread calculation as non-streaming

Performance Results
Real-world measurements with 300k rows (64 partitions):

| Threads | Iteration Time | Memory Growth | Speedup |
|------------|---------------|---------------|-------------|
| 1 (before) | 25-27s | 37 MB | baseline |
| 4 | 6-7s | 35 MB | 4x faster |
| 8 | 5-6s | 24-36 MB | 4.5x faster |

Key Metrics

- Total time is now comparable to non-streaming approaches (7s vs 6.4s)
- Maintains 50-70% memory savings
Backward Compatibility

✅ Fully backward compatible: the default `prefetch_threads: 1` maintains existing behavior.

Implementation Details
Thread Pool Calculation

Uses the same logic as `ThreadedInMemoryStrategy`.

Memory Safety
- Processed partitions are still cleared (marked `:finished`), preserving memory efficiency

Resource Management

- Calls `thread_pool.shutdown` and `wait_for_termination` after iteration

Testing
Added `spec/ruby_snowflake/streaming_result_spec.rb` with comprehensive coverage:

✅ Single thread backward compatibility
✅ Multi-threaded prefetching behavior
✅ Partition clearing/memory management
✅ Thread pool shutdown
✅ Edge cases (more threads than partitions)
✅ Enumerator support
✅ Concurrent fetch verification
All unit tests pass (22 examples, 0 failures).
Use Cases
This improvement particularly benefits large streaming result sets (100k+ rows), where sequential partition fetching previously dominated iteration time.
Configuration
Automatically configured based on partition count and existing settings:
- `max_threads_per_query` (env: `SNOWFLAKE_MAX_THREADS_PER_QUERY`, default: 8)
- `thread_scale_factor` (env: `SNOWFLAKE_THREAD_SCALE_FACTOR`, default: 4)

Formula: `min(partition_count / thread_scale_factor, max_threads_per_query)`

Real-World Impact
For our prescriber coverage data exports, the result is comparable speed to ODBC while using half the memory.
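As a sketch, the thread-count formula from the Configuration section works out to the following (the floor of 1 is my assumption so tiny result sets still get one worker, not something confirmed from the gem's source):

```ruby
MAX_THREADS_PER_QUERY = 8 # SNOWFLAKE_MAX_THREADS_PER_QUERY default
THREAD_SCALE_FACTOR   = 4 # SNOWFLAKE_THREAD_SCALE_FACTOR default

def number_of_threads_to_use(partition_count)
  # min(partition_count / thread_scale_factor, max_threads_per_query),
  # floored at 1 so very small result sets still get one worker.
  [[partition_count / THREAD_SCALE_FACTOR, 1].max, MAX_THREADS_PER_QUERY].min
end
```

With the 64-partition benchmark above, this yields 8 threads, matching the fastest row in the results table.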
Questions?
Happy to answer any questions or make adjustments!