]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: track ObjectContext for cloned objects
authorSage Weil <sage@newdream.net>
Mon, 25 May 2009 20:18:14 +0000 (13:18 -0700)
committerSage Weil <sage@newdream.net>
Mon, 25 May 2009 20:18:23 +0000 (13:18 -0700)
This orders access to a newly cloned object.  This is really only important
when you have a racing clone creation and a clone read.  The read will
look at the head's context and expect the clone to be there, but we may
not have applied the write to disk yet.  So, we set up an obc for the
cloned object too (with the same mode as the head).

src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 446d50f73e7542ab606c0e096b3e002f89a05650..9ca127bceaf70adae3778e8c1600adb2206147ca 100644 (file)
@@ -409,7 +409,8 @@ void ReplicatedPG::do_op(MOSDOp *op)
   }
 
   const sobject_t& soid = obc->soid;
-  OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops, op->get_data(), &obc->oi);
+  OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops, op->get_data(),
+                                obc->state, &obc->oi);
 
   bool noop = false;
   if (ctx->ops.empty()) {
@@ -1207,25 +1208,23 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<ceph_osd_op>& ops,
 
 void ReplicatedPG::_make_clone(ObjectStore::Transaction& t,
                               sobject_t head, pobject_t coid,
-                              eversion_t ov, eversion_t v, osd_reqid_t& reqid, utime_t mtime, vector<snapid_t>& snaps)
+                              object_info_t *poi)
 {
-  object_info_t pi(coid);
-  pi.version = v;
-  pi.prior_version = ov;
-  pi.last_reqid = reqid;
-  pi.mtime = mtime;
-  pi.snaps.swap(snaps);
   bufferlist bv;
-  ::encode(pi, bv);
+  ::encode(*poi, bv);
 
   t.clone(info.pgid.to_coll(), head, coid);
   t.setattr(info.pgid.to_coll(), coid, OI_ATTR, bv);
 }
 
-void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, vector<Log::Entry>& log, osd_reqid_t reqid, pg_stat_t& stats,
-                                sobject_t soid, loff_t old_size, object_info_t& oi,
-                                eversion_t& at_version, SnapContext& snapc)
+void ReplicatedPG::prepare_clone(OpContext *ctx, loff_t old_size, eversion_t old_version,
+                                utime_t old_mtime, osd_reqid_t old_last_reqid)
 {
+  object_info_t& oi = *ctx->poi;
+  const sobject_t& soid = oi.soid;
+  SnapContext& snapc = ctx->snapc;
+  ObjectStore::Transaction& t = ctx->clone_t;
+
   // clone?
   assert(soid.snap == CEPH_NOSNAP);
   dout(20) << "snapset=" << oi.snapset << "  snapc=" << snapc << dendl;;
@@ -1252,7 +1251,21 @@ void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, vector<Log::Entry>
       snaps[i] = snapc.snaps[i];
     
     // prepare clone
-    _make_clone(t, soid, coid, oi.version, at_version, reqid, oi.mtime, snaps);
+    ctx->clone_obc = new ObjectContext;
+    ctx->clone_obc->state = ctx->mode;   // take state from head obc's
+    ctx->clone_obc->soid = coid;
+    ctx->clone_obc->oi.soid = coid;
+    ctx->clone_obc->oi.version = ctx->at_version;
+    ctx->clone_obc->oi.prior_version = old_version;
+    ctx->clone_obc->oi.last_reqid = old_last_reqid;
+    ctx->clone_obc->oi.mtime = old_mtime;
+    ctx->clone_obc->oi.snaps = snaps;
+
+    ctx->clone_obc->force_start_write();
+    if (is_primary())
+      register_object_context(ctx->clone_obc);
+    
+    _make_clone(t, soid, coid, &ctx->clone_obc->oi);
     
     // add to snap bound collections
     coll_t fc = make_snap_collection(t, snaps[0]);
@@ -1262,20 +1275,20 @@ void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, vector<Log::Entry>
       t.collection_add(lc, info.pgid.to_coll(), coid);
     }
     
-    stats.num_objects++;
-    stats.num_object_clones++;
+    info.stats.num_objects++;
+    info.stats.num_object_clones++;
     oi.snapset.clones.push_back(coid.snap);
     oi.snapset.clone_size[coid.snap] = old_size;
     oi.snapset.clone_overlap[coid.snap].insert(0, old_size);
     
     // log clone
     dout(10) << "cloning v " << oi.version
-            << " to " << coid << " v " << at_version
+            << " to " << coid << " v " << ctx->at_version
             << " snaps=" << snaps << dendl;
-    log.push_back(Log::Entry(PG::Log::Entry::CLONE, coid, at_version, oi.version, reqid, oi.mtime));
-    ::encode(snaps, log.back().snaps);
+    ctx->log.push_back(Log::Entry(PG::Log::Entry::CLONE, coid, ctx->at_version, old_version, ctx->reqid, oi.mtime));
+    ::encode(snaps, ctx->log.back().snaps);
 
-    at_version.version++;
+    ctx->at_version.version++;
   }
   
   // update snapset with latest snap context
@@ -1599,22 +1612,27 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size)
 {
   assert(!ctx->ops.empty());
   
-  eversion_t old_version = ctx->poi->version;
-
-  const sobject_t& soid = ctx->poi->soid;
   object_info_t *poi = ctx->poi;
+  const sobject_t& soid = poi->soid;
+
+  // we'll need this to log
+  eversion_t old_version = poi->version;
+
+  // set these values aside, in case we need to clone
+  __u64 old_size = size;
+  utime_t old_mtime = poi->mtime;
+  osd_reqid_t old_last_reqid = poi->last_reqid;
 
   // prepare the actual mutation
   bufferlist::iterator bp = ctx->indata.begin();
   int result = do_osd_ops(ctx, ctx->ops, bp, ctx->outdata, exists, size);
 
   if (result < 0 || ctx->op_t.empty())
-    return result;
+    return result;  // error, or read op.
 
   // clone?
   if (soid.snap)
-    prepare_clone(ctx->clone_t, ctx->log, ctx->reqid, info.stats, soid, size, *poi,
-                 ctx->at_version, ctx->snapc);
+    prepare_clone(ctx, old_size, old_version, old_mtime, old_last_reqid);
 
   // finish and log the op.
   poi->version = ctx->at_version;
@@ -1732,6 +1750,12 @@ void ReplicatedPG::apply_repop(RepGather *repop)
   
   repop->applied = true;
   
+  if (repop->ctx->clone_obc) {
+   repop->ctx->clone_obc->finish_write();
+   put_object_context(repop->ctx->clone_obc);
+   repop->ctx->clone_obc = 0;
+  }
+
   repop->obc->finish_write();
 
   put_object_context(repop->obc);
@@ -2278,7 +2302,7 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
   oi.version = op->old_version;
   oi.snapset = op->snapset;
 
-  OpContext ctx(op, op->reqid, op->ops, op->get_data(), &oi);
+  OpContext ctx(op, op->reqid, op->ops, op->get_data(), ObjectContext::RMW, &oi);
 
   if (!op->noop) {
     ctx.mtime = op->mtime;
@@ -3097,10 +3121,20 @@ int ReplicatedPG::recover_primary(int max)
            dout(10) << "recover_primary cloning " << head << " v" << latest->prior_version
                     << " to " << soid << " v" << latest->version
                     << " snaps " << latest->snaps << dendl;
-           vector<snapid_t> snaps;
-           ::decode(snaps, latest->snaps);
            ObjectStore::Transaction t;
-           _make_clone(t, head, soid, latest->prior_version, latest->version, latest->reqid, latest->mtime, snaps);
+
+           ObjectContext *headobc = get_object_context(head);
+
+           object_info_t oi(soid);
+           oi.version = latest->version;
+           oi.prior_version = latest->prior_version;
+           oi.last_reqid = headobc->oi.last_reqid;
+           oi.mtime = headobc->oi.mtime;
+           ::decode(oi.snaps, latest->snaps);
+           _make_clone(t, head, soid, &oi);
+
+           put_object_context(headobc);
+
            osd->store->apply_transaction(t);
            missing.got(latest->soid, latest->version);
            missing_loc.erase(latest->soid);
index e9d2c38a30f13f004e814b651eacaa816afb3831..05b4ffe4658f4f22c76b35e05b87fa7580c804cd 100644 (file)
@@ -65,20 +65,13 @@ public:
    * replicas ack.
    */
   struct ObjectContext {
-    sobject_t soid;
-    int ref;
-    bool registered; 
-
-    void get() { ++ref; }
-
-    enum {
+    typedef enum {
       IDLE,
       DELAYED,
       RMW,
       DELAYED_FLUSHING,
       RMW_FLUSHING
-    } state;
-
+    } state_t;
     static const char *get_state_name(int s) {
       switch (s) {
       case IDLE: return "idle";
@@ -90,6 +83,12 @@ public:
       }
     }
 
+    sobject_t soid;
+    int ref;
+    bool registered; 
+
+    state_t state;
+
     int num_wr, num_rmw;
     entity_inst_t client;
     list<Message*> waiting;
@@ -99,6 +98,9 @@ public:
     __u64 size;
 
     object_info_t oi;
+
+
+    void get() { ++ref; }
     
     bool is_delayed_mode() {
       return state == DELAYED || state == DELAYED_FLUSHING;
@@ -168,6 +170,9 @@ public:
       num_wr++;
       assert(state == DELAYED || state == RMW);
     }
+    void force_start_write() {
+      num_wr++;
+    }
     void finish_write() {
       assert(num_wr > 0);
       --num_wr;
@@ -228,6 +233,7 @@ public:
     bufferlist& indata;
     bufferlist outdata;
 
+    ObjectContext::state_t mode;  // DELAYED or RMW (or _FLUSHING variant?)
     object_info_t *poi;
 
     utime_t mtime;
@@ -237,12 +243,17 @@ public:
     ObjectStore::Transaction op_t, clone_t, local_t;
     vector<PG::Log::Entry> log;
 
+    ObjectContext *clone_obc;    // if we created a clone
+
     int data_off;        // FIXME: we may want to kill this msgr hint off at some point!
 
     OpContext(Message *_op, osd_reqid_t _reqid, vector<ceph_osd_op>& _ops, bufferlist& _data,
-             object_info_t *_poi) :
-      op(_op), reqid(_reqid), ops(_ops), indata(_data), poi(_poi),
-      data_off(0) {}
+             ObjectContext::state_t _mode, object_info_t *_poi) :
+      op(_op), reqid(_reqid), ops(_ops), indata(_data), mode(_mode), poi(_poi),
+      clone_obc(0), data_off(0) {}
+    ~OpContext() {
+      assert(!clone_obc);
+    }
   };
 
   /*
@@ -380,10 +391,9 @@ protected:
 
   void _make_clone(ObjectStore::Transaction& t,
                   sobject_t head, sobject_t coid,
-                  eversion_t ov, eversion_t v, osd_reqid_t& reqid, utime_t mtime, vector<snapid_t>& snaps);
-  void prepare_clone(ObjectStore::Transaction& t, vector<Log::Entry>& log, osd_reqid_t reqid, pg_stat_t& st,
-                    sobject_t poid, loff_t old_size, object_info_t& oi,
-                    eversion_t& at_version, SnapContext& snapc);
+                  object_info_t *poi);
+  void prepare_clone(OpContext *ctx, loff_t old_size,
+                    eversion_t old_version, utime_t old_mtime, osd_reqid_t old_last_reqid);
   void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st);  
   int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
                        sobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,