diff --git a/cdb2api/cdb2api.c b/cdb2api/cdb2api.c index 4fe1094c93..b6257a140a 100644 --- a/cdb2api/cdb2api.c +++ b/cdb2api/cdb2api.c @@ -131,6 +131,9 @@ static int cdb2_comdb2db_timeout_set_from_env = 0; static int CDB2_API_CALL_TIMEOUT = 120000; /* defaults to 2 minute */ static int cdb2_api_call_timeout_set_from_env = 0; +static int CDB2_ENFORCE_API_CALL_TIMEOUT = 0; +static int cdb2_enforce_api_call_timeout_set_from_env = 0; + static int CDB2_SOCKET_TIMEOUT = 5000; static int cdb2_socket_timeout_set_from_env = 0; @@ -856,6 +859,37 @@ static int is_sql_read(const char *sqlstr) #define HAVE_MSGHDR_MSG_CONTROL #endif +static int is_api_call_timedout(cdb2_hndl_tp *hndl) { /* returns 1 when hndl has a non-zero max_call_time (ms since the epoch) that is already in the past, otherwise 0 */ + struct timeval tv; + gettimeofday(&tv, NULL); + long long current_time = (long long)tv.tv_sec*1000 + tv.tv_usec/1000; /* widen before scaling: tv_sec*1000 overflows where time_t is 32 bits */ + if (hndl && hndl->max_call_time && (hndl->max_call_time < current_time)) { /* NULL-tolerant, like get_call_timeout and set_max_call_time */ + return 1; + } + return 0; +} + +static long long get_call_timeout(const cdb2_hndl_tp *hndl, long long timeout) { /* returns timeout (ms) capped to the time left before hndl->max_call_time; an already-expired deadline yields 1, a NULL hndl or a zero max_call_time yields timeout unchanged */ + if (!hndl) + return timeout; + struct timeval tv; + gettimeofday(&tv, NULL); + long long current_time = (long long)tv.tv_sec*1000 + tv.tv_usec/1000; + long long time_left = hndl->max_call_time - current_time; + if (hndl->max_call_time && time_left <= 0) + time_left = 1; /* expired: hand back 1 rather than 0 or a negative value */ + if (time_left > 0 && (time_left < timeout)) + return time_left; + return timeout; +} + +static void set_max_call_time(cdb2_hndl_tp *hndl) { /* sets hndl->max_call_time to the current time plus hndl->api_call_timeout, in ms since the epoch; no-op for a NULL hndl */ + struct timeval tv; + gettimeofday(&tv, NULL); + if (hndl) + hndl->max_call_time = (long long)tv.tv_sec*1000 + tv.tv_usec/1000 + hndl->api_call_timeout; +} + enum { PASSFD_SUCCESS = 0, PASSFD_RECVMSG = -1, /* error with recvmsg() */ @@ -995,8 +1029,7 @@ static int recv_fd_int(int sockfd, void *data, size_t nbytes, int *fd_recvd, int static int recv_fd(const cdb2_hndl_tp *hndl, int sockfd, void *data, size_t nbytes, int *fd_recvd) { int rc, timeoutms = hndl ? 
hndl->sockpool_recv_timeoutms : CDB2_SOCKPOOL_RECV_TIMEOUTMS; - if (hndl && hndl->api_call_timeout && (timeoutms > hndl->api_call_timeout)) - timeoutms = hndl->api_call_timeout; + timeoutms = get_call_timeout(hndl, timeoutms); rc = recv_fd_int(sockfd, data, nbytes, fd_recvd, timeoutms); if (rc != 0 && *fd_recvd != -1) { int errno_save = errno; @@ -1116,8 +1149,7 @@ static int send_fd_to(int sockfd, const void *data, size_t nbytes, static int send_fd(const cdb2_hndl_tp *hndl, int sockfd, const void *data, size_t nbytes, int fd_to_send) { int timeoutms = hndl ? hndl->sockpool_send_timeoutms : CDB2_SOCKPOOL_SEND_TIMEOUTMS; - if (hndl && hndl->api_call_timeout && (timeoutms > hndl->api_call_timeout)) - timeoutms = hndl->api_call_timeout; + timeoutms = get_call_timeout(hndl, timeoutms); return send_fd_to(sockfd, data, nbytes, fd_to_send, timeoutms); } @@ -1591,6 +1623,8 @@ static void read_comdb2db_environment_cfg(cdb2_hndl_tp *hndl, const char *comdb2 &cdb2_sockpool_recv_timeoutms_set_from_env); process_env_var_int("COMDB2_CONFIG_API_CALL_TIMEOUT", &CDB2_API_CALL_TIMEOUT, &cdb2_api_call_timeout_set_from_env); + process_env_var_int("COMDB2_CONFIG_ENFORCE_API_CALL_TIMEOUT", &CDB2_ENFORCE_API_CALL_TIMEOUT, + &cdb2_enforce_api_call_timeout_set_from_env); process_env_var_int("COMDB2_CONFIG_COMDB2DB_TIMEOUT", &COMDB2DB_TIMEOUT, &cdb2_comdb2db_timeout_set_from_env); process_env_var_int("COMDB2_CONFIG_SOCKET_TIMEOUT", &CDB2_SOCKET_TIMEOUT, &cdb2_socket_timeout_set_from_env); process_env_var_int("COMDB2_CONFIG_PROTOBUF_SIZE", &CDB2_PROTOBUF_SIZE, &cdb2_protobuf_size_set_from_env); @@ -1841,6 +1875,9 @@ static void read_comdb2db_cfg(cdb2_hndl_tp *hndl, SBUF2 *s, const char *comdb2db hndl->api_call_timeout = atoi(tok); else if (tok) CDB2_API_CALL_TIMEOUT = atoi(tok); + } else if (!cdb2_enforce_api_call_timeout_set_from_env && (strcasecmp("enforce_api_call_timeout",tok) == 0)) { /* gate on this option's own from-env flag (presumably filled in by process_env_var_int), not the api_call_timeout one */ + tok = strtok_r(NULL, " :,", &last); + if (tok) CDB2_ENFORCE_API_CALL_TIMEOUT = value_on_off(tok, &err); /* strtok_r returns NULL when the option has no value */ } else if 
(strcasecmp("auto_consume_timeout", tok) == 0) { tok = strtok_r(NULL, " :,", &last); if (tok) @@ -2125,6 +2162,8 @@ static void set_cdb2_timeouts(cdb2_hndl_tp *hndl) hndl->comdb2db_timeout = hndl->api_call_timeout; if (hndl->socket_timeout > hndl->api_call_timeout) hndl->socket_timeout = hndl->api_call_timeout; + + set_max_call_time(hndl); } /* Read all available comdb2 configuration files. @@ -3688,8 +3727,22 @@ static int cdb2portmux_get(cdb2_hndl_tp *hndl, const char *type, debugprint("name %s\n", name); + int connect_timeout = hndl->connect_timeout; + + if (CDB2_ENFORCE_API_CALL_TIMEOUT) { +# ifdef CDB2API_TEST + printf("RETRY with timeout %lld\n", get_call_timeout(hndl, connect_timeout)); /* get_call_timeout returns long long, so the conversion must be %lld */ +# endif + if (is_api_call_timedout(hndl)) { + snprintf(hndl->errstr, sizeof(hndl->errstr), "%s:%d Timed out connecting to db\n", __func__, __LINE__); + port = -1; + goto after_callback; + } + connect_timeout = get_call_timeout(hndl, connect_timeout); + } + fd = cdb2_tcpconnecth_to(hndl, remote_host, CDB2_PORTMUXPORT, 0, - hndl->connect_timeout); + connect_timeout); if (fd < 0) { debugprint("cdb2_tcpconnecth_to returns fd=%d'\n", fd); if (errno == EINPROGRESS) { @@ -3715,7 +3768,18 @@ static int cdb2portmux_get(cdb2_hndl_tp *hndl, const char *type, port = -1; goto after_callback; } - sbuf2settimeout(ss, hndl->connect_timeout, hndl->connect_timeout); + if (CDB2_ENFORCE_API_CALL_TIMEOUT) { +# ifdef CDB2API_TEST + printf("RETRY with timeout %lld\n", get_call_timeout(hndl, connect_timeout)); /* get_call_timeout returns long long, so the conversion must be %lld */ +# endif + if (is_api_call_timedout(hndl)) { + snprintf(hndl->errstr, sizeof(hndl->errstr), "%s:%d Timed out connecting to db\n", __func__, __LINE__); + port = -1; + goto after_callback; /* NOTE(review): ss is in use right after this block, so it looks still open here; confirm it gets closed after this jump */ + } + connect_timeout = get_call_timeout(hndl, connect_timeout); + } + sbuf2settimeout(ss, connect_timeout, connect_timeout); sbuf2printf(ss, "get %s\n", name); sbuf2flush(ss); res[0] = 0; @@ -3904,6 +3968,17 @@ static int cdb2_read_record(cdb2_hndl_tp *hndl, uint8_t **buf, int *len, int *ty goto after_callback; 
retry: + if (CDB2_ENFORCE_API_CALL_TIMEOUT) { + int socket_timeout = get_call_timeout(hndl, hndl->socket_timeout); +# ifdef CDB2API_TEST + printf("GOT HEARTBEAT || Set timeout to %d\n", socket_timeout); +# endif + if (is_api_call_timedout(hndl)) { + snprintf(hndl->errstr, sizeof(hndl->errstr), "%s:%d Timed out reading response from the db\n", __func__, __LINE__); + rc = -1; goto after_callback; /* deadline already passed: stop here instead of falling through to another read that could overwrite rc */ + } + sbuf2settimeout(sb, socket_timeout, socket_timeout); + } b_read = sbuf2fread((char *)&hdr, 1, sizeof(hdr), sb); debugprint("READ HDR b_read=%d, sizeof(hdr)=(%zu):\n", b_read, sizeof(hdr)); @@ -4068,6 +4143,10 @@ static int cdb2_read_record(cdb2_hndl_tp *hndl, uint8_t **buf, int *len, int *ty rc = 0; after_callback: + /* restore the handle's configured socket timeout; the retry path may have shortened it */ + if (CDB2_ENFORCE_API_CALL_TIMEOUT) { + sbuf2settimeout(sb, hndl->socket_timeout, hndl->socket_timeout); + } while ((e = cdb2_next_callback(hndl, CDB2_AFTER_READ_RECORD, e)) != NULL) { callbackrc = cdb2_invoke_callback(hndl, e, 1, CDB2_RETURN_VALUE, (intptr_t)rc); @@ -5954,10 +6033,8 @@ static int cdb2_run_statement_typed_int(cdb2_hndl_tp *hndl, const char *sql, int hndl->retry_all = 1; int run_last = 1; - time_t max_time = - time(NULL) + (hndl->api_call_timeout - hndl->connect_timeout) / 1000; - if (max_time < 0) - max_time = 0; + set_max_call_time(hndl); + retry_queries: debugprint( "retry_queries: hndl->host=%d (%s)\n", hndl->connected_host, @@ -5985,18 +6062,11 @@ static int cdb2_run_statement_typed_int(cdb2_hndl_tp *hndl, const char *sql, int cdb2_get_dbhosts(hndl); } - int tmsec = 0; - - // Add wait if we have already tried on all the nodes. 
- if (!hndl->sb && (retries_done > hndl->num_hosts)) { - tmsec = (retries_done - hndl->num_hosts) * 100; - } - if (hndl->sslerr != 0) PRINT_AND_RETURN(CDB2ERR_CONNECT_ERROR); if ((retries_done > 1) && ((retries_done > hndl->max_retries) || - ((time(NULL) + (tmsec / 1000)) >= max_time))) { + (is_api_call_timedout(hndl)))) { sprintf(hndl->errstr, "%s: Maximum number of retries done.", __func__); if (is_hasql_commit) { cleanup_query_list(hndl, &commit_query_list, __LINE__); @@ -7873,17 +7943,13 @@ static int cdb2_get_dbhosts(cdb2_hndl_tp *hndl) } } - time_t max_time = - time(NULL) + - (hndl->api_call_timeout - (CDB2_POLL_TIMEOUT + hndl->connect_timeout)) / - 1000; - if (max_time < 0) - max_time = 0; + if (!hndl->max_call_time) + set_max_call_time(hndl); use_bmsd = cdb2_use_bmsd && (*cdb2_bmssuffix != '\0') && !hndl->num_shards; // cannot find shards via bmsd yet retry: if (rc) { - if (num_retry >= MAX_RETRIES || time(NULL) > max_time) + if (num_retry >= MAX_RETRIES || is_api_call_timedout(hndl)) goto after_callback; num_retry++; @@ -7901,7 +7967,7 @@ static int cdb2_get_dbhosts(cdb2_hndl_tp *hndl) hndl, cdb2_default_cluster, comdb2db_name, comdb2db_num, comdb2db_hosts[i], comdb2db_hosts, comdb2db_ports, &master, &num_comdb2db_hosts, NULL); - if (rc == 0 || time(NULL) >= max_time) { + if (rc == 0 || is_api_call_timedout(hndl)) { break; } } @@ -7918,7 +7984,7 @@ static int cdb2_get_dbhosts(cdb2_hndl_tp *hndl) hndl->hosts, &hndl->num_hosts, hndl->dbname, hndl->cluster, &hndl->dbnum, &hndl->num_hosts_sameroom, num_retry, use_bmsd, hndl->shards, &hndl->num_shards, &hndl->num_shards_sameroom); - if (rc == 0 || time(NULL) >= max_time) { + if (rc == 0 || is_api_call_timedout(hndl)) { break; } else if (use_bmsd) { if (cdb2_comdb2db_fallback) @@ -7926,7 +7992,7 @@ static int cdb2_get_dbhosts(cdb2_hndl_tp *hndl) goto retry; } } - if (rc == -1 && time(NULL) < max_time) { + if (rc == -1 && !is_api_call_timedout(hndl)) { rc = comdb2db_get_dbhosts(hndl, comdb2db_name, comdb2db_num, 
comdb2db_hosts[master], comdb2db_ports[master], hndl->hosts, &hndl->num_hosts, hndl->dbname, hndl->cluster, &hndl->dbnum, &hndl->num_hosts_sameroom, num_retry, use_bmsd, hndl->shards, &hndl->num_shards, diff --git a/cdb2api/cdb2api_hndl.h b/cdb2api/cdb2api_hndl.h index 3eb89acb98..36aa367320 100644 --- a/cdb2api/cdb2api_hndl.h +++ b/cdb2api/cdb2api_hndl.h @@ -169,6 +169,7 @@ struct cdb2_hndl { int is_hasql; int sent_client_info; void *user_arg; + long long max_call_time; int api_call_timeout; int connect_timeout; int comdb2db_timeout;