From: Jason Dillaman Date: Thu, 19 Feb 2015 02:56:34 +0000 (-0500) Subject: librbd: minor cleanup of ImageWatcher messages X-Git-Tag: v0.93~16^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=10d86863f270536a757c2f771ec1248bc9fa09ab;p=ceph.git librbd: minor cleanup of ImageWatcher messages Moved all RPC messages to their own classes to facilitate cleaner version control and backward compatibility. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/ImageWatcher.cc b/src/librbd/ImageWatcher.cc index 6e6fa4d76afe..5b31c29e9424 100644 --- a/src/librbd/ImageWatcher.cc +++ b/src/librbd/ImageWatcher.cc @@ -19,47 +19,16 @@ #undef dout_prefix #define dout_prefix *_dout << "librbd::ImageWatcher: " -static void decode(librbd::RemoteAsyncRequest &request, - bufferlist::iterator &iter) { - ::decode(request.gid, iter); - ::decode(request.handle, iter); - ::decode(request.request_id, iter); -} - -static void encode(const librbd::RemoteAsyncRequest &request, bufferlist &bl) { - ::encode(request.gid, bl); - ::encode(request.handle, bl); - ::encode(request.request_id, bl); -} - -static std::ostream &operator<<(std::ostream &out, - const librbd::RemoteAsyncRequest &request) { - out << "[" << request.gid << "," << request.handle << "," - << request.request_id << "]"; - return out; -} - namespace librbd { +using namespace WatchNotify; + static const std::string WATCHER_LOCK_TAG = "internal"; static const std::string WATCHER_LOCK_COOKIE_PREFIX = "auto"; static const uint64_t NOTIFY_TIMEOUT = 5000; -static const uint8_t NOTIFY_VERSION = 1; static const double RETRY_DELAY_SECONDS = 1.0; -enum { - NOTIFY_OP_ACQUIRED_LOCK = 0, - NOTIFY_OP_RELEASED_LOCK = 1, - NOTIFY_OP_REQUEST_LOCK = 2, - NOTIFY_OP_HEADER_UPDATE = 3, - NOTIFY_OP_ASYNC_PROGRESS = 4, - NOTIFY_OP_ASYNC_COMPLETE = 5, - NOTIFY_OP_FLATTEN = 6, - NOTIFY_OP_RESIZE = 7, - NOTIFY_OP_SNAP_CREATE = 8 -}; - ImageWatcher::ImageWatcher(ImageCtx &image_ctx) : m_image_ctx(image_ctx), m_watch_lock("librbd::ImageWatcher::m_watch_lock"), @@ -87,6 +56,10 @@ ImageWatcher::~ImageWatcher() RWLock::RLocker l(m_watch_lock); assert(m_watch_state != WATCH_STATE_REGISTERED); } + { + RWLock::RLocker l(m_image_ctx.owner_lock); + assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED); + } m_finisher->stop(); delete m_finisher; } @@ -99,7 +72,8 @@ bool ImageWatcher::is_lock_supported() const { bool ImageWatcher::is_lock_owner() const { assert(m_image_ctx.owner_lock.is_locked()); - return m_lock_owner_state == LOCK_OWNER_STATE_LOCKED; + return (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED || + m_lock_owner_state == LOCK_OWNER_STATE_RELEASING); } int ImageWatcher::register_watch() { @@ -269,21 +243,6 @@ bool ImageWatcher::try_request_lock() { return is_lock_owner(); } -void ImageWatcher::finalize_request_lock() { - cancel_retry_aio_requests(); - - bool owned_lock; - { - RWLock::RLocker l(m_image_ctx.owner_lock); - owned_lock = try_request_lock(); - } - if (owned_lock) { - retry_aio_requests(); - } else { - schedule_retry_aio_requests(true); - } -} - int ImageWatcher::get_lock_owner_info(entity_name_t *locker, std::string *cookie, std::string *address, uint64_t *handle) { std::mapqueue(ctx); } -int ImageWatcher::notify_async_complete(const RemoteAsyncRequest &request, +int ImageWatcher::notify_async_complete(const AsyncRequestId &request, int r) { ldout(m_image_ctx.cct, 20) << "remote async request finished: " << request << " = " << r << dendl; bufferlist bl; - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl); - ::encode(NOTIFY_OP_ASYNC_COMPLETE, bl); - ::encode(request, bl); - ::encode(r, bl); - ENCODE_FINISH(bl); + ::encode(NotifyMessage(AsyncCompletePayload(request, r)), bl); librbd::notify_change(m_image_ctx.md_ctx, m_image_ctx.header_oid, &m_image_ctx); @@ -473,13 +421,12 @@ int ImageWatcher::notify_flatten(uint64_t request_id, ProgressContext &prog_ctx) assert(m_image_ctx.owner_lock.is_locked()); assert(!is_lock_owner()); + AsyncRequestId async_request_id(get_client_id(), request_id); + bufferlist bl; - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl); - ::encode(NOTIFY_OP_FLATTEN, bl); - encode_async_request(request_id, bl); - ENCODE_FINISH(bl); + ::encode(NotifyMessage(FlattenPayload(async_request_id)), bl); - return notify_async_request(request_id, bl, prog_ctx); + return notify_async_request(async_request_id, bl, prog_ctx); } int ImageWatcher::notify_resize(uint64_t request_id, uint64_t size, @@ -487,14 +434,12 @@ int ImageWatcher::notify_resize(uint64_t request_id, uint64_t size, assert(m_image_ctx.owner_lock.is_locked()); assert(!is_lock_owner()); + AsyncRequestId async_request_id(get_client_id(), request_id); + bufferlist bl; - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl); - ::encode(NOTIFY_OP_RESIZE, bl); - ::encode(size, bl); - encode_async_request(request_id, bl); - ENCODE_FINISH(bl); + ::encode(NotifyMessage(ResizePayload(size, async_request_id)), bl); - return notify_async_request(request_id, bl, prog_ctx); + return notify_async_request(async_request_id, bl, prog_ctx); } int ImageWatcher::notify_snap_create(const std::string &snap_name) { @@ -502,10 +447,7 @@ int ImageWatcher::notify_snap_create(const std::string &snap_name) { assert(!is_lock_owner()); bufferlist bl; - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl); - ::encode(NOTIFY_OP_SNAP_CREATE, bl); - ::encode(snap_name, bl); - ENCODE_FINISH(bl); + ::encode(NotifyMessage(SnapCreatePayload(snap_name)), bl); bufferlist response; int r = notify_lock_owner(bl, response); @@ -520,9 +462,7 @@ void ImageWatcher::notify_header_update(librados::IoCtx &io_ctx, { // supports legacy (empty buffer) clients bufferlist bl; - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl); - ::encode(NOTIFY_OP_HEADER_UPDATE, bl); - ENCODE_FINISH(bl); + ::encode(NotifyMessage(HeaderUpdatePayload()), bl); io_ctx.notify2(oid, bl, NOTIFY_TIMEOUT, NULL); } @@ -600,28 +540,28 @@ void ImageWatcher::schedule_cancel_async_requests() { void ImageWatcher::cancel_async_requests() { RWLock::WLocker l(m_async_request_lock); - for (std::map::iterator iter = m_async_requests.begin(); + for (std::map::iterator iter = + m_async_requests.begin(); iter != m_async_requests.end(); ++iter) { iter->second.first->complete(-ERESTART); } m_async_requests.clear(); } -void ImageWatcher::encode_async_request(uint64_t request_id, bufferlist &bl) { - RemoteAsyncRequest request(m_image_ctx.md_ctx.get_instance_id(), - reinterpret_cast(this), request_id); - ::encode(request, bl); - - ldout(m_image_ctx.cct, 10) << "async request: " << request << dendl; +ClientId ImageWatcher::get_client_id() { + RWLock::RLocker l(m_watch_lock); + return ClientId(m_image_ctx.md_ctx.get_instance_id(), m_watch_handle); } int ImageWatcher::decode_response_code(bufferlist &bl) { int r; try { bufferlist::iterator iter = bl.begin(); - DECODE_START(NOTIFY_VERSION, iter); - ::decode(r, iter); - DECODE_FINISH(iter); + + ResponseMessage response_message; + ::decode(response_message, iter); + + r = response_message.result; } catch (const buffer::error &err) { r = -EINVAL; } @@ -631,9 +571,7 @@ int ImageWatcher::decode_response_code(bufferlist &bl) { void ImageWatcher::notify_released_lock() { ldout(m_image_ctx.cct, 10) << "notify released lock" << dendl; bufferlist bl; - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl); - ::encode(NOTIFY_OP_RELEASED_LOCK, bl); - ENCODE_FINISH(bl); + ::encode(NotifyMessage(ReleasedLockPayload()), bl); m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL); } @@ -649,9 +587,7 @@ void ImageWatcher::notify_request_lock() { } bufferlist bl; - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl); - ::encode(NOTIFY_OP_REQUEST_LOCK, bl); - ENCODE_FINISH(bl); + ::encode(NotifyMessage(RequestLockPayload()), bl); bufferlist response; int r = notify_lock_owner(bl, response); @@ -714,11 +650,12 @@ int ImageWatcher::notify_lock_owner(bufferlist &bl, bufferlist& response) { return 0; } -int ImageWatcher::notify_async_request(uint64_t async_request_id, +int ImageWatcher::notify_async_request(const AsyncRequestId &async_request_id, bufferlist &in, ProgressContext& prog_ctx) { assert(m_image_ctx.owner_lock.is_locked()); + ldout(m_image_ctx.cct, 10) << "async request: " << async_request_id << dendl; Mutex my_lock("librbd::ImageWatcher::notify_async_request::my_lock"); Cond cond; bool done = false; @@ -753,7 +690,7 @@ int ImageWatcher::notify_async_request(uint64_t async_request_id, return r; } -void ImageWatcher::schedule_async_progress(const RemoteAsyncRequest &request, +void ImageWatcher::schedule_async_progress(const AsyncRequestId &request, uint64_t offset, uint64_t total) { RWLock::WLocker l(m_async_request_lock); if (m_async_progress.count(request) == 0) { @@ -765,7 +702,8 @@ void ImageWatcher::schedule_async_progress(const RemoteAsyncRequest &request, } } -void ImageWatcher::handle_header_update() { +void ImageWatcher::handle_payload(const HeaderUpdatePayload &payload, + bufferlist *out) { ldout(m_image_ctx.cct, 10) << "image header updated" << dendl; Mutex::Locker lictx(m_image_ctx.refresh_lock); @@ -773,7 +711,8 @@ void ImageWatcher::handle_header_update() { m_image_ctx.perfcounter->inc(l_librbd_notify); } -void ImageWatcher::handle_acquired_lock() { +void ImageWatcher::handle_payload(const AcquiredLockPayload &payload, + bufferlist *out) { ldout(m_image_ctx.cct, 10) << "image exclusively locked announcement" << dendl; RWLock::RLocker l(m_image_ctx.owner_lock); if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) { @@ -781,7 +720,8 @@ void ImageWatcher::handle_acquired_lock() { } } -void ImageWatcher::handle_released_lock() { +void ImageWatcher::handle_payload(const ReleasedLockPayload &payload, + bufferlist *out) { ldout(m_image_ctx.cct, 10) << "exclusive lock released" << dendl; RWLock::RLocker l(m_image_ctx.owner_lock); if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) { @@ -790,92 +730,78 @@ void ImageWatcher::handle_released_lock() { } } -void ImageWatcher::handle_request_lock(bufferlist *out) { +void ImageWatcher::handle_payload(const RequestLockPayload &payload, + bufferlist *out) { + ldout(m_image_ctx.cct, 10) << "exclusive lock requested" << dendl; RWLock::WLocker l(m_image_ctx.owner_lock); if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) { m_lock_owner_state = LOCK_OWNER_STATE_RELEASING; // need to send something back so the client can detect a missing leader - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out); - ::encode(0, *out); - ENCODE_FINISH(*out); + ::encode(ResponseMessage(0), *out); - ldout(m_image_ctx.cct, 10) << "exclusive lock requested, releasing" << dendl; + ldout(m_image_ctx.cct, 10) << "queuing release of exclusive lock" << dendl; FunctionContext *ctx = new FunctionContext( boost::bind(&ImageWatcher::release_lock, this)); m_finisher->queue(ctx); } } -void ImageWatcher::handle_async_progress(bufferlist::iterator iter) { - RemoteAsyncRequest request; - ::decode(request, iter); - - uint64_t offset; - uint64_t total; - ::decode(offset, iter); - ::decode(total, iter); - if (request.gid == m_image_ctx.md_ctx.get_instance_id() && - request.handle == reinterpret_cast(this)) { - RWLock::RLocker l(m_async_request_lock); - std::map::iterator iter = - m_async_requests.find(request.request_id); - if (iter != m_async_requests.end()) { - ldout(m_image_ctx.cct, 20) << "request progress: " - << request << " @ " << offset - << "/" << total << dendl; - iter->second.second->update_progress(offset, total); - } +void ImageWatcher::handle_payload(const AsyncProgressPayload &payload, + bufferlist *out) { + RWLock::RLocker l(m_async_request_lock); + std::map::iterator req_it = + m_async_requests.find(payload.async_request_id); + if (req_it != m_async_requests.end()) { + ldout(m_image_ctx.cct, 20) << "request progress: " + << payload.async_request_id << " @ " + << payload.offset << "/" << payload.total + << dendl; + req_it->second.second->update_progress(payload.offset, payload.total); } } -void ImageWatcher::handle_async_complete(bufferlist::iterator iter) { - RemoteAsyncRequest request; - ::decode(request, iter); - - int r; - ::decode(r, iter); - if (request.gid == m_image_ctx.md_ctx.get_instance_id() && - request.handle == reinterpret_cast(this)) { - Context *ctx = NULL; - { - RWLock::RLocker l(m_async_request_lock); - std::map::iterator iter = - m_async_requests.find(request.request_id); - if (iter != m_async_requests.end()) { - ctx = iter->second.first; - } - } - if (ctx != NULL) { - ldout(m_image_ctx.cct, 10) << "request finished: " - << request << " = " << r << dendl; - ctx->complete(r); +void ImageWatcher::handle_payload(const AsyncCompletePayload &payload, + bufferlist *out) { + Context *ctx = NULL; + { + RWLock::RLocker l(m_async_request_lock); + std::map::iterator req_it = + m_async_requests.find(payload.async_request_id); + if (req_it != m_async_requests.end()) { + ctx = req_it->second.first; } } + if (ctx != NULL) { + ldout(m_image_ctx.cct, 10) << "request finished: " + << payload.async_request_id << "=" + << payload.result << dendl; + ctx->complete(payload.result); + } } -void ImageWatcher::handle_flatten(bufferlist::iterator iter, bufferlist *out) { +void ImageWatcher::handle_payload(const FlattenPayload &payload, + bufferlist *out) { RWLock::RLocker l(m_image_ctx.owner_lock); if (is_lock_owner()) { - RemoteAsyncRequest request; - ::decode(request, iter); - bool new_request; { RWLock::WLocker l(m_async_request_lock); - new_request = (m_async_pending.count(request) == 0); + new_request = (m_async_pending.count(payload.async_request_id) == 0); if (new_request) { - m_async_pending.insert(request); + m_async_pending.insert(payload.async_request_id); } } int r = 0; if (new_request) { RemoteProgressContext *prog_ctx = - new RemoteProgressContext(*this, request); - RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx); + new RemoteProgressContext(*this, payload.async_request_id); + RemoteContext *ctx = new RemoteContext(*this, payload.async_request_id, + prog_ctx); - ldout(m_image_ctx.cct, 10) << "remote flatten request: " << request << dendl; + ldout(m_image_ctx.cct, 10) << "remote flatten request: " + << payload.async_request_id << dendl; r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx); if (r < 0) { delete ctx; @@ -883,71 +809,61 @@ void ImageWatcher::handle_flatten(bufferlist::iterator iter, bufferlist *out) { << cpp_strerror(r) << dendl; RWLock::WLocker l(m_async_request_lock); - m_async_pending.erase(request); + m_async_pending.erase(payload.async_request_id); } } - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out); - ::encode(r, *out); - ENCODE_FINISH(*out); + ::encode(ResponseMessage(r), *out); } } -void ImageWatcher::handle_resize(bufferlist::iterator iter, bufferlist *out) { +void ImageWatcher::handle_payload(const ResizePayload &payload, + bufferlist *out) { RWLock::RLocker l(m_image_ctx.owner_lock); if (is_lock_owner()) { - uint64_t size; - ::decode(size, iter); - - RemoteAsyncRequest request; - ::decode(request, iter); - bool new_request; { RWLock::WLocker l(m_async_request_lock); - new_request = (m_async_pending.count(request) == 0); + new_request = (m_async_pending.count(payload.async_request_id) == 0); if (new_request) { - m_async_pending.insert(request); + m_async_pending.insert(payload.async_request_id); } } int r = 0; if (new_request) { RemoteProgressContext *prog_ctx = - new RemoteProgressContext(*this, request); - RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx); - - ldout(m_image_ctx.cct, 10) << "remote resize request: " << request - << " " << size << dendl; - r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx); + new RemoteProgressContext(*this, payload.async_request_id); + RemoteContext *ctx = new RemoteContext(*this, payload.async_request_id, + prog_ctx); + + ldout(m_image_ctx.cct, 10) << "remote resize request: " + << payload.async_request_id << " " + << payload.size << dendl; + r = librbd::async_resize(&m_image_ctx, ctx, payload.size, *prog_ctx); if (r < 0) { lderr(m_image_ctx.cct) << "remove resize request failed: " << cpp_strerror(r) << dendl; delete ctx; RWLock::WLocker l(m_async_request_lock); - m_async_pending.erase(request); + m_async_pending.erase(payload.async_request_id); } } - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out); - ::encode(r, *out); - ENCODE_FINISH(*out); + ::encode(ResponseMessage(r), *out); } } -void ImageWatcher::handle_snap_create(bufferlist::iterator iter, bufferlist *out) { +void ImageWatcher::handle_payload(const SnapCreatePayload &payload, + bufferlist *out) { RWLock::RLocker l(m_image_ctx.owner_lock); if (is_lock_owner()) { - std::string snap_name; - ::decode(snap_name, iter); - - ldout(m_image_ctx.cct, 10) << "remote snap_create request: " << snap_name << dendl; - int r = librbd::snap_create(&m_image_ctx, snap_name.c_str(), false); - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out); - ::encode(r, *out); - ENCODE_FINISH(*out); + ldout(m_image_ctx.cct, 10) << "remote snap_create request: " + << payload.snap_name << dendl; + int r = librbd::snap_create(&m_image_ctx, payload.snap_name.c_str(), false); + ::encode(ResponseMessage(r), *out); if (r == 0) { // cannot notify within a notificiation FunctionContext *ctx = new FunctionContext( @@ -957,91 +873,39 @@ void ImageWatcher::handle_snap_create(bufferlist::iterator iter, bufferlist *out } } -void ImageWatcher::handle_unknown_op(bufferlist *out) { +void ImageWatcher::handle_payload(const UnknownPayload &payload, + bufferlist *out) { RWLock::RLocker l(m_image_ctx.owner_lock); if (is_lock_owner()) { - ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out); - ::encode(-EOPNOTSUPP, *out); - ENCODE_FINISH(*out); + ::encode(ResponseMessage(-EOPNOTSUPP), *out); } } void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl) { - if (bl.length() == 0) { - // legacy notification for header updates - bufferlist out; - acknowledge_notify(notify_id, handle, out); - handle_header_update(); - return; - } - bool loopback; { RWLock::RLocker l(m_watch_lock); loopback = (m_watch_handle == handle); } - bufferlist::iterator iter = bl.begin(); - try { - DECODE_START(NOTIFY_VERSION, iter); - int op; - ::decode(op, iter); - - bufferlist out; - if (loopback && op != NOTIFY_OP_HEADER_UPDATE) { - acknowledge_notify(notify_id, handle, out); - } else { - switch (op) { - // client ops - case NOTIFY_OP_ACQUIRED_LOCK: - acknowledge_notify(notify_id, handle, out); - handle_acquired_lock(); - break; - case NOTIFY_OP_RELEASED_LOCK: - acknowledge_notify(notify_id, handle, out); - handle_released_lock(); - break; - case NOTIFY_OP_HEADER_UPDATE: - acknowledge_notify(notify_id, handle, out); - handle_header_update(); - break; - case NOTIFY_OP_ASYNC_PROGRESS: - acknowledge_notify(notify_id, handle, out); - handle_async_progress(iter); - break; - case NOTIFY_OP_ASYNC_COMPLETE: - acknowledge_notify(notify_id, handle, out); - handle_async_complete(iter); - break; - - // lock owner-only ops - case NOTIFY_OP_REQUEST_LOCK: - handle_request_lock(&out); - acknowledge_notify(notify_id, handle, out); - break; - case NOTIFY_OP_FLATTEN: - handle_flatten(iter, &out); - acknowledge_notify(notify_id, handle, out); - break; - case NOTIFY_OP_RESIZE: - handle_resize(iter, &out); - acknowledge_notify(notify_id, handle, out); - break; - case NOTIFY_OP_SNAP_CREATE: - handle_snap_create(iter, &out); - acknowledge_notify(notify_id, handle, out); - break; - default: - handle_unknown_op(&out); - acknowledge_notify(notify_id, handle, out); - break; - } + NotifyMessage notify_message; + if (bl.length() == 0) { + // legacy notification for header updates + notify_message = NotifyMessage(HeaderUpdatePayload()); + } else { + try { + bufferlist::iterator iter = bl.begin(); + ::decode(notify_message, iter); + } catch (const buffer::error &err) { + lderr(m_image_ctx.cct) << "error decoding image notification: " + << err.what() << dendl; + return; } - DECODE_FINISH(iter); - } catch (const buffer::error &err) { - lderr(m_image_ctx.cct) << "error decoding image notification" << dendl; } + + apply_visitor(HandlePayloadVisitor(this, notify_id, handle, loopback), + notify_message.payload); } void ImageWatcher::handle_error(uint64_t handle, int err) { @@ -1103,7 +967,7 @@ void ImageWatcher::reregister_watch() { m_watch_state = WATCH_STATE_REGISTERED; } - handle_header_update(); + handle_payload(HeaderUpdatePayload(), NULL); if (lock_owner) { r = try_lock(); @@ -1132,7 +996,7 @@ void ImageWatcher::WatchCtx::handle_error(uint64_t handle, int err) { } void ImageWatcher::RemoteContext::finish(int r) { - m_image_watcher.schedule_async_complete(m_remote_async_request, r); + m_image_watcher.schedule_async_complete(m_async_request_id, r); } } diff --git a/src/librbd/ImageWatcher.h b/src/librbd/ImageWatcher.h index 59ef416c7d3d..6230f35d5d3e 100644 --- a/src/librbd/ImageWatcher.h +++ b/src/librbd/ImageWatcher.h @@ -8,6 +8,7 @@ #include "include/Context.h" #include "include/rados/librados.hpp" #include "include/rbd/librbd.hpp" +#include "librbd/WatchNotifyTypes.h" #include #include #include @@ -24,26 +25,6 @@ namespace librbd { class AioCompletion; class ImageCtx; - struct RemoteAsyncRequest { - uint64_t gid; - uint64_t handle; - uint64_t request_id; - - RemoteAsyncRequest() : gid(), handle(), request_id() {} - RemoteAsyncRequest(uint64_t gid_, uint64_t handle_, uint64_t request_id_) - : gid(gid_), handle(handle_), request_id(request_id_) {} - - inline bool operator<(const RemoteAsyncRequest &rhs) const { - if (gid != rhs.gid) { - return gid < rhs.gid; - } else if (handle != rhs.handle) { - return handle < rhs.handle; - } else { - return request_id < rhs.request_id; - } - } - }; - class ImageWatcher { public: @@ -104,30 +85,29 @@ namespace librbd { class RemoteProgressContext : public ProgressContext { public: RemoteProgressContext(ImageWatcher &image_watcher, - const RemoteAsyncRequest &remote_async_request) - : m_image_watcher(image_watcher), - m_remote_async_request(remote_async_request) + const WatchNotify::AsyncRequestId &id) + : m_image_watcher(image_watcher), m_async_request_id(id) { } virtual int update_progress(uint64_t offset, uint64_t total) { - m_image_watcher.schedule_async_progress(m_remote_async_request, offset, + m_image_watcher.schedule_async_progress(m_async_request_id, offset, total); return 0; } private: ImageWatcher &m_image_watcher; - RemoteAsyncRequest m_remote_async_request; + WatchNotify::AsyncRequestId m_async_request_id; }; class RemoteContext : public Context { public: RemoteContext(ImageWatcher &image_watcher, - const RemoteAsyncRequest &remote_async_request, + const WatchNotify::AsyncRequestId &id, RemoteProgressContext *prog_ctx) - : m_image_watcher(image_watcher), - m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx) + : m_image_watcher(image_watcher), m_async_request_id(id), + m_prog_ctx(prog_ctx) { } @@ -139,10 +119,37 @@ namespace librbd { private: ImageWatcher &m_image_watcher; - RemoteAsyncRequest m_remote_async_request; + WatchNotify::AsyncRequestId m_async_request_id; RemoteProgressContext *m_prog_ctx; }; + struct HandlePayloadVisitor : public boost::static_visitor { + ImageWatcher *image_watcher; + uint64_t notify_id; + uint64_t handle; + bool loopback; + + HandlePayloadVisitor(ImageWatcher *image_watcher_, uint64_t notify_id_, + uint64_t handle_, bool loopback_) + : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_), + loopback(loopback_) {} + + inline void operator()(const WatchNotify::HeaderUpdatePayload &payload) const { + bufferlist out; + image_watcher->handle_payload(payload, &out); + image_watcher->acknowledge_notify(notify_id, handle, out); + } + + template + inline void operator()(const Payload &payload) const { + bufferlist out; + if (!loopback) { + image_watcher->handle_payload(payload, &out); + } + image_watcher->acknowledge_notify(notify_id, handle, out); + } + }; + ImageCtx &m_image_ctx; RWLock m_watch_lock; @@ -158,9 +165,9 @@ namespace librbd { SafeTimer *m_timer; RWLock m_async_request_lock; - std::map m_async_requests; - std::set m_async_pending; - std::set m_async_progress; + std::map m_async_requests; + std::set m_async_pending; + std::set m_async_progress; Mutex m_aio_request_lock; std::vector m_aio_requests; @@ -174,7 +181,6 @@ namespace librbd { int lock(); void release_lock(); bool try_request_lock(); - void finalize_request_lock(); void finalize_header_update(); void schedule_retry_aio_requests(bool use_timer); @@ -185,37 +191,47 @@ namespace librbd { void schedule_cancel_async_requests(); void cancel_async_requests(); - void encode_async_request(uint64_t request_id, bufferlist &bl); + WatchNotify::ClientId get_client_id(); static int decode_response_code(bufferlist &bl); void notify_released_lock(); void notify_request_lock(); int notify_lock_owner(bufferlist &bl, bufferlist &response); - int notify_async_request(uint64_t async_request_id, bufferlist &in, - ProgressContext& prog_ctx); + int notify_async_request(const WatchNotify::AsyncRequestId &id, + bufferlist &in, ProgressContext& prog_ctx); void notify_request_leadership(); - void schedule_async_progress(const RemoteAsyncRequest &remote_async_request, + void schedule_async_progress(const WatchNotify::AsyncRequestId &id, uint64_t offset, uint64_t total); - int notify_async_progress(const RemoteAsyncRequest &remote_async_request, + int notify_async_progress(const WatchNotify::AsyncRequestId &id, uint64_t offset, uint64_t total); - void schedule_async_complete(const RemoteAsyncRequest &remote_async_request, + void schedule_async_complete(const WatchNotify::AsyncRequestId &id, int r); - int notify_async_complete(const RemoteAsyncRequest &remote_async_request, + int notify_async_complete(const WatchNotify::AsyncRequestId &id, int r); - void handle_header_update(); - void handle_acquired_lock(); - void handle_released_lock(); - void handle_request_lock(bufferlist *out); - - void handle_async_progress(bufferlist::iterator iter); - void handle_async_complete(bufferlist::iterator iter); - void handle_flatten(bufferlist::iterator iter, bufferlist *out); - void handle_resize(bufferlist::iterator iter, bufferlist *out); - void handle_snap_create(bufferlist::iterator iter, bufferlist *out); - void handle_unknown_op(bufferlist *out); + void handle_payload(const WatchNotify::HeaderUpdatePayload& payload, + bufferlist *out); + void handle_payload(const WatchNotify::AcquiredLockPayload& payload, + bufferlist *out); + void handle_payload(const WatchNotify::ReleasedLockPayload& payload, + bufferlist *out); + void handle_payload(const WatchNotify::RequestLockPayload& payload, + bufferlist *out); + void handle_payload(const WatchNotify::AsyncProgressPayload& payload, + bufferlist *out); + void handle_payload(const WatchNotify::AsyncCompletePayload& payload, + bufferlist *out); + void handle_payload(const WatchNotify::FlattenPayload& payload, + bufferlist *out); + void handle_payload(const WatchNotify::ResizePayload& payload, + bufferlist *out); + void handle_payload(const WatchNotify::SnapCreatePayload& payload, + bufferlist *out); + void handle_payload(const WatchNotify::UnknownPayload& payload, + bufferlist *out); + void handle_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl); void handle_error(uint64_t cookie, int err); void acknowledge_notify(uint64_t notify_id, uint64_t handle, diff --git a/src/librbd/Makefile.am b/src/librbd/Makefile.am index b94061edf167..f18d7b00b330 100644 --- a/src/librbd/Makefile.am +++ b/src/librbd/Makefile.am @@ -12,7 +12,8 @@ librbd_internal_la_SOURCES = \ librbd/ImageWatcher.cc \ librbd/internal.cc \ librbd/LibrbdWriteback.cc \ - librbd/ObjectMap.cc + librbd/ObjectMap.cc \ + librbd/WatchNotifyTypes.cc noinst_LTLIBRARIES += librbd_internal.la librbd_api_la_SOURCES = \ @@ -56,4 +57,5 @@ noinst_HEADERS += \ librbd/LibrbdWriteback.h \ librbd/ObjectMap.h \ librbd/parent_types.h \ - librbd/SnapInfo.h + librbd/SnapInfo.h \ + librbd/WatchNotifyTypes.h diff --git a/src/librbd/WatchNotifyTypes.cc b/src/librbd/WatchNotifyTypes.cc new file mode 100644 index 000000000000..4622ff67cebb --- /dev/null +++ b/src/librbd/WatchNotifyTypes.cc @@ -0,0 +1,220 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/WatchNotifyTypes.h" +#include "include/assert.h" + +namespace librbd { +namespace WatchNotify { + +namespace { + +class EncodePayloadVisitor : public boost::static_visitor { +public: + EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {} + + template + inline void operator()(const Payload &payload) const { + payload.encode(m_bl); + } + +private: + bufferlist &m_bl; +}; + +class DecodePayloadVisitor : public boost::static_visitor { +public: + DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter) + : m_version(version), m_iter(iter) {} + + template + inline void operator()(Payload &payload) const { + payload.decode(m_version, m_iter); + } + +private: + __u8 m_version; + bufferlist::iterator &m_iter; +}; + +} + +void ClientId::encode(bufferlist &bl) const { + ::encode(gid, bl); + ::encode(handle, bl); +} + +void ClientId::decode(bufferlist::iterator &iter) { + ::decode(gid, iter); + ::decode(handle, iter); +} + +void AsyncRequestId::encode(bufferlist &bl) const { + ::encode(client_id, bl); + ::encode(request_id, bl); +} + +void AsyncRequestId::decode(bufferlist::iterator &iter) { + ::decode(client_id, iter); + ::decode(request_id, iter); +} + +void AcquiredLockPayload::encode(bufferlist &bl) const { + ::encode(static_cast(NOTIFY_OP_ACQUIRED_LOCK), bl); +} + +void AcquiredLockPayload::decode(__u8 version, bufferlist::iterator &iter) { +} + +void ReleasedLockPayload::encode(bufferlist &bl) const { + ::encode(static_cast(NOTIFY_OP_RELEASED_LOCK), bl); +} + +void ReleasedLockPayload::decode(__u8 version, bufferlist::iterator &iter) { +} + +void RequestLockPayload::encode(bufferlist &bl) const { + ::encode(static_cast(NOTIFY_OP_REQUEST_LOCK), bl); +} + +void RequestLockPayload::decode(__u8 version, bufferlist::iterator &iter) { +} + +void HeaderUpdatePayload::encode(bufferlist &bl) const { + ::encode(static_cast(NOTIFY_OP_HEADER_UPDATE), bl); +} + +void HeaderUpdatePayload::decode(__u8 version, bufferlist::iterator &iter) { +} + +void AsyncProgressPayload::encode(bufferlist &bl) const { + ::encode(static_cast(NOTIFY_OP_ASYNC_PROGRESS), bl); + ::encode(async_request_id, bl); + ::encode(offset, bl); + ::encode(total, bl); +} + +void AsyncProgressPayload::decode(__u8 version, bufferlist::iterator &iter) { + ::decode(async_request_id, iter); + ::decode(offset, iter); + ::decode(total, iter); +} + +void AsyncCompletePayload::encode(bufferlist &bl) const { + ::encode(static_cast(NOTIFY_OP_ASYNC_COMPLETE), bl); + ::encode(async_request_id, bl); + ::encode(result, bl); +} + +void AsyncCompletePayload::decode(__u8 version, bufferlist::iterator &iter) { + ::decode(async_request_id, iter); + ::decode(result, iter); +} + +void FlattenPayload::encode(bufferlist &bl) const { + ::encode(static_cast(NOTIFY_OP_FLATTEN), bl); + ::encode(async_request_id, bl); +} + +void FlattenPayload::decode(__u8 version, bufferlist::iterator &iter) { + ::decode(async_request_id, iter); +} + +void ResizePayload::encode(bufferlist &bl) const { + ::encode(static_cast(NOTIFY_OP_RESIZE), bl); + ::encode(size, bl); + ::encode(async_request_id, bl); +} + +void ResizePayload::decode(__u8 version, bufferlist::iterator &iter) { + ::decode(size, iter); + ::decode(async_request_id, iter); +} + +void SnapCreatePayload::encode(bufferlist &bl) const { + ::encode(static_cast(NOTIFY_OP_SNAP_CREATE), bl); + ::encode(snap_name, bl); +} + +void SnapCreatePayload::decode(__u8 version, bufferlist::iterator &iter) { + ::decode(snap_name, iter); +} + +void UnknownPayload::encode(bufferlist &bl) const { + assert(false); +} + +void UnknownPayload::decode(__u8 version, bufferlist::iterator &iter) { +} + +void NotifyMessage::encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + boost::apply_visitor(EncodePayloadVisitor(bl), payload); + ENCODE_FINISH(bl); +} + +void NotifyMessage::decode(bufferlist::iterator& iter) { + DECODE_START(1, iter); + + uint32_t notify_op; + ::decode(notify_op, iter); + + // select the correct payload variant based upon the encoded op + switch (notify_op) { + case NOTIFY_OP_ACQUIRED_LOCK: + payload = AcquiredLockPayload(); + break; + case NOTIFY_OP_RELEASED_LOCK: + payload = ReleasedLockPayload(); + break; + case NOTIFY_OP_REQUEST_LOCK: + payload = RequestLockPayload(); + break; + case NOTIFY_OP_HEADER_UPDATE: + payload = HeaderUpdatePayload(); + break; + case NOTIFY_OP_ASYNC_PROGRESS: + payload = AsyncProgressPayload(); + break; + case NOTIFY_OP_ASYNC_COMPLETE: + payload = AsyncCompletePayload(); + break; + case NOTIFY_OP_FLATTEN: + payload = FlattenPayload(); + break; + case NOTIFY_OP_RESIZE: + payload = ResizePayload(); + break; + case NOTIFY_OP_SNAP_CREATE: + payload = SnapCreatePayload(); + break; + default: + payload = UnknownPayload(); + break; + } + + apply_visitor(DecodePayloadVisitor(struct_v, iter), payload); + DECODE_FINISH(iter); +} + +void ResponseMessage::encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(result, bl); + ENCODE_FINISH(bl); +} + +void ResponseMessage::decode(bufferlist::iterator& iter) { + DECODE_START(1, iter); + ::decode(result, iter); + DECODE_FINISH(iter); +} + +} // namespace WatchNotify +} // namespace librbd + +std::ostream &operator<<(std::ostream &out, + const librbd::WatchNotify::AsyncRequestId &request) { + out << "[" << request.client_id.gid << "," << request.client_id.handle << "," + << request.request_id << "]"; + return out; +} diff --git a/src/librbd/WatchNotifyTypes.h b/src/librbd/WatchNotifyTypes.h new file mode 100644 index 000000000000..abb36d60188f --- /dev/null +++ b/src/librbd/WatchNotifyTypes.h @@ -0,0 +1,197 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef LIBRBD_WATCH_NOTIFY_TYPES_H +#define LIBRBD_WATCH_NOTIFY_TYPES_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/encoding.h" +#include +#include +#include + +namespace librbd { +namespace WatchNotify { + +struct ClientId { + uint64_t gid; + uint64_t handle; + + ClientId() : gid(0), handle(0) {} + ClientId(uint64_t gid_, uint64_t handle_) : gid(gid_), handle(handle_) {} + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& it); + + inline bool operator==(const ClientId &rhs) const { + return (gid == rhs.gid && handle == rhs.handle); + } + inline bool operator!=(const ClientId &rhs) const { + return !(*this == rhs); + } + inline bool operator<(const ClientId &rhs) const { + if (gid != rhs.gid) { + return gid < rhs.gid; + } else { + return handle < rhs.handle; + } + } +}; + +struct AsyncRequestId { + ClientId client_id; + uint64_t request_id; + + AsyncRequestId() : request_id() {} + AsyncRequestId(const ClientId &client_id_, uint64_t request_id_) + : client_id(client_id_), request_id(request_id_) {} + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& it); + + inline bool operator<(const AsyncRequestId &rhs) const { + if (client_id != rhs.client_id) { + return client_id < rhs.client_id; + } else { + return request_id < rhs.request_id; + } + } +}; + +enum NotifyOp { + NOTIFY_OP_ACQUIRED_LOCK = 0, + NOTIFY_OP_RELEASED_LOCK = 1, + NOTIFY_OP_REQUEST_LOCK = 2, + NOTIFY_OP_HEADER_UPDATE = 3, + NOTIFY_OP_ASYNC_PROGRESS = 4, + NOTIFY_OP_ASYNC_COMPLETE = 5, + NOTIFY_OP_FLATTEN = 6, + NOTIFY_OP_RESIZE = 7, + NOTIFY_OP_SNAP_CREATE = 8 +}; + +struct AcquiredLockPayload { + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); +}; + +struct ReleasedLockPayload { + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); +}; + +struct RequestLockPayload { + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); +}; + +struct HeaderUpdatePayload { + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); +}; + +struct AsyncProgressPayload { + AsyncProgressPayload() : offset(0), total(0) {} + AsyncProgressPayload(const AsyncRequestId &id, uint64_t offset_, uint64_t total_) + : async_request_id(id), offset(offset_), total(total_) {} + + AsyncRequestId async_request_id; + uint64_t offset; + uint64_t total; + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); +}; + +struct AsyncCompletePayload { + AsyncCompletePayload() {} + AsyncCompletePayload(const AsyncRequestId &id, int r) + : async_request_id(id), result(r) {} + + AsyncRequestId async_request_id; + int result; + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); +}; + +struct FlattenPayload { + FlattenPayload() {} + FlattenPayload(const AsyncRequestId &id) : async_request_id(id) {} + + AsyncRequestId async_request_id; + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); +}; + +struct ResizePayload { + ResizePayload() : size(0) {} + ResizePayload(uint64_t size_, const AsyncRequestId &id) + : size(size_), async_request_id(id) {} + + uint64_t size; + AsyncRequestId async_request_id; + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); +}; + +struct SnapCreatePayload { + SnapCreatePayload() {} + SnapCreatePayload(const std::string &name) : snap_name(name) {} + + std::string snap_name; + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); +}; + +struct UnknownPayload { + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::iterator &iter); +}; + +typedef boost::variant Payload; + +struct NotifyMessage { + NotifyMessage() : payload(UnknownPayload()) {} + NotifyMessage(const Payload &payload_) : payload(payload_) {} + + Payload payload; + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& it); +}; + +struct ResponseMessage { + ResponseMessage() : result(0) {} + ResponseMessage(int result_) : result(result_) {} + + int result; + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& it); +}; + +} // namespace WatchNotify +} // namespace librbd + +std::ostream &operator<<(std::ostream &out, + const librbd::WatchNotify::AsyncRequestId &request); + +WRITE_CLASS_ENCODER(librbd::WatchNotify::ClientId); +WRITE_CLASS_ENCODER(librbd::WatchNotify::AsyncRequestId); +WRITE_CLASS_ENCODER(librbd::WatchNotify::NotifyMessage); +WRITE_CLASS_ENCODER(librbd::WatchNotify::ResponseMessage); + +#endif // LIBRBD_WATCH_NOTIFY_TYPES_H