#include <sstream>
#include <boost/bind.hpp>
#include <boost/function.hpp>
+#include <boost/scope_exit.hpp>
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::ImageWatcher: "
+static void decode(librbd::RemoteAsyncRequest &request,
+ bufferlist::iterator &iter) {
+ ::decode(request.gid, iter);
+ ::decode(request.handle, iter);
+ ::decode(request.request_id, iter);
+}
+
+static void encode(const librbd::RemoteAsyncRequest &request, bufferlist &bl) {
+ ::encode(request.gid, bl);
+ ::encode(request.handle, bl);
+ ::encode(request.request_id, bl);
+}
+
+static std::ostream &operator<<(std::ostream &out,
+ const librbd::RemoteAsyncRequest &request) {
+ out << "[" << request.gid << "," << request.handle << ","
+ << request.request_id << "]";
+ return out;
+}
+
namespace librbd {
static const std::string WATCHER_LOCK_TAG = "internal";
static const double RETRY_DELAY_SECONDS = 1.0;
enum {
- NOTIFY_OP_ACQUIRED_LOCK = 0,
- NOTIFY_OP_RELEASED_LOCK = 1,
- NOTIFY_OP_REQUEST_LOCK = 2,
- NOTIFY_OP_HEADER_UPDATE = 3
+ NOTIFY_OP_ACQUIRED_LOCK = 0,
+ NOTIFY_OP_RELEASED_LOCK = 1,
+ NOTIFY_OP_REQUEST_LOCK = 2,
+ NOTIFY_OP_HEADER_UPDATE = 3,
+ NOTIFY_OP_ASYNC_PROGRESS = 4,
+ NOTIFY_OP_ASYNC_COMPLETE = 5,
+ NOTIFY_OP_FLATTEN = 6,
+ NOTIFY_OP_RESIZE = 7,
+ NOTIFY_OP_SNAP_CREATE = 8
+};
+
+class RemoteProgressContext : public ProgressContext {
+public:
+ RemoteProgressContext(ImageWatcher &image_watcher, Finisher &finisher,
+ const RemoteAsyncRequest &remote_async_request)
+ : m_image_watcher(image_watcher), m_finisher(finisher),
+ m_remote_async_request(remote_async_request)
+ {
+ }
+
+ virtual int update_progress(uint64_t offset, uint64_t total) {
+ // TODO: JD throttle notify updates(?)
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&ImageWatcher::notify_async_progress,
+ &m_image_watcher, m_remote_async_request, offset, total));
+ m_finisher.queue(ctx);
+ return 0;
+ }
+
+private:
+ ImageWatcher &m_image_watcher;
+ Finisher &m_finisher;
+ RemoteAsyncRequest m_remote_async_request;
+};
+
+class RemoteContext : public Context {
+public:
+ RemoteContext(ImageWatcher &image_watcher, Finisher &finisher,
+ const RemoteAsyncRequest &remote_async_request,
+ RemoteProgressContext *prog_ctx)
+ : m_image_watcher(image_watcher), m_finisher(finisher),
+ m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx)
+ {
+ }
+
+ ~RemoteContext() {
+ delete m_prog_ctx;
+ }
+
+ virtual void finish(int r) {
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&ImageWatcher::notify_async_complete,
+ &m_image_watcher, m_remote_async_request, r));
+ m_finisher.queue(ctx);
+ }
+
+private:
+ ImageWatcher &m_image_watcher;
+ Finisher &m_finisher;
+ RemoteAsyncRequest m_remote_async_request;
+ RemoteProgressContext *m_prog_ctx;
};
ImageWatcher::ImageWatcher(ImageCtx &image_ctx)
m_timer_lock("librbd::ImageWatcher::m_timer_lock"),
m_timer(new SafeTimer(image_ctx.cct, m_timer_lock)),
m_watch_lock("librbd::ImageWatcher::m_watch_lock"), m_watch_error(0),
+ m_async_request_lock("librbd::ImageWatcher::m_async_request_lock"),
+ m_async_request_id(0),
m_aio_request_lock("librbd::ImageWatcher::m_aio_request_lock"),
m_retrying_aio_requests(false), m_retry_aio_context(NULL)
{
assert(m_aio_requests.empty());
}
+ cancel_async_requests(-ESHUTDOWN);
+
RWLock::WLocker l(m_watch_lock);
return m_image_ctx.md_ctx.unwatch2(m_handle);
}
unlock();
}
+int ImageWatcher::notify_async_progress(const RemoteAsyncRequest &request,
+ uint64_t offset, uint64_t total) {
+ ldout(m_image_ctx.cct, 20) << "remote async request progress: "
+ << request << " @ " << offset
+ << "/" << total << dendl;
+
+ bufferlist bl;
+ ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
+ ::encode(NOTIFY_OP_ASYNC_PROGRESS, bl);
+ ::encode(request, bl);
+ ::encode(offset, bl);
+ ::encode(total, bl);
+ ENCODE_FINISH(bl);
+
+ m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
+ return 0;
+}
+
+int ImageWatcher::notify_async_complete(const RemoteAsyncRequest &request,
+ int r) {
+ ldout(m_image_ctx.cct, 20) << "remote async request finished: "
+ << request << " = " << r << dendl;
+
+ bufferlist bl;
+ ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
+ ::encode(NOTIFY_OP_ASYNC_COMPLETE, bl);
+ ::encode(request, bl);
+ ::encode(r, bl);
+ ENCODE_FINISH(bl);
+
+ librbd::notify_change(m_image_ctx.md_ctx, m_image_ctx.header_oid,
+ &m_image_ctx);
+ m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
+ return 0;
+}
+
+int ImageWatcher::notify_flatten(ProgressContext &prog_ctx) {
+ bufferlist bl;
+ uint64_t async_request_id;
+ ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
+ ::encode(NOTIFY_OP_FLATTEN, bl);
+ async_request_id = encode_async_request(bl);
+ ENCODE_FINISH(bl);
+
+ return notify_async_request(async_request_id, bl, prog_ctx);
+}
+
+int ImageWatcher::notify_resize(uint64_t size, ProgressContext &prog_ctx) {
+ bufferlist bl;
+ uint64_t async_request_id;
+ ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
+ ::encode(NOTIFY_OP_RESIZE, bl);
+ ::encode(size, bl);
+ async_request_id = encode_async_request(bl);
+ ENCODE_FINISH(bl);
+
+ return notify_async_request(async_request_id, bl, prog_ctx);
+}
+
+int ImageWatcher::notify_snap_create(const std::string &snap_name) {
+ bufferlist bl;
+ ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
+ ::encode(NOTIFY_OP_SNAP_CREATE, bl);
+ ::encode(snap_name, bl);
+ ENCODE_FINISH(bl);
+
+ bufferlist response;
+ int r = notify_lock_owner(bl, response);
+ if (r < 0) {
+ return r;
+ }
+ return decode_response_code(response);
+}
+
void ImageWatcher::notify_header_update(librados::IoCtx &io_ctx,
const std::string &oid)
{
m_aio_request_cond.Signal();
}
+void ImageWatcher::cancel_aio_requests(int result) {
+ Mutex::Locker l(m_aio_request_lock);
+ for (std::vector<AioRequest>::iterator iter = m_aio_requests.begin();
+ iter != m_aio_requests.end(); ++iter) {
+ AioCompletion *c = iter->second;
+ c->get();
+ c->lock.Lock();
+ c->rval = result;
+ c->lock.Unlock();
+ c->finish_adding_requests(m_image_ctx.cct);
+ c->put();
+ }
+ m_aio_requests.clear();
+ m_aio_request_cond.Signal();
+}
+
+void ImageWatcher::cancel_async_requests(int result) {
+ RWLock::WLocker l(m_async_request_lock);
+ for (std::map<uint64_t, AsyncRequest>::iterator iter = m_async_requests.begin();
+ iter != m_async_requests.end(); ++iter) {
+ iter->second.first->complete(result);
+ }
+ m_async_requests.clear();
+}
+
+uint64_t ImageWatcher::encode_async_request(bufferlist &bl) {
+ RWLock::WLocker l(m_async_request_lock);
+ ++m_async_request_id;
+
+ RemoteAsyncRequest request(m_image_ctx.md_ctx.get_instance_id(),
+ m_handle, m_async_request_id);
+ ::encode(request, bl);
+
+ ldout(m_image_ctx.cct, 20) << "async request: " << request << dendl;
+ return m_async_request_id;
+}
+
int ImageWatcher::decode_response_code(bufferlist &bl) {
int r;
bufferlist::iterator iter = bl.begin();
return 0;
}
+int ImageWatcher::notify_async_request(uint64_t async_request_id,
+ bufferlist &in,
+ ProgressContext& prog_ctx) {
+ Mutex my_lock("librbd::ImageWatcher::notify_async_request::my_lock");
+ Cond cond;
+ bool done = false;
+ int r;
+ Context *ctx = new C_SafeCond(&my_lock, &cond, &done, &r);
+
+ {
+ RWLock::WLocker l(m_async_request_lock);
+ m_async_requests[async_request_id] = AsyncRequest(ctx, &prog_ctx);
+ }
+
+ BOOST_SCOPE_EXIT( (ctx)(async_request_id)(&m_async_requests)
+ (&m_async_request_lock)(&done) ) {
+ RWLock::WLocker l(m_async_request_lock);
+ m_async_requests.erase(async_request_id);
+ if (!done) {
+ delete ctx;
+ }
+ } BOOST_SCOPE_EXIT_END
+
+ bufferlist response;
+ r = notify_lock_owner(in, response);
+ if (r < 0) {
+ return r;
+ }
+
+ my_lock.Lock();
+ while (!done) {
+ cond.Wait(my_lock);
+ }
+ my_lock.Unlock();
+ return r;
+}
+
void ImageWatcher::handle_header_update() {
ldout(m_image_ctx.cct, 1) << "image header updated" << dendl;
void ImageWatcher::handle_acquired_lock() {
ldout(m_image_ctx.cct, 1) << "image exclusively locked announcement" << dendl;
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&ImageWatcher::cancel_async_requests, this, -ERESTART));
+ m_finisher->queue(ctx);
}
void ImageWatcher::handle_released_lock() {
}
}
+void ImageWatcher::handle_async_progress(bufferlist::iterator iter) {
+ RemoteAsyncRequest request;
+ ::decode(request, iter);
+
+ uint64_t offset;
+ uint64_t total;
+ ::decode(offset, iter);
+ ::decode(total, iter);
+ if (request.gid == m_image_ctx.md_ctx.get_instance_id() &&
+ request.handle == m_handle) {
+ RWLock::RLocker l(m_async_request_lock);
+ std::map<uint64_t, AsyncRequest>::iterator iter =
+ m_async_requests.find(request.request_id);
+ if (iter != m_async_requests.end()) {
+ ldout(m_image_ctx.cct, 20) << "request progress: "
+ << request << " @ " << offset
+ << "/" << total << dendl;
+ iter->second.second->update_progress(offset, total);
+ }
+ }
+}
+
+void ImageWatcher::handle_async_complete(bufferlist::iterator iter) {
+ RemoteAsyncRequest request;
+ ::decode(request, iter);
+
+ int r;
+ ::decode(r, iter);
+ if (request.gid == m_image_ctx.md_ctx.get_instance_id() &&
+ request.handle == m_handle) {
+ Context *ctx = NULL;
+ {
+ RWLock::RLocker l(m_async_request_lock);
+ std::map<uint64_t, AsyncRequest>::iterator iter =
+ m_async_requests.find(request.request_id);
+ if (iter != m_async_requests.end()) {
+ ctx = iter->second.first;
+ }
+ }
+ if (ctx != NULL) {
+ ldout(m_image_ctx.cct, 20) << "request finished: "
+ << request << " = " << r << dendl;
+ ctx->complete(r);
+ }
+ }
+}
+
+void ImageWatcher::handle_flatten(bufferlist::iterator iter, bufferlist *out) {
+ RWLock::RLocker l(m_image_ctx.owner_lock);
+ if (is_lock_owner()) {
+ RemoteAsyncRequest request;
+ ::decode(request, iter);
+
+ RemoteProgressContext *prog_ctx =
+ new RemoteProgressContext(*this, *m_finisher, request);
+ RemoteContext *ctx = new RemoteContext(*this, *m_finisher, request,
+ prog_ctx);
+
+ ldout(m_image_ctx.cct, 20) << "remote flatten request: " << request << dendl;
+
+ int r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx);
+ if (r < 0) {
+ delete ctx;
+ }
+
+ ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
+ ::encode(r, *out);
+ ENCODE_FINISH(*out);
+ }
+}
+
+void ImageWatcher::handle_resize(bufferlist::iterator iter, bufferlist *out) {
+ RWLock::RLocker l(m_image_ctx.owner_lock);
+ if (is_lock_owner()) {
+ uint64_t size;
+ ::decode(size, iter);
+
+ RemoteAsyncRequest request;
+ ::decode(request, iter);
+
+ RemoteProgressContext *prog_ctx =
+ new RemoteProgressContext(*this, *m_finisher, request);
+ RemoteContext *ctx = new RemoteContext(*this, *m_finisher, request,
+ prog_ctx);
+
+ ldout(m_image_ctx.cct, 20) << "remote resize request: " << request
+ << " " << size << dendl;
+
+ int r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx);
+ if (r < 0) {
+ delete ctx;
+ }
+
+ ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
+ ::encode(r, *out);
+ ENCODE_FINISH(*out);
+ }
+}
+
+void ImageWatcher::handle_snap_create(bufferlist::iterator iter, bufferlist *out) {
+ RWLock::RLocker l(m_image_ctx.owner_lock);
+ if (is_lock_owner()) {
+ std::string snap_name;
+ ::decode(snap_name, iter);
+
+ ldout(m_image_ctx.cct, 20) << "remote snap_create request: " << snap_name << dendl;
+
+ int r = librbd::snap_create(&m_image_ctx, snap_name.c_str());
+ ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
+ ::encode(r, *out);
+ ENCODE_FINISH(*out);
+ }
+}
+
void ImageWatcher::handle_unknown_op(bufferlist *out) {
RWLock::RLocker l(m_image_ctx.owner_lock);
if (is_lock_owner()) {
acknowledge_notify(notify_id, handle, out);
handle_header_update();
break;
+ case NOTIFY_OP_ASYNC_PROGRESS:
+ acknowledge_notify(notify_id, handle, out);
+ handle_async_progress(iter);
+ break;
+ case NOTIFY_OP_ASYNC_COMPLETE:
+ acknowledge_notify(notify_id, handle, out);
+ handle_async_complete(iter);
+ break;
// lock owner-only ops
case NOTIFY_OP_REQUEST_LOCK:
handle_request_lock(&out);
acknowledge_notify(notify_id, handle, out);
break;
+ case NOTIFY_OP_FLATTEN:
+ handle_flatten(iter, &out);
+ acknowledge_notify(notify_id, handle, out);
+ break;
+ case NOTIFY_OP_RESIZE:
+ handle_resize(iter, &out);
+ acknowledge_notify(notify_id, handle, out);
+ break;
+ case NOTIFY_OP_SNAP_CREATE:
+ handle_snap_create(iter, &out);
+ acknowledge_notify(notify_id, handle, out);
+ break;
default:
handle_unknown_op(&out);
lderr(m_image_ctx.cct) << "failed to re-register image watch: "
<< cpp_strerror(m_watch_error) << dendl;
schedule_retry_aio_requests();
+ cancel_async_requests(m_watch_error);
return;
}
}
class AioCompletion;
class ImageCtx;
+ class ProgressContext;
+
+ struct RemoteAsyncRequest {
+ uint64_t gid;
+ uint64_t handle;
+ uint64_t request_id;
+
+ RemoteAsyncRequest() : gid(), handle(), request_id() {}
+ RemoteAsyncRequest(uint64_t gid_, uint64_t handle_, uint64_t request_id_)
+ : gid(gid_), handle(handle_), request_id(request_id_) {}
+ };
class ImageWatcher {
public:
+
ImageWatcher(ImageCtx& image_ctx);
~ImageWatcher();
AioCompletion* c);
int unlock();
+ int notify_async_progress(const RemoteAsyncRequest &remote_async_request,
+ uint64_t offset, uint64_t total);
+ int notify_async_complete(const RemoteAsyncRequest &remote_async_request,
+ int r);
+
+ int notify_flatten(ProgressContext &prog_ctx);
+ int notify_resize(uint64_t size, ProgressContext &prog_ctx);
+ int notify_snap_create(const std::string &snap_name);
+
static void notify_header_update(librados::IoCtx &io_ctx,
const std::string &oid);
LOCK_OWNER_STATE_RELEASING
};
+ typedef std::pair<Context *, ProgressContext *> AsyncRequest;
typedef std::pair<boost::function<int(AioCompletion *)>,
AioCompletion *> AioRequest;
RWLock m_watch_lock;
int m_watch_error;
+ RWLock m_async_request_lock;
+ uint64_t m_async_request_id;
+ std::map<uint64_t, AsyncRequest> m_async_requests;
+
Mutex m_aio_request_lock;
Cond m_aio_request_cond;
std::vector<AioRequest> m_aio_requests;
void cancel_retry_aio_requests();
void finalize_retry_aio_requests();
void retry_aio_requests();
+
+ void cancel_aio_requests(int result);
+ void cancel_async_requests(int result);
+
+ uint64_t encode_async_request(bufferlist &bl);
static int decode_response_code(bufferlist &bl);
void notify_released_lock();
void notify_request_lock();
int notify_lock_owner(bufferlist &bl, bufferlist &response);
+ int notify_async_request(uint64_t async_request_id, bufferlist &in,
+ ProgressContext& prog_ctx);
+ void notify_request_leadership();
+ int notify_leader(bufferlist &bl, bufferlist &response);
+
void handle_header_update();
void handle_acquired_lock();
void handle_released_lock();
void handle_request_lock(bufferlist *out);
+
+ void handle_async_progress(bufferlist::iterator iter);
+ void handle_async_complete(bufferlist::iterator iter);
+ void handle_flatten(bufferlist::iterator iter, bufferlist *out);
+ void handle_resize(bufferlist::iterator iter, bufferlist *out);
+ void handle_snap_create(bufferlist::iterator iter, bufferlist *out);
void handle_unknown_op(bufferlist *out);
void handle_notify(uint64_t notify_id, uint64_t handle, bufferlist &bl);
void handle_error(uint64_t cookie, int err);