Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions plugins/out_kinesis_firehose/firehose.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,30 @@ static int cb_firehose_init(struct flb_output_instance *ins,
ctx->sts_endpoint = (char *) tmp;
}

/*
* Sets the port number for the Kinesis output plugin.
*
* This function uses the port number already set in the output instance's host structure.
* If the port is not set (0), the default HTTPS port is used.
*
* @param ins The output instance.
* @param ctx The Kinesis output plugin context.
*/
flb_plg_debug(ins, "Retrieved port from ins->host.port: %d", ins->host.port);

if (ins->host.port == 0) {
ctx->port = FLB_KINESIS_DEFAULT_HTTPS_PORT;
flb_plg_debug(ins, "Port not set. Using default HTTPS port: %d", ctx->port);
}
else if (ins->host.port == (ctx->port = (uint16_t)ins->host.port)) {
flb_plg_debug(ins, "Setting port to: %d", ctx->port);
}
else {
flb_plg_error(ins, "Invalid port number: %d. Must be between %d and %d",
ins->host.port, 1, UINT16_MAX);
goto error;
}

tmp = flb_output_get_property("compression", ins);
if (tmp) {
ret = flb_aws_compression_get_type(tmp);
Expand Down Expand Up @@ -259,14 +283,14 @@ static int cb_firehose_init(struct flb_output_instance *ins,
ctx->firehose_client->region = (char *) ctx->region;
ctx->firehose_client->retry_requests = ctx->retry_requests;
ctx->firehose_client->service = "firehose";
ctx->firehose_client->port = 443;
ctx->firehose_client->port = ctx->port;
ctx->firehose_client->flags = 0;
ctx->firehose_client->proxy = NULL;
ctx->firehose_client->static_headers = &content_type_header;
ctx->firehose_client->static_headers_len = 1;

struct flb_upstream *upstream = flb_upstream_create(config, ctx->endpoint,
443, FLB_IO_TLS,
ctx->port, FLB_IO_TLS,
ctx->client_tls);
if (!upstream) {
flb_plg_error(ctx->ins, "Connection initialization error");
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_kinesis_firehose/firehose.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

#define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S"

#define FLB_KINESIS_DEFAULT_HTTPS_PORT 443

/* buffers used for each flush */
struct flush {
/* temporary buffer for storing the serialized event messages */
Expand Down Expand Up @@ -87,6 +89,7 @@ struct flb_firehose {
const char *log_key;
const char *external_id;
char *sts_endpoint;
uint16_t port;
char *profile;
int custom_endpoint;
int retry_requests;
Expand Down
15 changes: 7 additions & 8 deletions plugins/out_kinesis_streams/kinesis.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,17 @@ static int cb_kinesis_init(struct flb_output_instance *ins,
* @param ctx The Kinesis output plugin context.
*/
flb_plg_debug(ins, "Retrieved port from ins->host.port: %d", ins->host.port);

if (ins->host.port >= FLB_KINESIS_MIN_PORT && ins->host.port <= FLB_KINESIS_MAX_PORT) {
ctx->port = ins->host.port;
flb_plg_debug(ins, "Setting port to: %d", ctx->port);
}
else if (ins->host.port == 0) {

if (ins->host.port == 0) {
ctx->port = FLB_KINESIS_DEFAULT_HTTPS_PORT;
flb_plg_debug(ins, "Port not set. Using default HTTPS port: %d", ctx->port);
}
else if (ins->host.port == (ctx->port = (uint16_t)ins->host.port)) {
flb_plg_debug(ins, "Setting port to: %d", ctx->port);
}
else {
flb_plg_error(ins, "Invalid port number: %d. Must be between %d and %d",
ins->host.port, FLB_KINESIS_MIN_PORT, FLB_KINESIS_MAX_PORT);
flb_plg_error(ins, "Invalid port number: %d. Must be between %d and %d",
ins->host.port, 1, UINT16_MAX);
goto error;
}

Expand Down
4 changes: 1 addition & 3 deletions plugins/out_kinesis_streams/kinesis.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
#define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S"

#define FLB_KINESIS_DEFAULT_HTTPS_PORT 443
#define FLB_KINESIS_MIN_PORT 1
#define FLB_KINESIS_MAX_PORT 65535

/* buffers used for each flush */
struct flush {
Expand Down Expand Up @@ -96,7 +94,7 @@ struct flb_kinesis {
int retry_requests;
char *sts_endpoint;
int custom_endpoint;
int port;
uint16_t port;
char *profile;

/* in this plugin the 'random' partition key is a uuid + fluent tag + timestamp */
Expand Down
Loading