Skip to content

Commit 515b566

Browse files
out_opensearch_data_stream: allow placeholders in customize_template
Signed-off-by: Kristian Grønås <Kristian.Gronas@uib.no>
1 parent aa793d9 commit 515b566

File tree

2 files changed

+45
-4
lines changed

2 files changed

+45
-4
lines changed

lib/fluent/plugin/out_opensearch_data_stream.rb

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def configure(conf)
2222
@data_stream_template_name = "#{@data_stream_name}_template" if @data_stream_template_name.nil?
2323

2424
# ref. https://opensearch.org/docs/latest/opensearch/data-streams/
25-
unless placeholder?(:data_stream_name_placeholder, @data_stream_name)
25+
unless placeholder_substitution_needed?
2626
validate_data_stream_parameters
2727
else
2828
@use_placeholder = true
@@ -67,12 +67,20 @@ def validate_data_stream_parameters
6767
end
6868
end
6969

70-
def create_index_template(datastream_name, template_name, host = nil)
70+
def placeholder_substitution_needed?
71+
need_substitution = placeholder?(:data_stream_name_placeholder, @data_stream_name) ||
72+
@customize_template&.values&.any? { |value| placeholder?(:customize_template, value.to_s) } ||
73+
placeholder?(:data_stream_template_name, @data_stream_template_name)
74+
log.debug("Needs substitution: #{need_substitution}")
75+
need_substitution
76+
end
77+
78+
def create_index_template(datastream_name, template_name, customize_template = nil, host = nil)
7179
# Create index template from file
7280
if !dry_run?
7381
if @template_file
7482
return if data_stream_exist?(datastream_name, host) or template_exists?(template_name, host)
75-
template_installation_actual(template_name, @customize_template, @application_name, datastream_name, host)
83+
template_installation_actual(template_name, customize_template, @application_name, datastream_name, host)
7684
else # Create default index template
7785
return if data_stream_exist?(datastream_name, host) or template_exists?(template_name, host)
7886
body = {
@@ -162,8 +170,11 @@ def write(chunk)
162170
end
163171
data_stream_name = extract_placeholders(@data_stream_name, chunk).downcase
164172
data_stream_template_name = extract_placeholders(@data_stream_template_name, chunk).downcase
173+
if @customize_template
174+
customize_template = @customize_template.each_with_object({}) { |(key, value), hash| hash[key] = extract_placeholders(value, chunk) }
175+
end
165176
begin
166-
create_index_template(data_stream_name, data_stream_template_name, host)
177+
create_index_template(data_stream_name, data_stream_template_name, customize_template, host)
167178
rescue => e
168179
raise Fluent::ConfigError, "Failed to create data stream: <#{data_stream_name}> #{e.message}"
169180
end

test/plugin/test_out_opensearch_data_stream.rb

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,4 +743,34 @@ def test_record_with_remove_keys
743743
assert(!index_cmds[1].has_key?('remove_me'))
744744
end
745745

746+
def test_custom_data_stream_template_create_with_placeholders
747+
cwd = File.dirname(__FILE__)
748+
conf = config_element(
749+
'ROOT', '', {
750+
'@type' => OPENSEARCH_DATA_STREAM_TYPE,
751+
'data_stream_name' => 'foo',
752+
'data_stream_name_placeholder' => 'foo',
753+
'data_stream_template_name' => '${tag}_template',
754+
'template_file' => File.join(cwd, 'datastream_template.json'),
755+
'customize_template' => '{"foo*": "${tag}--*"}',
756+
})
757+
758+
stub_default
759+
stub_bulk_feed('foo', 'test_templata')
760+
stub_nonexistent_template?('test_template')
761+
762+
stub_request(:put, "http://localhost:9200/_index_template/test_template")
763+
.to_return(:status => 200, :body => "", :headers => {})
764+
765+
driver(conf).run(default_tag: 'test') do
766+
driver.feed(sample_record)
767+
end
768+
769+
assert_requested(
770+
:put,
771+
"http://localhost:9200/_index_template/test_template",
772+
body: {"index_patterns" => ["test--*"], "data_stream" => {}}
773+
)
774+
775+
end
746776
end

0 commit comments

Comments
 (0)