]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: block reads on an object until the write is committed
authorSamuel Just <sam.just@inktank.com>
Fri, 4 Oct 2013 06:20:07 +0000 (23:20 -0700)
committerSamuel Just <sam.just@inktank.com>
Wed, 9 Oct 2013 20:35:42 +0000 (13:35 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index f466eb8ccdc3848322d64dc7b8920cb143d25f7d..781ba33ec35d6625f24560a7b16614e24015e16f 100644 (file)
@@ -988,21 +988,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
     return;
   }
 
-  if ((op->may_read()) && (obc->obs.oi.is_lost())) {
-    // This object is lost. Reading from it returns an error.
-    dout(20) << __func__ << ": object " << obc->obs.oi.soid
-            << " is lost" << dendl;
-    osd->reply_op_error(op, -ENFILE);
-    return;
-  }
   dout(25) << __func__ << ": object " << obc->obs.oi.soid
           << " has oi of " << obc->obs.oi << dendl;
-  
-  if (!op->may_write() && (!obc->obs.exists ||
-                          obc->obs.oi.is_whiteout())) {
-    osd->reply_op_error(op, -ENOENT);
-    return;
-  }
 
   // are writes blocked by another object?
   if (obc->blocked_by) {
@@ -1126,11 +1113,31 @@ void ReplicatedPG::do_op(OpRequestRef op)
     }
   }
 
-  op->mark_started();
-
   OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
                                 &obc->obs, obc->ssc, 
                                 this);
+  if (!get_rw_locks(ctx)) {
+    op->mark_delayed("waiting for rw locks");
+    close_op_ctx(ctx);
+    return;
+  }
+
+  if ((op->may_read()) && (obc->obs.oi.is_lost())) {
+    // This object is lost. Reading from it returns an error.
+    dout(20) << __func__ << ": object " << obc->obs.oi.soid
+            << " is lost" << dendl;
+    close_op_ctx(ctx);
+    osd->reply_op_error(op, -ENFILE);
+    return;
+  }
+  if (!op->may_write() && (!obc->obs.exists ||
+                           obc->obs.oi.is_whiteout())) {
+    close_op_ctx(ctx);
+    osd->reply_op_error(op, -ENOENT);
+    return;
+  }
+
+  op->mark_started();
   ctx->obc = obc;
   ctx->src_obc = src_obc;
 
@@ -1207,7 +1214,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
       if (already_complete(oldv)) {
        reply_ctx(ctx, 0, oldv, entry->user_version);
       } else {
-       delete ctx;
+       close_op_ctx(ctx);
 
        if (m->wants_ack()) {
          if (already_ack(oldv)) {
@@ -1300,7 +1307,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
 
   if (result == -EAGAIN) {
     // clean up after the ctx
-    delete ctx;
+    close_op_ctx(ctx);
     return;
   }
 
@@ -1352,7 +1359,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
     
     reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
     osd->send_message_osd_client(reply, m->get_connection());
-    delete ctx;
+    close_op_ctx(ctx);
     return;
   }
 
@@ -1400,13 +1407,13 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
 void ReplicatedPG::reply_ctx(OpContext *ctx, int r)
 {
   osd->reply_op_error(ctx->op, r);
-  delete ctx;
+  close_op_ctx(ctx);
 }
 
 void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv)
 {
   osd->reply_op_error(ctx->op, r, v, uv);
-  delete ctx;
+  close_op_ctx(ctx);
 }
 
 void ReplicatedPG::log_op_stats(OpContext *ctx)
@@ -4724,6 +4731,8 @@ void ReplicatedPG::eval_repop(RepGather *repop)
     // ondisk?
     if (repop->waitfor_disk.empty()) {
 
+      release_op_ctx_locks(repop->ctx);
+
       log_op_stats(repop->ctx);
       publish_stats_to_osd();
 
@@ -4929,6 +4938,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRe
  
 void ReplicatedPG::remove_repop(RepGather *repop)
 {
+  release_op_ctx_locks(repop->ctx);
   repop_map.erase(repop->rep_tid);
   repop->put();
 
index 27c9d1bb60589b0153460ba02fb16f0dcfa98426..21cca471538f00937bc84e985d3bb70da603adc3 100644 (file)
@@ -183,7 +183,7 @@ public:
        if (r != -ECANCELED) { // on cancel just toss it out; client resends
          ctx->pg->osd->reply_op_error(ctx->op, r);
        }
-       delete ctx;
+       ctx->pg->close_op_ctx(ctx);
       }
     }
 
@@ -374,6 +374,8 @@ public:
 
     hobject_t new_temp_oid, discard_temp_oid;  ///< temp objects we should start/stop tracking
 
+    enum { W_LOCK, R_LOCK, NONE } lock_to_release;
+
     OpContext(const OpContext& other);
     const OpContext& operator=(const OpContext& other);
 
@@ -388,7 +390,8 @@ public:
       data_off(0), reply(NULL), pg(_pg),
       num_read(0),
       num_write(0),
-      copy_cb(NULL) {
+      copy_cb(NULL),
+      lock_to_release(NONE) {
       if (_ssc) {
        new_snapset = _ssc->snapset;
        snapset = &_ssc->snapset;
@@ -396,6 +399,7 @@ public:
     }
     ~OpContext() {
       assert(!clone_obc);
+      assert(lock_to_release == NONE);
       if (reply)
        reply->put();
     }
@@ -454,7 +458,7 @@ public:
       if (--nref == 0) {
        assert(!obc);
        assert(src_obc.empty());
-       delete ctx;
+       delete ctx; // must already be unlocked
        delete this;
        //generic_dout(0) << "deleting " << this << dendl;
       }
@@ -465,6 +469,171 @@ public:
 
 protected:
 
+  /// Tracks pending readers or writers on an object
+  class RWTracker {
+    struct ObjState {
+      enum State {
+       NONE,
+       READ,
+       WRITE
+      };
+      State state;                 /// rw state
+      uint64_t count;              /// number of readers or writers
+      list<OpRequestRef> waiters;  /// ops waiting on state change
+
+      ObjState() : state(NONE), count(0) {}
+      bool get_read(OpRequestRef op) {
+       // don't starve!
+       if (!waiters.empty()) {
+         waiters.push_back(op);
+         return false;
+       }
+       switch (state) {
+       case NONE:
+         assert(count == 0);
+         state = READ;
+         // fall through
+       case READ:
+         count++;
+         return true;
+       case WRITE:
+         waiters.push_back(op);
+         return false;
+       default:
+         assert(0 == "unhandled case");
+         return false;
+       }
+      }
+      bool get_write(OpRequestRef op) {
+       if (!waiters.empty()) {
+         // don't starve!
+         waiters.push_back(op);
+         return false;
+       }
+       switch (state) {
+       case NONE:
+         assert(count == 0);
+         state = WRITE;
+         // fall through
+       case WRITE:
+         count++;
+         return true;
+       case READ:
+         waiters.push_back(op);
+         return false;
+       default:
+         assert(0 == "unhandled case");
+         return false;
+       }
+      }
+      void dec(list<OpRequestRef> *requeue) {
+       assert(count > 0);
+       assert(requeue);
+       assert(requeue->empty());
+       count--;
+       if (count == 0) {
+         state = NONE;
+         requeue->swap(waiters);
+       }
+      }
+      void put_read(list<OpRequestRef> *requeue) {
+       assert(state == READ);
+       dec(requeue);
+      }
+      void put_write(list<OpRequestRef> *requeue) {
+       assert(state == WRITE);
+       dec(requeue);
+      }
+      void clear(list<OpRequestRef> *requeue) {
+       state = NONE;
+       count = 0;
+       assert(requeue);
+       assert(requeue->empty());
+       requeue->swap(waiters);
+      }
+      bool empty() const { return state == NONE; }
+    };
+    map<hobject_t, ObjState > obj_state;
+  public:
+    bool get_read(const hobject_t &hoid, OpRequestRef op) {
+      return obj_state[hoid].get_read(op);
+    }
+    bool get_write(const hobject_t &hoid, OpRequestRef op) {
+      return obj_state[hoid].get_write(op);
+    }
+    void put_read(const hobject_t &hoid, list<OpRequestRef> *to_wake) {
+      obj_state[hoid].put_read(to_wake);
+      if (obj_state[hoid].empty()) {
+       obj_state.erase(hoid);
+      }
+    }
+    void put_write(const hobject_t &hoid, list<OpRequestRef> *to_wake) {
+      obj_state[hoid].put_write(to_wake);
+      if (obj_state[hoid].empty()) {
+       obj_state.erase(hoid);
+      }
+    }
+  } rw_manager;
+
+  /**
+   * Grabs locks for OpContext, should be cleaned up in close_op_ctx
+   *
+   * @param ctx [in,out] ctx to get locks for
+   * @return true on success, false if we are queued
+   */
+  bool get_rw_locks(OpContext *ctx) {
+    if (ctx->op->may_write()) {
+      if (rw_manager.get_write(ctx->obs->oi.soid, ctx->op)) {
+       ctx->lock_to_release = OpContext::W_LOCK;
+       return true;
+      } else {
+       assert(0 == "Currently there cannot be a read in flight here");
+       return false;
+      }
+    } else {
+      assert(ctx->op->may_read());
+      if (rw_manager.get_read(ctx->obs->oi.soid, ctx->op)) {
+       ctx->lock_to_release = OpContext::R_LOCK;
+       return true;
+      } else {
+       return false;
+      }
+    }
+  }
+
+  /**
+   * Cleans up OpContext
+   *
+   * @param ctx [in] ctx to clean up
+   */
+  void close_op_ctx(OpContext *ctx) {
+    release_op_ctx_locks(ctx);
+    delete ctx;
+  }
+
+  /**
+   * Releases ctx locks
+   *
+   * @param ctx [in] ctx to clean up
+   */
+  void release_op_ctx_locks(OpContext *ctx) {
+    list<OpRequestRef> to_req;
+    switch (ctx->lock_to_release) {
+    case OpContext::W_LOCK:
+      rw_manager.put_write(ctx->obs->oi.soid, &to_req);
+      break;
+    case OpContext::R_LOCK:
+      rw_manager.put_read(ctx->obs->oi.soid, &to_req);
+      break;
+    case OpContext::NONE:
+      break;
+    default:
+      assert(0);
+    };
+    ctx->lock_to_release = OpContext::NONE;
+    requeue_ops(to_req);
+  }
+
   // replica ops
   // [primary|tail]
   xlist<RepGather*> repop_queue;