Improve memory file

This commit is contained in:
Martin Raiber 2021-05-08 09:15:29 +02:00
parent a92608b10c
commit e64da3fd8b
9 changed files with 341 additions and 53 deletions

View File

@ -103,7 +103,7 @@ public:
virtual void resetSparseExtentIter() = 0;
virtual SSparseExtent nextSparseExtent() = 0;
virtual bool Resize(int64 new_size, bool set_sparse=true) = 0;
virtual std::vector<SFileExtent> getFileExtents(int64 starting_offset, int64 block_size, bool& more_data) = 0;
virtual std::vector<SFileExtent> getFileExtents(int64 starting_offset, int64 block_size, bool& more_data, unsigned int flags=0) = 0;
virtual IVdlVolCache* createVdlVolCache() = 0;
virtual int64 getValidDataLength(IVdlVolCache* vol_cache) = 0;
@ -116,6 +116,14 @@ public:
virtual os_file_handle getOsHandle(bool release_handle = false) = 0;
};
class IMemFile : public IFsFile
{
public:
virtual char* getDataPtr() = 0;
virtual void protect_mem() = 0;
virtual void unprotect_mem() = 0;
};
class ScopedDeleteFn
{
public:

View File

@ -24,6 +24,7 @@ class ISettingsReader;
class IPipe;
class IFile;
class IFsFile;
class IMemFile;
class IOutputStream;
class IThreadPool;
class ICondition;
@ -63,6 +64,7 @@ public:
virtual void setLogCircularBufferSize(size_t size)=0;
virtual std::vector<SCircularLogEntry> getCicularLogBuffer(size_t minid)=0;
virtual void Log(const std::string &pStr, int LogLevel=LL_INFO)=0;
virtual void setLogRotationFiles(size_t n) = 0;
virtual bool Write(THREAD_ID tid, const std::string &str, bool cached=true)=0;
virtual bool WriteRaw(THREAD_ID tid, const char *buf, size_t bsize, bool cached=true)=0;
@ -165,7 +167,7 @@ public:
virtual IFsFile* openFile(std::string pFilename, int pMode=0)=0;
virtual IFsFile* openFileFromHandle(void *handle, const std::string& pFilename)=0;
virtual IFsFile* openTemporaryFile(void)=0;
virtual IFile* openMemoryFile(void)=0;
virtual IMemFile* openMemoryFile(const std::string& name, bool mlock_mem)=0;
virtual bool deleteFile(std::string pFilename)=0;
virtual bool fileExists(std::string pFilename)=0;
@ -212,6 +214,8 @@ public:
virtual int getRecvWindowSize() = 0;
#endif
virtual void mallocFlushTcache() = 0;
};
#ifndef NO_INTERFACE

View File

@ -49,6 +49,7 @@
#include "StreamPipe.h"
#include "ThreadPool.h"
#include "file.h"
#include "file_memory.h"
#include "utf8/utf8.h"
#include "MemoryPipe.h"
#include "MemorySettingsReader.h"
@ -471,6 +472,11 @@ void CServer::Log( const std::string &pStr, int LogLevel)
}
}
void CServer::setLogRotationFiles(size_t n)
{
log_rotation_files = n;
}
void CServer::rotateLogfile()
{
if(static_cast<size_t>(logfile.tellp())>log_rotation_size)
@ -1727,10 +1733,9 @@ IFsFile* CServer::openTemporaryFile(void)
return file;
}
IFile* CServer::openMemoryFile(void)
IMemFile* CServer::openMemoryFile(const std::string& name, bool mlock_mem)
{
//return new CMemoryFile();
return openTemporaryFile();
return new CMemoryFile(name, mlock_mem);
}
bool CServer::deleteFile(std::string pFilename)
@ -2265,6 +2270,10 @@ int CServer::getRecvWindowSize()
}
#endif
void CServer::mallocFlushTcache()
{
}
void CServer::addWebSocket(IWebSocket* websocket)
{
IScopedLock lock(web_socket_mutex);

View File

@ -73,6 +73,7 @@ public:
virtual void setLogCircularBufferSize(size_t size);
virtual std::vector<SCircularLogEntry> getCicularLogBuffer(size_t minid);
virtual void Log(const std::string &pStr, int LogLevel=LL_INFO);
virtual void setLogRotationFiles(size_t n);
virtual bool Write(THREAD_ID tid, const std::string &str, bool cached=true);
virtual bool WriteRaw(THREAD_ID tid, const char *buf, size_t bsize, bool cached=true);
@ -159,7 +160,7 @@ public:
virtual IFsFile* openFile(std::string pFilename, int pMode=0);
virtual IFsFile* openFileFromHandle(void *handle, const std::string& pFilename);
virtual IFsFile* openTemporaryFile(void);
virtual IFile* openMemoryFile(void);
virtual IMemFile* openMemoryFile(const std::string& name, bool mlock_mem);
virtual bool deleteFile(std::string pFilename);
virtual bool fileExists(std::string pFilename);
@ -218,6 +219,8 @@ public:
virtual int getRecvWindowSize();
#endif
void mallocFlushTcache();
private:
void logToCircularBuffer(const std::string& msg, int loglevel);

2
file.h
View File

@ -61,7 +61,7 @@ public:
bool Resize(int64 new_size, bool set_sparse=true);
void resetSparseExtentIter();
SSparseExtent nextSparseExtent();
std::vector<SFileExtent> getFileExtents(int64 starting_offset, int64 block_size, bool& more_data);
std::vector<SFileExtent> getFileExtents(int64 starting_offset, int64 block_size, bool& more_data, unsigned int flags);
IFsFile::os_file_handle getOsHandle(bool release_handle = false);
IVdlVolCache* createVdlVolCache();
int64 getValidDataLength(IVdlVolCache* vol_cache);

View File

@ -499,7 +499,7 @@ IFsFile::SSparseExtent File::nextSparseExtent()
#endif
}
std::vector<IFsFile::SFileExtent> File::getFileExtents(int64 starting_offset, int64 block_size, bool& more_data)
std::vector<IFsFile::SFileExtent> File::getFileExtents(int64 starting_offset, int64 block_size, bool& more_data, unsigned int flags)
{
#ifdef __linux__
std::vector<char> buf;

View File

@ -16,67 +16,77 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
**************************************************************************/
#include "Server.h"
#include "file_memory.h"
#include <algorithm>
#include <assert.h>
#ifndef _WIN32
#include <unistd.h>
#include <memory.h>
#include <sys/mman.h>
#include <pthread.h>
#endif
#include "stringtools.h"
CMemoryFile::CMemoryFile()
int CMemoryFile::page_size = 0;
CMemoryFile::CMemoryFile(const std::string& name, bool mlock_mem)
:mutex(Server->createSharedMutex()),
mlock_mem(mlock_mem), name(name), mprotect_mutex(Server->createMutex()), memory_protected(0)
{
pos=0;
#ifndef _WIN32
if (page_size == 0)
{
page_size = sysconf(_SC_PAGE_SIZE);
if (page_size <= 0)
{
Server->Log("Invalid page size " + convert(page_size) + " errno " + convert((int64)errno), LL_ERROR);
page_size = 4096;
}
}
#endif
}
std::string CMemoryFile::Read(_u32 tr)
CMemoryFile::~CMemoryFile()
{
if(pos>=data.size() )
return "";
size_t rtr=(std::min)((size_t)tr, data.size()-pos);
std::string ret;
ret.resize(rtr);
memcpy(&ret[0], &data[pos], rtr);
pos+=rtr;
#ifndef _WIN32
if (mlock_mem && !data.empty())
munlock(data.data(), data.size());
#endif
}
std::string CMemoryFile::Read(_u32 tr, bool *has_error)
{
std::string ret = Read(pos, tr, has_error);
pos += ret.size();
return ret;
}
_u32 CMemoryFile::Read(char* buffer, _u32 bsize)
_u32 CMemoryFile::Read(char* buffer, _u32 bsize, bool *has_error)
{
if(pos>=data.size() )
return 0;
size_t rtr=(std::min)((size_t)bsize, data.size()-pos);
memcpy(buffer, &data[pos], rtr);
pos+=rtr;
return (_u32)rtr;
_u32 read = Read(pos, buffer, bsize, has_error);
pos += read;
return read;
}
_u32 CMemoryFile::Write(const std::string &tw)
_u32 CMemoryFile::Write(const std::string &tw, bool *has_error)
{
return Write(tw.c_str(), (_u32)tw.size());
return Write(tw.c_str(), (_u32)tw.size(), has_error);
}
_u32 CMemoryFile::Write(const char* buffer, _u32 bsize)
_u32 CMemoryFile::Write(const char* buffer, _u32 bsize, bool *has_error)
{
if(pos+bsize>data.size())
{
data.resize(pos+bsize);
memcpy(&data[pos], buffer, bsize);
pos+=bsize;
return bsize;
}
else
{
memcpy(&data[pos], buffer, bsize);
pos+=bsize;
return bsize;
}
_u32 written = Write(pos, buffer, bsize, has_error);
pos += written;
return written;
}
bool CMemoryFile::Seek(_i64 spos)
{
if((size_t)spos<data.size() && spos>=0)
if(spos>=0)
{
pos=(size_t)spos;
return true;
@ -89,6 +99,7 @@ bool CMemoryFile::Seek(_i64 spos)
_i64 CMemoryFile::Size(void)
{
IScopedReadLock lock(mutex.get());
return data.size();
}
@ -99,7 +110,166 @@ _i64 CMemoryFile::RealSize()
std::string CMemoryFile::getFilename()
{
return "_MEMORY_";
return name;
}
void CMemoryFile::resetSparseExtentIter()
{
}
IFsFile::SSparseExtent CMemoryFile::nextSparseExtent()
{
return SSparseExtent();
}
bool CMemoryFile::Resize(int64 new_size, bool set_sparse)
{
if (new_size < 0)
{
return false;
}
IScopedWriteLock lock(mutex.get());
unprotect_mem();
data.resize(new_size);
#ifndef _WIN32
if(mlock_mem)
mlock(data.data(), data.size());
#endif
protect_mem();
return true;
}
std::vector<IFsFile::SFileExtent> CMemoryFile::getFileExtents(int64 starting_offset, int64 block_size, bool & more_data, unsigned int flags)
{
return std::vector<IFsFile::SFileExtent>();
}
IFsFile::os_file_handle CMemoryFile::getOsHandle(bool release_handle)
{
return os_file_handle();
}
char * CMemoryFile::getDataPtr()
{
return data.data();
}
void CMemoryFile::protect_mem()
{
#ifndef _WIN32
IScopedLock lock(mprotect_mutex.get());
if (memory_protected<=1)
{
if (!data.empty()
&& mprotect(data.data(), data.size(), PROT_READ) != 0)
{
Server->Log("mprotect(r) failed at " + convert((int64)data.data()) + " size " + convert(data.size()) + " errno " + convert((int64)errno), LL_WARNING);
}
--memory_protected;
}
else
{
--memory_protected;
}
#endif
}
void CMemoryFile::unprotect_mem()
{
#ifndef _WIN32
IScopedLock lock(mprotect_mutex.get());
if (memory_protected==0)
{
if (!data.empty()
&& mprotect(data.data(), data.size(), PROT_READ|PROT_WRITE) != 0)
{
Server->Log("mprotect(w) failed at " + convert((int64)data.data()) + " size " + convert(data.size()) + " errno " + convert((int64)errno), LL_WARNING);
}
++memory_protected;
}
else
{
++memory_protected;
}
#endif
}
IVdlVolCache* CMemoryFile::createVdlVolCache()
{
return nullptr;
}
int64 CMemoryFile::getValidDataLength(IVdlVolCache* vol_cache)
{
return -1;
}
std::string CMemoryFile::Read(int64 spos, _u32 tr, bool * has_error)
{
IScopedReadLock lock(mutex.get());
if (spos >= static_cast<int64>(data.size()))
return "";
size_t rtr = static_cast<size_t>((std::min)(static_cast<int64>(tr), static_cast<int64>(data.size()) - spos));
std::string ret;
ret.assign(&data[spos], rtr);
return ret;
}
_u32 CMemoryFile::Read(int64 spos, char * buffer, _u32 bsize, bool * has_error)
{
IScopedReadLock lock(mutex.get());
if (spos >= static_cast<int64>(data.size()))
return 0;
size_t rtr = static_cast<size_t>((std::min)(static_cast<int64>(bsize), static_cast<int64>(data.size()) - spos));
memcpy(buffer, &data[spos], rtr);
return (_u32)rtr;
}
_u32 CMemoryFile::Write(int64 spos, const std::string & tw, bool * has_error)
{
return Write(spos, tw.data(), static_cast<_u32>(tw.size()), has_error);
}
_u32 CMemoryFile::Write(int64 spos, const char * buffer, _u32 bsize, bool * has_error)
{
IScopedReadLock lock(mutex.get());
if (spos + bsize>static_cast<int64>(data.size()))
{
lock.relock(NULL);
IScopedWriteLock wrlock(mutex.get());
unprotect_mem();
if (spos + bsize > static_cast<int64>(data.size()))
{
#ifndef _WIN32
if (mlock_mem && !data.empty())
munlock(data.data(), data.size());
#endif
data.resize(spos + bsize);
#ifndef _WIN32
if (mlock_mem)
mlock(data.data(), data.size());
#endif
}
memcpy(&data[spos], buffer, bsize);
protect_mem();
return bsize;
}
else
{
unprotect_mem();
memcpy(&data[spos], buffer, bsize);
protect_mem();
return bsize;
}
}
bool CMemoryFile::PunchHole( _i64 spos, _i64 size )

View File

@ -1,25 +1,119 @@
#include <string>
#include <algorithm>
#include <memory>
#include "Interface/File.h"
#include "Interface/SharedMutex.h"
#include <atomic>
#ifndef _WIN32
#include <sys/mman.h>
#endif
class CMemoryFile : public IFile
namespace
{
template <class T>
struct aligned_allocator {
typedef T value_type;
aligned_allocator() noexcept {}
template <class U> aligned_allocator(const aligned_allocator<U>&) noexcept {}
T* allocate(std::size_t n);
void deallocate(T* p, std::size_t n);
};
template <class T, class U>
constexpr bool operator== (const aligned_allocator<T>&, const aligned_allocator<U>&) noexcept
{
return true;
}
template <class T, class U>
constexpr bool operator!= (const aligned_allocator<T>&, const aligned_allocator<U>&) noexcept
{
return false;
}
}
class CMemoryFile : public IMemFile
{
public:
CMemoryFile();
CMemoryFile(const std::string& name, bool mlock_mem);
~CMemoryFile();
virtual std::string Read(_u32 tr);
virtual _u32 Read(char* buffer, _u32 bsize);
virtual _u32 Write(const std::string &tw);
virtual _u32 Write(const char* buffer, _u32 bsize);
virtual std::string Read(_u32 tr, bool *has_error = NULL);
virtual _u32 Read(char* buffer, _u32 bsize, bool *has_error = NULL);
virtual _u32 Write(const std::string &tw, bool *has_error = NULL);
virtual _u32 Write(const char* buffer, _u32 bsize, bool *has_error = NULL);
virtual bool Seek(_i64 spos);
virtual _i64 Size(void);
virtual _i64 RealSize();
virtual bool PunchHole( _i64 spos, _i64 size );
virtual bool Sync();
virtual std::string Read(int64 spos, _u32 tr, bool * has_error = NULL);
virtual _u32 Read(int64 spos, char * buffer, _u32 bsize, bool * has_error = NULL);
virtual _u32 Write(int64 spos, const std::string & tw, bool * has_error = NULL);
virtual _u32 Write(int64 spos, const char * buffer, _u32 bsiz, bool * has_error = NULL);
virtual std::string getFilename(void);
virtual void resetSparseExtentIter();
virtual SSparseExtent nextSparseExtent();
virtual bool Resize(int64 new_size, bool set_sparse = true);
virtual std::vector<SFileExtent> getFileExtents(int64 starting_offset, int64 block_size, bool& more_data, unsigned int flags);
virtual os_file_handle getOsHandle(bool release_handle = false);
virtual char* getDataPtr();
void protect_mem();
void unprotect_mem();
static int get_page_size()
{
return page_size;
}
virtual IVdlVolCache* createVdlVolCache() override;
virtual int64 getValidDataLength(IVdlVolCache* vol_cache) override;
private:
std::string data;
std::vector<char, aligned_allocator<char> > data;
std::auto_ptr<ISharedMutex> mutex;
size_t pos;
bool mlock_mem;
std::string name;
static int page_size;
int memory_protected;
std::auto_ptr<IMutex> mprotect_mutex;
};
namespace
{
template <class T>
T* aligned_allocator<T>::allocate(std::size_t n)
{
#ifdef _WIN32
return static_cast<T*>(malloc(n * sizeof(T)));
#else
int page_size = CMemoryFile::get_page_size();
size_t toalloc = n * sizeof(T);
toalloc = ((toalloc + page_size - 1) / page_size) * page_size;
T* ret = static_cast<T*>(mmap(NULL, toalloc, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
madvise(ret, toalloc, MADV_DONTDUMP);
return ret;
#endif
}
template <class T>
void aligned_allocator<T>::deallocate(T* p, std::size_t n) {
#ifdef _WIN32
free(p);
#else
int page_size = CMemoryFile::get_page_size();
size_t toalloc = n * sizeof(T);
toalloc = ((toalloc + page_size - 1) / page_size) * page_size;
munmap(p, toalloc);
#endif
}
}

View File

@ -994,7 +994,7 @@ IFsFile::SSparseExtent File::nextSparseExtent()
return nextSparseExtent();
}
std::vector<IFsFile::SFileExtent> File::getFileExtents(int64 starting_offset, int64 block_size, bool& more_data)
std::vector<IFsFile::SFileExtent> File::getFileExtents(int64 starting_offset, int64 block_size, bool& more_data, unsigned int flags)
{
std::vector<IFsFile::SFileExtent> ret;