diff --git a/CHANGELOG.md b/CHANGELOG.md index 41b401c..9d4f376 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 4.2.0 + - Add metrics support for events, operations, connections and errors produced during execution (based on exception being raised). + ## 4.1.1 - Relax constraint on logstash-core-plugin-api to >= 1.60 <= 2.99 diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index 71b3947..26d8720 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -183,6 +183,8 @@ class LogStash::Inputs::Jdbc < LogStash::Inputs::Base public def register + @metric_errors = metric.namespace(:errors) + require "rufus/scheduler" prepare_jdbc_connection @@ -247,6 +249,7 @@ def stop private def execute_query(queue) + metric.increment(:queries) # update default parameters @parameters['sql_last_value'] = @sql_last_value execute_statement(@statement, @parameters) do |row| @@ -257,11 +260,13 @@ def execute_query(queue) event = LogStash::Event.new(row) decorate(event) queue << event + metric.increment(:events) end end def update_state_file if @record_last_run + metric.increment(:state_file_updates) File.write(@last_run_metadata_path, YAML.dump(@sql_last_value)) end end @@ -279,9 +284,11 @@ def convert(column_name, value) if column_charset converter = @converters[column_charset] converter.convert(value) + metric.increment(:encoding_conversions) elsif @charset converter = @converters[@charset] converter.convert(value) + metric.increment(:encoding_conversions) else value end diff --git a/lib/logstash/plugin_mixins/jdbc.rb b/lib/logstash/plugin_mixins/jdbc.rb index c36a67b..41243bc 100644 --- a/lib/logstash/plugin_mixins/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc.rb @@ -115,7 +115,8 @@ def jdbc_connect @logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Tried #{@connection_retry_attempts} times.") raise e else - @logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Trying again.") + @logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Trying again.") + @metric_errors.increment(:connection_retries) end rescue Sequel::Error => e if retry_attempts <= 0 @@ -123,6 +124,7 @@ def jdbc_connect raise e else @logger.error("Unable to connect to database. Trying again", :error_message => e.message) + @metric_errors.increment(:connection_retries) end end sleep(@connection_retry_attempts_wait_time) @@ -159,6 +161,8 @@ def prepare_jdbc_connection raise LogStash::ConfigurationError, "#{e}. #{message}" end @database = jdbc_connect() + metric.increment(:connections) + @database.extension(:pagination) if @jdbc_default_timezone @database.extension(:named_timezones) @@ -201,8 +205,11 @@ def execute_statement(statement, parameters) parameters = symbolized_params(parameters) query = @database[statement, parameters] sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc + metric.gauge(:sql_last_value, sql_last_value) + @tracking_column_warning_sent = false @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count) + metric.gauge(:rows_in, query.count) if @jdbc_paging_enabled query.each_page(@jdbc_page_size) do |paged_dataset| @@ -220,6 +227,7 @@ def execute_statement(statement, parameters) success = true rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e @logger.warn("Exception when executing JDBC query", :exception => e) + @metric_errors.increment(:jdbc_query_errors) else @sql_last_value = sql_last_value end diff --git a/logstash-input-jdbc.gemspec b/logstash-input-jdbc.gemspec index cf34da9..7d666f3 100755 --- a/logstash-input-jdbc.gemspec +++ b/logstash-input-jdbc.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-input-jdbc' - s.version = '4.1.1' + s.version = '4.2.0' s.licenses = ['Apache License (2.0)'] s.summary = "This example input streams a string at a definable interval." s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index b862c06..74bae54 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -837,7 +837,7 @@ let(:logger) { double("logger") } it "should report the staments to logging" do - expect(logger).to receive(:debug).with(kind_of(String)).once + expect(logger).to receive(:debug).with(kind_of(String)).twice plugin.run(queue) end end