
out_s3: Add parquet compression type with pure C #10691

Open · wants to merge 8 commits into base: master
14 changes: 14 additions & 0 deletions .github/workflows/unit-tests.yaml
@@ -52,6 +52,7 @@ jobs:
- "-DFLB_SANITIZE_THREAD=On"
- "-DFLB_SIMD=On"
- "-DFLB_SIMD=Off"
- "-DFLB_ARROW=On"
cmake_version:
- "3.31.6"
compiler:
@@ -66,6 +67,10 @@ jobs:
compiler:
cc: clang
cxx: clang++
- flb_option: "-DFLB_ARROW=On"
compiler:
cc: clang
cxx: clang++
permissions:
contents: read
steps:
@@ -86,6 +91,15 @@ jobs:
with:
repository: calyptia/fluent-bit-ci
path: ci
- name: Setup Apache Arrow libraries for parquet (-DFLB_ARROW=On Only)
if: matrix.flb_option == '-DFLB_ARROW=On'
run: |
sudo apt-get update
sudo apt-get install -y -V ca-certificates lsb-release wget
wget https://packages.apache.org/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
sudo apt-get update
sudo apt-get install -y -V libarrow-glib-dev libparquet-glib-dev

- name: ${{ matrix.compiler.cc }} & ${{ matrix.compiler.cxx }} - ${{ matrix.flb_option }}
run: |
8 changes: 8 additions & 0 deletions CMakeLists.txt
@@ -1232,6 +1232,14 @@ else()
set(FLB_ARROW OFF)
endif()

# Additional prerequisites for Apache Parquet
pkg_check_modules(ARROW_GLIB_PARQUET QUIET parquet-glib)
if(FLB_ARROW AND ARROW_GLIB_PARQUET_FOUND)
FLB_DEFINITION(FLB_HAVE_ARROW_PARQUET)
else()
message(STATUS "Arrow GLib Parquet not found. Disabling parquet compression for AWS module")
Contributor:
Suggested change:
- message(STATUS "Arrow GLib Parquet not found. Disabling parquet compression for AWS module")
+ message(STATUS "Arrow GLib Parquet not found. Disabling parquet compression.")
+ set(FLB_HAVE_ARROW_PARQUET OFF)

Nits:
a. Remove "for AWS module" in case this is used for other modules too in the future.
b. Explicitly set it to OFF for consistency with the other checks above.

endif()

# EBPF Support
# ============
if (FLB_IN_EBPF)
7 changes: 4 additions & 3 deletions include/fluent-bit/aws/flb_aws_compress.h
@@ -21,9 +21,10 @@
#define FLB_AWS_COMPRESS

#include <sys/types.h>
#define FLB_AWS_COMPRESS_NONE 0
#define FLB_AWS_COMPRESS_GZIP 1
#define FLB_AWS_COMPRESS_ARROW 2
#define FLB_AWS_COMPRESS_NONE 0
#define FLB_AWS_COMPRESS_GZIP 1
#define FLB_AWS_COMPRESS_ARROW 2
#define FLB_AWS_COMPRESS_PARQUET 3

/*
* Get compression type from compression keyword. The return value is used to identify
28 changes: 16 additions & 12 deletions plugins/out_s3/s3.c
@@ -652,9 +652,11 @@ static int cb_s3_init(struct flb_output_instance *ins,
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
return -1;
}
if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_ARROW) {
if (ctx->use_put_object == FLB_FALSE &&
(ctx->compression == FLB_AWS_COMPRESS_ARROW ||
ctx->compression == FLB_AWS_COMPRESS_PARQUET)) {
flb_plg_error(ctx->ins,
"use_put_object must be enabled when Apache Arrow is enabled");
"use_put_object must be enabled when Apache Arrow or Parquet is enabled");
return -1;
}
ctx->compression = ret;
@@ -679,7 +681,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
flb_plg_error(ctx->ins, "upload_chunk_size must be at least 5,242,880 bytes");
return -1;
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
Contributor:
Does this mean the Arrow compression did not work before this?

if(ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
return -1;
@@ -1003,7 +1005,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
file_first_log_time = chunk->first_log_time;
}

if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
/* Map payload */
ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
if (ret == -1) {
@@ -1046,7 +1048,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
goto multipart;
}
else {
if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if (ctx->use_put_object == FLB_FALSE &&
(ctx->compression == FLB_AWS_COMPRESS_ARROW ||
ctx->compression == FLB_AWS_COMPRESS_PARQUET)) {
Comment on lines +1051 to +1053
Contributor:
For arrow and parquet, this if condition will never evaluate to true, right? Since we error out during initialization with:

"use_put_object must be enabled when Apache Arrow or Parquet is enabled"

I think this block should stay unchanged. It seems to specifically handle the GZIP scenario where, even if use_put_object is disabled, the plugin wants to use it for small gzip-compressed data.

flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %zu, After compression, chunk is only %zu bytes, "
"the chunk was too small, using PutObject to upload", preCompress_size, body_size);
}
@@ -1068,7 +1072,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
* remove chunk from buffer list
*/
ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size);
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
flb_free(payload_buf);
}
if (ret < 0) {
@@ -1095,7 +1099,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
if (chunk) {
s3_store_file_unlock(chunk);
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
flb_free(payload_buf);
}
return FLB_RETRY;
@@ -1109,7 +1113,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
if (chunk) {
s3_store_file_unlock(chunk);
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
flb_free(payload_buf);
}
return FLB_RETRY;
@@ -1119,7 +1123,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,

ret = upload_part(ctx, m_upload, body, body_size);
if (ret < 0) {
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
flb_free(payload_buf);
}
m_upload->upload_errors += 1;
@@ -1136,7 +1140,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
s3_store_file_delete(ctx, chunk);
chunk = NULL;
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
flb_free(payload_buf);
}
if (m_upload->bytes >= ctx->file_size) {
@@ -2371,8 +2375,8 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. "
"'arrow' is only an available if Apache Arrow was enabled at compile time. "
"Compression type for S3 objects. 'gzip', 'arrow' and 'parquet' are the supported values. "
"'arrow' and 'parquet' are only available if Apache Arrow was enabled at compile time. "
"Defaults to no compression. "
"If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
},
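For reference, a minimal out_s3 configuration exercising the new value might look like the sketch below; the bucket name and region are placeholders rather than values from this PR. Per the init check added above, use_put_object must be On whenever 'arrow' or 'parquet' is selected.

    [OUTPUT]
        name            s3
        match           *
        bucket          my-example-bucket
        region          us-east-1
        use_put_object  On
        compression     parquet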
4 changes: 4 additions & 0 deletions src/aws/compression/arrow/CMakeLists.txt
@@ -5,3 +5,7 @@ add_library(flb-aws-arrow STATIC ${src})

target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LDFLAGS})
if (ARROW_GLIB_PARQUET_FOUND)
target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_PARQUET_INCLUDE_DIRS})
target_link_libraries(flb-aws-arrow ${ARROW_GLIB_PARQUET_LDFLAGS})
endif()
128 changes: 128 additions & 0 deletions src/aws/compression/arrow/compress.c
@@ -8,6 +8,10 @@
*/

#include <arrow-glib/arrow-glib.h>
#ifdef FLB_HAVE_ARROW_PARQUET
#include <parquet-glib/parquet-glib.h>
#endif
#include <fluent-bit/flb_log.h>
#include <inttypes.h>

/*
@@ -145,3 +149,127 @@ int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size)
g_bytes_unref(bytes);
return 0;
}

#ifdef FLB_HAVE_ARROW_PARQUET
static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table)
{
GArrowResizableBuffer *buffer;
GArrowBufferOutputStream *sink;
GParquetArrowFileWriter *writer;
GArrowSchema *schema;
GError *error = NULL;
gboolean success;
gint64 n_rows = 0;

buffer = garrow_resizable_buffer_new(0, &error);
if (buffer == NULL) {
g_error_free(error);
return NULL;
}

sink = garrow_buffer_output_stream_new(buffer);
if (sink == NULL) {
g_object_unref(buffer);
return NULL;
}

schema = garrow_table_get_schema(table);
if (schema == NULL) {
g_object_unref(buffer);
g_object_unref(sink);
return NULL;
}

/* Create a new Parquet file writer */
writer = gparquet_arrow_file_writer_new_arrow(schema,
GARROW_OUTPUT_STREAM(sink),
NULL, /* Arrow writer properties */
&error);
g_object_unref(schema);
if (writer == NULL) {
flb_error("[aws][compress] Failed to create parquet writer: %s", error->message);
g_error_free(error);
g_object_unref(buffer);
g_object_unref(sink);
return NULL;
}

n_rows = garrow_table_get_n_rows(table);

/* Write the entire table to the Parquet file buffer */
success = gparquet_arrow_file_writer_write_table(writer, table, n_rows, &error);
if (!success) {
flb_error("[aws][compress] Failed to write table to parquet buffer: %s", error->message);
g_error_free(error);
g_object_unref(buffer);
g_object_unref(sink);
g_object_unref(writer);
return NULL;
}

/* Close the writer to finalize the Parquet file metadata */
success = gparquet_arrow_file_writer_close(writer, &error);
if (!success) {
g_error_free(error);
g_object_unref(buffer);
g_object_unref(sink);
g_object_unref(writer);
return NULL;
}

g_object_unref(sink);
g_object_unref(writer);
return buffer;
}


int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size)
{
GArrowTable *table;
GArrowResizableBuffer *buffer;
GBytes *bytes;
gconstpointer ptr;
gsize len;
uint8_t *buf;

table = parse_json((uint8_t *) json, size);
if (table == NULL) {
flb_error("[aws][compress] Failed to parse JSON into Arrow Table for Parquet conversion");
return -1;
}

buffer = table_to_parquet_buffer(table);
g_object_unref(table);
if (buffer == NULL) {
flb_error("[aws][compress] Failed to convert Arrow Table into Parquet buffer");
return -1;
}

bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
if (bytes == NULL) {
g_object_unref(buffer);
return -1;
}

ptr = g_bytes_get_data(bytes, &len);
if (ptr == NULL) {
g_object_unref(buffer);
g_bytes_unref(bytes);
return -1;
}

buf = malloc(len);
if (buf == NULL) {
g_object_unref(buffer);
g_bytes_unref(bytes);
return -1;
}
memcpy(buf, ptr, len);
*out_buf = (void *) buf;
*out_size = len;

g_object_unref(buffer);
g_bytes_unref(bytes);
return 0;
}
#endif
15 changes: 15 additions & 0 deletions src/aws/compression/arrow/compress.h
@@ -11,3 +11,18 @@
*/

int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size);

#ifdef FLB_HAVE_ARROW_PARQUET
/*
* This function converts the out_s3 buffer into Apache Parquet format.
*
* `json` is a string that contains (concatenated) JSON objects.
*
* `size` is the length of the json data (excluding the trailing
* null-terminator character).
*
* Return 0 on success (with `out_buf` and `out_size` updated),
* and -1 on failure.
*/
int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size);
#endif
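A minimal caller-side sketch, not part of this PR: the demo function, the sample JSON, and the local include path are assumptions for illustration. On success the converter fills `out_buf`/`out_size`; because the implementation allocates the output with malloc(), the caller is expected to release it (out_s3 itself goes through flb_free()).

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "compress.h"   /* assumed include path for this header */

int parquet_demo(void)
{
    /* concatenated JSON objects, as out_s3 buffers them */
    const char *json = "{\"level\":\"info\",\"msg\":\"a\"}\n"
                       "{\"level\":\"info\",\"msg\":\"b\"}\n";
    void *parquet_buf = NULL;
    size_t parquet_size = 0;

    if (out_s3_compress_parquet((void *) json, strlen(json),
                                &parquet_buf, &parquet_size) != 0) {
        fprintf(stderr, "parquet conversion failed\n");
        return -1;
    }

    printf("parquet payload: %zu bytes\n", parquet_size);
    free(parquet_buf);   /* buffer was malloc()'d by the converter */
    return 0;
}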
7 changes: 7 additions & 0 deletions src/aws/flb_aws_compress.c
@@ -54,6 +54,13 @@ static const struct compression_option compression_options[] = {
"arrow",
&out_s3_compress_arrow
},
#endif
#ifdef FLB_HAVE_ARROW_PARQUET
{
FLB_AWS_COMPRESS_PARQUET,
"parquet",
&out_s3_compress_parquet
},
#endif
{ 0 }
};
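For context, a sketch (not from this PR) of how a caller reaches the new table entry through the shared helper: the argument list of flb_aws_compression_compress() is inferred from the existing call in plugins/out_s3/s3.c, and the wrapper function name below is hypothetical.

#include <stddef.h>
#include <fluent-bit/aws/flb_aws_compress.h>

/* Convert one buffered out_s3 chunk to Parquet through the shared AWS
 * compression table. The "parquet" entry only exists in builds where
 * FLB_HAVE_ARROW_PARQUET was defined at compile time. */
static int chunk_to_parquet(void *body, size_t body_size,
                            void **payload_buf, size_t *payload_size)
{
    int ret;

    ret = flb_aws_compression_compress(FLB_AWS_COMPRESS_PARQUET,
                                       body, body_size,
                                       payload_buf, payload_size);
    if (ret == -1) {
        /* conversion failed; the caller can fall back to the raw body */
        return -1;
    }

    return 0;
}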