diff --git a/plugins/out_kinesis_firehose/firehose.c b/plugins/out_kinesis_firehose/firehose.c index d91c6f00a8a..a10d60e24dd 100644 --- a/plugins/out_kinesis_firehose/firehose.c +++ b/plugins/out_kinesis_firehose/firehose.c @@ -120,6 +120,30 @@ static int cb_firehose_init(struct flb_output_instance *ins, ctx->sts_endpoint = (char *) tmp; } + /* + * Sets the port number for the Kinesis output plugin. + * + * This function uses the port number already set in the output instance's host structure. + * If the port is not set (0), the default HTTPS port is used. + * + * @param ins The output instance. + * @param ctx The Kinesis output plugin context. + */ + flb_plg_debug(ins, "Retrieved port from ins->host.port: %d", ins->host.port); + + if (ins->host.port == 0) { + ctx->port = FLB_KINESIS_DEFAULT_HTTPS_PORT; + flb_plg_debug(ins, "Port not set. Using default HTTPS port: %d", ctx->port); + } + else if (ins->host.port == (ctx->port = (uint16_t)ins->host.port)) { + flb_plg_debug(ins, "Setting port to: %d", ctx->port); + } + else { + flb_plg_error(ins, "Invalid port number: %d. Must be between %d and %d", + ins->host.port, 1, UINT16_MAX); + goto error; + } + tmp = flb_output_get_property("compression", ins); if (tmp) { ret = flb_aws_compression_get_type(tmp); @@ -259,14 +283,14 @@ static int cb_firehose_init(struct flb_output_instance *ins, ctx->firehose_client->region = (char *) ctx->region; ctx->firehose_client->retry_requests = ctx->retry_requests; ctx->firehose_client->service = "firehose"; - ctx->firehose_client->port = 443; + ctx->firehose_client->port = ctx->port; ctx->firehose_client->flags = 0; ctx->firehose_client->proxy = NULL; ctx->firehose_client->static_headers = &content_type_header; ctx->firehose_client->static_headers_len = 1; struct flb_upstream *upstream = flb_upstream_create(config, ctx->endpoint, - 443, FLB_IO_TLS, + ctx->port, FLB_IO_TLS, ctx->client_tls); if (!upstream) { flb_plg_error(ctx->ins, "Connection initialization error"); diff --git a/plugins/out_kinesis_firehose/firehose.h b/plugins/out_kinesis_firehose/firehose.h index 70a4706a878..6d27bf783be 100644 --- a/plugins/out_kinesis_firehose/firehose.h +++ b/plugins/out_kinesis_firehose/firehose.h @@ -29,6 +29,8 @@ #define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S" +#define FLB_KINESIS_DEFAULT_HTTPS_PORT 443 + /* buffers used for each flush */ struct flush { /* temporary buffer for storing the serialized event messages */ @@ -87,6 +89,7 @@ struct flb_firehose { const char *log_key; const char *external_id; char *sts_endpoint; + uint16_t port; char *profile; int custom_endpoint; int retry_requests; diff --git a/plugins/out_kinesis_streams/kinesis.c b/plugins/out_kinesis_streams/kinesis.c index a225f6007f7..9a0e5d982f3 100644 --- a/plugins/out_kinesis_streams/kinesis.c +++ b/plugins/out_kinesis_streams/kinesis.c @@ -127,18 +127,17 @@ static int cb_kinesis_init(struct flb_output_instance *ins, * @param ctx The Kinesis output plugin context. */ flb_plg_debug(ins, "Retrieved port from ins->host.port: %d", ins->host.port); - - if (ins->host.port >= FLB_KINESIS_MIN_PORT && ins->host.port <= FLB_KINESIS_MAX_PORT) { - ctx->port = ins->host.port; - flb_plg_debug(ins, "Setting port to: %d", ctx->port); - } - else if (ins->host.port == 0) { + + if (ins->host.port == 0) { ctx->port = FLB_KINESIS_DEFAULT_HTTPS_PORT; flb_plg_debug(ins, "Port not set. Using default HTTPS port: %d", ctx->port); } + else if (ins->host.port == (ctx->port = (uint16_t)ins->host.port)) { + flb_plg_debug(ins, "Setting port to: %d", ctx->port); + } else { - flb_plg_error(ins, "Invalid port number: %d. Must be between %d and %d", - ins->host.port, FLB_KINESIS_MIN_PORT, FLB_KINESIS_MAX_PORT); + flb_plg_error(ins, "Invalid port number: %d. Must be between %d and %d", + ins->host.port, 1, UINT16_MAX); goto error; } diff --git a/plugins/out_kinesis_streams/kinesis.h b/plugins/out_kinesis_streams/kinesis.h index 57a72166617..a731d35841a 100644 --- a/plugins/out_kinesis_streams/kinesis.h +++ b/plugins/out_kinesis_streams/kinesis.h @@ -30,8 +30,6 @@ #define DEFAULT_TIME_KEY_FORMAT "%Y-%m-%dT%H:%M:%S" #define FLB_KINESIS_DEFAULT_HTTPS_PORT 443 -#define FLB_KINESIS_MIN_PORT 1 -#define FLB_KINESIS_MAX_PORT 65535 /* buffers used for each flush */ struct flush { @@ -96,7 +94,7 @@ struct flb_kinesis { int retry_requests; char *sts_endpoint; int custom_endpoint; - int port; + uint16_t port; char *profile; /* in this plugin the 'random' partition key is a uuid + fluent tag + timestamp */