Add WAL file to potentially speed up commits

This commit is contained in:
Martin Raiber 2025-05-04 19:45:39 +02:00
parent 93fa52e890
commit 8fc1c18e96
10 changed files with 360 additions and 41 deletions

3
.gitignore vendored
View File

@ -12,4 +12,5 @@ vcpkg
vcpkg_installed
/homepage/_site
*.sqlgenbackup
.test_venv
.test_venv
minio

View File

@ -70,6 +70,12 @@ configure_file(
@ONLY
)
# add_compile_options(-fno-omit-frame-pointer)
# if(CMAKE_BUILD_TYPE STREQUAL "Debug" OR CMAKE_CONFIGURATION_TYPES MATCHES ".*[dD]ebug.*")
# add_compile_options(-fsanitize=address)
# add_link_options(-fsanitize=address)
# endif()
add_executable(hs5
external/lmdb/midl.cpp
external/lmdb/mdb.c
@ -90,6 +96,7 @@ add_executable(hs5
src/Session.cpp
src/cmd.cpp
src/StaticHandler.cpp
src/WalFile.cpp
wwwgen/www_files.cpp
${SCHEMA_SOURCES})
@ -116,10 +123,5 @@ target_link_libraries(hs5 PRIVATE $<IF:$<TARGET_EXISTS:zstd::libzstd_shared>,zst
proxygen::proxygen proxygen::proxygencurl proxygen::proxygenhttpserver expat::expat unofficial-sodium::sodium SqliteCppGen::SqliteCppGen nlohmann_json::nlohmann_json fmt::fmt)
if(CMAKE_BUILD_TYPE STREQUAL "Debug" OR CMAKE_CONFIGURATION_TYPES MATCHES ".*[dD]ebug.*")
add_compile_options(-fsanitize=address)
add_link_options(-fsanitize=address)
endif()
target_compile_features(hs5 PUBLIC cxx_std_20)

View File

@ -35,6 +35,13 @@
"description": "Build with Ninja/vcpkg (Release)",
"configuration": "Release"
},
{
"name": "ninja-vcpkg-release-with-debug-info",
"configurePreset": "ninja-multi-vcpkg",
"displayName": "Build (Release)",
"description": "Build with Ninja/vcpkg (Release with debug info)",
"configuration": "RelWithDebInfo"
},
{
"name": "ninja-vcpkg",
"configurePreset": "ninja-multi-vcpkg",

View File

@ -409,6 +409,8 @@ Api::ListResp ApiHandler::list(const Api::ListParams& params, const ApiSessionSt
const auto bucketId = *bucketIdOpt;
const auto iterStartVal = make_key({.key = params.continuationToken ? *params.continuationToken : prefix, .version=std::numeric_limits<int64_t>::max(), .bucketId = bucketId});
sfs.list_commit();
SingleFileStorage::IterData iterData = {};
if(!sfs.iter_start(iterStartVal, false, iterData))
throw ApiError(Api::Herror::errorStartingListing);

View File

@ -3,6 +3,7 @@
* SPDX-License-Identifier: LGPL-3.0-or-later
*/
#include "SingleFileStorage.h"
#include "WalFile.h"
#include <asm-generic/errno-base.h>
#include <asm-generic/errno.h>
#include <cstdint>
@ -911,9 +912,13 @@ SingleFileStorage::SingleFileStorage(SFSOptions options)
}
else
{
// Use random initial transid to avoid applying mismatching wal files
curr_transid = folly::Random::rand32();
XLOG(INFO) << "New data file transid " << std::to_string(curr_transid) << " curr size "+folly::prettyPrint(index_file_size, folly::PRETTY_BYTES_IEC) << " fn " << data_file_path;
}
prev_transid = curr_transid;
ch = dbi_size_info_next_disk_id;
val.mv_data = &ch;
val.mv_size = 1;
@ -1155,6 +1160,24 @@ SingleFileStorage::SingleFileStorage(SFSOptions options)
(*cb)();
});
}
if(!options.wal_file_path.empty())
{
XLOGF(INFO, "Opening WAL file at {}", options.wal_file_path);
wal_file = std::make_unique<WalFile>(options.wal_file_path);
const auto items = wal_file->read(curr_transid);
if(!items.empty())
{
XLOGF(INFO, "WAL file read {} items", items.size());
}
for(const auto& item : items)
{
commit_queue.push_back(item);
}
}
}
SingleFileStorage::SingleFileStorage()
@ -2342,43 +2365,46 @@ bool SingleFileStorage::restore_old(const std::string & fn)
return true;
}
bool SingleFileStorage::commit(bool background_queue, int64_t transid, int64_t disk_id)
bool SingleFileStorage::commit(bool background_queue, int64_t transid, int64_t disk_id, const bool pre_sync)
{
if (is_dead)
{
return false;
}
if (data_file.fsyncNoInt()!=0)
if(pre_sync)
{
XLOG(ERR) << "Failed to sync data file " << data_file_path << ". " << folly::errnoStr(errno);
write_offline = true;
do_stop_on_error();
return false;
}
{
std::scoped_lock lock(mutex);
mdb_curr_sync = true;
}
if (mdb_env_sync(db_env, 0) != 0)
{
XLOG(ERR) << "mdb_env_sync on " << db_path << " failed. " << folly::errnoStr(errno);
write_offline = true;
do_stop_on_error();
return false;
}
if (cache_db_env != nullptr)
{
if (mdb_env_sync(cache_db_env, 0) != 0)
if (data_file.fsyncNoInt()!=0)
{
XLOG(ERR) << "mdb_env_sync on cache db failed. " << folly::errnoStr(errno);
XLOG(ERR) << "Failed to sync data file " << data_file_path << ". " << folly::errnoStr(errno);
write_offline = true;
do_stop_on_error();
return false;
}
{
std::scoped_lock lock(mutex);
mdb_curr_sync = true;
}
if (mdb_env_sync(db_env, 0) != 0)
{
XLOG(ERR) << "mdb_env_sync on " << db_path << " failed. " << folly::errnoStr(errno);
write_offline = true;
do_stop_on_error();
return false;
}
if (cache_db_env != nullptr)
{
if (mdb_env_sync(cache_db_env, 0) != 0)
{
XLOG(ERR) << "mdb_env_sync on cache db failed. " << folly::errnoStr(errno);
write_offline = true;
do_stop_on_error();
return false;
}
}
}
std::unique_lock lock(mutex);
@ -2386,7 +2412,7 @@ bool SingleFileStorage::commit(bool background_queue, int64_t transid, int64_t d
SCommitInfo commit_info;
SFragInfo frag_info;
frag_info.offset = transid;
frag_info.len = disk_id;
frag_info.len = 0;
frag_info.action = FragAction::Commit;
frag_info.commit_info = &commit_info;
@ -2402,7 +2428,43 @@ bool SingleFileStorage::commit(bool background_queue, int64_t transid, int64_t d
commit_queue.push_back(frag_info);
}
mdb_curr_sync = false;
if(pre_sync)
mdb_curr_sync = false;
cond.notify_all();
commit_info.commit_done.wait(lock);
return commit_info.commit_errors == 0;
}
bool SingleFileStorage::list_commit()
{
if (is_dead)
{
return false;
}
// Only necessary if using wal file
if(!wal_file)
return true;
std::unique_lock lock(mutex);
if(!needs_wal_file_reset)
return true;
SCommitInfo commit_info;
SFragInfo frag_info;
frag_info.offset = -1;
frag_info.len = 0;
frag_info.md5sum = "l";
frag_info.action = FragAction::Commit;
frag_info.commit_info = &commit_info;
if (is_dead)
return false;
commit_queue.push_back(frag_info);
cond.notify_all();
commit_info.commit_done.wait(lock);
@ -6726,6 +6788,9 @@ void SingleFileStorage::operator()()
if (frag_info.action == FragAction::Commit
&& frag_info.offset != -1)
{
// This logic is for coordinating multiple files. Disable for now
assert(false);
int64_t disk_id = frag_info.len;
if (disk_id == 0)
curr_transid = frag_info.offset;
@ -6751,7 +6816,49 @@ void SingleFileStorage::operator()()
mod_items += 2;
}
if (frag_info.action == FragAction::Commit )
if(wal_file && !wal_file->write(prev_transid, frag_info) )
{
XLOG(ERR) << "Failed to write to wal file. " << folly::errnoStr(errno);
++commit_errors;
}
if(frag_info.action == FragAction::Commit &&
frag_info.md5sum != "reset" &&
frag_info.md5sum != "l" &&
wal_file && wal_file->items() < 1000)
{
if (data_file.fsyncNoInt()!=0)
{
XLOG(ERR) << "Failed to sync data file " << data_file_path << ". " << folly::errnoStr(errno);
++commit_errors;
}
if (new_data_file
&& new_data_file.fsyncNoInt()!=0)
{
XLOG(ERR) << "Failed to sync new data file " << (data_file_path.parent_path() / "new_data") << ". " << folly::errnoStr(errno);
++commit_errors;
}
const auto commit_ok = wal_file->sync();
if(!commit_ok)
{
XLOG(ERR) << "Failed to sync wal file. " << folly::errnoStr(errno);
++commit_errors;
}
std::scoped_lock llock(mutex);
if (frag_info.commit_info != nullptr)
{
frag_info.commit_info->commit_errors = commit_errors;
frag_info.commit_info->commit_done.notify_all();
if(commit_ok)
commit_errors = 0;
}
needs_wal_file_reset = true;
}
else if (frag_info.action == FragAction::Commit )
{
if (frag_info.md5sum == "reset"
&& commit_errors>0)
@ -7077,6 +7184,9 @@ void SingleFileStorage::operator()()
XLOG(INFO) << "Defrag ctr incremented. Defrag can continue...";
defrag_restart=0;
}
if(commit_ok)
needs_wal_file_reset = false;
}
if (has_commit_info
@ -7087,6 +7197,14 @@ void SingleFileStorage::operator()()
if (commit_ok)
{
assert(frag_info.offset == -1);
prev_transid = curr_transid;
if(frag_info.offset == -1)
++curr_transid;
if(wal_file)
wal_file->reset();
int rc = mdb_txn_begin(db_env, NULL, 0, &txn);
if (rc)
{

View File

@ -29,8 +29,11 @@ using THREAD_ID = pid_t;
constexpr int64_t sfs_block_size = 4096;
class WalFile;
class SingleFileStorage
{
friend WalFile;
public:
struct SPunchItem
@ -87,6 +90,7 @@ public:
std::string db_path;
std::string freespace_cache_path;
std::string dm_cache_path;
std::string wal_file_path;
int64_t dm_cache_size = 0;
bool use_direct_io = false;
int64_t data_file_size_limit_mb = 0;
@ -168,11 +172,20 @@ public:
bool restore_old(const std::string& fn);
bool commit(bool background_queue, int64_t transid) {
return commit(background_queue, transid, 0);
bool commit(bool background_queue, int64_t transid, const bool pre_sync) {
return commit(background_queue, transid, 0, pre_sync);
}
bool commit(bool background_queue, int64_t transid, int64_t disk_id);
bool commit(bool background_queue, int64_t transid, int64_t disk_id, const bool pre_sync);
/**
* For read after write consistency, when running with WAL, this commits to the LMDB if necessary
* so the listings have read after write consistency.
*
* This could be improved by merging the uncommited changes into the listing, but avoid doing that
* work for now.
*/
bool list_commit();
bool empty_queue(bool background_queue);
@ -566,6 +579,7 @@ private:
relaxed_atomic<bool> is_dead;
relaxed_atomic<bool> write_offline;
int64_t prev_transid;
int64_t curr_transid;
bool force_freespace_check;
@ -605,6 +619,9 @@ private:
common_prefix_hash_func_t common_prefix_hash_func;
int64_t curr_version = 0;
std::unique_ptr<WalFile> wal_file;
bool needs_wal_file_reset = false;
};

140
src/WalFile.cpp Normal file
View File

@ -0,0 +1,140 @@
/**
* Copyright Martin Raiber. All Rights Reserved.
* SPDX-License-Identifier: LGPL-3.0-or-later
*/
#include "WalFile.h"
#include "data.h"
#include <folly/system/MemoryMapping.h>
#include <folly/hash/Checksum.h>
WalFile::WalFile(const std::string &path)
: file(path, O_RDWR | O_CREAT | O_APPEND, 0644)
{
}
bool WalFile::needsWrite(const SingleFileStorage::SFragInfo& info)
{
switch(info.action)
{
case SingleFileStorage::FragAction::Add:
case SingleFileStorage::FragAction::Del:
case SingleFileStorage::FragAction::DelOld:
case SingleFileStorage::FragAction::DelWithQueued:
case SingleFileStorage::FragAction::QueueDel:
case SingleFileStorage::FragAction::UnqueueDel:
case SingleFileStorage::FragAction::RestoreOld:
case SingleFileStorage::FragAction::FreeExtents:
return true;
default:
return false;
}
}
bool WalFile::write(const int64_t transid, const SingleFileStorage::SFragInfo& info)
{
if(!needsWrite(info))
{
return true;
}
CWData data;
data.addUInt(0); // checksum
data.addUInt(0); // size
data.addChar(0); // version
data.addVarInt(transid);
data.addUChar(static_cast<unsigned char>(info.action));
data.addString2(info.fn);
data.addVarInt(info.offset);
data.addVarInt(info.len);
data.addVarInt(info.last_modified);
data.addVarInt(info.extra_exts.size());
for (const auto& ext : info.extra_exts)
{
data.addVarInt(ext.offset);
data.addVarInt(ext.len);
}
const auto dsize = static_cast<unsigned int>(data.getDataSize());
memcpy(data.getDataPtr() + sizeof(dsize), &dsize, sizeof(dsize));
const auto crc = folly::crc32c(reinterpret_cast<uint8_t*>(data.getDataPtr() + sizeof(dsize)), dsize - sizeof(dsize));
static_assert(sizeof(crc) == sizeof(unsigned int));
memcpy(data.getDataPtr(), &crc, sizeof(crc));
if(file.pwriteFull(data.getDataPtr(), data.getDataSize(), offset) != data.getDataSize())
{
return false;
}
++_items;
offset += data.getDataSize();
return true;
}
std::vector<SingleFileStorage::SFragInfo> WalFile::read(int64_t transid)
{
folly::MemoryMapping mapping(file.dupCloseOnExec());
const auto allData = mapping.range();
size_t off = 0;
std::vector<SingleFileStorage::SFragInfo> ret;
while(off + sizeof(unsigned int)*2 < allData.size())
{
unsigned int dsize;
unsigned int crc;
memcpy(&crc, allData.data() + off, sizeof(crc));
memcpy(&dsize, allData.data() + off + sizeof(crc), sizeof(dsize));
if(off + dsize > allData.size())
{
break;
}
if(crc != folly::crc32c(allData.data() + off + sizeof(crc), dsize - sizeof(crc)))
{
break;
}
CRData data(reinterpret_cast<const char*>(allData.data()) + off + sizeof(crc) + sizeof(dsize), dsize - sizeof(crc) - sizeof(dsize));
off += dsize;
int64_t ctransid;
data.getVarInt(&ctransid);
if(ctransid != transid)
{
continue;
}
SingleFileStorage::SFragInfo info;
unsigned char action;
data.getUChar(&action);
info.action = static_cast<SingleFileStorage::FragAction>(action);
data.getStr2(&info.fn);
data.getVarInt(&info.offset);
data.getVarInt(&info.len);
data.getVarInt(&info.last_modified);
int64_t extra_exts_size = 0;
data.getVarInt(&extra_exts_size);
for(size_t i = 0; i < extra_exts_size; ++i)
{
SingleFileStorage::SPunchItem ext;
data.getVarInt(&ext.offset);
data.getVarInt(&ext.len);
info.extra_exts.push_back(ext);
}
ret.push_back(info);
}
return ret;
}
bool WalFile::sync()
{
return file.fsyncNoInt() == 0;
}
void WalFile::reset()
{
offset = 0;
_items = 0;
file.truncate(0);
}

25
src/WalFile.h Normal file
View File

@ -0,0 +1,25 @@
#include <string>
#include "File.h"
#include "SingleFileStorage.h"
class WalFile
{
File file;
int64_t offset = 0;
size_t _items = 0;
bool needsWrite(const SingleFileStorage::SFragInfo& info);
public:
WalFile(const std::string &path);
bool write(const int64_t transid, const SingleFileStorage::SFragInfo& info);
std::vector<SingleFileStorage::SFragInfo> read(int64_t transid);
void reset();
bool sync();
size_t items() const
{
return _items;
}
};

View File

@ -49,6 +49,7 @@ DEFINE_bool(stop_on_error, false, "Stop on write/read errors");
DEFINE_bool(punch_holes, true, "Free up space if not enough free space is left by punching holes");
DEFINE_string(server_url, "serverurl", "URL of server");
DEFINE_bool(bucket_versioning, false, "Enable bucket versioning");
DEFINE_string(index_wal_path, "", "Path where to put the index WAL file. Disabled if empty");
namespace {
std::unique_ptr<proxygen::HTTPServer> server;
@ -172,6 +173,7 @@ int realMain(int argc, char* argv[])
sfsoptions.key_compare_func = mdb_cmp_s3key;
sfsoptions.common_prefix_func = s3key_common_prefix;
sfsoptions.common_prefix_hash_func = s3key_common_prefix_hash;
sfsoptions.wal_file_path = FLAGS_index_wal_path.empty() ? FLAGS_index_wal_path : (FLAGS_index_wal_path + os_file_sep() + "index.wal");
proxygen::HTTPSessionBase::setMaxReadBufferSize(16*1024);

View File

@ -50,6 +50,7 @@ const char unsigned_payload[] = "UNSIGNED-PAYLOAD";
DEFINE_bool(with_stop_command, false, "Allow stopping via putting to stop object");
DEFINE_bool(allow_sig_v2, true, "Allow aws sig v2");
DEFINE_string(host_override, "", "Override host for s3 requests");
DEFINE_bool(pre_sync_commit, false, "Pre-sync data and index files before commit for potential performance gain");
std::string hashSha256Hex(const std::string_view payload)
{
@ -1400,7 +1401,7 @@ void S3Handler::commit(proxygen::HTTPMessage& headers)
folly::getGlobalCPUExecutor()->add(
[self = this->self, evb]()
{
bool b = self->sfs.commit(false, -1);
bool b = self->sfs.commit(false, -1, FLAGS_pre_sync_commit);
evb->runInEventBaseThread([self = self, b]()
{
@ -1979,7 +1980,7 @@ void S3Handler::finalizeMultipartUpload()
if(!self->sfs.get_manual_commit())
{
const bool b = self->sfs.commit(false, -1);
const bool b = self->sfs.commit(false, -1, FLAGS_pre_sync_commit);
if(!b)
{
@ -2086,7 +2087,7 @@ void S3Handler::deleteObject(proxygen::HTTPMessage& headers)
if(res==0 && !self->sfs.get_manual_commit())
{
res = self->sfs.commit(false, -1) ? 0 : 1;
res = self->sfs.commit(false, -1, FLAGS_pre_sync_commit) ? 0 : 1;
}
evb->runInEventBaseThread([self = self, res]()
@ -2135,6 +2136,8 @@ void S3Handler::deleteBucket(proxygen::HTTPMessage& headers)
{
const auto iterStartVal = make_key({.key = {}, .version=std::numeric_limits<int64_t>::max(), .bucketId = *bucketId});
SingleFileStorage::IterData iter_data = {};
self->sfs.list_commit();
if(!self->sfs.iter_start(iterStartVal, false, iter_data))
{
@ -2524,6 +2527,8 @@ void S3Handler::listObjects(folly::EventBase *evb, std::shared_ptr<S3Handler> se
iterStartVal = make_key({.key = {}, .version=std::numeric_limits<int64_t>::max(), .bucketId = bucketId});
}
sfs.list_commit();
if(!sfs.iter_start(iterStartVal, false, iter_data))
{
evb->runInEventBaseThread([self = self]()
@ -2972,7 +2977,7 @@ void S3Handler::onBodyCPU(folly::EventBase *evb, int64_t offset, std::unique_ptr
if(!sfs.get_manual_commit())
{
bool b = sfs.commit(false, -1);
bool b = sfs.commit(false, -1, FLAGS_pre_sync_commit);
if(!b)
{