1919
2020#include <fluent-bit/flb_output_plugin.h>
2121#include <fluent-bit/flb_utils.h>
22+ #include <fluent-bit/flb_time.h>
23+ #include <fluent-bit/flb_log_event_decoder.h>
2224
23- #define FLB_EXIT_FLUSH_COUNT "1"
25+ #define FLB_EXIT_FLUSH_COUNT "-1"
26+ #define FLB_EXIT_RECORD_COUNT "-1"
27+ #define FLB_EXIT_TIME_COUNT "-1"
2428
2529struct flb_exit {
2630 int is_running ;
27- int count ;
31+ struct flb_time start_time ;
2832
2933 /* config */
3034 int flush_count ;
35+ int record_count ;
36+ int time_count ;
37+ struct flb_output_instance * ins ;
3138};
3239
3340static int cb_exit_init (struct flb_output_instance * ins , struct flb_config * config ,
@@ -43,15 +50,28 @@ static int cb_exit_init(struct flb_output_instance *ins, struct flb_config *conf
4350 flb_errno ();
4451 return -1 ;
4552 }
46- ctx -> count = 0 ;
53+ ctx -> ins = ins ;
4754 ctx -> is_running = FLB_TRUE ;
55+ flb_time_get (& ctx -> start_time );
56+
57+ ctx -> flush_count = -1 ;
58+ ctx -> record_count = -1 ;
59+ ctx -> time_count = -1 ;
4860
4961 ret = flb_output_config_map_set (ins , (void * ) ctx );
5062 if (ret == -1 ) {
5163 flb_free (ctx );
5264 return -1 ;
5365 }
5466
67+ if (ctx -> flush_count == -1 &&
68+ ctx -> record_count == -1 &&
69+ ctx -> time_count == -1 ) {
70+ flb_plg_error (ctx -> ins , "no count set for flush, record or time, set at least one" );
71+ flb_free (ctx );
72+ return -1 ;
73+ }
74+
5575 flb_output_set_context (ins , ctx );
5676
5777 return 0 ;
@@ -66,11 +86,57 @@ static void cb_exit_flush(struct flb_event_chunk *event_chunk,
6686 (void ) i_ins ;
6787 (void ) out_context ;
6888 struct flb_exit * ctx = out_context ;
69-
70- ctx -> count ++ ;
71- if (ctx -> is_running == FLB_TRUE && ctx -> count >= ctx -> flush_count ) {
72- flb_engine_exit (config );
73- ctx -> is_running = FLB_FALSE ;
89+ struct flb_log_event_decoder log_decoder ;
90+ struct flb_log_event log_event ;
91+ struct flb_time now ;
92+ struct flb_time run ;
93+ int result ;
94+
95+ if (ctx -> is_running == FLB_TRUE ) {
96+ if (ctx -> flush_count > 0 ) {
97+ ctx -> flush_count -- ;
98+ }
99+
100+ if (ctx -> record_count > 0 && event_chunk -> type == FLB_EVENT_TYPE_LOGS ) {
101+ result = flb_log_event_decoder_init (& log_decoder ,
102+ (char * ) event_chunk -> data ,
103+ event_chunk -> size );
104+ if (result != FLB_EVENT_DECODER_SUCCESS ) {
105+ flb_plg_error (ctx -> ins ,
106+ "Log event decoder initialization error : %d" , result );
107+
108+ FLB_OUTPUT_RETURN (FLB_RETRY );
109+ }
110+
111+ while (flb_log_event_decoder_next (& log_decoder ,
112+ & log_event ) == FLB_EVENT_DECODER_SUCCESS ) {
113+ if (ctx -> record_count > 0 ) {
114+ ctx -> record_count -- ;
115+ }
116+ }
117+
118+ result = flb_log_event_decoder_get_last_result (& log_decoder );
119+ flb_log_event_decoder_destroy (& log_decoder );
120+
121+ if (result != FLB_EVENT_DECODER_SUCCESS ) {
122+ flb_plg_error (ctx -> ins , "Log event decoder error : %d" , result );
123+ FLB_OUTPUT_RETURN (FLB_ERROR );
124+ }
125+
126+ FLB_OUTPUT_RETURN (FLB_OK );
127+ }
128+
129+ if (ctx -> time_count > 0 ) {
130+ flb_time_get (& now );
131+ flb_time_diff (& now , & ctx -> start_time , & run );
132+ }
133+
134+ if (ctx -> flush_count == 0 ||
135+ ctx -> record_count == 0 ||
136+ (ctx -> time_count && flb_time_to_millisec (& run ) > (ctx -> time_count * 1000 ))) {
137+ flb_engine_exit (config );
138+ ctx -> is_running = FLB_FALSE ;
139+ }
74140 }
75141
76142 FLB_OUTPUT_RETURN (FLB_OK );
@@ -90,7 +156,17 @@ static struct flb_config_map config_map[] = {
90156 {
91157 FLB_CONFIG_MAP_INT , "flush_count" , FLB_EXIT_FLUSH_COUNT ,
92158 0 , FLB_TRUE , offsetof(struct flb_exit , flush_count ),
93- NULL
159+ "number of flushes before exiting"
160+ },
161+ {
162+ FLB_CONFIG_MAP_INT , "record_count" , FLB_EXIT_RECORD_COUNT ,
163+ 0 , FLB_TRUE , offsetof(struct flb_exit , record_count ),
164+ "number of records received before exiting"
165+ },
166+ {
167+ FLB_CONFIG_MAP_INT , "time_count" , FLB_EXIT_TIME_COUNT ,
168+ 0 , FLB_TRUE , offsetof(struct flb_exit , time_count ),
169+ "number of seconds before exiting (will trigger upon receiving a flush)"
94170 },
95171
96172 /* EOF */
0 commit comments