Commit 0afb495

out_s3: Use arrow or parquet correctly as compression methods
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 139815d commit 0afb495

1 file changed: 16 additions, 12 deletions

plugins/out_s3/s3.c

Lines changed: 16 additions & 12 deletions
@@ -652,9 +652,11 @@ static int cb_s3_init(struct flb_output_instance *ins,
             flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
             return -1;
         }
-        if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_ARROW) {
+        if (ctx->use_put_object == FLB_FALSE &&
+            (ctx->compression == FLB_AWS_COMPRESS_ARROW ||
+             ctx->compression == FLB_AWS_COMPRESS_PARQUET)) {
             flb_plg_error(ctx->ins,
-                          "use_put_object must be enabled when Apache Arrow is enabled");
+                          "use_put_object must be enabled when Apache Arrow or Parquet is enabled");
             return -1;
         }
         ctx->compression = ret;
@@ -679,7 +681,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
             flb_plg_error(ctx->ins, "upload_chunk_size must be at least 5,242,880 bytes");
             return -1;
         }
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             if(ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
                 flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
                 return -1;
@@ -1003,7 +1005,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         file_first_log_time = chunk->first_log_time;
     }
 
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
         /* Map payload */
         ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
         if (ret == -1) {
@@ -1046,7 +1048,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             goto multipart;
         }
         else {
-            if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+            if (ctx->use_put_object == FLB_FALSE &&
+                (ctx->compression == FLB_AWS_COMPRESS_ARROW ||
+                 ctx->compression == FLB_AWS_COMPRESS_PARQUET)) {
                 flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %zu, After compression, chunk is only %zu bytes, "
                              "the chunk was too small, using PutObject to upload", preCompress_size, body_size);
             }
@@ -1068,7 +1072,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
          * remove chunk from buffer list
          */
         ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size);
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             flb_free(payload_buf);
         }
         if (ret < 0) {
@@ -1095,7 +1099,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         if (chunk) {
             s3_store_file_unlock(chunk);
         }
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
            flb_free(payload_buf);
         }
         return FLB_RETRY;
@@ -1109,7 +1113,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         if (chunk) {
             s3_store_file_unlock(chunk);
         }
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
            flb_free(payload_buf);
         }
         return FLB_RETRY;
@@ -1119,7 +1123,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
 
     ret = upload_part(ctx, m_upload, body, body_size);
     if (ret < 0) {
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             flb_free(payload_buf);
         }
         m_upload->upload_errors += 1;
@@ -1136,7 +1140,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             s3_store_file_delete(ctx, chunk);
             chunk = NULL;
         }
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
            flb_free(payload_buf);
         }
         if (m_upload->bytes >= ctx->file_size) {
@@ -2371,8 +2375,8 @@ static struct flb_config_map config_map[] = {
     {
      FLB_CONFIG_MAP_STR, "compression", NULL,
      0, FLB_FALSE, 0,
-    "Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. "
-    "'arrow' is only an available if Apache Arrow was enabled at compile time. "
+    "Compression type for S3 objects. 'gzip', 'arrow' and 'parquet' are the supported values. "
+    "'arrow' and 'parquet' are only available if Apache Arrow was enabled at compile time. "
     "Defaults to no compression. "
     "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
    },
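
For context, a minimal sketch of an out_s3 configuration that this change is meant to accept, assuming the classic Fluent Bit configuration format and a build with Apache Arrow support; the bucket and region values are placeholders, not part of this commit:

# Sketch only: bucket/region are placeholders; 'parquet' (like 'arrow')
# requires a build with Apache Arrow enabled, per the config_map help text.
[OUTPUT]
    Name            s3
    Match           *
    bucket          my-example-bucket
    region          us-east-1
    # With this commit, 'arrow' and 'parquet' both require use_put_object,
    # and any compression type is subject to the compressed chunk-size limit.
    compression     parquet
    use_put_object  On

Without use_put_object enabled, cb_s3_init() now rejects both 'arrow' and 'parquet' with the error message updated in the first hunk.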
