ssize_t AioImageRequestWQ::read(uint64_t off, size_t len, char *buf,
int op_flags) {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "read " << &m_image_ctx << " off = " << off << " len = "
- << len << dendl;
+ ldout(cct, 20) << "read: ictx=" << &m_image_ctx << ", off=" << off << ", "
+ << "len=" << len << dendl;
std::vector<std::pair<uint64_t,uint64_t> > image_extents;
image_extents.push_back(make_pair(off, len));
ssize_t AioImageRequestWQ::write(uint64_t off, size_t len, const char *buf,
int op_flags) {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "write " << &m_image_ctx << " off = " << off << " len = "
- << len << dendl;
+ ldout(cct, 20) << "write: ictx=" << &m_image_ctx << ", off=" << off << ", "
+ << "len=" << len << dendl;
m_image_ctx.snap_lock.get_read();
int r = clip_io(&m_image_ctx, off, &len);
int AioImageRequestWQ::discard(uint64_t off, uint64_t len) {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "discard " << &m_image_ctx << " off = " << off << " len = "
- << len << dendl;
+ ldout(cct, 20) << "discard: ictx=" << &m_image_ctx << ", off=" << off << ", "
+ << "len=" << len << dendl;
m_image_ctx.snap_lock.get_read();
int r = clip_io(&m_image_ctx, off, &len);
c->init_time(&m_image_ctx, librbd::AIO_TYPE_READ);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_read: ictx=" << &m_image_ctx << ", "
- << "completion=" << c << ", off=" << off << ", len=" << len
- << "flags=" << op_flags << dendl;
+ << "completion=" << c << ", off=" << off << ", "
+ << "len=" << len << ", " << "flags=" << op_flags << dendl;
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_image_ctx.non_blocking_aio) {
- queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags));
+ queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags),
+ false);
} else {
AioImageRequest::read(&m_image_ctx, c, off, len, buf, pbl, op_flags);
}
c->init_time(&m_image_ctx, librbd::AIO_TYPE_WRITE);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_write: ictx=" << &m_image_ctx << ", "
- << "completion=" << c << ", off=" << off << ", len=" << len
- << "flags=" << op_flags << dendl;
+ << "completion=" << c << ", off=" << off << ", "
+ << "len=" << len << ", flags=" << op_flags << dendl;
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
bool lock_required = is_lock_required();
if (m_image_ctx.non_blocking_aio || lock_required) {
- queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags));
+ queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags),
+ lock_required);
} else {
AioImageRequest::write(&m_image_ctx, c, off, len, buf, op_flags);
}
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
bool lock_required = is_lock_required();
if (m_image_ctx.non_blocking_aio || lock_required) {
- queue(new AioImageDiscard(m_image_ctx, c, off, len));
+ queue(new AioImageDiscard(m_image_ctx, c, off, len),
+ lock_required);
} else {
AioImageRequest::discard(&m_image_ctx, c, off, len);
}
<< "completion=" << c << dendl;
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- if (m_image_ctx.non_blocking_aio) {
- queue(new AioImageFlush(m_image_ctx, c));
+ if (m_image_ctx.non_blocking_aio || !writes_empty()) {
+ queue(new AioImageFlush(m_image_ctx, c), false);
} else {
AioImageRequest::flush(&m_image_ctx, c);
}
}
-bool AioImageRequestWQ::writes_empty() const {
+void AioImageRequestWQ::suspend_writes() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 5) << __func__ << ": " << &m_image_ctx << dendl;
+
Mutex::Locker locker(m_lock);
- return (m_queued_writes > 0);
+ m_writes_suspended = true;
+ while (m_in_progress_writes > 0) {
+ m_cond.Wait(m_lock);
+ }
+}
+
+void AioImageRequestWQ::resume_writes() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 5) << __func__ << ": " << &m_image_ctx << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ m_writes_suspended = false;
+ }
+ signal();
}
void *AioImageRequestWQ::_void_dequeue() {
}
void AioImageRequestWQ::process(AioImageRequest *req) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", "
+ << "req=" << req << dendl;
+
{
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
req->send();
Mutex::Locker locker(m_lock);
if (req->is_write_op()) {
assert(m_queued_writes > 0);
- --m_queued_writes;
+ if (--m_queued_writes == 0) {
+ m_image_ctx.image_watcher->clear_aio_ops_pending();
+ }
assert(m_in_progress_writes > 0);
if (--m_in_progress_writes == 0) {
!m_image_ctx.image_watcher->is_lock_owner());
}
-void AioImageRequestWQ::queue(AioImageRequest *req) {
+void AioImageRequestWQ::queue(AioImageRequest *req, bool lock_required) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", "
+ << "req=" << req << ", lock_req=" << lock_required << dendl;
+
+ assert(m_image_ctx.owner_lock.is_locked());
+
+ bool first_write_op = false;
{
Mutex::Locker locker(m_lock);
if (req->is_write_op()) {
- ++m_queued_writes;
+ if (++m_queued_writes == 1) {
+ first_write_op = true;
+ }
}
}
ThreadPool::PointerWQ<AioImageRequest>::queue(req);
+
+ if (first_write_op) {
+ m_image_ctx.image_watcher->flag_aio_ops_pending();
+ if (lock_required) {
+ m_image_ctx.image_watcher->request_lock();
+ }
+ }
}
} // namespace librbd
// vim: ts=8 sw=2 smarttab
#include "librbd/ImageWatcher.h"
#include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequestWQ.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
#include "librbd/ObjectMap.h"
: m_image_ctx(image_ctx),
m_watch_lock(unique_lock_name("librbd::ImageWatcher::m_watch_lock", this)),
m_watch_ctx(*this), m_watch_handle(0),
- m_watch_state(WATCH_STATE_UNREGISTERED),
+ m_watch_state(WATCH_STATE_UNREGISTERED), m_aio_ops_pending(false),
m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED),
m_task_finisher(new TaskFinisher<Task>(*m_image_ctx.cct)),
m_async_request_lock(unique_lock_name("librbd::ImageWatcher::m_async_request_lock", this)),
- m_aio_request_lock(unique_lock_name("librbd::ImageWatcher::m_aio_request_lock", this)),
m_owner_client_id_lock(unique_lock_name("librbd::ImageWatcher::m_owner_client_id_lock", this))
{
}
int ImageWatcher::unregister_watch() {
ldout(m_image_ctx.cct, 10) << this << " unregistering image watcher" << dendl;
- {
- Mutex::Locker l(m_aio_request_lock);
- assert(m_aio_requests.empty());
- }
-
cancel_async_requests();
m_task_finisher->cancel_all();
return r;
}
+void ImageWatcher::refresh() {
+ assert(m_image_ctx.owner_lock.is_locked());
+
+ if (is_lock_supported() && !is_lock_owner()) {
+ m_image_ctx.aio_work_queue->suspend_writes();
+ } else if (!is_lock_supported()) {
+ m_image_ctx.aio_work_queue->resume_writes();
+ }
+}
+
int ImageWatcher::try_lock() {
assert(m_image_ctx.owner_lock.is_wlocked());
assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
return 0;
}
-void ImageWatcher::request_lock(
- const boost::function<void(AioCompletion*)>& restart_op, AioCompletion* c) {
- assert(m_image_ctx.owner_lock.is_locked());
- assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
-
+void ImageWatcher::request_lock() {
{
- Mutex::Locker l(m_aio_request_lock);
- bool request_pending = !m_aio_requests.empty();
- ldout(m_image_ctx.cct, 15) << this << " queuing aio request: " << c
- << dendl;
-
- c->get();
- m_aio_requests.push_back(std::make_pair(restart_op, c));
- if (request_pending) {
- return;
- }
- }
-
- RWLock::RLocker l(m_watch_lock);
- if (m_watch_state == WATCH_STATE_REGISTERED) {
- ldout(m_image_ctx.cct, 15) << this << " requesting exclusive lock" << dendl;
-
- // run notify request in finisher to avoid blocking aio path
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher::notify_request_lock, this));
- m_task_finisher->queue(TASK_CODE_REQUEST_LOCK, ctx);
+ RWLock::WLocker watch_locker(m_watch_lock);
+ m_aio_ops_pending = true;
}
+ schedule_request_lock(false);
}
bool ImageWatcher::try_request_lock() {
bufferlist bl;
::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), bl);
+ m_image_ctx.aio_work_queue->resume_writes();
+
// send the notification when we aren't holding locks
FunctionContext *ctx = new FunctionContext(
boost::bind(&IoCtx::notify2, &m_image_ctx.md_ctx, m_image_ctx.header_oid,
m_image_ctx.object_map.unlock();
}
+ if (is_lock_supported()) {
+ m_image_ctx.aio_work_queue->suspend_writes();
+ }
+
Mutex::Locker l(m_owner_client_id_lock);
set_owner_client_id(ClientId());
}
prepare_unlock();
m_image_ctx.owner_lock.put_write();
+
m_image_ctx.cancel_async_requests();
m_image_ctx.flush_async_operations();
+ m_image_ctx.aio_work_queue->suspend_writes();
{
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
return true;
}
+void ImageWatcher::flag_aio_ops_pending() {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ if (!m_aio_ops_pending) {
+ ldout(m_image_ctx.cct, 20) << this << " pending AIO ops" << dendl;
+ m_aio_ops_pending = true;
+ }
+}
+
+void ImageWatcher::clear_aio_ops_pending() {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ if (m_aio_ops_pending) {
+ ldout(m_image_ctx.cct, 20) << this << " no pending AIO ops" << dendl;
+ m_aio_ops_pending = false;
+ }
+}
+
void ImageWatcher::assert_header_locked(librados::ObjectWriteOperation *op) {
rados::cls::lock::assert_locked(op, RBD_LOCK_NAME, LOCK_EXCLUSIVE,
encode_lock_cookie(), WATCHER_LOCK_TAG);
return true;
}
-void ImageWatcher::schedule_retry_aio_requests(bool use_timer) {
- m_task_finisher->cancel(TASK_CODE_REQUEST_LOCK);
- Context *ctx = new FunctionContext(boost::bind(
- &ImageWatcher::retry_aio_requests, this));
- if (use_timer) {
- m_task_finisher->add_event_after(TASK_CODE_RETRY_AIO_REQUESTS,
- RETRY_DELAY_SECONDS, ctx);
- } else {
- m_task_finisher->queue(TASK_CODE_RETRY_AIO_REQUESTS, ctx);
- }
-}
-
-void ImageWatcher::retry_aio_requests() {
- m_task_finisher->cancel(TASK_CODE_RETRY_AIO_REQUESTS);
-
- std::vector<AioRequest> lock_request_restarts;
- {
- Mutex::Locker l(m_aio_request_lock);
- lock_request_restarts.swap(m_aio_requests);
- }
-
- ldout(m_image_ctx.cct, 15) << this << " retrying pending aio requests"
- << dendl;
- RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- for (std::vector<AioRequest>::iterator iter = lock_request_restarts.begin();
- iter != lock_request_restarts.end(); ++iter) {
- ldout(m_image_ctx.cct, 20) << this << " retrying aio request: "
- << iter->second << dendl;
- iter->first(iter->second);
- iter->second->put();
- }
-}
-
void ImageWatcher::schedule_cancel_async_requests() {
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::cancel_async_requests, this));
m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
}
+void ImageWatcher::schedule_request_lock(bool use_timer, int timer_delay) {
+ assert(m_image_ctx.owner_lock.is_locked());
+ assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
+
+ RWLock::RLocker watch_locker(m_watch_lock);
+ if (m_watch_state == WATCH_STATE_REGISTERED && m_aio_ops_pending) {
+ ldout(m_image_ctx.cct, 15) << this << " requesting exclusive lock" << dendl;
+
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&ImageWatcher::notify_request_lock, this));
+ if (use_timer) {
+ if (timer_delay < 0) {
+ timer_delay = RETRY_DELAY_SECONDS;
+ }
+ m_task_finisher->add_event_after(TASK_CODE_REQUEST_LOCK, timer_delay,
+ ctx);
+ } else {
+ m_task_finisher->queue(TASK_CODE_REQUEST_LOCK, ctx);
+ }
+ }
+}
+
void ImageWatcher::notify_request_lock() {
ldout(m_image_ctx.cct, 10) << this << " notify request lock" << dendl;
- m_task_finisher->cancel(TASK_CODE_RETRY_AIO_REQUESTS);
- m_image_ctx.owner_lock.get_read();
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (try_request_lock()) {
- m_image_ctx.owner_lock.put_read();
- retry_aio_requests();
return;
}
::encode(NotifyMessage(RequestLockPayload(get_client_id())), bl);
int r = notify_lock_owner(bl);
- m_image_ctx.owner_lock.put_read();
-
if (r == -ETIMEDOUT) {
- ldout(m_image_ctx.cct, 5) << this << "timed out requesting lock: retrying"
+ ldout(m_image_ctx.cct, 5) << this << " timed out requesting lock: retrying"
<< dendl;
- retry_aio_requests();
+ schedule_request_lock(false);
} else if (r < 0) {
lderr(m_image_ctx.cct) << this << " error requesting lock: "
<< cpp_strerror(r) << dendl;
- schedule_retry_aio_requests(true);
+ schedule_request_lock(true);
} else {
// lock owner acked -- but resend if we don't see them release the lock
int retry_timeout = m_image_ctx.cct->_conf->client_notify_timeout;
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher::notify_request_lock, this));
- m_task_finisher->add_event_after(TASK_CODE_REQUEST_LOCK,
- retry_timeout, ctx);
+ ldout(m_image_ctx.cct, 15) << this << " will retry in " << retry_timeout
+ << " seconds" << dendl;
+ schedule_request_lock(true, retry_timeout);
}
}
set_owner_client_id(payload.client_id);
}
- RWLock::RLocker l(m_image_ctx.owner_lock);
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
schedule_cancel_async_requests();
- schedule_retry_aio_requests(false);
+ schedule_request_lock(false);
}
}
set_owner_client_id(ClientId());
}
- RWLock::RLocker l(m_image_ctx.owner_lock);
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
schedule_cancel_async_requests();
- schedule_retry_aio_requests(false);
+ schedule_request_lock(false);
}
}
void ImageWatcher::reregister_watch() {
ldout(m_image_ctx.cct, 10) << this << " re-registering image watch" << dendl;
+ RWLock::WLocker l(m_image_ctx.owner_lock);
+ bool was_lock_owner = false;
+ if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
+ // ensure all async requests are canceled and IO is flushed
+ was_lock_owner = release_lock();
+ }
+
+ int r;
{
- RWLock::WLocker l(m_image_ctx.owner_lock);
- bool was_lock_owner = false;
- if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
- // ensure all async requests are canceled and IO is flushed
- was_lock_owner = release_lock();
+ RWLock::WLocker l(m_watch_lock);
+ if (m_watch_state != WATCH_STATE_ERROR) {
+ return;
}
- int r;
- {
- RWLock::WLocker l(m_watch_lock);
- if (m_watch_state != WATCH_STATE_ERROR) {
- return;
- }
-
- r = m_image_ctx.md_ctx.watch2(m_image_ctx.header_oid,
- &m_watch_handle, &m_watch_ctx);
- if (r < 0) {
- lderr(m_image_ctx.cct) << this << " failed to re-register image watch: "
- << cpp_strerror(r) << dendl;
- if (r != -ESHUTDOWN) {
- FunctionContext *ctx = new FunctionContext(boost::bind(
- &ImageWatcher::reregister_watch, this));
- m_task_finisher->add_event_after(TASK_CODE_REREGISTER_WATCH,
- RETRY_DELAY_SECONDS, ctx);
- }
- return;
+ r = m_image_ctx.md_ctx.watch2(m_image_ctx.header_oid,
+ &m_watch_handle, &m_watch_ctx);
+ if (r < 0) {
+ lderr(m_image_ctx.cct) << this << " failed to re-register image watch: "
+ << cpp_strerror(r) << dendl;
+ if (r != -ESHUTDOWN) {
+ FunctionContext *ctx = new FunctionContext(boost::bind(
+ &ImageWatcher::reregister_watch, this));
+ m_task_finisher->add_event_after(TASK_CODE_REREGISTER_WATCH,
+ RETRY_DELAY_SECONDS, ctx);
}
-
- m_watch_state = WATCH_STATE_REGISTERED;
+ return;
}
- handle_payload(HeaderUpdatePayload(), NULL);
- if (was_lock_owner) {
- r = try_lock();
- if (r == -EBUSY) {
- ldout(m_image_ctx.cct, 5) << this << "lost image lock while "
- << "re-registering image watch" << dendl;
- } else if (r < 0) {
- lderr(m_image_ctx.cct) << this
- << "failed to lock image while re-registering "
- << "image watch" << cpp_strerror(r) << dendl;
- }
+ m_watch_state = WATCH_STATE_REGISTERED;
+ }
+ handle_payload(HeaderUpdatePayload(), NULL);
+
+ if (was_lock_owner) {
+ r = try_lock();
+ if (r == -EBUSY) {
+ ldout(m_image_ctx.cct, 5) << this << " lost image lock while "
+ << "re-registering image watch" << dendl;
+ } else if (r < 0) {
+ lderr(m_image_ctx.cct) << this
+ << " failed to lock image while re-registering "
+ << "image watch: " << cpp_strerror(r) << dendl;
}
}
- retry_aio_requests();
+ if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
+ schedule_request_lock(false);
+ }
}
void ImageWatcher::WatchCtx::handle_notify(uint64_t notify_id,