@@ -573,34 +573,19 @@ static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_
573
573
is_timestamped_fleet_config (ctx , cfg );
574
574
}
575
575
576
- static int exists_new_fleet_config (struct flb_in_calyptia_fleet_config * ctx )
576
+ static int exists_fleet_config (struct flb_in_calyptia_fleet_config * ctx ,
577
+ const char * ref_name )
577
578
{
578
579
int ret = FLB_FALSE ;
579
- flb_sds_t cfgnewname ;
580
-
581
- cfgnewname = fleet_config_deref (ctx , "new" );
582
- if (cfgnewname == NULL ) {
583
- return FLB_FALSE ;
584
- }
585
-
586
- ret = access (cfgnewname , F_OK ) == 0 ? FLB_TRUE : FLB_FALSE ;
587
- flb_sds_destroy (cfgnewname );
588
-
589
- return ret ;
590
- }
591
-
592
- static int exists_old_fleet_config (struct flb_in_calyptia_fleet_config * ctx )
593
- {
594
- int ret = FLB_FALSE ;
595
- flb_sds_t cfgoldname ;
580
+ flb_sds_t config_path ;
596
581
597
- cfgoldname = fleet_config_deref (ctx , "old" );
598
- if (cfgoldname == NULL ) {
582
+ config_path = fleet_config_deref (ctx , ref_name );
583
+ if (config_path == NULL ) {
599
584
return FLB_FALSE ;
600
585
}
601
586
602
- ret = access (cfgoldname , F_OK ) == 0 ? FLB_TRUE : FLB_FALSE ;
603
- flb_sds_destroy (cfgoldname );
587
+ ret = access (config_path , F_OK ) == 0 ? FLB_TRUE : FLB_FALSE ;
588
+ flb_sds_destroy (config_path );
604
589
605
590
return ret ;
606
591
}
@@ -633,13 +618,15 @@ static void *do_reload(void *data)
633
618
/* avoid reloading the current configuration... just use our new one! */
634
619
flb_context_set (reload -> flb );
635
620
reload -> flb -> config -> enable_hot_reload = FLB_TRUE ;
621
+ reload -> flb -> config -> hot_reload_succeeded = FLB_FALSE ;
636
622
if (reload -> flb -> config -> conf_path_file ) {
637
623
flb_sds_destroy (reload -> flb -> config -> conf_path_file );
638
624
}
639
625
reload -> flb -> config -> conf_path_file = reload -> cfg_path ;
640
626
641
- flb_free (reload );
642
627
sleep (5 );
628
+ flb_info ("reloading configuration from path: %s" , reload -> cfg_path );
629
+ flb_free (reload );
643
630
#ifndef FLB_SYSTEM_WINDOWS
644
631
kill (getpid (), SIGHUP );
645
632
#else
@@ -1575,50 +1562,67 @@ static int calyptia_config_delete_by_ref(struct flb_in_calyptia_fleet_config *ct
1575
1562
return FLB_TRUE ;
1576
1563
}
1577
1564
1578
-
1579
1565
static int calyptia_config_add (struct flb_in_calyptia_fleet_config * ctx ,
1580
- const char * cfgname )
1566
+ flb_sds_t cfgname )
1581
1567
{
1582
- flb_sds_t current_config = NULL ;
1568
+ flb_sds_t derefed_cur_config_path ;
1569
+ flb_sds_t derefed_new_config_path ;
1583
1570
flb_sds_t cur_ref_filename ;
1584
1571
1585
- current_config = fleet_config_deref (ctx , "new" );
1586
- if (current_config == NULL ) {
1587
- current_config = fleet_config_deref (ctx , "cur" );
1572
+ /* Repoint the old ref to the current ref (if it exists) */
1573
+ derefed_cur_config_path = fleet_config_deref (ctx , "cur" );
1574
+ if (derefed_cur_config_path != NULL ) {
1575
+ if (fleet_config_set_ref (ctx , "old" , derefed_cur_config_path ) == FLB_FALSE ) {
1576
+ flb_plg_error (ctx -> ins , "unable to move current configuration to old" );
1577
+ flb_sds_destroy (derefed_cur_config_path );
1578
+ return FLB_FALSE ;
1579
+ }
1580
+
1581
+ flb_sds_destroy (derefed_cur_config_path );
1588
1582
}
1589
1583
1590
- /* If there's a current config, copy it to the old ref file */
1591
- if (current_config != NULL ) {
1592
- if (fleet_config_set_ref (ctx , "old" , current_config ) == FLB_FALSE ) {
1593
- flb_sds_destroy (current_config );
1584
+ /* If there is uncommitted and different new config, delete it first */
1585
+ derefed_new_config_path = fleet_config_deref (ctx , "new" );
1586
+ if (derefed_new_config_path != NULL
1587
+ && strcmp (derefed_new_config_path , cfgname ) != 0 ) {
1588
+ if (calyptia_config_delete_by_ref (ctx , "new" ) == FLB_FALSE ) {
1589
+ flb_plg_error (ctx -> ins , "unable to delete new configuration by ref" );
1590
+ flb_sds_destroy (derefed_new_config_path );
1594
1591
return FLB_FALSE ;
1595
1592
}
1596
1593
}
1594
+ if (derefed_new_config_path != NULL ) {
1595
+ flb_sds_destroy (derefed_new_config_path );
1596
+ }
1597
1597
1598
1598
/* Set the new ref file to the new config */
1599
1599
if (fleet_config_set_ref (ctx , "new" , cfgname ) == FLB_FALSE ) {
1600
- flb_plg_error (ctx -> ins , "unable to create new configuration reference." );
1601
- flb_sds_destroy (current_config );
1600
+ flb_plg_error (ctx -> ins , "unable to set new configuration by ref" );
1602
1601
return FLB_FALSE ;
1603
1602
}
1604
1603
1605
1604
/* Delete the current ref file if it exists */
1606
1605
cur_ref_filename = fleet_config_ref_filename (ctx , "cur" );
1607
1606
if (cur_ref_filename != NULL ) {
1607
+ flb_plg_info (ctx -> ins , "deleting current ref file: %s" , cur_ref_filename );
1608
1608
unlink (cur_ref_filename );
1609
1609
flb_sds_destroy (cur_ref_filename );
1610
1610
}
1611
1611
1612
- flb_sds_destroy (current_config );
1613
1612
return FLB_TRUE ;
1614
1613
}
1615
1614
1615
+ /**
1616
+ * Commits the latest received config as the valid, now-current config.
1617
+ * This updates the ref file for the current config file to point to the
1618
+ * new config file, and then deletes the old config file.
1619
+ */
1616
1620
static int calyptia_config_commit (struct flb_in_calyptia_fleet_config * ctx )
1617
1621
{
1618
1622
flb_sds_t config_path = NULL ;
1619
1623
flb_sds_t new_ref_filename = NULL ;
1620
1624
1621
- if (exists_new_fleet_config (ctx ) == FLB_FALSE ) {
1625
+ if (exists_fleet_config (ctx , "new" ) == FLB_FALSE ) {
1622
1626
flb_plg_info (ctx -> ins , "no new configuration to commit" );
1623
1627
return FLB_FALSE ;
1624
1628
}
@@ -1637,7 +1641,7 @@ static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx)
1637
1641
}
1638
1642
1639
1643
/* Delete the old config and its ref file */
1640
- if (exists_old_fleet_config (ctx ) == FLB_TRUE ) {
1644
+ if (exists_fleet_config (ctx , "old" ) == FLB_TRUE ) {
1641
1645
if (calyptia_config_delete_by_ref (ctx , "old" ) == FLB_FALSE ) {
1642
1646
flb_plg_error (ctx -> ins , "unable to delete old configuration by ref" );
1643
1647
return FLB_FALSE ;
@@ -1652,8 +1656,11 @@ static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx)
1652
1656
return FLB_FALSE ;
1653
1657
}
1654
1658
1659
+ flb_plg_info (ctx -> ins , "deleting config ref file: %s" , new_ref_filename );
1655
1660
unlink (new_ref_filename );
1656
1661
flb_sds_destroy (new_ref_filename );
1662
+
1663
+ flb_plg_info (ctx -> ins , "committed new config: %s" , config_path );
1657
1664
flb_sds_destroy (config_path );
1658
1665
1659
1666
return FLB_TRUE ;
@@ -1665,7 +1672,7 @@ static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx)
1665
1672
flb_sds_t old_ref_filename = NULL ;
1666
1673
1667
1674
/* Delete the new config and its ref file */
1668
- if (exists_new_fleet_config (ctx ) == FLB_TRUE ) {
1675
+ if (exists_fleet_config (ctx , "new" ) == FLB_TRUE ) {
1669
1676
if (calyptia_config_delete_by_ref (ctx , "new" ) == FLB_FALSE ) {
1670
1677
flb_plg_error (ctx -> ins , "unable to delete new configuration by ref" );
1671
1678
return FLB_FALSE ;
@@ -1685,6 +1692,7 @@ static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx)
1685
1692
flb_sds_destroy (old_config_path );
1686
1693
return FLB_FALSE ;
1687
1694
}
1695
+ flb_plg_info (ctx -> ins , "rolled back to previous config: %s" , old_config_path );
1688
1696
flb_sds_destroy (old_config_path );
1689
1697
1690
1698
/* Delete the old config ref */
@@ -1700,6 +1708,51 @@ static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx)
1700
1708
return FLB_TRUE ;
1701
1709
}
1702
1710
1711
+ /**
1712
+ * Checks if the last config was successfully reloaded and if so, commits it.
1713
+ * This considers a newly received config to be successfully reloaded if
1714
+ * the current context indicates it was loaded from the new config file.
1715
+ *
1716
+ * This returns FLB_TRUE (even if the config was not committed) and
1717
+ * FLB_FALSE if there was an error.
1718
+ */
1719
+ static int commit_config_if_reloaded (struct flb_in_calyptia_fleet_config * ctx )
1720
+ {
1721
+ struct flb_config * config ;
1722
+
1723
+ /* Get the current configuration */
1724
+ config = ctx -> ins -> config ;
1725
+ if (config == NULL ) {
1726
+ return FLB_TRUE ;
1727
+ }
1728
+
1729
+ if (config -> hot_reloading == FLB_TRUE ) {
1730
+ return FLB_TRUE ;
1731
+ }
1732
+
1733
+ if (config -> hot_reload_succeeded != FLB_TRUE ) {
1734
+ /* The config either hasn't been reloaded yet or the reload failed */
1735
+ return FLB_TRUE ;
1736
+ }
1737
+
1738
+ /* Check if the current config is from a new fleet config file */
1739
+ if (exists_fleet_config (ctx , "new" ) == FLB_FALSE ) {
1740
+ return FLB_TRUE ;
1741
+ }
1742
+
1743
+ if (is_new_fleet_config (ctx , config )) {
1744
+ /* The config was successfully reloaded from the new file, commit it */
1745
+ if (calyptia_config_commit (ctx ) == FLB_TRUE ) {
1746
+ flb_plg_info (ctx -> ins , "committed reloaded configuration" );
1747
+ } else {
1748
+ flb_plg_error (ctx -> ins , "failed to commit reloaded configuration" );
1749
+ return FLB_FALSE ;
1750
+ }
1751
+ }
1752
+
1753
+ return FLB_TRUE ;
1754
+ }
1755
+
1703
1756
static void fleet_config_get_properties (flb_sds_t * buf , struct mk_list * props , int fleet_config_legacy_format )
1704
1757
{
1705
1758
struct mk_list * head ;
@@ -1985,22 +2038,25 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
1985
2038
struct flb_config * config ,
1986
2039
void * in_context )
1987
2040
{
1988
- int ret = -1 ;
1989
2041
struct flb_in_calyptia_fleet_config * ctx = in_context ;
1990
2042
1991
2043
if (ctx -> fleet_id == NULL ) {
1992
2044
if (get_calyptia_fleet_id_by_name (ctx , config ) == -1 ) {
1993
2045
flb_plg_error (ctx -> ins , "unable to find fleet: %s" , ctx -> fleet_name );
1994
- goto fleet_id_error ;
2046
+ FLB_INPUT_RETURN ( -1 ) ;
1995
2047
}
1996
2048
}
1997
2049
1998
- if (get_calyptia_fleet_config (ctx ) != FLB_TRUE ) {
1999
- ret = -1 ;
2050
+ /* Update symlinks if a recent reload was successful */
2051
+ if (commit_config_if_reloaded (ctx ) == FLB_FALSE ) {
2052
+ FLB_INPUT_RETURN (-1 );
2000
2053
}
2001
2054
2002
- fleet_id_error :
2003
- FLB_INPUT_RETURN (ret );
2055
+ if (get_calyptia_fleet_config (ctx ) == -1 ) {
2056
+ FLB_INPUT_RETURN (-1 );
2057
+ }
2058
+
2059
+ FLB_INPUT_RETURN (0 );
2004
2060
}
2005
2061
2006
2062
static int create_fleet_directory (struct flb_in_calyptia_fleet_config * ctx )
@@ -2093,16 +2149,16 @@ static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx)
2093
2149
static int load_fleet_config (struct flb_in_calyptia_fleet_config * ctx )
2094
2150
{
2095
2151
flb_ctx_t * flb_ctx = flb_context_get ();
2096
- flb_sds_t config_path = NULL ;
2152
+ flb_sds_t config_path ;
2097
2153
2098
2154
/* check if we are already using the fleet configuration file. */
2099
2155
if (is_fleet_config (ctx , flb_ctx -> config ) == FLB_FALSE ) {
2100
2156
flb_plg_debug (ctx -> ins , "loading configuration file" );
2101
2157
2102
- /* Find the current config file, or as backup, the new one */
2158
+ /* Find the current config file, or as backup, the old one */
2103
2159
config_path = fleet_config_deref (ctx , "cur" );
2104
2160
if (config_path == NULL ) {
2105
- config_path = fleet_config_deref (ctx , "new " );
2161
+ config_path = fleet_config_deref (ctx , "old " );
2106
2162
}
2107
2163
2108
2164
if (config_path != NULL ) {
@@ -2359,6 +2415,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
2359
2415
int upstream_flags ;
2360
2416
struct flb_in_calyptia_fleet_config * ctx ;
2361
2417
(void ) data ;
2418
+ flb_sds_t new_config_path ;
2362
2419
2363
2420
#ifdef _WIN32
2364
2421
char * tmpdir ;
@@ -2461,15 +2518,27 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
2461
2518
create_fleet_header (ctx );
2462
2519
}
2463
2520
2521
+ /* If there is an uncommitted new config, delete it */
2522
+ new_config_path = fleet_config_deref (ctx , "new" );
2523
+ if (new_config_path != NULL
2524
+ && is_new_fleet_config (ctx , config ) == FLB_FALSE ) {
2525
+ flb_plg_warn (ctx -> ins , "deleting uncommitted new config: %s" , new_config_path );
2526
+ if (calyptia_config_delete_by_ref (ctx , "new" ) == FLB_FALSE ) {
2527
+ flb_plg_error (ctx -> ins , "unable to delete uncommitted new config" );
2528
+ flb_sds_destroy (new_config_path );
2529
+ in_calyptia_fleet_destroy (ctx );
2530
+ return -1 ;
2531
+ }
2532
+ flb_sds_destroy (new_config_path );
2533
+ } else if (new_config_path != NULL ) {
2534
+ flb_sds_destroy (new_config_path );
2535
+ }
2536
+
2464
2537
/* if we load a new configuration then we will be reloaded anyways */
2465
2538
if (load_fleet_config (ctx ) == FLB_TRUE ) {
2466
2539
return 0 ;
2467
2540
}
2468
2541
2469
- if (is_fleet_config (ctx , config )) {
2470
- calyptia_config_commit (ctx );
2471
- }
2472
-
2473
2542
/* Set our collector based on time */
2474
2543
ret = flb_input_set_collector_time (in ,
2475
2544
in_calyptia_fleet_collect ,
0 commit comments