]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
librbd: allow AioCompletion objects to be blocked
authorJason Dillaman <dillaman@redhat.com>
Tue, 30 Apr 2019 16:56:08 +0000 (12:56 -0400)
committerJason Dillaman <dillaman@redhat.com>
Thu, 2 May 2019 13:30:45 +0000 (09:30 -0400)
This will be used when user-provided memory is wrapped into a
ceph::buffer::raw pointer to prevent its release prior to the
drop of its last reference internally.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/io/AioCompletion.cc
src/librbd/io/AioCompletion.h

index 610435aceefeb0b42d2da6819cf56b6046b45bb9..011b005dfd4acf0049e89651b4de4826527f649b 100644 (file)
@@ -140,32 +140,64 @@ void AioCompletion::start_op() {
   async_op.start_op(*ictx);
 }
 
+void AioCompletion::queue_complete() {
+  uint32_t zero = 0;
+  pending_count.compare_exchange_strong(zero, 1);
+  ceph_assert(zero == 0);
+
+  // ensure completion fires in clean lock context
+  ictx->op_work_queue->queue(new C_AioRequest(this), 0);
+}
+
+void AioCompletion::block(CephContext* cct) {
+  ldout(cct, 20) << dendl;
+  ceph_assert(!was_armed);
+
+  get();
+  ++pending_count;
+}
+
+void AioCompletion::unblock(CephContext* cct) {
+  ldout(cct, 20) << dendl;
+  ceph_assert(was_armed);
+
+  uint32_t previous_pending_count = pending_count--;
+  ceph_assert(previous_pending_count > 0);
+
+  if (previous_pending_count == 1) {
+    queue_complete();
+  }
+  put();
+}
+
 void AioCompletion::fail(int r)
 {
   ceph_assert(ictx != nullptr);
   CephContext *cct = ictx->cct;
-
   lderr(cct) << cpp_strerror(r) << dendl;
-  uint32_t previous_pending_count = pending_count.exchange(1);
-  ceph_assert(previous_pending_count == 0);
 
-  // ensure completion fires in clean lock context
-  ictx->op_work_queue->queue(new C_AioRequest(this), r);
+  ceph_assert(!was_armed);
+  was_armed = true;
+
+  error_rval = r;
+
+  uint32_t previous_pending_count = pending_count.load();
+  if (previous_pending_count == 0) {
+    queue_complete();
+  }
 }
 
 void AioCompletion::set_request_count(uint32_t count) {
   ceph_assert(ictx != nullptr);
   CephContext *cct = ictx->cct;
 
-  uint32_t previous_pending_count = pending_count.exchange(
-    count == 0 ? 1 : count);
-  ceph_assert(previous_pending_count == 0);
+  ceph_assert(!was_armed);
+  was_armed = true;
 
   ldout(cct, 20) << "pending=" << count << dendl;
-  if (count == 0) {
-    // ensure completion fires in clean lock context
-    ictx->op_work_queue->queue(new C_AioRequest(this), 0);
-    return;
+  uint32_t previous_pending_count = pending_count.fetch_add(count);
+  if (previous_pending_count == 0 && count == 0) {
+    queue_complete();
   }
 }
 
index 46f2bd0b02167816c871907cec23158eaae2a9cf..551596a0de83e7c49c93ef39949477089a49d780 100644 (file)
@@ -57,7 +57,7 @@ struct AioCompletion {
   std::atomic<ssize_t> rval{0};
   std::atomic<int> error_rval{0};
   std::atomic<uint32_t> ref{1};
-  std::atomic<uint32_t> pending_count{0};   ///< number of requests
+  std::atomic<uint32_t> pending_count{0};   ///< number of requests/blocks
   std::atomic<bool> released{false};
 
   ImageCtx *ictx = nullptr;
@@ -69,6 +69,7 @@ struct AioCompletion {
   AsyncOperation async_op;
 
   bool event_notify = false;
+  bool was_armed = false;
 
   template <typename T, void (T::*MF)(int)>
   static void callback_adapter(completion_t cb, void *arg) {
@@ -122,6 +123,9 @@ struct AioCompletion {
     return async_op.started();
   }
 
+  void block(CephContext* cct);
+  void unblock(CephContext* cct);
+
   void init_time(ImageCtx *i, aio_type_t t);
   void start_op();
   void fail(int r);
@@ -169,6 +173,10 @@ struct AioCompletion {
   void *get_arg() {
     return complete_arg;
   }
+
+private:
+  void queue_complete();
+
 };
 
 class C_AioRequest : public Context {