]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: call safe callback on read operation
authorSage Weil <sage@newdream.net>
Fri, 4 May 2012 22:26:33 +0000 (15:26 -0700)
committerSage Weil <sage@newdream.net>
Fri, 4 May 2012 22:26:33 +0000 (15:26 -0700)
This avoids confusion for the user who isn't sure if they should wait for
complete or safe on a read aio.  It also means that you can always wait
for safe for both reads or writes, which can simplify some code.

Dup the roundtrip functional tests to verify this works.

Signed-off-by: Sage Weil <sage@newdream.net>
Reviewed-by: Yehuda Sadeh <yehuda.sadeh@inktank.com>
src/librados/AioCompletionImpl.h
src/librados/IoCtxImpl.cc
src/test/rados-api/aio.cc

index 9db4f72e734546267b9400e71912db35ffcbac0b..66270c92737b1d357de19303079b7443ece62384 100644 (file)
@@ -38,6 +38,7 @@ struct librados::AioCompletionImpl {
   void *callback_arg;
 
   // for read
+  bool is_read;
   bufferlist bl, *pbl;
   char *buf;
   unsigned maxlen;
@@ -49,7 +50,7 @@ struct librados::AioCompletionImpl {
   AioCompletionImpl() : lock("AioCompletionImpl lock"),
                        ref(1), rval(0), released(false), ack(false), safe(false),
                        callback_complete(0), callback_safe(0), callback_arg(0),
-                       pbl(0), buf(0), maxlen(0),
+                       is_read(false), pbl(0), buf(0), maxlen(0),
                        io(NULL), aio_write_seq(0), aio_write_list_item(this) { }
 
   int set_complete_callback(void *cb_arg, rados_callback_t cb) {
index d90d204adfd5148a3c1ca6aa8506807b6c60b7d7..1455ccc7230f822a8873f089f538fd6599b492bb 100644 (file)
@@ -646,6 +646,7 @@ int librados::IoCtxImpl::aio_operate_read(const object_t &oid,
 {
   Context *onack = new C_aio_Ack(c);
 
+  c->is_read = true;
   c->io = this;
   c->pbl = pbl;
 
@@ -683,6 +684,7 @@ int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
   Context *onack = new C_aio_Ack(c);
   eversion_t ver;
 
+  c->is_read = true;
   c->io = this;
   c->pbl = pbl;
 
@@ -698,6 +700,7 @@ int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
 {
   Context *onack = new C_aio_Ack(c);
 
+  c->is_read = true;
   c->io = this;
   c->buf = buf;
   c->maxlen = len;
@@ -1010,6 +1013,7 @@ int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
 {
   Context *onack = new C_aio_Ack(c);
 
+  c->is_read = true;
   c->io = this;
 
   Mutex::Locker l(*lock);
@@ -1489,6 +1493,8 @@ void librados::IoCtxImpl::C_aio_Ack::finish(int r)
   c->lock.Lock();
   c->rval = r;
   c->ack = true;
+  if (c->is_read)
+    c->safe = true;
   c->cond.Signal();
 
   if (c->buf && c->bl.length() > 0) {
@@ -1503,6 +1509,9 @@ void librados::IoCtxImpl::C_aio_Ack::finish(int r)
   if (c->callback_complete) {
     c->io->client->finisher.queue(new C_AioComplete(c));
   }
+  if (c->is_read && c->callback_safe) {
+    c->io->client->finisher.queue(new C_AioSafe(c));
+  }
 
   c->put_unlock();
 }
index 36f43827dfe7d1bbde1b6bc56e22891558f718fd..784de6269bb88d6041f2894f6d39a99862fe9685 100644 (file)
@@ -241,6 +241,37 @@ TEST(LibRadosAio, RoundTrip) {
   rados_aio_release(my_completion2);
 }
 
+TEST(LibRadosAio, RoundTrip2) {
+  AioTestData test_data;
+  rados_completion_t my_completion;
+  ASSERT_EQ("", test_data.init());
+  ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
+             set_completion_complete, set_completion_safe, &my_completion));
+  char buf[128];
+  memset(buf, 0xcc, sizeof(buf));
+  ASSERT_EQ(0, rados_aio_write(test_data.m_ioctx, "foo",
+                              my_completion, buf, sizeof(buf), 0));
+  {
+    TestAlarm alarm;
+    sem_wait(&test_data.m_sem);
+    sem_wait(&test_data.m_sem);
+  }
+  char buf2[128];
+  memset(buf2, 0, sizeof(buf2));
+  rados_completion_t my_completion2;
+  ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
+             set_completion_complete, set_completion_safe, &my_completion2));
+  ASSERT_EQ(0, rados_aio_read(test_data.m_ioctx, "foo",
+                             my_completion2, buf2, sizeof(buf2), 0));
+  {
+    TestAlarm alarm;
+    ASSERT_EQ(0, rados_aio_wait_for_safe(my_completion2));
+  }
+  ASSERT_EQ(0, memcmp(buf, buf2, sizeof(buf)));
+  rados_aio_release(my_completion);
+  rados_aio_release(my_completion2);
+}
+
 TEST(LibRadosAio, RoundTripPP) {
   AioTestDataPP test_data;
   ASSERT_EQ("", test_data.init());
@@ -274,6 +305,39 @@ TEST(LibRadosAio, RoundTripPP) {
   delete my_completion2;
 }
 
+TEST(LibRadosAio, RoundTripPP2) {
+  AioTestDataPP test_data;
+  ASSERT_EQ("", test_data.init());
+  AioCompletion *my_completion = test_data.m_cluster.aio_create_completion(
+         (void*)&test_data, set_completion_complete, set_completion_safe);
+  AioCompletion *my_completion_null = NULL;
+  ASSERT_NE(my_completion, my_completion_null);
+  char buf[128];
+  memset(buf, 0xcc, sizeof(buf));
+  bufferlist bl1;
+  bl1.append(buf, sizeof(buf));
+  ASSERT_EQ(0, test_data.m_ioctx.aio_write("foo", my_completion,
+                                          bl1, sizeof(buf), 0));
+  {
+    TestAlarm alarm;
+    sem_wait(&test_data.m_sem);
+    sem_wait(&test_data.m_sem);
+  }
+  bufferlist bl2;
+  AioCompletion *my_completion2 = test_data.m_cluster.aio_create_completion(
+         (void*)&test_data, set_completion_complete, set_completion_safe);
+  ASSERT_NE(my_completion2, my_completion_null);
+  ASSERT_EQ(0, test_data.m_ioctx.aio_read("foo",
+                             my_completion2, &bl2, sizeof(buf), 0));
+  {
+    TestAlarm alarm;
+    ASSERT_EQ(0, my_completion2->wait_for_safe());
+  }
+  ASSERT_EQ(0, memcmp(buf, bl2.c_str(), sizeof(buf)));
+  delete my_completion;
+  delete my_completion2;
+}
+
 TEST(LibRadosAio, RoundTripAppend) {
   AioTestData test_data;
   rados_completion_t my_completion, my_completion2, my_completion3;