Support multiple threads

This commit is contained in:
Martin Raiber 2020-11-14 18:30:32 +01:00
parent 488e00d691
commit 9e83826cb8
7 changed files with 113 additions and 30 deletions

View File

@ -6,7 +6,7 @@ FMNT=/media/test
BMNT=/media/bench
mkdir -p "$FMNT"
mkdir -p "$BMNT"
./fuseuring /tmp/backing_file.img "$FMNT" $((500*1024*1024)) 5000 &
./fuseuring /tmp/backing_file.img "$FMNT" $((500*1024*1024)) 100 5000 4 &
while ! test -e "$FMNT/volume"; do sleep 1; done
LODEV=$(losetup --find --show "$FMNT/volume" --direct-io=on)
mkfs.ext4 -F $LODEV

View File

@ -4,7 +4,7 @@
#include <liburing.h>
#include <iostream>
fuse_io_context::MallocItem* fuse_io_context::malloc_cache_head = nullptr;
thread_local fuse_io_context::MallocItem* fuse_io_context::malloc_cache_head = nullptr;
fuse_io_context::fuse_io_context(FuseRing fuse_ring)
: fuse_ring(std::move(fuse_ring)), last_rc(0)

View File

@ -187,7 +187,7 @@ struct fuse_io_context
char data[];
};
static MallocItem* malloc_cache_head;
thread_local static MallocItem* malloc_cache_head;
static constexpr size_t malloc_cache_item_size = 500;
static void clear_malloc_cache()

View File

@ -14,6 +14,7 @@
#include <thread>
#include <iostream>
#include "fuse_io_context.h"
#include "fuseuring_main.h"
namespace
{
@ -382,7 +383,7 @@ namespace
io_uring_prep_splice(sqe3, fuse_io->pipe[0],
-1, io.fuse_ring.fd, -1, out_header->len,
SPLICE_F_FD_IN_FIXED | SPLICE_F_NONBLOCK);
SPLICE_F_MOVE | SPLICE_F_FD_IN_FIXED | SPLICE_F_NONBLOCK);
sqe3->flags |= IOSQE_FIXED_FILE;
std::vector<int> rcs = co_await io.complete({sqe1, sqe2, sqe3});
@ -797,7 +798,8 @@ fuse_io_context::io_uring_task<int> queue_fuse_read(fuse_io_context& io)
co_return rc;
}
int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_background, int congestion_threshold)
int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_fuse_ios,
int max_background, int congestion_threshold, size_t n_threads)
{
umount(mountpoint.c_str());
@ -923,7 +925,8 @@ int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_backgr
init_out.init_out.congestion_threshold = congestion_threshold;
init_out.init_out.max_pages = static_cast<uint16_t>(fuse_max_pages);
init_out.init_out.time_gran = 1;
init_out.init_out.max_write = getpagesize()*fuse_max_pages;
size_t max_write = getpagesize()*fuse_max_pages;
init_out.init_out.max_write = max_write;
if(write(fuse_fd, &init_out, sizeof(init_out))!=sizeof(init_out))
{
@ -931,14 +934,86 @@ int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_backgr
return 9;
}
struct io_uring fuse_uring;
rc = io_uring_queue_init(max_background*2, &fuse_uring, 0);//IORING_SETUP_SQPOLL);
if(rc<0)
if(n_threads<=1)
{
perror("Error setting up io_uring.");
return 10;
return fuseuring_run(max_background, max_write, backing_fd, fuse_fd, nullptr, 0);
}
else
{
struct io_uring fuse_uring;
int rc = io_uring_queue_init(std::max(100, max_fuse_ios*2), &fuse_uring, 0);
if(rc<0)
{
perror("Error setting up io_uring.");
return 10;
}
int thread_rc=0;
std::vector<std::thread> threads;
for(size_t i=0;i<n_threads;++i)
{
threads.push_back(std::thread( [max_fuse_ios,
max_write, backing_fd, fuse_fd, &thread_rc, i, &fuse_uring] () {
int session_fd = open("/dev/fuse", O_RDWR | O_CLOEXEC);
if(session_fd==-1)
{
perror("Error opening /dev/fuse -2.");
thread_rc=2;
return;
}
int rc = ioctl(session_fd, FUSE_DEV_IOC_CLONE, &fuse_fd);
if(rc==-1)
{
perror("Error cloning fuse fd");
thread_rc=3;
}
rc = fuseuring_run(max_fuse_ios,
max_write, backing_fd, session_fd,
i==0 ? &fuse_uring : nullptr,
i==0 ? 0 : fuse_uring.ring_fd);
if(rc!=0)
thread_rc=rc;
}));
}
for(auto& thread: threads)
{
thread.join();
}
return thread_rc;
}
}
int fuseuring_run(int max_fuse_ios, size_t max_write,
int backing_fd, int fuse_fd, struct io_uring* fuse_uring, int uring_wq_fd)
{
struct io_uring fuse_uring_local;
if(fuse_uring==nullptr)
{
struct io_uring_params p = {};
//p.flags = IORING_SETUP_SQPOLL;
if(uring_wq_fd!=0)
{
p.flags = IORING_SETUP_ATTACH_WQ;
}
p.wq_fd = uring_wq_fd;
int rc = io_uring_queue_init_params(std::max(100, max_fuse_ios*2), &fuse_uring_local, &p);//IORING_SETUP_SQPOLL);
if(rc<0)
{
perror("Error setting up io_uring.");
return 10;
}
fuse_uring = &fuse_uring_local;
}
std::vector<int> fixed_fds;
@ -952,9 +1027,9 @@ int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_backgr
fixed_fds.push_back(backing_fd);
fuse_ring.backing_fd_orig = backing_fd;
size_t max_bufsize = init_out.init_out.max_write + sizeof(fuse_in_header) + sizeof(fuse_write_in);
size_t max_bufsize = max_write + sizeof(fuse_in_header) + sizeof(fuse_write_in);
std::vector<char> header_buf_v(header_buf_size*init_out.init_out.max_background);
std::vector<char> header_buf_v(header_buf_size*max_fuse_ios);
char* header_buf = header_buf_v.data();
struct iovec iov;
@ -963,7 +1038,7 @@ int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_backgr
size_t header_buf_idx = reg_buffers.size();
reg_buffers.push_back(iov);
std::vector<char> scratch_buf_v(scratch_buf_size*init_out.init_out.max_background);
std::vector<char> scratch_buf_v(scratch_buf_size*max_fuse_ios);
char* scratch_buf = scratch_buf_v.data();
iov.iov_base = scratch_buf;
iov.iov_len = scratch_buf_v.size();
@ -972,10 +1047,10 @@ int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_backgr
std::vector<int> pipe_fds;
for(size_t i=0;i<init_out.init_out.max_background;++i)
for(size_t i=0;i<max_fuse_ios;++i)
{
std::unique_ptr<fuse_io_context::FuseIo> new_io = std::make_unique<fuse_io_context::FuseIo>();
rc = pipe2(new_io->pipe, O_CLOEXEC|O_NONBLOCK);
int rc = pipe2(new_io->pipe, O_CLOEXEC|O_NONBLOCK);
if(rc!=0)
{
perror("Error creating pipe.");
@ -1008,21 +1083,21 @@ int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_backgr
fuse_ring.ios.push_back(std::move(new_io));
}
rc = io_uring_register_files(&fuse_uring, &fixed_fds[0], fixed_fds.size());
int rc = io_uring_register_files(fuse_uring, &fixed_fds[0], fixed_fds.size());
if(rc<0)
{
perror("Error registering fuse io_uring files.");
return 13;
}
rc = io_uring_register_buffers(&fuse_uring, &reg_buffers[0], reg_buffers.size());
rc = io_uring_register_buffers(fuse_uring, &reg_buffers[0], reg_buffers.size());
if(rc<0)
{
perror("Error registering fuse io_uring buffers.");
return 14;
}
fuse_ring.ring = &fuse_uring;
fuse_ring.ring = fuse_uring;
fuse_ring.ring_submit = false;
fuse_ring.max_bufsize = max_bufsize;
@ -1039,10 +1114,10 @@ int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_backgr
fuse_io_context service(std::move(fuse_ring));
rc = service.run(queue_fuse_read);
io_uring_unregister_buffers(&fuse_uring);
io_uring_unregister_files(&fuse_uring);
io_uring_unregister_buffers(fuse_uring);
io_uring_unregister_files(fuse_uring);
io_uring_queue_exit(&fuse_uring);
io_uring_queue_exit(fuse_uring);
for(int p: pipe_fds)
{

View File

@ -3,4 +3,9 @@
#pragma once
#include <string>
int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_background, int congestion_threshold);
int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_fuse_ios,
int max_background, int congestion_threshold, size_t n_threads);
struct fuse_uring;
int fuseuring_run(int max_fuse_ios, size_t max_write, int backing_fd,
int fuse_fd, struct io_uring* fuse_uring, int uring_wq_fd);

View File

@ -18,9 +18,9 @@
int main(int argc, char* argv[])
{
if(argc<5)
if(argc<6)
{
std::cerr << "Not enough arguments ./fuseuring [backing file path] [fuse mount path] [backing file size] [fuse max_background]" << std::endl;
std::cerr << "Not enough arguments ./fuseuring [backing file path] [fuse mount path] [backing file size] [fuse max ios] [fuse max_background] [number of threads]" << std::endl;
return 101;
}
@ -58,9 +58,12 @@ int main(int argc, char* argv[])
perror("Error increasing RLIMIT_MEMLOCK");
}
int fuse_max_background = atoi(argv[4]);
int fuse_max_ios = atoi(argv[4]);
int fuse_max_background = atoi(argv[5]);
size_t n_threads = static_cast<size_t>(atoi(argv[6]));
rc = fuseuring_main(backing_fd, argv[2], fuse_max_background, fuse_max_background+1000);
rc = fuseuring_main(backing_fd, argv[2], fuse_max_ios,
fuse_max_background, fuse_max_background+1000, n_threads);
close(backing_fd);

View File

@ -140,7 +140,7 @@ FMNT=/media/test
BMNT=/media/bench
mkdir -p "$FMNT"
mkdir -p "$BMNT"
./fuseuring /tmp/backing_file.img "$FMNT" $((500*1024*1024)) 5000 &
./fuseuring /tmp/backing_file.img "$FMNT" $((500*1024*1024)) 200 5000 1 &
LODEV=$(losetup --find --show "$FMNT/volume" --direct-io=on)
mkfs.ext4 $LODEV || true
mount $LODEV "$BMNT"