const std::string &, Context *));
MOCK_METHOD5(release_image, void(const std::string &, const std::string &,
const std::string &, bool, Context *));
+ MOCK_METHOD3(remove_peer_image, void(const std::string&, const std::string&,
+ Context *));
};
template <>
delete instance_watcher2;
}
+TEST_F(TestMockInstanceWatcher, PeerImageRemoved) {
+ MockManagedLock mock_managed_lock;
+
+ librados::IoCtx& io_ctx1 = m_local_io_ctx;
+ std::string instance_id1 = m_instance_id;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
+ MockInstanceReplayer mock_instance_replayer1;
+ auto instance_watcher1 = MockInstanceWatcher::create(
+ io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1);
+
+ librados::Rados cluster;
+ librados::IoCtx io_ctx2;
+ EXPECT_EQ("", connect_cluster_pp(cluster));
+ EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
+ std::string instance_id2 = stringify(io_ctx2.get_instance_id());
+ librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
+ MockInstanceReplayer mock_instance_replayer2;
+ auto instance_watcher2 = MockInstanceWatcher::create(
+ io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2);
+
+ InSequence seq;
+
+ // Init instance watcher 1
+ expect_register_instance(mock_io_ctx1, 0);
+ expect_register_watch(mock_io_ctx1, instance_id1);
+ expect_acquire_lock(mock_managed_lock, 0);
+ ASSERT_EQ(0, instance_watcher1->init());
+
+ // Init instance watcher 2
+ expect_register_instance(mock_io_ctx2, 0);
+ expect_register_watch(mock_io_ctx2, instance_id2);
+ expect_acquire_lock(mock_managed_lock, 0);
+ ASSERT_EQ(0, instance_watcher2->init());
+
+ // Peer Image Removed on the same instance
+ EXPECT_CALL(mock_instance_replayer1, remove_peer_image("gid", "uuid", _))
+ .WillOnce(WithArg<2>(CompleteContext(0)));
+ C_SaferCond on_removed1;
+ instance_watcher1->notify_peer_image_removed(instance_id1, "gid", "uuid",
+ &on_removed1);
+ ASSERT_EQ(0, on_removed1.wait());
+
+ // Peer Image Removed on the other instance
+ EXPECT_CALL(mock_instance_replayer2, remove_peer_image("gid", "uuid", _))
+ .WillOnce(WithArg<2>(CompleteContext(0)));
+ C_SaferCond on_removed2;
+ instance_watcher1->notify_peer_image_removed(instance_id2, "gid", "uuid",
+ &on_removed2);
+ ASSERT_EQ(0, on_removed2.wait());
+
+ // Shutdown instance watcher 1
+ expect_release_lock(mock_managed_lock, 0);
+ expect_unregister_watch(mock_io_ctx1);
+ expect_unregister_instance(mock_io_ctx1, 0);
+ instance_watcher1->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher1;
+
+ // Shutdown instance watcher 2
+ expect_release_lock(mock_managed_lock, 0);
+ expect_unregister_watch(mock_io_ctx2);
+ expect_unregister_instance(mock_io_ctx2, 0);
+ instance_watcher2->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher2;
+}
+
TEST_F(TestMockInstanceWatcher, ImageAcquireReleaseCancel) {
MockManagedLock mock_managed_lock;
librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
delete instance_watcher;
}
+TEST_F(TestMockInstanceWatcher, PeerImageRemovedCancel) {
+ MockManagedLock mock_managed_lock;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
+
+ auto instance_watcher = new MockInstanceWatcher(
+ m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
+ InSequence seq;
+
+ // Init
+ expect_register_instance(mock_io_ctx, 0);
+ expect_register_watch(mock_io_ctx);
+ expect_acquire_lock(mock_managed_lock, 0);
+ ASSERT_EQ(0, instance_watcher->init());
+
+ // Send Acquire Image and cancel
+ EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _))
+ .WillOnce(Invoke(
+ [this, instance_watcher, &mock_io_ctx](
+ const std::string& o, librados::AioCompletionImpl *c,
+ bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
+ c->get();
+ auto ctx = new FunctionContext(
+ [instance_watcher, &mock_io_ctx, c, pbl](int r) {
+ instance_watcher->cancel_notify_requests("other");
+ ::encode(librbd::watcher::NotifyResponse(), *pbl);
+ mock_io_ctx.get_mock_rados_client()->
+ finish_aio_completion(c, -ETIMEDOUT);
+ });
+ m_threads->work_queue->queue(ctx, 0);
+ }));
+
+ C_SaferCond on_acquire;
+ instance_watcher->notify_peer_image_removed("other", "gid", "uuid",
+ &on_acquire);
+ ASSERT_EQ(-ECANCELED, on_acquire.wait());
+
+ // Shutdown
+ expect_release_lock(mock_managed_lock, 0);
+ expect_unregister_watch(mock_io_ctx);
+ expect_unregister_instance(mock_io_ctx, 0);
+ instance_watcher->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher;
+}
+
+
class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher {
public:
typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
stop_image_replayer(image_replayer, on_finish);
}
+template <typename I>
+void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_finish) {
+ dout(20) << "global_image_id=" << global_image_id << ", "
+ << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
+
+ // TODO
+ m_threads->work_queue->queue(on_finish, 0);
+}
+
template <typename I>
void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
dout(20) << dendl;
const std::string &peer_mirror_uuid,
const std::string &peer_image_id,
bool schedule_delete, Context *on_finish);
+ void remove_peer_image(const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_finish);
+
void release_all(Context *on_finish);
void print_status(Formatter *f, stringstream *ss);
}
dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
- << ": sendding to " << instance_id << dendl;
+ << ": sending to " << instance_id << dendl;
notifier->notify(bl, &response, this);
}
}
}
+template <typename I>
+void InstanceWatcher<I>::notify_peer_image_removed(
+ const std::string &instance_id, const std::string &global_image_id,
+ const std::string &peer_mirror_uuid, Context *on_notify_ack) {
+ dout(20) << "instance_id=" << instance_id << ", "
+ << "global_image_id=" << global_image_id << ", "
+ << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_on_finish == nullptr);
+
+ if (instance_id == m_instance_id) {
+ handle_peer_image_removed(global_image_id, peer_mirror_uuid, on_notify_ack);
+ } else {
+ uint64_t request_id = ++m_request_seq;
+ bufferlist bl;
+ ::encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id,
+ peer_mirror_uuid}}, bl);
+ auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+ std::move(bl), on_notify_ack);
+ req->send();
+ }
+}
+
template <typename I>
void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
Context *on_sync_start) {
m_work_queue->queue(ctx, 0);
}
+template <typename I>
+void InstanceWatcher<I>::handle_peer_image_removed(
+ const std::string &global_image_id, const std::string &peer_mirror_uuid,
+ Context *on_finish) {
+ dout(20) << "global_image_id=" << global_image_id << ", "
+ << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
+
+ auto ctx = new FunctionContext(
+ [this, peer_mirror_uuid, global_image_id, on_finish] (int r) {
+ m_instance_replayer->remove_peer_image(global_image_id,
+ peer_mirror_uuid, on_finish);
+ m_notify_op_tracker.finish_op();
+ });
+
+ m_notify_op_tracker.start_op();
+ m_work_queue->queue(ctx, 0);
+}
+
template <typename I>
void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
const std::string &sync_id,
}
}
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+ const PeerImageRemovedPayload &payload,
+ C_NotifyAck *on_notify_ack) {
+ dout(20) << "remove_peer_image: instance_id=" << instance_id << ", "
+ << "request_id=" << payload.request_id << dendl;
+
+ auto on_finish = prepare_request(instance_id, payload.request_id,
+ on_notify_ack);
+ if (on_finish != nullptr) {
+ handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid,
+ on_finish);
+ }
+}
+
template <typename I>
void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
const SyncRequestPayload &payload,
const std::string &peer_mirror_uuid,
const std::string &peer_image_id,
bool schedule_delete, Context *on_notify_ack);
+ void notify_peer_image_removed(const std::string &instance_id,
+ const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_notify_ack);
void notify_sync_request(const std::string &sync_id, Context *on_sync_start);
bool cancel_sync_request(const std::string &sync_id);
const std::string &peer_mirror_uuid,
const std::string &peer_image_id,
bool schedule_delete, Context *on_finish);
+ void handle_peer_image_removed(const std::string &global_image_id,
+ const std::string &peer_mirror_uuid,
+ Context *on_finish);
void handle_sync_request(const std::string &instance_id,
const std::string &sync_id, Context *on_finish);
void handle_payload(const std::string &instance_id,
const instance_watcher::ImageReleasePayload &payload,
C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::PeerImageRemovedPayload &payload,
+ C_NotifyAck *on_notify_ack);
void handle_payload(const std::string &instance_id,
const instance_watcher::SyncRequestPayload &payload,
C_NotifyAck *on_notify_ack);
m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
mirror_uuid, image_id.id, true,
gather_ctx->new_sub());
+ if (!mirror_uuid.empty()) {
+ m_instance_watcher->notify_peer_image_removed(instance_id,
+ image_id.global_id,
+ mirror_uuid,
+ gather_ctx->new_sub());
+ }
}
// derived removal events for remote after initial image listing
f->dump_bool("schedule_delete", schedule_delete);
}
+void PeerImageRemovedPayload::encode(bufferlist &bl) const {
+ PayloadBase::encode(bl);
+ ::encode(global_image_id, bl);
+ ::encode(peer_mirror_uuid, bl);
+}
+
+void PeerImageRemovedPayload::decode(__u8 version, bufferlist::iterator &iter) {
+ PayloadBase::decode(version, iter);
+ ::decode(global_image_id, iter);
+ ::decode(peer_mirror_uuid, iter);
+}
+
+void PeerImageRemovedPayload::dump(Formatter *f) const {
+ PayloadBase::dump(f);
+ f->dump_string("global_image_id", global_image_id);
+ f->dump_string("peer_mirror_uuid", peer_mirror_uuid);
+}
+
void SyncPayloadBase::encode(bufferlist &bl) const {
PayloadBase::encode(bl);
::encode(sync_id, bl);
}
void NotifyMessage::encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 2, bl);
boost::apply_visitor(EncodePayloadVisitor(bl), payload);
ENCODE_FINISH(bl);
}
void NotifyMessage::decode(bufferlist::iterator& iter) {
- DECODE_START(1, iter);
+ DECODE_START(2, iter);
uint32_t notify_op;
::decode(notify_op, iter);
case NOTIFY_OP_IMAGE_RELEASE:
payload = ImageReleasePayload();
break;
+ case NOTIFY_OP_PEER_IMAGE_REMOVED:
+ payload = PeerImageRemovedPayload();
+ break;
case NOTIFY_OP_SYNC_REQUEST:
payload = SyncRequestPayload();
break;
o.push_back(new NotifyMessage(ImageReleasePayload(1, "gid", "uuid", "id",
true)));
+ o.push_back(new NotifyMessage(PeerImageRemovedPayload()));
+ o.push_back(new NotifyMessage(PeerImageRemovedPayload(1, "gid", "uuid")));
+
o.push_back(new NotifyMessage(SyncRequestPayload()));
o.push_back(new NotifyMessage(SyncRequestPayload(1, "sync_id")));
case NOTIFY_OP_IMAGE_RELEASE:
out << "ImageRelease";
break;
+ case NOTIFY_OP_PEER_IMAGE_REMOVED:
+ out << "PeerImageRemoved";
+ break;
case NOTIFY_OP_SYNC_REQUEST:
out << "SyncRequest";
break;
namespace instance_watcher {
enum NotifyOp {
- NOTIFY_OP_IMAGE_ACQUIRE = 0,
- NOTIFY_OP_IMAGE_RELEASE = 1,
- NOTIFY_OP_SYNC_REQUEST = 2,
- NOTIFY_OP_SYNC_START = 3,
+ NOTIFY_OP_IMAGE_ACQUIRE = 0,
+ NOTIFY_OP_IMAGE_RELEASE = 1,
+ NOTIFY_OP_PEER_IMAGE_REMOVED = 2,
+ NOTIFY_OP_SYNC_REQUEST = 3,
+ NOTIFY_OP_SYNC_START = 4
};
struct PayloadBase {
void dump(Formatter *f) const;
};
+struct PeerImageRemovedPayload : public PayloadBase {
+ static const NotifyOp NOTIFY_OP = NOTIFY_OP_PEER_IMAGE_REMOVED;
+
+ std::string global_image_id;
+ std::string peer_mirror_uuid;
+
+ PeerImageRemovedPayload() {
+ }
+ PeerImageRemovedPayload(uint64_t request_id,
+ const std::string& global_image_id,
+ const std::string& peer_mirror_uuid)
+ : PayloadBase(request_id),
+ global_image_id(global_image_id), peer_mirror_uuid(peer_mirror_uuid) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+ void dump(Formatter *f) const;
+};
+
struct SyncPayloadBase : public PayloadBase {
std::string sync_id;
typedef boost::variant<ImageAcquirePayload,
ImageReleasePayload,
+ PeerImageRemovedPayload,
SyncRequestPayload,
SyncStartPayload,
UnknownPayload> Payload;