Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions apisix/plugins/clickhouse-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local log_util = require("apisix.utils.log-util")
local plugin = require("apisix.plugin")
local core = require("apisix.core")
local http = require("resty.http")
local url = require("net.url")
Expand Down Expand Up @@ -71,7 +72,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
}
},
max_pending_entries = {
type = "integer",
description = "maximum number of pending entries in the batch processor",
minimum = 1,
},
},
}

Expand Down Expand Up @@ -174,9 +180,12 @@ end


function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
local max_pending_entries = metadata and metadata.value and
metadata.value.max_pending_entries or nil
local entry = log_util.get_log_entry(plugin_name, conf, ctx)

if batch_processor_manager:add_entry(conf, entry) then
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end

Expand All @@ -201,7 +210,7 @@ function _M.log(conf, ctx)
return send_http_data(conf, data)
end

batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
end


Expand Down
17 changes: 13 additions & 4 deletions apisix/plugins/elasticsearch-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ local core = require("apisix.core")
local http = require("resty.http")
local log_util = require("apisix.utils.log-util")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")

local plugin = require("apisix.plugin")
local ngx = ngx
local str_format = core.string.format
local math_random = math.random
Expand Down Expand Up @@ -104,7 +104,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
}
},
max_pending_entries = {
type = "integer",
description = "maximum number of pending entries in the batch processor",
minimum = 1,
},
},
}

Expand Down Expand Up @@ -264,17 +269,21 @@ function _M.access(conf)
end

function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
local max_pending_entries = metadata and metadata.value and
metadata.value.max_pending_entries or nil
local entry = get_logger_entry(conf, ctx)

if batch_processor_manager:add_entry(conf, entry) then
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end

local process = function(entries)
return send_to_elasticsearch(conf, entries)
end

batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process)
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
process, max_pending_entries)
end


Expand Down
16 changes: 13 additions & 3 deletions apisix/plugins/google-cloud-logging.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
--

local core = require("apisix.core")
local plugin = require("apisix.plugin")
local tostring = tostring
local http = require("resty.http")
local log_util = require("apisix.utils.log-util")
Expand Down Expand Up @@ -110,7 +111,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
}
},
max_pending_entries = {
type = "integer",
description = "maximum number of pending entries in the batch processor",
minimum = 1,
},
},
}

Expand Down Expand Up @@ -241,6 +247,9 @@ end


function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
local max_pending_entries = metadata and metadata.value and
metadata.value.max_pending_entries or nil
local oauth, err = core.lrucache.plugin_ctx(lrucache, ctx, nil,
create_oauth_object, conf)
if not oauth then
Expand All @@ -250,15 +259,16 @@ function _M.log(conf, ctx)

local entry = get_logger_entry(conf, ctx, oauth)

if batch_processor_manager:add_entry(conf, entry) then
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end

local process = function(entries)
return send_to_google(oauth, entries)
end

batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process)
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
process, max_pending_entries)
end


Expand Down
15 changes: 12 additions & 3 deletions apisix/plugins/http-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
--

local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local plugin = require("apisix.plugin")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local http = require("resty.http")
Expand Down Expand Up @@ -63,7 +64,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
}
},
max_pending_entries = {
type = "integer",
description = "maximum number of pending entries in the batch processor",
minimum = 1,
},
},
}

Expand Down Expand Up @@ -168,13 +174,16 @@ end


function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
local max_pending_entries = metadata and metadata.value and
metadata.value.max_pending_entries or nil
local entry = log_util.get_log_entry(plugin_name, conf, ctx)

if not entry.route_id then
entry.route_id = "no-matched"
end

if batch_processor_manager:add_entry(conf, entry) then
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end

Expand Down Expand Up @@ -216,7 +225,7 @@ function _M.log(conf, ctx)
return send_http_data(conf, data)
end

batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
end


Expand Down
15 changes: 12 additions & 3 deletions apisix/plugins/loki-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local http = require("resty.http")
local new_tab = require("table.new")

Expand Down Expand Up @@ -115,7 +116,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
}
},
max_pending_entries = {
type = "integer",
description = "maximum number of pending entries in the batch processor",
minimum = 1,
},
},
}

Expand Down Expand Up @@ -193,6 +199,9 @@ end


function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
local max_pending_entries = metadata and metadata.value and
metadata.value.max_pending_entries or nil
local entry = log_util.get_log_entry(plugin_name, conf, ctx)

if not entry.route_id then
Expand All @@ -205,7 +214,7 @@ function _M.log(conf, ctx)
-- and then add 6 zeros by string concatenation
entry.loki_log_time = tostring(ngx.req.start_time() * 1000) .. "000000"

if batch_processor_manager:add_entry(conf, entry) then
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end

Expand Down Expand Up @@ -244,7 +253,7 @@ function _M.log(conf, ctx)
return send_http_data(conf, data)
end

batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
end


Expand Down
15 changes: 12 additions & 3 deletions apisix/plugins/rocketmq-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
-- limitations under the License.
--
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.rocketmq.producer")
local acl_rpchook = require("resty.rocketmq.acl_rpchook")
Expand Down Expand Up @@ -77,7 +78,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
}
},
max_pending_entries = {
type = "integer",
description = "maximum number of pending entries in the batch processor",
minimum = 1,
},
},
}

Expand Down Expand Up @@ -138,14 +144,17 @@ end


function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
local max_pending_entries = metadata and metadata.value and
metadata.value.max_pending_entries or nil
local entry
if conf.meta_format == "origin" then
entry = log_util.get_req_original(ctx, conf)
else
entry = log_util.get_log_entry(plugin_name, conf, ctx)
end

if batch_processor_manager:add_entry(conf, entry) then
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end

Expand Down Expand Up @@ -184,7 +193,7 @@ function _M.log(conf, ctx)
return send_rocketmq_data(conf, data, prod)
end

batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
end


Expand Down
15 changes: 12 additions & 3 deletions apisix/plugins/skywalking-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
--

local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local plugin = require("apisix.plugin")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local http = require("resty.http")
Expand Down Expand Up @@ -64,7 +65,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
}
},
max_pending_entries = {
type = "integer",
description = "maximum number of pending entries in the batch processor",
minimum = 1,
},
},
}

Expand Down Expand Up @@ -139,6 +145,9 @@ end


function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
local max_pending_entries = metadata and metadata.value and
metadata.value.max_pending_entries or nil
local log_body = log_util.get_log_entry(plugin_name, conf, ctx)
local trace_context
local sw_header = ngx.req.get_headers()["sw8"]
Expand Down Expand Up @@ -173,7 +182,7 @@ function _M.log(conf, ctx)
endpoint = ctx.var.uri,
}

if batch_processor_manager:add_entry(conf, entry) then
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end

Expand All @@ -187,7 +196,7 @@ function _M.log(conf, ctx)
return send_http_data(conf, data)
end

batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
end


Expand Down
14 changes: 12 additions & 2 deletions apisix/plugins/splunk-hec-logging.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ local ngx_now = ngx.now
local http = require("resty.http")
local log_util = require("apisix.utils.log-util")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local plugin = require("apisix.plugin")
local table_insert = core.table.insert
local table_concat = core.table.concat
local ipairs = ipairs
Expand Down Expand Up @@ -75,6 +76,11 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
},
max_pending_entries = {
type = "integer",
description = "maximum number of pending entries in the batch processor",
minimum = 1,
}
},
}
Expand Down Expand Up @@ -169,17 +175,21 @@ end


function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
local max_pending_entries = metadata and metadata.value and
metadata.value.max_pending_entries or nil
local entry = get_logger_entry(conf, ctx)

if batch_processor_manager:add_entry(conf, entry) then
if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end

local process = function(entries)
return send_to_splunk(conf, entries)
end

batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process)
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
process, max_pending_entries)
end


Expand Down
Loading
Loading