m_timer(new SafeTimer(image_ctx.cct, m_timer_lock)),
m_async_request_lock("librbd::ImageWatcher::m_async_request_lock"),
m_aio_request_lock("librbd::ImageWatcher::m_aio_request_lock"),
- m_retry_aio_context(NULL)
+ m_retry_aio_context(NULL),
+ m_owner_client_id_lock("librbd::ImageWatcher::m_owner_client_id_lock")
{
m_finisher->start();
m_timer->init();
bool ImageWatcher::is_lock_owner() const {
assert(m_image_ctx.owner_lock.is_locked());
- return (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED ||
- m_lock_owner_state == LOCK_OWNER_STATE_RELEASING);
+ return (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED);
}
int ImageWatcher::register_watch() {
if ((strncmp(locker_address.c_str(),
iter->addr, sizeof(iter->addr)) == 0) &&
(locker_handle == iter->cookie)) {
+ Mutex::Locker l(m_owner_client_id_lock);
+ m_owner_client_id = ClientId(iter->watcher_id, locker_handle);
return 0;
}
}
ldout(m_image_ctx.cct, 10) << "acquired exclusive lock" << dendl;
m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
+ {
+ Mutex::Locker l(m_owner_client_id_lock);
+ m_owner_client_id = get_client_id();
+ }
+
if (m_image_ctx.object_map != NULL) {
r = m_image_ctx.object_map->lock();
if (r < 0 && r != -ENOENT) {
}
bufferlist bl;
- ::encode(NotifyMessage(AcquiredLockPayload()), bl);
+ ::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), bl);
// send the notification when we aren't holding locks
FunctionContext *ctx = new FunctionContext(
bufferlist bl;
::encode(NotifyMessage(SnapCreatePayload(snap_name)), bl);
- bufferlist response;
- int r = notify_lock_owner(bl, response);
- if (r < 0) {
- return r;
- }
- return decode_response_code(response);
+ return notify_lock_owner(bl);
}
void ImageWatcher::notify_header_update(librados::IoCtx &io_ctx,
return ClientId(m_image_ctx.md_ctx.get_instance_id(), m_watch_handle);
}
-int ImageWatcher::decode_response_code(bufferlist &bl) {
- int r;
- try {
- bufferlist::iterator iter = bl.begin();
-
- ResponseMessage response_message;
- ::decode(response_message, iter);
-
- r = response_message.result;
- } catch (const buffer::error &err) {
- r = -EINVAL;
- }
- return r;
-}
-
void ImageWatcher::notify_released_lock() {
ldout(m_image_ctx.cct, 10) << "notify released lock" << dendl;
bufferlist bl;
- ::encode(NotifyMessage(ReleasedLockPayload()), bl);
+ ::encode(NotifyMessage(ReleasedLockPayload(get_client_id())), bl);
m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
}
}
bufferlist bl;
- ::encode(NotifyMessage(RequestLockPayload()), bl);
+ ::encode(NotifyMessage(RequestLockPayload(get_client_id())), bl);
- bufferlist response;
- int r = notify_lock_owner(bl, response);
+ int r = notify_lock_owner(bl);
m_image_ctx.owner_lock.put_read();
if (r == -ETIMEDOUT) {
}
}
-int ImageWatcher::notify_lock_owner(bufferlist &bl, bufferlist& response) {
+int ImageWatcher::notify_lock_owner(bufferlist &bl) {
assert(m_image_ctx.owner_lock.is_locked());
// since we need to ack our own notifications, release the owner lock just in
}
}
+ bufferlist response;
bool lock_owner_responded = false;
for (responses_t::iterator i = responses.begin(); i != responses.end(); ++i) {
if (i->second.length() > 0) {
lderr(m_image_ctx.cct) << "no lock owners detected" << dendl;
return -ETIMEDOUT;
}
- return 0;
+
+ try {
+ bufferlist::iterator iter = response.begin();
+
+ ResponseMessage response_message;
+ ::decode(response_message, iter);
+
+ r = response_message.result;
+ } catch (const buffer::error &err) {
+ r = -EINVAL;
+ }
+ return r;
}
int ImageWatcher::notify_async_request(const AsyncRequestId &async_request_id,
}
} BOOST_SCOPE_EXIT_END
- bufferlist response;
- r = notify_lock_owner(in, response);
+ r = notify_lock_owner(in);
if (r < 0) {
return r;
}
void ImageWatcher::handle_payload(const AcquiredLockPayload &payload,
bufferlist *out) {
ldout(m_image_ctx.cct, 10) << "image exclusively locked announcement" << dendl;
+ if (payload.client_id.is_valid()) {
+ Mutex::Locker l(m_owner_client_id_lock);
+ if (payload.client_id == m_owner_client_id) {
+ // we already know that the remote client is the owner
+ return;
+ }
+ m_owner_client_id = payload.client_id;
+ }
+
RWLock::RLocker l(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
schedule_cancel_async_requests();
+ schedule_retry_aio_requests(false);
}
}
void ImageWatcher::handle_payload(const ReleasedLockPayload &payload,
bufferlist *out) {
ldout(m_image_ctx.cct, 10) << "exclusive lock released" << dendl;
+ if (payload.client_id.is_valid()) {
+ Mutex::Locker l(m_owner_client_id_lock);
+ if (payload.client_id != m_owner_client_id) {
+ return;
+ }
+ m_owner_client_id = ClientId();
+ }
+
RWLock::RLocker l(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
schedule_cancel_async_requests();
void ImageWatcher::handle_payload(const RequestLockPayload &payload,
bufferlist *out) {
ldout(m_image_ctx.cct, 10) << "exclusive lock requested" << dendl;
- RWLock::WLocker l(m_image_ctx.owner_lock);
- if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
- m_lock_owner_state = LOCK_OWNER_STATE_RELEASING;
+ if (payload.client_id == get_client_id()) {
+ return;
+ }
+ RWLock::RLocker l(m_image_ctx.owner_lock);
+ if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
// need to send something back so the client can detect a missing leader
::encode(ResponseMessage(0), *out);
+ {
+ Mutex::Locker l(m_owner_client_id_lock);
+ if (!m_owner_client_id.is_valid()) {
+ return;
+ }
+ m_owner_client_id = ClientId();
+ }
+
ldout(m_image_ctx.cct, 10) << "queuing release of exclusive lock" << dendl;
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::release_lock, this));
void ImageWatcher::handle_payload(const FlattenPayload &payload,
bufferlist *out) {
+
RWLock::RLocker l(m_image_ctx.owner_lock);
if (is_lock_owner()) {
- bool new_request;
- {
+ int r = 0;
+ bool new_request = false;
+ if (payload.async_request_id.client_id == get_client_id()) {
+ r = -ERESTART;
+ } else {
RWLock::WLocker l(m_async_request_lock);
- new_request = (m_async_pending.count(payload.async_request_id) == 0);
- if (new_request) {
+ if (m_async_pending.count(payload.async_request_id) == 0) {
m_async_pending.insert(payload.async_request_id);
+ new_request = true;
}
}
- int r = 0;
if (new_request) {
RemoteProgressContext *prog_ctx =
new RemoteProgressContext(*this, payload.async_request_id);
bufferlist *out) {
RWLock::RLocker l(m_image_ctx.owner_lock);
if (is_lock_owner()) {
- bool new_request;
- {
+ int r = 0;
+ bool new_request = false;
+ if (payload.async_request_id.client_id == get_client_id()) {
+ r = -ERESTART;
+ } else {
RWLock::WLocker l(m_async_request_lock);
- new_request = (m_async_pending.count(payload.async_request_id) == 0);
- if (new_request) {
+ if (m_async_pending.count(payload.async_request_id) == 0) {
m_async_pending.insert(payload.async_request_id);
+ new_request = true;
}
}
- int r = 0;
if (new_request) {
RemoteProgressContext *prog_ctx =
new RemoteProgressContext(*this, payload.async_request_id);
void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
bufferlist &bl) {
- bool loopback;
- {
- RWLock::RLocker l(m_watch_lock);
- loopback = (m_watch_handle == handle);
- }
-
NotifyMessage notify_message;
if (bl.length() == 0) {
// legacy notification for header updates
}
}
- apply_visitor(HandlePayloadVisitor(this, notify_id, handle, loopback),
+ apply_visitor(HandlePayloadVisitor(this, notify_id, handle),
notify_message.payload);
}
lderr(m_image_ctx.cct) << "image watch failed: " << handle << ", "
<< cpp_strerror(err) << dendl;
+ {
+ Mutex::Locker l(m_owner_client_id_lock);
+ m_owner_client_id = ClientId();
+ }
+
RWLock::WLocker l(m_watch_lock);
if (m_watch_state == WATCH_STATE_REGISTERED) {
m_image_ctx.md_ctx.unwatch2(m_watch_handle);
enum LockOwnerState {
LOCK_OWNER_STATE_NOT_LOCKED,
- LOCK_OWNER_STATE_LOCKED,
- LOCK_OWNER_STATE_RELEASING
+ LOCK_OWNER_STATE_LOCKED
};
enum WatchState {
ImageWatcher *image_watcher;
uint64_t notify_id;
uint64_t handle;
- bool loopback;
HandlePayloadVisitor(ImageWatcher *image_watcher_, uint64_t notify_id_,
- uint64_t handle_, bool loopback_)
- : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_),
- loopback(loopback_) {}
+ uint64_t handle_)
+ : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_)
+ {
+ }
inline void operator()(const WatchNotify::HeaderUpdatePayload &payload) const {
bufferlist out;
template <typename Payload>
inline void operator()(const Payload &payload) const {
bufferlist out;
- if (!loopback) {
- image_watcher->handle_payload(payload, &out);
- }
+ image_watcher->handle_payload(payload, &out);
image_watcher->acknowledge_notify(notify_id, handle, out);
}
};
std::vector<AioRequest> m_aio_requests;
Context *m_retry_aio_context;
+ Mutex m_owner_client_id_lock;
+ WatchNotify::ClientId m_owner_client_id;
+
std::string encode_lock_cookie() const;
static bool decode_lock_cookie(const std::string &cookie, uint64_t *handle);
void cancel_async_requests();
WatchNotify::ClientId get_client_id();
- static int decode_response_code(bufferlist &bl);
void notify_released_lock();
void notify_request_lock();
- int notify_lock_owner(bufferlist &bl, bufferlist &response);
+ int notify_lock_owner(bufferlist &bl);
int notify_async_request(const WatchNotify::AsyncRequestId &id,
bufferlist &in, ProgressContext& prog_ctx);
void AcquiredLockPayload::encode(bufferlist &bl) const {
::encode(static_cast<uint32_t>(NOTIFY_OP_ACQUIRED_LOCK), bl);
+ ::encode(client_id, bl);
}
void AcquiredLockPayload::decode(__u8 version, bufferlist::iterator &iter) {
+ if (version >= 2) {
+ ::decode(client_id, iter);
+ }
}
void AcquiredLockPayload::dump(Formatter *f) const {
f->dump_string("notify_op", "AcquiredLock");
+ f->open_object_section("client_id");
+ client_id.dump(f);
+ f->close_section();
}
void ReleasedLockPayload::encode(bufferlist &bl) const {
::encode(static_cast<uint32_t>(NOTIFY_OP_RELEASED_LOCK), bl);
+ ::encode(client_id, bl);
}
void ReleasedLockPayload::decode(__u8 version, bufferlist::iterator &iter) {
+ if (version >= 2) {
+ ::decode(client_id, iter);
+ }
}
void ReleasedLockPayload::dump(Formatter *f) const {
f->dump_string("notify_op", "ReleasedLock");
+ f->open_object_section("client_id");
+ client_id.dump(f);
+ f->close_section();
}
void RequestLockPayload::encode(bufferlist &bl) const {
::encode(static_cast<uint32_t>(NOTIFY_OP_REQUEST_LOCK), bl);
+ ::encode(client_id, bl);
}
void RequestLockPayload::decode(__u8 version, bufferlist::iterator &iter) {
+ if (version >= 2) {
+ ::decode(client_id, iter);
+ }
}
void RequestLockPayload::dump(Formatter *f) const {
f->dump_string("notify_op", "RequestLock");
+ f->open_object_section("client_id");
+ client_id.dump(f);
+ f->close_section();
}
void HeaderUpdatePayload::encode(bufferlist &bl) const {
}
void NotifyMessage::encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
boost::apply_visitor(EncodePayloadVisitor(bl), payload);
ENCODE_FINISH(bl);
}
}
void NotifyMessage::generate_test_instances(std::list<NotifyMessage *> &o) {
- o.push_back(new NotifyMessage(AcquiredLockPayload()));
- o.push_back(new NotifyMessage(ReleasedLockPayload()));
- o.push_back(new NotifyMessage(RequestLockPayload()));
+ o.push_back(new NotifyMessage(AcquiredLockPayload(ClientId(1, 2))));
+ o.push_back(new NotifyMessage(ReleasedLockPayload(ClientId(1, 2))));
+ o.push_back(new NotifyMessage(RequestLockPayload(ClientId(1, 2))));
o.push_back(new NotifyMessage(HeaderUpdatePayload()));
o.push_back(new NotifyMessage(AsyncProgressPayload(AsyncRequestId(ClientId(0, 1), 2), 3, 4)));
o.push_back(new NotifyMessage(AsyncCompletePayload(AsyncRequestId(ClientId(0, 1), 2), 3)));