#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
+#include "global/global_context.h"
#include "librbd/ObjectWatcher.h"
#include "librbd/internal.h"
#include "Replayer.h"
Replayer::Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<> image_sync_throttler,
- RadosRef local_cluster, int64_t local_pool_id,
- const peer_t &peer, const std::vector<const char*> &args) :
+ int64_t local_pool_id, const peer_t &peer,
+ const std::vector<const char*> &args) :
m_threads(threads),
m_image_deleter(image_deleter),
m_image_sync_throttler(image_sync_throttler),
m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)),
m_peer(peer),
m_args(args),
- m_local(local_cluster),
- m_remote(new librados::Rados),
m_local_pool_id(local_pool_id),
m_asok_hook(nullptr),
m_replayer_thread(this)
{
}

int Replayer::init()
{
dout(20) << "replaying for " << m_peer << dendl;
- int r = m_local->ioctx_create2(m_local_pool_id, m_local_io_ctx);
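+ // open a dedicated handle to the local cluster using this daemon's own
+ // cluster and client name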
+ int r = init_rados(g_ceph_context->_conf->cluster,
+ g_ceph_context->_conf->name.to_str(),
+ "local cluster", &m_local_rados);
+ if (r < 0) {
+ return r;
+ }
+
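+ // open a second handle to the remote peer using the peer's configured
+ // cluster and client names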
+ r = init_rados(m_peer.cluster_name, m_peer.client_name,
+ std::string("remote peer ") + stringify(m_peer),
+ &m_remote_rados);
+ if (r < 0) {
+ return r;
+ }
+
+ r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
if (r < 0) {
derr << "error accessing local pool " << m_local_pool_id << ": "
<< cpp_strerror(r) << dendl;
return r;
}
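+ // look up the remote pool by the local pool's name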
+ r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
+ m_remote_io_ctx);
+ if (r < 0) {
+ derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
+ << ": " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ m_remote_pool_id = m_remote_io_ctx.get_id();
+
+ dout(20) << "connected to " << m_peer << dendl;
+
+ // Bootstrap existing mirroring images
+ init_local_mirroring_images();
+
+ // TODO: make interval configurable
+ m_pool_watcher.reset(new PoolWatcher(m_remote_io_ctx, 30, m_lock, m_cond));
+ m_pool_watcher->refresh_images();
+
+ m_replayer_thread.create("replayer");
+
+ return 0;
+}
+
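+// Bootstrap a librados handle for the given cluster and client name; shared
+// by the local cluster and remote peer connection paths.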
+int Replayer::init_rados(const std::string &cluster_name,
+ const std::string &client_name,
+ const std::string &description, RadosRef *rados_ref) {
+ rados_ref->reset(new librados::Rados());
+
// NOTE: manually bootstrap a CephContext here instead of via
// the librados API to avoid mixing global singletons between
// the librados shared library and the daemon
// TODO: eliminate intermingling of global singletons within Ceph APIs
CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
- if (m_peer.client_name.empty() ||
- !iparams.name.from_str(m_peer.client_name)) {
- derr << "error initializing remote cluster handle for " << m_peer << dendl;
+ if (client_name.empty() || !iparams.name.from_str(client_name)) {
+ derr << "error initializing cluster handle for " << description << dendl;
return -EINVAL;
}
CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
- cct->_conf->cluster = m_peer.cluster_name;
+ cct->_conf->cluster = cluster_name;
// librados::Rados::conf_read_file
- r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
+ int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
if (r < 0) {
- derr << "could not read ceph conf for " << m_peer << ": "
+ derr << "could not read ceph conf for " << description << ": "
<< cpp_strerror(r) << dendl;
cct->put();
return r;
}

// librados::Rados::conf_parse_env
std::vector<const char *> args;
env_to_vec(args, nullptr);
r = cct->_conf->parse_argv(args);
if (r < 0) {
- derr << "could not parse environment for " << m_peer << ":"
+ derr << "could not parse environment for " << description << ": "
<< cpp_strerror(r) << dendl;
cct->put();
return r;
}

if (!m_args.empty()) {
// librados::Rados::conf_parse_argv
- r = cct->_conf->parse_argv(m_args);
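+ // operate on a copy so the stored m_args remain untouched for the
+ // next init_rados() call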
+ args = m_args;
+ r = cct->_conf->parse_argv(args);
if (r < 0) {
- derr << "could not parse command line args for " << m_peer << ": "
+ derr << "could not parse command line args for " << description << ": "
<< cpp_strerror(r) << dendl;
cct->put();
return r;
}
}

cct->_conf->apply_changes(nullptr);
cct->_conf->complain_about_parse_errors(cct);
- r = m_remote->init_with_context(cct);
+ r = (*rados_ref)->init_with_context(cct);
assert(r == 0);
cct->put();
- r = m_remote->connect();
- if (r < 0) {
- derr << "error connecting to remote cluster " << m_peer
- << " : " << cpp_strerror(r) << dendl;
- return r;
- }
-
- r = m_remote->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
- m_remote_io_ctx);
+ r = (*rados_ref)->connect();
if (r < 0) {
- derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
- << ": " << cpp_strerror(r) << dendl;
+ derr << "error connecting to " << description << ": "
+ << cpp_strerror(r) << dendl;
return r;
}
- m_remote_pool_id = m_remote_io_ctx.get_id();
-
- dout(20) << "connected to " << m_peer << dendl;
-
- // Bootstrap existing mirroring images
- init_local_mirroring_images();
-
- // TODO: make interval configurable
- m_pool_watcher.reset(new PoolWatcher(m_remote_io_ctx, 30, m_lock, m_cond));
- m_pool_watcher->refresh_images();
-
- m_replayer_thread.create("replayer");
return 0;
}
m_asok_hook_name = asok_hook_name;
delete m_asok_hook;
- CephContext *cct = static_cast<CephContext *>(m_local->cct());
- m_asok_hook = new ReplayerAdminSocketHook(cct, m_asok_hook_name, this);
+ m_asok_hook = new ReplayerAdminSocketHook(g_ceph_context,
+ m_asok_hook_name, this);
}
Mutex::Locker l(m_lock);
auto it = m_image_replayers.find(image_id.id);
if (it == m_image_replayers.end()) {
unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
- m_threads, m_image_deleter, m_image_sync_throttler, m_local, m_remote,
- local_mirror_uuid, remote_mirror_uuid, m_local_pool_id,
+ m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
+ m_remote_rados, local_mirror_uuid, remote_mirror_uuid, m_local_pool_id,
m_remote_pool_id, image_id.id, image_id.global_id));
it = m_image_replayers.insert(
std::make_pair(image_id.id, std::move(image_replayer))).first;