#undef dout_prefix
#define dout_prefix *_dout << "librbd::ImageWatcher: "
-static void decode(librbd::RemoteAsyncRequest &request,
- bufferlist::iterator &iter) {
- ::decode(request.gid, iter);
- ::decode(request.handle, iter);
- ::decode(request.request_id, iter);
-}
-
-static void encode(const librbd::RemoteAsyncRequest &request, bufferlist &bl) {
- ::encode(request.gid, bl);
- ::encode(request.handle, bl);
- ::encode(request.request_id, bl);
-}
-
-static std::ostream &operator<<(std::ostream &out,
- const librbd::RemoteAsyncRequest &request) {
- out << "[" << request.gid << "," << request.handle << ","
- << request.request_id << "]";
- return out;
-}
-
namespace librbd {
+using namespace WatchNotify;
+
static const std::string WATCHER_LOCK_TAG = "internal";
static const std::string WATCHER_LOCK_COOKIE_PREFIX = "auto";
static const uint64_t NOTIFY_TIMEOUT = 5000;
-static const uint8_t NOTIFY_VERSION = 1;
static const double RETRY_DELAY_SECONDS = 1.0;
-enum {
- NOTIFY_OP_ACQUIRED_LOCK = 0,
- NOTIFY_OP_RELEASED_LOCK = 1,
- NOTIFY_OP_REQUEST_LOCK = 2,
- NOTIFY_OP_HEADER_UPDATE = 3,
- NOTIFY_OP_ASYNC_PROGRESS = 4,
- NOTIFY_OP_ASYNC_COMPLETE = 5,
- NOTIFY_OP_FLATTEN = 6,
- NOTIFY_OP_RESIZE = 7,
- NOTIFY_OP_SNAP_CREATE = 8
-};
-
ImageWatcher::ImageWatcher(ImageCtx &image_ctx)
: m_image_ctx(image_ctx),
m_watch_lock("librbd::ImageWatcher::m_watch_lock"),
RWLock::RLocker l(m_watch_lock);
assert(m_watch_state != WATCH_STATE_REGISTERED);
}
+ {
+ RWLock::RLocker l(m_image_ctx.owner_lock);
+ assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
+ }
m_finisher->stop();
delete m_finisher;
}
bool ImageWatcher::is_lock_owner() const {
assert(m_image_ctx.owner_lock.is_locked());
- return m_lock_owner_state == LOCK_OWNER_STATE_LOCKED;
+ return (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED ||
+ m_lock_owner_state == LOCK_OWNER_STATE_RELEASING);
}
int ImageWatcher::register_watch() {
return is_lock_owner();
}
-void ImageWatcher::finalize_request_lock() {
- cancel_retry_aio_requests();
-
- bool owned_lock;
- {
- RWLock::RLocker l(m_image_ctx.owner_lock);
- owned_lock = try_request_lock();
- }
- if (owned_lock) {
- retry_aio_requests();
- } else {
- schedule_retry_aio_requests(true);
- }
-}
-
int ImageWatcher::get_lock_owner_info(entity_name_t *locker, std::string *cookie,
std::string *address, uint64_t *handle) {
std::map<rados::cls::lock::locker_id_t,
}
bufferlist bl;
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
- ::encode(NOTIFY_OP_ACQUIRED_LOCK, bl);
- ENCODE_FINISH(bl);
+ ::encode(NotifyMessage(AcquiredLockPayload()), bl);
// send the notification when we aren't holding locks
FunctionContext *ctx = new FunctionContext(
encode_lock_cookie(), WATCHER_LOCK_TAG);
}
-int ImageWatcher::notify_async_progress(const RemoteAsyncRequest &request,
+int ImageWatcher::notify_async_progress(const AsyncRequestId &request,
uint64_t offset, uint64_t total) {
ldout(m_image_ctx.cct, 20) << "remote async request progress: "
<< request << " @ " << offset
<< "/" << total << dendl;
bufferlist bl;
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
- ::encode(NOTIFY_OP_ASYNC_PROGRESS, bl);
- ::encode(request, bl);
- ::encode(offset, bl);
- ::encode(total, bl);
- ENCODE_FINISH(bl);
+ ::encode(NotifyMessage(AsyncProgressPayload(request, offset, total)), bl);
m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
return 0;
}
-void ImageWatcher::schedule_async_complete(const RemoteAsyncRequest &request,
+void ImageWatcher::schedule_async_complete(const AsyncRequestId &request,
int r) {
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::notify_async_complete, this, request, r));
m_finisher->queue(ctx);
}
-int ImageWatcher::notify_async_complete(const RemoteAsyncRequest &request,
+int ImageWatcher::notify_async_complete(const AsyncRequestId &request,
int r) {
ldout(m_image_ctx.cct, 20) << "remote async request finished: "
<< request << " = " << r << dendl;
bufferlist bl;
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
- ::encode(NOTIFY_OP_ASYNC_COMPLETE, bl);
- ::encode(request, bl);
- ::encode(r, bl);
- ENCODE_FINISH(bl);
+ ::encode(NotifyMessage(AsyncCompletePayload(request, r)), bl);
librbd::notify_change(m_image_ctx.md_ctx, m_image_ctx.header_oid,
&m_image_ctx);
assert(m_image_ctx.owner_lock.is_locked());
assert(!is_lock_owner());
+ AsyncRequestId async_request_id(get_client_id(), request_id);
+
bufferlist bl;
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
- ::encode(NOTIFY_OP_FLATTEN, bl);
- encode_async_request(request_id, bl);
- ENCODE_FINISH(bl);
+ ::encode(NotifyMessage(FlattenPayload(async_request_id)), bl);
- return notify_async_request(request_id, bl, prog_ctx);
+ return notify_async_request(async_request_id, bl, prog_ctx);
}
int ImageWatcher::notify_resize(uint64_t request_id, uint64_t size,
assert(m_image_ctx.owner_lock.is_locked());
assert(!is_lock_owner());
+ AsyncRequestId async_request_id(get_client_id(), request_id);
+
bufferlist bl;
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
- ::encode(NOTIFY_OP_RESIZE, bl);
- ::encode(size, bl);
- encode_async_request(request_id, bl);
- ENCODE_FINISH(bl);
+ ::encode(NotifyMessage(ResizePayload(size, async_request_id)), bl);
- return notify_async_request(request_id, bl, prog_ctx);
+ return notify_async_request(async_request_id, bl, prog_ctx);
}
int ImageWatcher::notify_snap_create(const std::string &snap_name) {
assert(!is_lock_owner());
bufferlist bl;
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
- ::encode(NOTIFY_OP_SNAP_CREATE, bl);
- ::encode(snap_name, bl);
- ENCODE_FINISH(bl);
+ ::encode(NotifyMessage(SnapCreatePayload(snap_name)), bl);
bufferlist response;
int r = notify_lock_owner(bl, response);
{
// supports legacy (empty buffer) clients
bufferlist bl;
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
- ::encode(NOTIFY_OP_HEADER_UPDATE, bl);
- ENCODE_FINISH(bl);
+ ::encode(NotifyMessage(HeaderUpdatePayload()), bl);
io_ctx.notify2(oid, bl, NOTIFY_TIMEOUT, NULL);
}
void ImageWatcher::cancel_async_requests() {
RWLock::WLocker l(m_async_request_lock);
- for (std::map<uint64_t, AsyncRequest>::iterator iter = m_async_requests.begin();
+ for (std::map<AsyncRequestId, AsyncRequest>::iterator iter =
+ m_async_requests.begin();
iter != m_async_requests.end(); ++iter) {
iter->second.first->complete(-ERESTART);
}
m_async_requests.clear();
}
-void ImageWatcher::encode_async_request(uint64_t request_id, bufferlist &bl) {
- RemoteAsyncRequest request(m_image_ctx.md_ctx.get_instance_id(),
- reinterpret_cast<uint64_t>(this), request_id);
- ::encode(request, bl);
-
- ldout(m_image_ctx.cct, 10) << "async request: " << request << dendl;
+ClientId ImageWatcher::get_client_id() {
+ RWLock::RLocker l(m_watch_lock);
+ return ClientId(m_image_ctx.md_ctx.get_instance_id(), m_watch_handle);
}
int ImageWatcher::decode_response_code(bufferlist &bl) {
int r;
try {
bufferlist::iterator iter = bl.begin();
- DECODE_START(NOTIFY_VERSION, iter);
- ::decode(r, iter);
- DECODE_FINISH(iter);
+
+ ResponseMessage response_message;
+ ::decode(response_message, iter);
+
+ r = response_message.result;
} catch (const buffer::error &err) {
r = -EINVAL;
}
void ImageWatcher::notify_released_lock() {
ldout(m_image_ctx.cct, 10) << "notify released lock" << dendl;
bufferlist bl;
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
- ::encode(NOTIFY_OP_RELEASED_LOCK, bl);
- ENCODE_FINISH(bl);
+ ::encode(NotifyMessage(ReleasedLockPayload()), bl);
m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
}
}
bufferlist bl;
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
- ::encode(NOTIFY_OP_REQUEST_LOCK, bl);
- ENCODE_FINISH(bl);
+ ::encode(NotifyMessage(RequestLockPayload()), bl);
bufferlist response;
int r = notify_lock_owner(bl, response);
return 0;
}
-int ImageWatcher::notify_async_request(uint64_t async_request_id,
+int ImageWatcher::notify_async_request(const AsyncRequestId &async_request_id,
bufferlist &in,
ProgressContext& prog_ctx) {
assert(m_image_ctx.owner_lock.is_locked());
+ ldout(m_image_ctx.cct, 10) << "async request: " << async_request_id << dendl;
Mutex my_lock("librbd::ImageWatcher::notify_async_request::my_lock");
Cond cond;
bool done = false;
return r;
}
-void ImageWatcher::schedule_async_progress(const RemoteAsyncRequest &request,
+void ImageWatcher::schedule_async_progress(const AsyncRequestId &request,
uint64_t offset, uint64_t total) {
RWLock::WLocker l(m_async_request_lock);
if (m_async_progress.count(request) == 0) {
}
}
-void ImageWatcher::handle_header_update() {
+void ImageWatcher::handle_payload(const HeaderUpdatePayload &payload,
+ bufferlist *out) {
ldout(m_image_ctx.cct, 10) << "image header updated" << dendl;
Mutex::Locker lictx(m_image_ctx.refresh_lock);
m_image_ctx.perfcounter->inc(l_librbd_notify);
}
-void ImageWatcher::handle_acquired_lock() {
+void ImageWatcher::handle_payload(const AcquiredLockPayload &payload,
+ bufferlist *out) {
ldout(m_image_ctx.cct, 10) << "image exclusively locked announcement" << dendl;
RWLock::RLocker l(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
}
}
-void ImageWatcher::handle_released_lock() {
+void ImageWatcher::handle_payload(const ReleasedLockPayload &payload,
+ bufferlist *out) {
ldout(m_image_ctx.cct, 10) << "exclusive lock released" << dendl;
RWLock::RLocker l(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
}
}
-void ImageWatcher::handle_request_lock(bufferlist *out) {
+void ImageWatcher::handle_payload(const RequestLockPayload &payload,
+ bufferlist *out) {
+ ldout(m_image_ctx.cct, 10) << "exclusive lock requested" << dendl;
RWLock::WLocker l(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
m_lock_owner_state = LOCK_OWNER_STATE_RELEASING;
// need to send something back so the client can detect a missing leader
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
- ::encode(0, *out);
- ENCODE_FINISH(*out);
+ ::encode(ResponseMessage(0), *out);
- ldout(m_image_ctx.cct, 10) << "exclusive lock requested, releasing" << dendl;
+ ldout(m_image_ctx.cct, 10) << "queuing release of exclusive lock" << dendl;
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::release_lock, this));
m_finisher->queue(ctx);
}
}
-void ImageWatcher::handle_async_progress(bufferlist::iterator iter) {
- RemoteAsyncRequest request;
- ::decode(request, iter);
-
- uint64_t offset;
- uint64_t total;
- ::decode(offset, iter);
- ::decode(total, iter);
- if (request.gid == m_image_ctx.md_ctx.get_instance_id() &&
- request.handle == reinterpret_cast<uint64_t>(this)) {
- RWLock::RLocker l(m_async_request_lock);
- std::map<uint64_t, AsyncRequest>::iterator iter =
- m_async_requests.find(request.request_id);
- if (iter != m_async_requests.end()) {
- ldout(m_image_ctx.cct, 20) << "request progress: "
- << request << " @ " << offset
- << "/" << total << dendl;
- iter->second.second->update_progress(offset, total);
- }
+void ImageWatcher::handle_payload(const AsyncProgressPayload &payload,
+ bufferlist *out) {
+ RWLock::RLocker l(m_async_request_lock);
+ std::map<AsyncRequestId, AsyncRequest>::iterator req_it =
+ m_async_requests.find(payload.async_request_id);
+ if (req_it != m_async_requests.end()) {
+ ldout(m_image_ctx.cct, 20) << "request progress: "
+ << payload.async_request_id << " @ "
+ << payload.offset << "/" << payload.total
+ << dendl;
+ req_it->second.second->update_progress(payload.offset, payload.total);
}
}
-void ImageWatcher::handle_async_complete(bufferlist::iterator iter) {
- RemoteAsyncRequest request;
- ::decode(request, iter);
-
- int r;
- ::decode(r, iter);
- if (request.gid == m_image_ctx.md_ctx.get_instance_id() &&
- request.handle == reinterpret_cast<uint64_t>(this)) {
- Context *ctx = NULL;
- {
- RWLock::RLocker l(m_async_request_lock);
- std::map<uint64_t, AsyncRequest>::iterator iter =
- m_async_requests.find(request.request_id);
- if (iter != m_async_requests.end()) {
- ctx = iter->second.first;
- }
- }
- if (ctx != NULL) {
- ldout(m_image_ctx.cct, 10) << "request finished: "
- << request << " = " << r << dendl;
- ctx->complete(r);
+void ImageWatcher::handle_payload(const AsyncCompletePayload &payload,
+ bufferlist *out) {
+ Context *ctx = NULL;
+ {
+ RWLock::RLocker l(m_async_request_lock);
+ std::map<AsyncRequestId, AsyncRequest>::iterator req_it =
+ m_async_requests.find(payload.async_request_id);
+ if (req_it != m_async_requests.end()) {
+ ctx = req_it->second.first;
}
}
+ if (ctx != NULL) {
+ ldout(m_image_ctx.cct, 10) << "request finished: "
+ << payload.async_request_id << "="
+ << payload.result << dendl;
+ ctx->complete(payload.result);
+ }
}
-void ImageWatcher::handle_flatten(bufferlist::iterator iter, bufferlist *out) {
+void ImageWatcher::handle_payload(const FlattenPayload &payload,
+ bufferlist *out) {
RWLock::RLocker l(m_image_ctx.owner_lock);
if (is_lock_owner()) {
- RemoteAsyncRequest request;
- ::decode(request, iter);
-
bool new_request;
{
RWLock::WLocker l(m_async_request_lock);
- new_request = (m_async_pending.count(request) == 0);
+ new_request = (m_async_pending.count(payload.async_request_id) == 0);
if (new_request) {
- m_async_pending.insert(request);
+ m_async_pending.insert(payload.async_request_id);
}
}
int r = 0;
if (new_request) {
RemoteProgressContext *prog_ctx =
- new RemoteProgressContext(*this, request);
- RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
+ new RemoteProgressContext(*this, payload.async_request_id);
+ RemoteContext *ctx = new RemoteContext(*this, payload.async_request_id,
+ prog_ctx);
- ldout(m_image_ctx.cct, 10) << "remote flatten request: " << request << dendl;
+ ldout(m_image_ctx.cct, 10) << "remote flatten request: "
+ << payload.async_request_id << dendl;
r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx);
if (r < 0) {
delete ctx;
<< cpp_strerror(r) << dendl;
RWLock::WLocker l(m_async_request_lock);
- m_async_pending.erase(request);
+ m_async_pending.erase(payload.async_request_id);
}
}
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
- ::encode(r, *out);
- ENCODE_FINISH(*out);
+ ::encode(ResponseMessage(r), *out);
}
}
-void ImageWatcher::handle_resize(bufferlist::iterator iter, bufferlist *out) {
+void ImageWatcher::handle_payload(const ResizePayload &payload,
+ bufferlist *out) {
RWLock::RLocker l(m_image_ctx.owner_lock);
if (is_lock_owner()) {
- uint64_t size;
- ::decode(size, iter);
-
- RemoteAsyncRequest request;
- ::decode(request, iter);
-
bool new_request;
{
RWLock::WLocker l(m_async_request_lock);
- new_request = (m_async_pending.count(request) == 0);
+ new_request = (m_async_pending.count(payload.async_request_id) == 0);
if (new_request) {
- m_async_pending.insert(request);
+ m_async_pending.insert(payload.async_request_id);
}
}
int r = 0;
if (new_request) {
RemoteProgressContext *prog_ctx =
- new RemoteProgressContext(*this, request);
- RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
-
- ldout(m_image_ctx.cct, 10) << "remote resize request: " << request
- << " " << size << dendl;
- r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx);
+ new RemoteProgressContext(*this, payload.async_request_id);
+ RemoteContext *ctx = new RemoteContext(*this, payload.async_request_id,
+ prog_ctx);
+
+ ldout(m_image_ctx.cct, 10) << "remote resize request: "
+ << payload.async_request_id << " "
+ << payload.size << dendl;
+ r = librbd::async_resize(&m_image_ctx, ctx, payload.size, *prog_ctx);
if (r < 0) {
lderr(m_image_ctx.cct) << "remove resize request failed: "
<< cpp_strerror(r) << dendl;
delete ctx;
RWLock::WLocker l(m_async_request_lock);
- m_async_pending.erase(request);
+ m_async_pending.erase(payload.async_request_id);
}
}
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
- ::encode(r, *out);
- ENCODE_FINISH(*out);
+ ::encode(ResponseMessage(r), *out);
}
}
-void ImageWatcher::handle_snap_create(bufferlist::iterator iter, bufferlist *out) {
+void ImageWatcher::handle_payload(const SnapCreatePayload &payload,
+ bufferlist *out) {
RWLock::RLocker l(m_image_ctx.owner_lock);
if (is_lock_owner()) {
- std::string snap_name;
- ::decode(snap_name, iter);
-
- ldout(m_image_ctx.cct, 10) << "remote snap_create request: " << snap_name << dendl;
- int r = librbd::snap_create(&m_image_ctx, snap_name.c_str(), false);
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
- ::encode(r, *out);
- ENCODE_FINISH(*out);
+ ldout(m_image_ctx.cct, 10) << "remote snap_create request: "
+ << payload.snap_name << dendl;
+ int r = librbd::snap_create(&m_image_ctx, payload.snap_name.c_str(), false);
+ ::encode(ResponseMessage(r), *out);
if (r == 0) {
// cannot notify within a notificiation
FunctionContext *ctx = new FunctionContext(
}
}
-void ImageWatcher::handle_unknown_op(bufferlist *out) {
+void ImageWatcher::handle_payload(const UnknownPayload &payload,
+ bufferlist *out) {
RWLock::RLocker l(m_image_ctx.owner_lock);
if (is_lock_owner()) {
- ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
- ::encode(-EOPNOTSUPP, *out);
- ENCODE_FINISH(*out);
+ ::encode(ResponseMessage(-EOPNOTSUPP), *out);
}
}
void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
bufferlist &bl) {
- if (bl.length() == 0) {
- // legacy notification for header updates
- bufferlist out;
- acknowledge_notify(notify_id, handle, out);
- handle_header_update();
- return;
- }
-
bool loopback;
{
RWLock::RLocker l(m_watch_lock);
loopback = (m_watch_handle == handle);
}
- bufferlist::iterator iter = bl.begin();
- try {
- DECODE_START(NOTIFY_VERSION, iter);
- int op;
- ::decode(op, iter);
-
- bufferlist out;
- if (loopback && op != NOTIFY_OP_HEADER_UPDATE) {
- acknowledge_notify(notify_id, handle, out);
- } else {
- switch (op) {
- // client ops
- case NOTIFY_OP_ACQUIRED_LOCK:
- acknowledge_notify(notify_id, handle, out);
- handle_acquired_lock();
- break;
- case NOTIFY_OP_RELEASED_LOCK:
- acknowledge_notify(notify_id, handle, out);
- handle_released_lock();
- break;
- case NOTIFY_OP_HEADER_UPDATE:
- acknowledge_notify(notify_id, handle, out);
- handle_header_update();
- break;
- case NOTIFY_OP_ASYNC_PROGRESS:
- acknowledge_notify(notify_id, handle, out);
- handle_async_progress(iter);
- break;
- case NOTIFY_OP_ASYNC_COMPLETE:
- acknowledge_notify(notify_id, handle, out);
- handle_async_complete(iter);
- break;
-
- // lock owner-only ops
- case NOTIFY_OP_REQUEST_LOCK:
- handle_request_lock(&out);
- acknowledge_notify(notify_id, handle, out);
- break;
- case NOTIFY_OP_FLATTEN:
- handle_flatten(iter, &out);
- acknowledge_notify(notify_id, handle, out);
- break;
- case NOTIFY_OP_RESIZE:
- handle_resize(iter, &out);
- acknowledge_notify(notify_id, handle, out);
- break;
- case NOTIFY_OP_SNAP_CREATE:
- handle_snap_create(iter, &out);
- acknowledge_notify(notify_id, handle, out);
- break;
- default:
- handle_unknown_op(&out);
- acknowledge_notify(notify_id, handle, out);
- break;
- }
+ NotifyMessage notify_message;
+ if (bl.length() == 0) {
+ // legacy notification for header updates
+ notify_message = NotifyMessage(HeaderUpdatePayload());
+ } else {
+ try {
+ bufferlist::iterator iter = bl.begin();
+ ::decode(notify_message, iter);
+ } catch (const buffer::error &err) {
+ lderr(m_image_ctx.cct) << "error decoding image notification: "
+ << err.what() << dendl;
+ return;
}
- DECODE_FINISH(iter);
- } catch (const buffer::error &err) {
- lderr(m_image_ctx.cct) << "error decoding image notification" << dendl;
}
+
+ apply_visitor(HandlePayloadVisitor(this, notify_id, handle, loopback),
+ notify_message.payload);
}
void ImageWatcher::handle_error(uint64_t handle, int err) {
m_watch_state = WATCH_STATE_REGISTERED;
}
- handle_header_update();
+ handle_payload(HeaderUpdatePayload(), NULL);
if (lock_owner) {
r = try_lock();
}
void ImageWatcher::RemoteContext::finish(int r) {
- m_image_watcher.schedule_async_complete(m_remote_async_request, r);
+ m_image_watcher.schedule_async_complete(m_async_request_id, r);
}
}
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "include/rbd/librbd.hpp"
+#include "librbd/WatchNotifyTypes.h"
#include <set>
#include <string>
#include <utility>
class AioCompletion;
class ImageCtx;
- struct RemoteAsyncRequest {
- uint64_t gid;
- uint64_t handle;
- uint64_t request_id;
-
- RemoteAsyncRequest() : gid(), handle(), request_id() {}
- RemoteAsyncRequest(uint64_t gid_, uint64_t handle_, uint64_t request_id_)
- : gid(gid_), handle(handle_), request_id(request_id_) {}
-
- inline bool operator<(const RemoteAsyncRequest &rhs) const {
- if (gid != rhs.gid) {
- return gid < rhs.gid;
- } else if (handle != rhs.handle) {
- return handle < rhs.handle;
- } else {
- return request_id < rhs.request_id;
- }
- }
- };
-
class ImageWatcher {
public:
class RemoteProgressContext : public ProgressContext {
public:
RemoteProgressContext(ImageWatcher &image_watcher,
- const RemoteAsyncRequest &remote_async_request)
- : m_image_watcher(image_watcher),
- m_remote_async_request(remote_async_request)
+ const WatchNotify::AsyncRequestId &id)
+ : m_image_watcher(image_watcher), m_async_request_id(id)
{
}
virtual int update_progress(uint64_t offset, uint64_t total) {
- m_image_watcher.schedule_async_progress(m_remote_async_request, offset,
+ m_image_watcher.schedule_async_progress(m_async_request_id, offset,
total);
return 0;
}
private:
ImageWatcher &m_image_watcher;
- RemoteAsyncRequest m_remote_async_request;
+ WatchNotify::AsyncRequestId m_async_request_id;
};
class RemoteContext : public Context {
public:
RemoteContext(ImageWatcher &image_watcher,
- const RemoteAsyncRequest &remote_async_request,
+ const WatchNotify::AsyncRequestId &id,
RemoteProgressContext *prog_ctx)
- : m_image_watcher(image_watcher),
- m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx)
+ : m_image_watcher(image_watcher), m_async_request_id(id),
+ m_prog_ctx(prog_ctx)
{
}
private:
ImageWatcher &m_image_watcher;
- RemoteAsyncRequest m_remote_async_request;
+ WatchNotify::AsyncRequestId m_async_request_id;
RemoteProgressContext *m_prog_ctx;
};
+ struct HandlePayloadVisitor : public boost::static_visitor<void> {
+ ImageWatcher *image_watcher;
+ uint64_t notify_id;
+ uint64_t handle;
+ bool loopback;
+
+ HandlePayloadVisitor(ImageWatcher *image_watcher_, uint64_t notify_id_,
+ uint64_t handle_, bool loopback_)
+ : image_watcher(image_watcher_), notify_id(notify_id_), handle(handle_),
+ loopback(loopback_) {}
+
+ inline void operator()(const WatchNotify::HeaderUpdatePayload &payload) const {
+ bufferlist out;
+ image_watcher->handle_payload(payload, &out);
+ image_watcher->acknowledge_notify(notify_id, handle, out);
+ }
+
+ template <typename Payload>
+ inline void operator()(const Payload &payload) const {
+ bufferlist out;
+ if (!loopback) {
+ image_watcher->handle_payload(payload, &out);
+ }
+ image_watcher->acknowledge_notify(notify_id, handle, out);
+ }
+ };
+
ImageCtx &m_image_ctx;
RWLock m_watch_lock;
SafeTimer *m_timer;
RWLock m_async_request_lock;
- std::map<uint64_t, AsyncRequest> m_async_requests;
- std::set<RemoteAsyncRequest> m_async_pending;
- std::set<RemoteAsyncRequest> m_async_progress;
+ std::map<WatchNotify::AsyncRequestId, AsyncRequest> m_async_requests;
+ std::set<WatchNotify::AsyncRequestId> m_async_pending;
+ std::set<WatchNotify::AsyncRequestId> m_async_progress;
Mutex m_aio_request_lock;
std::vector<AioRequest> m_aio_requests;
int lock();
void release_lock();
bool try_request_lock();
- void finalize_request_lock();
void finalize_header_update();
void schedule_retry_aio_requests(bool use_timer);
void schedule_cancel_async_requests();
void cancel_async_requests();
- void encode_async_request(uint64_t request_id, bufferlist &bl);
+ WatchNotify::ClientId get_client_id();
static int decode_response_code(bufferlist &bl);
void notify_released_lock();
void notify_request_lock();
int notify_lock_owner(bufferlist &bl, bufferlist &response);
- int notify_async_request(uint64_t async_request_id, bufferlist &in,
- ProgressContext& prog_ctx);
+ int notify_async_request(const WatchNotify::AsyncRequestId &id,
+ bufferlist &in, ProgressContext& prog_ctx);
void notify_request_leadership();
- void schedule_async_progress(const RemoteAsyncRequest &remote_async_request,
+ void schedule_async_progress(const WatchNotify::AsyncRequestId &id,
uint64_t offset, uint64_t total);
- int notify_async_progress(const RemoteAsyncRequest &remote_async_request,
+ int notify_async_progress(const WatchNotify::AsyncRequestId &id,
uint64_t offset, uint64_t total);
- void schedule_async_complete(const RemoteAsyncRequest &remote_async_request,
+ void schedule_async_complete(const WatchNotify::AsyncRequestId &id,
int r);
- int notify_async_complete(const RemoteAsyncRequest &remote_async_request,
+ int notify_async_complete(const WatchNotify::AsyncRequestId &id,
int r);
- void handle_header_update();
- void handle_acquired_lock();
- void handle_released_lock();
- void handle_request_lock(bufferlist *out);
-
- void handle_async_progress(bufferlist::iterator iter);
- void handle_async_complete(bufferlist::iterator iter);
- void handle_flatten(bufferlist::iterator iter, bufferlist *out);
- void handle_resize(bufferlist::iterator iter, bufferlist *out);
- void handle_snap_create(bufferlist::iterator iter, bufferlist *out);
- void handle_unknown_op(bufferlist *out);
+ void handle_payload(const WatchNotify::HeaderUpdatePayload& payload,
+ bufferlist *out);
+ void handle_payload(const WatchNotify::AcquiredLockPayload& payload,
+ bufferlist *out);
+ void handle_payload(const WatchNotify::ReleasedLockPayload& payload,
+ bufferlist *out);
+ void handle_payload(const WatchNotify::RequestLockPayload& payload,
+ bufferlist *out);
+ void handle_payload(const WatchNotify::AsyncProgressPayload& payload,
+ bufferlist *out);
+ void handle_payload(const WatchNotify::AsyncCompletePayload& payload,
+ bufferlist *out);
+ void handle_payload(const WatchNotify::FlattenPayload& payload,
+ bufferlist *out);
+ void handle_payload(const WatchNotify::ResizePayload& payload,
+ bufferlist *out);
+ void handle_payload(const WatchNotify::SnapCreatePayload& payload,
+ bufferlist *out);
+ void handle_payload(const WatchNotify::UnknownPayload& payload,
+ bufferlist *out);
+
void handle_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl);
void handle_error(uint64_t cookie, int err);
void acknowledge_notify(uint64_t notify_id, uint64_t handle,
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/WatchNotifyTypes.h"
+#include "include/assert.h"
+
+namespace librbd {
+namespace WatchNotify {
+
+namespace {
+
+class EncodePayloadVisitor : public boost::static_visitor<void> {
+public:
+ EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {}
+
+ template <typename Payload>
+ inline void operator()(const Payload &payload) const {
+ payload.encode(m_bl);
+ }
+
+private:
+ bufferlist &m_bl;
+};
+
+class DecodePayloadVisitor : public boost::static_visitor<void> {
+public:
+ DecodePayloadVisitor(__u8 version, bufferlist::iterator &iter)
+ : m_version(version), m_iter(iter) {}
+
+ template <typename Payload>
+ inline void operator()(Payload &payload) const {
+ payload.decode(m_version, m_iter);
+ }
+
+private:
+ __u8 m_version;
+ bufferlist::iterator &m_iter;
+};
+
+}
+
+void ClientId::encode(bufferlist &bl) const {
+ ::encode(gid, bl);
+ ::encode(handle, bl);
+}
+
+void ClientId::decode(bufferlist::iterator &iter) {
+ ::decode(gid, iter);
+ ::decode(handle, iter);
+}
+
+void AsyncRequestId::encode(bufferlist &bl) const {
+ ::encode(client_id, bl);
+ ::encode(request_id, bl);
+}
+
+void AsyncRequestId::decode(bufferlist::iterator &iter) {
+ ::decode(client_id, iter);
+ ::decode(request_id, iter);
+}
+
+void AcquiredLockPayload::encode(bufferlist &bl) const {
+ ::encode(static_cast<uint32_t>(NOTIFY_OP_ACQUIRED_LOCK), bl);
+}
+
+void AcquiredLockPayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void ReleasedLockPayload::encode(bufferlist &bl) const {
+ ::encode(static_cast<uint32_t>(NOTIFY_OP_RELEASED_LOCK), bl);
+}
+
+void ReleasedLockPayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void RequestLockPayload::encode(bufferlist &bl) const {
+ ::encode(static_cast<uint32_t>(NOTIFY_OP_REQUEST_LOCK), bl);
+}
+
+void RequestLockPayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void HeaderUpdatePayload::encode(bufferlist &bl) const {
+ ::encode(static_cast<uint32_t>(NOTIFY_OP_HEADER_UPDATE), bl);
+}
+
+void HeaderUpdatePayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void AsyncProgressPayload::encode(bufferlist &bl) const {
+ ::encode(static_cast<uint32_t>(NOTIFY_OP_ASYNC_PROGRESS), bl);
+ ::encode(async_request_id, bl);
+ ::encode(offset, bl);
+ ::encode(total, bl);
+}
+
+void AsyncProgressPayload::decode(__u8 version, bufferlist::iterator &iter) {
+ ::decode(async_request_id, iter);
+ ::decode(offset, iter);
+ ::decode(total, iter);
+}
+
+void AsyncCompletePayload::encode(bufferlist &bl) const {
+ ::encode(static_cast<uint32_t>(NOTIFY_OP_ASYNC_COMPLETE), bl);
+ ::encode(async_request_id, bl);
+ ::encode(result, bl);
+}
+
+void AsyncCompletePayload::decode(__u8 version, bufferlist::iterator &iter) {
+ ::decode(async_request_id, iter);
+ ::decode(result, iter);
+}
+
+void FlattenPayload::encode(bufferlist &bl) const {
+ ::encode(static_cast<uint32_t>(NOTIFY_OP_FLATTEN), bl);
+ ::encode(async_request_id, bl);
+}
+
+void FlattenPayload::decode(__u8 version, bufferlist::iterator &iter) {
+ ::decode(async_request_id, iter);
+}
+
+void ResizePayload::encode(bufferlist &bl) const {
+ ::encode(static_cast<uint32_t>(NOTIFY_OP_RESIZE), bl);
+ ::encode(size, bl);
+ ::encode(async_request_id, bl);
+}
+
+void ResizePayload::decode(__u8 version, bufferlist::iterator &iter) {
+ ::decode(size, iter);
+ ::decode(async_request_id, iter);
+}
+
+void SnapCreatePayload::encode(bufferlist &bl) const {
+ ::encode(static_cast<uint32_t>(NOTIFY_OP_SNAP_CREATE), bl);
+ ::encode(snap_name, bl);
+}
+
+void SnapCreatePayload::decode(__u8 version, bufferlist::iterator &iter) {
+ ::decode(snap_name, iter);
+}
+
+void UnknownPayload::encode(bufferlist &bl) const {
+ assert(false);
+}
+
+void UnknownPayload::decode(__u8 version, bufferlist::iterator &iter) {
+}
+
+void NotifyMessage::encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ boost::apply_visitor(EncodePayloadVisitor(bl), payload);
+ ENCODE_FINISH(bl);
+}
+
+void NotifyMessage::decode(bufferlist::iterator& iter) {
+ DECODE_START(1, iter);
+
+ uint32_t notify_op;
+ ::decode(notify_op, iter);
+
+ // select the correct payload variant based upon the encoded op
+ switch (notify_op) {
+ case NOTIFY_OP_ACQUIRED_LOCK:
+ payload = AcquiredLockPayload();
+ break;
+ case NOTIFY_OP_RELEASED_LOCK:
+ payload = ReleasedLockPayload();
+ break;
+ case NOTIFY_OP_REQUEST_LOCK:
+ payload = RequestLockPayload();
+ break;
+ case NOTIFY_OP_HEADER_UPDATE:
+ payload = HeaderUpdatePayload();
+ break;
+ case NOTIFY_OP_ASYNC_PROGRESS:
+ payload = AsyncProgressPayload();
+ break;
+ case NOTIFY_OP_ASYNC_COMPLETE:
+ payload = AsyncCompletePayload();
+ break;
+ case NOTIFY_OP_FLATTEN:
+ payload = FlattenPayload();
+ break;
+ case NOTIFY_OP_RESIZE:
+ payload = ResizePayload();
+ break;
+ case NOTIFY_OP_SNAP_CREATE:
+ payload = SnapCreatePayload();
+ break;
+ default:
+ payload = UnknownPayload();
+ break;
+ }
+
+ apply_visitor(DecodePayloadVisitor(struct_v, iter), payload);
+ DECODE_FINISH(iter);
+}
+
+void ResponseMessage::encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(result, bl);
+ ENCODE_FINISH(bl);
+}
+
+void ResponseMessage::decode(bufferlist::iterator& iter) {
+ DECODE_START(1, iter);
+ ::decode(result, iter);
+ DECODE_FINISH(iter);
+}
+
+} // namespace WatchNotify
+} // namespace librbd
+
+std::ostream &operator<<(std::ostream &out,
+ const librbd::WatchNotify::AsyncRequestId &request) {
+ out << "[" << request.client_id.gid << "," << request.client_id.handle << ","
+ << request.request_id << "]";
+ return out;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef LIBRBD_WATCH_NOTIFY_TYPES_H
+#define LIBRBD_WATCH_NOTIFY_TYPES_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include <iostream>
+#include <string>
+#include <boost/variant.hpp>
+
+namespace librbd {
+namespace WatchNotify {
+
+struct ClientId {
+ uint64_t gid;
+ uint64_t handle;
+
+ ClientId() : gid(0), handle(0) {}
+ ClientId(uint64_t gid_, uint64_t handle_) : gid(gid_), handle(handle_) {}
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator& it);
+
+ inline bool operator==(const ClientId &rhs) const {
+ return (gid == rhs.gid && handle == rhs.handle);
+ }
+ inline bool operator!=(const ClientId &rhs) const {
+ return !(*this == rhs);
+ }
+ inline bool operator<(const ClientId &rhs) const {
+ if (gid != rhs.gid) {
+ return gid < rhs.gid;
+ } else {
+ return handle < rhs.handle;
+ }
+ }
+};
+
+struct AsyncRequestId {
+ ClientId client_id;
+ uint64_t request_id;
+
+ AsyncRequestId() : request_id() {}
+ AsyncRequestId(const ClientId &client_id_, uint64_t request_id_)
+ : client_id(client_id_), request_id(request_id_) {}
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator& it);
+
+ inline bool operator<(const AsyncRequestId &rhs) const {
+ if (client_id != rhs.client_id) {
+ return client_id < rhs.client_id;
+ } else {
+ return request_id < rhs.request_id;
+ }
+ }
+};
+
+enum NotifyOp {
+ NOTIFY_OP_ACQUIRED_LOCK = 0,
+ NOTIFY_OP_RELEASED_LOCK = 1,
+ NOTIFY_OP_REQUEST_LOCK = 2,
+ NOTIFY_OP_HEADER_UPDATE = 3,
+ NOTIFY_OP_ASYNC_PROGRESS = 4,
+ NOTIFY_OP_ASYNC_COMPLETE = 5,
+ NOTIFY_OP_FLATTEN = 6,
+ NOTIFY_OP_RESIZE = 7,
+ NOTIFY_OP_SNAP_CREATE = 8
+};
+
+struct AcquiredLockPayload {
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct ReleasedLockPayload {
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct RequestLockPayload {
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct HeaderUpdatePayload {
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct AsyncProgressPayload {
+ AsyncProgressPayload() : offset(0), total(0) {}
+ AsyncProgressPayload(const AsyncRequestId &id, uint64_t offset_, uint64_t total_)
+ : async_request_id(id), offset(offset_), total(total_) {}
+
+ AsyncRequestId async_request_id;
+ uint64_t offset;
+ uint64_t total;
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct AsyncCompletePayload {
+ AsyncCompletePayload() {}
+ AsyncCompletePayload(const AsyncRequestId &id, int r)
+ : async_request_id(id), result(r) {}
+
+ AsyncRequestId async_request_id;
+ int result;
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct FlattenPayload {
+ FlattenPayload() {}
+ FlattenPayload(const AsyncRequestId &id) : async_request_id(id) {}
+
+ AsyncRequestId async_request_id;
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct ResizePayload {
+ ResizePayload() : size(0) {}
+ ResizePayload(uint64_t size_, const AsyncRequestId &id)
+ : size(size_), async_request_id(id) {}
+
+ uint64_t size;
+ AsyncRequestId async_request_id;
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct SnapCreatePayload {
+ SnapCreatePayload() {}
+ SnapCreatePayload(const std::string &name) : snap_name(name) {}
+
+ std::string snap_name;
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+struct UnknownPayload {
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+};
+
+typedef boost::variant<AcquiredLockPayload,
+ ReleasedLockPayload,
+ RequestLockPayload,
+ HeaderUpdatePayload,
+ AsyncProgressPayload,
+ AsyncCompletePayload,
+ FlattenPayload,
+ ResizePayload,
+ SnapCreatePayload,
+ UnknownPayload> Payload;
+
+struct NotifyMessage {
+ NotifyMessage() : payload(UnknownPayload()) {}
+ NotifyMessage(const Payload &payload_) : payload(payload_) {}
+
+ Payload payload;
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator& it);
+};
+
+struct ResponseMessage {
+ ResponseMessage() : result(0) {}
+ ResponseMessage(int result_) : result(result_) {}
+
+ int result;
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator& it);
+};
+
+} // namespace WatchNotify
+} // namespace librbd
+
+std::ostream &operator<<(std::ostream &out,
+ const librbd::WatchNotify::AsyncRequestId &request);
+
+WRITE_CLASS_ENCODER(librbd::WatchNotify::ClientId);
+WRITE_CLASS_ENCODER(librbd::WatchNotify::AsyncRequestId);
+WRITE_CLASS_ENCODER(librbd::WatchNotify::NotifyMessage);
+WRITE_CLASS_ENCODER(librbd::WatchNotify::ResponseMessage);
+
+#endif // LIBRBD_WATCH_NOTIFY_TYPES_H