Skip to content

Commit c51a9e4

Browse files
runderwoedsiper
authored andcommitted
out_kinesis_firehose: introduce port parameter
This mirrors the same for Kinesis Streams in #9317 Signed-off-by: Ryan Underwood <[email protected]>
1 parent d240744 commit c51a9e4

File tree

4 files changed

+37
-13
lines changed

4 files changed

+37
-13
lines changed

plugins/out_kinesis_firehose/firehose.c

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,30 @@ static int cb_firehose_init(struct flb_output_instance *ins,
120120
ctx->sts_endpoint = (char *) tmp;
121121
}
122122

123+
/*
124+
* Sets the port number for the Kinesis output plugin.
125+
*
126+
* This function uses the port number already set in the output instance's host structure.
127+
* If the port is not set (0), the default HTTPS port is used.
128+
*
129+
* @param ins The output instance.
130+
* @param ctx The Kinesis output plugin context.
131+
*/
132+
flb_plg_debug(ins, "Retrieved port from ins->host.port: %d", ins->host.port);
133+
134+
if (ins->host.port == 0) {
135+
ctx->port = FLB_KINESIS_DEFAULT_HTTPS_PORT;
136+
flb_plg_debug(ins, "Port not set. Using default HTTPS port: %d", ctx->port);
137+
}
138+
else if (ins->host.port == (ctx->port = (uint16_t)ins->host.port)) {
139+
flb_plg_debug(ins, "Setting port to: %d", ctx->port);
140+
}
141+
else {
142+
flb_plg_error(ins, "Invalid port number: %d. Must be between %d and %d",
143+
ins->host.port, 1, UINT16_MAX);
144+
goto error;
145+
}
146+
123147
tmp = flb_output_get_property("compression", ins);
124148
if (tmp) {
125149
ret = flb_aws_compression_get_type(tmp);
@@ -259,14 +283,14 @@ static int cb_firehose_init(struct flb_output_instance *ins,
259283
ctx->firehose_client->region = (char *) ctx->region;
260284
ctx->firehose_client->retry_requests = ctx->retry_requests;
261285
ctx->firehose_client->service = "firehose";
262-
ctx->firehose_client->port = 443;
286+
ctx->firehose_client->port = ctx->port;
263287
ctx->firehose_client->flags = 0;
264288
ctx->firehose_client->proxy = NULL;
265289
ctx->firehose_client->static_headers = &content_type_header;
266290
ctx->firehose_client->static_headers_len = 1;
267291

268292
struct flb_upstream *upstream = flb_upstream_create(config, ctx->endpoint,
269-
443, FLB_IO_TLS,
293+
ctx->port, FLB_IO_TLS,
270294
ctx->client_tls);
271295
if (!upstream) {
272296
flb_plg_error(ctx->ins, "Connection initialization error");

plugins/out_kinesis_firehose/firehose.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
#define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S"
3131

32+
#define FLB_KINESIS_DEFAULT_HTTPS_PORT 443
33+
3234
/* buffers used for each flush */
3335
struct flush {
3436
/* temporary buffer for storing the serialized event messages */
@@ -87,6 +89,7 @@ struct flb_firehose {
8789
const char *log_key;
8890
const char *external_id;
8991
char *sts_endpoint;
92+
uint16_t port;
9093
char *profile;
9194
int custom_endpoint;
9295
int retry_requests;

plugins/out_kinesis_streams/kinesis.c

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -127,18 +127,17 @@ static int cb_kinesis_init(struct flb_output_instance *ins,
127127
* @param ctx The Kinesis output plugin context.
128128
*/
129129
flb_plg_debug(ins, "Retrieved port from ins->host.port: %d", ins->host.port);
130-
131-
if (ins->host.port >= FLB_KINESIS_MIN_PORT && ins->host.port <= FLB_KINESIS_MAX_PORT) {
132-
ctx->port = ins->host.port;
133-
flb_plg_debug(ins, "Setting port to: %d", ctx->port);
134-
}
135-
else if (ins->host.port == 0) {
130+
131+
if (ins->host.port == 0) {
136132
ctx->port = FLB_KINESIS_DEFAULT_HTTPS_PORT;
137133
flb_plg_debug(ins, "Port not set. Using default HTTPS port: %d", ctx->port);
138134
}
135+
else if (ins->host.port == (ctx->port = (uint16_t)ins->host.port)) {
136+
flb_plg_debug(ins, "Setting port to: %d", ctx->port);
137+
}
139138
else {
140-
flb_plg_error(ins, "Invalid port number: %d. Must be between %d and %d",
141-
ins->host.port, FLB_KINESIS_MIN_PORT, FLB_KINESIS_MAX_PORT);
139+
flb_plg_error(ins, "Invalid port number: %d. Must be between %d and %d",
140+
ins->host.port, 1, UINT16_MAX);
142141
goto error;
143142
}
144143

plugins/out_kinesis_streams/kinesis.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
#define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S"
3131

3232
#define FLB_KINESIS_DEFAULT_HTTPS_PORT 443
33-
#define FLB_KINESIS_MIN_PORT 1
34-
#define FLB_KINESIS_MAX_PORT 65535
3533

3634
/* buffers used for each flush */
3735
struct flush {
@@ -96,7 +94,7 @@ struct flb_kinesis {
9694
int retry_requests;
9795
char *sts_endpoint;
9896
int custom_endpoint;
99-
int port;
97+
uint16_t port;
10098
char *profile;
10199

102100
/* in this plugin the 'random' partition key is a uuid + fluent tag + timestamp */

0 commit comments

Comments
 (0)