lderr(ictx->cct) << "completed invalid aio_type: " << aio_type << dendl;
break;
}
+
+ {
+ Mutex::Locker l(ictx->aio_lock);
+ assert(ictx->pending_aio != 0);
+ --ictx->pending_aio;
+ ictx->pending_aio_cond.Signal();
+ }
+
if (complete_cb) {
complete_cb(rbd_comp, complete_arg);
}
void init_time(ImageCtx *i, aio_type_t t) {
ictx = i;
+ {
+ Mutex::Locker l(ictx->aio_lock);
+ ++ictx->pending_aio;
+ }
aio_type = t;
start_time = ceph_clock_now(ictx->cct);
}
snap_lock("librbd::ImageCtx::snap_lock"),
parent_lock("librbd::ImageCtx::parent_lock"),
refresh_lock("librbd::ImageCtx::refresh_lock"),
+ aio_lock("librbd::ImageCtx::aio_lock"),
extra_read_flags(0),
old_format(true),
order(0), size(0), features(0),
stripe_unit(0), stripe_count(0),
object_cacher(NULL), writeback_handler(NULL), object_set(NULL),
readahead(),
- total_bytes_read(0)
+ total_bytes_read(0),
+ pending_aio(0)
{
md_ctx.dup(p);
data_ctx.dup(p);
} else if (r) {
lderr(cct) << "flush_cache returned " << r << dendl;
}
+ wait_for_pending_aio();
cache_lock.Lock();
loff_t unclean = object_cacher->release_set(object_set);
cache_lock.Unlock();
<< ", object overlap " << len
<< " from image extents " << objectx << dendl;
return len;
- }
+ }
+
+ void ImageCtx::wait_for_pending_aio() {
+ Mutex::Locker l(aio_lock);
+ while (pending_aio > 0) {
+ pending_aio_cond.Wait(aio_lock);
+ }
+ }
}
#include <string>
#include <vector>
+#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/Readahead.h"
#include "common/RWLock.h"
/**
* Lock ordering:
- * md_lock, cache_lock, snap_lock, parent_lock, refresh_lock
+ * md_lock, cache_lock, snap_lock, parent_lock, refresh_lock,
+ * aio_lock
*/
RWLock md_lock; // protects access to the mutable image metadata that
// isn't guarded by other locks below
RWLock snap_lock; // protects snapshot-related member variables:
RWLock parent_lock; // protects parent_md and parent
Mutex refresh_lock; // protects refresh_seq and last_refresh
+ Mutex aio_lock; // protects pending_aio and pending_aio_cond
unsigned extra_read_flags;
Readahead readahead;
uint64_t total_bytes_read;
+ Cond pending_aio_cond;
+ uint64_t pending_aio;
+
/**
* Either image_name or image_id must be set.
* If id is not known, pass the empty std::string,
librados::snap_t in_snap_id);
uint64_t prune_parent_extents(vector<pair<uint64_t,uint64_t> >& objectx,
uint64_t overlap);
-
+ void wait_for_pending_aio();
};
}
ldout(ictx->cct, 20) << "close_image " << ictx << dendl;
ictx->readahead.wait_for_pending();
-
- if (ictx->object_cacher)
+ if (ictx->object_cacher) {
ictx->shutdown_cache(); // implicitly flushes
- else
+ } else {
flush(ictx);
+ ictx->wait_for_pending_aio();
+ }
if (ictx->parent) {
close_image(ictx->parent);
rados_ioctx_destroy(ioctx);
}
+TEST_F(TestLibRBD, TestPendingAio)
+{
+ rados_ioctx_t ioctx;
+ rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+ int features = RBD_FEATURE_LAYERING;
+ rbd_image_t image;
+ int order = 0;
+
+ std::string name = get_temp_image_name();
+
+ uint64_t size = 4 << 20;
+ ASSERT_EQ(0, create_image_full(ioctx, name.c_str(), size, &order,
+ false, features));
+ ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+ char test_data[TEST_IO_SIZE];
+ for (size_t i = 0; i < TEST_IO_SIZE; ++i) {
+ test_data[i] = (char) (rand() % (126 - 33) + 33);
+ }
+
+ size_t num_aios = 256;
+ rbd_completion_t comps[num_aios];
+ for (size_t i = 0; i < num_aios; ++i) {
+ ASSERT_EQ(0, rbd_aio_create_completion(NULL, NULL, &comps[i]));
+ uint64_t offset = rand() % (size - TEST_IO_SIZE);
+ ASSERT_EQ(0, rbd_aio_write(image, offset, TEST_IO_SIZE, test_data,
+ comps[i]));
+ }
+ for (size_t i = 0; i < num_aios; ++i) {
+ ASSERT_EQ(0, rbd_aio_wait_for_complete(comps[i]));
+ rbd_aio_release(comps[i]);
+ }
+ ASSERT_EQ(0, rbd_invalidate_cache(image));
+
+ for (size_t i = 0; i < num_aios; ++i) {
+ ASSERT_EQ(0, rbd_aio_create_completion(NULL, NULL, &comps[i]));
+ uint64_t offset = rand() % (size - TEST_IO_SIZE);
+ ASSERT_LE(0, rbd_aio_read(image, offset, TEST_IO_SIZE, test_data,
+ comps[i]));
+ }
+
+ ASSERT_EQ(0, rbd_close(image));
+ for (size_t i = 0; i < num_aios; ++i) {
+ ASSERT_EQ(1, rbd_aio_is_complete(comps[i]));
+ rbd_aio_release(comps[i]);
+ }
+
+ rados_ioctx_destroy(ioctx);
+}
+
int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);