3131#include  <fluent-bit/flb_plugin.h> 
3232#include  <fluent-bit/flb_notification.h> 
3333#include  <fluent-bit/flb_scheduler.h> 
34+ #include  <fluent-bit/flb_record_accessor.h> 
35+ #include  <fluent-bit/flb_ra_key.h> 
3436
3537#include  <msgpack.h> 
3638
@@ -53,6 +55,113 @@ struct worker_info {
5355
5456FLB_TLS_DEFINE (struct  worker_info , worker_info );
5557
58+ static  flb_sds_t  cb_azb_msgpack_extract_log_key (void  * out_context , const  char  * data ,
59+                                                 uint64_t  bytes )
60+ {
61+     struct  flb_azure_blob  * ctx  =  out_context ;
62+     flb_sds_t  out_buf  =  NULL ;
63+     msgpack_unpacked  result ;
64+     msgpack_object  root ;
65+     msgpack_object  map ;
66+     struct  flb_record_accessor  * ra  =  NULL ;
67+     struct  flb_ra_value  * rval  =  NULL ;
68+     size_t  off  =  0 ;
69+ 
70+     ra  =  flb_ra_create (ctx -> log_key , FLB_FALSE );
71+     if  (!ra ) {
72+         flb_plg_error (ctx -> ins , "invalid record accessor pattern '%s'" , ctx -> log_key );
73+         flb_errno ();
74+         return  NULL ;
75+     }
76+ 
77+     /* Unpack the data */ 
78+     msgpack_unpacked_init (& result );
79+     while  (1 ) {
80+         msgpack_unpack_return  ret  =  msgpack_unpack_next (& result , data , bytes , & off );
81+         if  (ret  ==  MSGPACK_UNPACK_SUCCESS ) {
82+             root  =  result .data ;
83+             if  (root .type  !=  MSGPACK_OBJECT_ARRAY ) {
84+                 continue ;
85+             }
86+ 
87+             if  (root .via .array .size  <  2 ) {
88+                 flb_plg_debug (ctx -> ins , "msgpack array has insufficient elements" );
89+                 continue ;
90+             }
91+ 
92+             map  =  root .via .array .ptr [1 ];
93+ 
94+             /* Get value using record accessor */ 
95+             rval  =  flb_ra_get_value_object (ra , map );
96+             if  (!rval ) {
97+                 flb_plg_error (ctx -> ins , "could not find field '%s'" , ctx -> log_key );
98+                 continue ;
99+             }
100+ 
101+             /* Convert value based on its type */ 
102+             if  (rval -> type  ==  FLB_RA_STRING ) {
103+                 out_buf  =  flb_sds_create_size (rval -> o .via .str .size  +  1 );
104+                 if  (out_buf ) {
105+                     flb_sds_copy (out_buf , rval -> o .via .str .ptr , rval -> o .via .str .size );
106+                     flb_sds_cat (out_buf , "\n" , 1 );
107+                 }
108+             }
109+             else  if  (rval -> type  ==  FLB_RA_FLOAT ) {
110+                 out_buf  =  flb_sds_create_size (64 );
111+                 if  (out_buf ) {
112+                     flb_sds_printf (& out_buf , "%f\n" , rval -> val .f64 );
113+                 }
114+             }
115+             else  if  (rval -> type  ==  FLB_RA_INT ) {
116+                 out_buf  =  flb_sds_create_size (64 );
117+                 if  (out_buf ) {
118+                     flb_sds_printf (& out_buf , "%"  PRId64  "\n" , rval -> val .i64 );
119+                 }
120+             }
121+             else  {
122+                 flb_errno ();
123+                 flb_plg_error (ctx -> ins , "cannot convert given value for field '%s'" , ctx -> log_key );
124+                 flb_ra_key_value_destroy (rval );
125+                 rval  =  NULL ;
126+                 break ;
127+             }
128+ 
129+             /* Check if buffer allocation succeeded */ 
130+             if  (!out_buf ) {
131+                 flb_errno ();
132+                 flb_plg_error (ctx -> ins , "could not allocate output buffer" );
133+             }
134+ 
135+             flb_ra_key_value_destroy (rval );
136+             rval  =  NULL ;
137+ 
138+             /* Successfully found and processed log_key, exit loop */ 
139+             break ;
140+         }
141+         else  if  (ret  ==  MSGPACK_UNPACK_CONTINUE ) {
142+             /* Buffer exhausted or truncated data, stop processing */ 
143+             flb_plg_debug (ctx -> ins , "msgpack unpack needs more data or data truncated" );
144+             break ;
145+         }
146+         else  if  (ret  ==  MSGPACK_UNPACK_PARSE_ERROR ) {
147+             flb_errno ();
148+             flb_plg_error (ctx -> ins , "msgpack parse error" );
149+             break ;
150+         }
151+         else  {
152+             flb_errno ();
153+             flb_plg_error (ctx -> ins , "unexpected msgpack unpack return code %d" , ret );
154+             break ;
155+         }
156+     }
157+ 
158+     /* Clean up */ 
159+     msgpack_unpacked_destroy (& result );
160+     flb_ra_destroy (ra );
161+ 
162+     return  out_buf ;
163+ }
164+ 
56165static  int  azure_blob_format (struct  flb_config  * config ,
57166                             struct  flb_input_instance  * ins ,
58167                             void  * plugin_context ,
@@ -65,11 +174,16 @@ static int azure_blob_format(struct flb_config *config,
65174    flb_sds_t  out_buf ;
66175    struct  flb_azure_blob  * ctx  =  plugin_context ;
67176
68-     out_buf  =  flb_pack_msgpack_to_json_format (data , bytes ,
69-                                               FLB_PACK_JSON_FORMAT_LINES ,
70-                                               FLB_PACK_JSON_DATE_ISO8601 ,
71-                                               ctx -> date_key ,
72-                                               config -> json_escape_unicode );
177+     if  (ctx -> log_key ) {
178+         out_buf  =  cb_azb_msgpack_extract_log_key (ctx , data , bytes );
179+     }
180+     else  {
181+         out_buf  =  flb_pack_msgpack_to_json_format (data , bytes ,
182+                                                 FLB_PACK_JSON_FORMAT_LINES ,
183+                                                 FLB_PACK_JSON_DATE_ISO8601 ,
184+                                                 ctx -> date_key ,
185+                                                 config -> json_escape_unicode );
186+     }
73187    if  (!out_buf ) {
74188        return  -1 ;
75189    }
@@ -713,7 +827,7 @@ static int ensure_container(struct flb_azure_blob *ctx)
713827                      ctx -> container_name );
714828        return  FLB_FALSE ;
715829    }
716-      
830+ 
717831    flb_plg_error (ctx -> ins , "get container request failed, status=%i" ,
718832                  status );
719833
@@ -1780,6 +1894,14 @@ static struct flb_config_map config_map[] = {
17801894     "Set the block type: appendblob or blockblob" 
17811895    },
17821896
1897+     {
1898+      FLB_CONFIG_MAP_STR , "log_key" , NULL ,
1899+      0 , FLB_TRUE , offsetof(struct  flb_azure_blob , log_key ),
1900+      "By default, the whole log record will be sent to blob storage. " 
1901+      "If you specify a key name with this option, then only the value of " 
1902+      "that key will be sent" 
1903+     },
1904+ 
17831905    {
17841906     FLB_CONFIG_MAP_STR , "compress" , NULL ,
17851907     0 , FLB_FALSE , 0 ,
@@ -1939,7 +2061,7 @@ static struct flb_config_map config_map[] = {
19392061     "Whether to delete the buffered file early after successful blob creation. Default is false" 
19402062    },
19412063
1942-     {  
2064+     {
19432065     FLB_CONFIG_MAP_INT , "blob_uri_length" , "64" ,
19442066     0 , FLB_TRUE , offsetof(struct  flb_azure_blob , blob_uri_length ),
19452067     "Set the length of generated blob uri before ingesting to Azure Kusto. Default is 64" 
0 commit comments