void create_replayer() {
m_replayer = new ImageReplayerT(m_threads, m_image_deleter, m_image_sync_throttler,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
- rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)),
- m_local_mirror_uuid, m_remote_mirror_uuid, m_local_ioctx.get_id(),
- m_remote_pool_id, m_remote_image_id, "global image id");
+ m_local_mirror_uuid, m_local_ioctx.get_id(), "global image id");
+ m_replayer->add_remote_image(m_remote_mirror_uuid, m_remote_image_id,
+ m_remote_ioctx);
}
void start()
m_image_replayer = new MockImageReplayer(
m_threads, m_image_deleter, m_image_sync_throttler,
rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
- rbd::mirror::RadosRef(new librados::Rados(m_remote_io_ctx)),
- "local_mirror_uuid", "remote_mirror_uuid", m_local_io_ctx.get_id(),
- m_remote_io_ctx.get_id(), m_remote_image_ctx->id, "global image id");
+ "local_mirror_uuid", m_local_io_ctx.get_id(), "global image id");
+ m_image_replayer->add_remote_image(
+ "remote_mirror_uuid", m_remote_image_ctx->id, m_remote_io_ctx);
}
virtual void TearDown() {
ImageReplayer<I>::ImageReplayer(Threads *threads,
shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<I> image_sync_throttler,
- RadosRef local, RadosRef remote,
- const std::string &local_mirror_uuid,
- const std::string &remote_mirror_uuid,
- int64_t local_pool_id,
- int64_t remote_pool_id,
- const std::string &remote_image_id,
+ RadosRef local,
+ const std::string &local_mirror_uuid,
+ int64_t local_pool_id,
const std::string &global_image_id) :
m_threads(threads),
m_image_deleter(image_deleter),
m_image_sync_throttler(image_sync_throttler),
m_local(local),
- m_remote(remote),
m_local_mirror_uuid(local_mirror_uuid),
- m_remote_mirror_uuid(remote_mirror_uuid),
- m_remote_pool_id(remote_pool_id),
m_local_pool_id(local_pool_id),
- m_remote_image_id(remote_image_id),
m_global_image_id(global_image_id),
- m_name(stringify(remote_pool_id) + "/" + remote_image_id),
- m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
- remote_image_id),
+ m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
+ global_image_id),
m_progress_cxt(this),
m_journal_listener(new JournalListener(this)),
m_remote_listener(this)
// re-registered using "remote_pool_name/remote_image_name" name.
std::string pool_name;
- int r = m_remote->pool_reverse_lookup(m_remote_pool_id, &pool_name);
+ int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
if (r < 0) {
- derr << "error resolving remote pool " << m_remote_pool_id
+ derr << "error resolving local pool " << m_local_pool_id
<< ": " << cpp_strerror(r) << dendl;
- pool_name = stringify(m_remote_pool_id);
+ pool_name = stringify(m_local_pool_id);
}
m_name = pool_name + "/" + m_global_image_id;
delete m_asok_hook;
}
+template <typename I>
+void ImageReplayer<I>::add_remote_image(const std::string &mirror_uuid,
+ const std::string &image_id,
+ librados::IoCtx &io_ctx) {
+ Mutex::Locker locker(m_lock);
+
+ RemoteImage remote_image(mirror_uuid, image_id, io_ctx);
+ auto it = m_remote_images.find(remote_image);
+ if (it == m_remote_images.end()) {
+ m_remote_images.insert(remote_image);
+ }
+}
+
+template <typename I>
+void ImageReplayer<I>::remove_remote_image(const std::string &mirror_uuid,
+ const std::string &image_id) {
+ Mutex::Locker locker(m_lock);
+ m_remote_images.erase({mirror_uuid, image_id});
+}
+
+template <typename I>
+bool ImageReplayer<I>::remote_images_empty() const {
+ Mutex::Locker locker(m_lock);
+ return m_remote_images.empty();
+}
+
template <typename I>
void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
dout(20) << r << " " << desc << dendl;
dout(5) << "stopped manually, ignoring start without manual flag"
<< dendl;
r = -EPERM;
+ } else if (m_remote_images.empty()) {
+ derr << "no remote images associated with replayer" << dendl;
+ r = -EINVAL;
} else {
m_state = STATE_STARTING;
m_last_r = 0;
m_state_desc.clear();
m_manual_stop = false;
+ // TODO bootstrap will need to support multiple remote images
+ m_remote_image = *m_remote_images.begin();
+
if (on_finish != nullptr) {
assert(m_on_start_finish == nullptr);
m_on_start_finish = on_finish;
return;
}
- r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
- if (r < 0) {
- derr << "error opening ioctx for remote pool " << m_remote_pool_id
- << ": " << cpp_strerror(r) << dendl;
- on_start_fail(r, "error opening remote pool");
- return;
- }
-
r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
if (r < 0) {
derr << "error opening ioctx for local pool " << m_local_pool_id
m_remote_journaler = new Journaler(m_threads->work_queue,
m_threads->timer,
- &m_threads->timer_lock, m_remote_ioctx,
- m_remote_image_id, m_local_mirror_uuid,
- settings);
+ &m_threads->timer_lock,
+ m_remote_image.io_ctx,
+ m_remote_image.image_id,
+ m_local_mirror_uuid, settings);
bootstrap();
}
ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
BootstrapRequest<I> *request = BootstrapRequest<I>::create(
- m_local_ioctx, m_remote_ioctx, m_image_sync_throttler,
- &m_local_image_ctx, m_local_image_name, m_remote_image_id,
+ m_local_ioctx, m_remote_image.io_ctx, m_image_sync_throttler,
+ &m_local_image_ctx, m_local_image_name, m_remote_image.image_id,
m_global_image_id, m_threads->work_queue, m_threads->timer,
- &m_threads->timer_lock, m_local_mirror_uuid, m_remote_mirror_uuid,
+ &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
m_remote_journaler, &m_client_meta, ctx, &m_do_resync, &m_progress_cxt);
{
std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
mirror_uuid == m_local_mirror_uuid) {
- mirror_uuid = m_remote_mirror_uuid;
+ mirror_uuid = m_remote_image.mirror_uuid;
} else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
dout(5) << "encountered image demotion: stopping" << dendl;
Mutex::Locker locker(m_lock);
librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
- predecessor.mirror_uuid = m_remote_mirror_uuid;
+ predecessor.mirror_uuid = m_remote_image.mirror_uuid;
} else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
}
dout(20) << "stop complete" << dendl;
m_local_ioctx.close();
- m_remote_ioctx.close();
ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
m_replay_status_formatter = nullptr;
#include "ImageDeleter.h"
#include "ProgressContext.h"
#include "types.h"
+#include <set>
+#include <boost/noncopyable.hpp>
#include <boost/optional.hpp>
class AdminSocketHook;
ImageReplayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
- RadosRef local, RadosRef remote,
- const std::string &local_mirror_uuid,
- const std::string &remote_mirror_uuid, int64_t local_pool_id,
- int64_t remote_pool_id, const std::string &remote_image_id,
- const std::string &global_image_id);
+ RadosRef local, const std::string &local_mirror_uuid,
+ int64_t local_pool_id, const std::string &global_image_id);
virtual ~ImageReplayer();
ImageReplayer(const ImageReplayer&) = delete;
ImageReplayer& operator=(const ImageReplayer&) = delete;
return (m_last_r == -EBLACKLISTED);
}
+ void add_remote_image(const std::string &remote_mirror_uuid,
+ const std::string &remote_image_id,
+ librados::IoCtx &remote_io_ctx);
+ void remove_remote_image(const std::string &remote_mirror_uuid,
+ const std::string &remote_image_id);
+ bool remote_images_empty() const;
+
inline int64_t get_local_pool_id() const {
return m_local_pool_id;
}
- inline int64_t get_remote_pool_id() const {
- return m_remote_pool_id;
- }
inline const std::string& get_global_image_id() const {
return m_global_image_id;
}
- inline const std::string& get_remote_image_id() const {
- return m_remote_image_id;
- }
inline std::string get_local_image_id() {
Mutex::Locker locker(m_lock);
return m_local_image_id;
bool on_replay_interrupted();
private:
+ struct RemoteImage {
+ std::string mirror_uuid;
+ std::string image_id;
+ librados::IoCtx io_ctx;
+
+ RemoteImage() {
+ }
+ RemoteImage(const std::string &mirror_uuid,
+ const std::string &image_id)
+ : mirror_uuid(mirror_uuid), image_id(image_id) {
+ }
+ RemoteImage(const std::string &mirror_uuid,
+ const std::string &image_id,
+ librados::IoCtx &io_ctx)
+ : mirror_uuid(mirror_uuid), image_id(image_id), io_ctx(io_ctx) {
+ }
+
+ inline bool operator<(const RemoteImage &rhs) const {
+ if (mirror_uuid != rhs.mirror_uuid) {
+ return mirror_uuid < rhs.mirror_uuid;
+ } else {
+ return image_id < rhs.image_id;
+ }
+ }
+ inline bool operator==(const RemoteImage &rhs) const {
+ return (mirror_uuid == rhs.mirror_uuid && image_id == rhs.image_id);
+ }
+ };
+
+ typedef std::set<RemoteImage> RemoteImages;
+
typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
typedef boost::optional<State> OptionalState;
Threads *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
- RadosRef m_local, m_remote;
+
+ RemoteImages m_remote_images;
+ RemoteImage m_remote_image;
+
+ RadosRef m_local;
std::string m_local_mirror_uuid;
- std::string m_remote_mirror_uuid;
- int64_t m_remote_pool_id, m_local_pool_id;
- std::string m_remote_image_id, m_local_image_id, m_global_image_id;
+ int64_t m_local_pool_id;
+ std::string m_local_image_id;
+ std::string m_global_image_id;
std::string m_local_image_name;
std::string m_name;
mutable Mutex m_lock;
image_replayer::EventPreprocessor<ImageCtxT> *m_event_preprocessor = nullptr;
image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
nullptr;
- librados::IoCtx m_local_ioctx, m_remote_ioctx;
+ librados::IoCtx m_local_ioctx;
ImageCtxT *m_local_image_ctx = nullptr;
decltype(ImageCtxT::journal) m_local_journal = nullptr;
for (auto image_it = m_image_replayers.begin();
image_it != m_image_replayers.end();) {
auto image_id_it = image_ids.find(image_it->first);
- if (image_id_it == image_ids.end() ||
- image_id_it->id != image_it->second->get_remote_image_id()) {
+ if (image_id_it == image_ids.end()) {
if (image_it->second->is_running()) {
dout(20) << "stop image replayer for remote image "
<< image_it->second->get_global_image_id() << dendl;
if (it == m_image_replayers.end()) {
unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
- m_remote_rados, local_mirror_uuid, remote_mirror_uuid, m_local_pool_id,
- m_remote_pool_id, image_id.id, image_id.global_id));
+ local_mirror_uuid, m_local_pool_id, image_id.global_id));
it = m_image_replayers.insert(
std::make_pair(image_id.global_id, std::move(image_replayer))).first;
- } else if (image_id.id != it->second->get_remote_image_id()) {
- // mismatched replayer in progress of stopping
- continue;
}
+
+ it->second->add_remote_image(remote_mirror_uuid, image_id.id,
+ m_remote_io_ctx);
if (!it->second->is_running()) {
dout(20) << "starting image replayer for remote image "
<< image_id.global_id << dendl;