newtx/sdk/base/data.cpp

787 lines
14 KiB
C++

#include "data.h"
#include <string.h>
#if OS_WIN
#else
#include <sys/fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#endif
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
packet_data_base::packet_data_base() : pack_cmd_(0), pack_id_(0)
, progress_notify_(PROGRESS_NOTIFYER()), user_data_(nullptr)
{}
packet_data_base::~packet_data_base()
{}
int packet_data_base::notify_progress(uint64_t total, uint64_t cur_size, uint32_t err)
{
if (progress_notify_)
return progress_notify_(total, cur_size, err, user_data_);
else
return ENOENT;
}
void packet_data_base::set_packet_param(uint32_t cmd, uint32_t id)
{
pack_cmd_ = cmd;
pack_id_ = id;
}
void packet_data_base::set_session_id(uint32_t session_id)
{
session_id_ = session_id;
}
int packet_data_base::get_packet_command(void)
{
return pack_cmd_;
}
int packet_data_base::get_packet_id(void)
{
return pack_id_;
}
uint32_t packet_data_base::get_session_id(void)
{
return session_id_;
}
void packet_data_base::set_progress_notify(PROGRESS_NOTIFYER notify, void* param)
{
progress_notify_ = notify;
user_data_ = param;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
data_holder::data_holder()
{}
data_holder::~data_holder()
{}
void data_holder::cancel(void)
{}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
mem_holder::mem_holder(size_t size) : space_(size)
{
buf_ = (uint8_t*)malloc(size);
if (buf_)
memset(buf_, 0, size);
}
mem_holder::~mem_holder()
{
if (buf_)
free(buf_);
buf_ = nullptr;
}
int mem_holder::put_data(const void* data, uint32_t* size)
{
if (*size > space_ - wpos_)
*size = space_ - wpos_;
memcpy(buf_ + wpos_, data, *size);
wpos_ += *size;
return 0;
}
bool mem_holder::is_complete(void)
{
return wpos_ >= space_;
}
uint32_t mem_holder::get_required(void)
{
if (wpos_ >= space_)
return 0;
else
return space_ - wpos_;
}
size_t mem_holder::data_length(void)
{
return wpos_;
}
uint8_t* mem_holder::data(void)
{
return buf_;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
empty_holer::empty_holer(uint64_t size) : size_(size), put_(0)
{}
empty_holer::~empty_holer()
{}
int empty_holer::put_data(const void* data, uint32_t* size)
{
if (*size >= size_ - put_)
{
*size -= size_ - put_;
put_ = size_;
}
else
{
put_ += *size;
}
notify_progress(size_, put_, 0);
return 0;
}
bool empty_holer::is_complete(void)
{
return size_ == put_;
}
uint32_t empty_holer::get_required(void)
{
return size_ - put_;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
file_saver::file_saver(void) : size_(0), wrote_(0), path_(""), check_(""), dst_(nullptr), pack_cmd_(0), pack_id_(0)
{}
file_saver::~file_saver()
{
utils::to_log(LOG_LEVEL_DEBUG, "Write file(%s) over(%ld/%ld).\n", path_.c_str(), wrote_, size_);
close();
}
void file_saver::close(void)
{
if (map_)
map_->release();
map_ = nullptr;
if(dst_)
fclose(dst_);
dst_ = nullptr;
size_ = wrote_ = pack_cmd_ = pack_id_ = 0;
path_ = check_ = "";
}
int file_saver::set_verify_data(const char* data, size_t len)
{
if (data)
check_ = std::string(data, len);
else
check_ = "";
return 0;
}
int file_saver::open(const char* path, uint64_t size, bool in_mem, size_t off)
{
int err = 0;
close();
wrote_ = off;
path_ = path;
size_ = size;
if (in_mem)
{
map_ = new file_map();
err = map_->open(path, size, false);
if (err || !map_->map())
{
map_->release();
map_ = nullptr;
}
else
return err;
}
err = utils::make_file_size(path, wrote_);
dst_ = fopen(path, "ab+");
if (dst_)
{
unsigned long long space = 0;
std::string dir(path);
size_t pos = dir.rfind(PATH_SEPARATOR[0]);
if (pos != std::string::npos)
dir.erase(pos);
err = utils::get_disk_space(dir.c_str(), nullptr, &space, nullptr);
if (err || space < size * 1.5)
{
fclose(dst_);
dst_ = nullptr;
remove(path);
if (err == 0)
err = ENOSPC;
}
}
else
{
err = errno;
}
return err;
}
int file_saver::put_data(const void* data, uint32_t* size/*[in] - total bytes of data; [out] - used bytes*/)
{
if (!dst_)
{
if (map_)
{
// fix me: we consider whole file is all mapped in memory
int w = *size > size_ - wrote_ ? size_ - wrote_ : *size;
memcpy(map_->buffer() + wrote_, data, w);
*size = w;
wrote_ += w;
notify_progress(size_, wrote_, 0);
return 0;
}
return ENOENT;
}
int w = *size > size_ - wrote_ ? size_ - wrote_ : *size,
real_w = fwrite(data, 1, w, dst_), // should handle error here !
err = 0;
*size = real_w;
wrote_ += real_w;
if(wrote_ >= size_)
{
fclose(dst_);
dst_ = nullptr;
}
else if (real_w < w) // what happens ?
{
err = ferror(dst_);
}
notify_progress(size_, wrote_, err);
return 0;
}
bool file_saver::is_complete(void)
{
return wrote_ >= size_;
}
uint32_t file_saver::get_required(void)
{
return size_ - wrote_;
}
void file_saver::cancel(void)
{
std::string discard(path_);
utils::to_log(LOG_LEVEL_DEBUG, "Discard receiving file (%u/%u): '%s'.\n", wrote_, size_, path_.c_str());
close();
remove(discard.c_str());
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
data_source::data_source()
{}
data_source::~data_source()
{}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// dyn_mem
uint64_t dyn_mem::mem_used_bytes_ = 0;
MUTEX dyn_mem::mem_lock_;
dyn_mem::dyn_mem(size_t size) : buf_(nullptr), len_(0), space_(ALIGN_TO(size, 16))
{
buf_ = (uint8_t*)malloc(space_);
if (buf_)
{
SIMPLE_LOCK(dyn_mem::mem_lock_);
dyn_mem::mem_used_bytes_ += space_;
memset(buf_, 0, space_);
}
}
dyn_mem::dyn_mem(void* buf, size_t size) : buf_((uint8_t*)buf), space_(size), len_(size)
{
}
dyn_mem::~dyn_mem()
{
if (buf_)
{
free(buf_);
{
SIMPLE_LOCK(dyn_mem::mem_lock_);
dyn_mem::mem_used_bytes_ -= space_;
}
}
}
uint64_t dyn_mem::mem_used(void)
{
return dyn_mem::mem_used_bytes_;
}
dyn_mem_ptr dyn_mem::memory(size_t size)
{
return new dyn_mem(size);
}
uint32_t dyn_mem::space(void)
{
return space_;
}
bool dyn_mem::set_len(size_t len)
{
if (len > space_)
return false;
len_ = len;
return true;
}
int dyn_mem::put(const void* data, int len)
{
if (len + len_ > space_)
len = space_ - len;
if (len > 0)
{
memcpy(buf_ + len_, data, len);
len_ += len;
}
else
len = 0;
return len;
}
void* dyn_mem::detach(size_t* size)
{
void* buf = buf_;
if (size)
*size = space_;
space_ = len_ = 0;
buf_ = nullptr;
return buf;
}
size_t dyn_mem::used(size_t len)
{
if (len >= len_)
{
len_ = 0;
}
else if (len)
{
memcpy(buf_, buf_ + len, len_ - len);
len_ -= len;
}
return len_;
}
dyn_mem& dyn_mem::operator+=(dyn_mem& r)
{
if (len_ + r.get_rest() > space_)
{
size_t size = ALIGN_TO(len_ + r.get_rest(), 16);
uint8_t* buf = (uint8_t*)malloc(size);
memcpy(buf, buf_, len_);
free(buf_);
buf_ = buf;
{
SIMPLE_LOCK(dyn_mem::mem_lock_);
dyn_mem::mem_used_bytes_ += size - space_;
}
space_ = size;
}
memcpy(buf_ + len_, r.buf_, r.get_rest());
len_ += r.get_rest();
return *this;
}
bool dyn_mem::is_memory_block(void)
{
return true;
}
uint32_t dyn_mem::get_rest(void)
{
return len_;
}
// following API valid when is_memory_block() return true
uint8_t* dyn_mem::ptr(void)
{
return buf_;
}
// following API valid when is_memory_block() return false
int dyn_mem::fetch_data(void* buf, uint32_t* size)
{
if (*size >= len_)
{
memcpy(buf, buf_, len_);
*size = len_;
len_ = 0;
}
else
{
memcpy(buf, buf_, *size);
used(*size);
}
return 0;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
file_reader::file_reader() : len_(0), src_(nullptr), path_(""), consume_(0)
{}
file_reader::~file_reader()
{
if(src_)
fclose(src_);
if (map_)
map_->release();
utils::to_log(LOG_LEVEL_DEBUG, "Read file(%s) over(%ld/%ld).\n", path_.c_str(), consume_, len_);
}
int file_reader::open(const char* file, bool in_mem, size_t off)
{
if(src_)
fclose(src_);
src_ = nullptr;
consume_ = off;
if (in_mem)
{
map_ = new file_map();
if (map_->open(file, 0, true) || !map_->map())
{
map_->release();
map_ = nullptr;
}
else
{
if (map_->total_size() <= off)
{
map_->release();
map_ = nullptr;
return EOVERFLOW;
}
path_ = file;
len_ = map_->total_size();
return 0;
}
}
src_ = fopen(file, "rb");
if(!src_)
return errno;
FSEEK(src_, 0, SEEK_END);
len_ = FTELL(src_);
FSEEK(src_, consume_, SEEK_SET);
path_ = file;
if (len_ <= consume_)
{
fclose(src_);
src_ = nullptr;
return EOVERFLOW;
}
return 0;
}
int file_reader::attach(FILE* f)
{
if (src_)
{
fclose(src_);
src_ = nullptr;
}
if (map_)
{
map_->release();
map_ = nullptr;
}
uint64_t cur = FTELL(f);
FSEEK(f, 0, SEEK_END);
len_ = FTELL(f);
FSEEK(f, cur, SEEK_SET);
if (len_ <= cur)
return EINVAL;
src_ = f;
len_ -= cur;
consume_ = 0;
return 0;
}
FILE* file_reader::detach(void)
{
FILE* ret = src_;
src_ = nullptr;
len_ = 0;
path_ = "";
return ret;
}
bool file_reader::is_memory_block(void)
{
return map_ != nullptr;
}
uint32_t file_reader::get_rest(void)
{
return len_ - consume_;
}
// following API valid when is_memory_block() return true
uint8_t* file_reader::ptr(void)
{
return map_ ? map_->buffer() : nullptr;
}
// following API valid when is_memory_block() return false
int file_reader::fetch_data(void* buf, uint32_t* size)
{
if (!src_)
{
if (map_)
{
if (*size + consume_ >= len_)
*size = len_ - consume_;
memcpy(buf, map_->buffer() + consume_, *size);
consume_ += *size;
notify_progress(len_, consume_, 0);
return 0;
}
return ENODATA;
}
size_t r = fread(buf, 1, *size, src_); // fix me if ERROR occurs !!!
consume_ += r;
*size = r;
if (consume_ >= len_)
{
fclose(src_);
src_ = nullptr;
}
notify_progress(len_, consume_, 0);
return 0;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
file_map::file_map()
{
utils::get_page_size(&os_map_size_);
}
file_map::~file_map()
{
close();
}
void file_map::unmap(void)
{
if (buf_)
#if OS_WIN
UnmapViewOfFile(buf_);
#else
munmap(buf_, map_size_);
#endif
buf_ = nullptr;
map_off_ = map_size_ = off_ = 0;
}
int file_map::open(const char* file, uint64_t size, bool readonly)
{
close();
std::string oper(readonly ? "open" : "create");
#if OS_WIN
HANDLE h = INVALID_HANDLE_VALUE;
DWORD access = PAGE_READONLY;
if (readonly)
{
h = CreateFileA(file, GENERIC_READ, 0, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
}
else
{
access = PAGE_READWRITE;
h = CreateFileA(file, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
}
if (h == INVALID_HANDLE_VALUE)
{
utils::to_log(LOG_LEVEL_WARNING, "FileMapping: %s '%s' failed: %d\n", oper.c_str(), file, GetLastError());
return EFAULT;
}
if (readonly)
{
DWORD* hi = (DWORD*)&size + 1,
* lo = (DWORD*)&size;
*lo = GetFileSize(h, hi);
}
else
{
LONG lo = size & 0x0ffffffff,
hi = size >> 32;
DWORD ret = SetFilePointer(h, lo, &hi, FILE_BEGIN);
if (ret == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR)
{
CloseHandle(h);
remove(file);
utils::to_log(LOG_LEVEL_WARNING, "FileMapping: no space(%ld) for '%s'.\n", size, file);
return ENOSPC;
}
else
{
// write a byte to ensure map success
SetFilePointer(h, -1, NULL, FILE_CURRENT);
lo = 0;
WriteFile(h, &lo, 1, &ret, NULL);
}
}
map_ = CreateFileMappingA(h, NULL, access, 0, 0, NULL);
access = GetLastError();
CloseHandle(h);
if (!map_)
{
remove(file);
map_ = INVALID_HANDLE_VALUE;
utils::to_log(LOG_LEVEL_WARNING, "FileMapping: create mapping object for '%s' failed: %d.\n", file, access);
return EFAULT;
}
#else
if (readonly)
map_ = (HANDLE)::open(file, O_RDONLY, 0644);
else
{
int err = utils::make_file_size(file, size);
if(err)
{
utils::to_log(LOG_LEVEL_FATAL, "FileMapping: make file(%s) size(%ld) = %d\n", file, size, err);
return err;
}
map_ = (HANDLE)::open(file, O_RDWR, 0666);
utils::to_log(LOG_LEVEL_DEBUG, "FileMapping: open('%s', O_APPEND, 0666) = %p\n", file, map_);
}
if (map_ == INVALID_HANDLE_VALUE)
{
int err = errno;
if(!readonly)
remove(file);
utils::to_log(LOG_LEVEL_WARNING, "FileMapping: create mapping object for '%s' failed: %d.\n", file, err);
return err;
}
#endif
path_file_ = file;
total_ = size;
read_only_ = readonly;
return 0;
}
int file_map::close(void)
{
unmap();
if (map_ != INVALID_HANDLE_VALUE)
#if OS_WIN
CloseHandle(map_);
#else
::close((int)(long)map_);
#endif
map_ = INVALID_HANDLE_VALUE;
total_ = 0;
return 0;
}
uint64_t file_map::total_size(void)
{
return total_;
}
uint8_t* file_map::map(uint64_t off, uint32_t* size)
{
uint32_t len = 0;
unmap();
if (!size)
size = &len;
if (off < total_)
{
DWORD hi = 0,
lo = 0,
cnt = 0;
map_off_ = off / os_map_size_ * os_map_size_;
off_ = off - map_off_;
hi = map_off_ >> 32;
lo = map_off_;
cnt = total_ - map_off_;
if (cnt - off > *size && *size)
{
cnt = ALIGN_TO(*size + off, os_map_size_);
if(cnt > total_ - map_off_)
cnt = total_ - map_off_;
}
map_size_ = cnt;
#if OS_WIN
buf_ = (uint8_t*)MapViewOfFile(map_, read_only_ ? FILE_MAP_READ : FILE_MAP_READ | FILE_MAP_WRITE, hi, lo, map_size_);
#else
int priv = PROT_READ, prot = MAP_PRIVATE;
if (!read_only_)
{
priv |= PROT_WRITE;
prot = MAP_SHARED;
}
buf_ = (uint8_t*)mmap(nullptr, map_size_, priv, prot, (int)(long)map_, map_off_);
if(buf_ == INVALID_HANDLE_VALUE)
buf_ = nullptr;
#endif
if (!buf_)
{
utils::to_log(LOG_LEVEL_WARNING, "FileMapping: request map(%p + %u), real map(%p + %u) failed: %d\n"
, off, *size, map_off_, map_size_, GetLastError());
*size = 0;
}
else
*size = cnt - off;
}
return buf_ ? buf_ + off_ : nullptr;
}
uint8_t* file_map::buffer(void)
{
return buf_ ? buf_ + off_ : nullptr;
}