From 1a7eac4d82ec34e04af30f3858fec5231408576e Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Tue, 23 Aug 2011 10:45:29 -0700 Subject: [PATCH] parallelize rados export and import * use workqueues to parallelize rados export and import * Put export and import into separate files * Fix RADOS_SYNC_TMP_SUFFIX_LEN bug Signed-off-by: Colin McCabe --- src/Makefile.am | 3 +- src/rados_export.cc | 228 +++++++ src/rados_import.cc | 240 +++++++ src/rados_sync.cc | 1575 ++++++++++++++++++------------------------- src/rados_sync.h | 215 ++++++ 5 files changed, 1339 insertions(+), 922 deletions(-) create mode 100644 src/rados_export.cc create mode 100644 src/rados_import.cc create mode 100644 src/rados_sync.h diff --git a/src/Makefile.am b/src/Makefile.am index 52b6ae3aa4135..86c089dd60c49 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -277,7 +277,7 @@ librbd_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 \ -export-symbols-regex '^rbd_.*' -lpthread $(EXTRALIBS) lib_LTLIBRARIES += librbd.la -rados_SOURCES = rados.cc rados_sync.cc +rados_SOURCES = rados.cc rados_import.cc rados_export.cc rados_sync.cc rados_LDADD = libglobal.la librados.la -lpthread -lm $(CRYPTO_LIBS) $(EXTRALIBS) bin_PROGRAMS += rados @@ -949,6 +949,7 @@ endif # headers... and everything else we want to include in a 'make dist' # that autotools doesn't magically identify. noinst_HEADERS = \ + rados_sync.h \ auth/cephx/CephxAuthorizeHandler.h\ auth/cephx/CephxKeyServer.h\ auth/cephx/CephxProtocol.h\ diff --git a/src/rados_export.cc b/src/rados_export.cc new file mode 100644 index 0000000000000..c5de7807757e8 --- /dev/null +++ b/src/rados_export.cc @@ -0,0 +1,228 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2011 New Dream Network + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "rados_sync.h" +#include "common/errno.h" +#include "common/strtol.h" +#include "include/rados/librados.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace librados; + +class ExportLocalFileWQ : public RadosSyncWQ { +public: + ExportLocalFileWQ(IoCtxDistributor *io_ctx_dist, time_t ti, + ThreadPool *tp, ExportDir *export_dir, bool force) + : RadosSyncWQ(io_ctx_dist, ti, tp), + m_export_dir(export_dir), + m_force(force) + { + } +private: + void _process(std::string *s) { + IoCtx &io_ctx(m_io_ctx_dist->get_ioctx()); + int flags = 0; + auto_ptr sobj; + auto_ptr dobj; + const std::string &rados_name(*s); + std::list < std::string > only_in_a; + std::list < std::string > only_in_b; + std::list < std::string > diff; + int ret = BackedUpObject::from_rados(io_ctx, rados_name.c_str(), sobj); + if (ret) { + cerr << ERR_PREFIX << "couldn't get '" << rados_name << "' from rados: error " + << ret << std::endl; + _exit(ret); + } + std::string obj_path(sobj->get_fs_path(m_export_dir)); + if (m_force) { + flags |= (CHANGED_CONTENTS | CHANGED_XATTRS); + } + else { + ret = BackedUpObject::from_path(obj_path.c_str(), dobj); + if (ret == ENOENT) { + sobj->get_xattrs(only_in_a); + flags |= CHANGED_CONTENTS; + } + else if (ret) { + cerr << ERR_PREFIX << "BackedUpObject::from_path returned " + << ret << std::endl; + _exit(ret); + } + else { + sobj->xattr_diff(dobj.get(), only_in_a, only_in_b, diff); + if ((sobj->get_rados_size() == dobj->get_rados_size()) && + (sobj->get_mtime() == dobj->get_mtime())) { + flags |= CHANGED_CONTENTS; + } + } + } + if (flags & CHANGED_CONTENTS) { + ret = sobj->download(io_ctx, obj_path.c_str()); + if (ret) { + cerr << ERR_PREFIX << "download error: " << ret << std::endl; + _exit(ret); + } + } + diff.splice(diff.begin(), only_in_a); + for (std::list < std::string >::const_iterator x = diff.begin(); + x != diff.end(); ++x) { + flags |= CHANGED_XATTRS; + const Xattr *xattr = sobj->get_xattr(*x); + if (xattr == NULL) { + cerr << ERR_PREFIX << "internal error on line: " << __LINE__ << std::endl; + _exit(ret); + } + std::string xattr_fs_name(USER_XATTR_PREFIX); + xattr_fs_name += x->c_str(); + ret = setxattr(obj_path.c_str(), xattr_fs_name.c_str(), + xattr->data, xattr->len, 0); + if (ret) { + ret = errno; + cerr << ERR_PREFIX << "setxattr error: " << cpp_strerror(ret) << std::endl; + _exit(ret); + } + } + for (std::list < std::string >::const_iterator x = only_in_b.begin(); + x != only_in_b.end(); ++x) { + flags |= CHANGED_XATTRS; + ret = removexattr(obj_path.c_str(), x->c_str()); + if (ret) { + ret = errno; + cerr << ERR_PREFIX << "removexattr error: " << cpp_strerror(ret) << std::endl; + _exit(ret); + } + } + if (m_force) { + cout << "[force] " << rados_name << std::endl; + } + else if (flags & CHANGED_CONTENTS) { + cout << "[exported] " << rados_name << std::endl; + } + else if (flags & CHANGED_XATTRS) { + cout << "[xattr] " << rados_name << std::endl; + } + } + ExportDir *m_export_dir; + bool m_force; +}; + +class ExportValidateExistingWQ : public RadosSyncWQ { +public: + ExportValidateExistingWQ(IoCtxDistributor *io_ctx_dist, time_t ti, + ThreadPool *tp, const char *dir_name) + : RadosSyncWQ(io_ctx_dist, ti, tp), + m_dir_name(dir_name) + { + } +private: + void _process(std::string *s) { + IoCtx &io_ctx(m_io_ctx_dist->get_ioctx()); + auto_ptr lobj; + const std::string &local_name(*s); + int ret = BackedUpObject::from_file(local_name.c_str(), m_dir_name, lobj); + if (ret) { + cout << ERR_PREFIX << "BackedUpObject::from_file: delete loop: " + << "got error " << ret << std::endl; + _exit(ret); + } + auto_ptr robj; + ret = BackedUpObject::from_rados(io_ctx, lobj->get_rados_name(), robj); + if (ret == -ENOENT) { + // The entry doesn't exist on the remote server; delete it locally + char path[strlen(m_dir_name) + local_name.size() + 2]; + snprintf(path, sizeof(path), "%s/%s", m_dir_name, local_name.c_str()); + if (unlink(path)) { + ret = errno; + cerr << ERR_PREFIX << "error unlinking '" << path << "': " + << cpp_strerror(ret) << std::endl; + _exit(ret); + } + cout << "[deleted] " << "removed '" << local_name << "'" << std::endl; + } + else if (ret) { + cerr << ERR_PREFIX << "BackedUpObject::from_rados: delete loop: " + << "got error " << ret << std::endl; + _exit(ret); + } + } + const char *m_dir_name; +}; + +int do_rados_export(ThreadPool *tp, IoCtx& io_ctx, + IoCtxDistributor *io_ctx_dist, const char *dir_name, + bool create, bool force, bool delete_after) +{ + int ret; + librados::ObjectIterator oi = io_ctx.objects_begin(); + librados::ObjectIterator oi_end = io_ctx.objects_end(); + auto_ptr export_dir; + export_dir.reset(ExportDir::create_for_writing(dir_name, 1, create)); + if (!export_dir.get()) + return -EIO; + ExportLocalFileWQ export_object_wq(io_ctx_dist, time(NULL), + tp, export_dir.get(), force); + for (; oi != oi_end; ++oi) { + export_object_wq.queue(new std::string(*oi)); + } + export_object_wq.drain(); + + if (delete_after) { + ExportValidateExistingWQ export_val_wq(io_ctx_dist, time(NULL), + tp, dir_name); + DirHolder dh; + int err = dh.opendir(dir_name); + if (err) { + cerr << ERR_PREFIX << "opendir(" << dir_name << ") error: " + << cpp_strerror(err) << std::endl; + return err; + } + while (true) { + struct dirent *de = readdir(dh.dp); + if (!de) + break; + if ((strcmp(de->d_name, ".") == 0) || (strcmp(de->d_name, "..") == 0)) + continue; + if (is_suffix(de->d_name, RADOS_SYNC_TMP_SUFFIX)) { + char path[strlen(dir_name) + strlen(de->d_name) + 2]; + snprintf(path, sizeof(path), "%s/%s", dir_name, de->d_name); + if (unlink(path)) { + ret = errno; + cerr << ERR_PREFIX << "error unlinking temporary file '" << path << "': " + << cpp_strerror(ret) << std::endl; + return ret; + } + cout << "[deleted] " << "removed temporary file '" << de->d_name << "'" << std::endl; + continue; + } + export_val_wq.queue(new std::string(de->d_name)); + } + export_val_wq.drain(); + } + cout << "[done]" << std::endl; + return 0; +} diff --git a/src/rados_import.cc b/src/rados_import.cc new file mode 100644 index 0000000000000..6ca159905e175 --- /dev/null +++ b/src/rados_import.cc @@ -0,0 +1,240 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2011 New Dream Network + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rados_sync.h" +#include "common/errno.h" +#include "common/strtol.h" +#include "include/rados/librados.hpp" + +using namespace librados; +using std::auto_ptr; + +class ImportLocalFileWQ : public RadosSyncWQ { +public: + ImportLocalFileWQ(const char *dir_name, bool force, + IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp) + : RadosSyncWQ(io_ctx_dist, ti, tp), + m_dir_name(dir_name), + m_force(force) + { + } +private: + void _process(std::string *s) { + IoCtx &io_ctx(m_io_ctx_dist->get_ioctx()); + const std::string &local_name(*s); + auto_ptr sobj; + auto_ptr dobj; + std::list < std::string > only_in_a; + std::list < std::string > only_in_b; + std::list < std::string > diff; + int flags = 0; + + int ret = BackedUpObject::from_file(local_name.c_str(), + m_dir_name.c_str(), sobj); + if (ret) { + cerr << ERR_PREFIX << "BackedUpObject::from_file: got error " + << ret << std::endl; + _exit(ret); + } + const char *rados_name(sobj->get_rados_name()); + if (m_force) { + flags |= (CHANGED_CONTENTS | CHANGED_XATTRS); + } + else { + ret = BackedUpObject::from_rados(io_ctx, rados_name, dobj); + if (ret == -ENOENT) { + flags |= CHANGED_CONTENTS; + sobj->get_xattrs(only_in_a); + } + else if (ret) { + cerr << ERR_PREFIX << "BackedUpObject::from_rados returned " + << ret << std::endl; + _exit(ret); + } + else { + sobj->xattr_diff(dobj.get(), only_in_a, only_in_b, diff); + if ((sobj->get_rados_size() == dobj->get_rados_size()) && + (sobj->get_mtime() == dobj->get_mtime())) { + flags |= CHANGED_CONTENTS; + } + } + } + if (flags & CHANGED_CONTENTS) { + ret = sobj->upload(io_ctx, local_name.c_str(), m_dir_name.c_str()); + if (ret) { + cerr << ERR_PREFIX << "upload error: " << ret << std::endl; + _exit(ret); + } + } + for (std::list < std::string >::const_iterator x = only_in_a.begin(); + x != only_in_a.end(); ++x) { + flags |= CHANGED_XATTRS; + const Xattr *xattr = sobj->get_xattr(*x); + if (xattr == NULL) { + cerr << ERR_PREFIX << "internal error on line: " << __LINE__ << std::endl; + _exit(ret); + } + bufferlist bl; + bl.append(xattr->data, xattr->len); + ret = io_ctx.setxattr(rados_name, x->c_str(), bl); + if (ret < 0) { + ret = errno; + cerr << ERR_PREFIX << "io_ctx.setxattr(rados_name='" << rados_name + << "', xattr_name='" << x->c_str() << "'): " << cpp_strerror(ret) + << std::endl; + _exit(ret); + } + } + for (std::list < std::string >::const_iterator x = diff.begin(); + x != diff.end(); ++x) { + flags |= CHANGED_XATTRS; + const Xattr *xattr = sobj->get_xattr(*x); + if (xattr == NULL) { + cerr << ERR_PREFIX << "internal error on line: " << __LINE__ << std::endl; + _exit(ret); + } + bufferlist bl; + bl.append(xattr->data, xattr->len); + ret = io_ctx.rmxattr(rados_name, x->c_str()); + if (ret < 0) { + cerr << ERR_PREFIX << "io_ctx.rmxattr error2: " << cpp_strerror(ret) + << std::endl; + _exit(ret); + } + ret = io_ctx.setxattr(rados_name, x->c_str(), bl); + if (ret < 0) { + ret = errno; + cerr << ERR_PREFIX << "io_ctx.setxattr(rados_name='" << rados_name + << "', xattr='" << x->c_str() << "'): " << cpp_strerror(ret) << std::endl; + _exit(ret); + } + } + for (std::list < std::string >::const_iterator x = only_in_b.begin(); + x != only_in_b.end(); ++x) { + flags |= CHANGED_XATTRS; + ret = io_ctx.rmxattr(rados_name, x->c_str()); + if (ret < 0) { + ret = errno; + cerr << ERR_PREFIX << "rmxattr error3: " << cpp_strerror(ret) << std::endl; + _exit(ret); + } + } + if (m_force) { + cout << "[force] " << rados_name << std::endl; + } + else if (flags & CHANGED_CONTENTS) { + cout << "[imported] " << rados_name << std::endl; + } + else if (flags & CHANGED_XATTRS) { + cout << "[xattr] " << rados_name << std::endl; + } + } + std::string m_dir_name; + bool m_force; +}; + +class ImportValidateExistingWQ : public RadosSyncWQ { +public: + ImportValidateExistingWQ(ExportDir *export_dir, + IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp) + : RadosSyncWQ(io_ctx_dist, ti, tp), + m_export_dir(export_dir) + { + } +private: + void _process(std::string *s) { + IoCtx &io_ctx(m_io_ctx_dist->get_ioctx()); + const std::string &rados_name(*s); + auto_ptr robj; + int ret = BackedUpObject::from_rados(io_ctx, rados_name.c_str(), robj); + if (ret) { + cerr << ERR_PREFIX << "BackedUpObject::from_rados in delete loop " + << "returned " << ret << std::endl; + _exit(ret); + } + std::string obj_path(robj->get_fs_path(m_export_dir)); + auto_ptr lobj; + ret = BackedUpObject::from_path(obj_path.c_str(), lobj); + if (ret == ENOENT) { + ret = io_ctx.remove(rados_name); + if (ret && ret != -ENOENT) { + cerr << ERR_PREFIX << "io_ctx.remove(" << obj_path << ") failed " + << "with error " << ret << std::endl; + _exit(ret); + } + cout << "[deleted] " << "removed '" << rados_name << "'" << std::endl; + } + else if (ret) { + cerr << ERR_PREFIX << "BackedUpObject::from_path in delete loop " + << "returned " << ret << std::endl; + _exit(ret); + } + } + ExportDir *m_export_dir; +}; + +int do_rados_import(ThreadPool *tp, IoCtx &io_ctx, IoCtxDistributor* io_ctx_dist, + const char *dir_name, bool force, bool delete_after) +{ + auto_ptr export_dir; + export_dir.reset(ExportDir::from_file_system(dir_name)); + if (!export_dir.get()) + return -EIO; + DirHolder dh; + int ret = dh.opendir(dir_name); + if (ret) { + cerr << ERR_PREFIX << "opendir(" << dir_name << ") error: " + << cpp_strerror(ret) << std::endl; + return ret; + } + ImportLocalFileWQ import_file_wq(dir_name, force, + io_ctx_dist, time(NULL), tp); + while (true) { + struct dirent *de = readdir(dh.dp); + if (!de) + break; + if ((strcmp(de->d_name, ".") == 0) || (strcmp(de->d_name, "..") == 0)) + continue; + if (is_suffix(de->d_name, RADOS_SYNC_TMP_SUFFIX)) + continue; + import_file_wq.queue(new std::string(de->d_name)); + } + import_file_wq.drain(); + + if (delete_after) { + ImportValidateExistingWQ import_val_wq(export_dir.get(), io_ctx_dist, + time(NULL), tp); + librados::ObjectIterator oi = io_ctx.objects_begin(); + librados::ObjectIterator oi_end = io_ctx.objects_end(); + for (; oi != oi_end; ++oi) { + import_val_wq.queue(new std::string(*oi)); + } + import_val_wq.drain(); + } + cout << "[done]" << std::endl; + return 0; +} diff --git a/src/rados_sync.cc b/src/rados_sync.cc index 906aec045a912..848a20aa8efce 100644 --- a/src/rados_sync.cc +++ b/src/rados_sync.cc @@ -12,70 +12,52 @@ * */ +#include "common/ceph_argparse.h" +#include "common/config.h" +#include "common/errno.h" +#include "common/strtol.h" +#include "global/global_context.h" +#include "global/global_init.h" +#include "include/rados/librados.hpp" +#include "rados_sync.h" + #include #include #include #include #include +#include #include #include +#include #include #include #include #include #include -#include "common/ceph_argparse.h" -#include "global/global_context.h" -#include "global/global_init.h" -#include "common/config.h" -#include "common/errno.h" -#include "common/strtol.h" -#include "include/rados/librados.hpp" - -using std::auto_ptr; using namespace librados; +using std::auto_ptr; static const char * const XATTR_RADOS_SYNC_VER = "user.rados_sync_ver"; static const char * const XATTR_FULLNAME = "user.rados_full_name"; -static const char USER_XATTR_PREFIX[] = "user.rados."; +const char USER_XATTR_PREFIX[] = "user.rados."; static const size_t USER_XATTR_PREFIX_LEN = sizeof(USER_XATTR_PREFIX) / sizeof(USER_XATTR_PREFIX[0]) - 1; /* It's important that RADOS_SYNC_TMP_SUFFIX contain at least one character * that we wouldn't normally alllow in a file name-- in this case, $ */ -static const char * const RADOS_SYNC_TMP_SUFFIX = "$tmp"; +const char RADOS_SYNC_TMP_SUFFIX[] = "$tmp"; static const size_t RADOS_SYNC_TMP_SUFFIX_LEN = sizeof(RADOS_SYNC_TMP_SUFFIX) / sizeof(RADOS_SYNC_TMP_SUFFIX[0]) - 1; -static const char ERR_PREFIX[] = "[ERROR] "; - -/* Linux seems to use ENODATA instead of ENOATTR when an extended attribute - * is missing */ -#ifndef ENOATTR -#define ENOATTR ENODATA -#endif - -enum { - CHANGED_XATTRS = 0x1, - CHANGED_CONTENTS = 0x2, -}; - -/* Given the name of an extended attribute from a file in the filesystem, - * returns an empty string if the extended attribute does not represent a rados - * user extended attribute. Otherwise, returns the name of the rados extended - * attribute. - * - * Rados user xattrs are prefixed with USER_XATTR_PREFIX. - */ -static std::string get_user_xattr_name(const char *fs_xattr_name) +std::string get_user_xattr_name(const char *fs_xattr_name) { if (strncmp(fs_xattr_name, USER_XATTR_PREFIX, USER_XATTR_PREFIX_LEN)) return ""; return fs_xattr_name + USER_XATTR_PREFIX_LEN; } -/* Returns true if 'suffix' is a suffix of str */ -static bool is_suffix(const char *str, const char *suffix) +bool is_suffix(const char *str, const char *suffix) { size_t strlen_str = strlen(str); size_t strlen_suffix = strlen(suffix); @@ -84,988 +66,722 @@ static bool is_suffix(const char *str, const char *suffix) return (strcmp(str + (strlen_str - strlen_suffix), suffix) == 0); } -/* Represents a directory in the filesystem that we export rados objects to (or - * import them from.) - */ -class ExportDir -{ -public: - static ExportDir* create_for_writing(const std::string path, int version, +ExportDir* ExportDir::create_for_writing(const std::string path, int version, bool create) - { - if (access(path.c_str(), R_OK | W_OK) == 0) { - return ExportDir::from_file_system(path); - } - if (!create) { - cerr << ERR_PREFIX << "ExportDir: directory '" - << path << "' does not exist. Use --create to create it." - << std::endl; - return NULL; - } - int ret = mkdir(path.c_str(), 0700); - if (ret < 0) { - int err = errno; - if (err != EEXIST) { - cerr << ERR_PREFIX << "ExportDir: mkdir error: " - << cpp_strerror(err) << std::endl; - return NULL; - } - } - char buf[32]; - snprintf(buf, sizeof(buf), "%d", version); - ret = setxattr(path.c_str(), XATTR_RADOS_SYNC_VER, buf, strlen(buf) + 1, 0); - if (ret < 0) { - int err = errno; - cerr << ERR_PREFIX << "ExportDir: setxattr error :" +{ + if (access(path.c_str(), R_OK | W_OK) == 0) { + return ExportDir::from_file_system(path); + } + if (!create) { + cerr << ERR_PREFIX << "ExportDir: directory '" + << path << "' does not exist. Use --create to create it." + << std::endl; + return NULL; + } + int ret = mkdir(path.c_str(), 0700); + if (ret < 0) { + int err = errno; + if (err != EEXIST) { + cerr << ERR_PREFIX << "ExportDir: mkdir error: " << cpp_strerror(err) << std::endl; return NULL; } - return new ExportDir(version, path); } + char buf[32]; + snprintf(buf, sizeof(buf), "%d", version); + ret = setxattr(path.c_str(), XATTR_RADOS_SYNC_VER, buf, strlen(buf) + 1, 0); + if (ret < 0) { + int err = errno; + cerr << ERR_PREFIX << "ExportDir: setxattr error :" + << cpp_strerror(err) << std::endl; + return NULL; + } + return new ExportDir(version, path); +} - static ExportDir* from_file_system(const std::string path) - { - if (access(path.c_str(), R_OK)) { - cerr << "ExportDir: source directory '" << path - << "' appears to be inaccessible." << std::endl; - return NULL; - } - int ret; - char buf[32]; - memset(buf, 0, sizeof(buf)); - ret = getxattr(path.c_str(), XATTR_RADOS_SYNC_VER, buf, sizeof(buf) - 1); - if (ret < 0) { - ret = errno; - if (ret == ENODATA) { - cerr << ERR_PREFIX << "ExportDir: directory '" << path - << "' does not appear to have been created by a rados " - << "export operation." << std::endl; - return NULL; - } - cerr << ERR_PREFIX << "ExportDir: getxattr error :" - << cpp_strerror(ret) << std::endl; - return NULL; - } - std::string err; - ret = strict_strtol(buf, 10, &err); - if (!err.empty()) { - cerr << ERR_PREFIX << "ExportDir: invalid value for " - << XATTR_RADOS_SYNC_VER << ": " << buf << ". parse error: " - << err << std::endl; +ExportDir* ExportDir::from_file_system(const std::string path) +{ + if (access(path.c_str(), R_OK)) { + cerr << "ExportDir: source directory '" << path + << "' appears to be inaccessible." << std::endl; return NULL; - } - if (ret != 1) { - cerr << ERR_PREFIX << "ExportDir: can't handle any naming " - << "convention besides version 1. You must upgrade this program to " - << "handle the data in the new format." << std::endl; + } + int ret; + char buf[32]; + memset(buf, 0, sizeof(buf)); + ret = getxattr(path.c_str(), XATTR_RADOS_SYNC_VER, buf, sizeof(buf) - 1); + if (ret < 0) { + ret = errno; + if (ret == ENODATA) { + cerr << ERR_PREFIX << "ExportDir: directory '" << path + << "' does not appear to have been created by a rados " + << "export operation." << std::endl; return NULL; } - return new ExportDir(ret, path); - } + cerr << ERR_PREFIX << "ExportDir: getxattr error :" + << cpp_strerror(ret) << std::endl; + return NULL; + } + std::string err; + ret = strict_strtol(buf, 10, &err); + if (!err.empty()) { + cerr << ERR_PREFIX << "ExportDir: invalid value for " + << XATTR_RADOS_SYNC_VER << ": " << buf << ". parse error: " + << err << std::endl; + return NULL; + } + if (ret != 1) { + cerr << ERR_PREFIX << "ExportDir: can't handle any naming " + << "convention besides version 1. You must upgrade this program to " + << "handle the data in the new format." << std::endl; + return NULL; + } + return new ExportDir(ret, path); +} - /* Given a rados object name, return something which looks kind of like the - * first part of the name. - * - * The actual file name that the backed-up object is stored in is irrelevant - * to rados_sync. The only reason to make it human-readable at all is to make - * things easier on sysadmins. The XATTR_FULLNAME extended attribute has the - * real, full object name. - * - * This function turns unicode into a bunch of 'at' signs. This could be - * fixed. If you try, be sure to handle all the multibyte characters - * correctly. - * I guess a better hash would be nice too. - */ - std::string get_fs_path(const std::string rados_name) const - { - static int HASH_LENGTH = 17; - size_t i; - size_t strlen_rados_name = strlen(rados_name.c_str()); - size_t sz; - bool need_hash = false; - if (strlen_rados_name > 200) { - sz = 200; +std::string ExportDir::get_fs_path(const std::string rados_name) const +{ + static int HASH_LENGTH = 17; + size_t i; + size_t strlen_rados_name = strlen(rados_name.c_str()); + size_t sz; + bool need_hash = false; + if (strlen_rados_name > 200) { + sz = 200; + need_hash = true; + } + else { + sz = strlen_rados_name; + } + char fs_path[sz + HASH_LENGTH + 1]; + for (i = 0; i < sz; ++i) { + // Just replace anything that looks funny with an 'at' sign. + // Unicode also gets turned into 'at' signs. + signed char c = rados_name[i]; + if (c < 0x20) { + // Since c is signed, this also eliminates bytes with the high bit set + c = '@'; need_hash = true; } - else { - sz = strlen_rados_name; + else if (c == 0x7f) { + c = '@'; + need_hash = true; } - char fs_path[sz + HASH_LENGTH + 1]; - for (i = 0; i < sz; ++i) { - // Just replace anything that looks funny with an 'at' sign. - // Unicode also gets turned into 'at' signs. - signed char c = rados_name[i]; - if (c < 0x20) { - // Since c is signed, this also eliminates bytes with the high bit set - c = '@'; - need_hash = true; - } - else if (c == 0x7f) { - c = '@'; - need_hash = true; - } - else if (c == '/') { - c = '@'; - need_hash = true; - } - else if (c == '\\') { - c = '@'; - need_hash = true; - } - else if (c == '$') { - c = '@'; - need_hash = true; - } - else if (c == ' ') { - c = '_'; - need_hash = true; - } - else if (c == '\n') { - c = '@'; - need_hash = true; - } - else if (c == '\r') { - c = '@'; - need_hash = true; - } - fs_path[i] = c; + else if (c == '/') { + c = '@'; + need_hash = true; } - - if (need_hash) { - uint64_t hash = 17; - for (i = 0; i < strlen_rados_name; ++i) { - hash += (rados_name[i] * 33); - } - // The extra byte of length is because snprintf always NULL-terminates. - snprintf(fs_path + i, HASH_LENGTH + 1, "_%016" PRIx64, hash); + else if (c == '\\') { + c = '@'; + need_hash = true; } - else { - // NULL-terminate. - fs_path[i] = '\0'; + else if (c == '$') { + c = '@'; + need_hash = true; } - - ostringstream oss; - oss << path << "/" << fs_path; - return oss.str(); - } - -private: - ExportDir(int version_, const std::string path_) - : version(version_), - path(path_) - { - } - - int version; - std::string path; -}; - -class DirHolder { -public: - DirHolder() - : dp(NULL) - { - } - ~DirHolder() { - if (!dp) - return; - if (closedir(dp)) { - int err = errno; - cerr << ERR_PREFIX << "closedir failed: " << cpp_strerror(err) << std::endl; + else if (c == ' ') { + c = '_'; + need_hash = true; } - dp = NULL; - } - int opendir(const char *dir_name) { - dp = ::opendir(dir_name); - if (!dp) { - int err = errno; - return err; + else if (c == '\n') { + c = '@'; + need_hash = true; + } + else if (c == '\r') { + c = '@'; + need_hash = true; } - return 0; + fs_path[i] = c; } - DIR *dp; -}; -// Stores a length and a chunk of malloc()ed data -class Xattr { -public: - Xattr(char *data_, ssize_t len_) - : data(data_), len(len_) - { - } - ~Xattr() { - free(data); - } - bool operator==(const struct Xattr &rhs) const { - if (len != rhs.len) - return false; - return (memcmp(data, rhs.data, len) == 0); + if (need_hash) { + uint64_t hash = 17; + for (i = 0; i < strlen_rados_name; ++i) { + hash += (rados_name[i] * 33); + } + // The extra byte of length is because snprintf always NULL-terminates. + snprintf(fs_path + i, HASH_LENGTH + 1, "_%016" PRIx64, hash); } - bool operator!=(const struct Xattr &rhs) const { - return !((*this) == rhs); + else { + // NULL-terminate. + fs_path[i] = '\0'; } - char *data; - ssize_t len; -}; -// Represents an object that we are backing up -class BackedUpObject + ostringstream oss; + oss << path << "/" << fs_path; + return oss.str(); +} + +ExportDir::ExportDir(int version_, const std::string path_) + : version(version_), + path(path_) { -public: - static int from_file(const char *file_name, const char *dir_name, - std::auto_ptr &obj) - { - char obj_path[strlen(dir_name) + strlen(file_name) + 2]; - snprintf(obj_path, sizeof(obj_path), "%s/%s", dir_name, file_name); - return BackedUpObject::from_path(obj_path, obj); +} + +DirHolder::DirHolder() + : dp(NULL) +{ +} + +DirHolder::~DirHolder() { + if (!dp) + return; + if (closedir(dp)) { + int err = errno; + cerr << ERR_PREFIX << "closedir failed: " << cpp_strerror(err) << std::endl; } + dp = NULL; +} - static int from_path(const char *path, std::auto_ptr &obj) - { - int ret; - FILE *fp = fopen(path, "r"); - if (!fp) { - ret = errno; - if (ret != ENOENT) { - cerr << ERR_PREFIX << "BackedUpObject::from_path: error while trying to " - << "open '" << path << "': " << cpp_strerror(ret) << std::endl; - } - return ret; - } - int fd = fileno(fp); - struct stat st_buf; - memset(&st_buf, 0, sizeof(st_buf)); - ret = fstat(fd, &st_buf); - if (ret) { - ret = errno; - fclose(fp); - cerr << ERR_PREFIX << "BackedUpObject::from_path: error while trying " - << "to stat '" << path << "': " << cpp_strerror(ret) << std::endl; - return ret; - } +int DirHolder::opendir(const char *dir_name) { + dp = ::opendir(dir_name); + if (!dp) { + int err = errno; + return err; + } + return 0; +} - // get fullname - ssize_t res = fgetxattr(fd, XATTR_FULLNAME, NULL, 0); - if (res <= 0) { - fclose(fp); - ret = errno; - if (res == 0) { - cerr << ERR_PREFIX << "BackedUpObject::from_path: found empty " - << XATTR_FULLNAME << " attribute on '" << path - << "'" << std::endl; - ret = ENODATA; - } else if (ret == ENODATA) { - cerr << ERR_PREFIX << "BackedUpObject::from_path: there was no " - << XATTR_FULLNAME << " attribute found on '" << path - << "'" << std::endl; - } else { - cerr << ERR_PREFIX << "getxattr error: " << cpp_strerror(ret) << std::endl; - } - return ret; - } - char rados_name_[res + 1]; - memset(rados_name_, 0, sizeof(rados_name_)); - res = fgetxattr(fd, XATTR_FULLNAME, rados_name_, res); - if (res < 0) { - ret = errno; - fclose(fp); - cerr << ERR_PREFIX << "BackedUpObject::getxattr(" << XATTR_FULLNAME - << ") error: " << cpp_strerror(ret) << std::endl; - return ret; - } +static __thread int t_iod_idx = -1; - BackedUpObject *o = new BackedUpObject(rados_name_, - st_buf.st_size, st_buf.st_mtime); - if (!o) { - fclose(fp); - return ENOBUFS; - } - ret = o->read_xattrs_from_file(fileno(fp)); - if (ret) { - fclose(fp); - cerr << ERR_PREFIX << "BackedUpObject::from_path(path = '" - << path << "): read_xattrs_from_file returned " << ret << std::endl; - delete o; - return ret; - } +static pthread_mutex_t io_ctx_distributor_lock = PTHREAD_MUTEX_INITIALIZER; - fclose(fp); - obj.reset(o); - return 0; +IoCtxDistributor* IoCtxDistributor::instance() { + IoCtxDistributor *ret; + pthread_mutex_lock(&io_ctx_distributor_lock); + if (s_instance == NULL) { + s_instance = new IoCtxDistributor(); } + ret = s_instance; + pthread_mutex_unlock(&io_ctx_distributor_lock); + return ret; +} - static int from_rados(IoCtx& io_ctx, const char *rados_name_, - auto_ptr &obj) - { - uint64_t rados_size_ = 0; - time_t rados_time_ = 0; - int ret = io_ctx.stat(rados_name_, &rados_size_, &rados_time_); - if (ret == -ENOENT) { - // don't complain here about ENOENT - return ret; - } else if (ret < 0) { - cerr << ERR_PREFIX << "BackedUpObject::from_rados(rados_name_ = '" - << rados_name_ << "'): stat failed with error " << ret << std::endl; - return ret; - } - BackedUpObject *o = new BackedUpObject(rados_name_, rados_size_, rados_time_); - ret = o->read_xattrs_from_rados(io_ctx); +int IoCtxDistributor::init(Rados &cluster, const char *pool_name, + int num_ioctxes) { + m_io_ctxes.resize(num_ioctxes); + for (std::vector::iterator i = m_io_ctxes.begin(); + i != m_io_ctxes.end(); ++i) { + IoCtx &io_ctx(*i); + int ret = cluster.ioctx_create(pool_name, io_ctx); if (ret) { - cerr << ERR_PREFIX << "BackedUpObject::from_rados(rados_name_ = '" - << rados_name_ << "'): read_xattrs_from_rados returned " - << ret << std::endl; - delete o; return ret; } - obj.reset(o); - return 0; } + m_highest_iod_idx.set(0); + return 0; +} - ~BackedUpObject() - { - for (std::map < std::string, Xattr* >::iterator x = xattrs.begin(); - x != xattrs.end(); ++x) - { - delete x->second; - x->second = NULL; - } - free(rados_name); +void IoCtxDistributor::clear() { + for (std::vector::iterator i = m_io_ctxes.begin(); + i != m_io_ctxes.end(); ++i) { + IoCtx &io_ctx(*i); + io_ctx.close(); } + m_io_ctxes.clear(); + m_highest_iod_idx.set(0); +} - /* Get the mangled name for this rados object. */ - std::string get_fs_path(const ExportDir *export_dir) const - { - return export_dir->get_fs_path(rados_name); +IoCtx& IoCtxDistributor::get_ioctx() { + if (t_iod_idx == -1) { + t_iod_idx = m_highest_iod_idx.inc() - 1; + } + if (m_io_ctxes.size() <= (unsigned int)t_iod_idx) { + cerr << ERR_PREFIX << "IoCtxDistributor: logic error on line " + << __LINE__ << std::endl; + _exit(1); } + return m_io_ctxes[t_iod_idx]; +} - /* Convert the xattrs on this BackedUpObject to a kind of JSON-like string. - * This is only used for debugging. - * Note that we're assuming we can just treat the xattr data as a - * null-terminated string, which isn't true. Again, this is just for debugging, - * so it doesn't matter. - */ - std::string xattrs_to_str() const - { - ostringstream oss; - std::string prefix; - for (std::map < std::string, Xattr* >::const_iterator x = xattrs.begin(); - x != xattrs.end(); ++x) - { - char buf[x->second->len + 1]; - memcpy(buf, x->second->data, x->second->len); - buf[x->second->len] = '\0'; - oss << prefix << "{" << x->first << ":" << buf << "}"; - prefix = ", "; - } - return oss.str(); +IoCtxDistributor *IoCtxDistributor::s_instance = NULL; + +IoCtxDistributor::IoCtxDistributor() { + clear(); +} + +IoCtxDistributor::~IoCtxDistributor() { + clear(); +} + +RadosSyncWQ::RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp) + : ThreadPool::WorkQueue("FileStore::OpWQ", ti, tp), + m_io_ctx_dist(io_ctx_dist) +{ +} + +bool RadosSyncWQ::_enqueue(std::string *s) { + m_items.push_back(s); + return true; +} + +void RadosSyncWQ::_dequeue(std::string *o) { + assert(0); +} + +bool RadosSyncWQ::_empty() { + return m_items.empty(); +} + +std::string *RadosSyncWQ::_dequeue() { + if (m_items.empty()) + return NULL; + std::string *ret = m_items.front(); + m_items.pop_front(); + return ret; +} + +void RadosSyncWQ::_process_finish(std::string *s) { + delete s; +} + +void RadosSyncWQ::_clear() { + for (std::deque::iterator i = m_items.begin(); + i != m_items.end(); ++i) { + delete *i; } + m_items.clear(); +} - /* Diff the extended attributes on this BackedUpObject with those found on a - * different BackedUpObject - */ - void xattr_diff(const BackedUpObject *rhs, - std::list < std::string > &only_in_a, - std::list < std::string > &only_in_b, - std::list < std::string > &diff) const - { - only_in_a.clear(); - only_in_b.clear(); - diff.clear(); - for (std::map < std::string, Xattr* >::const_iterator x = xattrs.begin(); - x != xattrs.end(); ++x) - { - std::map < std::string, Xattr* >::const_iterator r = rhs->xattrs.find(x->first); - if (r == rhs->xattrs.end()) { - only_in_a.push_back(x->first); - } - else { - const Xattr &r_obj(*r->second); - const Xattr &x_obj(*x->second); - if (r_obj != x_obj) - diff.push_back(x->first); - } - } - for (std::map < std::string, Xattr* >::const_iterator r = rhs->xattrs.begin(); - r != rhs->xattrs.end(); ++r) - { - std::map < std::string, Xattr* >::const_iterator x = rhs->xattrs.find(r->first); - if (x == xattrs.end()) { - only_in_b.push_back(r->first); - } +Xattr::Xattr(char *data_, ssize_t len_) + : data(data_), len(len_) +{ +} + +Xattr::~Xattr() { + free(data); +} + +bool Xattr::operator==(const struct Xattr &rhs) const { + if (len != rhs.len) + return false; + return (memcmp(data, rhs.data, len) == 0); +} + +bool Xattr::operator!=(const struct Xattr &rhs) const { + return !((*this) == rhs); +} + +int BackedUpObject::from_file(const char *file_name, const char *dir_name, + std::auto_ptr &obj) +{ + char obj_path[strlen(dir_name) + strlen(file_name) + 2]; + snprintf(obj_path, sizeof(obj_path), "%s/%s", dir_name, file_name); + return BackedUpObject::from_path(obj_path, obj); +} + +int BackedUpObject::from_path(const char *path, std::auto_ptr &obj) +{ + int ret; + FILE *fp = fopen(path, "r"); + if (!fp) { + ret = errno; + if (ret != ENOENT) { + cerr << ERR_PREFIX << "BackedUpObject::from_path: error while trying to " + << "open '" << path << "': " << cpp_strerror(ret) << std::endl; } + return ret; + } + int fd = fileno(fp); + struct stat st_buf; + memset(&st_buf, 0, sizeof(st_buf)); + ret = fstat(fd, &st_buf); + if (ret) { + ret = errno; + fclose(fp); + cerr << ERR_PREFIX << "BackedUpObject::from_path: error while trying " + << "to stat '" << path << "': " << cpp_strerror(ret) << std::endl; + return ret; } - void get_xattrs(std::list < std::string > &xattrs_) const - { - for (std::map < std::string, Xattr* >::const_iterator r = xattrs.begin(); - r != xattrs.end(); ++r) - { - xattrs_.push_back(r->first); + // get fullname + ssize_t res = fgetxattr(fd, XATTR_FULLNAME, NULL, 0); + if (res <= 0) { + fclose(fp); + ret = errno; + if (res == 0) { + cerr << ERR_PREFIX << "BackedUpObject::from_path: found empty " + << XATTR_FULLNAME << " attribute on '" << path + << "'" << std::endl; + ret = ENODATA; + } else if (ret == ENODATA) { + cerr << ERR_PREFIX << "BackedUpObject::from_path: there was no " + << XATTR_FULLNAME << " attribute found on '" << path + << "'" << std::endl; + } else { + cerr << ERR_PREFIX << "getxattr error: " << cpp_strerror(ret) << std::endl; } + return ret; } - - const Xattr* get_xattr(const std::string name) const - { - std::map < std::string, Xattr* >::const_iterator x = xattrs.find(name); - if (x == xattrs.end()) - return NULL; - else - return x->second; + char rados_name_[res + 1]; + memset(rados_name_, 0, sizeof(rados_name_)); + res = fgetxattr(fd, XATTR_FULLNAME, rados_name_, res); + if (res < 0) { + ret = errno; + fclose(fp); + cerr << ERR_PREFIX << "BackedUpObject::getxattr(" << XATTR_FULLNAME + << ") error: " << cpp_strerror(ret) << std::endl; + return ret; } - const char *get_rados_name() const { - return rados_name; + BackedUpObject *o = new BackedUpObject(rados_name_, + st_buf.st_size, st_buf.st_mtime); + if (!o) { + fclose(fp); + return ENOBUFS; } - - uint64_t get_rados_size() const { - return rados_size; + ret = o->read_xattrs_from_file(fileno(fp)); + if (ret) { + fclose(fp); + cerr << ERR_PREFIX << "BackedUpObject::from_path(path = '" + << path << "): read_xattrs_from_file returned " << ret << std::endl; + delete o; + return ret; } - time_t get_mtime() const { - return rados_time; - } + fclose(fp); + obj.reset(o); + return 0; +} - int download(IoCtx &io_ctx, const char *path) - { - char tmp_path[strlen(path) + RADOS_SYNC_TMP_SUFFIX_LEN + 1]; - snprintf(tmp_path, sizeof(tmp_path), "%s%s", path, RADOS_SYNC_TMP_SUFFIX); - FILE *fp = fopen(tmp_path, "w"); - if (!fp) { - int err = errno; - cerr << ERR_PREFIX << "download: error opening '" << tmp_path << "':" - << cpp_strerror(err) << std::endl; - return err; - } - int fd = fileno(fp); - uint64_t off = 0; - static const int CHUNK_SZ = 32765; - while (true) { - bufferlist bl; - int rlen = io_ctx.read(rados_name, bl, CHUNK_SZ, off); - if (rlen < 0) { - cerr << ERR_PREFIX << "download: io_ctx.read(" << rados_name << ") returned " - << rlen << std::endl; - return rlen; - } - if (rlen < CHUNK_SZ) - off = 0; - else - off += rlen; - size_t flen = fwrite(bl.c_str(), 1, rlen, fp); - if (flen != (size_t)rlen) { - int err = errno; - cerr << ERR_PREFIX << "download: fwrite(" << tmp_path << ") error: " - << cpp_strerror(err) << std::endl; - fclose(fp); - return err; - } - if (off == 0) - break; - } - size_t attr_sz = strlen(rados_name) + 1; - int res = fsetxattr(fd, XATTR_FULLNAME, rados_name, attr_sz, 0); - if (res) { - int err = errno; - cerr << ERR_PREFIX << "download: fsetxattr(" << tmp_path << ") error: " - << cpp_strerror(err) << std::endl; - fclose(fp); - return err; - } - if (fclose(fp)) { - int err = errno; - cerr << ERR_PREFIX << "download: fclose(" << tmp_path << ") error: " - << cpp_strerror(err) << std::endl; - return err; - } - if (rename(tmp_path, path)) { - int err = errno; - cerr << ERR_PREFIX << "download: rename(" << tmp_path << ", " - << path << ") error: " << cpp_strerror(err) << std::endl; - return err; - } - return 0; +int BackedUpObject::from_rados(IoCtx& io_ctx, const char *rados_name_, + auto_ptr &obj) +{ + uint64_t rados_size_ = 0; + time_t rados_time_ = 0; + int ret = io_ctx.stat(rados_name_, &rados_size_, &rados_time_); + if (ret == -ENOENT) { + // don't complain here about ENOENT + return ret; + } else if (ret < 0) { + cerr << ERR_PREFIX << "BackedUpObject::from_rados(rados_name_ = '" + << rados_name_ << "'): stat failed with error " << ret << std::endl; + return ret; + } + BackedUpObject *o = new BackedUpObject(rados_name_, rados_size_, rados_time_); + ret = o->read_xattrs_from_rados(io_ctx); + if (ret) { + cerr << ERR_PREFIX << "BackedUpObject::from_rados(rados_name_ = '" + << rados_name_ << "'): read_xattrs_from_rados returned " + << ret << std::endl; + delete o; + return ret; } + obj.reset(o); + return 0; +} - int upload(IoCtx &io_ctx, const char *file_name, const char *dir_name) +BackedUpObject::~BackedUpObject() +{ + for (std::map < std::string, Xattr* >::iterator x = xattrs.begin(); + x != xattrs.end(); ++x) { - char path[strlen(file_name) + strlen(dir_name) + 2]; - snprintf(path, sizeof(path), "%s/%s", dir_name, file_name); - FILE *fp = fopen(path, "r"); - if (!fp) { - int err = errno; - cerr << ERR_PREFIX << "upload: error opening '" << path << "': " - << cpp_strerror(err) << std::endl; - return err; - } - // Need to truncate RADOS object to size 0, in case there is - // already something there. - int ret = io_ctx.trunc(rados_name, 0); - if (ret) { - cerr << ERR_PREFIX << "upload: trunc failed with error " << ret << std::endl; - return ret; - } - uint64_t off = 0; - static const int CHUNK_SZ = 32765; - while (true) { - char buf[CHUNK_SZ]; - int flen = fread(buf, CHUNK_SZ, 1, fp); - if (flen < 0) { - int err = errno; - cerr << ERR_PREFIX << "upload: fread(" << file_name << ") error: " - << cpp_strerror(err) << std::endl; - fclose(fp); - return err; - } - if ((flen == 0) && (off != 0)) { - fclose(fp); - break; - } - // There must be a zero-copy way to do this? - bufferlist bl; - bl.append(buf, flen); - int rlen = io_ctx.write(rados_name, bl, flen, off); - if (rlen < 0) { - fclose(fp); - cerr << ERR_PREFIX << "upload: rados_write error: " << rlen << std::endl; - return rlen; - } - if (rlen != flen) { - fclose(fp); - cerr << ERR_PREFIX << "upload: rados_write error: short write" << std::endl; - return -EIO; - } - off += rlen; - if (flen < CHUNK_SZ) { - fclose(fp); - return 0; - } - } - return 0; + delete x->second; + x->second = NULL; } + free(rados_name); +} + +std::string BackedUpObject::get_fs_path(const ExportDir *export_dir) const +{ + return export_dir->get_fs_path(rados_name); +} -private: - BackedUpObject(const char *rados_name_, uint64_t rados_size_, time_t rados_time_) - : rados_name(strdup(rados_name_)), - rados_size(rados_size_), - rados_time(rados_time_) +std::string BackedUpObject::xattrs_to_str() const +{ + ostringstream oss; + std::string prefix; + for (std::map < std::string, Xattr* >::const_iterator x = xattrs.begin(); + x != xattrs.end(); ++x) { + char buf[x->second->len + 1]; + memcpy(buf, x->second->data, x->second->len); + buf[x->second->len] = '\0'; + oss << prefix << "{" << x->first << ":" << buf << "}"; + prefix = ", "; } + return oss.str(); +} - int read_xattrs_from_file(int fd) +void BackedUpObject::xattr_diff(const BackedUpObject *rhs, + std::list < std::string > &only_in_a, + std::list < std::string > &only_in_b, + std::list < std::string > &diff) const +{ + only_in_a.clear(); + only_in_b.clear(); + diff.clear(); + for (std::map < std::string, Xattr* >::const_iterator x = xattrs.begin(); + x != xattrs.end(); ++x) { - ssize_t blen = flistxattr(fd, NULL, 0); - if (blen > 0x1000000) { - cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: unwilling " - << "to allocate a buffer of size " << blen << " on the stack for " - << "flistxattr." << std::endl; - return ENOBUFS; + std::map < std::string, Xattr* >::const_iterator r = rhs->xattrs.find(x->first); + if (r == rhs->xattrs.end()) { + only_in_a.push_back(x->first); } - char buf[blen + 1]; - memset(buf, 0, sizeof(buf)); - ssize_t blen2 = flistxattr(fd, buf, blen); - if (blen != blen2) { - cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: xattrs changed while " - << "we were trying to " - << "list them? First length was " << blen << ", but now it's " << blen2 - << std::endl; - return EDOM; + else { + const Xattr &r_obj(*r->second); + const Xattr &x_obj(*x->second); + if (r_obj != x_obj) + diff.push_back(x->first); } - const char *b = buf; - while (*b) { - size_t bs = strlen(b); - std::string xattr_name = get_user_xattr_name(b); - if (!xattr_name.empty()) { - ssize_t attr_len = fgetxattr(fd, b, NULL, 0); - if (attr_len < 0) { - int err = errno; - cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: " - << "fgetxattr(rados_name = '" << rados_name << "', xattr_name='" - << xattr_name << "') failed: " << cpp_strerror(err) << std::endl; - return EDOM; - } - char *attr = (char*)malloc(attr_len); - if (!attr) { - cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: " - << "malloc(" << attr_len << ") failed for xattr_name='" - << xattr_name << "'" << std::endl; - return ENOBUFS; - } - ssize_t attr_len2 = fgetxattr(fd, b, attr, attr_len); - if (attr_len2 < 0) { - int err = errno; - cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: " - << "fgetxattr(rados_name = '" << rados_name << "', " - << "xattr_name='" << xattr_name << "') failed: " - << cpp_strerror(err) << std::endl; - free(attr); - return EDOM; - } - if (attr_len2 != attr_len) { - cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: xattr " - << "changed while we were trying to get it? " - << "fgetxattr(rados_name = '"<< rados_name - << "', xattr_name='" << xattr_name << "') returned a different length " - << "than when we first called it! old_len = " << attr_len - << "new_len = " << attr_len2 << std::endl; - free(attr); - return EDOM; - } - xattrs[xattr_name] = new Xattr(attr, attr_len); - } - b += (bs + 1); + } + for (std::map < std::string, Xattr* >::const_iterator r = rhs->xattrs.begin(); + r != rhs->xattrs.end(); ++r) + { + std::map < std::string, Xattr* >::const_iterator x = rhs->xattrs.find(r->first); + if (x == xattrs.end()) { + only_in_b.push_back(r->first); } - return 0; } +} - int read_xattrs_from_rados(IoCtx &io_ctx) +void BackedUpObject::get_xattrs(std::list < std::string > &xattrs_) const +{ + for (std::map < std::string, Xattr* >::const_iterator r = xattrs.begin(); + r != xattrs.end(); ++r) { - map attrset; - int ret = io_ctx.getxattrs(rados_name, attrset); - if (ret) { - cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_rados: " - << "getxattrs failed with error code " << ret << std::endl; - return ret; - } - for (map::iterator i = attrset.begin(); - i != attrset.end(); ) - { - bufferlist& bl(i->second); - char *data = (char*)malloc(bl.length()); - if (!data) - return ENOBUFS; - memcpy(data, bl.c_str(), bl.length()); - Xattr *xattr = new Xattr(data, bl.length()); - if (!xattr) { - free(data); - return ENOBUFS; - } - xattrs[i->first] = xattr; - attrset.erase(i++); - } - return 0; + xattrs_.push_back(r->first); } +} - // don't allow copying - BackedUpObject &operator=(const BackedUpObject &rhs); - BackedUpObject(const BackedUpObject &rhs); +const Xattr* BackedUpObject::get_xattr(const std::string name) const +{ + std::map < std::string, Xattr* >::const_iterator x = xattrs.find(name); + if (x == xattrs.end()) + return NULL; + else + return x->second; +} - char *rados_name; - uint64_t rados_size; - uint64_t rados_time; - std::map < std::string, Xattr* > xattrs; -}; +const char *BackedUpObject::get_rados_name() const { + return rados_name; +} -static int do_export(IoCtx& io_ctx, const char *dir_name, - bool create, bool force, bool delete_after) -{ - int ret; - librados::ObjectIterator oi = io_ctx.objects_begin(); - librados::ObjectIterator oi_end = io_ctx.objects_end(); - auto_ptr export_dir; - export_dir.reset(ExportDir::create_for_writing(dir_name, 1, create)); - if (!export_dir.get()) - return -EIO; - for (; oi != oi_end; ++oi) { - int flags = 0; - auto_ptr sobj; - auto_ptr dobj; - string rados_name(*oi); - std::list < std::string > only_in_a; - std::list < std::string > only_in_b; - std::list < std::string > diff; - ret = BackedUpObject::from_rados(io_ctx, rados_name.c_str(), sobj); - if (ret) { - cerr << ERR_PREFIX << "couldn't get '" << rados_name << "' from rados: error " - << ret << std::endl; - return ret; - } - std::string obj_path(sobj->get_fs_path(export_dir.get())); - if (force) { - flags |= (CHANGED_CONTENTS | CHANGED_XATTRS); - } - else { - ret = BackedUpObject::from_path(obj_path.c_str(), dobj); - if (ret == ENOENT) { - sobj->get_xattrs(only_in_a); - flags |= CHANGED_CONTENTS; - } - else if (ret) { - cerr << ERR_PREFIX << "BackedUpObject::from_path returned " - << ret << std::endl; - return ret; - } - else { - sobj->xattr_diff(dobj.get(), only_in_a, only_in_b, diff); - if ((sobj->get_rados_size() == dobj->get_rados_size()) && - (sobj->get_mtime() == dobj->get_mtime())) { - flags |= CHANGED_CONTENTS; - } - } - } - if (flags & CHANGED_CONTENTS) { - ret = sobj->download(io_ctx, obj_path.c_str()); - if (ret) { - cerr << ERR_PREFIX << "download error: " << ret << std::endl; - return ret; - } - } - diff.splice(diff.begin(), only_in_a); - for (std::list < std::string >::const_iterator x = diff.begin(); - x != diff.end(); ++x) { - flags |= CHANGED_XATTRS; - const Xattr *xattr = sobj->get_xattr(*x); - if (xattr == NULL) { - cerr << ERR_PREFIX << "internal error on line: " << __LINE__ << std::endl; - return -ENOSYS; - } - std::string xattr_fs_name(USER_XATTR_PREFIX); - xattr_fs_name += x->c_str(); - ret = setxattr(obj_path.c_str(), xattr_fs_name.c_str(), - xattr->data, xattr->len, 0); - if (ret) { - ret = errno; - cerr << ERR_PREFIX << "setxattr error: " << cpp_strerror(ret) << std::endl; - return ret; - } - } - for (std::list < std::string >::const_iterator x = only_in_b.begin(); - x != only_in_b.end(); ++x) { - flags |= CHANGED_XATTRS; - ret = removexattr(obj_path.c_str(), x->c_str()); - if (ret) { - ret = errno; - cerr << ERR_PREFIX << "removexattr error: " << cpp_strerror(ret) << std::endl; - return ret; - } - } - if (force) { - cout << "[force] " << rados_name << std::endl; - } - else if (flags & CHANGED_CONTENTS) { - cout << "[exported] " << rados_name << std::endl; - } - else if (flags & CHANGED_XATTRS) { - cout << "[xattr] " << rados_name << std::endl; - } - } +uint64_t BackedUpObject::get_rados_size() const { + return rados_size; +} - if (delete_after) { - DirHolder dh; - int err = dh.opendir(dir_name); - if (err) { - cerr << ERR_PREFIX << "opendir(" << dir_name << ") error: " +time_t BackedUpObject::get_mtime() const { + return rados_time; +} + +int BackedUpObject::download(IoCtx &io_ctx, const char *path) +{ + char tmp_path[strlen(path) + RADOS_SYNC_TMP_SUFFIX_LEN + 1]; + snprintf(tmp_path, sizeof(tmp_path), "%s%s", path, RADOS_SYNC_TMP_SUFFIX); + FILE *fp = fopen(tmp_path, "w"); + if (!fp) { + int err = errno; + cerr << ERR_PREFIX << "download: error opening '" << tmp_path << "':" + << cpp_strerror(err) << std::endl; + return err; + } + int fd = fileno(fp); + uint64_t off = 0; + static const int CHUNK_SZ = 32765; + while (true) { + bufferlist bl; + int rlen = io_ctx.read(rados_name, bl, CHUNK_SZ, off); + if (rlen < 0) { + cerr << ERR_PREFIX << "download: io_ctx.read(" << rados_name << ") returned " + << rlen << std::endl; + return rlen; + } + if (rlen < CHUNK_SZ) + off = 0; + else + off += rlen; + size_t flen = fwrite(bl.c_str(), 1, rlen, fp); + if (flen != (size_t)rlen) { + int err = errno; + cerr << ERR_PREFIX << "download: fwrite(" << tmp_path << ") error: " << cpp_strerror(err) << std::endl; + fclose(fp); return err; } - while (true) { - struct dirent *de = readdir(dh.dp); - if (!de) - break; - if ((strcmp(de->d_name, ".") == 0) || (strcmp(de->d_name, "..") == 0)) - continue; - if (is_suffix(de->d_name, RADOS_SYNC_TMP_SUFFIX)) { - char path[strlen(dir_name) + strlen(de->d_name) + 2]; - snprintf(path, sizeof(path), "%s/%s", dir_name, de->d_name); - if (unlink(path)) { - ret = errno; - cerr << ERR_PREFIX << "error unlinking temporary file '" << path << "': " - << cpp_strerror(ret) << std::endl; - return ret; - } - cout << "[deleted] " << "removed temporary file '" << de->d_name << "'" << std::endl; - continue; - } - auto_ptr lobj; - ret = BackedUpObject::from_file(de->d_name, dir_name, lobj); - if (ret) { - cout << ERR_PREFIX << "BackedUpObject::from_file: delete loop: " - << "got error " << ret << std::endl; - return ret; - } - auto_ptr robj; - ret = BackedUpObject::from_rados(io_ctx, lobj->get_rados_name(), robj); - if (ret == -ENOENT) { - // The entry doesn't exist on the remote server; delete it locally - char path[strlen(dir_name) + strlen(de->d_name) + 2]; - snprintf(path, sizeof(path), "%s/%s", dir_name, de->d_name); - if (unlink(path)) { - ret = errno; - cerr << ERR_PREFIX << "error unlinking '" << path << "': " - << cpp_strerror(ret) << std::endl; - return ret; - } - cout << "[deleted] " << "removed '" << de->d_name << "'" << std::endl; - } - else if (ret) { - cerr << ERR_PREFIX << "BackedUpObject::from_rados: delete loop: " - << "got error " << ret << std::endl; - return ret; - } - } + if (off == 0) + break; + } + size_t attr_sz = strlen(rados_name) + 1; + int res = fsetxattr(fd, XATTR_FULLNAME, rados_name, attr_sz, 0); + if (res) { + int err = errno; + cerr << ERR_PREFIX << "download: fsetxattr(" << tmp_path << ") error: " + << cpp_strerror(err) << std::endl; + fclose(fp); + return err; + } + if (fclose(fp)) { + int err = errno; + cerr << ERR_PREFIX << "download: fclose(" << tmp_path << ") error: " + << cpp_strerror(err) << std::endl; + return err; + } + if (rename(tmp_path, path)) { + int err = errno; + cerr << ERR_PREFIX << "download: rename(" << tmp_path << ", " + << path << ") error: " << cpp_strerror(err) << std::endl; + return err; } - cout << "[done]" << std::endl; return 0; } -static int do_import(IoCtx& io_ctx, const char *dir_name, - bool force, bool delete_after) +int BackedUpObject::upload(IoCtx &io_ctx, const char *file_name, const char *dir_name) { - auto_ptr export_dir; - export_dir.reset(ExportDir::from_file_system(dir_name)); - if (!export_dir.get()) - return -EIO; - DirHolder dh; - int ret = dh.opendir(dir_name); + char path[strlen(file_name) + strlen(dir_name) + 2]; + snprintf(path, sizeof(path), "%s/%s", dir_name, file_name); + FILE *fp = fopen(path, "r"); + if (!fp) { + int err = errno; + cerr << ERR_PREFIX << "upload: error opening '" << path << "': " + << cpp_strerror(err) << std::endl; + return err; + } + // Need to truncate RADOS object to size 0, in case there is + // already something there. + int ret = io_ctx.trunc(rados_name, 0); if (ret) { - cerr << ERR_PREFIX << "opendir(" << dir_name << ") error: " - << cpp_strerror(ret) << std::endl; + cerr << ERR_PREFIX << "upload: trunc failed with error " << ret << std::endl; return ret; } + uint64_t off = 0; + static const int CHUNK_SZ = 32765; while (true) { - auto_ptr sobj; - auto_ptr dobj; - std::list < std::string > only_in_a; - std::list < std::string > only_in_b; - std::list < std::string > diff; - int flags = 0; - struct dirent *de = readdir(dh.dp); - if (!de) - break; - if ((strcmp(de->d_name, ".") == 0) || (strcmp(de->d_name, "..") == 0)) - continue; - if (is_suffix(de->d_name, RADOS_SYNC_TMP_SUFFIX)) - continue; - ret = BackedUpObject::from_file(de->d_name, dir_name, sobj); - if (ret) { - cerr << ERR_PREFIX << "BackedUpObject::from_file: got error " - << ret << std::endl; - return ret; + char buf[CHUNK_SZ]; + int flen = fread(buf, CHUNK_SZ, 1, fp); + if (flen < 0) { + int err = errno; + cerr << ERR_PREFIX << "upload: fread(" << file_name << ") error: " + << cpp_strerror(err) << std::endl; + fclose(fp); + return err; } - const char *rados_name(sobj->get_rados_name()); - if (force) { - flags |= (CHANGED_CONTENTS | CHANGED_XATTRS); + if ((flen == 0) && (off != 0)) { + fclose(fp); + break; } - else { - ret = BackedUpObject::from_rados(io_ctx, rados_name, dobj); - if (ret == -ENOENT) { - flags |= CHANGED_CONTENTS; - sobj->get_xattrs(only_in_a); - } - else if (ret) { - cerr << ERR_PREFIX << "BackedUpObject::from_rados returned " - << ret << std::endl; - return ret; - } - else { - sobj->xattr_diff(dobj.get(), only_in_a, only_in_b, diff); - if ((sobj->get_rados_size() == dobj->get_rados_size()) && - (sobj->get_mtime() == dobj->get_mtime())) { - flags |= CHANGED_CONTENTS; - } - } + // There must be a zero-copy way to do this? + bufferlist bl; + bl.append(buf, flen); + int rlen = io_ctx.write(rados_name, bl, flen, off); + if (rlen < 0) { + fclose(fp); + cerr << ERR_PREFIX << "upload: rados_write error: " << rlen << std::endl; + return rlen; } - if (flags & CHANGED_CONTENTS) { - ret = sobj->upload(io_ctx, de->d_name, dir_name); - if (ret) { - cerr << ERR_PREFIX << "upload error: " << ret << std::endl; - return ret; - } + if (rlen != flen) { + fclose(fp); + cerr << ERR_PREFIX << "upload: rados_write error: short write" << std::endl; + return -EIO; } - for (std::list < std::string >::const_iterator x = only_in_a.begin(); - x != only_in_a.end(); ++x) { - flags |= CHANGED_XATTRS; - const Xattr *xattr = sobj->get_xattr(*x); - if (xattr == NULL) { - cerr << ERR_PREFIX << "internal error on line: " << __LINE__ << std::endl; - return -ENOSYS; - } - bufferlist bl; - bl.append(xattr->data, xattr->len); - ret = io_ctx.setxattr(rados_name, x->c_str(), bl); - if (ret < 0) { - ret = errno; - cerr << ERR_PREFIX << "io_ctx.setxattr(rados_name='" << rados_name - << "', xattr_name='" << x->c_str() << "'): " << cpp_strerror(ret) - << std::endl; - return ret; - } + off += rlen; + if (flen < CHUNK_SZ) { + fclose(fp); + return 0; } - for (std::list < std::string >::const_iterator x = diff.begin(); - x != diff.end(); ++x) { - flags |= CHANGED_XATTRS; - const Xattr *xattr = sobj->get_xattr(*x); - if (xattr == NULL) { - cerr << ERR_PREFIX << "internal error on line: " << __LINE__ << std::endl; - return -ENOSYS; - } - bufferlist bl; - bl.append(xattr->data, xattr->len); - ret = io_ctx.rmxattr(rados_name, x->c_str()); - if (ret < 0) { - cerr << ERR_PREFIX << "io_ctx.rmxattr error2: " << cpp_strerror(ret) - << std::endl; - return ret; + } + return 0; +} + +BackedUpObject::BackedUpObject(const char *rados_name_, + uint64_t rados_size_, time_t rados_time_) + : rados_name(strdup(rados_name_)), + rados_size(rados_size_), + rados_time(rados_time_) +{ +} + +int BackedUpObject::read_xattrs_from_file(int fd) +{ + ssize_t blen = flistxattr(fd, NULL, 0); + if (blen > 0x1000000) { + cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: unwilling " + << "to allocate a buffer of size " << blen << " on the stack for " + << "flistxattr." << std::endl; + return ENOBUFS; + } + char buf[blen + 1]; + memset(buf, 0, sizeof(buf)); + ssize_t blen2 = flistxattr(fd, buf, blen); + if (blen != blen2) { + cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: xattrs changed while " + << "we were trying to " + << "list them? First length was " << blen << ", but now it's " << blen2 + << std::endl; + return EDOM; + } + const char *b = buf; + while (*b) { + size_t bs = strlen(b); + std::string xattr_name = get_user_xattr_name(b); + if (!xattr_name.empty()) { + ssize_t attr_len = fgetxattr(fd, b, NULL, 0); + if (attr_len < 0) { + int err = errno; + cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: " + << "fgetxattr(rados_name = '" << rados_name << "', xattr_name='" + << xattr_name << "') failed: " << cpp_strerror(err) << std::endl; + return EDOM; + } + char *attr = (char*)malloc(attr_len); + if (!attr) { + cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: " + << "malloc(" << attr_len << ") failed for xattr_name='" + << xattr_name << "'" << std::endl; + return ENOBUFS; } - ret = io_ctx.setxattr(rados_name, x->c_str(), bl); - if (ret < 0) { - ret = errno; - cerr << ERR_PREFIX << "io_ctx.setxattr(rados_name='" << rados_name - << "', xattr='" << x->c_str() << "'): " << cpp_strerror(ret) << std::endl; - return ret; + ssize_t attr_len2 = fgetxattr(fd, b, attr, attr_len); + if (attr_len2 < 0) { + int err = errno; + cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: " + << "fgetxattr(rados_name = '" << rados_name << "', " + << "xattr_name='" << xattr_name << "') failed: " + << cpp_strerror(err) << std::endl; + free(attr); + return EDOM; } - } - for (std::list < std::string >::const_iterator x = only_in_b.begin(); - x != only_in_b.end(); ++x) { - flags |= CHANGED_XATTRS; - ret = io_ctx.rmxattr(rados_name, x->c_str()); - if (ret < 0) { - ret = errno; - cerr << ERR_PREFIX << "rmxattr error3: " << cpp_strerror(ret) << std::endl; - return ret; + if (attr_len2 != attr_len) { + cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_file: xattr " + << "changed while we were trying to get it? " + << "fgetxattr(rados_name = '"<< rados_name + << "', xattr_name='" << xattr_name << "') returned a different length " + << "than when we first called it! old_len = " << attr_len + << "new_len = " << attr_len2 << std::endl; + free(attr); + return EDOM; } + xattrs[xattr_name] = new Xattr(attr, attr_len); } - if (force) { - cout << "[force] " << rados_name << std::endl; - } - else if (flags & CHANGED_CONTENTS) { - cout << "[imported] " << rados_name << std::endl; - } - else if (flags & CHANGED_XATTRS) { - cout << "[xattr] " << rados_name << std::endl; - } + b += (bs + 1); } - if (delete_after) { - librados::ObjectIterator oi = io_ctx.objects_begin(); - librados::ObjectIterator oi_end = io_ctx.objects_end(); - for (; oi != oi_end; ++oi) { - string rados_name(*oi); - auto_ptr robj; - ret = BackedUpObject::from_rados(io_ctx, rados_name.c_str(), robj); - if (ret) { - cerr << ERR_PREFIX << "BackedUpObject::from_rados in delete loop " - << "returned " << ret << std::endl; - return ret; - } - std::string obj_path(robj->get_fs_path(export_dir.get())); - auto_ptr lobj; - ret = BackedUpObject::from_path(obj_path.c_str(), lobj); - if (ret == ENOENT) { - ret = io_ctx.remove(rados_name); - if (ret && ret != -ENOENT) { - cerr << ERR_PREFIX << "io_ctx.remove(" << obj_path << ") failed " - << "with error " << ret << std::endl; - return ret; - } - cout << "[deleted] " << "removed '" << rados_name << "'" << std::endl; - } - else if (ret) { - cerr << ERR_PREFIX << "BackedUpObject::from_path in delete loop " - << "returned " << ret << std::endl; - return ret; - } + return 0; +} + +int BackedUpObject::read_xattrs_from_rados(IoCtx &io_ctx) +{ + map attrset; + int ret = io_ctx.getxattrs(rados_name, attrset); + if (ret) { + cerr << ERR_PREFIX << "BackedUpObject::read_xattrs_from_rados: " + << "getxattrs failed with error code " << ret << std::endl; + return ret; + } + for (map::iterator i = attrset.begin(); + i != attrset.end(); ) + { + bufferlist& bl(i->second); + char *data = (char*)malloc(bl.length()); + if (!data) + return ENOBUFS; + memcpy(data, bl.c_str(), bl.length()); + Xattr *xattr = new Xattr(data, bl.length()); + if (!xattr) { + free(data); + return ENOBUFS; } + xattrs[i->first] = xattr; + attrset.erase(i++); } - cout << "[done]" << std::endl; return 0; } @@ -1143,11 +859,28 @@ int rados_tool_sync(const std::map < std::string, std::string > &opts, } std::string dir_name = (action == "import") ? src : dst; + int num_threads = 3; + IoCtxDistributor *io_ctx_dist = IoCtxDistributor::instance(); + ret = io_ctx_dist->init(rados, pool_name.c_str(), num_threads); + if (ret) { + cerr << ERR_PREFIX << "failed to initialize Radso io contexts." + << std::endl; + _exit(ret); + } + + ThreadPool thread_pool(g_ceph_context, "rados_sync_threadpool", num_threads); + thread_pool.start(); if (action == "import") { - return do_import(io_ctx, src.c_str(), force, delete_after); + ret = do_rados_import(&thread_pool, io_ctx, io_ctx_dist, src.c_str(), + force, delete_after); + thread_pool.stop(); + return ret; } else { - return do_export(io_ctx, dst.c_str(), create, force, delete_after); + ret = do_rados_export(&thread_pool, io_ctx, io_ctx_dist, dst.c_str(), + create, force, delete_after); + thread_pool.stop(); + return ret; } } diff --git a/src/rados_sync.h b/src/rados_sync.h new file mode 100644 index 0000000000000..cc5d03029eddf --- /dev/null +++ b/src/rados_sync.h @@ -0,0 +1,215 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_RADOS_SYNC_H +#define CEPH_RADOS_SYNC_H + +#include "include/atomic.h" +#include "common/WorkQueue.h" + +#include +#include + +namespace librados { + class IoCtx; + class Rados; +} + +extern const char USER_XATTR_PREFIX[]; +extern const char RADOS_SYNC_TMP_SUFFIX[]; +#define ERR_PREFIX "[ERROR] " + +/* Linux seems to use ENODATA instead of ENOATTR when an extended attribute + * is missing */ +#ifndef ENOATTR +#define ENOATTR ENODATA +#endif + +enum { + CHANGED_XATTRS = 0x1, + CHANGED_CONTENTS = 0x2, +}; + +/** Given the name of an extended attribute from a file in the filesystem, + * returns an empty string if the extended attribute does not represent a rados + * user extended attribute. Otherwise, returns the name of the rados extended + * attribute. + * + * Rados user xattrs are prefixed with USER_XATTR_PREFIX. + */ +std::string get_user_xattr_name(const char *fs_xattr_name); + +/* Returns true if 'suffix' is a suffix of str */ +bool is_suffix(const char *str, const char *suffix); + +/** Represents a directory in the filesystem that we export rados objects to (or + * import them from.) + */ +class ExportDir +{ +public: + static ExportDir* create_for_writing(const std::string path, int version, + bool create); + static ExportDir* from_file_system(const std::string path); + + /* Given a rados object name, return something which looks kind of like the + * first part of the name. + * + * The actual file name that the backed-up object is stored in is irrelevant + * to rados_sync. The only reason to make it human-readable at all is to make + * things easier on sysadmins. The XATTR_FULLNAME extended attribute has the + * real, full object name. + * + * This function turns unicode into a bunch of 'at' signs. This could be + * fixed. If you try, be sure to handle all the multibyte characters + * correctly. + * I guess a better hash would be nice too. + */ + std::string get_fs_path(const std::string rados_name) const; + +private: + ExportDir(int version_, const std::string path_); + + int version; + std::string path; +}; + +/** Smart pointer wrapper for a DIR* + */ +class DirHolder { +public: + DirHolder(); + ~DirHolder(); + int opendir(const char *dir_name); + DIR *dp; +}; + +/** IoCtxDistributor is a singleton that distributes out IoCtx instances to + * different threads. + */ +class IoCtxDistributor +{ +public: + static IoCtxDistributor* instance(); + int init(librados::Rados &cluster, const char *pool_name, int num_ioctxes); + void clear(); + librados::IoCtx& get_ioctx(); +private: + static IoCtxDistributor *s_instance; + IoCtxDistributor(); + ~IoCtxDistributor(); + + ceph::atomic_t m_highest_iod_idx; + + /* NB: there might be some false sharing here that we could optimize + * away in the future */ + std::vector m_io_ctxes; +}; + +class RadosSyncWQ : public ThreadPool::WorkQueue { +public: + RadosSyncWQ(IoCtxDistributor *io_ctx_dist, time_t ti, ThreadPool *tp); +protected: + IoCtxDistributor *m_io_ctx_dist; +private: + bool _enqueue(std::string *s); + void _dequeue(std::string *o); + bool _empty(); + std::string *_dequeue(); + void _process_finish(std::string *s); + void _clear(); + std::deque m_items; +}; + +/* Stores a length and a chunk of malloc()ed data */ +class Xattr { +public: + Xattr(char *data_, ssize_t len_); + ~Xattr(); + bool operator==(const struct Xattr &rhs) const; + bool operator!=(const struct Xattr &rhs) const; + + char *data; + ssize_t len; +}; + +/* Represents an object that we are backing up */ +class BackedUpObject +{ +public: + static int from_file(const char *file_name, const char *dir_name, + std::auto_ptr &obj); + static int from_path(const char *path, std::auto_ptr &obj); + static int from_rados(librados::IoCtx& io_ctx, const char *rados_name_, + auto_ptr &obj); + ~BackedUpObject(); + + /* Get the mangled name for this rados object. */ + std::string get_fs_path(const ExportDir *export_dir) const; + + /* Convert the xattrs on this BackedUpObject to a kind of JSON-like string. + * This is only used for debugging. + * Note that we're assuming we can just treat the xattr data as a + * null-terminated string, which isn't true. Again, this is just for debugging, + * so it doesn't matter. + */ + std::string xattrs_to_str() const; + + /* Diff the extended attributes on this BackedUpObject with those found on a + * different BackedUpObject + */ + void xattr_diff(const BackedUpObject *rhs, + std::list < std::string > &only_in_a, + std::list < std::string > &only_in_b, + std::list < std::string > &diff) const; + + void get_xattrs(std::list < std::string > &xattrs_) const; + + const Xattr* get_xattr(const std::string name) const; + + const char *get_rados_name() const; + + uint64_t get_rados_size() const; + + time_t get_mtime() const; + + int download(librados::IoCtx &io_ctx, const char *path); + + int upload(librados::IoCtx &io_ctx, const char *file_name, const char *dir_name); + +private: + BackedUpObject(const char *rados_name_, uint64_t rados_size_, time_t rados_time_); + + int read_xattrs_from_file(int fd); + + int read_xattrs_from_rados(librados::IoCtx &io_ctx); + + // don't allow copying + BackedUpObject &operator=(const BackedUpObject &rhs); + BackedUpObject(const BackedUpObject &rhs); + + char *rados_name; + uint64_t rados_size; + uint64_t rados_time; + std::map < std::string, Xattr* > xattrs; +}; + +extern int do_rados_import(ThreadPool *tp, librados::IoCtx &io_ctx, + IoCtxDistributor* io_ctx_dist, const char *dir_name, + bool force, bool delete_after); +extern int do_rados_export(ThreadPool *tp, librados::IoCtx& io_ctx, + IoCtxDistributor *io_ctx_dist, const char *dir_name, + bool create, bool force, bool delete_after); + +#endif -- 2.39.5