m_timer_lock("librbd::ImageWatcher::m_timer_lock"),
m_timer(new SafeTimer(image_ctx.cct, m_timer_lock)),
m_async_request_lock("librbd::ImageWatcher::m_async_request_lock"),
- m_async_request_id(0),
m_aio_request_lock("librbd::ImageWatcher::m_aio_request_lock"),
m_retry_aio_context(NULL)
{
}
}
- {
- // aio operations will be retried once the the watch is re-established
- RWLock::RLocker l(m_watch_lock);
- if (m_watch_state == WATCH_STATE_ERROR) {
- return 0;
- }
- }
+ RWLock::RLocker l(m_watch_lock);
+ if (m_watch_state == WATCH_STATE_REGISTERED) {
+ ldout(m_image_ctx.cct, 10) << "requesting exclusive lock" << dendl;
- // run notify request in finisher to avoid blocking aio path
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher::notify_request_lock, this));
- m_finisher->queue(ctx);
- ldout(m_image_ctx.cct, 10) << "requesting exclusive lock" << dendl;
+ // run notify request in finisher to avoid blocking aio path
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&ImageWatcher::notify_request_lock, this));
+ m_finisher->queue(ctx);
+ }
return 0;
}
}
if (owned_lock) {
retry_aio_requests();
-
} else {
- schedule_retry_aio_requests();
+ schedule_retry_aio_requests(true);
}
}
return 0;
}
+void ImageWatcher::schedule_async_complete(const RemoteAsyncRequest &request,
+ int r) {
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&ImageWatcher::notify_async_complete, this, request, r));
+ m_finisher->queue(ctx);
+}
+
int ImageWatcher::notify_async_complete(const RemoteAsyncRequest &request,
int r) {
ldout(m_image_ctx.cct, 20) << "remote async request finished: "
librbd::notify_change(m_image_ctx.md_ctx, m_image_ctx.header_oid,
&m_image_ctx);
- m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
+ int ret = m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl,
+ NOTIFY_TIMEOUT, NULL);
+ if (ret < 0) {
+ lderr(m_image_ctx.cct) << "failed to notify async complete: "
+ << cpp_strerror(ret) << dendl;
+ if (ret == -ETIMEDOUT) {
+ schedule_async_complete(request, r);
+ }
+ } else {
+ RWLock::WLocker l(m_async_request_lock);
+ m_async_pending.erase(request);
+ }
return 0;
}
-int ImageWatcher::notify_flatten(ProgressContext &prog_ctx) {
+int ImageWatcher::notify_flatten(uint64_t request_id, ProgressContext &prog_ctx) {
assert(m_image_ctx.owner_lock.is_locked());
assert(!is_lock_owner());
bufferlist bl;
- uint64_t async_request_id;
ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
::encode(NOTIFY_OP_FLATTEN, bl);
- async_request_id = encode_async_request(bl);
+ encode_async_request(request_id, bl);
ENCODE_FINISH(bl);
- return notify_async_request(async_request_id, bl, prog_ctx);
+ return notify_async_request(request_id, bl, prog_ctx);
}
-int ImageWatcher::notify_resize(uint64_t size, ProgressContext &prog_ctx) {
+int ImageWatcher::notify_resize(uint64_t request_id, uint64_t size,
+ ProgressContext &prog_ctx) {
assert(m_image_ctx.owner_lock.is_locked());
assert(!is_lock_owner());
bufferlist bl;
- uint64_t async_request_id;
ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
::encode(NOTIFY_OP_RESIZE, bl);
::encode(size, bl);
- async_request_id = encode_async_request(bl);
+ encode_async_request(request_id, bl);
ENCODE_FINISH(bl);
- return notify_async_request(async_request_id, bl, prog_ctx);
+ return notify_async_request(request_id, bl, prog_ctx);
}
int ImageWatcher::notify_snap_create(const std::string &snap_name) {
return true;
}
-void ImageWatcher::schedule_retry_aio_requests() {
+void ImageWatcher::schedule_retry_aio_requests(bool use_timer) {
Mutex::Locker l(m_timer_lock);
- if (m_retry_aio_context == NULL) {
- m_retry_aio_context = new FunctionContext(boost::bind(
- &ImageWatcher::finalize_retry_aio_requests, this));
- m_timer->add_event_after(RETRY_DELAY_SECONDS, m_retry_aio_context);
+ if (use_timer) {
+ if (m_retry_aio_context == NULL) {
+ m_retry_aio_context = new FunctionContext(boost::bind(
+ &ImageWatcher::finalize_retry_aio_requests, this));
+ m_timer->add_event_after(RETRY_DELAY_SECONDS, m_retry_aio_context);
+ }
+ } else {
+ m_timer->cancel_event(m_retry_aio_context);
+ m_retry_aio_context = NULL;
+
+ Context *ctx = new FunctionContext(boost::bind(
+ &ImageWatcher::retry_aio_requests, this));
+ m_finisher->queue(ctx);
}
}
}
}
+void ImageWatcher::schedule_cancel_async_requests() {
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&ImageWatcher::cancel_async_requests, this));
+ m_finisher->queue(ctx);
+}
+
void ImageWatcher::cancel_async_requests() {
RWLock::WLocker l(m_async_request_lock);
for (std::map<uint64_t, AsyncRequest>::iterator iter = m_async_requests.begin();
m_async_requests.clear();
}
-uint64_t ImageWatcher::encode_async_request(bufferlist &bl) {
- RWLock::WLocker l(m_async_request_lock);
- ++m_async_request_id;
-
+void ImageWatcher::encode_async_request(uint64_t request_id, bufferlist &bl) {
RemoteAsyncRequest request(m_image_ctx.md_ctx.get_instance_id(),
- m_watch_handle, m_async_request_id);
+ reinterpret_cast<uint64_t>(this), request_id);
::encode(request, bl);
ldout(m_image_ctx.cct, 10) << "async request: " << request << dendl;
- return m_async_request_id;
}
int ImageWatcher::decode_response_code(bufferlist &bl) {
} else if (r < 0) {
lderr(m_image_ctx.cct) << "error requesting lock: " << cpp_strerror(r)
<< dendl;
- schedule_retry_aio_requests();
+ schedule_retry_aio_requests(true);
}
}
return r;
}
-void ImageWatcher::schedule_update_progress(
- const RemoteAsyncRequest &remote_async_request,
- uint64_t offset, uint64_t total) {
+void ImageWatcher::schedule_async_progress(const RemoteAsyncRequest &request,
+ uint64_t offset, uint64_t total) {
RWLock::WLocker l(m_async_request_lock);
- if (m_async_progress.count(remote_async_request) == 0) {
- m_async_progress.insert(remote_async_request);
+ if (m_async_progress.count(request) == 0) {
+ m_async_progress.insert(request);
FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher::notify_async_progress,
- this, remote_async_request, offset, total));
+ boost::bind(&ImageWatcher::notify_async_progress, this, request, offset,
+ total));
m_finisher->queue(ctx);
}
}
void ImageWatcher::handle_acquired_lock() {
ldout(m_image_ctx.cct, 10) << "image exclusively locked announcement" << dendl;
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher::cancel_async_requests, this));
- m_finisher->queue(ctx);
+ RWLock::RLocker l(m_image_ctx.owner_lock);
+ if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
+ schedule_cancel_async_requests();
+ }
}
void ImageWatcher::handle_released_lock() {
ldout(m_image_ctx.cct, 10) << "exclusive lock released" << dendl;
RWLock::RLocker l(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher::cancel_async_requests, this));
- m_finisher->queue(ctx);
-
- Mutex::Locker l(m_aio_request_lock);
- if (!m_aio_requests.empty()) {
- ldout(m_image_ctx.cct, 20) << "queuing lock request" << dendl;
- FunctionContext *req_ctx = new FunctionContext(
- boost::bind(&ImageWatcher::finalize_request_lock, this));
- m_finisher->queue(req_ctx);
- }
+ schedule_cancel_async_requests();
+ schedule_retry_aio_requests(false);
}
}
void ImageWatcher::handle_request_lock(bufferlist *out) {
RWLock::WLocker l(m_image_ctx.owner_lock);
- if (is_lock_owner()) {
+ if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
m_lock_owner_state = LOCK_OWNER_STATE_RELEASING;
// need to send something back so the client can detect a missing leader
::decode(offset, iter);
::decode(total, iter);
if (request.gid == m_image_ctx.md_ctx.get_instance_id() &&
- request.handle == m_watch_handle) {
+ request.handle == reinterpret_cast<uint64_t>(this)) {
RWLock::RLocker l(m_async_request_lock);
std::map<uint64_t, AsyncRequest>::iterator iter =
m_async_requests.find(request.request_id);
int r;
::decode(r, iter);
if (request.gid == m_image_ctx.md_ctx.get_instance_id() &&
- request.handle == m_watch_handle) {
+ request.handle == reinterpret_cast<uint64_t>(this)) {
Context *ctx = NULL;
{
RWLock::RLocker l(m_async_request_lock);
RemoteAsyncRequest request;
::decode(request, iter);
- RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this,
- request);
- RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
+ int r = 0;
+ RWLock::WLocker l(m_async_request_lock);
+ if (m_async_pending.count(request) == 0) {
+ RemoteProgressContext *prog_ctx =
+ new RemoteProgressContext(*this, request);
+ RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
- ldout(m_image_ctx.cct, 10) << "remote flatten request: " << request << dendl;
- int r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx);
- if (r < 0) {
- delete ctx;
+ ldout(m_image_ctx.cct, 10) << "remote flatten request: " << request << dendl;
+ r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx);
+ if (r < 0) {
+ delete ctx;
+ lderr(m_image_ctx.cct) << "remove flatten request failed: "
+ << cpp_strerror(r) << dendl;
+ } else {
+ m_async_pending.insert(request);
+ }
}
ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
RemoteAsyncRequest request;
::decode(request, iter);
- RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this,
- request);
- RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
-
- ldout(m_image_ctx.cct, 10) << "remote resize request: " << request
- << " " << size << dendl;
- int r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx);
- if (r < 0) {
- delete ctx;
+ int r = 0;
+ RWLock::WLocker l(m_async_request_lock);
+ if (m_async_pending.count(request) == 0) {
+ RemoteProgressContext *prog_ctx =
+ new RemoteProgressContext(*this, request);
+ RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
+
+ ldout(m_image_ctx.cct, 10) << "remote resize request: " << request
+ << " " << size << dendl;
+ r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx);
+ if (r < 0) {
+ lderr(m_image_ctx.cct) << "remove resize request failed: "
+ << cpp_strerror(r) << dendl;
+ delete ctx;
+ } else {
+ m_async_pending.insert(request);
+ }
}
ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
return;
}
+ bool loopback;
+ {
+ RWLock::RLocker l(m_watch_lock);
+ loopback = (m_watch_handle == handle);
+ }
+
bufferlist::iterator iter = bl.begin();
try {
DECODE_START(NOTIFY_VERSION, iter);
::decode(op, iter);
bufferlist out;
- switch (op) {
- // client ops
- case NOTIFY_OP_ACQUIRED_LOCK:
- acknowledge_notify(notify_id, handle, out);
- handle_acquired_lock();
- break;
- case NOTIFY_OP_RELEASED_LOCK:
- acknowledge_notify(notify_id, handle, out);
- handle_released_lock();
- break;
- case NOTIFY_OP_HEADER_UPDATE:
- acknowledge_notify(notify_id, handle, out);
- handle_header_update();
- break;
- case NOTIFY_OP_ASYNC_PROGRESS:
- acknowledge_notify(notify_id, handle, out);
- handle_async_progress(iter);
- break;
- case NOTIFY_OP_ASYNC_COMPLETE:
- acknowledge_notify(notify_id, handle, out);
- handle_async_complete(iter);
- break;
-
- // lock owner-only ops
- case NOTIFY_OP_REQUEST_LOCK:
- handle_request_lock(&out);
- acknowledge_notify(notify_id, handle, out);
- break;
- case NOTIFY_OP_FLATTEN:
- handle_flatten(iter, &out);
- acknowledge_notify(notify_id, handle, out);
- break;
- case NOTIFY_OP_RESIZE:
- handle_resize(iter, &out);
+ if (loopback && op != NOTIFY_OP_HEADER_UPDATE) {
acknowledge_notify(notify_id, handle, out);
- break;
- case NOTIFY_OP_SNAP_CREATE:
- handle_snap_create(iter, &out);
- acknowledge_notify(notify_id, handle, out);
- break;
-
- default:
- handle_unknown_op(&out);
- acknowledge_notify(notify_id, handle, out);
- break;
+ } else {
+ switch (op) {
+ // client ops
+ case NOTIFY_OP_ACQUIRED_LOCK:
+ acknowledge_notify(notify_id, handle, out);
+ handle_acquired_lock();
+ break;
+ case NOTIFY_OP_RELEASED_LOCK:
+ acknowledge_notify(notify_id, handle, out);
+ handle_released_lock();
+ break;
+ case NOTIFY_OP_HEADER_UPDATE:
+ acknowledge_notify(notify_id, handle, out);
+ handle_header_update();
+ break;
+ case NOTIFY_OP_ASYNC_PROGRESS:
+ acknowledge_notify(notify_id, handle, out);
+ handle_async_progress(iter);
+ break;
+ case NOTIFY_OP_ASYNC_COMPLETE:
+ acknowledge_notify(notify_id, handle, out);
+ handle_async_complete(iter);
+ break;
+
+ // lock owner-only ops
+ case NOTIFY_OP_REQUEST_LOCK:
+ handle_request_lock(&out);
+ acknowledge_notify(notify_id, handle, out);
+ break;
+ case NOTIFY_OP_FLATTEN:
+ handle_flatten(iter, &out);
+ acknowledge_notify(notify_id, handle, out);
+ break;
+ case NOTIFY_OP_RESIZE:
+ handle_resize(iter, &out);
+ acknowledge_notify(notify_id, handle, out);
+ break;
+ case NOTIFY_OP_SNAP_CREATE:
+ handle_snap_create(iter, &out);
+ acknowledge_notify(notify_id, handle, out);
+ break;
+ default:
+ handle_unknown_op(&out);
+ acknowledge_notify(notify_id, handle, out);
+ break;
+ }
}
DECODE_FINISH(iter);
} catch (const buffer::error &err) {
}
}
- Mutex::Locker l(m_timer_lock);
retry_aio_requests();
}
}
void ImageWatcher::RemoteContext::finish(int r) {
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher::notify_async_complete,
- &m_image_watcher, m_remote_async_request, r));
- m_image_watcher.m_finisher->queue(ctx);
+ m_image_watcher.schedule_async_complete(m_remote_async_request, r);
}
}