#include "include/encoding.h"
#include "include/stringify.h"
#include "common/errno.h"
+#include "common/WorkQueue.h"
#include <sstream>
#include <boost/bind.hpp>
#include <boost/function.hpp>
return ctx.wait();
}
-void ImageWatcher::handle_payload(const HeaderUpdatePayload &payload,
- bufferlist *out) {
+bool ImageWatcher::handle_payload(const HeaderUpdatePayload &payload,
+ C_NotifyAck *ack_ctx) {
ldout(m_image_ctx.cct, 10) << this << " image header updated" << dendl;
Mutex::Locker lictx(m_image_ctx.refresh_lock);
++m_image_ctx.refresh_seq;
m_image_ctx.perfcounter->inc(l_librbd_notify);
+ return true;
}
-void ImageWatcher::handle_payload(const AcquiredLockPayload &payload,
- bufferlist *out) {
+bool ImageWatcher::handle_payload(const AcquiredLockPayload &payload,
+ C_NotifyAck *ack_ctx) {
ldout(m_image_ctx.cct, 10) << this << " image exclusively locked announcement"
<< dendl;
if (payload.client_id.is_valid()) {
Mutex::Locker l(m_owner_client_id_lock);
if (payload.client_id == m_owner_client_id) {
// we already know that the remote client is the owner
- return;
+ return true;
}
set_owner_client_id(payload.client_id);
}
schedule_cancel_async_requests();
schedule_retry_aio_requests(false);
}
+ return true;
}
-void ImageWatcher::handle_payload(const ReleasedLockPayload &payload,
- bufferlist *out) {
+bool ImageWatcher::handle_payload(const ReleasedLockPayload &payload,
+ C_NotifyAck *ack_ctx) {
ldout(m_image_ctx.cct, 10) << this << " exclusive lock released" << dendl;
if (payload.client_id.is_valid()) {
Mutex::Locker l(m_owner_client_id_lock);
ldout(m_image_ctx.cct, 10) << this << " unexpected owner: "
<< payload.client_id << " != "
<< m_owner_client_id << dendl;
- return;
+ return true;
}
set_owner_client_id(ClientId());
}
schedule_cancel_async_requests();
schedule_retry_aio_requests(false);
}
+ return true;
}
-void ImageWatcher::handle_payload(const RequestLockPayload &payload,
- bufferlist *out) {
+bool ImageWatcher::handle_payload(const RequestLockPayload &payload,
+ C_NotifyAck *ack_ctx) {
ldout(m_image_ctx.cct, 10) << this << " exclusive lock requested" << dendl;
if (payload.client_id == get_client_id()) {
- return;
+ return true;
}
RWLock::RLocker l(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
// need to send something back so the client can detect a missing leader
- ::encode(ResponseMessage(0), *out);
+ ::encode(ResponseMessage(0), ack_ctx->out);
{
Mutex::Locker l(m_owner_client_id_lock);
if (!m_owner_client_id.is_valid()) {
- return;
+ return true;
}
}
boost::bind(&ImageWatcher::notify_release_lock, this));
m_task_finisher->queue(TASK_CODE_RELEASING_LOCK, ctx);
}
+ return true;
}
-void ImageWatcher::handle_payload(const AsyncProgressPayload &payload,
- bufferlist *out) {
+bool ImageWatcher::handle_payload(const AsyncProgressPayload &payload,
+ C_NotifyAck *ack_ctx) {
RWLock::RLocker l(m_async_request_lock);
std::map<AsyncRequestId, AsyncRequest>::iterator req_it =
m_async_requests.find(payload.async_request_id);
schedule_async_request_timed_out(payload.async_request_id);
req_it->second.second->update_progress(payload.offset, payload.total);
}
+ return true;
}
-void ImageWatcher::handle_payload(const AsyncCompletePayload &payload,
- bufferlist *out) {
+bool ImageWatcher::handle_payload(const AsyncCompletePayload &payload,
+ C_NotifyAck *ack_ctx) {
RWLock::RLocker l(m_async_request_lock);
std::map<AsyncRequestId, AsyncRequest>::iterator req_it =
m_async_requests.find(payload.async_request_id);
<< payload.result << dendl;
req_it->second.first->complete(payload.result);
}
+ return true;
}
-void ImageWatcher::handle_payload(const FlattenPayload &payload,
- bufferlist *out) {
+bool ImageWatcher::handle_payload(const FlattenPayload &payload,
+ C_NotifyAck *ack_ctx) {
RWLock::RLocker l(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
}
}
- ::encode(ResponseMessage(r), *out);
+ ::encode(ResponseMessage(r), ack_ctx->out);
}
+ return true;
}
-void ImageWatcher::handle_payload(const ResizePayload &payload,
- bufferlist *out) {
+bool ImageWatcher::handle_payload(const ResizePayload &payload,
+ C_NotifyAck *ack_ctx) {
RWLock::RLocker l(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
int r = 0;
}
}
- ::encode(ResponseMessage(r), *out);
+ ::encode(ResponseMessage(r), ack_ctx->out);
}
+ return true;
}
-void ImageWatcher::handle_payload(const SnapCreatePayload &payload,
- bufferlist *out) {
+bool ImageWatcher::handle_payload(const SnapCreatePayload &payload,
+ C_NotifyAck *ack_ctx) {
RWLock::RLocker l(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
ldout(m_image_ctx.cct, 10) << this << " remote snap_create request: "
<< payload.snap_name << dendl;
- int r = librbd::snap_create_helper(&m_image_ctx, NULL,
- payload.snap_name.c_str());
- ::encode(ResponseMessage(r), *out);
+ // execute outside of librados AIO thread
+ m_image_ctx.op_work_queue->queue(new C_SnapCreateResponseMessage(
+ this, ack_ctx, payload.snap_name), 0);
+ return false;
}
+ return true;
}
-void ImageWatcher::handle_payload(const UnknownPayload &payload,
- bufferlist *out) {
+bool ImageWatcher::handle_payload(const UnknownPayload &payload,
+ C_NotifyAck *ack_ctx) {
RWLock::RLocker l(m_image_ctx.owner_lock);
if (is_lock_owner()) {
- ::encode(ResponseMessage(-EOPNOTSUPP), *out);
+ ::encode(ResponseMessage(-EOPNOTSUPP), ack_ctx->out);
}
+ return true;
}
void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
m_image_watcher.schedule_async_complete(m_async_request_id, r);
}
+ImageWatcher::C_NotifyAck::C_NotifyAck(ImageWatcher *image_watcher,
+ uint64_t notify_id, uint64_t handle)
+ : image_watcher(image_watcher), notify_id(notify_id), handle(handle) {
+ CephContext *cct = image_watcher->m_image_ctx.cct;
+ ldout(cct, 10) << this << " C_NotifyAck start: id=" << notify_id << ", "
+ << "handle=" << handle << dendl;
+}
+
+void ImageWatcher::C_NotifyAck::finish(int r) {
+ assert(r == 0);
+ CephContext *cct = image_watcher->m_image_ctx.cct;
+ ldout(cct, 10) << this << " C_NotifyAck finish: id=" << notify_id << ", "
+ << "handle=" << handle << dendl;
+
+ image_watcher->acknowledge_notify(notify_id, handle, out);
+}
+
+void ImageWatcher::C_SnapCreateResponseMessage::finish(int r) {
+ CephContext *cct = notify_ack->image_watcher->m_image_ctx.cct;
+ ldout(cct, 10) << this << " C_SnapCreateResponseMessage: r=" << r << dendl;
+
+ RWLock::RLocker owner_locker(image_watcher->m_image_ctx.owner_lock);
+ if (image_watcher->m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
+ r = librbd::snap_create_helper(&image_watcher->m_image_ctx, NULL,
+ snap_name.c_str());
+ ::encode(ResponseMessage(r), notify_ack->out);
+ }
+ notify_ack->complete(0);
}
+
+} // namespace librbd
RemoteProgressContext *m_prog_ctx;
};
+ struct C_NotifyAck : public Context {
+ ImageWatcher *image_watcher;
+ uint64_t notify_id;
+ uint64_t handle;
+ bufferlist out;
+
+ C_NotifyAck(ImageWatcher *image_watcher, uint64_t notify_id,
+ uint64_t handle);
+ virtual void finish(int r);
+ };
+
+ struct C_SnapCreateResponseMessage : public Context {
+ ImageWatcher *image_watcher;
+ C_NotifyAck *notify_ack;
+ std::string snap_name;
+
+ C_SnapCreateResponseMessage(ImageWatcher *image_watcher,
+ C_NotifyAck *notify_ack,
+ const std::string &snap_name)
+ : image_watcher(image_watcher), notify_ack(notify_ack),
+ snap_name(snap_name) {
+ }
+ virtual void finish(int r);
+ };
+
struct HandlePayloadVisitor : public boost::static_visitor<void> {
ImageWatcher *image_watcher;
uint64_t notify_id;
{
}
- inline void operator()(const WatchNotify::HeaderUpdatePayload &payload) const {
- bufferlist out;
- image_watcher->handle_payload(payload, &out);
- image_watcher->acknowledge_notify(notify_id, handle, out);
- }
-
template <typename Payload>
inline void operator()(const Payload &payload) const {
- bufferlist out;
- image_watcher->handle_payload(payload, &out);
- image_watcher->acknowledge_notify(notify_id, handle, out);
+ C_NotifyAck *ctx = new C_NotifyAck(image_watcher, notify_id, handle);
+ if (image_watcher->handle_payload(payload, ctx)) {
+ ctx->complete(0);
+ }
}
};
int notify_async_complete(const WatchNotify::AsyncRequestId &id,
int r);
- void handle_payload(const WatchNotify::HeaderUpdatePayload& payload,
- bufferlist *out);
- void handle_payload(const WatchNotify::AcquiredLockPayload& payload,
- bufferlist *out);
- void handle_payload(const WatchNotify::ReleasedLockPayload& payload,
- bufferlist *out);
- void handle_payload(const WatchNotify::RequestLockPayload& payload,
- bufferlist *out);
- void handle_payload(const WatchNotify::AsyncProgressPayload& payload,
- bufferlist *out);
- void handle_payload(const WatchNotify::AsyncCompletePayload& payload,
- bufferlist *out);
- void handle_payload(const WatchNotify::FlattenPayload& payload,
- bufferlist *out);
- void handle_payload(const WatchNotify::ResizePayload& payload,
- bufferlist *out);
- void handle_payload(const WatchNotify::SnapCreatePayload& payload,
- bufferlist *out);
- void handle_payload(const WatchNotify::UnknownPayload& payload,
- bufferlist *out);
+ bool handle_payload(const WatchNotify::HeaderUpdatePayload& payload,
+ C_NotifyAck *ctx);
+ bool handle_payload(const WatchNotify::AcquiredLockPayload& payload,
+ C_NotifyAck *ctx);
+ bool handle_payload(const WatchNotify::ReleasedLockPayload& payload,
+ C_NotifyAck *ctx);
+ bool handle_payload(const WatchNotify::RequestLockPayload& payload,
+ C_NotifyAck *ctx);
+ bool handle_payload(const WatchNotify::AsyncProgressPayload& payload,
+ C_NotifyAck *ctx);
+ bool handle_payload(const WatchNotify::AsyncCompletePayload& payload,
+ C_NotifyAck *ctx);
+ bool handle_payload(const WatchNotify::FlattenPayload& payload,
+ C_NotifyAck *ctx);
+ bool handle_payload(const WatchNotify::ResizePayload& payload,
+ C_NotifyAck *ctx);
+ bool handle_payload(const WatchNotify::SnapCreatePayload& payload,
+ C_NotifyAck *ctx);
+ bool handle_payload(const WatchNotify::UnknownPayload& payload,
+ C_NotifyAck *ctx);
void handle_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl);
void handle_error(uint64_t cookie, int err);