break;
}
- {
- Mutex::Locker l(ictx->aio_lock);
- assert(ictx->pending_aio != 0);
- --ictx->pending_aio;
- ictx->pending_aio_cond.Signal();
- }
+ async_op.finish_op();
if (complete_cb) {
complete_cb(rbd_comp, complete_arg);
#include "include/utime.h"
#include "include/rbd/librbd.hpp"
+#include "librbd/AsyncOperation.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
char *read_buf;
size_t read_buf_len;
+ AsyncOperation async_op;
+
AioCompletion() : lock("AioCompletion::lock", true),
done(false), rval(0), complete_cb(NULL),
complete_arg(NULL), rbd_comp(NULL),
aio_type = t;
start_time = ceph_clock_now(ictx->cct);
- Mutex::Locker l(ictx->aio_lock);
- ++ictx->pending_aio;
+ async_op.start_op(*ictx);
}
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include "librbd/AsyncOperation.h"
+#include "librbd/ImageCtx.h"
+#include "common/dout.h"
+#include "include/assert.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::AsyncOperation: "
+
+namespace librbd {
+
+void AsyncOperation::start_op(ImageCtx &image_ctx) {
+ assert(m_image_ctx == NULL);
+ m_image_ctx = &image_ctx;
+
+ ldout(m_image_ctx->cct, 20) << this << " " << __func__ << dendl;
+ Mutex::Locker l(m_image_ctx->async_ops_lock);
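+ // newest ops are appended, so async_ops.back() is the most recently started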
+ m_image_ctx->async_ops.push_back(&m_xlist_item);
+}
+
+void AsyncOperation::finish_op() {
+ ldout(m_image_ctx->cct, 20) << this << " " << __func__ << dendl;
+ {
+ Mutex::Locker l(m_image_ctx->async_ops_lock);
+ bool removed = m_xlist_item.remove_myself();
+ assert(removed);
+ }
+
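+ // complete any flush requests that were waiting on this operation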
+ while (!m_flush_contexts.empty()) {
+ Context *flush_ctx = m_flush_contexts.front();
+ m_flush_contexts.pop_front();
+
+ ldout(m_image_ctx->cct, 20) << "completed flush: " << flush_ctx << dendl;
+ flush_ctx->complete(0);
+ }
+}
+
+void AsyncOperation::add_flush_context(Context *on_finish) {
+ assert(m_image_ctx->async_ops_lock.is_locked());
+ m_flush_contexts.push_back(on_finish);
+}
+
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef CEPH_LIBRBD_ASYNC_OPERATION_H
+#define CEPH_LIBRBD_ASYNC_OPERATION_H
+
+#include "include/assert.h"
+#include "include/xlist.h"
+#include <list>
+
+class Context;
+
+namespace librbd {
+
+class ImageCtx;
+
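+// Tracks a single in-flight asynchronous operation against an ImageCtx so
+// that flush requests can wait on outstanding ops via registered contexts.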
+class AsyncOperation {
+public:
+
+ AsyncOperation()
+ : m_image_ctx(NULL), m_xlist_item(this)
+ {
+ }
+
+ ~AsyncOperation()
+ {
+ assert(!m_xlist_item.is_on_list());
+ }
+
+ void start_op(ImageCtx &image_ctx);
+ void finish_op();
+
+ void add_flush_context(Context *on_finish);
+
+private:
+
+ ImageCtx *m_image_ctx;
+ xlist<AsyncOperation *>::item m_xlist_item;
+ std::list<Context *> m_flush_contexts;
+
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_ASYNC_OPERATION_H
: m_ictx(ictx), m_oid(oid), m_object_no(objectno),
m_image_extents(image_extents), m_state(STATE_READ_FROM_PARENT)
{
+ m_async_op.start_op(*m_ictx);
}
CopyupRequest::~CopyupRequest() {
assert(m_pending_requests.empty());
+ m_async_op.finish_op();
}
ceph::bufferlist& CopyupRequest::get_copyup_data() {
m_ictx->copyup_list.find(m_object_no);
assert(it != m_ictx->copyup_list.end());
m_ictx->copyup_list.erase(it);
-
- if (m_ictx->copyup_list.empty()) {
- m_ictx->copyup_list_cond.Signal();
- }
}
bool CopyupRequest::send_object_map() {
#ifndef CEPH_LIBRBD_COPYUPREQUEST_H
#define CEPH_LIBRBD_COPYUPREQUEST_H
+#include "librbd/AsyncOperation.h"
#include "include/int_types.h"
#include "common/Mutex.h"
ceph::bufferlist m_copyup_data;
vector<AioRequest *> m_pending_requests;
+ AsyncOperation m_async_op;
+
bool complete_requests(int r);
void complete(int r);
#include "common/errno.h"
#include "common/perf_counters.h"
+#include "librbd/AsyncOperation.h"
#include "librbd/internal.h"
-
#include "librbd/ImageCtx.h"
#include "librbd/ImageWatcher.h"
#include "librbd/ObjectMap.h"
parent_lock("librbd::ImageCtx::parent_lock"),
refresh_lock("librbd::ImageCtx::refresh_lock"),
object_map_lock("librbd::ImageCtx::object_map_lock"),
- aio_lock("librbd::ImageCtx::aio_lock"),
+ async_ops_lock("librbd::ImageCtx::async_ops_lock"),
copyup_list_lock("librbd::ImageCtx::copyup_list_lock"),
- copyup_list_cond(),
extra_read_flags(0),
old_format(true),
order(0), size(0), features(0),
object_cacher(NULL), writeback_handler(NULL), object_set(NULL),
readahead(),
total_bytes_read(0), copyup_finisher(NULL),
- pending_aio(0), object_map(NULL)
+ object_map(NULL)
{
md_ctx.dup(p);
data_ctx.dup(p);
int ImageCtx::invalidate_cache() {
if (!object_cacher)
return 0;
+ flush_async_operations();
cache_lock.Lock();
object_cacher->release_set(object_set);
cache_lock.Unlock();
} else if (r) {
lderr(cct) << "flush_cache returned " << r << dendl;
}
- wait_for_pending_aio();
cache_lock.Lock();
loff_t unclean = object_cacher->release_set(object_set);
cache_lock.Unlock();
return len;
}
- void ImageCtx::wait_for_pending_aio() {
- Mutex::Locker l(aio_lock);
- while (pending_aio > 0) {
- pending_aio_cond.Wait(aio_lock);
- }
+ void ImageCtx::flush_async_operations() {
+ // stack-allocated: C_SaferCond::complete() does not delete, so heap
+ // allocation here would leak once wait() returns
+ C_SaferCond ctx;
+ flush_async_operations(&ctx);
+ ctx.wait();
}
- void ImageCtx::wait_for_pending_copyup() {
- Mutex::Locker l(copyup_list_lock);
- while (!copyup_list.empty()) {
- ldout(cct, 20) << __func__ << " waiting CopyupRequest to be completed" << dendl;
- copyup_list_cond.Wait(copyup_list_lock);
+ void ImageCtx::flush_async_operations(Context *on_finish) {
+ Mutex::Locker l(async_ops_lock);
+ if (async_ops.empty()) {
+ on_finish->complete(0);
+ return;
}
+
+ ldout(cct, 20) << "flush async operations: " << on_finish << " "
+ << "count=" << async_ops.size() << dendl;
+ async_ops.back()->add_flush_context(on_finish);
}
}
#include "include/rbd/librbd.hpp"
#include "include/rbd_types.h"
#include "include/types.h"
+#include "include/xlist.h"
#include "osdc/ObjectCacher.h"
#include "cls/rbd/cls_rbd_client.h"
namespace librbd {
- class ImageWatcher;
+ class AsyncOperation;
class CopyupRequest;
+ class ImageWatcher;
class ObjectMap;
struct ImageCtx {
/**
* Lock ordering:
* owner_lock, md_lock, cache_lock, snap_lock, parent_lock, refresh_lock,
- * object_map_lock, aio_lock
+ * object_map_lock, async_ops_lock
*/
RWLock owner_lock; // protects exclusive lock leadership updates
RWLock md_lock; // protects access to the mutable image metadata that
RWLock parent_lock; // protects parent_md and parent
Mutex refresh_lock; // protects refresh_seq and last_refresh
RWLock object_map_lock; // protects object map updates
- Mutex aio_lock; // protects pending_aio and pending_aio_cond
+ Mutex async_ops_lock; // protects async_ops
Mutex copyup_list_lock; // protects copyup_waiting_list
- Cond copyup_list_cond; // protected by copyup_waiting_list_lock
-
unsigned extra_read_flags;
bool old_format;
Finisher *copyup_finisher;
std::map<uint64_t, CopyupRequest*> copyup_list;
- Cond pending_aio_cond;
- uint64_t pending_aio;
+ xlist<AsyncOperation*> async_ops;
ObjectMap *object_map;
librados::snap_t in_snap_id);
uint64_t prune_parent_extents(vector<pair<uint64_t,uint64_t> >& objectx,
uint64_t overlap);
- void wait_for_pending_aio();
- void wait_for_pending_copyup();
+
+ void flush_async_operations();
+ void flush_async_operations(Context *on_finish);
};
}
m_async_request_lock("librbd::ImageWatcher::m_async_request_lock"),
m_async_request_id(0),
m_aio_request_lock("librbd::ImageWatcher::m_aio_request_lock"),
- m_retrying_aio_requests(false), m_retry_aio_context(NULL)
+ m_retry_aio_context(NULL)
{
m_finisher->start();
m_timer->init();
return r;
}
-bool ImageWatcher::has_pending_aio_operations() {
- Mutex::Locker l(m_aio_request_lock);
- return !m_aio_requests.empty();
-}
-
-void ImageWatcher::flush_aio_operations() {
- C_SaferCond *ctx = new C_SaferCond();
- flush_aio_operations(ctx);
- ctx->wait();
-}
-
-void ImageWatcher::flush_aio_operations(Context *ctx) {
- Mutex::Locker l(m_aio_request_lock);
- if (!m_retrying_aio_requests && m_aio_requests.empty()) {
- ctx->complete(0);
- return;
- }
-
- ldout(m_image_ctx.cct, 20) << "pending flush: " << ctx << " "
- << "retrying=" << m_retrying_aio_requests << ", "
- << "count=" << m_aio_requests.size() << dendl;
- m_aio_flush_contexts.push_back(ctx);
-}
-
int ImageWatcher::try_lock() {
assert(m_image_ctx.owner_lock.is_wlocked());
assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
std::vector<AioRequest> lock_request_restarts;
{
Mutex::Locker l(m_aio_request_lock);
- assert(!m_retrying_aio_requests);
lock_request_restarts.swap(m_aio_requests);
- m_retrying_aio_requests = true;
}
for (std::vector<AioRequest>::iterator iter = lock_request_restarts.begin();
}
Mutex::Locker l(m_aio_request_lock);
- m_retrying_aio_requests = false;
while (!m_aio_flush_contexts.empty()) {
Context *flush_ctx = m_aio_flush_contexts.front();
m_aio_flush_contexts.pop_front();
int register_watch();
int unregister_watch();
- bool has_pending_aio_operations();
- void flush_aio_operations();
- void flush_aio_operations(Context *ctx);
-
int try_lock();
int request_lock(const boost::function<int(AioCompletion*)>& restart_op,
AioCompletion* c);
Mutex m_aio_request_lock;
std::list<Context *> m_aio_flush_contexts;
std::vector<AioRequest> m_aio_requests;
- bool m_retrying_aio_requests;
Context *m_retry_aio_context;
std::string encode_lock_cookie() const;
librbd/AioRequest.cc \
librbd/AsyncFlattenRequest.cc \
librbd/AsyncObjectThrottle.cc \
+ librbd/AsyncOperation.cc \
librbd/AsyncRequest.cc \
librbd/AsyncResizeRequest.cc \
librbd/AsyncTrimRequest.cc \
librbd/AioRequest.h \
librbd/AsyncFlattenRequest.h \
librbd/AsyncObjectThrottle.h \
+ librbd/AsyncOperation.h \
librbd/AsyncRequest.h \
librbd/AsyncResizeRequest.h \
librbd/AsyncTrimRequest.h \
RWLock::RLocker l(ictx->md_lock);
original_size = ictx->size;
if (size < ictx->size) {
- ictx->wait_for_pending_copyup();
+ ictx->flush_async_operations();
if (ictx->object_cacher) {
// need to invalidate since we're deleting objects, and
// ObjectCacher doesn't track non-existent objects
// ignore return value, since we may be set to a non-existent
// snapshot and the user is trying to fix that
ictx_check(ictx);
- ictx->wait_for_pending_copyup();
- if (ictx->image_watcher != NULL) {
- ictx->image_watcher->flush_aio_operations();
- }
+ ictx->flush_async_operations();
if (ictx->object_cacher) {
// complete pending writes before we're set to a snapshot and
// get -EROFS for writes
ldout(ictx->cct, 20) << "close_image " << ictx << dendl;
ictx->readahead.wait_for_pending();
- if (ictx->image_watcher != NULL) {
- ictx->image_watcher->flush_aio_operations();
- }
if (ictx->object_cacher) {
ictx->shutdown_cache(); // implicitly flushes
} else {
ictx->copyup_finisher->wait_for_empty();
ictx->copyup_finisher->stop();
}
- ictx->wait_for_pending_copyup();
if (ictx->parent) {
close_image(ictx->parent);
return r;
}
- if (ictx->image_watcher != NULL) {
- ictx->image_watcher->flush_aio_operations();
- }
ictx->user_flushed();
c->get();
- c->init_time(ictx, AIO_TYPE_FLUSH);
- if (ictx->image_watcher != NULL) {
- C_AioWrite *flush_ctx = new C_AioWrite(cct, c);
- c->add_request();
- ictx->image_watcher->flush_aio_operations(flush_ctx);
- }
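+ // register the flush context before init_time() starts this completion's
+ // own async op, so the flush cannot end up waiting on itself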
+ C_AioWrite *flush_ctx = new C_AioWrite(cct, c);
+ c->add_request();
+ ictx->flush_async_operations(flush_ctx);
+ c->init_time(ictx, AIO_TYPE_FLUSH);
C_AioWrite *req_comp = new C_AioWrite(cct, c);
c->add_request();
if (ictx->object_cacher) {
int _flush(ImageCtx *ictx)
{
- if (ictx->image_watcher != NULL) {
- ictx->image_watcher->flush_aio_operations();
- }
-
CephContext *cct = ictx->cct;
int r;
// flush any outstanding writes
r = ictx->flush_cache();
} else {
r = ictx->data_ctx.aio_flush();
- ictx->wait_for_pending_aio();
+ ictx->flush_async_operations();
}
if (r)
return r;
}
- if (ictx->image_watcher != NULL) {
- ictx->image_watcher->flush_aio_operations();
- }
+ ictx->flush_async_operations();
RWLock::WLocker l(ictx->md_lock);
r = ictx->invalidate_cache();
// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "test/librbd/test_fixture.h"
+#include "librbd/AioCompletion.h"
#include "librbd/ImageWatcher.h"
#include "librbd/internal.h"
#include <boost/scope_exit.hpp>
bool is_owner;
ASSERT_EQ(0, librbd::is_exclusive_lock_owner(ictx, &is_owner));
ASSERT_FALSE(is_owner);
-
- ASSERT_TRUE(ictx->image_watcher->has_pending_aio_operations());
+ ASSERT_FALSE(c->is_complete());
}
TEST_F(TestInternal, AioDiscardRequestsLock) {
bool is_owner;
ASSERT_EQ(0, librbd::is_exclusive_lock_owner(ictx, &is_owner));
ASSERT_FALSE(is_owner);
-
- ASSERT_TRUE(ictx->image_watcher->has_pending_aio_operations());
+ ASSERT_FALSE(c->is_complete());
}