commit 9508a9929795e0b8df29bb929b966fb37b577086 Author: Martin Raiber Date: Sat Nov 7 17:24:12 2020 +0100 Initial commit diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..e69de29 diff --git a/ChangeLog b/ChangeLog new file mode 100644 index 0000000..e69de29 diff --git a/Makefile.am b/Makefile.am new file mode 100755 index 0000000..7d5b8e7 --- /dev/null +++ b/Makefile.am @@ -0,0 +1,9 @@ +ACLOCAL_AMFLAGS = -I m4 +bin_PROGRAMS = fuseuring + +fuseuring_SOURCES = fuse_io_service.cpp main.cpp fuseuring_main.cpp + +fuseuring_LDADD = $(PTHREAD_LIBS) -luring +fuseuring_CXXFLAGS = $(PTHREAD_CFLAGS) -std=c++2a -D_FILE_OFFSET_BITS=64 + +noinst_HEADERS = fuse_io_service.h fuseuring_main.h fuse_kernel.h diff --git a/NEWS b/NEWS new file mode 100644 index 0000000..e69de29 diff --git a/README b/README new file mode 100644 index 0000000..eda7dd4 --- /dev/null +++ b/README @@ -0,0 +1 @@ +Please see readme.md. \ No newline at end of file diff --git a/bench.sh b/bench.sh new file mode 100755 index 0000000..1bd6704 --- /dev/null +++ b/bench.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +set -ex + +FMNT=/media/test +BMNT=/media/bench +mkdir -p "$FMNT" +./fuseuring /tmp/backing_file.img "$FMNT" $((500*1024*1024)) 5000 & +while ! test -e "$FMNT/volume"; do sleep 1; done +LODEV=$(losetup --find --show "$FMNT/volume" --direct-io=on) +mkfs.ext4 -F $LODEV +mount $LODEV "$BMNT" +losetup -d $LODEV + +cp /usr/share/doc/fio/examples/ssd-test.fio ./ +sed -i 's/iodepth=4/iodepth=1024/g' ssd-test.fio +sed -i 's/size=10g/size=400m/g' ssd-test.fio +sed -i "s@directory=/mount-point-of-ssd@directory=$BMNT@g" ssd-test.fio +fio ssd-test.fio + +umount "$BMNT" +umount "$FMNT" + diff --git a/config.h b/config.h new file mode 100644 index 0000000..9adfd02 --- /dev/null +++ b/config.h @@ -0,0 +1,69 @@ +/* config.h. Generated from config.h.in by configure. */ +/* config.h.in. Generated from configure.ac by autoheader. */ + +/* Define to 1 if you have the header file. */ +#define HAVE_INTTYPES_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_MEMORY_H 1 + +/* Define if you have POSIX threads libraries and header files. */ +#define HAVE_PTHREAD 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_PTHREAD_H 1 + +/* pthread has GNU extension thread_setname_np */ +#define HAVE_PTHREAD_SETNAME_NP 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STDINT_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STDLIB_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STRINGS_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STRING_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_STAT_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_TYPES_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_UNISTD_H 1 + +/* Name of package */ +#define PACKAGE "fuseuring" + +/* Define to the address where bug reports for this package should be sent. */ +#define PACKAGE_BUGREPORT "martin@urbackup.org" + +/* Define to the full name of this package. */ +#define PACKAGE_NAME "fuseuring" + +/* Define to the full name and version of this package. */ +#define PACKAGE_STRING "fuseuring 0.1.0.0" + +/* Define to the one symbol short name of this package. */ +#define PACKAGE_TARNAME "fuseuring" + +/* Define to the home page for this package. */ +#define PACKAGE_URL "" + +/* Define to the version of this package. */ +#define PACKAGE_VERSION "0.1.0.0" + +/* Define to necessary symbol if this constant uses a non-standard name on + your system. */ +/* #undef PTHREAD_CREATE_JOINABLE */ + +/* Define to 1 if you have the ANSI C header files. */ +#define STDC_HEADERS 1 + +/* Version number of package */ +#define VERSION "0.1.0.0" diff --git a/configure.ac b/configure.ac new file mode 100755 index 0000000..65b2ac5 --- /dev/null +++ b/configure.ac @@ -0,0 +1,39 @@ +# -*- Autoconf -*- +# Process this file with autoconf to produce a configure script. + +AC_PREREQ(2.61) +AC_INIT([fuseuring], [0.1.0.0], [martin@urbackup.org]) +AC_CONFIG_SRCDIR([main.cpp]) +AC_CONFIG_HEADER([config.h]) +AC_CONFIG_MACRO_DIR([m4]) +AC_CANONICAL_SYSTEM +AM_INIT_AUTOMAKE([subdir-objects tar-ustar]) + +# Checks for programs. +AC_PROG_CXX +AC_PROG_CC +AM_PROG_CC_C_O + +AX_PTHREAD +if !($HAVE_PTHREAD) +then + echo "Sorry, your system needs the pthread library." + echo "Either install it or give up." + exit 1 +fi + +AC_LANG([C++]) + +# Checks for libraries. + +# Checks for header files. +AC_CHECK_HEADERS([pthread.h]) + +# Checks for library functions. + +AC_SEARCH_LIBS([clock_gettime], [rt posix4], + [test "$ac_cv_search_clock_gettime" = "none required" || LIBS="$LIBS $ac_cv_search_clock_gettime"], + [AC_MSG_FAILURE([No library for clock_gettime found])] ) + +AC_CONFIG_FILES([Makefile]) +AC_OUTPUT diff --git a/fuse_io_service.cpp b/fuse_io_service.cpp new file mode 100755 index 0000000..79c2d5a --- /dev/null +++ b/fuse_io_service.cpp @@ -0,0 +1,174 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright (C) Martin Raiber +#include "fuse_io_service.h" +#include "../Interface/Server.h" +#include +#include + + +fuse_io_service::fuse_io_service(FuseRing fuse_ring) + : fuse_ring(fuse_ring) +{ +} + +int fuse_io_service::fuseuring_handle_cqe(struct io_uring_cqe *cqe) +{ + if(cqe->user_data==0) + { + DBG_PRINT(std::cerr << "Cqe no user_data" << std::endl); + return 0; + } + + IoUringAwaiter::IoUringAwaiterRes* res = reinterpret_cast::IoUringAwaiterRes*>(cqe->user_data); + res->res = cqe->res; + DBG_PRINT(std::cout << "Cqe res "<< cqe->res << std::endl); + --res->gres->tocomplete; + if(res->gres->tocomplete==0) + { + DBG_PRINT(std::cerr << "Resume cqe..." << std::endl); + res->gres->awaiter.resume(); + } + + return 0; +} + +int fuse_io_service::fuseuring_submit(bool block) +{ + if(fuse_ring.ring_submit) + { + int rc; + if(block) + rc = io_uring_submit_and_wait(fuse_ring.ring, 1); + else + rc = io_uring_submit(fuse_ring.ring); + + if(rc<0) + { + perror("Error submitting to fuse io_uring."); + return 18; + } + fuse_ring.ring_submit=false; + } + return 0; +} + +int fuse_io_service::run_sqe_awaiters() +{ + if(sqe_awaiters!=nullptr) + { + if(int rc; (rc=fuseuring_submit(false))!=0) + return rc; + } + + while(sqe_awaiters!=nullptr) + { + struct io_uring_sqe *sqe = io_uring_get_sqe(fuse_ring.ring); + + if(sqe) + { + IoUringSqeAwaiter* curr = sqe_awaiters; + sqe_awaiters = curr->next; + curr->sqe = sqe; + curr->awaiter.resume(); + } + else + { + break; + } + } + + return 0; +} + +int fuse_io_service::run(queue_fuse_read_t queue_read) +{ + fuse_ring.ring_submit = false; + + std::vector > tasks; + while(true) + { + if(int rc; (rc=run_sqe_awaiters())!=0) + return rc; + + while(!fuse_ring.ios.empty()) + { + io_uring_task task = queue_read(*this); + + if(task.has_res()) + { + int rc = task.res(); + + if(rc<0) + { + std::cerr << "Error after running task (1): " << rc << std::endl; + return 20; + } + } + else + { + tasks.push_back(std::move(task)); + } + } + + if(int rc; (rc=run_sqe_awaiters())!=0) + return rc; + + if(int rc; (rc=fuseuring_submit(true))!=0) + return rc; + + int nr_comp = 0; + while(true) + { + struct io_uring_cqe *cqe; + if(nr_comp==0) + { + int rc = io_uring_wait_cqe(fuse_ring.ring, &cqe); + if(rc<0) + { + perror("Waiting for fuse iouring cqe failed."); + return 16; + } + } + else + { + int rc = io_uring_peek_cqe(fuse_ring.ring, &cqe); + if(rc!=0) + { + break; + } + } + + int rc = fuseuring_handle_cqe(cqe); + if(rc<0) + { + std::cerr << "Error handling cqe rc=" << rc << std::endl; + return 17; + } + + io_uring_cqe_seen(fuse_ring.ring, cqe); + + ++nr_comp; + } + + std::vector > new_tasks; + for(auto& task: tasks) + { + if(task.has_res()) + { + int rc = task.res(); + + if(rc<0) + { + std::cerr << "Error running task (2). rc: " << rc << std::endl; + return 18; + } + } + else + { + new_tasks.push_back(std::move(task)); + } + } + + std::swap(tasks, new_tasks); + } +} \ No newline at end of file diff --git a/fuse_io_service.h b/fuse_io_service.h new file mode 100755 index 0000000..0dba234 --- /dev/null +++ b/fuse_io_service.h @@ -0,0 +1,373 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright (C) Martin Raiber +#pragma once +#include +#include +#include +#include +#include +#include +#include + +#define DBG_PRINT(x) + +/* +for clang and libc++ +namespace std +{ + template + using coroutine_handle = std::experimental::coroutine_handle; + + using suspend_never = std::experimental::suspend_never; + + using suspend_always = std::experimental::suspend_always; +}*/ + +static uint64_t handle_v(std::coroutine_handle<> p_awaiter) +{ + return (uint64_t)*((uint64_t*)&p_awaiter); +} + +struct fuse_io_service +{ + template + struct IoUringAwaiter + { + struct IoUringAwaiterGlobalRes + { + size_t tocomplete; + std::coroutine_handle<> awaiter; + }; + + struct IoUringAwaiterRes + { + IoUringAwaiterRes() noexcept + : res(-1) {} + + int res; + IoUringAwaiterGlobalRes* gres; + }; + + IoUringAwaiter(std::vector sqes) noexcept + { + awaiter_res.resize(sqes.size()); + global_res.tocomplete = sqes.size(); + for(size_t i=0;iuser_data = reinterpret_cast(&awaiter_res[i]); + } + } + + IoUringAwaiter(IoUringAwaiter const&) = delete; + IoUringAwaiter(IoUringAwaiter&& other) = delete; + IoUringAwaiter& operator=(IoUringAwaiter&&) = delete; + IoUringAwaiter& operator=(IoUringAwaiter const&) = delete; + + bool await_ready() const noexcept + { + return false; + } + + void await_suspend(std::coroutine_handle<> p_awaiter) noexcept + { + DBG_PRINT(std::cout << "Await suspend io "<< handle_v(p_awaiter) << std::endl); + global_res.awaiter = p_awaiter; + } + + template >::value, int> = 0> + std::vector await_resume() const noexcept + { + std::vector res; + for(auto& sr: awaiter_res) + { + res.push_back(sr.res); + } + return res; + } + + template::value, int> = 0> + int await_resume() const noexcept + { + return awaiter_res[0].res; + } + + template >::value, int> = 0> + std::pair await_resume() const noexcept + { + return std::make_pair(awaiter_res[0].res, awaiter_res[1].res); + } + + private: + IoUringAwaiterGlobalRes global_res; + std::vector awaiter_res; + }; + + [[nodiscard]] auto complete(std::vector sqes) + { + return IoUringAwaiter >(sqes); + } + + [[nodiscard]] auto complete(io_uring_sqe* sqe) + { + return IoUringAwaiter({sqe}); + } + + [[nodiscard]] auto complete(std::pair sqes) + { + return IoUringAwaiter >({sqes.first, sqes.second}); + } + + struct IoUringSqeAwaiter; + IoUringSqeAwaiter* sqe_awaiters = nullptr; + + struct IoUringSqeAwaiter + { + IoUringSqeAwaiter(fuse_io_service& io, struct io_uring* uring) + : io(io), next(nullptr) + { + sqe = io_uring_get_sqe(uring); + } + + IoUringSqeAwaiter(IoUringSqeAwaiter const&) = delete; + IoUringSqeAwaiter(IoUringSqeAwaiter&& other) = delete; + IoUringSqeAwaiter& operator=(IoUringSqeAwaiter&&) = delete; + IoUringSqeAwaiter& operator=(IoUringSqeAwaiter const&) = delete; + + bool await_ready() const noexcept + { + return sqe!=nullptr; + } + + void await_suspend(std::coroutine_handle<> p_awaiter) noexcept + { + DBG_PRINT(std::cout << "Await suspend sqe "<< handle_v(p_awaiter) << std::endl); + awaiter = p_awaiter; + if(io.sqe_awaiters!=nullptr) + { + next = io.sqe_awaiters; + } + io.sqe_awaiters = this; + } + + struct io_uring_sqe* await_resume() const noexcept + { + return sqe; + } + + fuse_io_service& io; + std::coroutine_handle<> awaiter; + IoUringSqeAwaiter* next; + struct io_uring_sqe *sqe; + }; + + [[nodiscard]] auto get_sqe() + { + fuse_ring.ring_submit=true; + return IoUringSqeAwaiter(*this, fuse_ring.ring); + } + + template + struct io_uring_promise_type + { + using promise_type = io_uring_promise_type; + using handle = std::coroutine_handle; + + enum class e_res_state + { + Init, + Detached, + Res + }; + + io_uring_promise_type() + : res_state(e_res_state::Init) {} + + auto get_return_object() + { + return io_uring_task{handle::from_promise(*this)}; + } + + void return_value(T v) + { + res_state = e_res_state::Res; + res = v; + } + + auto initial_suspend() { + return std::suspend_never{}; + } + + auto final_suspend() noexcept { + struct final_awaiter : std::suspend_always + { + final_awaiter(promise_type* promise) + :promise(promise) {} + + void await_suspend(std::coroutine_handle<> p_awaiter) const noexcept + { + if(promise->res_state==e_res_state::Detached) + { + DBG_PRINT(std::cout << "promise final detached" << std::endl); + if(promise->awaiter) + promise->awaiter.destroy(); + handle::from_promise(*promise).destroy(); + } + else if(promise->awaiter) + { + DBG_PRINT(std::cout << "promise final await resume" << std::endl); + promise->awaiter.resume(); + } + else + { + DBG_PRINT(std::cout << "promise final no awaiter" << std::endl); + } + + } + + private: + promise_type* promise; + }; + return final_awaiter(this); + } + + void unhandled_exception() + { + abort(); + } + + std::coroutine_handle<> awaiter; + + e_res_state res_state; + T res; + }; + + template + struct [[nodiscard]] io_uring_task + { + using promise_type = io_uring_promise_type; + using handle = std::coroutine_handle; + + io_uring_task(io_uring_task const&) = delete; + + io_uring_task(io_uring_task&& other) noexcept + : coro_h(std::exchange(other.coro_h, {})) + { + + } + + io_uring_task& operator=(io_uring_task&&) = delete; + io_uring_task& operator=(io_uring_task const&) = delete; + + io_uring_task(handle h) noexcept + : coro_h(h) + { + + } + + ~io_uring_task() noexcept + { + if(coro_h) + { + if(!coro_h.done()) + { + DBG_PRINT(std::cout << "Detach" << std::endl); + coro_h.promise().res_state = promise_type::e_res_state::Detached; + } + else + { + DBG_PRINT(std::cout << "Destroy" << std::endl); + coro_h.destroy(); + } + } + } + + bool has_res() const noexcept + { + return coro_h.promise().res_state == promise_type::e_res_state::Res; + } + + T res() const noexcept + { + assert(has_res()); + return coro_h.promise().res; + } + + bool await_ready() const noexcept + { + bool r = has_res(); + if(r) + { + DBG_PRINT(std::cout << "Await is ready" << std::endl); + } + else + { + DBG_PRINT(std::cout << "Await is not ready" << std::endl); + } + + return r; + } + + template + void await_suspend(std::coroutine_handle > p_awaiter) noexcept + { + DBG_PRINT(std::cout << "Await task " << (uint64_t)this << " suspend "<< handle_v(p_awaiter) << " prev " << handle_v(coro_h.promise().awaiter) << std::endl); + coro_h.promise().awaiter = p_awaiter; + } + + T await_resume() const noexcept + { + return res(); + } + + private: + handle coro_h; + }; + + struct SFuseIo + { + char type; + int pipe[2]; + char* header_buf; + size_t header_buf_idx; + char* scratch_buf; + size_t scratch_buf_idx; + }; + + struct FuseRing + { + std::stack ios; + struct io_uring* ring; + int fd; + bool ring_submit; + size_t max_bufsize; + int backing_fd; + int backing_fd_orig; + uint64_t backing_f_size; + }; + + FuseRing fuse_ring; + + fuse_io_service(FuseRing fuse_ring); + + typedef fuse_io_service::io_uring_task (*queue_fuse_read_t)(fuse_io_service& io); + + int run(queue_fuse_read_t queue_read); + + SFuseIo* get_fuse_io() + { + fuse_io_service::SFuseIo* fuse_io = fuse_ring.ios.top(); + fuse_ring.ios.pop(); + return fuse_io; + } + + void release_fuse_io(SFuseIo* fuse_io) + { + fuse_ring.ios.push(fuse_io); + } + +private: + int fuseuring_handle_cqe(struct io_uring_cqe *cqe); + int fuseuring_submit(bool block); + int run_sqe_awaiters(); +}; \ No newline at end of file diff --git a/fuse_kernel.h b/fuse_kernel.h new file mode 100755 index 0000000..018a00a --- /dev/null +++ b/fuse_kernel.h @@ -0,0 +1,848 @@ +/* SPDX-License-Identifier: ((GPL-2.0 WITH Linux-syscall-note) OR BSD-2-Clause) */ +/* + This file defines the kernel interface of FUSE + Copyright (C) 2001-2008 Miklos Szeredi + + This program can be distributed under the terms of the GNU GPL. + See the file COPYING. + + This -- and only this -- header file may also be distributed under + the terms of the BSD Licence as follows: + + Copyright (C) 2001-2007 Miklos Szeredi. All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND + ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + SUCH DAMAGE. +*/ + +/* + * This file defines the kernel interface of FUSE + * + * Protocol changelog: + * + * 7.9: + * - new fuse_getattr_in input argument of GETATTR + * - add lk_flags in fuse_lk_in + * - add lock_owner field to fuse_setattr_in, fuse_read_in and fuse_write_in + * - add blksize field to fuse_attr + * - add file flags field to fuse_read_in and fuse_write_in + * - Add ATIME_NOW and MTIME_NOW flags to fuse_setattr_in + * + * 7.10 + * - add nonseekable open flag + * + * 7.11 + * - add IOCTL message + * - add unsolicited notification support + * - add POLL message and NOTIFY_POLL notification + * + * 7.12 + * - add umask flag to input argument of create, mknod and mkdir + * - add notification messages for invalidation of inodes and + * directory entries + * + * 7.13 + * - make max number of background requests and congestion threshold + * tunables + * + * 7.14 + * - add splice support to fuse device + * + * 7.15 + * - add store notify + * - add retrieve notify + * + * 7.16 + * - add BATCH_FORGET request + * - FUSE_IOCTL_UNRESTRICTED shall now return with array of 'struct + * fuse_ioctl_iovec' instead of ambiguous 'struct iovec' + * - add FUSE_IOCTL_32BIT flag + * + * 7.17 + * - add FUSE_FLOCK_LOCKS and FUSE_RELEASE_FLOCK_UNLOCK + * + * 7.18 + * - add FUSE_IOCTL_DIR flag + * - add FUSE_NOTIFY_DELETE + * + * 7.19 + * - add FUSE_FALLOCATE + * + * 7.20 + * - add FUSE_AUTO_INVAL_DATA + * + * 7.21 + * - add FUSE_READDIRPLUS + * - send the requested events in POLL request + * + * 7.22 + * - add FUSE_ASYNC_DIO + * + * 7.23 + * - add FUSE_WRITEBACK_CACHE + * - add time_gran to fuse_init_out + * - add reserved space to fuse_init_out + * - add FATTR_CTIME + * - add ctime and ctimensec to fuse_setattr_in + * - add FUSE_RENAME2 request + * - add FUSE_NO_OPEN_SUPPORT flag + * + * 7.24 + * - add FUSE_LSEEK for SEEK_HOLE and SEEK_DATA support + * + * 7.25 + * - add FUSE_PARALLEL_DIROPS + * + * 7.26 + * - add FUSE_HANDLE_KILLPRIV + * - add FUSE_POSIX_ACL + * + * 7.27 + * - add FUSE_ABORT_ERROR + * + * 7.28 + * - add FUSE_COPY_FILE_RANGE + * - add FOPEN_CACHE_DIR + * - add FUSE_MAX_PAGES, add max_pages to init_out + * - add FUSE_CACHE_SYMLINKS + * + * 7.29 + * - add FUSE_NO_OPENDIR_SUPPORT flag + * + * 7.30 + * - add FUSE_EXPLICIT_INVAL_DATA + * - add FUSE_IOCTL_COMPAT_X32 + * + * 7.31 + * - add FUSE_WRITE_KILL_PRIV flag + */ + +#ifndef _LINUX_FUSE_H +#define _LINUX_FUSE_H + +#ifdef __KERNEL__ +#include +#else +#include +#endif + +/* + * Version negotiation: + * + * Both the kernel and userspace send the version they support in the + * INIT request and reply respectively. + * + * If the major versions match then both shall use the smallest + * of the two minor versions for communication. + * + * If the kernel supports a larger major version, then userspace shall + * reply with the major version it supports, ignore the rest of the + * INIT message and expect a new INIT message from the kernel with a + * matching major version. + * + * If the library supports a larger major version, then it shall fall + * back to the major protocol version sent by the kernel for + * communication and reply with that major version (and an arbitrary + * supported minor version). + */ + +/** Version number of this interface */ +#define FUSE_KERNEL_VERSION 7 + +/** Minor version number of this interface */ +#define FUSE_KERNEL_MINOR_VERSION 31 + +/** The node ID of the root inode */ +#define FUSE_ROOT_ID 1 + +/* Make sure all structures are padded to 64bit boundary, so 32bit + userspace works under 64bit kernels */ + +struct fuse_attr { + uint64_t ino; + uint64_t size; + uint64_t blocks; + uint64_t atime; + uint64_t mtime; + uint64_t ctime; + uint32_t atimensec; + uint32_t mtimensec; + uint32_t ctimensec; + uint32_t mode; + uint32_t nlink; + uint32_t uid; + uint32_t gid; + uint32_t rdev; + uint32_t blksize; + uint32_t padding; +}; + +struct fuse_kstatfs { + uint64_t blocks; + uint64_t bfree; + uint64_t bavail; + uint64_t files; + uint64_t ffree; + uint32_t bsize; + uint32_t namelen; + uint32_t frsize; + uint32_t padding; + uint32_t spare[6]; +}; + +struct fuse_file_lock { + uint64_t start; + uint64_t end; + uint32_t type; + uint32_t pid; /* tgid */ +}; + +/** + * Bitmasks for fuse_setattr_in.valid + */ +#define FATTR_MODE (1 << 0) +#define FATTR_UID (1 << 1) +#define FATTR_GID (1 << 2) +#define FATTR_SIZE (1 << 3) +#define FATTR_ATIME (1 << 4) +#define FATTR_MTIME (1 << 5) +#define FATTR_FH (1 << 6) +#define FATTR_ATIME_NOW (1 << 7) +#define FATTR_MTIME_NOW (1 << 8) +#define FATTR_LOCKOWNER (1 << 9) +#define FATTR_CTIME (1 << 10) + +/** + * Flags returned by the OPEN request + * + * FOPEN_DIRECT_IO: bypass page cache for this open file + * FOPEN_KEEP_CACHE: don't invalidate the data cache on open + * FOPEN_NONSEEKABLE: the file is not seekable + * FOPEN_CACHE_DIR: allow caching this directory + * FOPEN_STREAM: the file is stream-like (no file position at all) + */ +#define FOPEN_DIRECT_IO (1 << 0) +#define FOPEN_KEEP_CACHE (1 << 1) +#define FOPEN_NONSEEKABLE (1 << 2) +#define FOPEN_CACHE_DIR (1 << 3) +#define FOPEN_STREAM (1 << 4) + +/** + * INIT request/reply flags + * + * FUSE_ASYNC_READ: asynchronous read requests + * FUSE_POSIX_LOCKS: remote locking for POSIX file locks + * FUSE_FILE_OPS: kernel sends file handle for fstat, etc... (not yet supported) + * FUSE_ATOMIC_O_TRUNC: handles the O_TRUNC open flag in the filesystem + * FUSE_EXPORT_SUPPORT: filesystem handles lookups of "." and ".." + * FUSE_BIG_WRITES: filesystem can handle write size larger than 4kB + * FUSE_DONT_MASK: don't apply umask to file mode on create operations + * FUSE_SPLICE_WRITE: kernel supports splice write on the device + * FUSE_SPLICE_MOVE: kernel supports splice move on the device + * FUSE_SPLICE_READ: kernel supports splice read on the device + * FUSE_FLOCK_LOCKS: remote locking for BSD style file locks + * FUSE_HAS_IOCTL_DIR: kernel supports ioctl on directories + * FUSE_AUTO_INVAL_DATA: automatically invalidate cached pages + * FUSE_DO_READDIRPLUS: do READDIRPLUS (READDIR+LOOKUP in one) + * FUSE_READDIRPLUS_AUTO: adaptive readdirplus + * FUSE_ASYNC_DIO: asynchronous direct I/O submission + * FUSE_WRITEBACK_CACHE: use writeback cache for buffered writes + * FUSE_NO_OPEN_SUPPORT: kernel supports zero-message opens + * FUSE_PARALLEL_DIROPS: allow parallel lookups and readdir + * FUSE_HANDLE_KILLPRIV: fs handles killing suid/sgid/cap on write/chown/trunc + * FUSE_POSIX_ACL: filesystem supports posix acls + * FUSE_ABORT_ERROR: reading the device after abort returns ECONNABORTED + * FUSE_MAX_PAGES: init_out.max_pages contains the max number of req pages + * FUSE_CACHE_SYMLINKS: cache READLINK responses + * FUSE_NO_OPENDIR_SUPPORT: kernel supports zero-message opendir + * FUSE_EXPLICIT_INVAL_DATA: only invalidate cached pages on explicit request + */ +#define FUSE_ASYNC_READ (1 << 0) +#define FUSE_POSIX_LOCKS (1 << 1) +#define FUSE_FILE_OPS (1 << 2) +#define FUSE_ATOMIC_O_TRUNC (1 << 3) +#define FUSE_EXPORT_SUPPORT (1 << 4) +#define FUSE_BIG_WRITES (1 << 5) +#define FUSE_DONT_MASK (1 << 6) +#define FUSE_SPLICE_WRITE (1 << 7) +#define FUSE_SPLICE_MOVE (1 << 8) +#define FUSE_SPLICE_READ (1 << 9) +#define FUSE_FLOCK_LOCKS (1 << 10) +#define FUSE_HAS_IOCTL_DIR (1 << 11) +#define FUSE_AUTO_INVAL_DATA (1 << 12) +#define FUSE_DO_READDIRPLUS (1 << 13) +#define FUSE_READDIRPLUS_AUTO (1 << 14) +#define FUSE_ASYNC_DIO (1 << 15) +#define FUSE_WRITEBACK_CACHE (1 << 16) +#define FUSE_NO_OPEN_SUPPORT (1 << 17) +#define FUSE_PARALLEL_DIROPS (1 << 18) +#define FUSE_HANDLE_KILLPRIV (1 << 19) +#define FUSE_POSIX_ACL (1 << 20) +#define FUSE_ABORT_ERROR (1 << 21) +#define FUSE_MAX_PAGES (1 << 22) +#define FUSE_CACHE_SYMLINKS (1 << 23) +#define FUSE_NO_OPENDIR_SUPPORT (1 << 24) +#define FUSE_EXPLICIT_INVAL_DATA (1 << 25) + +/** + * CUSE INIT request/reply flags + * + * CUSE_UNRESTRICTED_IOCTL: use unrestricted ioctl + */ +#define CUSE_UNRESTRICTED_IOCTL (1 << 0) + +/** + * Release flags + */ +#define FUSE_RELEASE_FLUSH (1 << 0) +#define FUSE_RELEASE_FLOCK_UNLOCK (1 << 1) + +/** + * Getattr flags + */ +#define FUSE_GETATTR_FH (1 << 0) + +/** + * Lock flags + */ +#define FUSE_LK_FLOCK (1 << 0) + +/** + * WRITE flags + * + * FUSE_WRITE_CACHE: delayed write from page cache, file handle is guessed + * FUSE_WRITE_LOCKOWNER: lock_owner field is valid + * FUSE_WRITE_KILL_PRIV: kill suid and sgid bits + */ +#define FUSE_WRITE_CACHE (1 << 0) +#define FUSE_WRITE_LOCKOWNER (1 << 1) +#define FUSE_WRITE_KILL_PRIV (1 << 2) + +/** + * Read flags + */ +#define FUSE_READ_LOCKOWNER (1 << 1) + +/** + * Ioctl flags + * + * FUSE_IOCTL_COMPAT: 32bit compat ioctl on 64bit machine + * FUSE_IOCTL_UNRESTRICTED: not restricted to well-formed ioctls, retry allowed + * FUSE_IOCTL_RETRY: retry with new iovecs + * FUSE_IOCTL_32BIT: 32bit ioctl + * FUSE_IOCTL_DIR: is a directory + * FUSE_IOCTL_COMPAT_X32: x32 compat ioctl on 64bit machine (64bit time_t) + * + * FUSE_IOCTL_MAX_IOV: maximum of in_iovecs + out_iovecs + */ +#define FUSE_IOCTL_COMPAT (1 << 0) +#define FUSE_IOCTL_UNRESTRICTED (1 << 1) +#define FUSE_IOCTL_RETRY (1 << 2) +#define FUSE_IOCTL_32BIT (1 << 3) +#define FUSE_IOCTL_DIR (1 << 4) +#define FUSE_IOCTL_COMPAT_X32 (1 << 5) + +#define FUSE_IOCTL_MAX_IOV 256 + +/** + * Poll flags + * + * FUSE_POLL_SCHEDULE_NOTIFY: request poll notify + */ +#define FUSE_POLL_SCHEDULE_NOTIFY (1 << 0) + +/** + * Fsync flags + * + * FUSE_FSYNC_FDATASYNC: Sync data only, not metadata + */ +#define FUSE_FSYNC_FDATASYNC (1 << 0) + +enum fuse_opcode { + FUSE_LOOKUP = 1, + FUSE_FORGET = 2, /* no reply */ + FUSE_GETATTR = 3, + FUSE_SETATTR = 4, + FUSE_READLINK = 5, + FUSE_SYMLINK = 6, + FUSE_MKNOD = 8, + FUSE_MKDIR = 9, + FUSE_UNLINK = 10, + FUSE_RMDIR = 11, + FUSE_RENAME = 12, + FUSE_LINK = 13, + FUSE_OPEN = 14, + FUSE_READ = 15, + FUSE_WRITE = 16, + FUSE_STATFS = 17, + FUSE_RELEASE = 18, + FUSE_FSYNC = 20, + FUSE_SETXATTR = 21, + FUSE_GETXATTR = 22, + FUSE_LISTXATTR = 23, + FUSE_REMOVEXATTR = 24, + FUSE_FLUSH = 25, + FUSE_INIT = 26, + FUSE_OPENDIR = 27, + FUSE_READDIR = 28, + FUSE_RELEASEDIR = 29, + FUSE_FSYNCDIR = 30, + FUSE_GETLK = 31, + FUSE_SETLK = 32, + FUSE_SETLKW = 33, + FUSE_ACCESS = 34, + FUSE_CREATE = 35, + FUSE_INTERRUPT = 36, + FUSE_BMAP = 37, + FUSE_DESTROY = 38, + FUSE_IOCTL = 39, + FUSE_POLL = 40, + FUSE_NOTIFY_REPLY = 41, + FUSE_BATCH_FORGET = 42, + FUSE_FALLOCATE = 43, + FUSE_READDIRPLUS = 44, + FUSE_RENAME2 = 45, + FUSE_LSEEK = 46, + FUSE_COPY_FILE_RANGE = 47, + + /* CUSE specific operations */ + CUSE_INIT = 4096 +}; + +enum fuse_notify_code { + FUSE_NOTIFY_POLL = 1, + FUSE_NOTIFY_INVAL_INODE = 2, + FUSE_NOTIFY_INVAL_ENTRY = 3, + FUSE_NOTIFY_STORE = 4, + FUSE_NOTIFY_RETRIEVE = 5, + FUSE_NOTIFY_DELETE = 6, + FUSE_NOTIFY_CODE_MAX +}; + +/* The read buffer is required to be at least 8k, but may be much larger */ +#define FUSE_MIN_READ_BUFFER 8192 + +#define FUSE_COMPAT_ENTRY_OUT_SIZE 120 + +struct fuse_entry_out { + uint64_t nodeid; /* Inode ID */ + uint64_t generation; /* Inode generation: nodeid:gen must + be unique for the fs's lifetime */ + uint64_t entry_valid; /* Cache timeout for the name */ + uint64_t attr_valid; /* Cache timeout for the attributes */ + uint32_t entry_valid_nsec; + uint32_t attr_valid_nsec; + struct fuse_attr attr; +}; + +struct fuse_forget_in { + uint64_t nlookup; +}; + +struct fuse_forget_one { + uint64_t nodeid; + uint64_t nlookup; +}; + +struct fuse_batch_forget_in { + uint32_t count; + uint32_t dummy; +}; + +struct fuse_getattr_in { + uint32_t getattr_flags; + uint32_t dummy; + uint64_t fh; +}; + +#define FUSE_COMPAT_ATTR_OUT_SIZE 96 + +struct fuse_attr_out { + uint64_t attr_valid; /* Cache timeout for the attributes */ + uint32_t attr_valid_nsec; + uint32_t dummy; + struct fuse_attr attr; +}; + +#define FUSE_COMPAT_MKNOD_IN_SIZE 8 + +struct fuse_mknod_in { + uint32_t mode; + uint32_t rdev; + uint32_t umask; + uint32_t padding; +}; + +struct fuse_mkdir_in { + uint32_t mode; + uint32_t umask; +}; + +struct fuse_rename_in { + uint64_t newdir; +}; + +struct fuse_rename2_in { + uint64_t newdir; + uint32_t flags; + uint32_t padding; +}; + +struct fuse_link_in { + uint64_t oldnodeid; +}; + +struct fuse_setattr_in { + uint32_t valid; + uint32_t padding; + uint64_t fh; + uint64_t size; + uint64_t lock_owner; + uint64_t atime; + uint64_t mtime; + uint64_t ctime; + uint32_t atimensec; + uint32_t mtimensec; + uint32_t ctimensec; + uint32_t mode; + uint32_t unused4; + uint32_t uid; + uint32_t gid; + uint32_t unused5; +}; + +struct fuse_open_in { + uint32_t flags; + uint32_t unused; +}; + +struct fuse_create_in { + uint32_t flags; + uint32_t mode; + uint32_t umask; + uint32_t padding; +}; + +struct fuse_open_out { + uint64_t fh; + uint32_t open_flags; + uint32_t padding; +}; + +struct fuse_release_in { + uint64_t fh; + uint32_t flags; + uint32_t release_flags; + uint64_t lock_owner; +}; + +struct fuse_flush_in { + uint64_t fh; + uint32_t unused; + uint32_t padding; + uint64_t lock_owner; +}; + +struct fuse_read_in { + uint64_t fh; + uint64_t offset; + uint32_t size; + uint32_t read_flags; + uint64_t lock_owner; + uint32_t flags; + uint32_t padding; +}; + +#define FUSE_COMPAT_WRITE_IN_SIZE 24 + +struct fuse_write_in { + uint64_t fh; + uint64_t offset; + uint32_t size; + uint32_t write_flags; + uint64_t lock_owner; + uint32_t flags; + uint32_t padding; +}; + +struct fuse_write_out { + uint32_t size; + uint32_t padding; +}; + +#define FUSE_COMPAT_STATFS_SIZE 48 + +struct fuse_statfs_out { + struct fuse_kstatfs st; +}; + +struct fuse_fsync_in { + uint64_t fh; + uint32_t fsync_flags; + uint32_t padding; +}; + +struct fuse_setxattr_in { + uint32_t size; + uint32_t flags; +}; + +struct fuse_getxattr_in { + uint32_t size; + uint32_t padding; +}; + +struct fuse_getxattr_out { + uint32_t size; + uint32_t padding; +}; + +struct fuse_lk_in { + uint64_t fh; + uint64_t owner; + struct fuse_file_lock lk; + uint32_t lk_flags; + uint32_t padding; +}; + +struct fuse_lk_out { + struct fuse_file_lock lk; +}; + +struct fuse_access_in { + uint32_t mask; + uint32_t padding; +}; + +struct fuse_init_in { + uint32_t major; + uint32_t minor; + uint32_t max_readahead; + uint32_t flags; +}; + +#define FUSE_COMPAT_INIT_OUT_SIZE 8 +#define FUSE_COMPAT_22_INIT_OUT_SIZE 24 + +struct fuse_init_out { + uint32_t major; + uint32_t minor; + uint32_t max_readahead; + uint32_t flags; + uint16_t max_background; + uint16_t congestion_threshold; + uint32_t max_write; + uint32_t time_gran; + uint16_t max_pages; + uint16_t padding; + uint32_t unused[8]; +}; + +#define CUSE_INIT_INFO_MAX 4096 + +struct cuse_init_in { + uint32_t major; + uint32_t minor; + uint32_t unused; + uint32_t flags; +}; + +struct cuse_init_out { + uint32_t major; + uint32_t minor; + uint32_t unused; + uint32_t flags; + uint32_t max_read; + uint32_t max_write; + uint32_t dev_major; /* chardev major */ + uint32_t dev_minor; /* chardev minor */ + uint32_t spare[10]; +}; + +struct fuse_interrupt_in { + uint64_t unique; +}; + +struct fuse_bmap_in { + uint64_t block; + uint32_t blocksize; + uint32_t padding; +}; + +struct fuse_bmap_out { + uint64_t block; +}; + +struct fuse_ioctl_in { + uint64_t fh; + uint32_t flags; + uint32_t cmd; + uint64_t arg; + uint32_t in_size; + uint32_t out_size; +}; + +struct fuse_ioctl_iovec { + uint64_t base; + uint64_t len; +}; + +struct fuse_ioctl_out { + int32_t result; + uint32_t flags; + uint32_t in_iovs; + uint32_t out_iovs; +}; + +struct fuse_poll_in { + uint64_t fh; + uint64_t kh; + uint32_t flags; + uint32_t events; +}; + +struct fuse_poll_out { + uint32_t revents; + uint32_t padding; +}; + +struct fuse_notify_poll_wakeup_out { + uint64_t kh; +}; + +struct fuse_fallocate_in { + uint64_t fh; + uint64_t offset; + uint64_t length; + uint32_t mode; + uint32_t padding; +}; + +struct fuse_in_header { + uint32_t len; + uint32_t opcode; + uint64_t unique; + uint64_t nodeid; + uint32_t uid; + uint32_t gid; + uint32_t pid; + uint32_t padding; +}; + +struct fuse_out_header { + uint32_t len; + int32_t error; + uint64_t unique; +}; + +struct fuse_dirent { + uint64_t ino; + uint64_t off; + uint32_t namelen; + uint32_t type; + char name[]; +}; + +#define FUSE_NAME_OFFSET offsetof(struct fuse_dirent, name) +#define FUSE_DIRENT_ALIGN(x) \ + (((x) + sizeof(uint64_t) - 1) & ~(sizeof(uint64_t) - 1)) +#define FUSE_DIRENT_SIZE(d) \ + FUSE_DIRENT_ALIGN(FUSE_NAME_OFFSET + (d)->namelen) + +struct fuse_direntplus { + struct fuse_entry_out entry_out; + struct fuse_dirent dirent; +}; + +#define FUSE_NAME_OFFSET_DIRENTPLUS \ + offsetof(struct fuse_direntplus, dirent.name) +#define FUSE_DIRENTPLUS_SIZE(d) \ + FUSE_DIRENT_ALIGN(FUSE_NAME_OFFSET_DIRENTPLUS + (d)->dirent.namelen) + +struct fuse_notify_inval_inode_out { + uint64_t ino; + int64_t off; + int64_t len; +}; + +struct fuse_notify_inval_entry_out { + uint64_t parent; + uint32_t namelen; + uint32_t padding; +}; + +struct fuse_notify_delete_out { + uint64_t parent; + uint64_t child; + uint32_t namelen; + uint32_t padding; +}; + +struct fuse_notify_store_out { + uint64_t nodeid; + uint64_t offset; + uint32_t size; + uint32_t padding; +}; + +struct fuse_notify_retrieve_out { + uint64_t notify_unique; + uint64_t nodeid; + uint64_t offset; + uint32_t size; + uint32_t padding; +}; + +/* Matches the size of fuse_write_in */ +struct fuse_notify_retrieve_in { + uint64_t dummy1; + uint64_t offset; + uint32_t size; + uint32_t dummy2; + uint64_t dummy3; + uint64_t dummy4; +}; + +/* Device ioctls: */ +#define FUSE_DEV_IOC_CLONE _IOR(229, 0, uint32_t) + +struct fuse_lseek_in { + uint64_t fh; + uint64_t offset; + uint32_t whence; + uint32_t padding; +}; + +struct fuse_lseek_out { + uint64_t offset; +}; + +struct fuse_copy_file_range_in { + uint64_t fh_in; + uint64_t off_in; + uint64_t nodeid_out; + uint64_t fh_out; + uint64_t off_out; + uint64_t len; + uint64_t flags; +}; + +#endif /* _LINUX_FUSE_H */ diff --git a/fuseuring_main.cpp b/fuseuring_main.cpp new file mode 100755 index 0000000..c5de106 --- /dev/null +++ b/fuseuring_main.cpp @@ -0,0 +1,1023 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright (C) Martin Raiber +#include +#include "fuse_kernel.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "fuse_io_service.h" + +namespace +{ + const size_t fuse_max_pages = 32; + const size_t header_buf_size = std::max(sizeof(fuse_in_header), sizeof(fuse_out_header) + sizeof(fuse_write_out)); + const size_t scratch_buf_size = std::max(std::max(std::max( + static_cast(4096), + sizeof(fuse_out_header)+sizeof(fuse_attr_out)), + sizeof(fuse_out_header)+sizeof(fuse_entry_out)), + sizeof(fuse_out_header)+sizeof(fuse_write_out)); + + template + auto round_up(T numToRound, T multiple) + { + return ((numToRound + multiple - 1) / multiple) * multiple; + } +} + +[[nodiscard]] fuse_io_service::io_uring_task read_rbytes(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + size_t rbytes, bool add_zero, std::vector& free_buf) +{ + if(rbytes==0) + { + co_return fuse_io->scratch_buf; + } + + struct io_uring_sqe *sqe; + sqe = co_await io.get_sqe(); + + DBG_PRINT(std::cout << "Read rbytes "<< rbytes << std::endl); + bool read_fixed=false; + if(rbytes + (add_zero ? 1 : 0) < scratch_buf_size) + { + read_fixed=true; + io_uring_prep_read_fixed(sqe, fuse_io->pipe[0], fuse_io->scratch_buf, + rbytes, 0, fuse_io->scratch_buf_idx); + } + else + { + free_buf.resize(rbytes + (add_zero ? 1 : 0)); + io_uring_prep_read(sqe, fuse_io->pipe[0], &free_buf[0], + rbytes, 0); + } + sqe->flags |= IOSQE_FIXED_FILE; + int rc = co_await io.complete(sqe); + + if(rc<0 || rcscratch_buf[rbytes] = 0; + + co_return fuse_io->scratch_buf; + } + else + { + if(add_zero) + free_buf[rbytes]=0; + + co_return &free_buf[0]; + } +} + +[[nodiscard]] fuse_io_service::io_uring_task send_reply(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io) +{ + struct io_uring_sqe *sqe; + sqe = co_await io.get_sqe(); + + size_t reply_size = reinterpret_cast(fuse_io->scratch_buf)->len; + + io_uring_prep_write_fixed(sqe, fuse_io->pipe[1], + fuse_io->scratch_buf, reply_size, + 0, fuse_io->scratch_buf_idx); + sqe->flags |= IOSQE_FIXED_FILE | IOSQE_IO_LINK; + + io_uring_sqe *sqe2 = co_await io.get_sqe(); + + io_uring_prep_splice(sqe2, fuse_io->pipe[0], + -1, io.fuse_ring.fd, -1, reply_size, + SPLICE_F_MOVE| SPLICE_F_FD_IN_FIXED | SPLICE_F_NONBLOCK); + sqe2->flags |= IOSQE_FIXED_FILE; + + auto [rc1, rc2] = co_await io.complete(std::make_pair(sqe, sqe2)); + + if(rc1!=reply_size || rc2!=reply_size) + { + std::cerr << "# Send reply failed rc1="<< rc1 << " rc2=" << rc2 << std::endl; + co_return -1; + } + else + { + co_return 0; + } +} + +[[nodiscard]] fuse_io_service::io_uring_task send_reply(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + const std::vector& buf) +{ + struct io_uring_sqe *sqe; + sqe = co_await io.get_sqe(); + + DBG_PRINT(std::cout << "send unique buf: " << reinterpret_cast(buf.data())->unique << std::endl); + io_uring_prep_write(sqe, fuse_io->pipe[1], + buf.data(), buf.size(), + 0); + sqe->flags |= IOSQE_FIXED_FILE | IOSQE_IO_LINK; + + io_uring_sqe *sqe2 = co_await io.get_sqe(); + + io_uring_prep_splice(sqe2, fuse_io->pipe[0], + -1, io.fuse_ring.fd, -1, buf.size(), + SPLICE_F_MOVE| SPLICE_F_FD_IN_FIXED | SPLICE_F_NONBLOCK); + sqe2->flags |= IOSQE_FIXED_FILE; + + auto [rc1, rc2] = co_await io.complete(std::make_pair(sqe, sqe2)); + + if(rc1!=buf.size() || rc2!=buf.size()) + { + std::cerr << "# Send reply buf failed rc1="<< rc1 << " rc2=" << rc2 << std::endl; + co_return -1; + } + else + { + co_return 0; + } +} + +[[nodiscard]] fuse_io_service::io_uring_task handle_unknown(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io) +{ + fuse_in_header* fheader = reinterpret_cast(fuse_io->header_buf); + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + + out_header->error = -ENOSYS; + out_header->len = sizeof(fuse_out_header); + out_header->unique = fheader->unique; + + co_return co_await send_reply(io, fuse_io); +} + +[[nodiscard]] fuse_io_service::io_uring_task send_attr(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + uint64_t unique, uint64_t nodeid) +{ + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + out_header->error = 0; + out_header->len = sizeof(fuse_attr_out) + sizeof(fuse_out_header); + out_header->unique = unique; + + fuse_attr_out* attr_out = reinterpret_cast(fuse_io->scratch_buf + sizeof(fuse_out_header)); + attr_out->attr_valid = 3600; + attr_out->attr_valid_nsec = 0; + memset(&attr_out->attr, 0, sizeof(attr_out->attr)); + + DBG_PRINT(std::cout << "send_attr nodeid " << nodeid << std::endl); + if(nodeid==1) + { + attr_out->attr.mode = S_IFDIR | S_IRWXU | S_IRWXG | S_IRWXO; + attr_out->attr.ino = 1; + } + else if(nodeid==3) + { + attr_out->attr.mode = S_IFREG | S_IRWXU | S_IRWXG | S_IRWXO; + attr_out->attr.ino = 3; + attr_out->attr.size = io.fuse_ring.backing_f_size; + attr_out->attr.blocks = round_up(attr_out->attr.size, 512); + attr_out->attr.blksize = getpagesize(); + } + else + { + out_header->error = -EACCES; + out_header->len = sizeof(fuse_out_header); + } + + co_return co_await send_reply(io, fuse_io); +} + +[[nodiscard]] fuse_io_service::io_uring_task handle_getattr(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + char* rbytes_buf) +{ + fuse_in_header* fheader = reinterpret_cast(fuse_io->header_buf); + uint64_t nodeid = fheader->nodeid; + + fuse_getattr_in* getattr_in = reinterpret_cast(rbytes_buf); + bool getattr_fh = (getattr_in->getattr_flags & FUSE_GETATTR_FH)>0; + if(getattr_fh) + { + DBG_PRINT(std::cout << "fattr fh" << std::endl); + nodeid = getattr_in->fh; + } + + co_return co_await send_attr(io, fuse_io, fheader->unique, nodeid); +} + +[[nodiscard]] fuse_io_service::io_uring_task handle_setattr(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + char* rbytes_buf) +{ + fuse_in_header* fheader = reinterpret_cast(fuse_io->header_buf); + uint64_t nodeid = fheader->nodeid; + + fuse_setattr_in* setattr_in = reinterpret_cast(rbytes_buf); + if(setattr_in->fh) + { + DBG_PRINT(std::cout << "fattr fh" << std::endl); + nodeid = setattr_in->fh; + } + + if(nodeid==3) + { + DBG_PRINT(std::cout << "Set attr new size " << setattr_in->size << " denied" << std::endl); + } + + co_return co_await send_attr(io, fuse_io, fheader->unique, nodeid); +} + +[[nodiscard]] fuse_io_service::io_uring_task handle_lookup(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + char* rbytes_buf) +{ + fuse_in_header* fheader = reinterpret_cast(fuse_io->header_buf); + + DBG_PRINT(std::cout << "fuse lookup " << rbytes_buf << std::endl); + + std::string lname = rbytes_buf; + + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + out_header->error = 0; + out_header->len = sizeof(fuse_entry_out) + sizeof(fuse_out_header); + out_header->unique = fheader->unique; + + fuse_entry_out* entry_out = reinterpret_cast(fuse_io->scratch_buf + sizeof(fuse_out_header)); + + entry_out->generation = 0; + entry_out->entry_valid = 3600; + entry_out->entry_valid_nsec = 0; + entry_out->attr_valid = 3600; + entry_out->attr_valid_nsec = 0; + + if(lname=="volume") + { + DBG_PRINT(std::cout << "Looking up volume" << std::endl); + entry_out->nodeid = 3; + entry_out->attr = {}; + entry_out->attr.mode = S_IFREG | S_IRWXU | S_IRWXG | S_IRWXO; + entry_out->attr.ino = 3; + entry_out->attr.size = io.fuse_ring.backing_f_size; + entry_out->attr.blocks = round_up(entry_out->attr.size, 512); + entry_out->attr.blksize = getpagesize(); + } + else + { + entry_out->nodeid = 1; + entry_out->attr = {}; + entry_out->attr.mode = S_IFDIR | S_IRWXU | S_IRWXG | S_IRWXO; + entry_out->attr.ino = 1; + } + + co_return co_await send_reply(io, fuse_io); +} + +[[nodiscard]] fuse_io_service::io_uring_task handle_opendir(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + char* rbytes_buf) +{ + fuse_in_header* fheader = reinterpret_cast(fuse_io->header_buf); + fuse_open_in* open_in = reinterpret_cast(rbytes_buf); + + DBG_PRINT(std::cout << "opendir nodeid " << fheader->nodeid << std::endl); + + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + out_header->error = 0; + out_header->len = sizeof(fuse_open_out) + sizeof(fuse_out_header); + out_header->unique = fheader->unique; + + fuse_open_out* open_out = reinterpret_cast(fuse_io->scratch_buf + sizeof(fuse_out_header)); + open_out->fh = 1; + open_out->open_flags = open_in->flags | FOPEN_CACHE_DIR; + + co_return co_await send_reply(io, fuse_io); +} + +[[nodiscard]] fuse_io_service::io_uring_task handle_open(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + char* rbytes_buf) +{ + fuse_in_header* fheader = reinterpret_cast(fuse_io->header_buf); + fuse_open_in* open_in = reinterpret_cast(rbytes_buf); + + DBG_PRINT(std::cout << "open nodeid " << fheader->nodeid << std::endl); + + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + out_header->error = 0; + out_header->len = sizeof(fuse_open_out) + sizeof(fuse_out_header); + out_header->unique = fheader->unique; + + fuse_open_out* open_out = reinterpret_cast(fuse_io->scratch_buf + sizeof(fuse_out_header)); + open_out->fh = 3; + open_out->open_flags = open_in->flags | FOPEN_KEEP_CACHE | FOPEN_DIRECT_IO; + + co_return co_await send_reply(io, fuse_io); +} + +[[nodiscard]] fuse_io_service::io_uring_task handle_read(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + char* rbytes_buf) +{ + fuse_in_header* fheader = reinterpret_cast(fuse_io->header_buf); + uint64_t read_offset; + uint32_t read_size; + { + fuse_read_in* read_in = reinterpret_cast(rbytes_buf); + + DBG_PRINT(std::cout << "read nodeid " << fheader->nodeid << " off: " << read_in->offset << " size: "<size << std::endl); + + if(fheader->nodeid!=3) + { + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + out_header->error = 0; + out_header->unique = fheader->unique; + out_header->error = -ENOENT; + co_return co_await send_reply(io, fuse_io); + } + + if(read_in->offset + read_in->size > io.fuse_ring.backing_f_size) + { + read_in->size = io.fuse_ring.backing_f_size - read_in->offset; + DBG_PRINT(std::cout << "Reading less: " << read_in->size << std::endl); + } + + read_offset = read_in->offset; + read_size = read_in->size; + } + + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + out_header->error = 0; + out_header->len = sizeof(fuse_out_header) + read_size; + out_header->unique = fheader->unique; + + io_uring_sqe* sqe1 = co_await io.get_sqe(); + + io_uring_prep_write_fixed(sqe1, fuse_io->pipe[1], + fuse_io->scratch_buf, sizeof(fuse_out_header), + -1, fuse_io->scratch_buf_idx); + sqe1->flags |= IOSQE_FIXED_FILE | IOSQE_IO_LINK; + + io_uring_sqe *sqe2 = co_await io.get_sqe(); + + io_uring_prep_splice(sqe2, io.fuse_ring.backing_fd, + read_offset, fuse_io->pipe[1], -1, read_size, + SPLICE_F_MOVE| SPLICE_F_FD_IN_FIXED | SPLICE_F_NONBLOCK); + sqe2->flags |= IOSQE_FIXED_FILE | IOSQE_IO_LINK; + + io_uring_sqe *sqe3 = co_await io.get_sqe(); + + io_uring_prep_splice(sqe3, fuse_io->pipe[0], + -1, io.fuse_ring.fd, -1, out_header->len, + SPLICE_F_FD_IN_FIXED | SPLICE_F_NONBLOCK); + sqe3->flags |= IOSQE_FIXED_FILE; + + std::vector rcs = co_await io.complete({sqe1, sqe2, sqe3}); + + for(int rc: rcs) + { + if(rc<0) + co_return -1; + } + + if(rcs[0]len) + { + co_return -1; + } + + co_return 0; +} + +[[nodiscard]] fuse_io_service::io_uring_task handle_write(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + char* rbytes_buf) +{ + fuse_in_header* fheader = reinterpret_cast(fuse_io->header_buf); + uint64_t write_offset; + uint32_t write_size; + { + fuse_write_in* write_in = reinterpret_cast(rbytes_buf); + + DBG_PRINT(std::cout << "write nodeid " << fheader->nodeid << " off: " << write_in->offset << " size: "<< write_in->size << std::endl); + + if(fheader->nodeid!=3) + { + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + out_header->unique = fheader->unique; + out_header->error = -ENOENT; + out_header->len = sizeof(fuse_out_header); + co_return co_await send_reply(io, fuse_io); + } + + write_offset = write_in->offset; + write_size = write_in->size; + + /*if(write_offset + write_size > io.fuse_ring.backing_f_size) + { + write_size = io.fuse_ring.backing_f_size - write_offset; + std::cout << "Writing less: " << write_size << std::endl; + }*/ + + if(fheader->len!=sizeof(fuse_in_header)+sizeof(fuse_write_in)+write_size) + { + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + out_header->unique = fheader->unique; + out_header->error = -EINVAL; + out_header->len = sizeof(fuse_out_header); + co_return co_await send_reply(io, fuse_io); + } + } + + /*if(write_size<=4096) + { + io_uring_sqe* sqe1 = co_await io.get_sqe(); + io_uring_prep_read_fixed(sqe1, fuse_io->pipe[0], fuse_io->scratch_buf, + write_size, 0, fuse_io->scratch_buf_idx); + sqe1->flags |= IOSQE_FIXED_FILE | IOSQE_IO_LINK; + + io_uring_sqe* sqe2 = co_await io.get_sqe(); + io_uring_prep_write(sqe2, io.fuse_ring.backing_fd, fuse_io->scratch_buf, + write_size, write_offset); + sqe2->flags |= IOSQE_FIXED_FILE | IOSQE_IO_LINK; + + uint64_t header_unique = fheader->unique; + + fuse_out_header* out_header = reinterpret_cast(fuse_io->header_buf); + out_header->error = 0; + out_header->len = sizeof(fuse_out_header) + sizeof(fuse_write_out); + out_header->unique = header_unique; + + fuse_write_out* write_out = reinterpret_cast(fuse_io->header_buf + sizeof(fuse_out_header)); + write_out->size = write_size; + write_out->padding = 0; + + io_uring_sqe* sqe3 = co_await io.get_sqe(); + io_uring_prep_write(sqe3, fuse_io->pipe[1], + fuse_io->header_buf, out_header->len, + 0); + sqe3->flags |= IOSQE_FIXED_FILE | IOSQE_IO_LINK; + + io_uring_sqe* sqe4 = co_await io.get_sqe(); + io_uring_prep_splice(sqe4, fuse_io->pipe[0], + -1, io.fuse_ring.fd, -1, out_header->len, + SPLICE_F_MOVE| SPLICE_F_FD_IN_FIXED | SPLICE_F_NONBLOCK); + sqe4->flags |= IOSQE_FIXED_FILE; + + std::vector rcs = co_await io.complete({sqe1, sqe2, sqe3, sqe4}); + + for(int rc: rcs) + { + if(rc<0) + co_return -1; + } + + co_return 0; + } + + std::cout << "Splice ... write size " << write_size << std::endl;*/ + + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + out_header->error = 0; + out_header->len = sizeof(fuse_out_header) + sizeof(fuse_write_out); + out_header->unique = fheader->unique; + + fuse_write_out* write_out = reinterpret_cast(fuse_io->scratch_buf + sizeof(fuse_out_header)); + write_out->size = write_size; + write_out->padding = 0; + + io_uring_sqe* sqe1 = co_await io.get_sqe(); + + io_uring_prep_splice(sqe1, fuse_io->pipe[0], + -1, io.fuse_ring.backing_fd, write_offset, write_size, + SPLICE_F_MOVE| SPLICE_F_FD_IN_FIXED | SPLICE_F_NONBLOCK); + sqe1->flags |= IOSQE_FIXED_FILE | IOSQE_IO_LINK; + + io_uring_sqe* sqe2 = co_await io.get_sqe(); + + io_uring_prep_write_fixed(sqe2, fuse_io->pipe[1], + fuse_io->scratch_buf, out_header->len, + 0, fuse_io->scratch_buf_idx); + sqe2->flags |= IOSQE_FIXED_FILE | IOSQE_IO_LINK; + + io_uring_sqe *sqe3 = co_await io.get_sqe(); + + io_uring_prep_splice(sqe3, fuse_io->pipe[0], + -1, io.fuse_ring.fd, -1, out_header->len, + SPLICE_F_MOVE| SPLICE_F_FD_IN_FIXED | SPLICE_F_NONBLOCK); + sqe3->flags |= IOSQE_FIXED_FILE; + + std::vector rcs = co_await io.complete({sqe1, sqe2, sqe3}); + + if(rcs[0]<0) + { + out_header->error = rcs[0]; + out_header->len = sizeof(fuse_out_header); + co_return co_await send_reply(io, fuse_io); + } + + if(rcs[0]size = rcs[0]; + co_return co_await send_reply(io, fuse_io); + } + + if(rcs[1]<0 || rcs[2]<0 || + rcs[1]!=out_header->len || rcs[2]!=out_header->len) + { + co_return -1; + } + + DBG_PRINT(std::cout << "FUSE_WRITE done" << std::endl); + + co_return 0; +} + +[[nodiscard]] fuse_io_service::io_uring_task handle_releasedir(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + char* rbytes_buf) +{ + fuse_in_header* fheader = reinterpret_cast(fuse_io->header_buf); + fuse_release_in* release_in = reinterpret_cast(rbytes_buf); + + DBG_PRINT(std::cout << "releasedir nodeid " << fheader->nodeid << std::endl); + + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + out_header->error = 0; + out_header->len = sizeof(fuse_out_header); + out_header->unique = fheader->unique; + + co_return co_await send_reply(io, fuse_io); +} + +[[nodiscard]] fuse_io_service::io_uring_task handle_release(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + char* rbytes_buf) +{ + fuse_in_header* fheader = reinterpret_cast(fuse_io->header_buf); + fuse_release_in* release_in = reinterpret_cast(rbytes_buf); + + DBG_PRINT(std::cout << "release nodeid " << fheader->nodeid << std::endl); + + fuse_out_header* out_header = reinterpret_cast(fuse_io->scratch_buf); + out_header->error = 0; + out_header->len = sizeof(fuse_out_header); + out_header->unique = fheader->unique; + + co_return co_await send_reply(io, fuse_io); +} + +void add_dir(std::vector& buf, const std::string& name, size_t off, const struct stat& stbuf) +{ + size_t bsize = FUSE_DIRENT_ALIGN(FUSE_NAME_OFFSET + name.size()); + size_t orig_off = buf.size(); + buf.resize(buf.size()+bsize); + fuse_dirent* dirent = reinterpret_cast(&buf[orig_off]); + + dirent->ino = stbuf.st_ino; + dirent->namelen = name.size(); + dirent->off = off; + dirent->type = (stbuf.st_mode & S_IFMT) >> 12; + memcpy(dirent->name, name.data(), name.size()); + memset(dirent->name + name.size(), 0, bsize-FUSE_NAME_OFFSET-name.size()); +} + +[[nodiscard]] fuse_io_service::io_uring_task handle_readdir(fuse_io_service& io, fuse_io_service::SFuseIo* fuse_io, + char* rbytes_buf) +{ + fuse_in_header* fheader = reinterpret_cast(fuse_io->header_buf); + + fuse_read_in* read_in = reinterpret_cast(rbytes_buf); + + DBG_PRINT(std::cout << "readdir nodeid " << fheader->nodeid << " offset: " << read_in->offset << " read_flags: " + << read_in->read_flags << " size: " << read_in->size << std::endl); + + std::vector out_buf(sizeof(fuse_out_header)); + fuse_out_header* out_header = reinterpret_cast(out_buf.data()); + out_header->error = 0; + out_header->unique = fheader->unique; + + if(read_in->offset==0) + { + struct stat stbuf = {}; + stbuf.st_mode = S_IFDIR | S_IRWXU | S_IRWXG | S_IRWXO; + + stbuf.st_ino = 2; + add_dir(out_buf, ".", 1, stbuf); + + stbuf.st_ino = 3; + add_dir(out_buf, "..", 2, stbuf); + + stbuf.st_ino = 4; + stbuf.st_size = io.fuse_ring.backing_f_size; + stbuf.st_blocks = round_up(stbuf.st_size, 512); + add_dir(out_buf, "volume", 3, stbuf); + } + + out_header = reinterpret_cast(out_buf.data()); + out_header->len = out_buf.size(); + + co_return co_await send_reply(io, fuse_io, out_buf); +} + +fuse_io_service::io_uring_task queue_fuse_read(fuse_io_service& io) +{ + fuse_io_service::SFuseIo* fuse_io = io.get_fuse_io(); + + DBG_PRINT(std::cout << "queue_fuse_read" << std::endl); + struct io_uring_sqe *sqe1 = co_await io.get_sqe(); + struct io_uring_sqe *sqe2 = co_await io.get_sqe(); + + io_uring_prep_splice(sqe1, io.fuse_ring.fd, -1, fuse_io->pipe[1], + -1, io.fuse_ring.max_bufsize, SPLICE_F_MOVE|SPLICE_F_NONBLOCK|SPLICE_F_FD_IN_FIXED); + sqe1->flags |= IOSQE_IO_HARDLINK | IOSQE_FIXED_FILE; + + io_uring_prep_read_fixed(sqe2, fuse_io->pipe[0], fuse_io->header_buf, + sizeof(fuse_in_header), 0, fuse_io->header_buf_idx); + sqe2->flags |= IOSQE_FIXED_FILE; + + auto [rbytes, res] = co_await io.complete(std::make_pair(sqe1, sqe2)); + + if(rbytes<0) + { + //std::cerr << "Error reading from fuse" << std::endl; + co_return -1; + } + + if(res<0 || res(fuse_io->header_buf); + + DBG_PRINT(std::cout << "## fheader opcode: "<< fheader->opcode << " unique: "<< fheader->unique << std::endl); + + size_t req_read_rbytes = 0; + bool req_read_add_zero=false; + bool req_allow_add_bytes=false; + switch(fheader->opcode) + { + case FUSE_GETATTR: + req_read_rbytes = sizeof(fuse_getattr_in); + break; + case FUSE_SETATTR: + req_read_rbytes = sizeof(fuse_setattr_in); + break; + case FUSE_OPENDIR: + req_read_rbytes = sizeof(fuse_open_in); + break; + case FUSE_READDIR: + req_read_rbytes = sizeof(fuse_read_in); + break; + case FUSE_RELEASEDIR: + req_read_rbytes = sizeof(fuse_release_in); + break; + case FUSE_LOOKUP: + req_read_rbytes = rbytes - sizeof(fuse_in_header); + req_read_add_zero=true; + break; + case FUSE_OPEN: + req_read_rbytes = sizeof(fuse_open_in); + break; + case FUSE_READ: + req_read_rbytes = sizeof(fuse_read_in); + break; + case FUSE_RELEASE: + req_read_rbytes = sizeof(fuse_release_in); + break; + case FUSE_WRITE: + req_read_rbytes = sizeof(fuse_write_in); + req_allow_add_bytes=true; + break; + default: + req_read_rbytes = rbytes - sizeof(fuse_in_header); + } + + std::vector rbytes_buf_d; + char* rbytes_buf; + if(req_read_rbytes>0) + { + if(!req_allow_add_bytes && + req_read_rbytes!=rbytes - sizeof(fuse_in_header)) + { + co_return -1; + } + + rbytes_buf = co_await read_rbytes(io, fuse_io, req_read_rbytes, + req_read_add_zero, rbytes_buf_d); + if(rbytes_buf==nullptr) + { + co_return -1; + } + } + +/*#undef DBG_PRINT +#define DBG_PRINT(x) x*/ + int rc; + switch(fheader->opcode) + { + case FUSE_GETATTR: + DBG_PRINT(std::cout << "FUSE_GETATTR" << std::endl); + rc = co_await handle_getattr(io, fuse_io, rbytes_buf); + break; + case FUSE_SETATTR: + DBG_PRINT(std::cout << "FUSE_SETATTR" << std::endl); + rc = co_await handle_setattr(io, fuse_io, rbytes_buf); + break; + case FUSE_OPENDIR: + DBG_PRINT(std::cout << "FUSE_OPENDIR" << std::endl); + rc = co_await handle_opendir(io, fuse_io, rbytes_buf); + break; + case FUSE_READDIR: + DBG_PRINT(std::cout << "FUSE_READDIR" << std::endl); + rc = co_await handle_readdir(io, fuse_io, rbytes_buf); + break; + case FUSE_RELEASEDIR: + DBG_PRINT(std::cout << "FUSE_RELEASEDIR" << std::endl); + rc = co_await handle_releasedir(io, fuse_io, rbytes_buf); + break; + case FUSE_LOOKUP: + DBG_PRINT(std::cout << "FUSE_LOOKUP" << std::endl); + rc = co_await handle_lookup(io, fuse_io, rbytes_buf); + break; + case FUSE_OPEN: + DBG_PRINT(std::cout << "FUSE_OPEN" << std::endl); + rc = co_await handle_open(io, fuse_io, rbytes_buf); + break; + case FUSE_READ: + DBG_PRINT(std::cout << "FUSE_READ" << std::endl); + rc = co_await handle_read(io, fuse_io, rbytes_buf); + break; + case FUSE_RELEASE: + DBG_PRINT(std::cout << "FUSE_RELEASE" << std::endl); + rc = co_await handle_release(io, fuse_io, rbytes_buf); + break; + case FUSE_WRITE: + DBG_PRINT(std::cout << "FUSE_WRITE" << std::endl); + rc = co_await handle_write(io, fuse_io, rbytes_buf); + break; + default: + DBG_PRINT(std::cout << "## Unhandled opcode: " << fheader->opcode << std::endl); + rc = co_await handle_unknown(io, fuse_io); + break; + } +/*#undef DBG_PRINT +#define DBG_PRINT(x)*/ + + io.release_fuse_io(fuse_io); + + DBG_PRINT(std::cout << "## handle fuse done" << std::endl); + co_return rc; +} + +int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_background, int congestion_threshold) +{ + umount(mountpoint.c_str()); + + struct stat stbuf; + int rc = stat(mountpoint.c_str(), &stbuf); + if(rc!=0) + { + perror(("Error stat() mountpoint \""+mountpoint+"\". ").c_str()); + return 1; + } + + int fuse_fd = open("/dev/fuse", O_RDWR | O_CLOEXEC); + if(fuse_fd==-1) + { + perror("Error opening /dev/fuse."); + return 2; + } + + char rootmode[10]; + snprintf(rootmode, sizeof(rootmode), "%o", stbuf.st_mode & S_IFMT); + std::string mount_opts = "fd="+std::to_string(fuse_fd)+",rootmode="+std::string(rootmode)+",user_id=0,group_id=0,default_permissions,allow_other"; + + rc = mount("tclouddrive", mountpoint.c_str(), "fuse", MS_NOSUID | MS_NODEV|MS_NOATIME|MS_NOEXEC, mount_opts.c_str()); + + if(rc!=0) + { + perror("Error mounting fuse file system."); + return 3; + } + + const size_t init_buf_size = 8192; + char* init_buf = new char[init_buf_size]; + rc = read(fuse_fd, init_buf, init_buf_size); + + if(rc<=0) + { + perror(("Err fuse init rc="+std::to_string(rc)).c_str()); + return 4; + } + + struct InitInMsg + { + fuse_in_header header; + fuse_init_in init_in; + }; + + InitInMsg* init_in = reinterpret_cast(init_buf); + + if(init_in->header.opcode != FUSE_INIT) + { + std::cerr << "Unexpected opcode during init" << std::endl; + return 5; + } + + if(init_in->header.len!=sizeof(InitInMsg)) + { + std::cerr << "Unexpected length during init" << std::endl; + return 5; + } + + if(init_in->init_in.majorinit_in.major << std::endl; + return 5; + } + + struct InitOutMsg + { + fuse_out_header header; + fuse_init_out init_out; + }; + InitOutMsg init_out; + init_out.header.error = 0; + init_out.header.len = sizeof(InitOutMsg); + init_out.header.unique = init_in->header.unique; + init_out.init_out.major = FUSE_KERNEL_VERSION; + init_out.init_out.minor =FUSE_KERNEL_MINOR_VERSION; + + if(init_in->init_in.major>FUSE_KERNEL_VERSION) + { + if(write(fuse_fd, &init_out, sizeof(init_out))!=sizeof(init_out)) + { + perror("Error writing ver reply"); + return 6; + } + + int rc = read(fuse_fd, init_buf, init_buf_size); + + if(rc<=0) + { + perror("Err fuse init 2"); + return 7; + } + } + + init_out.init_out.max_readahead = init_in->init_in.max_readahead; + if( !(init_in->init_in.flags & FUSE_ASYNC_READ) + || !(init_in->init_in.flags & FUSE_PARALLEL_DIROPS) + || !(init_in->init_in.flags & FUSE_AUTO_INVAL_DATA) + || !(init_in->init_in.flags & FUSE_HANDLE_KILLPRIV) + || !(init_in->init_in.flags & FUSE_ASYNC_DIO) + || !(init_in->init_in.flags & FUSE_IOCTL_DIR) + || !(init_in->init_in.flags & FUSE_ATOMIC_O_TRUNC) + || !(init_in->init_in.flags & FUSE_SPLICE_READ) + || !(init_in->init_in.flags & FUSE_SPLICE_WRITE) + || !(init_in->init_in.flags & FUSE_MAX_PAGES) + || !(init_in->init_in.flags & FUSE_WRITEBACK_CACHE) + || !(init_in->init_in.flags & FUSE_EXPORT_SUPPORT) + || !(init_in->init_in.flags & FUSE_SPLICE_MOVE) ) + { + std::cerr << "Linux kernel is missing required fuse capabilities." << std::endl; + return 8; + } + + init_out.init_out.flags = FUSE_MAX_PAGES |FUSE_PARALLEL_DIROPS + | FUSE_BIG_WRITES |FUSE_ASYNC_READ | FUSE_AUTO_INVAL_DATA + | FUSE_HANDLE_KILLPRIV | FUSE_ASYNC_DIO | FUSE_IOCTL_DIR + | FUSE_ATOMIC_O_TRUNC | FUSE_SPLICE_READ | FUSE_SPLICE_WRITE + | FUSE_MAX_PAGES | FUSE_WRITEBACK_CACHE | FUSE_EXPORT_SUPPORT + | FUSE_SPLICE_MOVE; + + + init_out.init_out.max_background = max_background; + init_out.init_out.congestion_threshold = congestion_threshold; + init_out.init_out.max_pages = static_cast(fuse_max_pages); + init_out.init_out.time_gran = 1; + init_out.init_out.max_write = getpagesize()*fuse_max_pages; + + if(write(fuse_fd, &init_out, sizeof(init_out))!=sizeof(init_out)) + { + perror("Error writing init reply"); + return 9; + } + + struct io_uring fuse_uring; + + rc = io_uring_queue_init(init_out.init_out.max_background*2, &fuse_uring, IORING_SETUP_SQPOLL); + + if(rc<0) + { + perror("Error setting up io_uring."); + return 10; + } + + std::vector fixed_fds; + fixed_fds.push_back(fuse_fd); + int fuse_fixed_fd = 0; + std::vector reg_buffers; + + fuse_io_service::FuseRing fuse_ring; + fuse_ring.fd = fuse_fixed_fd; + fuse_ring.backing_fd = fixed_fds.size(); + fixed_fds.push_back(backing_fd); + fuse_ring.backing_fd_orig = backing_fd; + + size_t max_bufsize = init_out.init_out.max_write + sizeof(fuse_in_header) + sizeof(fuse_write_in); + + std::vector header_buf_v(header_buf_size*init_out.init_out.max_background); + char* header_buf = header_buf_v.data(); + + struct iovec iov; + iov.iov_base = header_buf; + iov.iov_len = header_buf_v.size(); + size_t header_buf_idx = reg_buffers.size(); + reg_buffers.push_back(iov); + + std::vector scratch_buf_v(scratch_buf_size*init_out.init_out.max_background); + char* scratch_buf = scratch_buf_v.data(); + iov.iov_base = scratch_buf; + iov.iov_len = scratch_buf_v.size(); + size_t scratch_buf_idx = reg_buffers.size(); + reg_buffers.push_back(iov); + + for(size_t i=0;itype = 0; + rc = pipe2(new_io->pipe, O_CLOEXEC|O_NONBLOCK); + if(rc!=0) + { + perror("Error creating pipe."); + return 11; + } + + rc = fcntl(new_io->pipe[0], F_SETPIPE_SZ, max_bufsize); + if(rc<0) + { + perror(("Error setting pipe size to "+std::to_string(max_bufsize)+".").c_str()); + return 12; + } + + fixed_fds.push_back(new_io->pipe[0]); + new_io->pipe[0] = fixed_fds.size()-1; + fixed_fds.push_back(new_io->pipe[1]); + new_io->pipe[1] = fixed_fds.size()-1; + + new_io->header_buf = header_buf; + header_buf+=header_buf_size; + new_io->header_buf_idx = header_buf_idx; + + new_io->scratch_buf = scratch_buf; + scratch_buf+=scratch_buf_size; + new_io->scratch_buf_idx = scratch_buf_idx; + + fuse_ring.ios.push(new_io); + } + + rc = io_uring_register_files(&fuse_uring, &fixed_fds[0], fixed_fds.size()); + if(rc<0) + { + perror("Error registering fuse io_uring files."); + return 13; + } + + rc = io_uring_register_buffers(&fuse_uring, ®_buffers[0], reg_buffers.size()); + if(rc<0) + { + perror("Error registering fuse io_uring buffers."); + return 14; + } + + fuse_ring.ring = &fuse_uring; + fuse_ring.ring_submit = false; + fuse_ring.max_bufsize = max_bufsize; + + struct stat bst; + if(fstat(backing_fd, &bst)!=0) + { + perror("Error getting backing file info"); + return 15; + } + + fuse_ring.backing_f_size = bst.st_size; + + std::cout << "Running..." << std::endl; + fuse_io_service service(fuse_ring); + rc = service.run(queue_fuse_read); + + io_uring_unregister_buffers(&fuse_uring); + io_uring_unregister_files(&fuse_uring); + + return rc; +} diff --git a/fuseuring_main.h b/fuseuring_main.h new file mode 100755 index 0000000..3f801cf --- /dev/null +++ b/fuseuring_main.h @@ -0,0 +1,7 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright (C) Martin Raiber +#pragma once +#include "../Interface/File.h" +#include + +int fuseuring_main(int backing_fd, const std::string& mountpoint, int max_background, int congestion_threshold); \ No newline at end of file diff --git a/main.cpp b/main.cpp new file mode 100755 index 0000000..70b9b87 --- /dev/null +++ b/main.cpp @@ -0,0 +1,59 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright (C) Martin Raiber +#include "fuseuring_main.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +int main(int argc, char* argv[]) +{ + if(argc<5) + { + std::cerr << "Not enough arguments ./fuseuring [backing file path] [fuse mount path] [backing file size] [fuse max_background]" << std::endl; + return 101; + } + + int backing_fd = open(argv[1], O_CLOEXEC|O_CREAT|O_RDWR); + //int backing_fd = memfd_create("backing_file", MFD_CLOEXEC); + + if(backing_fd==-1) + { + perror("Error opening backing file"); + return 1; + } + + int64_t backing_file_size = atoll(argv[3]); + + int rc = posix_fallocate(backing_fd, 0, backing_file_size); + if(rc!=0) + { + std::cerr << "Error allocating 1GB for backing file rc: " << rc << std::endl; + return 1; + } + + rc = prctl(PR_SET_IO_FLUSHER, 1, 0, 0, 0); + + if(rc!=0) + { + perror("Error setting PR_SET_IO_FLUSHER"); + } + + struct rlimit rlimit; + rlimit.rlim_cur = RLIM_INFINITY; + rlimit.rlim_max = RLIM_INFINITY; + rc = setrlimit(RLIMIT_MEMLOCK, &rlimit); + if(rc!=0) + { + perror("Error increasing RLIMIT_MEMLOCK"); + } + + int fuse_max_background = atoi(argv[4]); + + return fuseuring_main(backing_fd, argv[2], fuse_max_background, fuse_max_background+1000); +} diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..340dffd --- /dev/null +++ b/readme.md @@ -0,0 +1,149 @@ +## Fuseuring + +This is an example program that demonstrates how to use io_uring to drive Linux fuse. The only thing it does is forward a single file to the fuse mount point. + +### Why? + +Using io_uring reduces the number of system calls the fuse program makes which should speed it up. + +Fuse is sometimes used to export a single file to be used as volume, as well. E.g. vdfuse, [s3backer](https://github.com/archiecobbs/s3backer), [UrBackup](https://www.urbackup.org/) (vhd/vhdz mounting). Recent improvements in loop (direct-io), fuse and Linux memory management (`PR_SET_IO_FLUSHER`) have made this really performant. + +### Performance + +With large iodepth it gets good results (backing file on tmpfs): + +``` +fio ~/fio/ssd-test.fio +seq-read: (g=0): rw=read, bs=(R) 4096B-4096B, (W) 4096B-4096B, (T) 4096B-4096B, ioengine=libaio, iodepth=1024 +rand-read: (g=1): rw=randread, bs=(R) 4096B-4096B, (W) 4096B-4096B, (T) 4096B-4096B, ioengine=libaio, iodepth=1024 +seq-write: (g=2): rw=write, bs=(R) 4096B-4096B, (W) 4096B-4096B, (T) 4096B-4096B, ioengine=libaio, iodepth=1024 +rand-write: (g=3): rw=randwrite, bs=(R) 4096B-4096B, (W) 4096B-4096B, (T) 4096B-4096B, ioengine=libaio, iodepth=1024 +fio-3.21 +Starting 4 processes +Jobs: 1 (f=1): [_(3),w(1)][100.0%][w=384MiB/s][w=98.3k IOPS][eta 00m:00s] +seq-read: (groupid=0, jobs=1): err= 0: pid=178089: Sat Nov 7 16:39:44 2020 + read: IOPS=165k, BW=643MiB/s (674MB/s)(3500MiB/5443msec) + slat (nsec): min=1000, max=9680.6k, avg=4407.18, stdev=18980.57 + clat (usec): min=318, max=33373, avg=6190.66, stdev=3064.92 + lat (usec): min=320, max=33379, avg=6195.14, stdev=3066.45 + clat percentiles (usec): + | 1.00th=[ 1778], 5.00th=[ 2573], 10.00th=[ 2999], 20.00th=[ 3621], + | 30.00th=[ 4228], 40.00th=[ 4817], 50.00th=[ 5473], 60.00th=[ 6259], + | 70.00th=[ 7308], 80.00th=[ 8586], 90.00th=[10421], 95.00th=[11994], + | 99.00th=[14222], 99.50th=[15401], 99.90th=[26608], 99.95th=[28967], + | 99.99th=[32637] + bw ( KiB/s): min=614544, max=745196, per=100.00%, avg=662270.43, stdev=49887.45, samples=7 + iops : min=153636, max=186299, avg=165567.43, stdev=12471.72, samples=7 + lat (usec) : 500=0.01%, 750=0.02%, 1000=0.06% + lat (msec) : 2=1.54%, 4=24.67%, 10=61.52%, 20=11.89%, 50=0.30% + cpu : usr=18.83%, sys=43.37%, ctx=84144, majf=0, minf=1036 + IO depths : 1=0.1%, 2=0.1%, 4=0.1%, 8=0.1%, 16=0.1%, 32=0.1%, >=64=99.9% + submit : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0% + complete : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.1% + issued rwts: total=896000,0,0,0 short=0,0,0,0 dropped=0,0,0,0 + latency : target=0, window=0, percentile=100.00%, depth=1024 +rand-read: (groupid=1, jobs=1): err= 0: pid=178096: Sat Nov 7 16:39:44 2020 + read: IOPS=91.0k, BW=356MiB/s (373MB/s)(3500MiB/9845msec) + slat (nsec): min=1100, max=16238k, avg=9354.69, stdev=93603.08 + clat (usec): min=189, max=47250, avg=11192.69, stdev=2888.77 + lat (usec): min=192, max=47252, avg=11202.13, stdev=2890.28 + clat percentiles (usec): + | 1.00th=[ 6521], 5.00th=[ 7635], 10.00th=[ 8225], 20.00th=[ 8979], + | 30.00th=[ 9634], 40.00th=[10290], 50.00th=[10814], 60.00th=[11469], + | 70.00th=[12125], 80.00th=[13042], 90.00th=[14353], 95.00th=[15664], + | 99.00th=[20317], 99.50th=[22938], 99.90th=[34866], 99.95th=[42730], + | 99.99th=[44303] + bw ( KiB/s): min=333610, max=383504, per=100.00%, avg=365092.00, stdev=16877.24, samples=13 + iops : min=83402, max=95876, avg=91272.69, stdev=4219.28, samples=13 + lat (usec) : 250=0.01%, 500=0.01%, 750=0.01%, 1000=0.01% + lat (msec) : 2=0.06%, 4=0.16%, 10=34.80%, 20=63.89%, 50=1.06% + cpu : usr=11.62%, sys=27.98%, ctx=128214, majf=0, minf=1037 + IO depths : 1=0.1%, 2=0.1%, 4=0.1%, 8=0.1%, 16=0.1%, 32=0.1%, >=64=99.9% + submit : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0% + complete : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.1% + issued rwts: total=896000,0,0,0 short=0,0,0,0 dropped=0,0,0,0 + latency : target=0, window=0, percentile=100.00%, depth=1024 +seq-write: (groupid=2, jobs=1): err= 0: pid=178107: Sat Nov 7 16:39:44 2020 + write: IOPS=131k, BW=513MiB/s (538MB/s)(3500MiB/6818msec); 0 zone resets + slat (nsec): min=1100, max=4008.6k, avg=5821.89, stdev=18205.65 + clat (usec): min=239, max=21808, avg=7749.25, stdev=3409.43 + lat (usec): min=243, max=21809, avg=7755.16, stdev=3411.55 + clat percentiles (usec): + | 1.00th=[ 2638], 5.00th=[ 3163], 10.00th=[ 3654], 20.00th=[ 4555], + | 30.00th=[ 5407], 40.00th=[ 6390], 50.00th=[ 7373], 60.00th=[ 8356], + | 70.00th=[ 9372], 80.00th=[10683], 90.00th=[12256], 95.00th=[14091], + | 99.00th=[17171], 99.50th=[17957], 99.90th=[19792], 99.95th=[20317], + | 99.99th=[21103] + bw ( KiB/s): min=479727, max=612688, per=99.28%, avg=521884.56, stdev=40381.06, samples=9 + iops : min=119931, max=153172, avg=130470.78, stdev=10095.48, samples=9 + lat (usec) : 250=0.01%, 500=0.01%, 750=0.01%, 1000=0.01% + lat (msec) : 2=0.16%, 4=13.54%, 10=61.47%, 20=24.72%, 50=0.08% + cpu : usr=17.10%, sys=39.93%, ctx=112618, majf=0, minf=14 + IO depths : 1=0.1%, 2=0.1%, 4=0.1%, 8=0.1%, 16=0.1%, 32=0.1%, >=64=99.9% + submit : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0% + complete : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.1% + issued rwts: total=0,896000,0,0 short=0,0,0,0 dropped=0,0,0,0 + latency : target=0, window=0, percentile=100.00%, depth=1024 +rand-write: (groupid=3, jobs=1): err= 0: pid=178115: Sat Nov 7 16:39:44 2020 + write: IOPS=103k, BW=403MiB/s (422MB/s)(3500MiB/8695msec); 0 zone resets + slat (nsec): min=1200, max=16351k, avg=8084.77, stdev=67434.88 + clat (usec): min=1126, max=38086, avg=9877.97, stdev=2952.08 + lat (usec): min=1128, max=38088, avg=9886.14, stdev=2953.72 + clat percentiles (usec): + | 1.00th=[ 6325], 5.00th=[ 7373], 10.00th=[ 7701], 20.00th=[ 8094], + | 30.00th=[ 8455], 40.00th=[ 8848], 50.00th=[ 9110], 60.00th=[ 9503], + | 70.00th=[10159], 80.00th=[11076], 90.00th=[12518], 95.00th=[14615], + | 99.00th=[23725], 99.50th=[26870], 99.90th=[32637], 99.95th=[35390], + | 99.99th=[36963] + bw ( KiB/s): min=340472, max=447488, per=99.79%, avg=411309.00, stdev=31900.28, samples=17 + iops : min=85118, max=111872, avg=102827.24, stdev=7975.05, samples=17 + lat (msec) : 2=0.04%, 4=0.28%, 10=67.14%, 20=30.73%, 50=1.82% + cpu : usr=13.52%, sys=30.96%, ctx=74227, majf=0, minf=12 + IO depths : 1=0.1%, 2=0.1%, 4=0.1%, 8=0.1%, 16=0.1%, 32=0.1%, >=64=99.9% + submit : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.0% + complete : 0=0.0%, 4=100.0%, 8=0.0%, 16=0.0%, 32=0.0%, 64=0.0%, >=64=0.1% + issued rwts: total=0,896000,0,0 short=0,0,0,0 dropped=0,0,0,0 + latency : target=0, window=0, percentile=100.00%, depth=1024 + +Run status group 0 (all jobs): + READ: bw=643MiB/s (674MB/s), 643MiB/s-643MiB/s (674MB/s-674MB/s), io=3500MiB (3670MB), run=5443-5443msec + +Run status group 1 (all jobs): + READ: bw=356MiB/s (373MB/s), 356MiB/s-356MiB/s (373MB/s-373MB/s), io=3500MiB (3670MB), run=9845-9845msec + +Run status group 2 (all jobs): + WRITE: bw=513MiB/s (538MB/s), 513MiB/s-513MiB/s (538MB/s-538MB/s), io=3500MiB (3670MB), run=6818-6818msec + +Run status group 3 (all jobs): + WRITE: bw=403MiB/s (422MB/s), 403MiB/s-403MiB/s (422MB/s-422MB/s), io=3500MiB (3670MB), run=8695-8695msec + +Disk stats (read/write): + loop0: ios=1363439/1466332, merge=428561/321891, ticks=1868225/1741830, in_queue=3610054, util=94.74% +``` + +### How to compile + +Need gcc >= 10 for C++ coroutines. Depends on (recent) liburing-dev. + +```bash +autoreconf --install +./configure +make +``` + +### How to run + +Needs a recent Linux kernel (>=5.9) for io_uring functionality and recent `losetup`. + +```bash +FMNT=/media/test +BMNT=/media/bench +mkdir -p "$FMNT" +./fuseuring /tmp/backing_file.img "$FMNT" $((500*1024*1024)) 5000 & +LODEV=$(losetup --find --show "$FMNT/volume" --direct-io=on) +mkfs.ext4 $LODEV || true +mount $LODEV "$BMNT" +losetup -d $LODEV +``` + +Or see `bench.sh`.