NOTIFY_OP_SNAP_CREATE = 8
};
-class RemoteProgressContext : public ProgressContext {
-public:
- RemoteProgressContext(ImageWatcher &image_watcher, Finisher &finisher,
- const RemoteAsyncRequest &remote_async_request)
- : m_image_watcher(image_watcher), m_finisher(finisher),
- m_remote_async_request(remote_async_request)
- {
- }
-
- virtual int update_progress(uint64_t offset, uint64_t total) {
- // TODO: JD throttle notify updates(?)
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher::notify_async_progress,
- &m_image_watcher, m_remote_async_request, offset, total));
- m_finisher.queue(ctx);
- return 0;
- }
-
-private:
- ImageWatcher &m_image_watcher;
- Finisher &m_finisher;
- RemoteAsyncRequest m_remote_async_request;
-};
-
-class RemoteContext : public Context {
-public:
- RemoteContext(ImageWatcher &image_watcher, Finisher &finisher,
- const RemoteAsyncRequest &remote_async_request,
- RemoteProgressContext *prog_ctx)
- : m_image_watcher(image_watcher), m_finisher(finisher),
- m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx)
- {
- }
-
- ~RemoteContext() {
- delete m_prog_ctx;
- }
-
- virtual void finish(int r) {
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher::notify_async_complete,
- &m_image_watcher, m_remote_async_request, r));
- m_finisher.queue(ctx);
- }
-
-private:
- ImageWatcher &m_image_watcher;
- Finisher &m_finisher;
- RemoteAsyncRequest m_remote_async_request;
- RemoteProgressContext *m_prog_ctx;
-};
-
ImageWatcher::ImageWatcher(ImageCtx &image_ctx)
: m_image_ctx(image_ctx), m_watch_ctx(*this), m_handle(0),
m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED),
ENCODE_FINISH(bl);
m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
+
+ RWLock::WLocker l(m_async_request_lock);
+ m_async_progress.erase(request);
return 0;
}
return r;
}
+void ImageWatcher::schedule_update_progress(
+ const RemoteAsyncRequest &remote_async_request,
+ uint64_t offset, uint64_t total) {
+ RWLock::WLocker l(m_async_request_lock);
+ if (m_async_progress.count(remote_async_request) == 0) {
+ m_async_progress.insert(remote_async_request);
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&ImageWatcher::notify_async_progress,
+ this, remote_async_request, offset, total));
+ m_finisher->queue(ctx);
+ }
+}
+
void ImageWatcher::handle_header_update() {
ldout(m_image_ctx.cct, 1) << "image header updated" << dendl;
RemoteAsyncRequest request;
::decode(request, iter);
- RemoteProgressContext *prog_ctx =
- new RemoteProgressContext(*this, *m_finisher, request);
- RemoteContext *ctx = new RemoteContext(*this, *m_finisher, request,
- prog_ctx);
+ RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this,
+ request);
+ RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
ldout(m_image_ctx.cct, 20) << "remote flatten request: " << request << dendl;
-
int r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx);
if (r < 0) {
delete ctx;
RemoteAsyncRequest request;
::decode(request, iter);
- RemoteProgressContext *prog_ctx =
- new RemoteProgressContext(*this, *m_finisher, request);
- RemoteContext *ctx = new RemoteContext(*this, *m_finisher, request,
- prog_ctx);
+ RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this,
+ request);
+ RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
ldout(m_image_ctx.cct, 20) << "remote resize request: " << request
<< " " << size << dendl;
-
int r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx);
if (r < 0) {
delete ctx;
image_watcher.handle_error(handle, err);
}
+void ImageWatcher::RemoteContext::finish(int r) {
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&ImageWatcher::notify_async_complete,
+ &m_image_watcher, m_remote_async_request, r));
+ m_image_watcher.m_finisher->queue(ctx);
+}
+
}
#include "common/Mutex.h"
#include "common/RWLock.h"
#include "include/rados/librados.hpp"
+#include "include/rbd/librbd.hpp"
+#include <set>
#include <string>
#include <utility>
#include <vector>
class AioCompletion;
class ImageCtx;
- class ProgressContext;
struct RemoteAsyncRequest {
uint64_t gid;
RemoteAsyncRequest() : gid(), handle(), request_id() {}
RemoteAsyncRequest(uint64_t gid_, uint64_t handle_, uint64_t request_id_)
: gid(gid_), handle(handle_), request_id(request_id_) {}
+
+ inline bool operator<(const RemoteAsyncRequest &rhs) const {
+ if (gid != rhs.gid) {
+ return gid < rhs.gid;
+ } else if (handle != rhs.handle) {
+ return handle < rhs.handle;
+ } else {
+ return request_id < request_id;
+ }
+ }
};
class ImageWatcher {
public:
-
ImageWatcher(ImageCtx& image_ctx);
~ImageWatcher();
virtual void handle_error(uint64_t handle, int err);
};
+ class RemoteProgressContext : public ProgressContext {
+ public:
+ RemoteProgressContext(ImageWatcher &image_watcher,
+ const RemoteAsyncRequest &remote_async_request)
+ : m_image_watcher(image_watcher),
+ m_remote_async_request(remote_async_request)
+ {
+ }
+
+ virtual int update_progress(uint64_t offset, uint64_t total) {
+ m_image_watcher.schedule_update_progress(
+ m_remote_async_request, offset, total);
+ return 0;
+ }
+
+ private:
+ ImageWatcher &m_image_watcher;
+ RemoteAsyncRequest m_remote_async_request;
+ };
+
+ class RemoteContext : public Context {
+ public:
+ RemoteContext(ImageWatcher &image_watcher,
+ const RemoteAsyncRequest &remote_async_request,
+ RemoteProgressContext *prog_ctx)
+ : m_image_watcher(image_watcher),
+ m_remote_async_request(remote_async_request), m_prog_ctx(prog_ctx)
+ {
+ }
+
+ ~RemoteContext() {
+ delete m_prog_ctx;
+ }
+
+ virtual void finish(int r);
+
+ private:
+ ImageWatcher &m_image_watcher;
+ RemoteAsyncRequest m_remote_async_request;
+ RemoteProgressContext *m_prog_ctx;
+ };
+
ImageCtx &m_image_ctx;
WatchCtx m_watch_ctx;
RWLock m_async_request_lock;
uint64_t m_async_request_id;
std::map<uint64_t, AsyncRequest> m_async_requests;
+ std::set<RemoteAsyncRequest> m_async_progress;
Mutex m_aio_request_lock;
Cond m_aio_request_cond;
ProgressContext& prog_ctx);
void notify_request_leadership();
+ void schedule_update_progress(const RemoteAsyncRequest &remote_async_request,
+ uint64_t offset, uint64_t total);
+
void handle_header_update();
void handle_acquired_lock();
void handle_released_lock();