From: Jason Dillaman Date: Fri, 6 Feb 2015 02:14:42 +0000 (-0500) Subject: librbd: ensure ImageWatcher notifications are idempotent X-Git-Tag: v0.93~19^2~3^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9f2f306f0802d30670cc31f9635937e6ead271ce;p=ceph.git librbd: ensure ImageWatcher notifications are idempotent Ensure that ImageWatcher can support receiving duplicate notifications from librados. Resize and flatten now pass a fixed request id and the lock owner matches those ids with in-progress requests. Also protect against loopback notifications that might result in incorrect behavior across exclusive lock states. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h index 9b401163fc7e..781b66b9576c 100644 --- a/src/librbd/ImageCtx.h +++ b/src/librbd/ImageCtx.h @@ -16,6 +16,7 @@ #include "common/Readahead.h" #include "common/RWLock.h" #include "common/snap_types.h" +#include "include/atomic.h" #include "include/buffer.h" #include "include/rbd/librbd.hpp" #include "include/rbd_types.h" @@ -114,6 +115,8 @@ namespace librbd { ObjectMap *object_map; + atomic_t async_request_seq; + /** * Either image_name or image_id must be set. * If id is not known, pass the empty std::string, diff --git a/src/librbd/ImageWatcher.cc b/src/librbd/ImageWatcher.cc index a533f1f4a9a2..30e0a1961851 100644 --- a/src/librbd/ImageWatcher.cc +++ b/src/librbd/ImageWatcher.cc @@ -70,7 +70,6 @@ ImageWatcher::ImageWatcher(ImageCtx &image_ctx) m_timer_lock("librbd::ImageWatcher::m_timer_lock"), m_timer(new SafeTimer(image_ctx.cct, m_timer_lock)), m_async_request_lock("librbd::ImageWatcher::m_async_request_lock"), - m_async_request_id(0), m_aio_request_lock("librbd::ImageWatcher::m_aio_request_lock"), m_retry_aio_context(NULL) { @@ -212,19 +211,15 @@ int ImageWatcher::request_lock( } } - { - // aio operations will be retried once the the watch is re-established - RWLock::RLocker l(m_watch_lock); - if (m_watch_state == WATCH_STATE_ERROR) { - return 0; - } - } + RWLock::RLocker l(m_watch_lock); + if (m_watch_state == WATCH_STATE_REGISTERED) { + ldout(m_image_ctx.cct, 10) << "requesting exclusive lock" << dendl; - // run notify request in finisher to avoid blocking aio path - FunctionContext *ctx = new FunctionContext( - boost::bind(&ImageWatcher::notify_request_lock, this)); - m_finisher->queue(ctx); - ldout(m_image_ctx.cct, 10) << "requesting exclusive lock" << dendl; + // run notify request in finisher to avoid blocking aio path + FunctionContext *ctx = new FunctionContext( + boost::bind(&ImageWatcher::notify_request_lock, this)); + m_finisher->queue(ctx); + } return 0; } @@ -270,9 +265,8 @@ void ImageWatcher::finalize_request_lock() { } if (owned_lock) { retry_aio_requests(); - } else { - schedule_retry_aio_requests(); + schedule_retry_aio_requests(true); } } @@ -426,6 +420,13 @@ int ImageWatcher::notify_async_progress(const RemoteAsyncRequest &request, return 0; } +void ImageWatcher::schedule_async_complete(const RemoteAsyncRequest &request, + int r) { + FunctionContext *ctx = new FunctionContext( + boost::bind(&ImageWatcher::notify_async_complete, this, request, r)); + m_finisher->queue(ctx); +} + int ImageWatcher::notify_async_complete(const RemoteAsyncRequest &request, int r) { ldout(m_image_ctx.cct, 20) << "remote async request finished: " @@ -440,37 +441,47 @@ int ImageWatcher::notify_async_complete(const RemoteAsyncRequest &request, librbd::notify_change(m_image_ctx.md_ctx, m_image_ctx.header_oid, &m_image_ctx); - m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL); + int ret = m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, + NOTIFY_TIMEOUT, NULL); + if (ret < 0) { + lderr(m_image_ctx.cct) << "failed to notify async complete: " + << cpp_strerror(ret) << dendl; + if (ret == -ETIMEDOUT) { + schedule_async_complete(request, r); + } + } else { + RWLock::WLocker l(m_async_request_lock); + m_async_pending.erase(request); + } return 0; } -int ImageWatcher::notify_flatten(ProgressContext &prog_ctx) { +int ImageWatcher::notify_flatten(uint64_t request_id, ProgressContext &prog_ctx) { assert(m_image_ctx.owner_lock.is_locked()); assert(!is_lock_owner()); bufferlist bl; - uint64_t async_request_id; ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl); ::encode(NOTIFY_OP_FLATTEN, bl); - async_request_id = encode_async_request(bl); + encode_async_request(request_id, bl); ENCODE_FINISH(bl); - return notify_async_request(async_request_id, bl, prog_ctx); + return notify_async_request(request_id, bl, prog_ctx); } -int ImageWatcher::notify_resize(uint64_t size, ProgressContext &prog_ctx) { +int ImageWatcher::notify_resize(uint64_t request_id, uint64_t size, + ProgressContext &prog_ctx) { assert(m_image_ctx.owner_lock.is_locked()); assert(!is_lock_owner()); bufferlist bl; - uint64_t async_request_id; ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl); ::encode(NOTIFY_OP_RESIZE, bl); ::encode(size, bl); - async_request_id = encode_async_request(bl); + encode_async_request(request_id, bl); ENCODE_FINISH(bl); - return notify_async_request(async_request_id, bl, prog_ctx); + return notify_async_request(request_id, bl, prog_ctx); } int ImageWatcher::notify_snap_create(const std::string &snap_name) { @@ -520,12 +531,21 @@ bool ImageWatcher::decode_lock_cookie(const std::string &tag, return true; } -void ImageWatcher::schedule_retry_aio_requests() { +void ImageWatcher::schedule_retry_aio_requests(bool use_timer) { Mutex::Locker l(m_timer_lock); - if (m_retry_aio_context == NULL) { - m_retry_aio_context = new FunctionContext(boost::bind( - &ImageWatcher::finalize_retry_aio_requests, this)); - m_timer->add_event_after(RETRY_DELAY_SECONDS, m_retry_aio_context); + if (use_timer) { + if (m_retry_aio_context == NULL) { + m_retry_aio_context = new FunctionContext(boost::bind( + &ImageWatcher::finalize_retry_aio_requests, this)); + m_timer->add_event_after(RETRY_DELAY_SECONDS, m_retry_aio_context); + } + } else { + m_timer->cancel_event(m_retry_aio_context); + m_retry_aio_context = NULL; + + Context *ctx = new FunctionContext(boost::bind( + &ImageWatcher::retry_aio_requests, this)); + m_finisher->queue(ctx); } } @@ -559,6 +579,12 @@ void ImageWatcher::retry_aio_requests() { } } +void ImageWatcher::schedule_cancel_async_requests() { + FunctionContext *ctx = new FunctionContext( + boost::bind(&ImageWatcher::cancel_async_requests, this)); + m_finisher->queue(ctx); +} + void ImageWatcher::cancel_async_requests() { RWLock::WLocker l(m_async_request_lock); for (std::map::iterator iter = m_async_requests.begin(); @@ -568,16 +594,12 @@ void ImageWatcher::cancel_async_requests() { m_async_requests.clear(); } -uint64_t ImageWatcher::encode_async_request(bufferlist &bl) { - RWLock::WLocker l(m_async_request_lock); - ++m_async_request_id; - +void ImageWatcher::encode_async_request(uint64_t request_id, bufferlist &bl) { RemoteAsyncRequest request(m_image_ctx.md_ctx.get_instance_id(), - m_watch_handle, m_async_request_id); + reinterpret_cast(this), request_id); ::encode(request, bl); ldout(m_image_ctx.cct, 10) << "async request: " << request << dendl; - return m_async_request_id; } int ImageWatcher::decode_response_code(bufferlist &bl) { @@ -628,7 +650,7 @@ void ImageWatcher::notify_request_lock() { } else if (r < 0) { lderr(m_image_ctx.cct) << "error requesting lock: " << cpp_strerror(r) << dendl; - schedule_retry_aio_requests(); + schedule_retry_aio_requests(true); } } @@ -718,15 +740,14 @@ int ImageWatcher::notify_async_request(uint64_t async_request_id, return r; } -void ImageWatcher::schedule_update_progress( - const RemoteAsyncRequest &remote_async_request, - uint64_t offset, uint64_t total) { +void ImageWatcher::schedule_async_progress(const RemoteAsyncRequest &request, + uint64_t offset, uint64_t total) { RWLock::WLocker l(m_async_request_lock); - if (m_async_progress.count(remote_async_request) == 0) { - m_async_progress.insert(remote_async_request); + if (m_async_progress.count(request) == 0) { + m_async_progress.insert(request); FunctionContext *ctx = new FunctionContext( - boost::bind(&ImageWatcher::notify_async_progress, - this, remote_async_request, offset, total)); + boost::bind(&ImageWatcher::notify_async_progress, this, request, offset, + total)); m_finisher->queue(ctx); } } @@ -741,32 +762,24 @@ void ImageWatcher::handle_header_update() { void ImageWatcher::handle_acquired_lock() { ldout(m_image_ctx.cct, 10) << "image exclusively locked announcement" << dendl; - FunctionContext *ctx = new FunctionContext( - boost::bind(&ImageWatcher::cancel_async_requests, this)); - m_finisher->queue(ctx); + RWLock::RLocker l(m_image_ctx.owner_lock); + if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) { + schedule_cancel_async_requests(); + } } void ImageWatcher::handle_released_lock() { ldout(m_image_ctx.cct, 10) << "exclusive lock released" << dendl; RWLock::RLocker l(m_image_ctx.owner_lock); if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) { - FunctionContext *ctx = new FunctionContext( - boost::bind(&ImageWatcher::cancel_async_requests, this)); - m_finisher->queue(ctx); - - Mutex::Locker l(m_aio_request_lock); - if (!m_aio_requests.empty()) { - ldout(m_image_ctx.cct, 20) << "queuing lock request" << dendl; - FunctionContext *req_ctx = new FunctionContext( - boost::bind(&ImageWatcher::finalize_request_lock, this)); - m_finisher->queue(req_ctx); - } + schedule_cancel_async_requests(); + schedule_retry_aio_requests(false); } } void ImageWatcher::handle_request_lock(bufferlist *out) { RWLock::WLocker l(m_image_ctx.owner_lock); - if (is_lock_owner()) { + if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) { m_lock_owner_state = LOCK_OWNER_STATE_RELEASING; // need to send something back so the client can detect a missing leader @@ -790,7 +803,7 @@ void ImageWatcher::handle_async_progress(bufferlist::iterator iter) { ::decode(offset, iter); ::decode(total, iter); if (request.gid == m_image_ctx.md_ctx.get_instance_id() && - request.handle == m_watch_handle) { + request.handle == reinterpret_cast(this)) { RWLock::RLocker l(m_async_request_lock); std::map::iterator iter = m_async_requests.find(request.request_id); @@ -810,7 +823,7 @@ void ImageWatcher::handle_async_complete(bufferlist::iterator iter) { int r; ::decode(r, iter); if (request.gid == m_image_ctx.md_ctx.get_instance_id() && - request.handle == m_watch_handle) { + request.handle == reinterpret_cast(this)) { Context *ctx = NULL; { RWLock::RLocker l(m_async_request_lock); @@ -834,14 +847,22 @@ void ImageWatcher::handle_flatten(bufferlist::iterator iter, bufferlist *out) { RemoteAsyncRequest request; ::decode(request, iter); - RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this, - request); - RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx); + int r = 0; + RWLock::WLocker l(m_async_request_lock); + if (m_async_pending.count(request) == 0) { + RemoteProgressContext *prog_ctx = + new RemoteProgressContext(*this, request); + RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx); - ldout(m_image_ctx.cct, 10) << "remote flatten request: " << request << dendl; - int r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx); - if (r < 0) { - delete ctx; + ldout(m_image_ctx.cct, 10) << "remote flatten request: " << request << dendl; + r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx); + if (r < 0) { + delete ctx; + lderr(m_image_ctx.cct) << "remove flatten request failed: " + << cpp_strerror(r) << dendl; + } else { + m_async_pending.insert(request); + } } ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out); @@ -859,15 +880,23 @@ void ImageWatcher::handle_resize(bufferlist::iterator iter, bufferlist *out) { RemoteAsyncRequest request; ::decode(request, iter); - RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this, - request); - RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx); - - ldout(m_image_ctx.cct, 10) << "remote resize request: " << request - << " " << size << dendl; - int r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx); - if (r < 0) { - delete ctx; + int r = 0; + RWLock::WLocker l(m_async_request_lock); + if (m_async_pending.count(request) == 0) { + RemoteProgressContext *prog_ctx = + new RemoteProgressContext(*this, request); + RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx); + + ldout(m_image_ctx.cct, 10) << "remote resize request: " << request + << " " << size << dendl; + r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx); + if (r < 0) { + lderr(m_image_ctx.cct) << "remove resize request failed: " + << cpp_strerror(r) << dendl; + delete ctx; + } else { + m_async_pending.insert(request); + } } ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out); @@ -916,6 +945,12 @@ void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle, return; } + bool loopback; + { + RWLock::RLocker l(m_watch_lock); + loopback = (m_watch_handle == handle); + } + bufferlist::iterator iter = bl.begin(); try { DECODE_START(NOTIFY_VERSION, iter); @@ -923,51 +958,54 @@ void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle, ::decode(op, iter); bufferlist out; - switch (op) { - // client ops - case NOTIFY_OP_ACQUIRED_LOCK: - acknowledge_notify(notify_id, handle, out); - handle_acquired_lock(); - break; - case NOTIFY_OP_RELEASED_LOCK: - acknowledge_notify(notify_id, handle, out); - handle_released_lock(); - break; - case NOTIFY_OP_HEADER_UPDATE: - acknowledge_notify(notify_id, handle, out); - handle_header_update(); - break; - case NOTIFY_OP_ASYNC_PROGRESS: - acknowledge_notify(notify_id, handle, out); - handle_async_progress(iter); - break; - case NOTIFY_OP_ASYNC_COMPLETE: - acknowledge_notify(notify_id, handle, out); - handle_async_complete(iter); - break; - - // lock owner-only ops - case NOTIFY_OP_REQUEST_LOCK: - handle_request_lock(&out); - acknowledge_notify(notify_id, handle, out); - break; - case NOTIFY_OP_FLATTEN: - handle_flatten(iter, &out); - acknowledge_notify(notify_id, handle, out); - break; - case NOTIFY_OP_RESIZE: - handle_resize(iter, &out); + if (loopback && op != NOTIFY_OP_HEADER_UPDATE) { acknowledge_notify(notify_id, handle, out); - break; - case NOTIFY_OP_SNAP_CREATE: - handle_snap_create(iter, &out); - acknowledge_notify(notify_id, handle, out); - break; - - default: - handle_unknown_op(&out); - acknowledge_notify(notify_id, handle, out); - break; + } else { + switch (op) { + // client ops + case NOTIFY_OP_ACQUIRED_LOCK: + acknowledge_notify(notify_id, handle, out); + handle_acquired_lock(); + break; + case NOTIFY_OP_RELEASED_LOCK: + acknowledge_notify(notify_id, handle, out); + handle_released_lock(); + break; + case NOTIFY_OP_HEADER_UPDATE: + acknowledge_notify(notify_id, handle, out); + handle_header_update(); + break; + case NOTIFY_OP_ASYNC_PROGRESS: + acknowledge_notify(notify_id, handle, out); + handle_async_progress(iter); + break; + case NOTIFY_OP_ASYNC_COMPLETE: + acknowledge_notify(notify_id, handle, out); + handle_async_complete(iter); + break; + + // lock owner-only ops + case NOTIFY_OP_REQUEST_LOCK: + handle_request_lock(&out); + acknowledge_notify(notify_id, handle, out); + break; + case NOTIFY_OP_FLATTEN: + handle_flatten(iter, &out); + acknowledge_notify(notify_id, handle, out); + break; + case NOTIFY_OP_RESIZE: + handle_resize(iter, &out); + acknowledge_notify(notify_id, handle, out); + break; + case NOTIFY_OP_SNAP_CREATE: + handle_snap_create(iter, &out); + acknowledge_notify(notify_id, handle, out); + break; + default: + handle_unknown_op(&out); + acknowledge_notify(notify_id, handle, out); + break; + } } DECODE_FINISH(iter); } catch (const buffer::error &err) { @@ -1040,7 +1078,6 @@ void ImageWatcher::reregister_watch() { } } - Mutex::Locker l(m_timer_lock); retry_aio_requests(); } @@ -1056,10 +1093,7 @@ void ImageWatcher::WatchCtx::handle_error(uint64_t handle, int err) { } void ImageWatcher::RemoteContext::finish(int r) { - FunctionContext *ctx = new FunctionContext( - boost::bind(&ImageWatcher::notify_async_complete, - &m_image_watcher, m_remote_async_request, r)); - m_image_watcher.m_finisher->queue(ctx); + m_image_watcher.schedule_async_complete(m_remote_async_request, r); } } diff --git a/src/librbd/ImageWatcher.h b/src/librbd/ImageWatcher.h index 11e03715ae3f..c274c969ab01 100644 --- a/src/librbd/ImageWatcher.h +++ b/src/librbd/ImageWatcher.h @@ -63,13 +63,9 @@ namespace librbd { void assert_header_locked(librados::ObjectWriteOperation *op); - int notify_async_progress(const RemoteAsyncRequest &remote_async_request, - uint64_t offset, uint64_t total); - int notify_async_complete(const RemoteAsyncRequest &remote_async_request, - int r); - - int notify_flatten(ProgressContext &prog_ctx); - int notify_resize(uint64_t size, ProgressContext &prog_ctx); + int notify_flatten(uint64_t request_id, ProgressContext &prog_ctx); + int notify_resize(uint64_t request_id, uint64_t size, + ProgressContext &prog_ctx); int notify_snap_create(const std::string &snap_name); static void notify_header_update(librados::IoCtx &io_ctx, @@ -115,8 +111,8 @@ namespace librbd { } virtual int update_progress(uint64_t offset, uint64_t total) { - m_image_watcher.schedule_update_progress( - m_remote_async_request, offset, total); + m_image_watcher.schedule_async_progress(m_remote_async_request, offset, + total); return 0; } @@ -162,8 +158,8 @@ namespace librbd { SafeTimer *m_timer; RWLock m_async_request_lock; - uint64_t m_async_request_id; std::map m_async_requests; + std::set m_async_pending; std::set m_async_progress; Mutex m_aio_request_lock; @@ -181,14 +177,15 @@ namespace librbd { void finalize_request_lock(); void finalize_header_update(); - void schedule_retry_aio_requests(); + void schedule_retry_aio_requests(bool use_timer); void cancel_retry_aio_requests(); void finalize_retry_aio_requests(); void retry_aio_requests(); + void schedule_cancel_async_requests(); void cancel_async_requests(); - uint64_t encode_async_request(bufferlist &bl); + void encode_async_request(uint64_t request_id, bufferlist &bl); static int decode_response_code(bufferlist &bl); void notify_released_lock(); @@ -199,8 +196,14 @@ namespace librbd { ProgressContext& prog_ctx); void notify_request_leadership(); - void schedule_update_progress(const RemoteAsyncRequest &remote_async_request, - uint64_t offset, uint64_t total); + void schedule_async_progress(const RemoteAsyncRequest &remote_async_request, + uint64_t offset, uint64_t total); + int notify_async_progress(const RemoteAsyncRequest &remote_async_request, + uint64_t offset, uint64_t total); + void schedule_async_complete(const RemoteAsyncRequest &remote_async_request, + int r); + int notify_async_complete(const RemoteAsyncRequest &remote_async_request, + int r); void handle_header_update(); void handle_acquired_lock(); diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc index 2049553cbfa5..0765a3ac0e61 100644 --- a/src/librbd/internal.cc +++ b/src/librbd/internal.cc @@ -1601,6 +1601,7 @@ reprotect_and_return_err: ldout(cct, 20) << "resize " << ictx << " " << ictx->size << " -> " << size << dendl; + uint64_t request_id = ictx->async_request_seq.inc(); int r; do { C_SaferCond *ctx; @@ -1614,7 +1615,7 @@ reprotect_and_return_err: break; } - r = ictx->image_watcher->notify_resize(size, prog_ctx); + r = ictx->image_watcher->notify_resize(request_id, size, prog_ctx); if (r != -ETIMEDOUT && r != -ERESTART) { return r; } @@ -2409,6 +2410,7 @@ reprotect_and_return_err: return -EROFS; } + uint64_t request_id = ictx->async_request_seq.inc(); int r; do { C_SaferCond *ctx; @@ -2422,7 +2424,7 @@ reprotect_and_return_err: break; } - r = ictx->image_watcher->notify_flatten(prog_ctx); + r = ictx->image_watcher->notify_flatten(request_id, prog_ctx); if (r != -ETIMEDOUT && r != -ERESTART) { return r; }