Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 68 additions & 55 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ with_ssl
have_yaml
YAML_LIBS
EVENT_LIBS
EVENT_CFLAGS
apr_cppflags
apr_cflags
apr_link_ld_libs
Expand Down Expand Up @@ -977,6 +978,8 @@ LDFLAGS_EX
LDFLAGS_SL
PERL
PYTHON
EVENT_CFLAGS
EVENT_LIBS
MSGFMT
TCLSH'

Expand Down Expand Up @@ -1734,6 +1737,9 @@ Some influential environment variables:
LDFLAGS_SL extra linker flags for linking shared libraries only
PERL Perl program
PYTHON Python program
EVENT_CFLAGS
C compiler flags for EVENT, overriding pkg-config
EVENT_LIBS linker flags for EVENT, overriding pkg-config
MSGFMT msgfmt program for NLS
TCLSH Tcl interpreter program (tclsh)

Expand Down Expand Up @@ -10930,10 +10936,6 @@ $as_echo "yes" >&6; }
fi
fi





#
# Realtime library
#
Expand Down Expand Up @@ -14554,7 +14556,6 @@ fi

fi


if test "$enable_external_fts" = yes; then
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for jansson_version_str in -ljansson" >&5
$as_echo_n "checking for jansson_version_str in -ljansson... " >&6; }
Expand Down Expand Up @@ -14886,65 +14887,78 @@ fi
LIBS="$_LIBS"

_LIBS="$LIBS"
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing event_add" >&5
$as_echo_n "checking for library containing event_add... " >&6; }
if ${ac_cv_search_event_add+:} false; then :
$as_echo_n "(cached) " >&6
else
ac_func_search_save_LIBS=$LIBS
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
/* end confdefs.h. */

/* Override any GCC internal prototype to avoid an error.
Use char because int might match the return type of a GCC
builtin and then its argument prototype would still apply. */
#ifdef __cplusplus
extern "C"
#endif
char event_add ();
int
main ()
{
return event_add ();
;
return 0;
}
_ACEOF
for ac_lib in '' event; do
if test -z "$ac_lib"; then
ac_res="none required"
else
ac_res=-l$ac_lib
LIBS="-l$ac_lib $ac_func_search_save_LIBS"
fi
if ac_fn_c_try_link "$LINENO"; then :
ac_cv_search_event_add=$ac_res
pkg_failed=no
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for libevent >= 2.0.0" >&5
$as_echo_n "checking for libevent >= 2.0.0... " >&6; }

if test -n "$EVENT_CFLAGS"; then
pkg_cv_EVENT_CFLAGS="$EVENT_CFLAGS"
elif test -n "$PKG_CONFIG"; then
if test -n "$PKG_CONFIG" && \
{ { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libevent >= 2.0.0\""; } >&5
($PKG_CONFIG --exists --print-errors "libevent >= 2.0.0") 2>&5
ac_status=$?
$as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
test $ac_status = 0; }; then
pkg_cv_EVENT_CFLAGS=`$PKG_CONFIG --cflags "libevent >= 2.0.0" 2>/dev/null`
test "x$?" != "x0" && pkg_failed=yes
else
pkg_failed=yes
fi
rm -f core conftest.err conftest.$ac_objext \
conftest$ac_exeext
if ${ac_cv_search_event_add+:} false; then :
break
else
pkg_failed=untried
fi
done
if ${ac_cv_search_event_add+:} false; then :

if test -n "$EVENT_LIBS"; then
pkg_cv_EVENT_LIBS="$EVENT_LIBS"
elif test -n "$PKG_CONFIG"; then
if test -n "$PKG_CONFIG" && \
{ { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libevent >= 2.0.0\""; } >&5
($PKG_CONFIG --exists --print-errors "libevent >= 2.0.0") 2>&5
ac_status=$?
$as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
test $ac_status = 0; }; then
pkg_cv_EVENT_LIBS=`$PKG_CONFIG --libs "libevent >= 2.0.0" 2>/dev/null`
test "x$?" != "x0" && pkg_failed=yes
else
ac_cv_search_event_add=no
pkg_failed=yes
fi
rm conftest.$ac_ext
LIBS=$ac_func_search_save_LIBS
else
pkg_failed=untried
fi
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_search_event_add" >&5
$as_echo "$ac_cv_search_event_add" >&6; }
ac_res=$ac_cv_search_event_add
if test "$ac_res" != no; then :
test "$ac_res" = "none required" || LIBS="$ac_res $LIBS"



if test $pkg_failed = yes; then
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5
$as_echo "no" >&6; }

if $PKG_CONFIG --atleast-pkgconfig-version 0.20; then
_pkg_short_errors_supported=yes
else
as_fn_error $? "libevent is required for gpfdist" "$LINENO" 5
_pkg_short_errors_supported=no
fi
if test $_pkg_short_errors_supported = yes; then
EVENT_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "libevent >= 2.0.0" 2>&1`
else
EVENT_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "libevent >= 2.0.0" 2>&1`
fi
# Put the nasty error message in config.log where it belongs
echo "$EVENT_PKG_ERRORS" >&5

as_fn_error $? "libevent >= 2.0.0 is required for gpfdist" "$LINENO" 5
elif test $pkg_failed = untried; then
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5
$as_echo "no" >&6; }
as_fn_error $? "libevent >= 2.0.0 is required for gpfdist" "$LINENO" 5
else
EVENT_CFLAGS=$pkg_cv_EVENT_CFLAGS
EVENT_LIBS=$pkg_cv_EVENT_LIBS
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5
$as_echo "yes" >&6; }

EVENT_LIBS=" -levent"
fi
EVENT_LIBS="$EVENT_LIBS"


{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing yaml_parser_initialize" >&5
Expand Down Expand Up @@ -16811,7 +16825,6 @@ fi

fi


if test "$enable_external_fts" = yes; then
# Check for jansson
ac_fn_c_check_header_mongrel "$LINENO" "jansson.h" "ac_cv_header_jansson_h" "$ac_includes_default"
Expand Down
4 changes: 2 additions & 2 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -1636,8 +1636,8 @@ AC_DEFUN([CHECK_APR], [
if test "$enable_gpfdist" = yes ; then
CHECK_APR()
_LIBS="$LIBS"
AC_SEARCH_LIBS(event_add, [event], [], [AC_MSG_ERROR([libevent is required for gpfdist])])
EVENT_LIBS=" -levent"
PKG_CHECK_MODULES([EVENT], [libevent >= 2.0.0], [], [AC_MSG_ERROR([libevent >= 2.0.0 is required for gpfdist])])
EVENT_LIBS="$EVENT_LIBS"
AC_SUBST(EVENT_LIBS)

AC_SEARCH_LIBS(yaml_parser_initialize, [yaml], [have_yaml=yes; YAML_LIBS=" -lyaml"], [AC_MSG_WARN([libyaml is not found. disabling transformations for gpfdist.])])
Expand Down
59 changes: 42 additions & 17 deletions src/bin/gpfdist/gpfdist.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ static struct
SSL_CTX *server_ctx;/* for SSL */
#endif
int wdtimer; /* Kill gpfdist after k seconds of inactivity. 0 to disable. */
struct event_base *event_base; /* for libevent 2.0+ */
} gcb;

/* A session */
Expand Down Expand Up @@ -1600,7 +1601,7 @@ static void session_detach(request_t* r)
}

event_del(&session->ev);
evtimer_set(&session->ev, free_session_cb, session);
evtimer_assign(&session->ev, gcb.event_base, free_session_cb, session);
session->tm.tv_sec = opt.w;
session->tm.tv_usec = 0;
(void)evtimer_add(&session->ev, &session->tm);
Expand Down Expand Up @@ -1811,7 +1812,7 @@ static int session_attach(request_t* r)
session->active_segids[r->segid] = 1; /* mark this segid as active */
session->maxsegs = r->totalsegs;
session->requests = apr_hash_make(pool);
event_set(&session->ev, 0, 0, 0, 0);
event_assign(&session->ev, gcb.event_base, -1, 0, NULL, NULL);

if (session->tid == 0 || session->path == 0 || session->key == 0)
gfatal(r, "out of memory in session_attach");
Expand Down Expand Up @@ -2368,7 +2369,7 @@ static void do_accept(int fd, short event, void* arg)
r->pool = pool;
r->sock = sock;

event_set(&r->ev, 0, 0, 0, 0);
event_assign(&r->ev, gcb.event_base, -1, 0, NULL, NULL);

/* use the block size specified by -m option */
r->outblock.data = palloc_safe(r, pool, opt.m, "out of memory when allocating buffer: %d bytes", opt.m);
Expand Down Expand Up @@ -2421,7 +2422,7 @@ static int setup_write(request_t* r)
if (r->sock < 0)
gwarning(r, "internal error in setup_write - no socket to use");
event_del(&r->ev);
event_set(&r->ev, r->sock, EV_WRITE, do_write, r);
event_assign(&r->ev, gcb.event_base, r->sock, EV_WRITE, do_write, r);
return (event_add(&r->ev, 0));
}

Expand All @@ -2445,7 +2446,7 @@ static int setup_read(request_t* r)
gwarning(r, "internal error in setup_read - no socket to use");

event_del(&r->ev);
event_set(&r->ev, r->sock, EV_READ, do_read_request, r);
event_assign(&r->ev, gcb.event_base, r->sock, EV_READ, do_read_request, r);

if(opt.t == 0)
{
Expand Down Expand Up @@ -2552,18 +2553,32 @@ static void
signal_register()
{
/* when SIGTERM raised invoke process_term_signal */
signal_set(&gcb.signal_event,SIGTERM,process_term_signal,0);
evsignal_assign(&gcb.signal_event, gcb.event_base, SIGTERM, process_term_signal, 0);

/* high priority so we accept as fast as possible */
if(event_priority_set(&gcb.signal_event, 0))
gwarning(NULL,"signal event priority set failed");

/* start watching this event */
if(signal_add(&gcb.signal_event, 0))
if(evsignal_add(&gcb.signal_event, 0))
gfatal(NULL,"cannot set up event on signal register");

}

/*
* gpfdist_cleanup
*
* Clean up all resources before exiting
*/
static void gpfdist_cleanup(void)
{
/* Clean up event_base if initialized */
if (gcb.event_base) {
event_base_free(gcb.event_base);
gcb.event_base = NULL;
}
}

static void clear_listen_sock(void)
{
SOCKET sock = -1;
Expand Down Expand Up @@ -2616,9 +2631,8 @@ http_setup(void)
hostaddr = opt.b;

/* setup event priority */
if (event_priority_init(10))
gwarning(NULL, "event_priority_init failed");

if (event_base_priority_init(gcb.event_base, 10))
gwarning(NULL, "event_base_priority_init failed");

/* Try each possible port from opt.p to opt.last_port */
for (;;)
Expand Down Expand Up @@ -2811,8 +2825,8 @@ http_setup(void)
for (i = 0; i < gcb.listen_sock_count; i++)
{
/* when this socket is ready, do accept */
event_set(&gcb.listen_events[i], gcb.listen_socks[i], EV_READ | EV_PERSIST,
do_accept, 0);
event_assign(&gcb.listen_events[i], gcb.event_base, gcb.listen_socks[i],
EV_READ | EV_PERSIST, do_accept, 0);

/* only signal process function priority higher than socket handler */
if (event_priority_set(&gcb.listen_events[i], 1))
Expand All @@ -2838,6 +2852,9 @@ process_term_signal(int sig,short event,void* arg)
{
closesocket(gcb.listen_socks[i]);
}

/* Clean up resources before exiting */
gpfdist_cleanup();
_exit(1);
}

Expand Down Expand Up @@ -3913,7 +3930,10 @@ int gpfdist_init(int argc, const char* const argv[])
putenv("EVENT_SHOW_METHOD=1");
putenv("EVENT_NOKQUEUE=1");

event_init();
/* libevent 2.0+ */
gcb.event_base = event_base_new();
if (!gcb.event_base)
gfatal(NULL, "event_base_new failed");

signal_register();
http_setup();
Expand Down Expand Up @@ -3991,16 +4011,19 @@ int gpfdist_init(int argc, const char* const argv[])

int gpfdist_run()
{
return event_dispatch();
return event_base_dispatch(gcb.event_base);
}

#ifndef WIN32

int main(int argc, const char* const argv[])
{
int ret;
if (gpfdist_init(argc, argv) == -1)
gfatal(NULL, "Initialization failed");
return gpfdist_run();
ret = gpfdist_run();
gpfdist_cleanup();
return ret;
}


Expand Down Expand Up @@ -4175,6 +4198,7 @@ int main(int argc, const char* const argv[])
if (gpfdist_init(argc, argv) == -1)
gfatal(NULL, "Initialization failed");
main_ret = gpfdist_run();
gpfdist_cleanup();
}


Expand Down Expand Up @@ -4264,6 +4288,7 @@ void ServiceMain(int argc, char** argv)
* actual service work
*/
gpfdist_run();
gpfdist_cleanup();
}

void ControlHandler(DWORD request)
Expand Down Expand Up @@ -4566,7 +4591,7 @@ static void flush_ssl_buffer(int fd, short event, void* arg)
static void setup_flush_ssl_buffer(request_t* r)
{
event_del(&r->ev);
event_set(&r->ev, r->sock, EV_WRITE, flush_ssl_buffer, r);
event_assign(&r->ev, gcb.event_base, r->sock, EV_WRITE, flush_ssl_buffer, r);
r->tm.tv_sec = 5;
r->tm.tv_usec = 0;
(void)event_add(&r->ev, &r->tm);
Expand Down Expand Up @@ -4678,7 +4703,7 @@ static void request_cleanup(request_t *r)
static void setup_do_close(request_t* r)
{
event_del(&r->ev);
event_set(&r->ev, r->sock, EV_READ, do_close, r);
event_assign(&r->ev, gcb.event_base, r->sock, EV_READ, do_close, r);

r->tm.tv_sec = 60;
r->tm.tv_usec = 0;
Expand Down
Loading