diff --git a/Makefile b/Makefile index eb98fd80..2d2dea65 100644 --- a/Makefile +++ b/Makefile @@ -483,6 +483,11 @@ $(BINDIR)/$(UNITDIR)/splinterdb_stress_test: $(COMMON_TESTOBJ) $(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \ $(LIBDIR)/libsplinterdb.so +$(BINDIR)/$(UNITDIR)/splinterdb_optimize_test: $(COMMON_TESTOBJ) \ + $(COMMON_UNIT_TESTOBJ) \ + $(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \ + $(LIBDIR)/libsplinterdb.so + $(BINDIR)/$(UNITDIR)/writable_buffer_test: $(COMMON_TESTOBJ) \ $(COMMON_UNIT_TESTOBJ) \ $(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \ @@ -541,6 +546,7 @@ unit/btree_stress_test: $(BINDIR)/$(UNITDIR)/btree_stress_test unit/splinter_test: $(BINDIR)/$(UNITDIR)/splinter_test unit/splinterdb_quick_test: $(BINDIR)/$(UNITDIR)/splinterdb_quick_test unit/splinterdb_stress_test: $(BINDIR)/$(UNITDIR)/splinterdb_stress_test +unit/splinterdb_optimize_test: $(BINDIR)/$(UNITDIR)/splinterdb_optimize_test unit/writable_buffer_test: $(BINDIR)/$(UNITDIR)/writable_buffer_test unit/config_parse_test: $(BINDIR)/$(UNITDIR)/config_parse_test unit/limitations_test: $(BINDIR)/$(UNITDIR)/limitations_test @@ -565,6 +571,9 @@ $(BINDIR)/$(EXAMPLES_DIR)/splinterdb_wide_values_example: $(OBJDIR)/$(EXAMPLES_D $(BINDIR)/$(EXAMPLES_DIR)/splinterdb_iterators_example: $(OBJDIR)/$(EXAMPLES_DIR)/splinterdb_iterators_example.o \ $(LIBDIR)/libsplinterdb.so +$(BINDIR)/$(EXAMPLES_DIR)/splinterdb_optimize_example: $(OBJDIR)/$(EXAMPLES_DIR)/splinterdb_optimize_example.o \ + $(LIBDIR)/libsplinterdb.so + $(BINDIR)/$(EXAMPLES_DIR)/splinterdb_custom_ipv4_addr_sortcmp_example: $(OBJDIR)/$(EXAMPLES_DIR)/splinterdb_custom_ipv4_addr_sortcmp_example.o \ $(LIBDIR)/libsplinterdb.so diff --git a/examples/splinterdb_optimize_example.c b/examples/splinterdb_optimize_example.c new file mode 100644 index 00000000..aadfab77 --- /dev/null +++ b/examples/splinterdb_optimize_example.c @@ -0,0 +1,223 @@ +// Copyright 2026 VMware, Inc. +// SPDX-License-Identifier: Apache-2.0 + +/* + * Open an existing SplinterDB disk image and optimize a key range. + * + * This is intended as a small utility for preparing a database image before + * point/range-query benchmarking. + */ + +#include +#include +#include +#include +#include + +#include "splinterdb/default_data_config.h" +#include "splinterdb/splinterdb.h" + +#define DEFAULT_CACHE_SIZE_MIB 1024 +#define DEFAULT_NORMAL_BG_THREADS 2 +#define DEFAULT_MEMTABLE_BG_THREADS 1 +#define BYTES_PER_MIB (1024ULL * 1024ULL) +#define OPTIMIZE_HELP_REQUESTED (-1) + +typedef struct optimize_options { + const char *filename; + const char *min_key; + const char *max_key; + uint64 cache_size; + uint64 disk_size; + uint64 num_normal_bg_threads; + uint64 num_memtable_bg_threads; + _Bool use_direct_io; + _Bool full_leaf_compactions; +} optimize_options; + +static void +usage(const char *progname); + +static int +parse_uint64(const char *arg, uint64 *result); + +static int +parse_options(int argc, char **argv, optimize_options *opts); + +int +main(int argc, char **argv) +{ + optimize_options opts = { + .cache_size = DEFAULT_CACHE_SIZE_MIB * BYTES_PER_MIB, + .num_normal_bg_threads = DEFAULT_NORMAL_BG_THREADS, + .num_memtable_bg_threads = DEFAULT_MEMTABLE_BG_THREADS, + .full_leaf_compactions = TRUE, + }; + + int rc = parse_options(argc, argv, &opts); + if (rc == OPTIMIZE_HELP_REQUESTED) { + return 0; + } + if (rc != 0) { + return rc; + } + + data_config data_cfg; + default_data_config_init(&data_cfg); + + splinterdb_config splinterdb_cfg; + memset(&splinterdb_cfg, 0, sizeof(splinterdb_cfg)); + splinterdb_cfg.filename = opts.filename; + splinterdb_cfg.cache_size = opts.cache_size; + splinterdb_cfg.disk_size = opts.disk_size; + splinterdb_cfg.data_cfg = &data_cfg; + splinterdb_cfg.num_normal_bg_threads = opts.num_normal_bg_threads; + splinterdb_cfg.num_memtable_bg_threads = opts.num_memtable_bg_threads; + if (opts.use_direct_io) { + splinterdb_cfg.io_flags = O_RDWR | O_DIRECT; + } + + splinterdb *spl = NULL; + rc = splinterdb_open(&splinterdb_cfg, &spl); + if (rc != 0) { + fprintf(stderr, "splinterdb_open(%s) failed: %d\n", opts.filename, rc); + return rc; + } + + slice min_key = opts.min_key == NULL + ? NULL_SLICE + : slice_create(strlen(opts.min_key), opts.min_key); + slice max_key = opts.max_key == NULL + ? NULL_SLICE + : slice_create(strlen(opts.max_key), opts.max_key); + + splinterdb_notification notification; + splinterdb_notification_init_blocking(¬ification); + + printf("Optimizing %s in [%s, %s)...\n", + opts.filename, + opts.min_key == NULL ? "-infinity" : opts.min_key, + opts.max_key == NULL ? "+infinity" : opts.max_key); + + rc = splinterdb_optimize( + spl, min_key, max_key, opts.full_leaf_compactions, ¬ification); + splinterdb_notification_deinit(¬ification); + + if (rc == 0) { + printf("Optimize completed successfully.\n"); + } else { + fprintf(stderr, "splinterdb_optimize failed: %d\n", rc); + } + + splinterdb_close(&spl); + return rc; +} + +static void +usage(const char *progname) +{ + fprintf(stderr, + "Usage: %s [options] \n" + "\n" + "Options:\n" + " --min-key Inclusive lower bound\n" + " --max-key Exclusive upper bound\n" + " --cache-mib Cache size in MiB (default %u)\n" + " --disk-size-mib Check image size in MiB\n" + " --set-O_DIRECT Open the image with O_DIRECT\n" + " --no-full-leaf-compactions Skip full leaf compactions\n" + " --normal-bg-threads Normal task threads (default %u)\n" + " --memtable-bg-threads Memtable task threads (default %u)\n" + " --help Show this help\n", + progname, + DEFAULT_CACHE_SIZE_MIB, + DEFAULT_NORMAL_BG_THREADS, + DEFAULT_MEMTABLE_BG_THREADS); +} + +static int +parse_uint64(const char *arg, uint64 *result) +{ + char *end = NULL; + + errno = 0; + *result = strtoull(arg, &end, 10); + if (errno != 0 || end == arg || *end != '\0') { + return EINVAL; + } + + return 0; +} + +static int +parse_options(int argc, char **argv, optimize_options *opts) +{ + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "--help") == 0) { + usage(argv[0]); + return OPTIMIZE_HELP_REQUESTED; + } else if (strcmp(argv[i], "--min-key") == 0) { + if (++i == argc) { + usage(argv[0]); + return EINVAL; + } + opts->min_key = argv[i]; + } else if (strcmp(argv[i], "--max-key") == 0) { + if (++i == argc) { + usage(argv[0]); + return EINVAL; + } + opts->max_key = argv[i]; + } else if (strcmp(argv[i], "--cache-mib") == 0) { + uint64 mib; + if (++i == argc || parse_uint64(argv[i], &mib) != 0) { + usage(argv[0]); + return EINVAL; + } + opts->cache_size = mib * BYTES_PER_MIB; + } else if (strcmp(argv[i], "--disk-size-mib") == 0) { + uint64 mib; + if (++i == argc || parse_uint64(argv[i], &mib) != 0) { + usage(argv[0]); + return EINVAL; + } + opts->disk_size = mib * BYTES_PER_MIB; + } else if (strcmp(argv[i], "--set-O_DIRECT") == 0) { + opts->use_direct_io = TRUE; + } else if (strcmp(argv[i], "--no-full-leaf-compactions") == 0) { + opts->full_leaf_compactions = FALSE; + } else if (strcmp(argv[i], "--normal-bg-threads") == 0) { + if (++i == argc + || parse_uint64(argv[i], &opts->num_normal_bg_threads) != 0) + { + usage(argv[0]); + return EINVAL; + } + } else if (strcmp(argv[i], "--memtable-bg-threads") == 0) { + if (++i == argc + || parse_uint64(argv[i], &opts->num_memtable_bg_threads) != 0) + { + usage(argv[0]); + return EINVAL; + } + } else if (opts->filename == NULL) { + opts->filename = argv[i]; + } else { + usage(argv[0]); + return EINVAL; + } + } + + if (opts->filename == NULL) { + usage(argv[0]); + return EINVAL; + } + if (opts->num_normal_bg_threads == 0) { + fprintf(stderr, + "--normal-bg-threads must be greater than 0 for blocking " + "optimize completion.\n"); + return EINVAL; + } + + return 0; +} diff --git a/include/splinterdb/splinterdb.h b/include/splinterdb/splinterdb.h index c9830f3d..229cac44 100644 --- a/include/splinterdb/splinterdb.h +++ b/include/splinterdb/splinterdb.h @@ -42,7 +42,11 @@ typedef struct splinterdb_config { // required configuration const char *filename; uint64 cache_size; - uint64 disk_size; + + // Required for splinterdb_create(). For splinterdb_open(), zero means read + // the value from the on-disk superblock; nonzero values are checked against + // the superblock. + uint64 disk_size; // data_config is a required field that defines how your data should be // read and written in SplinterDB. See data.h for details. @@ -58,6 +62,8 @@ typedef struct splinterdb_config { uint64 shmem_size; _Bool use_shmem; // Default is FALSE. + // For splinterdb_open(), zero means read the value from the on-disk + // superblock; nonzero values are checked against the superblock. uint64 page_size; uint64 extent_size; @@ -160,7 +166,10 @@ typedef struct splinterdb splinterdb; int splinterdb_create(const splinterdb_config *cfg, splinterdb **kvs); -// Open an existing splinterdb from a file/device on disk +// Open an existing splinterdb from a file/device on disk. +// +// disk_size, page_size, and extent_size are read from the on-disk superblock +// when left unset. If they are set, they must match the superblock. // // The library will allocate and own the memory for splinterdb // and will free it on splinterdb_close(). @@ -178,6 +187,49 @@ void splinterdb_close(splinterdb **kvs); +//////////////////////////////////// +// Notifications +//////////////////////////////////// + +// Size of opaque data required to hold a notification. +#define SPLINTERDB_NOTIFICATION_BUFSIZE (32 * sizeof(void *)) + +typedef struct splinterdb_notification { + char opaque[SPLINTERDB_NOTIFICATION_BUFSIZE]; +} __attribute__((__aligned__(8))) splinterdb_notification; + +typedef void (*splinterdb_notification_callback)( + splinterdb_notification *notification); + +// The caller owns notification storage and must keep it alive until completion. +// After completion, call splinterdb_notification_deinit before reusing or +// destroying the notification. +void +splinterdb_notification_init_blocking(splinterdb_notification *notification); + +void +splinterdb_notification_init_polling(splinterdb_notification *notification, + void *user_data); + +void +splinterdb_notification_init_callback(splinterdb_notification *notification, + splinterdb_notification_callback callback, + void *user_data); + +void +splinterdb_notification_deinit(splinterdb_notification *notification); + +_Bool +splinterdb_notification_poll(const splinterdb_notification *notification, + int *status); + +int +splinterdb_notification_wait(splinterdb_notification *notification); + +void * +splinterdb_notification_user_data(const splinterdb_notification *notification); + + //////////////////////////////////// // Lookups //////////////////////////////////// @@ -290,6 +342,19 @@ splinterdb_update(splinterdb *kvsb, slice delta, splinterdb_lookup_result *old_result); +// Optimize already-incorporated trunk data in [min_key, max_key). A null +// min_key or max_key means the range is unbounded on that side. If +// full_leaf_compactions is true, enqueue full compactions for leaves in the +// range after flushing. Passing a NULL notification makes this fire-and-forget. +// Blocking notifications wait before this function returns; polling and +// callback notifications complete later. +int +splinterdb_optimize(splinterdb *kvs, + slice min_key, + slice max_key, + _Bool full_leaf_compactions, + splinterdb_notification *notification); + /* Iterator API (range query) diff --git a/src/allocator.h b/src/allocator.h index 40ea970a..eb83b631 100644 --- a/src/allocator.h +++ b/src/allocator.h @@ -87,6 +87,12 @@ typedef struct allocator_config { uint64 extent_mask; } allocator_config; +typedef struct disk_geometry { + uint64 disk_size; + uint64 page_size; + uint64 extent_size; +} disk_geometry; + /* *----------------------------------------------------------------------------- * allocator_config_init diff --git a/src/core.c b/src/core.c index 7a0011ea..cbfb0868 100644 --- a/src/core.c +++ b/src/core.c @@ -9,6 +9,7 @@ #include "core.h" #include "data_internal.h" +#include "notification.h" #include "platform_sleep.h" #include "platform_time.h" #include "platform_util.h" @@ -1613,6 +1614,30 @@ core_insert(core_handle *spl, return rc; } +platform_status +core_optimize(core_handle *spl, + key minkey, + key maxkey, + bool32 full_leaf_compactions, + splinterdb_notification *notification) +{ + if (key_is_null(minkey) || key_is_null(maxkey)) { + return STATUS_BAD_PARAM; + } + + int cmp = data_key_compare(spl->cfg.data_cfg, minkey, maxkey); + if (cmp > 0) { + return STATUS_BAD_PARAM; + } + if (cmp == 0) { + splinterdb_notification_complete(notification, STATUS_OK); + return STATUS_OK; + } + + return trunk_optimize( + &spl->trunk_context, minkey, maxkey, full_leaf_compactions, notification); +} + // If any change is made in here, please make similar change in // core_lookup_async platform_status diff --git a/src/core.h b/src/core.h index e44c5328..db90f982 100644 --- a/src/core.h +++ b/src/core.h @@ -158,6 +158,13 @@ core_insert(core_handle *spl, message data, lookup_result *old_result); +platform_status +core_optimize(core_handle *spl, + key minkey, + key maxkey, + bool32 full_leaf_compactions, + struct splinterdb_notification *notification); + platform_status core_lookup(core_handle *spl, key target, lookup_result *result); diff --git a/src/notification.c b/src/notification.c new file mode 100644 index 00000000..cb044697 --- /dev/null +++ b/src/notification.c @@ -0,0 +1,198 @@ +// Copyright 2026 VMware, Inc. +// SPDX-License-Identifier: Apache-2.0 + +/* + *------------------------------------------------------------------------------ + * notification.c -- + * + * Public SplinterDB notification API and internal completion helpers. + *------------------------------------------------------------------------------ + */ + +#include "notification.h" +#include "platform_assert.h" +#include "platform_condvar.h" +#include "platform_heap.h" +#include "poison.h" + +typedef enum notification_mode { + NOTIFICATION_MODE_BLOCKING, + NOTIFICATION_MODE_POLLING, + NOTIFICATION_MODE_CALLBACK, +} notification_mode; + +typedef struct notification { + notification_mode mode; + bool32 complete; + platform_status status; + void *user_data; + splinterdb_notification_callback callback; + platform_condvar cv; +} notification; + +_Static_assert(sizeof(notification) <= sizeof(splinterdb_notification), + "sizeof(splinterdb_notification) is too small"); + +_Static_assert(__alignof__(splinterdb_notification) + == __alignof__(notification), + "mismatched alignment for splinterdb_notification"); + +static inline notification * +notification_from_splinterdb(splinterdb_notification *note) +{ + return (notification *)note; +} + +static inline const notification * +notification_from_const_splinterdb(const splinterdb_notification *note) +{ + return (const notification *)note; +} + +static inline int +platform_status_to_int(const platform_status status) +{ + return status.r; +} + +static void +splinterdb_notification_init_common(splinterdb_notification *note, + notification_mode mode, + splinterdb_notification_callback callback, + void *user_data) +{ + notification *n = notification_from_splinterdb(note); + + n->mode = mode; + n->complete = FALSE; + n->status = STATUS_OK; + n->user_data = user_data; + n->callback = callback; + + platform_status rc = platform_condvar_init(&n->cv, PROCESS_PRIVATE_HEAP_ID); + platform_assert_status_ok(rc); +} + +void +splinterdb_notification_init_blocking(splinterdb_notification *note) +{ + splinterdb_notification_init_common( + note, NOTIFICATION_MODE_BLOCKING, NULL, NULL); +} + +void +splinterdb_notification_init_polling(splinterdb_notification *note, + void *user_data) +{ + splinterdb_notification_init_common( + note, NOTIFICATION_MODE_POLLING, NULL, user_data); +} + +void +splinterdb_notification_init_callback(splinterdb_notification *note, + splinterdb_notification_callback callback, + void *user_data) +{ + splinterdb_notification_init_common( + note, NOTIFICATION_MODE_CALLBACK, callback, user_data); +} + +void +splinterdb_notification_deinit(splinterdb_notification *note) +{ + notification *n = notification_from_splinterdb(note); + + platform_condvar_destroy(&n->cv); +} + +_Bool +splinterdb_notification_poll(const splinterdb_notification *note, int *status) +{ + notification *n = + notification_from_splinterdb((splinterdb_notification *)note); + + platform_status rc = platform_condvar_lock(&n->cv); + platform_assert_status_ok(rc); + + bool32 complete = n->complete; + if (status != NULL) { + *status = platform_status_to_int(n->status); + } + + rc = platform_condvar_unlock(&n->cv); + platform_assert_status_ok(rc); + + return complete; +} + +int +splinterdb_notification_wait(splinterdb_notification *note) +{ + notification *n = notification_from_splinterdb(note); + + platform_status rc = platform_condvar_lock(&n->cv); + platform_assert_status_ok(rc); + + while (!n->complete) { + rc = platform_condvar_wait(&n->cv); + platform_assert_status_ok(rc); + } + + int status = platform_status_to_int(n->status); + + rc = platform_condvar_unlock(&n->cv); + platform_assert_status_ok(rc); + + return status; +} + +void * +splinterdb_notification_user_data(const splinterdb_notification *note) +{ + const notification *n = notification_from_const_splinterdb(note); + + return n->user_data; +} + +bool32 +splinterdb_notification_is_blocking(splinterdb_notification *note) +{ + if (note == NULL) { + return FALSE; + } + + notification *n = notification_from_splinterdb(note); + + return n->mode == NOTIFICATION_MODE_BLOCKING; +} + +void +splinterdb_notification_complete(splinterdb_notification *note, + platform_status status) +{ + if (note == NULL) { + return; + } + + notification *n = notification_from_splinterdb(note); + + platform_status rc = platform_condvar_lock(&n->cv); + platform_assert_status_ok(rc); + + platform_assert(!n->complete); + n->status = status; + n->complete = TRUE; + + splinterdb_notification_callback callback = + n->mode == NOTIFICATION_MODE_CALLBACK ? n->callback : NULL; + + rc = platform_condvar_broadcast(&n->cv); + platform_assert_status_ok(rc); + + rc = platform_condvar_unlock(&n->cv); + platform_assert_status_ok(rc); + + if (callback != NULL) { + callback(note); + } +} diff --git a/src/notification.h b/src/notification.h new file mode 100644 index 00000000..d0509100 --- /dev/null +++ b/src/notification.h @@ -0,0 +1,14 @@ +// Copyright 2026 VMware, Inc. +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "splinterdb/splinterdb.h" +#include "platform_status.h" + +bool32 +splinterdb_notification_is_blocking(splinterdb_notification *note); + +void +splinterdb_notification_complete(splinterdb_notification *note, + platform_status status); diff --git a/src/platform_linux/platform_io.c b/src/platform_linux/platform_io.c index e0da5507..42e93026 100644 --- a/src/platform_linux/platform_io.c +++ b/src/platform_linux/platform_io.c @@ -3,6 +3,15 @@ #include "platform_io.h" #include "laio.h" +#include "platform_log.h" +#include +#include +#include +#if defined(__has_feature) +# if __has_feature(memory_sanitizer) +# include +# endif +#endif io_handle * io_handle_create(io_config *cfg, platform_heap_id hid) @@ -21,3 +30,75 @@ io_config_valid(io_config *cfg) { return laio_config_valid(cfg); } + +platform_status +io_read_bootstrap(const char *filename, void *buf, uint64 bytes, uint64 addr) +{ + if (filename == NULL || buf == NULL) { + return STATUS_BAD_PARAM; + } + + if (bytes == 0) { + return STATUS_OK; + } + + int open_flags = O_RDONLY; +#ifdef O_CLOEXEC + open_flags |= O_CLOEXEC; +#endif +#ifdef O_LARGEFILE + open_flags |= O_LARGEFILE; +#endif + + int fd = open(filename, open_flags); + if (fd == -1) { + int saved_errno = errno; + platform_error_log( + "open() '%s' failed: %s\n", filename, strerror(saved_errno)); + return CONST_STATUS(saved_errno); + } + + uint64 bytes_read = 0; + while (bytes_read < bytes) { + ssize_t ret = pread( + fd, (char *)buf + bytes_read, bytes - bytes_read, addr + bytes_read); + if (ret == -1) { + if (errno == EINTR) { + continue; + } + int saved_errno = errno; + platform_error_log( + "io_read_bootstrap: pread failed for addr %lu, bytes %lu, " + "ret %ld: %s\n", + addr + bytes_read, + bytes - bytes_read, + (long)ret, + strerror(saved_errno)); + close(fd); + return CONST_STATUS(saved_errno); + } + if (ret == 0) { + platform_error_log( + "io_read_bootstrap: short read for addr %lu, bytes %lu\n", + addr + bytes_read, + bytes - bytes_read); + close(fd); + return STATUS_IO_ERROR; + } +#if defined(__has_feature) +# if __has_feature(memory_sanitizer) + __msan_unpoison((char *)buf + bytes_read, ret); +# endif +#endif + bytes_read += ret; + } + + if (close(fd) != 0) { + int saved_errno = errno; + platform_error_log( + "close() '%s' failed: %s\n", filename, strerror(saved_errno)); + return CONST_STATUS(saved_errno); + } + + return STATUS_OK; +} diff --git a/src/platform_linux/platform_io.h b/src/platform_linux/platform_io.h index 7c9ca592..1d96bf1a 100644 --- a/src/platform_linux/platform_io.h +++ b/src/platform_linux/platform_io.h @@ -281,6 +281,9 @@ io_config_init(io_config *io_cfg, platform_status io_config_valid(io_config *cfg); +platform_status +io_read_bootstrap(const char *filename, void *buf, uint64 bytes, uint64 addr); + io_handle * io_handle_create(io_config *cfg, platform_heap_id hid); diff --git a/src/rc_allocator.c b/src/rc_allocator.c index a9b97e95..70257dc5 100644 --- a/src/rc_allocator.c +++ b/src/rc_allocator.c @@ -223,6 +223,63 @@ rc_allocator_extent_number(rc_allocator *al, uint64 addr) return (addr / al->cfg->io_cfg->extent_size); } +static checksum128 +rc_allocator_meta_page_checksum(const rc_allocator_meta_page *meta_page) +{ + return platform_checksum128(meta_page, + offsetof(rc_allocator_meta_page, checksum), + RC_ALLOCATOR_META_PAGE_CSUM_SEED); +} + +static disk_geometry +rc_allocator_config_get_disk_geometry(allocator_config *cfg) +{ + return (disk_geometry){ + .disk_size = cfg->capacity, + .page_size = cfg->io_cfg->page_size, + .extent_size = cfg->io_cfg->extent_size, + }; +} + +static platform_status +rc_allocator_validate_disk_geometry(rc_allocator *al) +{ + disk_geometry geometry = al->meta_page->geometry; + + return rc_allocator_disk_geometry_matches_config(&geometry, al->cfg); +} + +platform_status +rc_allocator_disk_geometry_matches_config(const disk_geometry *geometry, + const allocator_config *cfg) +{ + if (geometry->disk_size != cfg->capacity + || geometry->page_size != cfg->io_cfg->page_size + || geometry->extent_size != cfg->io_cfg->extent_size) + { + platform_error_log( + "SplinterDB disk geometry does not match configuration: " + "disk=(disk_size=%lu, page_size=%lu, extent_size=%lu), " + "config=(disk_size=%lu, page_size=%lu, extent_size=%lu)\n", + geometry->disk_size, + geometry->page_size, + geometry->extent_size, + cfg->capacity, + cfg->io_cfg->page_size, + cfg->io_cfg->extent_size); + return STATUS_BAD_PARAM; + } + + return STATUS_OK; +} + +platform_status +rc_allocator_read_disk_geometry(const char *filename, disk_geometry *geometry) +{ + return io_read_bootstrap( + filename, geometry, sizeof(*geometry), RC_ALLOCATOR_BASE_OFFSET); +} + static platform_status rc_allocator_init_meta_page(rc_allocator *al) { @@ -252,6 +309,7 @@ rc_allocator_init_meta_page(rc_allocator *al) memset(al->meta_page->splinters, INVALID_ALLOCATOR_ROOT_ID, sizeof(al->meta_page->splinters)); + al->meta_page->geometry = rc_allocator_config_get_disk_geometry(al->cfg); return STATUS_OK; } @@ -449,19 +507,39 @@ rc_allocator_mount(rc_allocator *al, // load the meta page from disk. status = io_read( io, al->meta_page, al->cfg->io_cfg->page_size, RC_ALLOCATOR_BASE_OFFSET); - platform_assert_status_ok(status); + if (!SUCCESS(status)) { + platform_free(al->heap_id, al->meta_page); + platform_buffer_deinit(&al->bh); + platform_mutex_destroy(&al->lock); + return status; + } + + status = rc_allocator_validate_disk_geometry(al); + if (!SUCCESS(status)) { + platform_free(al->heap_id, al->meta_page); + platform_buffer_deinit(&al->bh); + platform_mutex_destroy(&al->lock); + return status; + } + // validate the checksum of the meta page. - checksum128 currChecksum = - platform_checksum128(al->meta_page, - sizeof(al->meta_page->splinters), - RC_ALLOCATOR_META_PAGE_CSUM_SEED); + checksum128 currChecksum = rc_allocator_meta_page_checksum(al->meta_page); if (!platform_checksum_is_equal(al->meta_page->checksum, currChecksum)) { - platform_assert(0, "Corrupt Meta Page upon mount"); + platform_error_log("Corrupt SplinterDB allocator meta page on mount\n"); + platform_free(al->heap_id, al->meta_page); + platform_buffer_deinit(&al->bh); + platform_mutex_destroy(&al->lock); + return STATUS_BAD_PARAM; } // load the ref counts from disk. status = io_read(io, al->ref_count, buffer_size, cfg->io_cfg->extent_size); - platform_assert_status_ok(status); + if (!SUCCESS(status)) { + platform_free(al->heap_id, al->meta_page); + platform_buffer_deinit(&al->bh); + platform_mutex_destroy(&al->lock); + return status; + } for (uint64 i = 0; i < al->cfg->extent_capacity; i++) { if (al->ref_count[i] != 0) { @@ -609,9 +687,7 @@ rc_allocator_alloc_super_addr(rc_allocator *al, al->meta_page->splinters[idx] = allocator_root_id; *addr = (1 + idx) * al->cfg->io_cfg->page_size; al->meta_page->checksum = - platform_checksum128(al->meta_page, - sizeof(al->meta_page->splinters), - RC_ALLOCATOR_META_PAGE_CSUM_SEED); + rc_allocator_meta_page_checksum(al->meta_page); platform_status io_status = io_write(al->io, al->meta_page, al->cfg->io_cfg->page_size, @@ -640,9 +716,7 @@ rc_allocator_remove_super_addr(rc_allocator *al, if (al->meta_page->splinters[idx] == allocator_root_id) { al->meta_page->splinters[idx] = INVALID_ALLOCATOR_ROOT_ID; al->meta_page->checksum = - platform_checksum128(al->meta_page, - sizeof(al->meta_page->splinters), - RC_ALLOCATOR_META_PAGE_CSUM_SEED); + rc_allocator_meta_page_checksum(al->meta_page); platform_status status = io_write(al->io, al->meta_page, al->cfg->io_cfg->page_size, diff --git a/src/rc_allocator.h b/src/rc_allocator.h index c41c1471..06ed7256 100644 --- a/src/rc_allocator.h +++ b/src/rc_allocator.h @@ -27,18 +27,21 @@ *---------------------------------------------------------------------- * rc_allocator_meta_page -- Disk-resident structure. * - * An on disk structure to hold the super block information about all the - * Splinter tables using this allocator. This is persisted at - * offset 0 of the device. + * An on disk structure to hold the bootstrap disk geometry and the super block + * addresses for all Splinter tables using this allocator. The geometry lives at + * offset 0 so open can read it before mounting the rest of SplinterDB. *---------------------------------------------------------------------- */ typedef struct ONDISK rc_allocator_meta_page { + disk_geometry geometry; allocator_root_id splinters[RC_ALLOCATOR_MAX_ROOT_IDS]; checksum128 checksum; } rc_allocator_meta_page; -_Static_assert(offsetof(rc_allocator_meta_page, splinters) == 0, - "splinters array should be first field in meta_page struct"); +_Static_assert(offsetof(rc_allocator_meta_page, geometry) == 0, + "disk geometry should be first field in meta_page struct"); +_Static_assert(sizeof(rc_allocator_meta_page) <= IO_DEFAULT_PAGE_SIZE, + "allocator meta page must fit in the default page size"); /* *---------------------------------------------------------------------- @@ -94,5 +97,12 @@ rc_allocator_mount(rc_allocator *al, platform_heap_id hid, platform_module_id mid); +platform_status +rc_allocator_read_disk_geometry(const char *filename, disk_geometry *geometry); + +platform_status +rc_allocator_disk_geometry_matches_config(const disk_geometry *geometry, + const allocator_config *cfg); + void rc_allocator_unmount(rc_allocator *al); diff --git a/src/splinterdb.c b/src/splinterdb.c index 45b70616..581fedba 100644 --- a/src/splinterdb.c +++ b/src/splinterdb.c @@ -19,6 +19,7 @@ #include "rc_allocator.h" #include "core.h" #include "lookup_result.h" +#include "notification.h" #include "shard_log.h" #include "splinterdb_tests_private.h" #include "platform_typed_alloc.h" @@ -158,7 +159,8 @@ splinterdb_validate_app_data_config(const data_config *cfg) */ static platform_status splinterdb_init_config(const splinterdb_config *kvs_cfg, // IN - splinterdb *kvs // OUT + const disk_geometry *geometry, + splinterdb *kvs // OUT ) { platform_status rc = STATUS_OK; @@ -170,10 +172,10 @@ splinterdb_init_config(const splinterdb_config *kvs_cfg, // IN kvs->data_cfg = kvs_cfg->data_cfg; if (kvs_cfg->filename == NULL || kvs_cfg->cache_size == 0 - || kvs_cfg->disk_size == 0) + || (geometry == NULL && kvs_cfg->disk_size == 0)) { - platform_error_log( - "Expect filename, cache_size and disk_size to be set.\n"); + platform_error_log("Expect filename and cache_size to be set; disk_size " + "is required when creating.\n"); return STATUS_BAD_PARAM; } @@ -181,6 +183,14 @@ splinterdb_init_config(const splinterdb_config *kvs_cfg, // IN splinterdb_config cfg = {0}; memcpy(&cfg, kvs_cfg, sizeof(cfg)); splinterdb_config_set_defaults(&cfg); + if (geometry != NULL) { + cfg.disk_size = + kvs_cfg->disk_size != 0 ? kvs_cfg->disk_size : geometry->disk_size; + cfg.page_size = + kvs_cfg->page_size != 0 ? kvs_cfg->page_size : geometry->page_size; + cfg.extent_size = kvs_cfg->extent_size != 0 ? kvs_cfg->extent_size + : geometry->extent_size; + } io_config_init(&kvs->io_cfg, cfg.page_size, @@ -197,6 +207,13 @@ splinterdb_init_config(const splinterdb_config *kvs_cfg, // IN } allocator_config_init(&kvs->allocator_cfg, &kvs->io_cfg, cfg.disk_size); + if (geometry != NULL) { + rc = rc_allocator_disk_geometry_matches_config(geometry, + &kvs->allocator_cfg); + if (!SUCCESS(rc)) { + return rc; + } + } clockcache_config_init(&kvs->cache_cfg, &kvs->io_cfg, @@ -252,6 +269,34 @@ splinterdb_init_config(const splinterdb_config *kvs_cfg, // IN return STATUS_OK; } +static platform_status +splinterdb_config_read_disk_geometry(const char *filename, + disk_geometry *geometry) +{ + if (filename == NULL) { + platform_error_log("Expect filename to be set.\n"); + return STATUS_BAD_PARAM; + } + + platform_status status = rc_allocator_read_disk_geometry(filename, geometry); + if (!SUCCESS(status)) { + return status; + } + + if (geometry->disk_size == 0 || geometry->page_size == 0 + || geometry->extent_size == 0) + { + platform_error_log("Invalid SplinterDB disk geometry: " + "disk_size=%lu, page_size=%lu, extent_size=%lu\n", + geometry->disk_size, + geometry->page_size, + geometry->extent_size); + return STATUS_BAD_PARAM; + } + + return STATUS_OK; +} + /* * Internal function for create or open @@ -265,8 +310,10 @@ splinterdb_create_or_open(const splinterdb_config *kvs_cfg, // IN splinterdb *kvs = NULL; platform_status status; - bool we_created_heap = FALSE; - platform_heap_id use_this_heap_id = kvs_cfg->heap_id; + bool we_created_heap = FALSE; + platform_heap_id use_this_heap_id = kvs_cfg->heap_id; + disk_geometry geometry; + const disk_geometry *config_geometry = NULL; status = platform_ensure_thread_registered(); if (!SUCCESS(status)) { @@ -296,6 +343,19 @@ splinterdb_create_or_open(const splinterdb_config *kvs_cfg, // IN we_created_heap = TRUE; } + if (open_existing) { + status = + splinterdb_config_read_disk_geometry(kvs_cfg->filename, &geometry); + if (!SUCCESS(status)) { + platform_error_log("Failed to read SplinterDB disk geometry from " + "'%s': %s\n", + kvs_cfg->filename, + platform_status_to_string(status)); + goto deinit_kvhandle; + } + config_geometry = &geometry; + } + platform_assert(kvs_out != NULL); kvs = TYPED_ZALLOC(use_this_heap_id, kvs); @@ -308,7 +368,7 @@ splinterdb_create_or_open(const splinterdb_config *kvs_cfg, // IN // All memory allocation after this call should -ONLY- use heap handles // from the handle to the running Splinter instance; i.e. 'kvs'. - status = splinterdb_init_config(kvs_cfg, kvs); + status = splinterdb_init_config(kvs_cfg, config_geometry, kvs); if (!SUCCESS(status)) { platform_error_log("Failed to %s SplinterDB device '%s' with specified " "configuration: %s\n", @@ -655,6 +715,40 @@ splinterdb_update(splinterdb *kvsb, return splinterdb_insert_message(kvsb, user_key, msg, _old_result); } +int +splinterdb_optimize(splinterdb *kvs, + slice user_min_key, + slice user_max_key, + _Bool full_leaf_compactions, + splinterdb_notification *notification) +{ + int rc = splinterdb_ensure_thread_registered(); + if (rc != 0) { + return rc; + } + + platform_assert(kvs != NULL); + + key min_key = slice_is_null(user_min_key) + ? NEGATIVE_INFINITY_KEY + : key_create_from_slice(TRUE, user_min_key); + key max_key = slice_is_null(user_max_key) + ? POSITIVE_INFINITY_KEY + : key_create_from_slice(TRUE, user_max_key); + + platform_status status = core_optimize( + &kvs->spl, min_key, max_key, full_leaf_compactions, notification); + if (!SUCCESS(status)) { + return platform_status_to_int(status); + } + + if (splinterdb_notification_is_blocking(notification)) { + return splinterdb_notification_wait(notification); + } + + return 0; +} + struct splinterdb_iterator { core_range_iterator sri; platform_status last_rc; diff --git a/src/task.c b/src/task.c index 965552f7..97d17ca8 100644 --- a/src/task.c +++ b/src/task.c @@ -14,6 +14,101 @@ const char *task_type_name[] = {"TASK_TYPE_INVALID", _Static_assert((ARRAY_SIZE(task_type_name) == NUM_TASK_TYPES), "Array task_type_name[] is incorrectly sized."); +/**************************************** + * Task trackers + ****************************************/ + +void +task_tracker_init(task_tracker *tracker, + task_tracker_callback callback, + void *user_data) +{ + tracker->outstanding = 1; + tracker->failed = FALSE; + tracker->status = STATUS_OK; + tracker->callback = callback; + tracker->user_data = user_data; + tracker->next = NULL; +} + +void +task_tracker_list_init(task_tracker_list *list) +{ + list->head = NULL; +} + +void +task_tracker_add(task_tracker *tracker) +{ + if (tracker != NULL) { + __sync_fetch_and_add(&tracker->outstanding, 1); + } +} + +static uint64 +tracker_done_common(task_tracker *tracker, platform_status status) +{ + platform_assert(tracker != NULL); + + if (!SUCCESS(status) + && __sync_bool_compare_and_swap(&tracker->failed, FALSE, TRUE)) + { + tracker->status = status; + } + + uint64 old_outstanding = __sync_fetch_and_sub(&tracker->outstanding, 1); + platform_assert(0 < old_outstanding); + + return old_outstanding; +} + +void +task_tracker_done(task_tracker *tracker, + platform_status status, + task_tracker_list *completed) +{ + if (tracker == NULL) { + return; + } + + uint64 old_outstanding = tracker_done_common(tracker, status); + + if (old_outstanding == 1 && tracker->callback != NULL) { + platform_assert(completed != NULL); + tracker->next = completed->head; + completed->head = tracker; + } +} + +/* Use this in cases where you want to document that this finish site should + * never call the callback, e.g. in error paths inside locks. */ +void +task_tracker_done_but_not_last(task_tracker *tracker, platform_status status) +{ + if (tracker == NULL) { + return; + } + + uint64 old_outstanding = tracker_done_common(tracker, status); + platform_assert(1 < old_outstanding); +} + +void +task_tracker_notify_all(task_tracker_list *completed) +{ + while (completed->head != NULL) { + task_tracker *tracker = completed->head; + task_tracker_callback callback = tracker->callback; + completed->head = tracker->next; + tracker->next = NULL; + + if (callback != NULL) { + callback(tracker); + } + } +} + + /**************************************** * Background task management ****************************************/ diff --git a/src/task.h b/src/task.h index 69cf57a5..df1e463e 100644 --- a/src/task.h +++ b/src/task.h @@ -48,6 +48,59 @@ struct task { timestamp enqueue_time; }; +typedef struct task_tracker task_tracker; +typedef struct task_tracker_list task_tracker_list; + +typedef void (*task_tracker_callback)(task_tracker *tracker); + +/* + * Tracks a dynamically growing set of tasks. The tracker starts with one + * outstanding reference owned by the launcher. Call task_tracker_add() before + * publishing each new unit of work, including follow-up work published by a + * tracked task. Each unit must call task_tracker_done() exactly once. + * + * The final caller of task_tracker_done() links the tracker onto the supplied + * completion list. Call task_tracker_notify_all() after dropping any locks + * that should not be held while invoking callbacks. The callback owns the + * lifetime of the tracker and may free an enclosing object. + * + * The completion list is caller-owned. It is intended to be a private local + * accumulator for one thread of execution, not a shared multi-producer queue. + */ +struct task_tracker { + volatile uint64 outstanding; + volatile bool32 failed; + platform_status status; + task_tracker_callback callback; + void *user_data; + task_tracker *next; +}; + +struct task_tracker_list { + task_tracker *head; +}; + +void +task_tracker_init(task_tracker *tracker, + task_tracker_callback callback, + void *user_data); + +void +task_tracker_list_init(task_tracker_list *list); + +void +task_tracker_add(task_tracker *tracker); + +void +task_tracker_done(task_tracker *tracker, + platform_status status, + task_tracker_list *completed); +void +task_tracker_done_but_not_last(task_tracker *tracker, platform_status status); + +void +task_tracker_notify_all(task_tracker_list *completed); + /* * Run-time task-specific execution metrics structure. */ diff --git a/src/trunk.c b/src/trunk.c index 089eee17..a723a59c 100644 --- a/src/trunk.c +++ b/src/trunk.c @@ -18,6 +18,7 @@ #include "merge.h" #include "data_internal.h" #include "task.h" +#include "notification.h" #include "poison.h" typedef VECTOR(routing_filter) routing_filter_vector; @@ -61,52 +62,118 @@ typedef struct ONDISK trunk_ondisk_node { uint32 pivot_offsets[]; } trunk_ondisk_node; -typedef enum bundle_compaction_state { - BUNDLE_COMPACTION_NOT_STARTED = 0, - BUNDLE_COMPACTION_IN_PROGRESS = 1, - BUNDLE_COMPACTION_MIN_ENDED = 2, - BUNDLE_COMPACTION_FAILED = 2, - BUNDLE_COMPACTION_ABORTED = 3, - BUNDLE_COMPACTION_SUCCEEDED = 4 -} bundle_compaction_state; +typedef enum bundle_compaction_phase { + BUNDLE_COMPACTION_IN_PROGRESS = 0, + // Terminal but not consumable by maplet compaction. See output.rc. + BUNDLE_COMPACTION_ENDED = 1, + BUNDLE_COMPACTION_SUCCEEDED = 2 +} bundle_compaction_phase; typedef VECTOR(trunk_branch_info) trunk_branch_info_vector; typedef struct bundle_compaction { struct bundle_compaction *next; - task tsk; - trunk_pivot_state *pivot_state; - uint64 num_bundles; - trunk_pivot_stats input_stats; - bundle_compaction_state state; - trunk_branch_info_vector input_branches; - merge_behavior merge_mode; - branch_ref output_branch; - trunk_pivot_stats output_stats; - uint32 *fingerprints; - uint64 compaction_time_ns; + task tsk; // bundle_comaction_task + task_tracker *tracker; + + // Immutable after creation. + struct { + trunk_pivot_state *pivot_state; + uint64 num_bundles; + trunk_pivot_stats stats; + trunk_branch_info_vector branches; + merge_behavior merge_mode; + } input; + + // Written by bundle_compaction_task before terminal publication. + struct { + // For ENDED compactions, rc is the local compaction error. If the + // compaction ended because the pivot was abandoned, rc remains STATUS_OK + // and the pivot/maplet compaction status determines notification status. + platform_status rc; + branch_ref branch; + trunk_pivot_stats stats; + uint32 *fingerprints; + uint64 time_ns; + } output; + + // IN_PROGRESS covers both queued and actively running compaction tasks. + // + // bundle_compaction_task must not access a bundle_compaction after + // transitioning it to a terminal phase, except possibly to garbage-collect + // the pivot_state and its bundle_compactions as part of releasing the + // pivot_state. + // + // When a bundle_compaction becomes both (1) SUCCEEDED, and (2) the front of + // its pivot_state's compactions list, then we must ensure that there exists + // a maplet compaction that will consume it. There are two ways that a + // bundle_compaction could enter this state: + // * It is the head of the list and bundle_compaction_task transitions it to + // SUCCEEDED. In this case, bundle_compaction_task must launch the + // maplet compaction. + // * It is SUCCEEDED and then maplet_compaction_task removes preceding + // compactions so that this one becomes the head. In that case, the + // maplet_compaction_task should enqueue another maplet_compaction or + // perform the maplet_compaction for this bundle_compaction. + // Thus transitions to SUCCEEDED, and mutations of the pivot_state's + // compactions list, must both be done under the pivot_state->lock. + bundle_compaction_phase phase; } bundle_compaction; typedef struct trunk_context trunk_context; struct trunk_pivot_state { struct trunk_pivot_state *next; - task tsk; - uint64 refcount; - bool32 maplet_compaction_initiated; - bool32 abandoned; + task tsk; // maplet_compaction_task trunk_context *context; key_buffer key; key_buffer ubkey; uint64 height; - routing_filter maplet; - uint64 num_branches; - bool32 maplet_compaction_failed; - uint64 total_bundles; - platform_spinlock compactions_lock; - bundle_compaction *bundle_compactions; + + uint64 refcount; + + // lock covers all fields in protected. + platform_spinlock lock; + struct { + // Abandonment may occur because of tree structure changes that make these + // compactions inapplicable to the tree, or because of a maplet compaction + // failure that blocks all subsequent maplet compactions of this pivot, + // which means we cannot use the results of any of the remaining + // bundle_compactions. (A failed bundle_compaction blocks subsequent + // bundle_compactions, but not logically earlier bundle_compactions, so it + // is not cause to immediately abandon the pivot_state.) + bool32 abandoned; + + platform_status maplet_compaction_rc; + routing_filter maplet; + uint64 num_branches; + uint64 total_bundles; + + // Once a maplet_compaction_task removes some prefix of bundle_compactions + // from this list so that the new head of the list is not in a >= + // MIN_ENDED state, it must not touch the pivot_state again after + // releasing lock, because the bundle_compaction_task of the new head of + // this list could launch another maplet_compaction_task. + bundle_compaction *bundle_compactions; + } protected; }; +typedef enum trunk_flush_policy_type { + TRUNK_FLUSH_POLICY_STANDARD, + TRUNK_FLUSH_POLICY_RANGE, +} trunk_flush_policy_type; + +typedef struct trunk_flush_policy { + trunk_flush_policy_type type; + bool32 full_leaf_compactions; + union { + struct { + key minkey; + key maxkey; + } range; + } u; +} trunk_flush_policy; + struct pending_gc { pending_gc *next; uint64 addr; @@ -2600,6 +2667,45 @@ trunk_apply_changes(trunk_context *context, uint64 bc_incs = 0; uint64 bc_decs = 0; +static bool32 +bundle_compaction_terminal(const bundle_compaction *compaction) +{ + return compaction->phase != BUNDLE_COMPACTION_IN_PROGRESS; +} + +static bool32 +bundle_compaction_succeeded(const bundle_compaction *compaction) +{ + return compaction->phase == BUNDLE_COMPACTION_SUCCEEDED; +} + +static bool32 +bundle_compaction_consumable(const bundle_compaction *compaction) +{ + return bundle_compaction_succeeded(compaction); +} + +static platform_status +bundle_compaction_notify_status(const bundle_compaction *compaction, + platform_status maplet_compaction_rc) +{ + return !SUCCESS(compaction->output.rc) ? compaction->output.rc + : maplet_compaction_rc; +} + +static void +bundle_compaction_publish_terminal(bundle_compaction *compaction, + bundle_compaction_phase phase, + platform_status rc) +{ + platform_assert(phase == BUNDLE_COMPACTION_ENDED + || phase == BUNDLE_COMPACTION_SUCCEEDED); + + compaction->output.rc = rc; + platform_assert(__sync_bool_compare_and_swap( + &compaction->phase, BUNDLE_COMPACTION_IN_PROGRESS, phase)); +} + static void bundle_compaction_print_table_header(platform_log_handle *log, int indent) { @@ -2610,7 +2716,7 @@ bundle_compaction_print_table_header(platform_log_handle *log, int indent) "nbundles", "in_tuples", "in_kvbytes", - "state", + "phase", "out_branch", "out_tuples", "out_kvbytes", @@ -2626,55 +2732,381 @@ bundle_compaction_print_table_entry(const bundle_compaction *bc, "%*s%10lu %12lu %12lu %5d %12lu %12lu %12lu %18p ", indent, "", - bc->num_bundles, - bc->input_stats.num_tuples, - bc->input_stats.num_kv_bytes, - bc->state, - branch_ref_addr(bc->output_branch), - bc->output_stats.num_tuples, - bc->output_stats.num_kv_bytes, - bc->fingerprints); - for (uint64 i = 0; i < vector_length(&bc->input_branches); i++) { - platform_log(log, "%lu ", vector_get(&bc->input_branches, i).addr); + bc->input.num_bundles, + bc->input.stats.num_tuples, + bc->input.stats.num_kv_bytes, + bc->phase, + branch_ref_addr(bc->output.branch), + bc->output.stats.num_tuples, + bc->output.stats.num_kv_bytes, + bc->output.fingerprints); + for (uint64 i = 0; i < vector_length(&bc->input.branches); i++) { + platform_log(log, "%lu ", vector_get(&bc->input.branches, i).addr); } platform_log(log, "\n"); } static void -bundle_compaction_destroy(bundle_compaction *compaction, trunk_context *context) +bundle_compaction_destroy(bundle_compaction *compaction, + trunk_context *context, + platform_status maplet_compaction_rc, + task_tracker_list *completed) { // platform_default_log("bundle_compaction_destroy: %p\n", compaction); // bundle_compaction_print_table_header(Platform_default_log_handle, 4); // bundle_compaction_print_table_entry( // compaction, Platform_default_log_handle, 4); - platform_assert(compaction->state >= BUNDLE_COMPACTION_MIN_ENDED); + platform_assert(bundle_compaction_terminal(compaction)); - for (uint64 i = 0; i < vector_length(&compaction->input_branches); i++) { - trunk_branch_info bi = vector_get(&compaction->input_branches, i); + for (uint64 i = 0; i < vector_length(&compaction->input.branches); i++) { + trunk_branch_info bi = vector_get(&compaction->input.branches, i); btree_dec_ref(context->cc, context->cfg->btree_cfg, bi.addr, bi.type); __sync_fetch_and_add(&bc_decs, 1); } - vector_deinit(&compaction->input_branches); + vector_deinit(&compaction->input.branches); - if (compaction->fingerprints) { - platform_free(context->hid, compaction->fingerprints); + if (compaction->output.fingerprints) { + platform_free(context->hid, compaction->output.fingerprints); } - if (!branch_is_null(compaction->output_branch)) { + if (!branch_is_null(compaction->output.branch)) { btree_dec_ref(context->cc, context->cfg->btree_cfg, - branch_ref_addr(compaction->output_branch), + branch_ref_addr(compaction->output.branch), PAGE_TYPE_BRANCH); } + task_tracker_done( + compaction->tracker, + bundle_compaction_notify_status(compaction, maplet_compaction_rc), + completed); + platform_free(context->hid, compaction); } +static void +trunk_pivot_state_lock(trunk_pivot_state *state) +{ + platform_spin_lock(&state->lock); +} + +static void +trunk_pivot_state_unlock(trunk_pivot_state *state) +{ + platform_spin_unlock(&state->lock); +} + +typedef struct pivot_state_compaction_create_info { + bool32 first_compaction_for_empty_leaf; + int64 num_old_bundles; +} pivot_state_compaction_create_info; + +typedef struct maplet_compaction_snapshot { + bool32 abandoned; + bundle_compaction *first; + bundle_compaction *last; + routing_filter base_maplet; + uint64 base_num_branches; +} maplet_compaction_snapshot; + +typedef struct maplet_compaction_finish_result { + platform_status rc; + bool32 must_abandon; + bool32 enqueue_followup; + bundle_compaction *completed_bcs; +} maplet_compaction_finish_result; + +typedef struct trunk_pivot_state_detached_protected { + routing_filter maplet; + platform_status maplet_compaction_rc; + bundle_compaction *bundle_compactions; +} trunk_pivot_state_detached_protected; + +/* Caller must hold state->lock. */ +static bundle_compaction * +bundle_compactions_extract_through_locked(trunk_pivot_state *state, + bundle_compaction *last) +{ + platform_assert(last != NULL); + platform_assert(state->protected.bundle_compactions != NULL); + + bundle_compaction *result = state->protected.bundle_compactions; + bundle_compaction *bc = result; + while (TRUE) { + platform_assert(bc != NULL); + state->protected.total_bundles -= bc->input.num_bundles; + if (bc == last) { + break; + } + bc = bc->next; + } + + state->protected.bundle_compactions = last->next; + last->next = NULL; + + return result; +} + +static pivot_state_compaction_create_info +trunk_pivot_state_get_compaction_create_info(trunk_pivot_state *state, + bool32 is_leaf, + const bundle *pivot_bundle) +{ + trunk_pivot_state_lock(state); + pivot_state_compaction_create_info result = { + .first_compaction_for_empty_leaf = + is_leaf && state->protected.bundle_compactions == NULL + && bundle_num_branches(pivot_bundle) == 0, + .num_old_bundles = state->protected.total_bundles, + }; + trunk_pivot_state_unlock(state); + + return result; +} + +static void +trunk_pivot_state_mark_abandoned(trunk_pivot_state *state, + platform_status maplet_compaction_rc) +{ + trunk_pivot_state_lock(state); + state->protected.abandoned = TRUE; + state->protected.maplet_compaction_rc = maplet_compaction_rc; + trunk_pivot_state_unlock(state); +} + +static void +trunk_pivot_state_assert_abandoned(trunk_pivot_state *state) +{ + trunk_pivot_state_lock(state); + platform_assert(state->protected.abandoned == TRUE); + trunk_pivot_state_unlock(state); +} + +static trunk_pivot_state_detached_protected +trunk_pivot_state_detach_protected(trunk_pivot_state *state) +{ + trunk_pivot_state_detached_protected result; + + trunk_pivot_state_lock(state); + result.maplet = state->protected.maplet; + result.maplet_compaction_rc = state->protected.maplet_compaction_rc; + result.bundle_compactions = state->protected.bundle_compactions; + + state->protected.maplet = NULL_ROUTING_FILTER; + state->protected.bundle_compactions = NULL; + state->protected.total_bundles = 0; + trunk_pivot_state_unlock(state); + + return result; +} + +static void +trunk_pivot_state_destroy_detached_protected( + trunk_pivot_state *state, + trunk_pivot_state_detached_protected *detached, + task_tracker_list *completed) +{ + trunk_context *context = state->context; + threadid tid = platform_get_tid(); + + routing_filter_dec_ref(context->cc, &detached->maplet); + bundle_compaction *bc = detached->bundle_compactions; + while (bc != NULL) { + if (context->stats && bundle_compaction_succeeded(bc)) { + // Any completed bundle compactions still hanging off of this state + // were never applied. + context->stats[tid].compactions_discarded[state->height]++; + context->stats[tid].compaction_time_wasted_ns[state->height] += + bc->output.time_ns; + } + bundle_compaction *next = bc->next; + bundle_compaction_destroy( + bc, state->context, detached->maplet_compaction_rc, completed); + bc = next; + } +} + +static void +trunk_pivot_state_deinit_protected(trunk_pivot_state *state, + task_tracker_list *completed) +{ + trunk_pivot_state_detached_protected detached = + trunk_pivot_state_detach_protected(state); + trunk_pivot_state_destroy_detached_protected(state, &detached, completed); +} + +static bool32 +trunk_pivot_state_abort_bundle_compaction_if_abandoned( + trunk_pivot_state *state, + bundle_compaction *compaction, + threadid tid) +{ + bool32 result = FALSE; + trunk_context *context = state->context; + + trunk_pivot_state_lock(state); + if (state->protected.abandoned) { + if (context->stats) { + context->stats[tid].compactions_aborted[state->height]++; + } + + bundle_compaction_publish_terminal( + compaction, BUNDLE_COMPACTION_ENDED, STATUS_OK); + result = TRUE; + } + trunk_pivot_state_unlock(state); + + return result; +} + +static bool32 +trunk_pivot_state_finish_bundle_compaction(trunk_pivot_state *state, + bundle_compaction *compaction, + platform_status rc) +{ + bool32 must_enqueue_maplet_compaction = FALSE; + + trunk_pivot_state_lock(state); + if (state->protected.abandoned) { + bundle_compaction_publish_terminal( + compaction, BUNDLE_COMPACTION_ENDED, STATUS_OK); + } else { + if (SUCCESS(rc)) { + bool32 is_head = state->protected.bundle_compactions == compaction; + bundle_compaction_publish_terminal( + compaction, BUNDLE_COMPACTION_SUCCEEDED, rc); + must_enqueue_maplet_compaction = is_head; + } else { + bundle_compaction_publish_terminal( + compaction, BUNDLE_COMPACTION_ENDED, rc); + } + } + trunk_pivot_state_unlock(state); + + return must_enqueue_maplet_compaction; +} + +static void +trunk_pivot_state_append_compaction(trunk_pivot_state *state, + bundle_compaction *compaction) +{ + platform_assert(compaction != NULL); + platform_assert(0 < vector_length(&compaction->input.branches)); + + trunk_pivot_state_lock(state); + if (state->protected.bundle_compactions == NULL) { + state->protected.bundle_compactions = compaction; + } else { + bundle_compaction *last = state->protected.bundle_compactions; + while (last->next != NULL) { + last = last->next; + } + last->next = compaction; + } + state->protected.total_bundles += compaction->input.num_bundles; + trunk_pivot_state_unlock(state); +} + +static maplet_compaction_snapshot +trunk_pivot_state_prepare_maplet_compaction(trunk_pivot_state *state, + threadid tid, + routing_filter *new_maplet, + bool32 *new_maplet_owned) +{ + maplet_compaction_snapshot result; + trunk_context *context = state->context; + + ZERO_STRUCT(result); + + trunk_pivot_state_lock(state); + if (state->protected.abandoned) { + result.abandoned = TRUE; + if (context->stats) { + for (bundle_compaction *bc = state->protected.bundle_compactions; + bc != NULL; + bc = bc->next) + { + context->stats[tid].maplet_builds_aborted[state->height]++; + } + } + trunk_pivot_state_unlock(state); + return result; + } + + result.first = state->protected.bundle_compactions; + for (bundle_compaction *bc = result.first; + bc != NULL && bundle_compaction_consumable(bc); + bc = bc->next) + { + result.last = bc; + } + + platform_assert(result.last != NULL); + + result.base_maplet = state->protected.maplet; + result.base_num_branches = state->protected.num_branches; + *new_maplet = state->protected.maplet; + *new_maplet_owned = TRUE; + routing_filter_inc_ref(context->cc, new_maplet); + trunk_pivot_state_unlock(state); + + return result; +} + +static uint64 +trunk_pivot_state_count_compactions(trunk_pivot_state *state) +{ + uint64 result = 0; + + trunk_pivot_state_lock(state); + for (bundle_compaction *bc = state->protected.bundle_compactions; bc != NULL; + bc = bc->next) + { + result++; + } + trunk_pivot_state_unlock(state); + + return result; +} + +static void +trunk_pivot_state_init_protected(trunk_pivot_state *state, + trunk_context *context, + const bundle *pivot_bundle) +{ + trunk_pivot_state_lock(state); + state->protected.maplet_compaction_rc = STATUS_OK; + state->protected.maplet = pivot_bundle->maplet; + routing_filter_inc_ref(context->cc, &state->protected.maplet); + state->protected.num_branches = bundle_num_branches(pivot_bundle); + trunk_pivot_state_unlock(state); +} + +debug_only static void +trunk_pivot_state_print_protected(trunk_pivot_state *state, + platform_log_handle *log, + int indent) +{ + trunk_pivot_state_lock(state); + platform_log( + log, "%*smaplet: %lu\n", indent, "", state->protected.maplet.addr); + platform_log( + log, "%*snum_branches: %lu\n", indent, "", state->protected.num_branches); + bundle_compaction_print_table_header(log, indent + 4); + for (bundle_compaction *bc = state->protected.bundle_compactions; bc != NULL; + bc = bc->next) + { + bundle_compaction_print_table_entry(bc, log, indent + 4); + } + trunk_pivot_state_unlock(state); +} + static bundle_compaction * bundle_compaction_create(trunk_context *context, trunk_node *node, uint64 pivot_num, - trunk_pivot_state *state) + trunk_pivot_state *state, + task_tracker *tracker) { platform_status rc; trunk_pivot *pvt = trunk_node_pivot(node, pivot_num); @@ -2686,38 +3118,40 @@ bundle_compaction_create(trunk_context *context, "%s():%d: platform_malloc() failed", __func__, __LINE__); return NULL; } - result->pivot_state = state; - result->state = BUNDLE_COMPACTION_NOT_STARTED; - result->input_stats = trunk_pivot_received_bundles_stats(pvt); + result->input.pivot_state = state; + result->phase = BUNDLE_COMPACTION_IN_PROGRESS; + result->input.stats = trunk_pivot_received_bundles_stats(pvt); - if (trunk_node_is_leaf(node) && state->bundle_compactions == NULL - && bundle_num_branches(pvt_bndl) == 0) - { - result->merge_mode = MERGE_FULL; + pivot_state_compaction_create_info create_info = + trunk_pivot_state_get_compaction_create_info( + state, trunk_node_is_leaf(node), pvt_bndl); + + if (create_info.first_compaction_for_empty_leaf) { + result->input.merge_mode = MERGE_FULL; } else { - result->merge_mode = MERGE_INTERMEDIATE; + result->input.merge_mode = MERGE_INTERMEDIATE; } - vector_init(&result->input_branches, context->hid); - int64 num_old_bundles = state->total_bundles; + vector_init(&result->input.branches, context->hid); uint64 first_new_bundle = - trunk_pivot_inflight_bundle_start(pvt) + num_old_bundles; + trunk_pivot_inflight_bundle_start(pvt) + create_info.num_old_bundles; platform_assert(first_new_bundle == node->num_old_bundles); for (int64 i = first_new_bundle; i < vector_length(&node->inflight_bundles); i++) { bundle *bndl = vector_get_ptr(&node->inflight_bundles, i); - rc = vector_ensure_capacity(&result->input_branches, - vector_length(&result->input_branches) + rc = vector_ensure_capacity(&result->input.branches, + vector_length(&result->input.branches) + vector_length(&bndl->branches)); if (!SUCCESS(rc)) { platform_error_log("%s():%d: vector_ensure_capacity() failed: %s", __func__, __LINE__, platform_status_to_string(rc)); - result->state = BUNDLE_COMPACTION_FAILED; - bundle_compaction_destroy(result, context); + bundle_compaction_publish_terminal( + result, BUNDLE_COMPACTION_ENDED, rc); + bundle_compaction_destroy(result, context, STATUS_OK, NULL); return NULL; } for (int64 j = 0; j < bundle_num_branches(bndl); j++) { @@ -2726,15 +3160,17 @@ bundle_compaction_create(trunk_context *context, context->cc, context->cfg->btree_cfg, branch_ref_addr(bref)); page_type type = bundle_branch_type(bndl); trunk_branch_info bi = {bref.addr, type}; - rc = vector_append(&result->input_branches, bi); + rc = vector_append(&result->input.branches, bi); platform_assert_status_ok(rc); __sync_fetch_and_add(&bc_incs, 1); } } - result->num_bundles = + result->input.num_bundles = vector_length(&node->inflight_bundles) - first_new_bundle; - platform_assert(0 < result->num_bundles); + platform_assert(0 < result->input.num_bundles); + + result->tracker = tracker; return result; } @@ -2788,18 +3224,6 @@ trunk_pivot_state_decref(trunk_pivot_state *state) return oldrc - 1; } -static void -trunk_pivot_state_lock_compactions(trunk_pivot_state *state) -{ - platform_spin_lock(&state->compactions_lock); -} - -static void -trunk_pivot_state_unlock_compactions(trunk_pivot_state *state) -{ - platform_spin_unlock(&state->compactions_lock); -} - debug_only static void trunk_pivot_state_print(trunk_pivot_state *state, platform_log_handle *log, @@ -2817,22 +3241,7 @@ trunk_pivot_state_print(trunk_pivot_state *state, indent, "", key_string(data_cfg, key_buffer_key(&state->ubkey))); - platform_log(log, "%*smaplet: %lu\n", indent, "", state->maplet.addr); - platform_log(log, "%*snum_branches: %lu\n", indent, "", state->num_branches); - platform_log(log, - "%*smaplet_compaction_failed: %d\n", - indent, - "", - state->maplet_compaction_failed); - - trunk_pivot_state_lock_compactions(state); - bundle_compaction_print_table_header(log, indent + 4); - for (bundle_compaction *bc = state->bundle_compactions; bc != NULL; - bc = bc->next) - { - bundle_compaction_print_table_entry(bc, log, indent + 4); - } - trunk_pivot_state_unlock_compactions(state); + trunk_pivot_state_print_protected(state, log, indent); } debug_only static void @@ -2853,58 +3262,21 @@ trunk_pivot_state_map_print(trunk_pivot_state_map *map, static uint64 pivot_state_destructions = 0; static void -trunk_pivot_state_destroy(trunk_pivot_state *state) +trunk_pivot_state_destroy(trunk_pivot_state *state, + task_tracker_list *completed) { - trunk_context *context = state->context; - threadid tid = platform_get_tid(); platform_assert(state->refcount == 0); // platform_default_log("pivot_state_destroy: %p\n", state); // pivot_compaction_state_print( // state, Platform_default_log_handle, state->context->cfg->data_cfg, 4); key_buffer_deinit(&state->key); - routing_filter_dec_ref(state->context->cc, &state->maplet); - trunk_pivot_state_lock_compactions(state); - bundle_compaction *bc = state->bundle_compactions; - while (bc != NULL) { - if (context->stats) { - if (bc->state == BUNDLE_COMPACTION_SUCCEEDED) { - // Any completed bundle compactions still hanging off of this state - // were never applied. - context->stats[tid].compactions_discarded[state->height]++; - context->stats[tid].compaction_time_wasted_ns[state->height] += - bc->compaction_time_ns; - } - } - bundle_compaction *next = bc->next; - bundle_compaction_destroy(bc, state->context); - bc = next; - } - trunk_pivot_state_unlock_compactions(state); - platform_spinlock_destroy(&state->compactions_lock); + key_buffer_deinit(&state->ubkey); + trunk_pivot_state_deinit_protected(state, completed); + platform_spinlock_destroy(&state->lock); platform_free(state->context->hid, state); __sync_fetch_and_add(&pivot_state_destructions, 1); } -static void -trunk_pivot_state_append_compaction(trunk_pivot_state *state, - bundle_compaction *compaction) -{ - platform_assert(compaction != NULL); - platform_assert(0 < vector_length(&compaction->input_branches)); - trunk_pivot_state_lock_compactions(state); - if (state->bundle_compactions == NULL) { - state->bundle_compactions = compaction; - } else { - bundle_compaction *last = state->bundle_compactions; - while (last->next != NULL) { - last = last->next; - } - last->next = compaction; - } - state->total_bundles += compaction->num_bundles; - trunk_pivot_state_unlock_compactions(state); -} - static void trunk_pivot_state_map_init(trunk_pivot_state_map *map) { @@ -2982,10 +3354,8 @@ trunk_pivot_state_map_create_entry(trunk_context *context, } state->context = context; state->height = height; - state->maplet = pivot_bundle->maplet; - routing_filter_inc_ref(context->cc, &state->maplet); - state->num_branches = bundle_num_branches(pivot_bundle); - platform_spinlock_init(&state->compactions_lock); + platform_spinlock_init(&state->lock); + trunk_pivot_state_init_protected(state, context, pivot_bundle); state->next = map->buckets[*lock]; map->buckets[*lock] = state; @@ -3041,20 +3411,26 @@ trunk_pivot_state_map_get_or_create_entry(trunk_context *context, static void trunk_pivot_state_map_release_entry(trunk_context *context, trunk_pivot_state_map *map, - trunk_pivot_state *state) + trunk_pivot_state *state, + task_tracker_list *completed) { + trunk_pivot_state *destroy_state = NULL; pivot_state_map_lock lock; trunk_pivot_state_map_aquire_lock( &lock, context, map, key_buffer_key(&state->key), state->height); if (0 == trunk_pivot_state_decref(state)) { trunk_pivot_state_map_remove(map, &lock, state); - trunk_pivot_state_destroy(state); + destroy_state = state; } trunk_pivot_state_map_release_lock(&lock, map); + + if (destroy_state != NULL) { + trunk_pivot_state_destroy(destroy_state, completed); + } } static bool32 -trunk_pivot_state_map_abandon_entry(trunk_context *context, +trunk_pivot_state_map_abandon_pivot(trunk_context *context, key k, uint64 height) { @@ -3065,7 +3441,7 @@ trunk_pivot_state_map_abandon_entry(trunk_context *context, trunk_pivot_state *pivot_state = trunk_pivot_state_map_get_entry( context, &context->pivot_states, &lock, k, height); if (pivot_state) { - pivot_state->abandoned = TRUE; + trunk_pivot_state_mark_abandoned(pivot_state, STATUS_OK); trunk_pivot_state_map_remove(&context->pivot_states, &lock, pivot_state); result = TRUE; } @@ -3073,6 +3449,35 @@ trunk_pivot_state_map_abandon_entry(trunk_context *context, return result; } +static bool32 +trunk_pivot_state_map_abandon_entry(trunk_context *context, + trunk_pivot_state *pivot_state, + platform_status maplet_compaction_rc) +{ + bool32 result = FALSE; + pivot_state_map_lock lock; + trunk_pivot_state_map_aquire_lock(&lock, + context, + &context->pivot_states, + key_buffer_key(&pivot_state->key), + pivot_state->height); + trunk_pivot_state *state = + trunk_pivot_state_map_get_entry(context, + &context->pivot_states, + &lock, + key_buffer_key(&pivot_state->key), + pivot_state->height); + if (state == pivot_state) { + trunk_pivot_state_mark_abandoned(pivot_state, maplet_compaction_rc); + trunk_pivot_state_map_remove(&context->pivot_states, &lock, pivot_state); + result = TRUE; + } else { + trunk_pivot_state_assert_abandoned(pivot_state); + } + trunk_pivot_state_map_release_lock(&lock, &context->pivot_states); + return result; +} + debug_only static void print_pivot_states_for_node(trunk_context *context, trunk_node *node) { @@ -3108,7 +3513,9 @@ print_pivot_states_for_node(trunk_context *context, trunk_node *node) typedef struct maplet_compaction_apply_args { trunk_pivot_state *state; uint64 num_input_bundles; + routing_filter base_maplet; routing_filter new_maplet; + trunk_branch_info oldest_input_branch; branch_ref_vector branches; trunk_pivot_stats delta; // Outputs @@ -3125,13 +3532,6 @@ pivot_matches_compaction(const trunk_context *context, bundle *pivot_bndl = trunk_node_pivot_bundle(target, pivot_num); platform_assert(0 < args->num_input_bundles); - platform_assert(args->state->bundle_compactions != NULL); - platform_assert( - 0 < vector_length(&args->state->bundle_compactions->input_branches)); - - bundle_compaction *oldest_bc = args->state->bundle_compactions; - trunk_branch_info oldest_input_branch = - vector_get(&oldest_bc->input_branches, 0); uint64 ifs = trunk_pivot_inflight_bundle_start(pvt); if (vector_length(&target->inflight_bundles) < ifs + args->num_input_bundles) @@ -3151,8 +3551,8 @@ pivot_matches_compaction(const trunk_context *context, key_buffer_key(&args->state->ubkey), trunk_node_pivot_key(target, pivot_num + 1)) == 0 - && routing_filters_equal(&pivot_bndl->maplet, &args->state->maplet) - && oldest_pivot_inflight_branch.addr == oldest_input_branch.addr; + && routing_filters_equal(&pivot_bndl->maplet, &args->base_maplet) + && oldest_pivot_inflight_branch.addr == args->oldest_input_branch.addr; return result; } @@ -3205,42 +3605,156 @@ trunk_apply_changes_maplet_compaction(trunk_context *context, return STATUS_OK; } +static bool32 +trunk_pivot_state_validate_maplet_compaction( + trunk_pivot_state *state, + const maplet_compaction_snapshot *snapshot) +{ + bool32 result = TRUE; + + trunk_pivot_state_lock(state); + if (state->protected.abandoned) { + result = FALSE; + } else { + platform_assert(state->protected.bundle_compactions == snapshot->first); + platform_assert(routing_filters_equal(&state->protected.maplet, + &snapshot->base_maplet)); + } + trunk_pivot_state_unlock(state); + + return result; +} + +static maplet_compaction_finish_result +trunk_pivot_state_commit_maplet_compaction( + trunk_pivot_state *state, + const maplet_compaction_snapshot *snapshot, + maplet_compaction_apply_args *apply_args, + routing_filter *new_maplet, + bool32 *new_maplet_owned) +{ + maplet_compaction_finish_result result = {.rc = STATUS_OK}; + + trunk_pivot_state_lock(state); + platform_assert(!state->protected.abandoned); + platform_assert(state->protected.bundle_compactions == snapshot->first); + platform_assert( + routing_filters_equal(&state->protected.maplet, &snapshot->base_maplet)); + + if (new_maplet->addr != state->protected.maplet.addr) { + routing_filter_dec_ref(state->context->cc, &state->protected.maplet); + state->protected.maplet = *new_maplet; + *new_maplet_owned = FALSE; + } + state->protected.num_branches += vector_length(&apply_args->branches); + + result.completed_bcs = + bundle_compactions_extract_through_locked(state, snapshot->last); + + if (state->protected.bundle_compactions + && bundle_compaction_consumable(state->protected.bundle_compactions)) + { + result.enqueue_followup = TRUE; + } + trunk_pivot_state_unlock(state); + + return result; +} + +static maplet_compaction_finish_result +trunk_pivot_state_apply_maplet_compaction( + trunk_pivot_state *state, + const maplet_compaction_snapshot *snapshot, + maplet_compaction_apply_args *apply_args, + routing_filter *new_maplet, + bool32 *new_maplet_owned) +{ + maplet_compaction_finish_result result = {.rc = STATUS_OK}; + trunk_context *context = state->context; + + if (!trunk_pivot_state_validate_maplet_compaction(state, snapshot)) { + return result; + } + + result.rc = trunk_apply_changes(context, + key_buffer_key(&state->key), + key_buffer_key(&state->ubkey), + state->height, + trunk_apply_changes_maplet_compaction, + apply_args); + + if (!SUCCESS(result.rc)) { + platform_error_log("maplet_compaction_task: apply_changes failed: %d\n", + result.rc.r); + result.must_abandon = TRUE; + } else if (!apply_args->found_match) { + platform_assert(FALSE, + "Failed to find matching pivot after validating " + "maplet compaction state\n"); + result.rc = STATUS_INVALID_STATE; + result.must_abandon = TRUE; + } else { + result = trunk_pivot_state_commit_maplet_compaction( + state, snapshot, apply_args, new_maplet, new_maplet_owned); + } + + return result; +} + +static void +bundle_compactions_destroy(bundle_compaction *completed_bcs, + trunk_context *context, + task_tracker_list *completed) +{ + while (completed_bcs != NULL) { + bundle_compaction *next = completed_bcs->next; + bundle_compaction_destroy(completed_bcs, context, STATUS_OK, completed); + completed_bcs = next; + } +} + static platform_status -enqueue_maplet_compaction(trunk_pivot_state *args); +enqueue_maplet_compaction(trunk_pivot_state *args, + task_tracker_list *completed); static void maplet_compaction_task(task *arg) { - platform_status rc = STATUS_OK; - trunk_pivot_state *state = container_of(arg, trunk_pivot_state, tsk); - trunk_context *context = state->context; - routing_filter new_maplet = state->maplet; + platform_status rc = STATUS_OK; + platform_status followup_rc = STATUS_OK; + trunk_pivot_state *state = container_of(arg, trunk_pivot_state, tsk); + trunk_context *context = state->context; + routing_filter new_maplet = NULL_ROUTING_FILTER; + bool32 new_maplet_owned = FALSE; maplet_compaction_apply_args apply_args; threadid tid; + bundle_compaction *bc = NULL; + uint64 num_builds = 0; + uint64 total_build_time_ns = 0; + bool32 attempted_apply = FALSE; + maplet_compaction_snapshot snapshot; + task_tracker_list completed; tid = platform_get_tid(); + task_tracker_list_init(&completed); ZERO_STRUCT(apply_args); apply_args.state = state; vector_init(&apply_args.branches, PROCESS_PRIVATE_HEAP_ID); - if (state->abandoned) { - if (context->stats) { - for (bundle_compaction *bc = state->bundle_compactions; bc != NULL; - bc = bc->next) - { - context->stats[tid].maplet_builds_aborted[state->height]++; - } - } + snapshot = trunk_pivot_state_prepare_maplet_compaction( + state, tid, &new_maplet, &new_maplet_owned); + if (snapshot.abandoned) { goto cleanup; } + platform_assert(snapshot.first != NULL); + platform_assert(0 < vector_length(&snapshot.first->input.branches)); + apply_args.base_maplet = snapshot.base_maplet; + apply_args.oldest_input_branch = + vector_get(&snapshot.first->input.branches, 0); - bundle_compaction *bc = state->bundle_compactions; - bundle_compaction *last = NULL; - uint64 num_builds = 0; - uint64 total_build_time_ns = 0; - while (bc != NULL && bc->state == BUNDLE_COMPACTION_SUCCEEDED) { - if (!branch_is_null(bc->output_branch)) { + for (bc = snapshot.first; bc != NULL; bc = bc->next) { + if (!branch_is_null(bc->output.branch)) { uint64 filter_build_start; filter_build_start = platform_get_timestamp(); @@ -3249,21 +3763,19 @@ maplet_compaction_task(task *arg) context->cfg->filter_cfg, &new_maplet, &tmp_maplet, - bc->fingerprints, - bc->output_stats.num_tuples, - state->num_branches + bc->output.fingerprints, + bc->output.stats.num_tuples, + snapshot.base_num_branches + vector_length(&apply_args.branches)); - if (new_maplet.addr != state->maplet.addr) { - routing_filter_dec_ref(context->cc, &new_maplet); - } if (!SUCCESS(rc)) { platform_error_log( "maplet_compaction_task: routing_filter_add failed: %d\n", rc.r); goto cleanup; } + routing_filter_dec_ref(context->cc, &new_maplet); new_maplet = tmp_maplet; - rc = vector_append(&apply_args.branches, bc->output_branch); + rc = vector_append(&apply_args.branches, bc->output.branch); if (!SUCCESS(rc)) { platform_error_log( "maplet_compaction_task: vector_append failed: %d\n", rc.r); @@ -3287,15 +3799,15 @@ maplet_compaction_task(task *arg) } trunk_pivot_stats delta = - trunk_pivot_stats_subtract(bc->input_stats, bc->output_stats); + trunk_pivot_stats_subtract(bc->input.stats, bc->output.stats); apply_args.delta = trunk_pivot_stats_add(apply_args.delta, delta); - apply_args.num_input_bundles += bc->num_bundles; + apply_args.num_input_bundles += bc->input.num_bundles; - last = bc; - bc = bc->next; + if (bc == snapshot.last) { + break; + } } - platform_assert(last != NULL); platform_assert(0 < apply_args.num_input_bundles); if (context->stats) { @@ -3307,90 +3819,56 @@ maplet_compaction_task(task *arg) trunk_modification_begin(context); - rc = trunk_apply_changes(context, - key_buffer_key(&state->key), - key_buffer_key(&state->ubkey), - state->height, - trunk_apply_changes_maplet_compaction, - &apply_args); - if (!SUCCESS(rc)) { - platform_error_log("maplet_compaction_task: apply_changes failed: %d\n", - rc.r); - trunk_modification_end(context); - goto cleanup; - } + attempted_apply = TRUE; - if (!apply_args.found_match) { - if (!state->abandoned) { - trunk_pivot_state_print( - state, Platform_error_log_handle, context->cfg->data_cfg, 4); - platform_assert(!state->abandoned, - "Failed to find matching pivot for non-abandoned " - "compaction state\n"); - } + maplet_compaction_finish_result finish = + trunk_pivot_state_apply_maplet_compaction( + state, &snapshot, &apply_args, &new_maplet, &new_maplet_owned); + rc = finish.rc; - trunk_modification_end(context); + if (finish.must_abandon) { + trunk_pivot_state_map_abandon_entry(context, state, rc); + } + trunk_modification_end(context); + + if (attempted_apply && (!SUCCESS(rc) || !apply_args.found_match)) { if (context->stats) { context->stats[tid].maplet_builds_discarded[state->height] += num_builds; context->stats[tid].maplet_build_time_wasted_ns[state->height] += total_build_time_ns; } - - goto cleanup; } - if (new_maplet.addr != state->maplet.addr) { - routing_filter_dec_ref(context->cc, &state->maplet); - state->maplet = new_maplet; + if (finish.completed_bcs != NULL) { + bundle_compactions_destroy(finish.completed_bcs, context, &completed); } - state->num_branches += vector_length(&apply_args.branches); - - trunk_pivot_state_lock_compactions(state); - { - while (state->bundle_compactions != last) { - bundle_compaction *next = state->bundle_compactions->next; - state->total_bundles -= state->bundle_compactions->num_bundles; - bundle_compaction_destroy(state->bundle_compactions, context); - state->bundle_compactions = next; - } - platform_assert(state->bundle_compactions == last); - state->bundle_compactions = last->next; - state->total_bundles -= last->num_bundles; - bundle_compaction_destroy(last, context); - - __sync_lock_release(&state->maplet_compaction_initiated); - - if (state->bundle_compactions - && state->bundle_compactions->state == BUNDLE_COMPACTION_SUCCEEDED) - { - enqueue_maplet_compaction(state); + if (finish.enqueue_followup) { + followup_rc = enqueue_maplet_compaction(state, &completed); + if (!SUCCESS(followup_rc)) { + rc = followup_rc; } } - trunk_pivot_state_unlock_compactions(state); - - trunk_modification_end(context); cleanup: - if (!SUCCESS(rc) || !apply_args.found_match) { - state->maplet_compaction_failed = TRUE; - if (new_maplet.addr != state->maplet.addr) { - routing_filter_dec_ref(context->cc, &new_maplet); - } + if (!attempted_apply && !SUCCESS(rc)) { + trunk_pivot_state_map_abandon_entry(context, state, rc); + } + if (new_maplet_owned) { + routing_filter_dec_ref(context->cc, &new_maplet); } - - trunk_pivot_state_map_release_entry(context, &context->pivot_states, state); vector_deinit(&apply_args.branches); + + trunk_pivot_state_map_release_entry( + context, &context->pivot_states, state, &completed); + task_tracker_notify_all(&completed); } static platform_status -enqueue_maplet_compaction(trunk_pivot_state *args) +enqueue_maplet_compaction(trunk_pivot_state *args, task_tracker_list *completed) { - if (__sync_lock_test_and_set(&args->maplet_compaction_initiated, 1)) { - return STATUS_OK; - } trunk_pivot_state_incref(args); platform_status rc = task_enqueue(args->context->ts, TASK_TYPE_NORMAL, @@ -3400,7 +3878,9 @@ enqueue_maplet_compaction(trunk_pivot_state *args) if (!SUCCESS(rc)) { platform_error_log("enqueue_maplet_compaction: task_enqueue failed: %d\n", rc.r); - trunk_pivot_state_decref(args); + trunk_pivot_state_map_abandon_entry(args->context, args, rc); + trunk_pivot_state_map_release_entry( + args->context, &args->context->pivot_states, args, completed); } return rc; } @@ -3433,36 +3913,32 @@ bundle_compaction_task(task *arg) { platform_status rc; bundle_compaction *bc = container_of(arg, bundle_compaction, tsk); - trunk_pivot_state *state = bc->pivot_state; + trunk_pivot_state *state = bc->input.pivot_state; trunk_context *context = state->context; threadid tid = platform_get_tid(); + task_tracker_list completed; - platform_assert(__sync_bool_compare_and_swap(&bc->state, - BUNDLE_COMPACTION_NOT_STARTED, - BUNDLE_COMPACTION_IN_PROGRESS)); + task_tracker_list_init(&completed); if (context->stats) { context->stats[tid].compactions[state->height]++; } - if (state->abandoned) { - if (context->stats) { - context->stats[tid].compactions_aborted[state->height]++; - } - - platform_assert(__sync_bool_compare_and_swap( - &bc->state, BUNDLE_COMPACTION_IN_PROGRESS, BUNDLE_COMPACTION_ABORTED)); - + if (trunk_pivot_state_abort_bundle_compaction_if_abandoned(state, bc, tid)) { + bc = NULL; // Cannot touch bc after switching to terminal state trunk_pivot_state_map_release_entry( - context, &context->pivot_states, state); + context, &context->pivot_states, state, &completed); + + // Notify after releasing all locks + task_tracker_notify_all(&completed); return; } uint64 compaction_start = platform_get_timestamp(); - platform_assert(0 < vector_length(&bc->input_branches)); + platform_assert(0 < vector_length(&bc->input.branches)); trunk_branch_merger merger; trunk_branch_merger_init(&merger, @@ -3474,8 +3950,8 @@ bundle_compaction_task(task *arg) rc = trunk_branch_merger_add_branches(&merger, context->cc, context->cfg->btree_cfg, - vector_length(&bc->input_branches), - vector_data(&bc->input_branches)); + vector_length(&bc->input.branches), + vector_data(&bc->input.branches)); if (!SUCCESS(rc)) { platform_error_log( "branch_merger_add_branches failed for state: %p bc: %p: %s\n", @@ -3487,7 +3963,7 @@ bundle_compaction_task(task *arg) uint64 tuple_bound; rc = compute_tuple_bound(context, - &bc->input_branches, + &bc->input.branches, key_buffer_key(&state->key), key_buffer_key(&state->ubkey), &tuple_bound); @@ -3500,7 +3976,7 @@ bundle_compaction_task(task *arg) goto cleanup_branch_merger; } - rc = trunk_branch_merger_build_merge_itor(&merger, bc->merge_mode); + rc = trunk_branch_merger_build_merge_itor(&merger, bc->input.merge_mode); if (!SUCCESS(rc)) { platform_error_log( "branch_merger_build_merge_itor failed for state: %p bc: %p: %s\n", @@ -3528,16 +4004,6 @@ bundle_compaction_task(task *arg) goto cleanup_branch_merger; } - // This is just a quick shortcut to avoid wasting time on a compaction when - // the pivot is already stuck due to an earlier maplet compaction failure. - if (state->maplet_compaction_failed) { - platform_error_log("maplet compaction failed, skipping bundle compaction " - "for state %p\n", - state); - rc = STATUS_INVALID_STATE; - goto cleanup_pack_req; - } - uint64 pack_start = platform_get_timestamp(); rc = btree_pack(&pack_req); if (!SUCCESS(rc)) { @@ -3552,12 +4018,12 @@ bundle_compaction_task(task *arg) platform_timestamp_elapsed(pack_start); } - bc->output_branch = create_branch_ref(pack_req.root_addr); - bc->output_stats = (trunk_pivot_stats){ + bc->output.branch = create_branch_ref(pack_req.root_addr); + bc->output.stats = (trunk_pivot_stats){ .num_tuples = pack_req.num_tuples, .num_kv_bytes = pack_req.key_bytes + pack_req.message_bytes}; - // trunk_pivot_stats_subtract(bc->input_stats, bc->output_stats); - bc->fingerprints = pack_req.fingerprint_arr; + // trunk_pivot_stats_subtract(bc->input.stats, bc->output.stats); + bc->output.fingerprints = pack_req.fingerprint_arr; pack_req.fingerprint_arr = NULL; if (context->stats) { @@ -3566,12 +4032,12 @@ bundle_compaction_task(task *arg) context->stats[tid].compaction_max_tuples[state->height] = MAX(context->stats[tid].compaction_max_tuples[state->height], pack_req.num_tuples); - bc->compaction_time_ns = platform_timestamp_elapsed(compaction_start); + bc->output.time_ns = platform_timestamp_elapsed(compaction_start); context->stats[tid].compaction_time_ns[state->height] += - bc->compaction_time_ns; + bc->output.time_ns; context->stats[tid].compaction_time_max_ns[state->height] = MAX(context->stats[tid].compaction_time_max_ns[state->height], - bc->compaction_time_ns); + bc->output.time_ns); } cleanup_pack_req: @@ -3579,27 +4045,24 @@ bundle_compaction_task(task *arg) cleanup_branch_merger: trunk_branch_merger_deinit(&merger); - trunk_pivot_state_lock_compactions(state); - if (SUCCESS(rc)) { - platform_assert( - __sync_bool_compare_and_swap(&bc->state, - BUNDLE_COMPACTION_IN_PROGRESS, - BUNDLE_COMPACTION_SUCCEEDED)); - } else { - platform_assert(__sync_bool_compare_and_swap( - &bc->state, BUNDLE_COMPACTION_IN_PROGRESS, BUNDLE_COMPACTION_FAILED)); - } - if (bc->state == BUNDLE_COMPACTION_SUCCEEDED - && state->bundle_compactions == bc) - { - enqueue_maplet_compaction(state); + bool32 must_enqueue_maplet_compaction = + trunk_pivot_state_finish_bundle_compaction(state, bc, rc); + bc = NULL; // Cannot touch bc after switching to terminal state + + if (must_enqueue_maplet_compaction) { + rc = enqueue_maplet_compaction(state, &completed); } - trunk_pivot_state_unlock_compactions(state); - trunk_pivot_state_map_release_entry(context, &context->pivot_states, state); + + trunk_pivot_state_map_release_entry( + context, &context->pivot_states, state, &completed); + task_tracker_notify_all(&completed); } static platform_status -enqueue_bundle_compaction(trunk_context *context, trunk_node *node) +enqueue_bundle_compaction(trunk_context *context, + trunk_node *node, + task_tracker *tracker, + task_tracker_list *completed) { uint64 height = trunk_node_height(node); uint64 num_children = trunk_node_num_children(node); @@ -3628,13 +4091,15 @@ enqueue_bundle_compaction(trunk_context *context, trunk_node *node) goto next; } - bc = bundle_compaction_create(context, node, pivot_num, state); + bc = + bundle_compaction_create(context, node, pivot_num, state, tracker); if (bc == NULL) { platform_error_log("enqueue_bundle_compaction: " "bundle_compaction_create failed\n"); rc = STATUS_NO_MEMORY; goto next; } + task_tracker_add(tracker); trunk_pivot_state_incref(state); @@ -3645,8 +4110,11 @@ enqueue_bundle_compaction(trunk_context *context, trunk_node *node) &bc->tsk, bundle_compaction_task, FALSE); + // Upon success, the trunk_pivot_state_incref and task_tracker_add are + // passed to the task + if (!SUCCESS(rc)) { - trunk_pivot_state_decref(state); + trunk_pivot_state_decref(state); // undoes trunk_pivot_state_incref platform_error_log( "enqueue_bundle_compaction: task_enqueue failed: %s\n", platform_status_to_string(rc)); @@ -3654,14 +4122,19 @@ enqueue_bundle_compaction(trunk_context *context, trunk_node *node) next: if (!SUCCESS(rc) && bc) { - bc->state = BUNDLE_COMPACTION_FAILED; + bool32 must_enqueue_maplet_compaction = + trunk_pivot_state_finish_bundle_compaction(state, bc, rc); + platform_assert(!must_enqueue_maplet_compaction); + bc = NULL; // Cannot touch bc after switching to terminal state } if (!SUCCESS(rc) && SUCCESS(result)) { result = rc; } if (state != NULL) { + // undoes trunk_pivot_state_map_get_or_create trunk_pivot_state_map_release_entry( - context, &context->pivot_states, state); + context, &context->pivot_states, state, completed); + state = NULL; // cannot touch state after release } } } @@ -3682,18 +4155,29 @@ incorporation_tasks_deinit(incorporation_tasks *itasks, trunk_context *context) vector_deinit(&itasks->node_compactions); } -static void -incorporation_tasks_execute(incorporation_tasks *itasks, trunk_context *context) +static platform_status +incorporation_tasks_execute(incorporation_tasks *itasks, + trunk_context *context, + task_tracker *tracker, + task_tracker_list *completed) { + platform_status result = STATUS_OK; + for (uint64 i = 0; i < vector_length(&itasks->node_compactions); i++) { trunk_node *node = vector_get_ptr(&itasks->node_compactions, i); - platform_status rc = enqueue_bundle_compaction(context, node); + platform_status rc = + enqueue_bundle_compaction(context, node, tracker, completed); if (!SUCCESS(rc)) { platform_error_log("incorporation_tasks_execute: " "enqueue_bundle_compaction failed: %d\n", rc.r); + if (SUCCESS(result)) { + result = rc; + } } } + + return result; } static platform_status @@ -3707,7 +4191,7 @@ serialize_nodes_and_save_contingent_compactions( rc = serialize_nodes(context, nodes, result); if (!SUCCESS(rc)) { - platform_error_log("serialize_nodes_and_enqueue_bundle_compactions: " + platform_error_log("serialize_nodes_and_save_contingent_compactions: " "serialize_nodes failed: %d\n", rc.r); return rc; @@ -4218,13 +4702,7 @@ trunk_node_pivot_eventual_num_branches(trunk_context *context, trunk_node_pivot_key(node, pivot_num), trunk_node_height(node)); if (state != NULL) { - trunk_pivot_state_lock_compactions(state); - bundle_compaction *bc = state->bundle_compactions; - while (bc != NULL) { - num_branches++; - bc = bc->next; - } - trunk_pivot_state_unlock_compactions(state); + num_branches += trunk_pivot_state_count_compactions(state); } trunk_pivot_state_map_release_lock(&lock, &context->pivot_states); @@ -4235,10 +4713,26 @@ trunk_node_pivot_eventual_num_branches(trunk_context *context, return num_branches; } +static bool32 +leaf_has_full_compaction_input(trunk_node *leaf) +{ + platform_assert(trunk_node_is_leaf(leaf)); + + bundle *pivot_bundle = trunk_node_pivot_bundle(leaf, 0); + if (0 < bundle_num_branches(pivot_bundle)) { + return TRUE; + } + + trunk_pivot *pvt = trunk_node_pivot(leaf, 0); + uint64 inflight_start = trunk_pivot_inflight_bundle_start(pvt); + return inflight_start < vector_length(&leaf->inflight_bundles); +} + static platform_status leaf_split(trunk_context *context, trunk_node *leaf, trunk_node_vector *new_leaves, + bool32 force_full_compaction, bool32 *abandon_compactions) { platform_status rc; @@ -4253,9 +4747,17 @@ leaf_split(trunk_context *context, return rc; } - if (target_num_leaves == 1 - && trunk_node_pivot_eventual_num_branches(context, leaf, 0) - <= context->cfg->target_fanout) + force_full_compaction = + force_full_compaction && leaf_has_full_compaction_input(leaf); + bool32 needs_branch_compaction = FALSE; + if (target_num_leaves == 1) { + needs_branch_compaction = + context->cfg->target_fanout + < trunk_node_pivot_eventual_num_branches(context, leaf, 0); + } + + if (target_num_leaves == 1 && !force_full_compaction + && !needs_branch_compaction) { if (context->stats) { context->stats[tid].single_leaf_splits++; @@ -4266,9 +4768,15 @@ leaf_split(trunk_context *context, } if (context->stats) { - context->stats[tid].node_splits[leaf->height]++; - context->stats[tid].node_splits_nodes_created[leaf->height] += - target_num_leaves - 1; + if (target_num_leaves == 1 && force_full_compaction + && !needs_branch_compaction) + { + context->stats[tid].single_leaf_splits++; + } else { + context->stats[tid].node_splits[leaf->height]++; + context->stats[tid].node_splits_nodes_created[leaf->height] += + target_num_leaves - 1; + } } @@ -4460,25 +4968,132 @@ index_split(trunk_context *context, static uint64 abandoned_leaf_compactions = 0; +static const trunk_flush_policy TRUNK_FLUSH_POLICY_DEFAULT = { + .type = TRUNK_FLUSH_POLICY_STANDARD, + .full_leaf_compactions = FALSE, +}; + +static const trunk_flush_policy * +trunk_flush_policy_or_default(const trunk_flush_policy *policy) +{ + return policy == NULL ? &TRUNK_FLUSH_POLICY_DEFAULT : policy; +} + +static bool32 +trunk_ranges_intersect(const trunk_context *context, + key min1, + key max1, + key min2, + key max2) +{ + return data_key_compare(context->cfg->data_cfg, min1, max2) < 0 + && data_key_compare(context->cfg->data_cfg, min2, max1) < 0; +} + +static bool32 +trunk_flush_range_policy_intersects_child(const trunk_context *context, + const trunk_flush_policy *policy, + const trunk_node *index, + uint64 pivot_num) +{ + platform_assert(policy->type == TRUNK_FLUSH_POLICY_RANGE); + + key child_minkey = trunk_node_pivot_key(index, pivot_num); + key child_maxkey = trunk_node_pivot_key(index, pivot_num + 1); + + return trunk_ranges_intersect(context, + child_minkey, + child_maxkey, + policy->u.range.minkey, + policy->u.range.maxkey); +} + +static bool32 +trunk_flush_range_policy_intersects_node(const trunk_context *context, + const trunk_flush_policy *policy, + const trunk_node *node) +{ + platform_assert(policy->type == TRUNK_FLUSH_POLICY_RANGE); + + key node_minkey = trunk_node_pivot_key(node, 0); + key node_maxkey = trunk_node_pivot_key(node, trunk_node_num_children(node)); + + return trunk_ranges_intersect(context, + node_minkey, + node_maxkey, + policy->u.range.minkey, + policy->u.range.maxkey); +} + +static bool32 +trunk_flush_policy_should_flush_to_child(trunk_context *context, + const trunk_flush_policy *policy, + trunk_node *index, + uint64 pivot_num, + uint64 rflimit) +{ + policy = trunk_flush_policy_or_default(policy); + + switch (policy->type) { + case TRUNK_FLUSH_POLICY_STANDARD: + { + trunk_pivot *pvt = trunk_node_pivot(index, pivot_num); + return context->cfg->target_fanout + < trunk_node_pivot_eventual_num_branches( + context, index, pivot_num) + || rflimit < pvt->stats.num_tuples; + } + + case TRUNK_FLUSH_POLICY_RANGE: + return trunk_flush_range_policy_intersects_child( + context, policy, index, pivot_num); + + default: + platform_assert(0); + return FALSE; + } +} + +static bool32 +trunk_flush_policy_forces_full_leaf_compaction(trunk_context *context, + const trunk_flush_policy *policy, + trunk_node *leaf) +{ + policy = trunk_flush_policy_or_default(policy); + + if (!policy->full_leaf_compactions) { + return FALSE; + } + + if (policy->type != TRUNK_FLUSH_POLICY_RANGE) { + return FALSE; + } + + return trunk_flush_range_policy_intersects_node(context, policy, leaf); +} + static platform_status restore_balance_leaf(trunk_context *context, trunk_node *leaf, trunk_ondisk_node_ref_vector *new_leaf_refs, - incorporation_tasks *itasks) + incorporation_tasks *itasks, + const trunk_flush_policy *policy) { trunk_node_vector new_nodes; vector_init(&new_nodes, PROCESS_PRIVATE_HEAP_ID); - bool32 abandon_compactions = FALSE; - platform_status rc = - leaf_split(context, leaf, &new_nodes, &abandon_compactions); + bool32 abandon_compactions = FALSE; + bool32 force_full_compaction = + trunk_flush_policy_forces_full_leaf_compaction(context, policy, leaf); + platform_status rc = leaf_split( + context, leaf, &new_nodes, force_full_compaction, &abandon_compactions); if (!SUCCESS(rc)) { platform_error_log("restore_balance_leaf: leaf_split failed: %d\n", rc.r); goto cleanup_new_nodes; } if (abandon_compactions) { - trunk_pivot_state_map_abandon_entry( + trunk_pivot_state_map_abandon_pivot( context, trunk_node_pivot_min_key(leaf), trunk_node_height(leaf)); abandoned_leaf_compactions++; } @@ -4533,14 +5148,17 @@ flush_then_compact(trunk_context *context, bundle_vector *inflight, uint64 inflight_start, trunk_ondisk_node_ref_vector *new_node_refs, - incorporation_tasks *itasks); + incorporation_tasks *itasks, + const trunk_flush_policy *policy); static platform_status flush_to_one_child(trunk_context *context, trunk_node *index, uint64 pivot_num, trunk_ondisk_node_ref_vector *new_childrefs_accumulator, - incorporation_tasks *itasks) + incorporation_tasks *itasks, + const trunk_flush_policy *policy, + uint64 *num_new_children) { platform_status rc = STATUS_OK; @@ -4571,7 +5189,8 @@ flush_to_one_child(trunk_context *context, &index->inflight_bundles, trunk_pivot_inflight_bundle_start(pvt), &new_childrefs, - itasks); + itasks, + policy); trunk_node_deinit(&child, context); if (!SUCCESS(rc)) { platform_error_log("flush_to_one_child: flush_then_compact failed: %d\n", @@ -4597,6 +5216,9 @@ flush_to_one_child(trunk_context *context, trunk_pivot_set_inflight_bundle_start( new_pivot, vector_length(&index->inflight_bundles)); } + if (num_new_children != NULL) { + *num_new_children = vector_length(&new_pivots); + } // Construct the new empty pivot bundles for the new children bundle_vector new_pivot_bundles; @@ -4643,7 +5265,7 @@ flush_to_one_child(trunk_context *context, // the index in place. // Abandon the enqueued compactions now, before we destroy pvt. - trunk_pivot_state_map_abandon_entry( + trunk_pivot_state_map_abandon_pivot( context, trunk_pivot_key(pvt), trunk_node_height(index)); // Replace the old pivot and pivot bundles with the new ones @@ -4683,13 +5305,16 @@ static platform_status restore_balance_index(trunk_context *context, trunk_node *index, trunk_ondisk_node_ref_vector *new_index_refs, - incorporation_tasks *itasks) + incorporation_tasks *itasks, + const trunk_flush_policy *policy) { platform_status rc; threadid tid = platform_get_tid(); uint64 rflimit = routing_filter_max_fingerprints( cache_get_config(context->cc), context->cfg->filter_cfg); + policy = trunk_flush_policy_or_default(policy); + debug_assert(trunk_node_is_well_formed_index(context->cfg->data_cfg, index)); trunk_ondisk_node_ref_vector all_new_childrefs; @@ -4697,14 +5322,18 @@ restore_balance_index(trunk_context *context, uint64 fullest_child = 0; uint64 fullest_kv_bytes = 0; - for (uint64 i = 0; i < trunk_node_num_children(index); i++) { - trunk_pivot *pvt = trunk_node_pivot(index, i); - - if (context->cfg->target_fanout - < trunk_node_pivot_eventual_num_branches(context, index, i) - || rflimit < pvt->stats.num_tuples) + for (uint64 i = 0; i < trunk_node_num_children(index);) { + if (trunk_flush_policy_should_flush_to_child( + context, policy, index, i, rflimit)) { - rc = flush_to_one_child(context, index, i, &all_new_childrefs, itasks); + uint64 num_new_children; + rc = flush_to_one_child(context, + index, + i, + &all_new_childrefs, + itasks, + policy, + &num_new_children); if (!SUCCESS(rc)) { platform_error_log("%s():%d: flush_to_one_child() failed: %s", __func__, @@ -4713,19 +5342,33 @@ restore_balance_index(trunk_context *context, goto cleanup_all_new_children; } - if (context->stats) { + if (policy->type == TRUNK_FLUSH_POLICY_STANDARD && context->stats) { context->stats[tid].full_flushes[trunk_node_height(index)]++; } - } else if (fullest_kv_bytes < trunk_pivot_num_kv_bytes(pvt)) { - fullest_child = i; - fullest_kv_bytes = trunk_pivot_num_kv_bytes(pvt); + i += num_new_children; + } else if (policy->type == TRUNK_FLUSH_POLICY_STANDARD) { + trunk_pivot *pvt = trunk_node_pivot(index, i); + if (fullest_kv_bytes < trunk_pivot_num_kv_bytes(pvt)) { + fullest_child = i; + fullest_kv_bytes = trunk_pivot_num_kv_bytes(pvt); + } + i++; + } else { + i++; } } - if (context->cfg->incorporation_size_kv_bytes < fullest_kv_bytes) { - rc = flush_to_one_child( - context, index, fullest_child, &all_new_childrefs, itasks); + if (policy->type == TRUNK_FLUSH_POLICY_STANDARD + && context->cfg->incorporation_size_kv_bytes < fullest_kv_bytes) + { + rc = flush_to_one_child(context, + index, + fullest_child, + &all_new_childrefs, + itasks, + policy, + NULL); if (!SUCCESS(rc)) { platform_error_log("%s():%d: flush_to_one_child() failed: %s", __func__, @@ -4785,10 +5428,13 @@ flush_then_compact(trunk_context *context, bundle_vector *inflight, uint64 inflight_start, trunk_ondisk_node_ref_vector *new_node_refs, - incorporation_tasks *itasks) + incorporation_tasks *itasks, + const trunk_flush_policy *policy) { platform_status rc; + policy = trunk_flush_policy_or_default(policy); + // Add the bundles to the node rc = trunk_node_receive_bundles( context, node, routed, inflight, inflight_start); @@ -4809,9 +5455,9 @@ flush_then_compact(trunk_context *context, // Perform any needed recursive flushes and node splits if (trunk_node_is_leaf(node)) { - rc = restore_balance_leaf(context, node, new_node_refs, itasks); + rc = restore_balance_leaf(context, node, new_node_refs, itasks, policy); } else { - rc = restore_balance_index(context, node, new_node_refs, itasks); + rc = restore_balance_index(context, node, new_node_refs, itasks, policy); } return rc; @@ -4926,37 +5572,40 @@ build_new_roots(trunk_context *context, } platform_status -trunk_incorporate_prepare(trunk_context *context, uint64 branch_addr) +trunk_flush_prepare(trunk_context *context, + uint64 branch_addr, + const trunk_flush_policy *policy) { platform_status rc; - uint64 height; + uint64 height = 0; + + policy = trunk_flush_policy_or_default(policy); trunk_modification_begin(context); incorporation_tasks_init(&context->tasks, context->hid); - branch_ref branch = create_branch_ref(branch_addr); - bundle_vector inflight; vector_init(&inflight, PROCESS_PRIVATE_HEAP_ID); trunk_ondisk_node_ref_vector new_node_refs; vector_init(&new_node_refs, PROCESS_PRIVATE_HEAP_ID); - trunk_pivot_vector new_pivot; - vector_init(&new_pivot, PROCESS_PRIVATE_HEAP_ID); + if (branch_addr != 0) { + branch_ref branch = create_branch_ref(branch_addr); - // Construct a vector of inflight bundles with one singleton bundle for - // the new branch. - rc = VECTOR_EMPLACE_APPEND(&inflight, - bundle_init_single, - PROCESS_PRIVATE_HEAP_ID, - NULL_ROUTING_FILTER, - branch); - if (!SUCCESS(rc)) { - platform_error_log( - "trunk_incorporate: VECTOR_EMPLACE_APPEND failed: %d\n", rc.r); - goto cleanup_vectors; + // Construct a vector of inflight bundles with one singleton bundle for + // the new branch. + rc = VECTOR_EMPLACE_APPEND(&inflight, + bundle_init_single, + PROCESS_PRIVATE_HEAP_ID, + NULL_ROUTING_FILTER, + branch); + if (!SUCCESS(rc)) { + platform_error_log( + "%s: VECTOR_EMPLACE_APPEND failed: %d\n", __func__, rc.r); + goto cleanup_vectors; + } } // Read the old root. @@ -4964,11 +5613,11 @@ trunk_incorporate_prepare(trunk_context *context, uint64 branch_addr) if (context->root != NULL) { rc = trunk_node_deserialize(context, context->root->ref.addr, &root); if (!SUCCESS(rc)) { - platform_error_log("trunk_incorporate: node_deserialize failed: %d\n", - rc.r); + platform_error_log( + "%s: node_deserialize failed: %d\n", __func__, rc.r); goto cleanup_vectors; } - } else { + } else if (branch_addr != 0) { // If there is no root, create an empty one. rc = trunk_node_init_empty_leaf(&root, PROCESS_PRIVATE_HEAP_ID, @@ -4976,22 +5625,30 @@ trunk_incorporate_prepare(trunk_context *context, uint64 branch_addr) POSITIVE_INFINITY_KEY); if (!SUCCESS(rc)) { platform_error_log( - "trunk_incorporate: node_init_empty_leaf failed: %d\n", rc.r); + "%s: node_init_empty_leaf failed: %d\n", __func__, rc.r); goto cleanup_vectors; } debug_assert( trunk_node_is_well_formed_leaf(context->cfg->data_cfg, &root)); + } else { + rc = STATUS_OK; + goto finish; } height = trunk_node_height(&root); // "flush" the new bundle to the root, then do any rebalancing needed. - rc = flush_then_compact( - context, &root, NULL, &inflight, 0, &new_node_refs, &context->tasks); + rc = flush_then_compact(context, + &root, + NULL, + &inflight, + 0, + &new_node_refs, + &context->tasks, + policy); trunk_node_deinit(&root, context); if (!SUCCESS(rc)) { - platform_error_log("trunk_incorporate: flush_then_compact failed: %d\n", - rc.r); + platform_error_log("%s: flush_then_compact failed: %d\n", __func__, rc.r); goto cleanup_vectors; } @@ -5000,15 +5657,16 @@ trunk_incorporate_prepare(trunk_context *context, uint64 branch_addr) while (1 < vector_length(&new_node_refs)) { rc = build_new_roots(context, height, &new_node_refs); if (!SUCCESS(rc)) { - platform_error_log("trunk_incorporate: build_new_roots failed: %d\n", - rc.r); + platform_error_log("%s: build_new_roots failed: %d\n", __func__, rc.r); goto cleanup_vectors; } height++; } platform_assert(context->post_incorporation_root == NULL); - context->post_incorporation_root = vector_get(&new_node_refs, 0); + if (0 < vector_length(&new_node_refs)) { + context->post_incorporation_root = vector_get(&new_node_refs, 0); + } if (context->stats) { threadid tid = platform_get_tid(); @@ -5019,6 +5677,7 @@ trunk_incorporate_prepare(trunk_context *context, uint64 branch_addr) context->stats[tid].incorporation_footprint_distribution[footprint]++; } +finish: cleanup_vectors: if (!SUCCESS(rc)) { VECTOR_APPLY_TO_ELTS( @@ -5034,7 +5693,7 @@ trunk_incorporate_prepare(trunk_context *context, uint64 branch_addr) } void -trunk_incorporate_commit(trunk_context *context) +trunk_flush_commit(trunk_context *context) { batch_rwlock_lock(&context->root_lock, 0); platform_assert(context->pre_incorporation_root == NULL); @@ -5045,16 +5704,107 @@ trunk_incorporate_commit(trunk_context *context) } void -trunk_incorporate_cleanup(trunk_context *context) +trunk_flush_cleanup(trunk_context *context, task_tracker *tracker) { + platform_status rc; + task_tracker_list completed; + + task_tracker_list_init(&completed); + if (context->pre_incorporation_root != NULL) { trunk_ondisk_node_ref_destroy( context->pre_incorporation_root, context, context->hid); context->pre_incorporation_root = NULL; } - incorporation_tasks_execute(&context->tasks, context); + rc = incorporation_tasks_execute( + &context->tasks, context, tracker, &completed); incorporation_tasks_deinit(&context->tasks, context); trunk_modification_end(context); + task_tracker_done(tracker, rc, &completed); + task_tracker_notify_all(&completed); +} + +platform_status +trunk_incorporate_prepare(trunk_context *context, uint64 branch_addr) +{ + platform_assert(branch_addr != 0); + + return trunk_flush_prepare( + context, branch_addr, &TRUNK_FLUSH_POLICY_DEFAULT); +} + +void +trunk_incorporate_commit(trunk_context *context) +{ + trunk_flush_commit(context); +} + +void +trunk_incorporate_cleanup(trunk_context *context) +{ + trunk_flush_cleanup(context, NULL); +} + +static void +trunk_flush_tracker_done(task_tracker *tracker) +{ + splinterdb_notification *notification = + (splinterdb_notification *)tracker->user_data; + platform_status status = tracker->failed ? tracker->status : STATUS_OK; + + splinterdb_notification_complete(notification, status); + platform_free(PROCESS_PRIVATE_HEAP_ID, tracker); +} + +platform_status +trunk_optimize(trunk_context *context, + key minkey, + key maxkey, + bool32 full_leaf_compactions, + splinterdb_notification *notification) +{ + if (key_is_null(minkey) || key_is_null(maxkey)) { + return STATUS_BAD_PARAM; + } + + int cmp = data_key_compare(context->cfg->data_cfg, minkey, maxkey); + if (cmp > 0) { + return STATUS_BAD_PARAM; + } + if (cmp == 0) { + splinterdb_notification_complete(notification, STATUS_OK); + return STATUS_OK; + } + + task_tracker *tracker = NULL; + if (notification != NULL) { + tracker = TYPED_MALLOC(PROCESS_PRIVATE_HEAP_ID, tracker); + if (tracker == NULL) { + return STATUS_NO_MEMORY; + } + task_tracker_init(tracker, trunk_flush_tracker_done, notification); + } + + trunk_flush_policy policy = { + .type = TRUNK_FLUSH_POLICY_RANGE, + .full_leaf_compactions = full_leaf_compactions, + .u.range = + { + .minkey = minkey, + .maxkey = maxkey, + }, + }; + + platform_status rc = trunk_flush_prepare(context, 0, &policy); + if (!SUCCESS(rc)) { + platform_free(PROCESS_PRIVATE_HEAP_ID, tracker); + return rc; + } + + trunk_flush_commit(context); + trunk_flush_cleanup(context, tracker); + + return STATUS_OK; } /*********************************** diff --git a/src/trunk.h b/src/trunk.h index 9ce8c7f5..c4c82956 100644 --- a/src/trunk.h +++ b/src/trunk.h @@ -260,6 +260,13 @@ trunk_incorporate_commit(trunk_context *context); void trunk_incorporate_cleanup(trunk_context *context); +platform_status +trunk_optimize(trunk_context *context, + key minkey, + key maxkey, + bool32 full_leaf_compactions, + struct splinterdb_notification *notification); + void trunk_modification_end(trunk_context *context); diff --git a/tests/unit/splinterdb_optimize_test.c b/tests/unit/splinterdb_optimize_test.c new file mode 100644 index 00000000..a72d0f64 --- /dev/null +++ b/tests/unit/splinterdb_optimize_test.c @@ -0,0 +1,450 @@ +// Copyright 2026 VMware, Inc. +// SPDX-License-Identifier: Apache-2.0 + +/* + * Tests for the public splinterdb_optimize() API and notification modes. + */ + +#include +#include +#include +#include + +#include "splinterdb/default_data_config.h" +#include "splinterdb/splinterdb.h" +#include "platform_sleep.h" +#include "platform_threads.h" +#include "splinterdb_tests_private.h" +#include "unit_tests.h" +#include "util.h" +#include "config.h" +#include "ctest.h" + +#define OPTIMIZE_TEST_KEY_LENGTH 22 +#define OPTIMIZE_TEST_VALUE_LENGTH 31 +#define OPTIMIZE_TEST_KEY_SIZE (OPTIMIZE_TEST_KEY_LENGTH + 1) +#define OPTIMIZE_TEST_VALUE_SIZE (OPTIMIZE_TEST_VALUE_LENGTH + 1) + +static const char optimize_key_fmt[] = "key-%018u"; +static const char optimize_value_fmt[] = "value-%018u-marker"; + +typedef struct optimize_callback_state { + volatile uint64 callbacks; + int status; + splinterdb_notification *notification; +} optimize_callback_state; + +typedef struct optimize_thread_args { + splinterdb *kvsb; + splinterdb_notification *notification; + pthread_barrier_t *barrier; + char min_key[OPTIMIZE_TEST_KEY_SIZE]; + char max_key[OPTIMIZE_TEST_KEY_SIZE]; + int rc; +} optimize_thread_args; + +static void +format_key(char key[OPTIMIZE_TEST_KEY_SIZE], uint32 key_no); + +static void +format_value(char value[OPTIMIZE_TEST_VALUE_SIZE], uint32 key_no); + +static void +create_optimize_cfg(splinterdb_config *out_cfg, data_config *default_data_cfg); + +static void +force_flush_current_memtable(splinterdb *kvsb); + +static void +insert_key(splinterdb *kvsb, uint32 key_no); + +static void +load_key_batches(splinterdb *kvsb, uint32 num_keys, uint32 batch_size); + +static void +verify_point_lookups(splinterdb *kvsb, uint32 num_keys); + +static void +verify_full_scan(splinterdb *kvsb, uint32 num_keys); + +static void +optimize_callback(splinterdb_notification *notification); + +static void * +optimize_thread(void *arg); + +CTEST_DATA(splinterdb_optimize) +{ + splinterdb *kvsb; + splinterdb_config cfg; + data_config data_cfg; +}; + +CTEST_SETUP(splinterdb_optimize) +{ + default_data_config_init(&data->data_cfg); + create_optimize_cfg(&data->cfg, &data->data_cfg); + data->cfg.use_shmem = + config_parse_use_shmem(Ctest_argc, (char **)Ctest_argv); + + int rc = splinterdb_create(&data->cfg, &data->kvsb); + ASSERT_EQUAL(0, rc); +} + +CTEST_TEARDOWN(splinterdb_optimize) +{ + if (data->kvsb != NULL) { + splinterdb_close(&data->kvsb); + } + platform_deregister_thread(); +} + +CTEST2(splinterdb_optimize, test_blocking_full_range) +{ + const uint32 num_keys = 320; + + load_key_batches(data->kvsb, num_keys, 40); + + splinterdb_notification notification; + splinterdb_notification_init_blocking(¬ification); + int rc = splinterdb_optimize( + data->kvsb, NULL_SLICE, NULL_SLICE, TRUE, ¬ification); + ASSERT_EQUAL(0, rc); + splinterdb_notification_deinit(¬ification); + + verify_point_lookups(data->kvsb, num_keys); + verify_full_scan(data->kvsb, num_keys); +} + +CTEST2(splinterdb_optimize, test_blocking_without_full_leaf_compactions) +{ + const uint32 num_keys = 320; + + load_key_batches(data->kvsb, num_keys, 40); + + splinterdb_notification notification; + splinterdb_notification_init_blocking(¬ification); + int rc = splinterdb_optimize( + data->kvsb, NULL_SLICE, NULL_SLICE, FALSE, ¬ification); + ASSERT_EQUAL(0, rc); + splinterdb_notification_deinit(¬ification); + + verify_point_lookups(data->kvsb, num_keys); + verify_full_scan(data->kvsb, num_keys); +} + +CTEST2(splinterdb_optimize, test_open_reads_disk_geometry) +{ + const uint32 num_keys = 160; + + load_key_batches(data->kvsb, num_keys, 40); + splinterdb_close(&data->kvsb); + + data->cfg.disk_size = 0; + data->cfg.page_size = 0; + data->cfg.extent_size = 0; + + int rc = splinterdb_open(&data->cfg, &data->kvsb); + ASSERT_EQUAL(0, rc); + + verify_point_lookups(data->kvsb, num_keys); + verify_full_scan(data->kvsb, num_keys); +} + +CTEST2(splinterdb_optimize, test_polling_subrange) +{ + const uint32 num_keys = 320; + char min_key[OPTIMIZE_TEST_KEY_SIZE]; + char max_key[OPTIMIZE_TEST_KEY_SIZE]; + + load_key_batches(data->kvsb, num_keys, 40); + format_key(min_key, 80); + format_key(max_key, 240); + + uint64 user_data = 42; + splinterdb_notification notification; + splinterdb_notification_init_polling(¬ification, &user_data); + + int rc = splinterdb_optimize(data->kvsb, + slice_create(strlen(min_key), min_key), + slice_create(strlen(max_key), max_key), + TRUE, + ¬ification); + ASSERT_EQUAL(0, rc); + ASSERT_TRUE(&user_data == splinterdb_notification_user_data(¬ification)); + + int status = EINVAL; + rc = splinterdb_notification_wait(¬ification); + ASSERT_EQUAL(0, rc); + ASSERT_TRUE(splinterdb_notification_poll(¬ification, &status)); + ASSERT_EQUAL(0, status); + + splinterdb_notification_deinit(¬ification); + + verify_point_lookups(data->kvsb, num_keys); + verify_full_scan(data->kvsb, num_keys); +} + +CTEST2(splinterdb_optimize, test_callback_completion) +{ + const uint32 num_keys = 320; + char min_key[OPTIMIZE_TEST_KEY_SIZE]; + char max_key[OPTIMIZE_TEST_KEY_SIZE]; + + load_key_batches(data->kvsb, num_keys, 40); + format_key(min_key, 0); + format_key(max_key, num_keys); + + optimize_callback_state callback_state = {0}; + splinterdb_notification notification; + splinterdb_notification_init_callback( + ¬ification, optimize_callback, &callback_state); + + int rc = splinterdb_optimize(data->kvsb, + slice_create(strlen(min_key), min_key), + slice_create(strlen(max_key), max_key), + TRUE, + ¬ification); + ASSERT_EQUAL(0, rc); + + rc = splinterdb_notification_wait(¬ification); + ASSERT_EQUAL(0, rc); + + for (uint64 i = 0; + i < 100000 && __sync_fetch_and_add(&callback_state.callbacks, 0) == 0; + i++) + { + platform_sleep_ns(1000); + } + + ASSERT_EQUAL(1, __sync_fetch_and_add(&callback_state.callbacks, 0)); + ASSERT_EQUAL(0, callback_state.status); + ASSERT_TRUE(¬ification == callback_state.notification); + + splinterdb_notification_deinit(¬ification); + + verify_point_lookups(data->kvsb, num_keys); + verify_full_scan(data->kvsb, num_keys); +} + +CTEST2(splinterdb_optimize, test_concurrent_overlapping_ranges) +{ + const uint32 num_keys = 320; + const uint32 num_threads = 4; + const uint32 range_min[] = {0, 40, 80, 120}; + const uint32 range_max[] = {200, 240, 280, 320}; + pthread_t threads[4]; + optimize_thread_args args[4]; + splinterdb_notification notifications[4]; + pthread_barrier_t barrier; + + load_key_batches(data->kvsb, num_keys, 40); + + int rc = pthread_barrier_init(&barrier, NULL, num_threads); + ASSERT_EQUAL(0, rc); + + for (uint32 i = 0; i < num_threads; i++) { + ZERO_STRUCT(args[i]); + args[i].kvsb = data->kvsb; + args[i].notification = ¬ifications[i]; + args[i].barrier = &barrier; + format_key(args[i].min_key, range_min[i]); + format_key(args[i].max_key, range_max[i]); + splinterdb_notification_init_polling(¬ifications[i], &args[i]); + + rc = pthread_create(&threads[i], NULL, optimize_thread, &args[i]); + ASSERT_EQUAL(0, rc); + } + + for (uint32 i = 0; i < num_threads; i++) { + rc = pthread_join(threads[i], NULL); + ASSERT_EQUAL(0, rc); + ASSERT_EQUAL(0, args[i].rc); + } + + for (uint32 i = 0; i < num_threads; i++) { + rc = splinterdb_notification_wait(¬ifications[i]); + ASSERT_EQUAL(0, rc); + ASSERT_TRUE(&args[i] + == splinterdb_notification_user_data(¬ifications[i])); + splinterdb_notification_deinit(¬ifications[i]); + } + + rc = pthread_barrier_destroy(&barrier); + ASSERT_EQUAL(0, rc); + + verify_point_lookups(data->kvsb, num_keys); + verify_full_scan(data->kvsb, num_keys); +} + +static void +format_key(char key[OPTIMIZE_TEST_KEY_SIZE], uint32 key_no) +{ + ASSERT_EQUAL( + OPTIMIZE_TEST_KEY_LENGTH, + snprintf(key, OPTIMIZE_TEST_KEY_SIZE, optimize_key_fmt, key_no)); +} + +static void +format_value(char value[OPTIMIZE_TEST_VALUE_SIZE], uint32 key_no) +{ + ASSERT_EQUAL( + OPTIMIZE_TEST_VALUE_LENGTH, + snprintf(value, OPTIMIZE_TEST_VALUE_SIZE, optimize_value_fmt, key_no)); +} + +static void +create_optimize_cfg(splinterdb_config *out_cfg, data_config *default_data_cfg) +{ + *out_cfg = (splinterdb_config){ + .filename = TEST_CONFIG_DEFAULT_IO_FILENAME, + .cache_size = 128 * Mega, + .disk_size = 256 * Mega, + .data_cfg = default_data_cfg, + .fanout = 4, + .memtable_capacity = 1 * Mega, + .num_memtable_bg_threads = 1, + .num_normal_bg_threads = 4, + .queue_scale_percent = UINT64_MAX, + }; +} + +static void +force_flush_current_memtable(splinterdb *kvsb) +{ + core_handle *core = (core_handle *)splinterdb_get_trunk_handle(kvsb); + uint64 generation = memtable_force_finalize(&core->mt_ctxt); + core->mt_ctxt.process(core->mt_ctxt.process_ctxt, generation); + platform_status rc = task_perform_until_quiescent(core->ts); + ASSERT_TRUE(SUCCESS(rc)); +} + +static void +insert_key(splinterdb *kvsb, uint32 key_no) +{ + char key[OPTIMIZE_TEST_KEY_SIZE]; + char value[OPTIMIZE_TEST_VALUE_SIZE]; + + format_key(key, key_no); + format_value(value, key_no); + + int rc = splinterdb_insert(kvsb, + slice_create(strlen(key), key), + slice_create(strlen(value), value), + NULL); + ASSERT_EQUAL(0, rc); +} + +static void +load_key_batches(splinterdb *kvsb, uint32 num_keys, uint32 batch_size) +{ + for (uint32 key_no = 0; key_no < num_keys; key_no++) { + insert_key(kvsb, key_no); + if ((key_no + 1) % batch_size == 0) { + force_flush_current_memtable(kvsb); + } + } + + if (num_keys % batch_size != 0) { + force_flush_current_memtable(kvsb); + } +} + +static void +verify_point_lookups(splinterdb *kvsb, uint32 num_keys) +{ + splinterdb_lookup_result result; + splinterdb_lookup_result_init( + kvsb, &result, SPLINTERDB_LOOKUP_VALUE, 0, NULL); + + for (uint32 key_no = 0; key_no < num_keys; key_no++) { + char key[OPTIMIZE_TEST_KEY_SIZE]; + char value[OPTIMIZE_TEST_VALUE_SIZE]; + + format_key(key, key_no); + format_value(value, key_no); + + int rc = splinterdb_lookup(kvsb, slice_create(strlen(key), key), &result); + ASSERT_EQUAL(0, rc); + ASSERT_TRUE(splinterdb_lookup_found(&result)); + + slice found; + rc = splinterdb_lookup_result_value(&result, &found); + ASSERT_EQUAL(0, rc); + ASSERT_EQUAL(strlen(value), slice_length(found)); + ASSERT_EQUAL(0, memcmp(value, slice_data(found), slice_length(found))); + } + + splinterdb_lookup_result_deinit(&result); +} + +static void +verify_full_scan(splinterdb *kvsb, uint32 num_keys) +{ + splinterdb_iterator *itor = NULL; + int rc = + splinterdb_iterator_init(kvsb, &itor, greater_than_or_equal, NULL_SLICE); + ASSERT_EQUAL(0, rc); + + for (uint32 key_no = 0; key_no < num_keys; key_no++) { + char key[OPTIMIZE_TEST_KEY_SIZE]; + char value[OPTIMIZE_TEST_VALUE_SIZE]; + + ASSERT_TRUE(splinterdb_iterator_valid(itor)); + + format_key(key, key_no); + format_value(value, key_no); + + slice found_key; + slice found_value; + splinterdb_iterator_get_current(itor, &found_key, &found_value); + + ASSERT_EQUAL(strlen(key), slice_length(found_key)); + ASSERT_EQUAL(0, + memcmp(key, slice_data(found_key), slice_length(found_key))); + ASSERT_EQUAL(strlen(value), slice_length(found_value)); + ASSERT_EQUAL( + 0, memcmp(value, slice_data(found_value), slice_length(found_value))); + + splinterdb_iterator_next(itor); + } + + ASSERT_FALSE(splinterdb_iterator_valid(itor)); + rc = splinterdb_iterator_status(itor); + ASSERT_EQUAL(0, rc); + splinterdb_iterator_deinit(itor); +} + +static void +optimize_callback(splinterdb_notification *notification) +{ + optimize_callback_state *state = + splinterdb_notification_user_data(notification); + int status = EINVAL; + + platform_assert(splinterdb_notification_poll(notification, &status)); + state->status = status; + state->notification = notification; + __sync_fetch_and_add(&state->callbacks, 1); +} + +static void * +optimize_thread(void *arg) +{ + optimize_thread_args *args = (optimize_thread_args *)arg; + int rc = pthread_barrier_wait(args->barrier); + + if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { + args->rc = rc; + return NULL; + } + + args->rc = + splinterdb_optimize(args->kvsb, + slice_create(strlen(args->min_key), args->min_key), + slice_create(strlen(args->max_key), args->max_key), + TRUE, + args->notification); + + return NULL; +} diff --git a/tests/unit/splinterdb_quick_test.c b/tests/unit/splinterdb_quick_test.c index 43d27c1c..5dee4428 100644 --- a/tests/unit/splinterdb_quick_test.c +++ b/tests/unit/splinterdb_quick_test.c @@ -578,13 +578,14 @@ CTEST2(splinterdb_quick, test_variable_length_values) // freshen up the buffer memset(big_buffer, 'x', sizeof(big_buffer)); char saved_big_buffer[sizeof(big_buffer)]; - memcpy(saved_big_buffer, big_buffer, sizeof(big_buffer)); + memset(saved_big_buffer, 'x', sizeof(saved_big_buffer)); - // init the result again, but pretend the buffer is small + // init the result again, but give it a buffer too small for the value + uint64 short_buffer_len = TEST_MAX_VALUE_SIZE - 1; splinterdb_lookup_result_init(data->kvsb, &result, SPLINTERDB_LOOKUP_VALUE, - sizeof(big_buffer) / 2, + short_buffer_len, big_buffer); // lookup tuple with max-sized-value, passing it the short buffer @@ -602,6 +603,7 @@ CTEST2(splinterdb_quick, test_variable_length_values) // we can deinit the result, and it doesn't try to free the stack space we // originally gave it splinterdb_lookup_result_deinit(&result); + ASSERT_EQUAL(0, memcmp(saved_big_buffer, big_buffer, sizeof(big_buffer))); // init another result, but don't give it a buffer