m_threads.get(), m_image_deleter.get(), m_instance_watcher,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
m_local_mirror_uuid, m_local_ioctx.get_id(), m_global_image_id);
- m_replayer->add_remote_image(m_remote_mirror_uuid, m_remote_image_id,
- m_remote_ioctx);
+ m_replayer->add_peer("peer uuid", m_remote_ioctx);
}
void start()
m_threads, &mock_image_deleter, &m_instance_watcher,
rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
"local_mirror_uuid", m_local_io_ctx.get_id(), "global image id");
- m_image_replayer->add_remote_image(
- "remote_mirror_uuid", m_remote_image_ctx->id, m_remote_io_ctx);
+ m_image_replayer->add_peer("peer_uuid", m_remote_io_ctx);
}
librbd::ImageCtx *m_remote_image_ctx;
MOCK_METHOD0(restart, void());
MOCK_METHOD0(flush, void());
MOCK_METHOD2(print_status, void(Formatter *, stringstream *));
- MOCK_METHOD3(add_remote_image, void(const std::string &,
- const std::string &,
- librados::IoCtx &));
- MOCK_METHOD3(remove_remote_image, void(const std::string &,
- const std::string &,
- bool));
- MOCK_METHOD0(remote_images_empty, bool());
+ MOCK_METHOD2(add_peer, void(const std::string &, librados::IoCtx &));
MOCK_METHOD0(get_global_image_id, const std::string &());
MOCK_METHOD0(get_local_image_id, const std::string &());
MOCK_METHOD0(is_running, bool());
InSequence seq;
instance_replayer.init();
- instance_replayer.add_peer("remote_mirror_uuid", m_remote_io_ctx);
+ instance_replayer.add_peer("peer_uuid", m_remote_io_ctx);
// Acquire
C_SaferCond on_acquire;
- EXPECT_CALL(mock_image_replayer, add_remote_image("remote_mirror_uuid",
- "remote_image_id", _));
+ EXPECT_CALL(mock_image_replayer, add_peer("peer_uuid", _));
EXPECT_CALL(mock_image_replayer, is_stopped())
.WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, start(nullptr, false));
C_SaferCond on_release;
- EXPECT_CALL(mock_image_replayer,
- remove_remote_image("remote_mirror_uuid", "remote_image_id",
- false));
- EXPECT_CALL(mock_image_replayer, remote_images_empty())
- .WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, is_stopped())
.WillOnce(Return(false));
EXPECT_CALL(mock_image_replayer, is_running())
}
template <typename I>
-void ImageReplayer<I>::add_remote_image(const std::string &mirror_uuid,
- const std::string &image_id,
- librados::IoCtx &io_ctx) {
+void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
+ librados::IoCtx &io_ctx) {
Mutex::Locker locker(m_lock);
-
- RemoteImage remote_image(mirror_uuid, image_id, io_ctx);
- auto it = m_remote_images.find(remote_image);
- if (it == m_remote_images.end()) {
- m_remote_images.insert(remote_image);
+ auto it = m_peers.find({peer_uuid});
+ if (it == m_peers.end()) {
+ m_peers.insert({peer_uuid, io_ctx});
}
}
-template <typename I>
-void ImageReplayer<I>::remove_remote_image(const std::string &mirror_uuid,
- const std::string &image_id,
- bool schedule_delete) {
- Mutex::Locker locker(m_lock);
- m_remote_images.erase({mirror_uuid, image_id});
-}
-
-template <typename I>
-bool ImageReplayer<I>::remote_images_empty() const {
- Mutex::Locker locker(m_lock);
- return m_remote_images.empty();
-}
-
template <typename I>
void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
dout(20) << r << " " << desc << dendl;
template <typename I>
void ImageReplayer<I>::prepare_remote_image() {
dout(20) << dendl;
- if (m_remote_images.empty()) {
+ if (m_peers.empty()) {
on_start_fail(-EREMOTEIO, "waiting for primary remote image");
return;
}
// TODO bootstrap will need to support multiple remote images
- m_remote_image = *m_remote_images.begin();
+ m_remote_image = {*m_peers.begin()};
Context *ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
image_replayer::HealthState get_health_state() const;
- void add_remote_image(const std::string &remote_mirror_uuid,
- const std::string &remote_image_id,
- librados::IoCtx &remote_io_ctx);
- void remove_remote_image(const std::string &remote_mirror_uuid,
- const std::string &remote_image_id,
- bool schedule_delete);
- bool remote_images_empty() const;
+ void add_peer(const std::string &peer_uuid, librados::IoCtx &remote_io_ctx);
inline int64_t get_local_pool_id() const {
return m_local_pool_id;
RemoteImage() {
}
- RemoteImage(const std::string &mirror_uuid,
- const std::string &image_id)
- : mirror_uuid(mirror_uuid), image_id(image_id) {
- }
- RemoteImage(const std::string &mirror_uuid,
- const std::string &image_id,
- librados::IoCtx &io_ctx)
- : mirror_uuid(mirror_uuid), image_id(image_id), io_ctx(io_ctx) {
- }
-
- inline bool operator<(const RemoteImage &rhs) const {
- if (mirror_uuid != rhs.mirror_uuid) {
- return mirror_uuid < rhs.mirror_uuid;
- } else {
- return image_id < rhs.image_id;
- }
- }
- inline bool operator==(const RemoteImage &rhs) const {
- return (mirror_uuid == rhs.mirror_uuid && image_id == rhs.image_id);
+ RemoteImage(const Peer& peer) : io_ctx(peer.io_ctx) {
}
};
- typedef std::set<RemoteImage> RemoteImages;
-
typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
typedef boost::optional<State> OptionalState;
typedef boost::optional<cls::rbd::MirrorImageStatusState>
ImageDeleter<ImageCtxT>* m_image_deleter;
InstanceWatcher<ImageCtxT> *m_instance_watcher;
- RemoteImages m_remote_images;
+ Peers m_peers;
RemoteImage m_remote_image;
RadosRef m_local;
}
template <typename I>
-void InstanceReplayer<I>::add_peer(std::string mirror_uuid,
+void InstanceReplayer<I>::add_peer(std::string peer_uuid,
librados::IoCtx io_ctx) {
- dout(20) << mirror_uuid << dendl;
+ dout(20) << peer_uuid << dendl;
Mutex::Locker locker(m_lock);
- auto result = m_peers.insert(Peer(mirror_uuid, io_ctx)).second;
+ auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second;
assert(result);
}
-template <typename I>
-void InstanceReplayer<I>::remove_peer(std::string mirror_uuid) {
- dout(20) << mirror_uuid << dendl;
-
- Mutex::Locker locker(m_lock);
- auto result = m_peers.erase(Peer(mirror_uuid));
- assert(result > 0);
-}
-
template <typename I>
void InstanceReplayer<I>::release_all(Context *on_finish) {
dout(20) << dendl;
assert(m_on_shut_down == nullptr);
auto it = m_image_replayers.find(global_image_id);
-
if (it == m_image_replayers.end()) {
auto image_replayer = ImageReplayer<I>::create(
m_threads, m_image_deleter, instance_watcher, m_local_rados,
it = m_image_replayers.insert(std::make_pair(global_image_id,
image_replayer)).first;
- }
-
- auto image_replayer = it->second;
- if (!peer_mirror_uuid.empty()) {
- auto iter = m_peers.find(Peer(peer_mirror_uuid));
- assert(iter != m_peers.end());
- auto io_ctx = iter->io_ctx;
- image_replayer->add_remote_image(peer_mirror_uuid, peer_image_id, io_ctx);
+ // TODO only a single peer is currently supported
+ assert(m_peers.size() == 1);
+ auto peer = *m_peers.begin();
+ image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
}
- start_image_replayer(image_replayer);
+ start_image_replayer(it->second);
m_threads->work_queue->queue(on_finish, 0);
}
assert(m_on_shut_down == nullptr);
auto it = m_image_replayers.find(global_image_id);
-
if (it == m_image_replayers.end()) {
dout(20) << global_image_id << ": not found" << dendl;
m_threads->work_queue->queue(on_finish, 0);
}
auto image_replayer = it->second;
- if (!peer_mirror_uuid.empty()) {
- image_replayer->remove_remote_image(peer_mirror_uuid, peer_image_id,
- schedule_delete);
- }
-
- if (!image_replayer->remote_images_empty()) {
- dout(20) << global_image_id << ": still has peer images" << dendl;
- m_threads->work_queue->queue(on_finish, 0);
- return;
- }
-
m_image_replayers.erase(it);
on_finish = new FunctionContext(
image_replayer->destroy();
on_finish->complete(0);
});
-
- if (schedule_delete) {
- on_finish = new FunctionContext(
- [this, image_replayer, on_finish] (int r) {
- auto global_image_id = image_replayer->get_global_image_id();
- m_image_deleter->schedule_image_delete(
- m_local_rados, m_local_pool_id, global_image_id, false);
- on_finish->complete(0);
- });
- }
-
stop_image_replayer(image_replayer, on_finish);
}
void init(Context *on_finish);
void shut_down(Context *on_finish);
- void add_peer(std::string mirror_uuid, librados::IoCtx io_ctx);
- void remove_peer(std::string mirror_uuid);
+ void add_peer(std::string peer_uuid, librados::IoCtx io_ctx);
void acquire_image(InstanceWatcher<ImageCtxT> *instance_watcher,
const std::string &global_image_id,
* @endverbatim
*/
- struct Peer {
- std::string mirror_uuid;
- librados::IoCtx io_ctx;
-
- Peer() {
- }
-
- Peer(const std::string &mirror_uuid) : mirror_uuid(mirror_uuid) {
- }
-
- Peer(const std::string &mirror_uuid, librados::IoCtx &io_ctx)
- : mirror_uuid(mirror_uuid), io_ctx(io_ctx) {
- }
-
- inline bool operator<(const Peer &rhs) const {
- return mirror_uuid < rhs.mirror_uuid;
- }
- inline bool operator==(const Peer &rhs) const {
- return mirror_uuid == rhs.mirror_uuid;
- }
- };
-
- typedef std::set<Peer> Peers;
-
Threads<ImageCtxT> *m_threads;
ServiceDaemon<ImageCtxT>* m_service_daemon;
ImageDeleter<ImageCtxT>* m_image_deleter;
m_remote_pool_watcher->get_image_count());
}
- std::string removed_remote_peer_id;
- ImageIds removed_remote_image_ids;
- if (m_initial_mirror_image_ids.find(mirror_uuid) ==
- m_initial_mirror_image_ids.end() &&
- m_initial_mirror_image_ids.size() < 2) {
- m_initial_mirror_image_ids[mirror_uuid] = added_image_ids;
-
- if (m_initial_mirror_image_ids.size() == 2) {
- dout(10) << "local and remote pools refreshed" << dendl;
-
- // both local and remote initial pool listing received. derive
- // removal notifications for the remote pool
- auto &local_image_ids = m_initial_mirror_image_ids.begin()->second;
- auto &remote_image_ids = m_initial_mirror_image_ids.rbegin()->second;
- removed_remote_peer_id = m_initial_mirror_image_ids.rbegin()->first;
- for (auto &local_image_id : local_image_ids) {
- if (remote_image_ids.find(local_image_id) == remote_image_ids.end()) {
- removed_remote_image_ids.emplace(local_image_id.global_id, "");
- }
- }
- local_image_ids.clear();
- remote_image_ids.clear();
- }
- }
-
- if (!mirror_uuid.empty() && m_peer.uuid != mirror_uuid) {
- m_instance_replayer->remove_peer(m_peer.uuid);
- m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
- m_peer.uuid = mirror_uuid;
- }
-
m_update_op_tracker.start_op();
Context *ctx = new FunctionContext([this](int r) {
dout(20) << "complete handle_update: r=" << r << dendl;
gather_ctx->new_sub());
}
- for (auto &image_id : removed_image_ids) {
- // for now always send to myself (the leader)
- std::string &instance_id = m_instance_watcher->get_instance_id();
- m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
- mirror_uuid, image_id.id, true,
- gather_ctx->new_sub());
- if (!mirror_uuid.empty()) {
+ if (!mirror_uuid.empty()) {
+ for (auto &image_id : removed_image_ids) {
+ // for now always send to myself (the leader)
+ std::string &instance_id = m_instance_watcher->get_instance_id();
m_instance_watcher->notify_peer_image_removed(instance_id,
image_id.global_id,
mirror_uuid,
}
}
- // derived removal events for remote after initial image listing
- for (auto& image_id : removed_remote_image_ids) {
- // for now always send to myself (the leader)
- std::string &instance_id = m_instance_watcher->get_instance_id();
- m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
- removed_remote_peer_id,
- image_id.id, true,
- gather_ctx->new_sub());
- }
-
gather_ctx->activate();
}
assert(!m_local_pool_watcher);
m_local_pool_watcher.reset(new PoolWatcher<>(
m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
- m_initial_mirror_image_ids.clear();
// ensure the initial set of local images is up-to-date
// after acquiring the leader role
std::string m_asok_hook_name;
AdminSocketHook *m_asok_hook = nullptr;
- std::map<std::string, ImageIds> m_initial_mirror_image_ids;
-
service_daemon::CalloutId m_callout_id = service_daemon::CALLOUT_ID_NONE;
class PoolReplayerThread : public Thread {
#include <string>
#include <vector>
+#include "include/rados/librados.hpp"
#include "include/rbd/librbd.hpp"
namespace rbd {
typedef std::set<ImageId> ImageIds;
+struct Peer {
+ std::string peer_uuid;
+ librados::IoCtx io_ctx;
+
+ Peer() {
+ }
+ Peer(const std::string &peer_uuid) : peer_uuid(peer_uuid) {
+ }
+ Peer(const std::string &peer_uuid, librados::IoCtx& io_ctx)
+ : peer_uuid(peer_uuid), io_ctx(io_ctx) {
+ }
+
+ inline bool operator<(const Peer &rhs) const {
+ return peer_uuid < rhs.peer_uuid;
+ }
+};
+
+typedef std::set<Peer> Peers;
+
struct peer_t {
peer_t() = default;
peer_t(const std::string &uuid, const std::string &cluster_name,