]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG::get_rw_locks: use excl lock for read & write_ordered
authorSamuel Just <sjust@redhat.com>
Tue, 13 Jan 2015 22:34:23 +0000 (14:34 -0800)
committerSamuel Just <sjust@redhat.com>
Fri, 30 Jan 2015 19:33:24 +0000 (11:33 -0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 20fbfec1d9a1b960e8a7b79daa28ff23d0151ea2..112ae72b2c9b5de85b0d585aef70bf7fac811c7a 100644 (file)
@@ -1709,7 +1709,7 @@ void ReplicatedPG::do_op(OpRequestRef& op)
       reply_ctx(ctx, -EINVAL);
       return;
     }
-  } else if (!get_rw_locks(ctx)) {
+  } else if (!get_rw_locks(write_ordered, ctx)) {
     dout(20) << __func__ << " waiting for rw locks " << dendl;
     op->mark_delayed("waiting for rw locks");
     close_op_ctx(ctx, -EBUSY);
index 49ef4b818263fdcfb2f0fe47cf35b1788ca5d2c9..728fbc5fdf4aacdfe8639c666248412a3f74ec55 100644 (file)
@@ -597,7 +597,7 @@ public:
 
     ObjectModDesc mod_desc;
 
-    enum { W_LOCK, R_LOCK, NONE } lock_to_release;
+    enum { W_LOCK, R_LOCK, E_LOCK, NONE } lock_to_release;
 
     Context *on_finish;
 
@@ -752,48 +752,40 @@ protected:
    * @param ctx [in,out] ctx to get locks for
    * @return true on success, false if we are queued
    */
-  bool get_rw_locks(OpContext *ctx) {
+  bool get_rw_locks(bool write_ordered, OpContext *ctx) {
     /* If snapset_obc, !obc->obs->exists and we will always take the
      * snapdir lock *before* the head lock.  Since all callers will do
      * this (read or write) if we get the first we will be guaranteed
      * to get the second.
      */
+    ObjectContext::RWState::State type = ObjectContext::RWState::RWNONE;
+    if (write_ordered && ctx->op->may_read()) {
+      type = ObjectContext::RWState::RWEXCL;
+      ctx->lock_to_release = OpContext::E_LOCK;
+    } else if (write_ordered) {
+      type = ObjectContext::RWState::RWWRITE;
+      ctx->lock_to_release = OpContext::W_LOCK;
+    } else {
+      assert(ctx->op->may_read());
+      type = ObjectContext::RWState::RWREAD;
+      ctx->lock_to_release = OpContext::R_LOCK;
+    }
+
     if (ctx->snapset_obc) {
       assert(!ctx->obc->obs.exists);
-      if (ctx->op->may_write() || ctx->op->may_cache()) {
-       if (ctx->snapset_obc->get_write(ctx->op)) {
-         ctx->release_snapset_obc = true;
-         ctx->lock_to_release = OpContext::W_LOCK;
-       } else {
-         return false;
-       }
-      } else {
-       assert(ctx->op->may_read());
-       if (ctx->snapset_obc->get_read(ctx->op)) {
-         ctx->release_snapset_obc = true;
-         ctx->lock_to_release = OpContext::R_LOCK;
-       } else {
-         return false;
-       }
-      }
-    }
-    if (ctx->op->may_write() || ctx->op->may_cache()) {
-      if (ctx->obc->get_write(ctx->op)) {
-       ctx->lock_to_release = OpContext::W_LOCK;
-       return true;
+      if (ctx->snapset_obc->get_lock_type(ctx->op, type)) {
+       ctx->release_snapset_obc = true;
       } else {
-       assert(!ctx->snapset_obc);
+       ctx->lock_to_release = OpContext::NONE;
        return false;
       }
+    }
+    if (ctx->obc->get_lock_type(ctx->op, type)) {
+      return true;
     } else {
-      assert(ctx->op->may_read());
-      if (ctx->obc->get_read(ctx->op)) {
-       ctx->lock_to_release = OpContext::R_LOCK;
-       return true;
-      } else {
-       assert(!ctx->snapset_obc);
-       return false;
-      }
+      assert(!ctx->snapset_obc);
+      ctx->lock_to_release = OpContext::NONE;
+      return false;
     }
   }
 
@@ -842,6 +834,24 @@ protected:
          &requeue_recovery_clone,
          &requeue_snaptrimmer_clone);
       break;
+    case OpContext::E_LOCK:
+      if (ctx->snapset_obc && ctx->release_snapset_obc) {
+       ctx->snapset_obc->put_excl(
+         &to_req,
+         &requeue_recovery_snapset,
+         &requeue_snaptrimmer_snapset);
+       ctx->release_snapset_obc = false;
+      }
+      ctx->obc->put_excl(
+       &to_req,
+       &requeue_recovery,
+       &requeue_snaptrimmer);
+      if (ctx->clone_obc)
+       ctx->clone_obc->put_write(
+         &to_req,
+         &requeue_recovery_clone,
+         &requeue_snaptrimmer_clone);
+      break;
     case OpContext::R_LOCK:
       if (ctx->snapset_obc && ctx->release_snapset_obc) {
        ctx->snapset_obc->put_read(&to_req);