diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml
index a5b79f8cb27..95529db5d64 100644
--- a/.github/workflows/unit-tests.yaml
+++ b/.github/workflows/unit-tests.yaml
@@ -52,6 +52,7 @@ jobs:
           - "-DFLB_SANITIZE_THREAD=On"
           - "-DFLB_SIMD=On"
           - "-DFLB_SIMD=Off"
+          - "-DFLB_ARROW=On"
         cmake_version:
           - "3.31.6"
         compiler:
@@ -66,6 +67,10 @@ jobs:
             compiler:
               cc: clang
               cxx: clang++
+          - flb_option: "-DFLB_ARROW=On"
+            compiler:
+              cc: clang
+              cxx: clang++
     permissions:
       contents: read
     steps:
@@ -86,6 +91,15 @@ jobs:
         with:
           repository: calyptia/fluent-bit-ci
           path: ci
+      - name: Setup Apache Arrow libraries for parquet (-DFLB_ARROW=On Only)
+        if: matrix.flb_option == '-DFLB_ARROW=On'
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y -V ca-certificates lsb-release wget
+          wget https://packages.apache.org/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
+          sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
+          sudo apt-get update
+          sudo apt-get install -y -V libarrow-glib-dev libparquet-glib-dev
       - name: ${{ matrix.compiler.cc }} & ${{ matrix.compiler.cxx }} - ${{ matrix.flb_option }}
         run: |
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5d9e7c879c9..c4d53f99460 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1232,6 +1232,14 @@ else()
   set(FLB_ARROW OFF)
 endif()
 
+# Additional prerequisites for Apache Parquet
+pkg_check_modules(ARROW_GLIB_PARQUET QUIET parquet-glib)
+if(FLB_ARROW AND ARROW_GLIB_PARQUET_FOUND)
+  FLB_DEFINITION(FLB_HAVE_ARROW_PARQUET)
+else()
+  message(STATUS "Arrow GLib Parquet not found. Disabling parquet compression")
+endif()
+
 # EBPF Support
 # ============
 if (FLB_IN_EBPF)
diff --git a/include/fluent-bit/aws/flb_aws_compress.h b/include/fluent-bit/aws/flb_aws_compress.h
index e1cf9222377..d9e929c6669 100644
--- a/include/fluent-bit/aws/flb_aws_compress.h
+++ b/include/fluent-bit/aws/flb_aws_compress.h
@@ -21,9 +21,10 @@
 #define FLB_AWS_COMPRESS
 #include <sys/types.h>
 
-#define FLB_AWS_COMPRESS_NONE 0
-#define FLB_AWS_COMPRESS_GZIP 1
-#define FLB_AWS_COMPRESS_ARROW 2
+#define FLB_AWS_COMPRESS_NONE    0
+#define FLB_AWS_COMPRESS_GZIP    1
+#define FLB_AWS_COMPRESS_ARROW   2
+#define FLB_AWS_COMPRESS_PARQUET 3
 
 /*
  * Get compression type from compression keyword. The return value is used to identify
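For context on how the new FLB_AWS_COMPRESS_PARQUET constant is consumed, the sketch below shows the intended call pattern around this header: a caller resolves the configured keyword to a type id and then invokes the generic compression entry point, which allocates the output buffer for the caller to release. This is an illustrative sketch only, not plugin code; the helper name compress_with_keyword and the -1 "unknown keyword" convention are assumptions inferred from how s3.c uses the API in the hunks that follow.

#include <stddef.h>
#include <fluent-bit/aws/flb_aws_compress.h>

/*
 * Sketch: resolve a configured keyword (e.g. "parquet") to its type id and
 * compress one buffer with it. compress_with_keyword() is a hypothetical
 * helper used only for illustration.
 */
static int compress_with_keyword(const char *keyword,
                                 void *data, size_t len,
                                 void **out_buf, size_t *out_size)
{
    int type;

    type = flb_aws_compression_get_type(keyword);
    if (type == -1) {
        /* unknown keyword, or support was not compiled in */
        return -1;
    }

    /* On success the compressor allocates *out_buf; the caller must free it
     * (the S3 plugin below uses flb_free() for this). */
    return flb_aws_compression_compress(type, data, len, out_buf, out_size);
}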
diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index af3c9c37bea..d9d25f187b1 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -703,9 +703,11 @@ static int cb_s3_init(struct flb_output_instance *ins,
             flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
             return -1;
         }
-        if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_ARROW) {
+        if (ctx->use_put_object == FLB_FALSE &&
+            (ret == FLB_AWS_COMPRESS_ARROW ||
+             ret == FLB_AWS_COMPRESS_PARQUET)) {
             flb_plg_error(ctx->ins,
-                          "use_put_object must be enabled when Apache Arrow is enabled");
+                          "use_put_object must be enabled when Apache Arrow or Parquet is enabled");
             return -1;
         }
         ctx->compression = ret;
@@ -730,7 +732,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
             flb_plg_error(ctx->ins, "upload_chunk_size must be at least 5,242,880 bytes");
             return -1;
         }
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             if(ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
                 flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
                 return -1;
@@ -1125,13 +1127,18 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         file_first_log_time = chunk->first_log_time;
     }
 
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
         /* Map payload */
         ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
         if (ret == -1) {
             flb_plg_error(ctx->ins, "Failed to compress data");
+            if (chunk != NULL) {
+                s3_store_file_unlock(chunk);
+                chunk->failures += 1;
+            }
             return FLB_RETRY;
-        } else {
+        }
+        else {
             preCompress_size = body_size;
             body = (void *) payload_buf;
             body_size = payload_size;
@@ -1190,7 +1197,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
          * remove chunk from buffer list
          */
         ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size);
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             flb_free(payload_buf);
         }
         if (ret < 0) {
@@ -1217,7 +1224,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         if (chunk) {
             s3_store_file_unlock(chunk);
         }
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             flb_free(payload_buf);
         }
         return FLB_RETRY;
@@ -1231,7 +1238,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         if (chunk) {
             s3_store_file_unlock(chunk);
         }
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             flb_free(payload_buf);
         }
         return FLB_RETRY;
@@ -1241,7 +1248,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
 
     ret = upload_part(ctx, m_upload, body, body_size, NULL);
     if (ret < 0) {
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             flb_free(payload_buf);
         }
         m_upload->upload_errors += 1;
@@ -1258,7 +1265,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         s3_store_file_delete(ctx, chunk);
         chunk = NULL;
     }
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
         flb_free(payload_buf);
     }
     if (m_upload->bytes >= ctx->file_size) {
@@ -3991,8 +3998,8 @@ static struct flb_config_map config_map[] = {
     {
      FLB_CONFIG_MAP_STR, "compression", NULL,
      0, FLB_FALSE, 0,
-     "Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. "
-     "'arrow' is only an available if Apache Arrow was enabled at compile time. "
+     "Compression type for S3 objects. 'gzip', 'arrow' and 'parquet' are the supported values. "
+     "'arrow' and 'parquet' are only available if Apache Arrow was enabled at compile time. "
      "Defaults to no compression. "
      "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
     },
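The recurring s3.c change above, widening `== FLB_AWS_COMPRESS_GZIP` to `!= FLB_AWS_COMPRESS_NONE`, exists because upload_data() owns the compressed copy of the payload and has to release it on every exit path regardless of which codec produced it. The condensed sketch below illustrates only that ownership rule; it is not the plugin's actual control flow (the real function also handles chunk locking, multipart state and retries), and do_send() is a made-up stand-in for the PutObject/multipart calls.

#include <stddef.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/aws/flb_aws_compress.h>

/* Illustration of the buffer-ownership rule only. */
static int send_maybe_compressed(int compression, void *body, size_t body_size,
                                 int (*do_send)(void *data, size_t len))
{
    void *payload_buf = NULL;
    size_t payload_size = 0;
    int ret;

    if (compression != FLB_AWS_COMPRESS_NONE) {
        ret = flb_aws_compression_compress(compression, body, body_size,
                                           &payload_buf, &payload_size);
        if (ret == -1) {
            return -1;
        }
        body = payload_buf;        /* from here on, 'body' is our own copy */
        body_size = payload_size;
    }

    ret = do_send(body, body_size);

    if (compression != FLB_AWS_COMPRESS_NONE) {
        flb_free(payload_buf);     /* freed on every path, success or failure */
    }
    return ret;
}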
" - "'arrow' is only an available if Apache Arrow was enabled at compile time. " + "Compression type for S3 objects. 'gzip', 'arrow' and 'parquet' are the supported values. " + "'arrow' and 'parquet' are only available if Apache Arrow was enabled at compile time. " "Defaults to no compression. " "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'." }, diff --git a/src/aws/compression/arrow/CMakeLists.txt b/src/aws/compression/arrow/CMakeLists.txt index 846f654412d..c80a09df3ec 100644 --- a/src/aws/compression/arrow/CMakeLists.txt +++ b/src/aws/compression/arrow/CMakeLists.txt @@ -5,3 +5,11 @@ add_library(flb-aws-arrow STATIC ${src}) target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS}) target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LDFLAGS}) +if (ARROW_GLIB_PARQUET_FOUND) +target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_PARQUET_INCLUDE_DIRS}) +target_link_libraries(flb-aws-arrow ${ARROW_GLIB_PARQUET_LDFLAGS}) +endif() + +if(FLB_JEMALLOC) + target_link_libraries(flb-aws-arrow ${JEMALLOC_LIBRARIES}) +endif() diff --git a/src/aws/compression/arrow/compress.c b/src/aws/compression/arrow/compress.c index a48b34f8096..000d629b553 100644 --- a/src/aws/compression/arrow/compress.c +++ b/src/aws/compression/arrow/compress.c @@ -8,6 +8,11 @@ */ #include +#ifdef FLB_HAVE_ARROW_PARQUET +#include +#endif +#include +#include #include /* @@ -145,3 +150,128 @@ int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_s g_bytes_unref(bytes); return 0; } + +#ifdef FLB_HAVE_ARROW_PARQUET +static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table) +{ + GArrowResizableBuffer *buffer; + GArrowBufferOutputStream *sink; + GParquetArrowFileWriter *writer; + GArrowSchema *schema; + GError *error = NULL; + gboolean success; + gint64 n_rows = 0; + + buffer = garrow_resizable_buffer_new(0, &error); + if (buffer == NULL) { + g_error_free(error); + return NULL; + } + + sink = garrow_buffer_output_stream_new(buffer); + if (sink == NULL) { + g_object_unref(buffer); + return NULL; + } + + schema = garrow_table_get_schema(table); + if (schema == NULL) { + g_object_unref(buffer); + g_object_unref(sink); + return NULL; + } + + /* Create a new Parquet file writer */ + writer = gparquet_arrow_file_writer_new_arrow(schema, + GARROW_OUTPUT_STREAM(sink), + NULL, /* Arrow writer properties */ + &error); + g_object_unref(schema); + if (writer == NULL) { + flb_error("[aws][compress] Failed to create parquet writer: %s", error->message); + g_error_free(error); + g_object_unref(buffer); + g_object_unref(sink); + return NULL; + } + + n_rows = garrow_table_get_n_rows(table); + + /* Write the entire table to the Parquet file buffer */ + success = gparquet_arrow_file_writer_write_table(writer, table, n_rows, &error); + if (!success) { + flb_error("[aws][compress] Failed to write table to parquet buffer: %s", error->message); + g_error_free(error); + g_object_unref(buffer); + g_object_unref(sink); + g_object_unref(writer); + return NULL; + } + + /* Close the writer to finalize the Parquet file metadata */ + success = gparquet_arrow_file_writer_close(writer, &error); + if (!success) { + g_error_free(error); + g_object_unref(buffer); + g_object_unref(sink); + g_object_unref(writer); + return NULL; + } + + g_object_unref(sink); + g_object_unref(writer); + return buffer; +} + + +int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size) +{ + GArrowTable *table; + GArrowResizableBuffer *buffer; + GBytes *bytes; + 
diff --git a/src/aws/compression/arrow/compress.h b/src/aws/compression/arrow/compress.h
index 82e94f43cee..f8dcd4a4248 100644
--- a/src/aws/compression/arrow/compress.h
+++ b/src/aws/compression/arrow/compress.h
@@ -11,3 +11,18 @@
  */
 int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size);
+
+#ifdef FLB_HAVE_ARROW_PARQUET
+/*
+ * This function converts the out_s3 buffer into Apache Parquet format.
+ *
+ * `json` is a string that contains (concatenated) JSON objects.
+ *
+ * `size` is the length of the json data (excluding the trailing
+ * null-terminator character).
+ *
+ * Return 0 on success (with `out_buf` and `out_size` updated),
+ * and -1 on failure.
+ */
+int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size);
+#endif
diff --git a/src/aws/flb_aws_compress.c b/src/aws/flb_aws_compress.c
index a06d181193f..253020e392a 100644
--- a/src/aws/flb_aws_compress.c
+++ b/src/aws/flb_aws_compress.c
@@ -54,6 +54,13 @@ static const struct compression_option compression_options[] = {
         "arrow",
         &out_s3_compress_arrow
     },
+#endif
+#ifdef FLB_HAVE_ARROW_PARQUET
+    {
+        FLB_AWS_COMPRESS_PARQUET,
+        "parquet",
+        &out_s3_compress_parquet
+    },
 #endif
     { 0 }
 };
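Finally, the reason a single #ifdef-guarded table entry is enough to expose the new codec: flb_aws_compress.c resolves the configured keyword and dispatches through compression_options[], which is terminated by the `{ 0 }` sentinel seen above. The loop below is a simplified sketch of that lookup pattern, written from the shape of the table rather than copied from the file, so the exact implementation may differ.

#include <stddef.h>
#include <string.h>

/* Mirrors the shape of the entries registered above (type id, keyword,
 * compression callback); illustrative only. */
struct compression_option {
    int compression_type;
    char *compression_keyword;
    int (*compress)(void *in_data, size_t in_len, void **out_data, size_t *out_len);
};

static int lookup_compression_type(const struct compression_option *opts,
                                   const char *keyword)
{
    int i;

    /* The { 0 } sentinel ends the table, so entries compiled out by the
     * #ifdef guards simply never match. */
    for (i = 0; opts[i].compression_type != 0; i++) {
        if (strcmp(opts[i].compression_keyword, keyword) == 0) {
            return opts[i].compression_type;
        }
    }
    return -1;   /* unknown keyword or support not compiled in */
}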