diff --git a/README.md b/README.md index 912e3355..474ddb51 100644 --- a/README.md +++ b/README.md @@ -85,11 +85,11 @@ target/benchmarks -h 127.0.0.1 -p 3000 -n test -k 1000000 -o B1400 -w RU,80 -g 2 ``` To: - * Benchmark asynchronous methods using 1 event loop. + * Benchmark asynchronous methods using 4 event loops. * Limit the maximum number of concurrent commands to 50. * Use and 50% read 50% write pattern. ```sh -target/benchmarks -h 127.0.0.1 -p 3000 -n test -k 1000000 -o S:50 -w RU,50 --async --asyncMaxCommands 50 --eventLoops 1 +target/benchmarks -h 127.0.0.1 -p 3000 -n test -k 1000000 -o S:50 -w RU,50 --async --asyncMaxCommands 50 --threads 4 ``` Command line usage can be read with: diff --git a/src/include/benchmark.h b/src/include/benchmark.h index 84ed5299..0321074c 100644 --- a/src/include/benchmark.h +++ b/src/include/benchmark.h @@ -96,7 +96,6 @@ typedef struct args_s { int async_max_conns_per_node; bool durable_deletes; int async_max_commands; - int event_loop_capacity; as_config_tls tls; char* tls_name; as_auth_mode auth_mode; @@ -166,6 +165,12 @@ typedef struct threaddata_s { // which workload stage we're currrently on _Atomic(uint32_t) stage_idx; + // the next key to be claimed by this thread in async linear workloads + _Atomic(uint64_t) current_key; + + // the exclusive stopping point for async linear workloads + uint64_t end_key; + /* * note: to stop threads, tdata->finished must be set before tdata->do_work * to prevent deadlocking diff --git a/src/main/benchmark.c b/src/main/benchmark.c index 9ecffa27..58c6ab25 100644 --- a/src/main/benchmark.c +++ b/src/main/benchmark.c @@ -184,7 +184,7 @@ connect_to_server(args_t* args, aerospike* client) { if (stages_contain_async(&args->stages)) { #if AS_EVENT_LIB_DEFINED - if (! as_event_create_loops(args->event_loop_capacity)) { + if (! 
as_event_create_loops(args->transaction_worker_threads)) { blog_error("Failed to create asynchronous event loops\n"); return 2; } @@ -322,10 +322,13 @@ init_tdata(const args_t* args, cdata_t* cdata, thr_coord_t* coord, tdata->t_idx = t_idx; // always start on the first stage atomic_init(&tdata->stage_idx, 0); + atomic_init(&tdata->current_key, args->start_key); atomic_init(&tdata->do_work, true); atomic_init(&tdata->finished, false); + tdata->end_key = UINT64_MAX; + as_policies* p = &tdata->policies; as_policies_init(p); diff --git a/src/main/benchmark_init.c b/src/main/benchmark_init.c index 807274fd..2e158f5b 100644 --- a/src/main/benchmark_init.c +++ b/src/main/benchmark_init.c @@ -43,7 +43,7 @@ // Typedefs & constants. // -static const char* short_options = "vVh:p:U:P::n:s:b:K:k:o:Re:t:w:z:g:T:dLSC:N:B:M:Y:Dac:W:"; +static const char* short_options = "vVh:p:U:P::n:s:b:K:k:o:Re:t:w:z:g:T:dLSC:N:B:M:Y:Dac:"; #define WARN_MSG 0x40000000 @@ -544,6 +544,7 @@ print_usage(const char* program) printf("-z --threads # Default: 16\n"); printf(" Load generating thread count.\n"); + printf(" This also sets the number of event loops used with --async.\n"); printf("\n"); printf("-g --throughput # Default: 0\n"); @@ -745,6 +746,7 @@ print_usage(const char* program) printf("-a --async # Default: synchronous mode\n"); printf(" Enable asynchronous mode.\n"); + printf(" Use --threads and --async-max-commands to tune async performance.\n"); printf("\n"); printf("-c --async-max-commands # Default: 50\n"); @@ -752,10 +754,6 @@ print_usage(const char* program) printf(" in time.\n"); printf("\n"); - printf("-W --event-loops # Default: 1\n"); - printf(" Number of event loops (or selector threads) when running in asynchronous mode.\n"); - printf("\n"); - printf(" --tls-enable # Default: TLS disabled\n"); printf(" Enable TLS.\n"); printf("\n"); @@ -956,7 +954,7 @@ print_args(args_t* args) printf("async min conns per node: %d\n", args->async_min_conns_per_node); printf("async max conns per 
node: %d\n", args->async_max_conns_per_node); printf("async max commands: %d\n", args->async_max_commands); - printf("event loops: %d\n", args->event_loop_capacity); + printf("event loops: %d\n", args->transaction_worker_threads); if (args->tls.enable) { printf("TLS: enabled\n"); @@ -1156,9 +1154,9 @@ validate_args(args_t* args) return 1; } - if (args->event_loop_capacity <= 0 || args->event_loop_capacity > 1000) { - printf("Invalid event-loops: %d Valid values: [1-1000]\n", - args->event_loop_capacity); + if (args->transaction_worker_threads <= 0 || args->transaction_worker_threads > 1000) { + printf("Invalid threads/event_loops: %d Valid values: [1-1000]\n", + args->transaction_worker_threads); return 1; } return 0; @@ -1711,10 +1709,6 @@ set_args(int argc, char * const* argv, args_t* args) args->async_max_commands = atoi(optarg); break; - case 'W': - args->event_loop_capacity = atoi(optarg); - break; - case BENCH_OPT_SEND_KEY: args->key = AS_POLICY_KEY_SEND; break; @@ -1876,7 +1870,6 @@ _load_defaults(args_t* args) args->async_min_conns_per_node = 0; args->async_max_conns_per_node = 300; args->async_max_commands = 50; - args->event_loop_capacity = 1; memset(&args->tls, 0, sizeof(as_config_tls)); args->tls_name = NULL; args->auth_mode = AS_AUTH_INTERNAL; diff --git a/src/main/coordinator.c b/src/main/coordinator.c index b3590d9c..296fcf1b 100644 --- a/src/main/coordinator.c +++ b/src/main/coordinator.c @@ -135,7 +135,7 @@ coordinator_worker(void* udata) stage_t* stage = &cdata->stages.stages[stage_idx]; fprint_stage(stdout, &cdata->stages, stage_idx); - if (stage->workload.type == WORKLOAD_TYPE_I) { + if (stage->workload.type == WORKLOAD_TYPE_I && stage->batch_write_size > 1) { uint64_t nkeys = stage->key_end - stage->key_start; if (nkeys % stage->batch_write_size != 0) { blog_warn("--keys is not divisible by --batch-write-size so more than " @@ -155,7 +155,7 @@ coordinator_worker(void* udata) } } - if (stage->workload.type == WORKLOAD_TYPE_D) { + if 
(stage->workload.type == WORKLOAD_TYPE_D && stage->batch_delete_size > 1) { uint64_t nkeys = stage->key_end - stage->key_start; if (nkeys % stage->batch_delete_size != 0) { blog_warn("--keys is not divisible by --batch-delete-size so some records " diff --git a/src/main/transaction.c b/src/main/transaction.c index 0038d5a3..9829666d 100644 --- a/src/main/transaction.c +++ b/src/main/transaction.c @@ -6,6 +6,8 @@ #include #include +#include +#include #ifndef __aarch64__ #include #endif @@ -17,7 +19,6 @@ #include #include #include -#include #include @@ -25,16 +26,20 @@ // Typedefs & constants. // +struct async_data_s; + struct async_data_s { + tdata_t* tdata; cdata_t* cdata; - stage_t* stage; - // queue to place this item back on once the callback has finished - queue_t* adata_q; + thr_coord_t* coord; + stage_t* stage; // keep each async_data in the same event loop to prevent the possibility // of overflowing an event loop due to bad scheduling as_event_loop* ev_loop; + void (*workload_cb)(struct async_data_s* adata); + // the time at which the async call was made uint64_t start_time; @@ -48,6 +53,8 @@ struct async_data_s { delete_op, udf_op } op; + + pthread_mutex_t done_lock; }; @@ -91,6 +98,8 @@ LOCAL_HELPER int _batch_write_record_async(as_batch_records* keys, struct async_ // Thread worker helper methods LOCAL_HELPER void _calculate_subrange(uint64_t key_start, uint64_t key_end, uint32_t t_idx, uint32_t n_threads, uint64_t* t_start, uint64_t* t_end); +LOCAL_HELPER void _calculate_sub_count(uint64_t start, uint64_t end, + uint32_t t_idx, uint32_t n_threads, uint64_t* count); LOCAL_HELPER void _gen_key(uint64_t key_val, as_key* key, const cdata_t* cdata); LOCAL_HELPER as_record* _gen_record(as_random* random, const cdata_t* cdata, tdata_t* tdata, const stage_t* stage); @@ -157,17 +166,11 @@ LOCAL_HELPER void _async_batch_write_listener(as_error* err, as_batch_read_recor void* udata, as_event_loop* event_loop); LOCAL_HELPER void _async_val_listener(as_error* err, 
as_val* val, void* udata, as_event_loop* event_loop); -LOCAL_HELPER struct async_data_s* queue_pop_wait(queue_t* adata_q); -LOCAL_HELPER void linear_writes_async(tdata_t* tdata, cdata_t* cdata, - thr_coord_t* coord, const stage_t* stage, queue_t* adata_q); -LOCAL_HELPER void random_read_write_async(tdata_t* tdata, cdata_t* cdata, - thr_coord_t* coord, const stage_t* stage, queue_t* adata_q); -LOCAL_HELPER void random_read_write_udf_async(tdata_t* tdata, cdata_t* cdata, - thr_coord_t* coord, const stage_t* stage, queue_t* adata_q); -LOCAL_HELPER void linear_deletes_async(tdata_t* tdata, cdata_t* cdata, - thr_coord_t* coord, const stage_t* stage, queue_t* adata_q); -LOCAL_HELPER void random_read_write_delete_async(tdata_t* tdata, cdata_t* cdata, - thr_coord_t* coord, const stage_t* stage, queue_t* adata_q); +LOCAL_HELPER void linear_writes_async(struct async_data_s* adata); +LOCAL_HELPER void random_read_write_async(struct async_data_s* adata); +LOCAL_HELPER void random_read_write_udf_async(struct async_data_s* adata); +LOCAL_HELPER void linear_deletes_async(struct async_data_s* adata); +LOCAL_HELPER void random_read_write_delete_async(struct async_data_s* adata); // Main worker thread loop LOCAL_HELPER void do_sync_workload(tdata_t* tdata, cdata_t* cdata, @@ -647,6 +650,20 @@ _calculate_subrange(uint64_t key_start, uint64_t key_end, uint32_t t_idx, *t_end = key_start + ((n_keys * (t_idx + 1)) / n_threads); } +/* + * calculates the size of the subrange that the given thread should operate on, + * which is done by evenly dividing the interval into n_threads intervals; + * used to determine how many async transactions each thread should seed + */ +LOCAL_HELPER void +_calculate_sub_count(uint64_t start, uint64_t end, uint32_t t_idx, + uint32_t n_threads, uint64_t* count) +{ + uint64_t res_start, res_end; + _calculate_subrange(start, end, t_idx, n_threads, &res_start, &res_end); + *count = res_end - res_start; +} + LOCAL_HELPER void _gen_key(uint64_t key_val, as_key* key, const
cdata_t* cdata) { @@ -1172,7 +1189,8 @@ linear_deletes(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, thr_coordinator_complete(coord); } -LOCAL_HELPER void random_read_write_delete(tdata_t* tdata, cdata_t* cdata, +LOCAL_HELPER void +random_read_write_delete(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, const stage_t* stage) { uint32_t read_pct = _pct_to_fp(stage->workload.read_pct); @@ -1373,15 +1391,9 @@ _async_listener(as_error* err, void* udata, as_event_loop* event_loop) err->code, err->message); } } - - if (err->code == AEROSPIKE_ERR_NO_MORE_CONNECTIONS) { - // this event loop is full, try another - adata->ev_loop = NULL; - } } - // put this adata object back on the queue - queue_push(adata->adata_q, adata); + adata->workload_cb(adata); } LOCAL_HELPER void @@ -1432,44 +1444,23 @@ _async_val_listener(as_error* err, as_val* val, void* udata, } } -LOCAL_HELPER struct async_data_s* -queue_pop_wait(queue_t* adata_q) -{ - struct async_data_s* adata; - - while (1) { - adata = queue_pop(adata_q); - if (adata == NULL) { - #ifdef __aarch64__ - __asm__ __volatile__("yield"); - #else - _mm_pause(); - #endif - - continue; - } - break; - } - return adata; -} - LOCAL_HELPER void -linear_writes_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, - const stage_t* stage, queue_t* adata_q) +linear_writes_async(struct async_data_s* adata) { - uint64_t key_val, end_key; - struct async_data_s* adata; + tdata_t* tdata = adata->tdata; + cdata_t* cdata = adata->cdata; + thr_coord_t* coord = adata->coord; + const stage_t* stage = adata->stage; + uint64_t key_val, end_key, inc_by; struct timespec wake_time; uint64_t start_time; - key_val = stage->key_start; - end_key = stage->key_end; - while (tdata->do_work && - key_val < end_key) { - - adata = queue_pop_wait(adata_q); + inc_by = stage->batch_write_size <= 1 ? 
1 : stage->batch_write_size; + key_val = atomic_fetch_add(&tdata->current_key, inc_by); + end_key = tdata->end_key; + if (tdata->do_work && key_val < end_key) { clock_gettime(COORD_CLOCK, &wake_time); start_time = timespec_to_us(&wake_time); adata->start_time = start_time; @@ -1484,14 +1475,11 @@ linear_writes_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, _write_record_async(&adata->key, rec, adata, tdata, cdata); _destroy_record(rec, stage); - key_val++; } else { as_batch_records* batch; - batch = _gen_batch_writes_sequential_keys(cdata, tdata, stage, key_val); _batch_write_record_async(batch, adata, tdata, cdata); - key_val += stage->batch_write_size; } uint64_t pause_for = @@ -1499,32 +1487,29 @@ linear_writes_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, timespec_add_us(&wake_time, pause_for); thr_coordinator_sleep(coord, &wake_time); } + else { + int rv; - // once we've written everything, there's nothing left to do, so tell - // coord we're done and exit - thr_coordinator_complete(coord); + if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) { + blog_error("failed to unlock mutex - %d\n", rv); + exit(-1); + } + } } LOCAL_HELPER void -random_read_write_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, - const stage_t* stage, queue_t* adata_q) +random_read_write_async(struct async_data_s* adata) { - struct async_data_s* adata; + tdata_t* tdata = adata->tdata; + cdata_t* cdata = adata->cdata; + thr_coord_t* coord = adata->coord; + const stage_t* stage = adata->stage; - struct timespec wake_time; + struct timespec wake_time; uint64_t start_time; - uint32_t read_pct = _pct_to_fp(stage->workload.read_pct); - // since this workload has no target number of transactions to be made, we - // are always ready to be reaped, and so we notify the coordinator that we - // are finished with our required tasks and can be stopped whenever - thr_coordinator_complete(coord); - - while (tdata->do_work) { - - adata = queue_pop_wait(adata_q); - + 
if (tdata->do_work) { clock_gettime(COORD_CLOCK, &wake_time); start_time = timespec_to_us(&wake_time); adata->start_time = start_time; @@ -1544,32 +1529,33 @@ random_read_write_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, timespec_add_us(&wake_time, pause_for); thr_coordinator_sleep(coord, &wake_time); } + else { + int rv; + + if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) { + blog_error("failed to unlock mutex - %d\n", rv); + exit(-1); + } + } } LOCAL_HELPER void -random_read_write_udf_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, - const stage_t* stage, queue_t* adata_q) +random_read_write_udf_async(struct async_data_s* adata) { - struct async_data_s* adata; + tdata_t* tdata = adata->tdata; + cdata_t* cdata = adata->cdata; + thr_coord_t* coord = adata->coord; + const stage_t* stage = adata->stage; - struct timespec wake_time; + struct timespec wake_time; uint64_t start_time; - uint32_t read_pct = _pct_to_fp(stage->workload.read_pct); uint32_t write_pct = _pct_to_fp(stage->workload.write_pct); // store the cumulative probability in write_pct write_pct = read_pct + write_pct; - // since this workload has no target number of transactions to be made, we - // are always ready to be reaped, and so we notify the coordinator that we - // are finished with our required tasks and can be stopped whenever - thr_coordinator_complete(coord); - - while (tdata->do_work) { - - adata = queue_pop_wait(adata_q); - + if (tdata->do_work) { clock_gettime(COORD_CLOCK, &wake_time); start_time = timespec_to_us(&wake_time); adata->start_time = start_time; @@ -1592,33 +1578,40 @@ random_read_write_udf_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, timespec_add_us(&wake_time, pause_for); thr_coordinator_sleep(coord, &wake_time); } + else { + int rv; + + if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) { + blog_error("failed to unlock mutex - %d\n", rv); + exit(-1); + } + } } LOCAL_HELPER void -linear_deletes_async(tdata_t* tdata, cdata_t* 
cdata, thr_coord_t* coord, - const stage_t* stage, queue_t* adata_q) +linear_deletes_async(struct async_data_s* adata) { - uint64_t key_val, end_key; - struct async_data_s* adata; + tdata_t* tdata = adata->tdata; + cdata_t* cdata = adata->cdata; + thr_coord_t* coord = adata->coord; + const stage_t* stage = adata->stage; + uint64_t key_val, end_key, inc_by; struct timespec wake_time; uint64_t start_time; - key_val = stage->key_start; - end_key = stage->key_end; - while (tdata->do_work && - key_val < end_key) { - - adata = queue_pop_wait(adata_q); + inc_by = stage->batch_delete_size <= 1 ? 1 : stage->batch_delete_size; + key_val = atomic_fetch_add(&tdata->current_key, inc_by); + end_key = tdata->end_key; - clock_gettime(COORD_CLOCK, &wake_time); - start_time = timespec_to_us(&wake_time); - adata->start_time = start_time; + if (tdata->do_work && key_val < end_key) { + clock_gettime(COORD_CLOCK, &wake_time); + start_time = timespec_to_us(&wake_time); + adata->start_time = start_time; adata->op = delete_op; if (stage->batch_delete_size <= 1) { - as_record* rec; _gen_key(key_val, &adata->key, cdata); rec = _gen_nil_record(tdata); @@ -1626,14 +1619,11 @@ linear_deletes_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, _write_record_async(&adata->key, rec, adata, tdata, cdata); _destroy_record(rec, stage); - key_val++; } else { as_batch_records* batch; - batch = _gen_batch_deletes_sequential_keys(cdata, tdata, stage, key_val); _batch_write_record_async(batch, adata, tdata, cdata); - key_val += stage->batch_delete_size; } uint64_t pause_for = @@ -1641,36 +1631,33 @@ linear_deletes_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, timespec_add_us(&wake_time, pause_for); thr_coordinator_sleep(coord, &wake_time); } + else { + int rv; - // once we've written everything, there's nothing left to do, so tell - // coord we're done and exit - thr_coordinator_complete(coord); + if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) { + blog_error("failed to unlock 
mutex - %d\n", rv); + exit(-1); + } + } } LOCAL_HELPER void -random_read_write_delete_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, - const stage_t* stage, queue_t* adata_q) +random_read_write_delete_async(struct async_data_s* adata) { - struct async_data_s* adata; + tdata_t* tdata = adata->tdata; + cdata_t* cdata = adata->cdata; + thr_coord_t* coord = adata->coord; + const stage_t* stage = adata->stage; struct timespec wake_time; uint64_t start_time; - uint32_t read_pct = _pct_to_fp(stage->workload.read_pct); uint32_t write_pct = _pct_to_fp(stage->workload.write_pct); // store the cumulative probability in write_pct write_pct = read_pct + write_pct; - // since this workload has no target number of transactions to be made, we - // are always ready to be reaped, and so we notify the coordinator that we - // are finished with our required tasks and can be stopped whenever - thr_coordinator_complete(coord); - - while (tdata->do_work) { - - adata = queue_pop_wait(adata_q); - + if (tdata->do_work) { clock_gettime(COORD_CLOCK, &wake_time); start_time = timespec_to_us(&wake_time); adata->start_time = start_time; @@ -1693,6 +1680,14 @@ random_read_write_delete_async(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coor timespec_add_us(&wake_time, pause_for); thr_coordinator_sleep(coord, &wake_time); } + else { + int rv; + + if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) { + blog_error("failed to unlock mutex - %d\n", rv); + exit(-1); + } + } } @@ -1728,58 +1723,116 @@ LOCAL_HELPER void do_async_workload(tdata_t* tdata, cdata_t* cdata, thr_coord_t* coord, stage_t* stage) { - struct async_data_s* adatas; uint32_t t_idx = tdata->t_idx; + + int rv; uint64_t n_adatas; - queue_t adata_q; - // thread 0 is designated to handle async calls, the rest can immediately - // terminate - if (t_idx != 0) { + // each worker thread takes a subrange of the total set + // of async commands to seed the event loops with + _calculate_sub_count(0, cdata->async_max_commands, 
t_idx, + cdata->transaction_worker_threads, &n_adatas); + + // each worker thread takes a subrange of the total keys + // to transact on, this is only used in linear async workloads + // like I and DB where the workloads stop after end_key + uint64_t start_key, end_key; + _calculate_subrange(stage->key_start, stage->key_end, t_idx, + cdata->transaction_worker_threads, &start_key, &end_key); + tdata->current_key = start_key; + tdata->end_key = end_key; + + blog_info("async thread: %" PRIu32 ", starting: %" PRIu64 " transactions\n", t_idx, n_adatas); + + struct async_data_s* adatas = + (struct async_data_s*) cf_calloc(sizeof(struct async_data_s), n_adatas); + + switch (stage->workload.type) { + case WORKLOAD_TYPE_RU: + case WORKLOAD_TYPE_RR: + case WORKLOAD_TYPE_RUF: + case WORKLOAD_TYPE_RUD: + // No target num txns - tell coord this is reapable. thr_coordinator_complete(coord); - return; + break; + default: + break; } - n_adatas = cdata->async_max_commands; - adatas = - (struct async_data_s*) cf_malloc(n_adatas * sizeof(struct async_data_s)); - - queue_init(&adata_q, n_adatas); for (uint32_t i = 0; i < n_adatas; i++) { struct async_data_s* adata = &adatas[i]; + adata->tdata = tdata; adata->cdata = cdata; + adata->coord = coord; adata->stage = stage; - adata->adata_q = &adata_q; - adata->ev_loop = NULL; + adata->ev_loop = as_event_loop_get_by_index(tdata->t_idx); - queue_push(&adata_q, adata); - } + if ((rv = pthread_mutex_init(&adata->done_lock, NULL)) != 0) { + blog_error("failed to initialize mutex - %d\n", rv); + exit(-1); + } - switch (stage->workload.type) { + if ((rv = pthread_mutex_lock(&adata->done_lock)) != 0) { + blog_error("failed to lock mutex - %d\n", rv); + exit(-1); + } + + switch (stage->workload.type) { case WORKLOAD_TYPE_I: - linear_writes_async(tdata, cdata, coord, stage, &adata_q); + adata->workload_cb = linear_writes_async; + linear_writes_async(adata); break; case WORKLOAD_TYPE_RU: case WORKLOAD_TYPE_RR: - random_read_write_async(tdata, cdata, 
coord, stage, &adata_q); + adata->workload_cb = random_read_write_async; + random_read_write_async(adata); break; case WORKLOAD_TYPE_RUF: - random_read_write_udf_async(tdata, cdata, coord, stage, &adata_q); + adata->workload_cb = random_read_write_udf_async; + random_read_write_udf_async(adata); break; case WORKLOAD_TYPE_D: - linear_deletes_async(tdata, cdata, coord, stage, &adata_q); + adata->workload_cb = linear_deletes_async; + linear_deletes_async(adata); break; case WORKLOAD_TYPE_RUD: - random_read_write_delete_async(tdata, cdata, coord, stage, &adata_q); + adata->workload_cb = random_read_write_delete_async; + random_read_write_delete_async(adata); break; + } } // wait for all the async calls to finish for (uint32_t i = 0; i < n_adatas; i++) { - queue_pop_wait(&adata_q); + struct async_data_s* adata = &adatas[i]; + + if ((rv = pthread_mutex_lock(&adata->done_lock)) != 0) { + blog_error("failed to lock mutex - %d\n", rv); + exit(-1); + } + + if ((rv = pthread_mutex_unlock(&adata->done_lock)) != 0) { + blog_error("failed to unlock mutex - %d\n", rv); + exit(-1); + } + + if ((rv = pthread_mutex_destroy(&adata->done_lock)) != 0) { + blog_error("failed to destroy mutex - %d\n", rv); + exit(-1); + } + } + + switch (stage->workload.type) { + case WORKLOAD_TYPE_I: + case WORKLOAD_TYPE_D: + // once we've written everything, there's nothing left to do, so tell + // coord we're done and exit + thr_coordinator_complete(coord); + break; + default: + break; } - queue_free(&adata_q); // free the async_data structs cf_free(adatas); @@ -1806,8 +1859,7 @@ init_stage(const cdata_t* cdata, tdata_t* tdata, stage_t* stage) // dyn_throttle uses a target delay between consecutive events, so // calculate the target delay given the requested transactions per // second and the number of concurrent transactions (i.e. num threads) - uint32_t n_threads = stage->async ? 
1 : - cdata->transaction_worker_threads; + uint32_t n_threads = cdata->transaction_worker_threads; dyn_throttle_init(&tdata->dyn_throttle, (1000000.f * n_threads) / stage->tps); } diff --git a/src/test/integration/test_delete.py b/src/test/integration/test_delete.py index 89468283..3d186abe 100644 --- a/src/test/integration/test_delete.py +++ b/src/test/integration/test_delete.py @@ -47,15 +47,15 @@ def test_linear_delete_batch(): def test_linear_delete_async_batch(): # first fill up the database - lib.run_benchmark(["--workload", "I", "--start-key", "0", "--keys", "100", "--batch-size", "10", "--async"]) + lib.run_benchmark(["--workload", "I", "--start-key", "0", "--keys", "100", "--batch-size", "10", "--async", "--threads", "10"]) lib.check_for_range(0, 100) # then delete it all - lib.run_benchmark(["--workload", "DB", "--start-key", "0", "--keys", "100", "--batch-delete-size", "10", "--async"]) + lib.run_benchmark(["--workload", "DB", "--start-key", "0", "--keys", "100", "--batch-delete-size", "10", "--async", "--threads", "10"]) lib.check_for_range(0, 0) def test_linear_delete_subset_batch(): # first fill up the database - lib.run_benchmark(["--workload", "I", "--start-key", "0", "--keys", "1000", "--batch-size", "5", "--async"]) + lib.run_benchmark(["--workload", "I", "--start-key", "0", "--keys", "1000", "--batch-size", "5", "--async", "--threads", "10"]) lib.check_for_range(0, 1000) # then delete a subset of the database lib.run_benchmark(["--workload", "DB", "--start-key", "300", "--keys", "500", "--batch-delete-size", "5", "--threads", "2"], do_reset=False) @@ -65,10 +65,10 @@ def test_linear_delete_subset_batch(): def test_linear_delete_subset_async_batch(): # first fill up the database - lib.run_benchmark(["--workload", "I", "--start-key", "0", "--keys", "1000"]) + lib.run_benchmark(["--workload", "I", "--start-key", "0", "--keys", "1000", "--threads", "10"]) lib.check_for_range(0, 1000) # then delete a subset of the database - 
lib.run_benchmark(["--workload", "DB", "--start-key", "300", "--keys", "500", "--batch-delete-size", "50", "--async"], do_reset=False) + lib.run_benchmark(["--workload", "DB", "--start-key", "300", "--keys", "500", "--batch-delete-size", "50", "--async", "--threads", "10"], do_reset=False) lib.check_recs_exist_in_range(0, 300) lib.check_recs_exist_in_range(800, 1000) assert(len(lib.scan_records()) == 500) diff --git a/src/test/integration/test_throttling.py b/src/test/integration/test_throttling.py index 7d2f7773..25760480 100644 --- a/src/test/integration/test_throttling.py +++ b/src/test/integration/test_throttling.py @@ -15,7 +15,8 @@ def test_tps_simple_async(): # test throughput throttling with simple objects and one thread lib.run_benchmark(["--workload", "RU,0.0001", "--duration", "5", "--start-key", "0", "--keys", "1000000000", "-o", "I", - "--throughput", f"{DEFAULT_TPS}", "-z", "1", "--async"]) + "--throughput", f"{DEFAULT_TPS}", "-z", "1", "--async", + "--async-max-commands", "1"]) n_records = len(lib.scan_records()) assert(5*DEFAULT_TPS * .90 <= n_records <= 5*DEFAULT_TPS * 1.10) @@ -29,10 +30,11 @@ def test_tps_multithreaded(): assert(5*DEFAULT_TPS * .90 <= n_records <= 5*DEFAULT_TPS * 1.10) def test_tps_multithreaded_async(): - # test throughput throttling with simple objects and one thread + # test throughput throttling with simple objects and many threads lib.run_benchmark(["--workload", "RU,0.0001", "--duration", "5", "--start-key", "0", "--keys", "1000000000", "-o", "I", - "--throughput", f"{DEFAULT_TPS}", "-z", "16", "--async"]) + "--throughput", f"{DEFAULT_TPS}", "-z", "16", "--async", + "--async-max-commands", "16"]) n_records = len(lib.scan_records()) # there is much higher variance with multiple threads assert(5*DEFAULT_TPS * .90 <= n_records <= 5*DEFAULT_TPS * 1.10) @@ -49,7 +51,8 @@ def test_tps_read_write_async(): # test throughput throttling with simple objects and one thread lib.run_benchmark(["--workload", "RU,50", "--duration", "5", 
"--start-key", "0", "--keys", "1000000000", "-o", "I", - "--throughput", f"{DEFAULT_TPS}", "-z", "1", "--async"]) + "--throughput", f"{DEFAULT_TPS}", "-z", "1", "--async", + "--async-max-commands", "1"]) n_records = len(lib.scan_records()) assert(5*DEFAULT_TPS/2 * .90 <= n_records <= 5*DEFAULT_TPS/2 * 1.10) @@ -67,7 +70,8 @@ def test_tps_read_write_high_variance_async(): lib.run_benchmark(["--workload", "RU,50", "--duration", "5", "--start-key", "0", "--keys", "1000000000", "-o", "I,{500*S64:[10*I,B128]}", "--throughput", f"{DEFAULT_TPS/2}", - "-z", "1", "--read-bins", "1", "--async"]) + "-z", "1", "--read-bins", "1", "--async", + "--async-max-commands", "1"]) n_records = len(lib.scan_records()) assert(5*DEFAULT_TPS/4 * .80 <= n_records <= 5*DEFAULT_TPS/4 * 1.20) diff --git a/src/test/integration/test_write_bins.py b/src/test/integration/test_write_bins.py index 260c13b1..bf52e84c 100644 --- a/src/test/integration/test_write_bins.py +++ b/src/test/integration/test_write_bins.py @@ -112,7 +112,8 @@ def check_bins(b): lib.obj_spec_is_I3(b["testbin_3"]) lib.run_benchmark(["--workload", "I", "--start-key", "0", "--keys", "100", - "-o", "I1,I2,I3,I4", "--random", "--write-bins", "1,3", "--async", "--batch-write-size", "25"]) + "-o", "I1,I2,I3,I4", "--random", "--write-bins", "1,3", "--async", + "--batch-write-size", "25", "--threads", "4"]) lib.check_for_range(0, 100, lambda meta, key, bins: check_bins(bins)) def test_write_batch_random(): @@ -181,7 +182,7 @@ def check_bins_after(b): lib.obj_spec_is_I4(b["testbin_4"]) lib.run_benchmark(["--workload", "I", "--start-key", "0", "--keys", "100", - "-o", "I1,I2,I3,I4", "--random", "--async", "--batch-size", "100"]) + "-o", "I1,I2,I3,I4", "--random", "--async", "--batch-size", "10", "--threads", "2"]) lib.check_for_range(0, 100, lambda meta, key, bins: check_bins_before(bins)) lib.run_benchmark(["--workload", "DB", "--start-key", "0", "--keys", "100", "-o", "I1,I2,I3,I4", "--write-bins", "1,3", "--async"], do_reset=False) 
diff --git a/src/test/unit/setup.c b/src/test/unit/setup.c index 09d7bdea..92ec3c54 100644 --- a/src/test/unit/setup.c +++ b/src/test/unit/setup.c @@ -59,7 +59,6 @@ setup(void) args->durable_deletes = false; args->conn_pools_per_node = 1; args->async_max_commands = 50; - args->event_loop_capacity = 1; args->auth_mode = AS_AUTH_INTERNAL; data = cf_malloc(sizeof(cdata_t));