From 8fc1c18e964faa56db3efa0c64d77434ff892887 Mon Sep 17 00:00:00 2001 From: Martin Raiber Date: Sun, 4 May 2025 19:45:39 +0200 Subject: [PATCH] Add WAL file to potentially speed up commits --- .gitignore | 3 +- CMakeLists.txt | 12 +-- CMakePresets.json | 7 ++ src/ApiHandler.cpp | 2 + src/SingleFileStorage.cpp | 174 ++++++++++++++++++++++++++++++++------ src/SingleFileStorage.h | 23 ++++- src/WalFile.cpp | 140 ++++++++++++++++++++++++++++++ src/WalFile.h | 25 ++++++ src/main.cpp | 2 + src/s3handler.cpp | 13 ++- 10 files changed, 360 insertions(+), 41 deletions(-) create mode 100644 src/WalFile.cpp create mode 100644 src/WalFile.h diff --git a/.gitignore b/.gitignore index 6d2e101..4a6a282 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ vcpkg vcpkg_installed /homepage/_site *.sqlgenbackup -.test_venv \ No newline at end of file +.test_venv +minio diff --git a/CMakeLists.txt b/CMakeLists.txt index bf8e0ce..b280a8b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -70,6 +70,12 @@ configure_file( @ONLY ) +# add_compile_options(-fno-omit-frame-pointer) +# if(CMAKE_BUILD_TYPE STREQUAL "Debug" OR CMAKE_CONFIGURATION_TYPES MATCHES ".*[dD]ebug.*") +# add_compile_options(-fsanitize=address) +# add_link_options(-fsanitize=address) +# endif() + add_executable(hs5 external/lmdb/midl.cpp external/lmdb/mdb.c @@ -90,6 +96,7 @@ add_executable(hs5 src/Session.cpp src/cmd.cpp src/StaticHandler.cpp + src/WalFile.cpp wwwgen/www_files.cpp ${SCHEMA_SOURCES}) @@ -116,10 +123,5 @@ target_link_libraries(hs5 PRIVATE $,zst proxygen::proxygen proxygen::proxygencurl proxygen::proxygenhttpserver expat::expat unofficial-sodium::sodium SqliteCppGen::SqliteCppGen nlohmann_json::nlohmann_json fmt::fmt) -if(CMAKE_BUILD_TYPE STREQUAL "Debug" OR CMAKE_CONFIGURATION_TYPES MATCHES ".*[dD]ebug.*") - add_compile_options(-fsanitize=address) - add_link_options(-fsanitize=address) -endif() - target_compile_features(hs5 PUBLIC cxx_std_20) diff --git a/CMakePresets.json b/CMakePresets.json index 8ca09a9..a8ce3b2 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -35,6 +35,13 @@ "description": "Build with Ninja/vcpkg (Release)", "configuration": "Release" }, + { + "name": "ninja-vcpkg-release-with-debug-info", + "configurePreset": "ninja-multi-vcpkg", + "displayName": "Build (Release)", + "description": "Build with Ninja/vcpkg (Release with debug info)", + "configuration": "RelWithDebInfo" + }, { "name": "ninja-vcpkg", "configurePreset": "ninja-multi-vcpkg", diff --git a/src/ApiHandler.cpp b/src/ApiHandler.cpp index 97a02ee..e168c20 100644 --- a/src/ApiHandler.cpp +++ b/src/ApiHandler.cpp @@ -409,6 +409,8 @@ Api::ListResp ApiHandler::list(const Api::ListParams& params, const ApiSessionSt const auto bucketId = *bucketIdOpt; const auto iterStartVal = make_key({.key = params.continuationToken ? *params.continuationToken : prefix, .version=std::numeric_limits::max(), .bucketId = bucketId}); + sfs.list_commit(); + SingleFileStorage::IterData iterData = {}; if(!sfs.iter_start(iterStartVal, false, iterData)) throw ApiError(Api::Herror::errorStartingListing); diff --git a/src/SingleFileStorage.cpp b/src/SingleFileStorage.cpp index ac2a0e6..1070149 100644 --- a/src/SingleFileStorage.cpp +++ b/src/SingleFileStorage.cpp @@ -3,6 +3,7 @@ * SPDX-License-Identifier: LGPL-3.0-or-later */ #include "SingleFileStorage.h" +#include "WalFile.h" #include #include #include @@ -911,9 +912,13 @@ SingleFileStorage::SingleFileStorage(SFSOptions options) } else { + // Use random initial transid to avoid applying mismatching wal files + curr_transid = folly::Random::rand32(); XLOG(INFO) << "New data file transid " << std::to_string(curr_transid) << " curr size "+folly::prettyPrint(index_file_size, folly::PRETTY_BYTES_IEC) << " fn " << data_file_path; } + prev_transid = curr_transid; + ch = dbi_size_info_next_disk_id; val.mv_data = &ch; val.mv_size = 1; @@ -1155,6 +1160,24 @@ SingleFileStorage::SingleFileStorage(SFSOptions options) (*cb)(); }); } + + if(!options.wal_file_path.empty()) + { + XLOGF(INFO, "Opening WAL file at {}", options.wal_file_path); + wal_file = std::make_unique(options.wal_file_path); + + const auto items = wal_file->read(curr_transid); + + if(!items.empty()) + { + XLOGF(INFO, "WAL file read {} items", items.size()); + } + + for(const auto& item : items) + { + commit_queue.push_back(item); + } + } } SingleFileStorage::SingleFileStorage() @@ -2342,43 +2365,46 @@ bool SingleFileStorage::restore_old(const std::string & fn) return true; } -bool SingleFileStorage::commit(bool background_queue, int64_t transid, int64_t disk_id) +bool SingleFileStorage::commit(bool background_queue, int64_t transid, int64_t disk_id, const bool pre_sync) { if (is_dead) { return false; } - if (data_file.fsyncNoInt()!=0) + if(pre_sync) { - XLOG(ERR) << "Failed to sync data file " << data_file_path << ". " << folly::errnoStr(errno); - write_offline = true; - do_stop_on_error(); - return false; - } - - { - std::scoped_lock lock(mutex); - mdb_curr_sync = true; - } - - if (mdb_env_sync(db_env, 0) != 0) - { - XLOG(ERR) << "mdb_env_sync on " << db_path << " failed. " << folly::errnoStr(errno); - write_offline = true; - do_stop_on_error(); - return false; - } - - if (cache_db_env != nullptr) - { - if (mdb_env_sync(cache_db_env, 0) != 0) + if (data_file.fsyncNoInt()!=0) { - XLOG(ERR) << "mdb_env_sync on cache db failed. " << folly::errnoStr(errno); + XLOG(ERR) << "Failed to sync data file " << data_file_path << ". " << folly::errnoStr(errno); write_offline = true; do_stop_on_error(); return false; } + + { + std::scoped_lock lock(mutex); + mdb_curr_sync = true; + } + + if (mdb_env_sync(db_env, 0) != 0) + { + XLOG(ERR) << "mdb_env_sync on " << db_path << " failed. " << folly::errnoStr(errno); + write_offline = true; + do_stop_on_error(); + return false; + } + + if (cache_db_env != nullptr) + { + if (mdb_env_sync(cache_db_env, 0) != 0) + { + XLOG(ERR) << "mdb_env_sync on cache db failed. " << folly::errnoStr(errno); + write_offline = true; + do_stop_on_error(); + return false; + } + } } std::unique_lock lock(mutex); @@ -2386,7 +2412,7 @@ bool SingleFileStorage::commit(bool background_queue, int64_t transid, int64_t d SCommitInfo commit_info; SFragInfo frag_info; frag_info.offset = transid; - frag_info.len = disk_id; + frag_info.len = 0; frag_info.action = FragAction::Commit; frag_info.commit_info = &commit_info; @@ -2402,7 +2428,43 @@ bool SingleFileStorage::commit(bool background_queue, int64_t transid, int64_t d commit_queue.push_back(frag_info); } - mdb_curr_sync = false; + if(pre_sync) + mdb_curr_sync = false; + + cond.notify_all(); + commit_info.commit_done.wait(lock); + + return commit_info.commit_errors == 0; +} + +bool SingleFileStorage::list_commit() +{ + if (is_dead) + { + return false; + } + + // Only necessary if using wal file + if(!wal_file) + return true; + + std::unique_lock lock(mutex); + + if(!needs_wal_file_reset) + return true; + + SCommitInfo commit_info; + SFragInfo frag_info; + frag_info.offset = -1; + frag_info.len = 0; + frag_info.md5sum = "l"; + frag_info.action = FragAction::Commit; + frag_info.commit_info = &commit_info; + + if (is_dead) + return false; + + commit_queue.push_back(frag_info); cond.notify_all(); commit_info.commit_done.wait(lock); @@ -6726,6 +6788,9 @@ void SingleFileStorage::operator()() if (frag_info.action == FragAction::Commit && frag_info.offset != -1) { + // This logic is for coordinating multiple files. Disable for now + assert(false); + int64_t disk_id = frag_info.len; if (disk_id == 0) curr_transid = frag_info.offset; @@ -6751,7 +6816,49 @@ void SingleFileStorage::operator()() mod_items += 2; } - if (frag_info.action == FragAction::Commit ) + if(wal_file && !wal_file->write(prev_transid, frag_info) ) + { + XLOG(ERR) << "Failed to write to wal file. " << folly::errnoStr(errno); + ++commit_errors; + } + + + if(frag_info.action == FragAction::Commit && + frag_info.md5sum != "reset" && + frag_info.md5sum != "l" && + wal_file && wal_file->items() < 1000) + { + if (data_file.fsyncNoInt()!=0) + { + XLOG(ERR) << "Failed to sync data file " << data_file_path << ". " << folly::errnoStr(errno); + ++commit_errors; + } + + if (new_data_file + && new_data_file.fsyncNoInt()!=0) + { + XLOG(ERR) << "Failed to sync new data file " << (data_file_path.parent_path() / "new_data") << ". " << folly::errnoStr(errno); + ++commit_errors; + } + + const auto commit_ok = wal_file->sync(); + if(!commit_ok) + { + XLOG(ERR) << "Failed to sync wal file. " << folly::errnoStr(errno); + ++commit_errors; + } + + std::scoped_lock llock(mutex); + if (frag_info.commit_info != nullptr) + { + frag_info.commit_info->commit_errors = commit_errors; + frag_info.commit_info->commit_done.notify_all(); + if(commit_ok) + commit_errors = 0; + } + needs_wal_file_reset = true; + } + else if (frag_info.action == FragAction::Commit ) { if (frag_info.md5sum == "reset" && commit_errors>0) @@ -7077,6 +7184,9 @@ void SingleFileStorage::operator()() XLOG(INFO) << "Defrag ctr incremented. Defrag can continue..."; defrag_restart=0; } + + if(commit_ok) + needs_wal_file_reset = false; } if (has_commit_info @@ -7087,6 +7197,14 @@ void SingleFileStorage::operator()() if (commit_ok) { + assert(frag_info.offset == -1); + prev_transid = curr_transid; + if(frag_info.offset == -1) + ++curr_transid; + + if(wal_file) + wal_file->reset(); + int rc = mdb_txn_begin(db_env, NULL, 0, &txn); if (rc) { diff --git a/src/SingleFileStorage.h b/src/SingleFileStorage.h index 7681506..a966015 100644 --- a/src/SingleFileStorage.h +++ b/src/SingleFileStorage.h @@ -29,8 +29,11 @@ using THREAD_ID = pid_t; constexpr int64_t sfs_block_size = 4096; +class WalFile; + class SingleFileStorage { + friend WalFile; public: struct SPunchItem @@ -87,6 +90,7 @@ public: std::string db_path; std::string freespace_cache_path; std::string dm_cache_path; + std::string wal_file_path; int64_t dm_cache_size = 0; bool use_direct_io = false; int64_t data_file_size_limit_mb = 0; @@ -168,11 +172,20 @@ public: bool restore_old(const std::string& fn); - bool commit(bool background_queue, int64_t transid) { - return commit(background_queue, transid, 0); + bool commit(bool background_queue, int64_t transid, const bool pre_sync) { + return commit(background_queue, transid, 0, pre_sync); } - bool commit(bool background_queue, int64_t transid, int64_t disk_id); + bool commit(bool background_queue, int64_t transid, int64_t disk_id, const bool pre_sync); + + /** + * For read after write consistency, when running with WAL, this commits to the LMDB if necessary + * so the listings have read after write consistency. + * + * This could be improved by merging the uncommited changes into the listing, but avoid doing that + * work for now. + */ + bool list_commit(); bool empty_queue(bool background_queue); @@ -566,6 +579,7 @@ private: relaxed_atomic is_dead; relaxed_atomic write_offline; + int64_t prev_transid; int64_t curr_transid; bool force_freespace_check; @@ -605,6 +619,9 @@ private: common_prefix_hash_func_t common_prefix_hash_func; int64_t curr_version = 0; + + std::unique_ptr wal_file; + bool needs_wal_file_reset = false; }; diff --git a/src/WalFile.cpp b/src/WalFile.cpp new file mode 100644 index 0000000..9bfee8a --- /dev/null +++ b/src/WalFile.cpp @@ -0,0 +1,140 @@ +/** + * Copyright Martin Raiber. All Rights Reserved. + * SPDX-License-Identifier: LGPL-3.0-or-later + */ +#include "WalFile.h" +#include "data.h" +#include +#include + +WalFile::WalFile(const std::string &path) + : file(path, O_RDWR | O_CREAT | O_APPEND, 0644) +{ + +} + +bool WalFile::needsWrite(const SingleFileStorage::SFragInfo& info) +{ + switch(info.action) + { + case SingleFileStorage::FragAction::Add: + case SingleFileStorage::FragAction::Del: + case SingleFileStorage::FragAction::DelOld: + case SingleFileStorage::FragAction::DelWithQueued: + case SingleFileStorage::FragAction::QueueDel: + case SingleFileStorage::FragAction::UnqueueDel: + case SingleFileStorage::FragAction::RestoreOld: + case SingleFileStorage::FragAction::FreeExtents: + return true; + default: + return false; + } +} + +bool WalFile::write(const int64_t transid, const SingleFileStorage::SFragInfo& info) +{ + if(!needsWrite(info)) + { + return true; + } + + CWData data; + data.addUInt(0); // checksum + data.addUInt(0); // size + data.addChar(0); // version + data.addVarInt(transid); + data.addUChar(static_cast(info.action)); + data.addString2(info.fn); + data.addVarInt(info.offset); + data.addVarInt(info.len); + data.addVarInt(info.last_modified); + data.addVarInt(info.extra_exts.size()); + for (const auto& ext : info.extra_exts) + { + data.addVarInt(ext.offset); + data.addVarInt(ext.len); + } + + const auto dsize = static_cast(data.getDataSize()); + memcpy(data.getDataPtr() + sizeof(dsize), &dsize, sizeof(dsize)); + const auto crc = folly::crc32c(reinterpret_cast(data.getDataPtr() + sizeof(dsize)), dsize - sizeof(dsize)); + static_assert(sizeof(crc) == sizeof(unsigned int)); + memcpy(data.getDataPtr(), &crc, sizeof(crc)); + + if(file.pwriteFull(data.getDataPtr(), data.getDataSize(), offset) != data.getDataSize()) + { + return false; + } + + ++_items; + + offset += data.getDataSize(); + return true; +} + +std::vector WalFile::read(int64_t transid) +{ + folly::MemoryMapping mapping(file.dupCloseOnExec()); + + const auto allData = mapping.range(); + size_t off = 0; + std::vector ret; + while(off + sizeof(unsigned int)*2 < allData.size()) + { + unsigned int dsize; + unsigned int crc; + memcpy(&crc, allData.data() + off, sizeof(crc)); + memcpy(&dsize, allData.data() + off + sizeof(crc), sizeof(dsize)); + if(off + dsize > allData.size()) + { + break; + } + if(crc != folly::crc32c(allData.data() + off + sizeof(crc), dsize - sizeof(crc))) + { + break; + } + CRData data(reinterpret_cast(allData.data()) + off + sizeof(crc) + sizeof(dsize), dsize - sizeof(crc) - sizeof(dsize)); + off += dsize; + int64_t ctransid; + data.getVarInt(&ctransid); + if(ctransid != transid) + { + continue; + } + SingleFileStorage::SFragInfo info; + unsigned char action; + data.getUChar(&action); + info.action = static_cast(action); + data.getStr2(&info.fn); + data.getVarInt(&info.offset); + data.getVarInt(&info.len); + data.getVarInt(&info.last_modified); + + int64_t extra_exts_size = 0; + data.getVarInt(&extra_exts_size); + + for(size_t i = 0; i < extra_exts_size; ++i) + { + SingleFileStorage::SPunchItem ext; + data.getVarInt(&ext.offset); + data.getVarInt(&ext.len); + info.extra_exts.push_back(ext); + } + + ret.push_back(info); + } + + return ret; +} + +bool WalFile::sync() +{ + return file.fsyncNoInt() == 0; +} + +void WalFile::reset() +{ + offset = 0; + _items = 0; + file.truncate(0); +} \ No newline at end of file diff --git a/src/WalFile.h b/src/WalFile.h new file mode 100644 index 0000000..3ac3ccf --- /dev/null +++ b/src/WalFile.h @@ -0,0 +1,25 @@ +#include +#include "File.h" +#include "SingleFileStorage.h" + +class WalFile +{ + File file; + int64_t offset = 0; + size_t _items = 0; + + bool needsWrite(const SingleFileStorage::SFragInfo& info); + +public: + WalFile(const std::string &path); + + bool write(const int64_t transid, const SingleFileStorage::SFragInfo& info); + std::vector read(int64_t transid); + void reset(); + bool sync(); + + size_t items() const + { + return _items; + } +}; \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 827f22f..20a7517 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -49,6 +49,7 @@ DEFINE_bool(stop_on_error, false, "Stop on write/read errors"); DEFINE_bool(punch_holes, true, "Free up space if not enough free space is left by punching holes"); DEFINE_string(server_url, "serverurl", "URL of server"); DEFINE_bool(bucket_versioning, false, "Enable bucket versioning"); +DEFINE_string(index_wal_path, "", "Path where to put the index WAL file. Disabled if empty"); namespace { std::unique_ptr server; @@ -172,6 +173,7 @@ int realMain(int argc, char* argv[]) sfsoptions.key_compare_func = mdb_cmp_s3key; sfsoptions.common_prefix_func = s3key_common_prefix; sfsoptions.common_prefix_hash_func = s3key_common_prefix_hash; + sfsoptions.wal_file_path = FLAGS_index_wal_path.empty() ? FLAGS_index_wal_path : (FLAGS_index_wal_path + os_file_sep() + "index.wal"); proxygen::HTTPSessionBase::setMaxReadBufferSize(16*1024); diff --git a/src/s3handler.cpp b/src/s3handler.cpp index 5810aa0..0b85218 100644 --- a/src/s3handler.cpp +++ b/src/s3handler.cpp @@ -50,6 +50,7 @@ const char unsigned_payload[] = "UNSIGNED-PAYLOAD"; DEFINE_bool(with_stop_command, false, "Allow stopping via putting to stop object"); DEFINE_bool(allow_sig_v2, true, "Allow aws sig v2"); DEFINE_string(host_override, "", "Override host for s3 requests"); +DEFINE_bool(pre_sync_commit, false, "Pre-sync data and index files before commit for potential performance gain"); std::string hashSha256Hex(const std::string_view payload) { @@ -1400,7 +1401,7 @@ void S3Handler::commit(proxygen::HTTPMessage& headers) folly::getGlobalCPUExecutor()->add( [self = this->self, evb]() { - bool b = self->sfs.commit(false, -1); + bool b = self->sfs.commit(false, -1, FLAGS_pre_sync_commit); evb->runInEventBaseThread([self = self, b]() { @@ -1979,7 +1980,7 @@ void S3Handler::finalizeMultipartUpload() if(!self->sfs.get_manual_commit()) { - const bool b = self->sfs.commit(false, -1); + const bool b = self->sfs.commit(false, -1, FLAGS_pre_sync_commit); if(!b) { @@ -2086,7 +2087,7 @@ void S3Handler::deleteObject(proxygen::HTTPMessage& headers) if(res==0 && !self->sfs.get_manual_commit()) { - res = self->sfs.commit(false, -1) ? 0 : 1; + res = self->sfs.commit(false, -1, FLAGS_pre_sync_commit) ? 0 : 1; } evb->runInEventBaseThread([self = self, res]() @@ -2135,6 +2136,8 @@ void S3Handler::deleteBucket(proxygen::HTTPMessage& headers) { const auto iterStartVal = make_key({.key = {}, .version=std::numeric_limits::max(), .bucketId = *bucketId}); SingleFileStorage::IterData iter_data = {}; + + self->sfs.list_commit(); if(!self->sfs.iter_start(iterStartVal, false, iter_data)) { @@ -2524,6 +2527,8 @@ void S3Handler::listObjects(folly::EventBase *evb, std::shared_ptr se iterStartVal = make_key({.key = {}, .version=std::numeric_limits::max(), .bucketId = bucketId}); } + sfs.list_commit(); + if(!sfs.iter_start(iterStartVal, false, iter_data)) { evb->runInEventBaseThread([self = self]() @@ -2972,7 +2977,7 @@ void S3Handler::onBodyCPU(folly::EventBase *evb, int64_t offset, std::unique_ptr if(!sfs.get_manual_commit()) { - bool b = sfs.commit(false, -1); + bool b = sfs.commit(false, -1, FLAGS_pre_sync_commit); if(!b) {