diff --git a/lib/mongo/session.rb b/lib/mongo/session.rb index be9c1f2a42..6c22389b6f 100644 --- a/lib/mongo/session.rb +++ b/lib/mongo/session.rb @@ -446,20 +446,14 @@ def end_session # # @since 2.7.0 def with_transaction(options = nil) - if timeout_ms = (options || {})[:timeout_ms] - timeout_sec = timeout_ms / 1_000.0 - deadline = Utils.monotonic_time + timeout_sec - @with_transaction_deadline = deadline - elsif default_timeout_ms = @options[:default_timeout_ms] - timeout_sec = default_timeout_ms / 1_000.0 - deadline = Utils.monotonic_time + timeout_sec - @with_transaction_deadline = deadline - elsif @client.timeout_sec - deadline = Utils.monotonic_time + @client.timeout_sec - @with_transaction_deadline = deadline - else - deadline = Utils.monotonic_time + 120 - end + @with_transaction_deadline = calculate_with_transaction_deadline(options) + deadline = if @with_transaction_deadline + # CSOT enabled, so we have a customer defined deadline. + @with_transaction_deadline + else + # CSOT not enabled, so we use the default deadline, 120 seconds. + Utils.monotonic_time + 120 + end transaction_in_progress = false loop do commit_options = {} @@ -478,7 +472,7 @@ def with_transaction(options = nil) transaction_in_progress = false end - if Utils.monotonic_time >= deadline + if deadline_expired?(deadline) transaction_in_progress = false raise end @@ -500,7 +494,7 @@ def with_transaction(options = nil) return rv rescue Mongo::Error => e if e.label?('UnknownTransactionCommitResult') - if Utils.monotonic_time >= deadline || + if deadline_expired?(deadline) || e.is_a?(Error::OperationFailure::Family) && e.max_time_ms_expired? then transaction_in_progress = false @@ -1191,6 +1185,8 @@ def txn_num # @api private attr_accessor :snapshot_timestamp + # @return [ Integer | nil ] The deadline for the current transaction, if any. + # @api private attr_reader :with_transaction_deadline private @@ -1286,5 +1282,30 @@ def operation_timeouts(opts) end end end + + def calculate_with_transaction_deadline(opts) + calc = -> (timeout) { + if timeout == 0 + 0 + else + Utils.monotonic_time + (timeout / 1000.0) + end + } + if timeout_ms = opts&.dig(:timeout_ms) + calc.call(timeout_ms) + elsif default_timeout_ms = @options[:default_timeout_ms] + calc.call(default_timeout_ms) + elsif @client.timeout_ms + calc.call(@client.timeout_ms) + end + end + + def deadline_expired?(deadline) + if deadline.zero? + false + else + Utils.monotonic_time >= deadline + end + end end end diff --git a/spec/mongo/session_transaction_spec.rb b/spec/mongo/session_transaction_spec.rb index 37fcc92f08..53e7e68a85 100644 --- a/spec/mongo/session_transaction_spec.rb +++ b/spec/mongo/session_transaction_spec.rb @@ -10,8 +10,18 @@ class SessionTransactionSpecError < StandardError; end min_server_fcv '4.0' require_topology :replica_set, :sharded + let(:subscriber) do + Mrss::EventSubscriber.new(name: 'SessionTransactionSpec') + end + + let(:client) do + authorized_client.tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end + end + let(:session) do - authorized_client.start_session(session_options) + client.start_session(session_options) end let(:session_options) do @@ -218,5 +228,54 @@ class SessionTransactionSpecError < StandardError; end expect(collection.find(timeout_around_with_tx: 2).first).not_to be nil end end + + context 'csot' do + context 'when csot is enabled' do + context 'when timeout_ms is set to zero' do + it 'sets with_transaction_deadline to infinite' do + session.with_transaction(timeout_ms: 0) do + expect(session.with_transaction_deadline).to be_zero + end + end + + it 'does not sent maxTimeMS' do + session.with_transaction(timeout_ms: 0) do + collection.insert_one({ a: 1 }, session: session) + end + event = subscriber.single_command_started_event('insert', database_name: collection.database.name) + expect(event.command['maxTimeMS']).to be_nil + end + end + + context 'when timeout_ms is set to a positive value' do + before do + allow(Mongo::Utils).to receive(:monotonic_time).and_return(0) + end + + it 'sets with_transaction_deadline to the specified value' do + session.with_transaction(timeout_ms: 1000) do + expect(session.with_transaction_deadline).to be_within(0.1).of(1000 / 1000.0) + end + end + + it 'sends maxTimeMS with the operation' do + session.with_transaction(timeout_ms: 1_000) do + collection.insert_one({ a: 1 }, session: session) + end + event = subscriber.single_command_started_event('insert', database_name: collection.database.name) + expect(event.command['maxTimeMS']).not_to be_nil + expect(event.command['maxTimeMS']).to be <= 1_000 + end + end + end + + context 'when csot is disabled' do + it 'does not set with_transaction_deadline' do + session.with_transaction do + expect(session.with_transaction_deadline).to be_nil + end + end + end + end end end