]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: redid snapped reads, writes
authorSage Weil <sage@newdream.net>
Thu, 7 Aug 2008 00:36:43 +0000 (17:36 -0700)
committerSage Weil <sage@newdream.net>
Thu, 7 Aug 2008 00:36:43 +0000 (17:36 -0700)
src/messages/MOSDSubOp.h
src/os/ObjectStore.h
src/osd/PG.cc
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h

index 4d251e5ad28a538b716b0e2f9f39b4443e3a81b2..20906f43e5f56ca9c7b597aabe28f4741115a15e 100644 (file)
@@ -41,8 +41,8 @@ public:
   eversion_t version;
   uint32_t inc_lock;
 
-  snapid_t follows_snap;
-  vector<snapid_t> snaps;
+  SnapSet snapset;
+  SnapContext snapc;
   
   // piggybacked osd/og state
   eversion_t pg_trim_to;   // primary->replica: trim to here
@@ -62,8 +62,8 @@ public:
     ::decode(rep_tid, p);
     ::decode(version, p);
     ::decode(inc_lock, p);
-    ::decode(follows_snap, p);
-    ::decode(snaps, p);
+    ::decode(snapset, p);
+    ::decode(snapc, p);
     ::decode(pg_trim_to, p);
     ::decode(peer_stat, p);
     ::decode(attrset, p);
@@ -80,8 +80,8 @@ public:
     ::encode(rep_tid, payload);
     ::encode(version, payload);
     ::encode(inc_lock, payload);
-    ::encode(follows_snap, payload);
-    ::encode(snaps, payload);
+    ::encode(snapset, payload);
+    ::encode(snapc, payload);
     ::encode(pg_trim_to, payload);
     ::encode(peer_stat, payload);
     ::encode(attrset, payload);
@@ -96,8 +96,7 @@ public:
   bool is_read() { return op < 10; }
  
   MOSDSubOp(osd_reqid_t r, pg_t p, pobject_t po, int o, loff_t of, loff_t le,
-           epoch_t mape, tid_t rtid, unsigned il, eversion_t v,
-           snapid_t fs) :
+           epoch_t mape, tid_t rtid, unsigned il, eversion_t v) :
     Message(MSG_OSD_SUBOP),
     map_epoch(mape),
     reqid(r),
@@ -108,8 +107,7 @@ public:
     length(le),
     rep_tid(rtid),
     version(v),
-    inc_lock(il),
-    follows_snap(fs)
+    inc_lock(il)
   {
     memset(&peer_stat, 0, sizeof(peer_stat));
   }
@@ -121,7 +119,7 @@ public:
        << " " << MOSDOp::get_opname(op)
        << " " << poid
        << " v " << version
-       << " follows_snap=" << follows_snap << " snaps=" << snaps;    
+       << " snapset=" << snapset << " snapc=" << snapc;    
     if (length) out << " " << offset << "~" << length;
     out << ")";
   }
index d04458913d542b3dde4e1c1da777997a9027f6fb..af3e5b45f5728944b4866c3eb2815ec998c1bacf 100644 (file)
@@ -577,6 +577,12 @@ public:
   virtual int setattrs(coll_t cid, pobject_t oid, map<string,bufferptr>& aset, Context *onsafe=0) = 0;
   virtual int getattr(coll_t cid, pobject_t oid, const char *name, void *value, size_t size) = 0;
   virtual int getattr(coll_t cid, pobject_t oid, const char *name, bufferptr& value) = 0;
+  int getattr(coll_t cid, pobject_t oid, const char *name, bufferlist& value) {
+    bufferptr bp;
+    int r = getattr(cid, oid, name, bp);
+    value.push_back(bp);
+    return r;
+  }
   virtual int getattrs(coll_t cid, pobject_t oid, map<string,bufferptr>& aset) {return 0;};
 
   virtual int rmattr(coll_t cid, pobject_t oid, const char *name,
index 36dec0be3935587e748fd9ee639f6604e4be9627..90f0e610ae4e3f594acb00913d8c5c757510a52e 100644 (file)
@@ -1098,7 +1098,8 @@ void PG::finish_recovery()
   ::encode(info, bl);
   t.collection_setattr(info.pgid, "info", bl);
   osd->store->apply_transaction(t);
-  osd->store->sync();
+  //osd->store->sync();  
+#warning fix finish_recovery sync behavior
 
   purge_strays();
   update_stats();
index 13c5c7ffd47d870cfc299aa09e6d2781675bf1fb..89b5351c01c8a8ba77e14391fa17c7efdb8176bb 100644 (file)
@@ -457,72 +457,61 @@ bool ReplicatedPG::pick_read_snap(pobject_t& poid)
   pobject_t head = poid;
   head.oid.snap = CEPH_NOSNAP;
 
-  bufferptr bp;
-  int r = osd->store->getattr(info.pgid, head, "snapc", bp);
-  if (r < 0)
-    return false;  // if head doesn't exist, no snapped version will either.
-  bufferlist bl;
-  bl.push_back(bp);
-  bufferlist::iterator p = bl.begin();
-  SnapContext snapc;
-  ::decode(snapc, p);
+  SnapSet snapset;
+  {
+    bufferlist bl;
+    int r = osd->store->getattr(info.pgid, head, "snapset", bl);
+    if (r < 0)
+      return false;  // if head doesn't exist, no snapped version will either.
+    bufferlist::iterator p = bl.begin();
+    ::decode(snapset, p);
+  }
 
-  dout(10) << "pick_read_snap " << poid << " snapc " << snapc << dendl;
+  dout(10) << "pick_read_snap " << poid << " snapset " << snapset << dendl;
   snapid_t want = poid.oid.snap;
-  vector<snapid_t> csnap;
-  pobject_t t = poid;
-
-  for (int i=-1; i<(int)snapc.snaps.size(); i++) {
-    snapid_t last;
-    if (i < 0)
-      last = t.oid.snap = CEPH_NOSNAP;
-    else
-      last = t.oid.snap = snapc.snaps[i];
-    if (last < want) {
-      dout(20) << "pick_read_snap  stopping (DNE) at clone " << t
-              << ": last " << last << " < want " << want << dendl;
-      return false;
-    }
-    
-    if (last == CEPH_NOSNAP) {
-      csnap.resize(1);
-      csnap[0] = CEPH_NOSNAP;
-    } else {
-      bufferptr bp;
-      r = osd->store->getattr(info.pgid, t, "snaps", bp);
-      if (r < 0) {
-       dout(20) << "pick_read_snap  " << t << " dne" << dendl;
-       continue;
-      }
-      bufferlist bl;
-      bl.push_back(bp);
-      bufferlist::iterator p = bl.begin();
-      ::decode(csnap, p);
-      dout(20) << "pick_read_snap  " << t << " snaps " << csnap << dendl;
-    }
-    
-    snapid_t first = csnap[csnap.size()-1];
-    dout(20) << "pick_read_snap ? " << t << " [" << first << "," << last
-            << "] csnap " << csnap << dendl;
-    assert(csnap[0] == last);
-
-    if (first <= want) {
-      dout(20) << "pick_read_snap  " << t << " first " << first << " <= " << poid.oid.snap
-              << " -- HIT" << dendl;
-      poid = t;
-      return true;
-    }
 
-    dout(20) << "pick_read_snap  skipping clone " << t
-            << ": first " << first << " > want " << want << dendl;
-    while (i+1 < (int)snapc.snaps.size() &&
-          snapc.snaps[i+1] > first) {
-      i++;
-      dout(20) << "pick_read_snap    and snap " << snapc.snaps[i] << dendl;
+  // head?
+  if (want > snapset.seq) {
+    dout(10) << "pick_read_snap  " << head << " want " << want << " > snapset seq " << snapset.seq << " -- HIT" << dendl;
+    poid = head;
+    return true;
+  }
+
+  // which clone would it be?
+  unsigned k = 0;
+  while (k<snapset.clones.size() && snapset.clones[k] < want)
+    k++;
+  if (k == snapset.clones.size()) {
+    dout(10) << "pick_read_snap  no clones with last >= want " << want << " -- DNE" << dendl;
+    return false;
+  }
+  
+  // check clone
+  poid.oid.snap = snapset.clones[k];
+  vector<snapid_t> snaps;
+  {
+    bufferlist bl;
+    int r = osd->store->getattr(info.pgid, poid, "snaps", bl);
+    if (r < 0) {
+      dout(20) << "pick_read_snap  " << poid << " dne" << dendl;
+      assert(0);
+      return false;
     }
+    bufferlist::iterator p = bl.begin();
+    ::decode(snaps, p);
   }
+  dout(20) << "pick_read_snap  " << poid << " snaps " << snaps << dendl;
+  snapid_t first = snaps[snaps.size()-1];
+  snapid_t last = snaps[0];
+  assert(last == poid.oid.snap);
+  if (first <= want) {
+    dout(20) << "pick_read_snap  " << poid << " [" << first << "," << last << "] contains " << want << " -- HIT" << dendl;
+    return true;
+  }
+
+  dout(20) << "pick_read_snap  " << poid << " [" << first << "," << last << "] does not contain " << want << " -- DNE" << dendl;
   return false;
-}
+} 
 
 
 void ReplicatedPG::op_read(MOSDOp *op)
@@ -676,7 +665,7 @@ void ReplicatedPG::op_read(MOSDOp *op)
 void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
                                       pobject_t poid, int op, eversion_t at_version,
                                       off_t offset, off_t length, bufferlist& bl,
-                                      snapid_t follows_snap, vector<snapid_t>& snaps,
+                                      SnapSet& snapset, SnapContext& snapc,
                                       __u32 inc_lock, eversion_t trim_to)
 {
   // WRNOOP does nothing.
@@ -685,23 +674,32 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
 
   // clone?
   if (poid.oid.snap) {
-    assert(poid.oid.snap == NOSNAP);
-    dout(20) << "snaps=" << snaps << "  follows_snap=" << follows_snap << dendl;;
+    assert(poid.oid.snap == CEPH_NOSNAP);
+    dout(20) << "snapset=" << snapset << "  snapc=" << snapc << dendl;;
+
+    // use newer snapc?
+    if (snapset.seq > snapc.seq) {
+      snapc.seq = snapset.seq;
+      snapc.snaps = snapset.snaps;
+      dout(10) << " using newer snapc " << snapc << dendl;
+    }
 
-    if (follows_snap &&
-       !snaps.empty() && snaps[0] > follows_snap) {
+    if (snapset.seq &&                   // object exists
+       snapc.snaps.size() &&            // there are snaps
+       snapc.snaps[0] > snapset.seq) {  // existing object is old
       // clone
       pobject_t coid = poid;
-      coid.oid.snap = snaps[0];
+      coid.oid.snap = snapc.seq;
       
       unsigned l;
-      for (l=1; l<snaps.size() && snaps[l] > follows_snap; l++) ;
-      vector<snapid_t> csnaps(l);
+      for (l=1; l<snapc.snaps.size() && snapc.snaps[l] > snapset.seq; l++) ;
+
+      vector<snapid_t> snaps(l);
       for (unsigned i=0; i<l; i++)
-       csnaps[i] = snaps[i];
+       snaps[i] = snapc.snaps[i];
 
       // log clone
-      dout(10) << "cloning to " << coid << " v " << at_version << " csnaps=" << csnaps << dendl;
+      dout(10) << "cloning to " << coid << " v " << at_version << " snaps=" << snaps << dendl;
       Log::Entry cloneentry(PG::Log::Entry::CLONE, coid.oid, at_version, reqid);
       dout(10) << "prepare_transaction " << cloneentry << dendl;
       log.add(cloneentry);
@@ -709,26 +707,35 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
 
       // prepare clone
       t.clone(info.pgid, poid, coid);
-      t.setattr(info.pgid, coid, "snaps", &csnaps[0], csnaps.size()*sizeof(snapid_t));
+      bufferlist snapsbl;
+      ::encode(snaps, snapsbl);
+      t.setattr(info.pgid, coid, "snaps", snapsbl);
       t.setattr(info.pgid, coid, "version", &at_version, sizeof(at_version));
       
+      snapset.clones.push_back(coid.oid.snap);
+      
       at_version.version++;
+    }
 
-      /*
-      // munge delete into a truncate?
-      if (op == CEPH_OSD_OP_DELETE) {
-       op = CEPH_OSD_OP_TRUNCATE;
-       length = 0;
-      }
-      */
+    // update snapset with latest snap context
+    snapset.seq = snapc.seq;
+    snapset.snaps = snapc.snaps;
+
+    // munge delete into a truncate?
+    if (op == CEPH_OSD_OP_DELETE &&
+       snapset.clones.size()) {
+      dout(10) << " munging DELETE -> TRUNCATE(0) bc of clones " << snapset.clones << dendl;
+      op = CEPH_OSD_OP_TRUNCATE;
+      length = 0;
     }
   }
+
   
   // log op
   int opcode = Log::Entry::MODIFY;
   if (op == CEPH_OSD_OP_DELETE) opcode = Log::Entry::DELETE;
   Log::Entry logentry(opcode, poid.oid, at_version, reqid);
-  dout(10) << "prepare_transaction " << op << " " << logentry << dendl;
+  dout(10) << "prepare_transaction " << logentry << dendl;
 
   assert(at_version > log.top);
   log.add(logentry);
@@ -816,8 +823,9 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
     // object version
     t.setattr(info.pgid, poid, "version", &at_version, sizeof(at_version));
 
-    if (snaps.size())
-      t.setattr(info.pgid, poid, "follows_snap", &snaps[0], snaps.size() * sizeof(snapid_t));
+    bufferlist snapsetbl;
+    ::encode(snapset, snapsetbl);
+    t.setattr(info.pgid, poid, "snapset", snapsetbl);
   }
 
   // update pg info:
@@ -995,9 +1003,9 @@ void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
                                repop->op->get_op(), 
                                repop->op->get_offset(), repop->op->get_length(), 
                                osd->osdmap->get_epoch(), 
-                               repop->rep_tid, repop->op->get_inc_lock(), repop->at_version,
-                               repop->follows_snap);
-  wr->snaps = repop->snaps;
+                               repop->rep_tid, repop->op->get_inc_lock(), repop->at_version);
+  wr->snapset = repop->snapset;
+  wr->snapc = repop->snapc;
   wr->get_data() = repop->op->get_data();   // _copy_ bufferlist
   wr->pg_trim_to = peers_complete_thru;
   wr->peer_stat = osd->get_my_stat_for(now, dest);
@@ -1005,11 +1013,11 @@ void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
 }
 
 ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv,
-                                                     snapid_t follows_snap, vector<snapid_t> &snaps)
+                                                     SnapSet& snapset, SnapContext& snapc)
 {
   dout(10) << "new_rep_gather rep_tid " << rep_tid << " on " << *op << dendl;
   RepGather *repop = new RepGather(op, rep_tid, nv, info.last_complete,
-                                  follows_snap, snaps);
+                                  snapset, snapc);
   
   // osds. commits all come to me.
   for (unsigned i=0; i<acting.size(); i++) {
@@ -1241,21 +1249,33 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     assert(av > log.top);
   }
 
-  // snap cloning?
-  snapid_t follows = 0;
-  if (poid.oid.snap == CEPH_NOSNAP)
-    osd->store->getattr(info.pgid, poid, "follows_snap", &follows, sizeof(follows));
-
+  // snap
+  SnapContext snapc;
+  snapc.seq = op->get_snap_seq();
+  snapc.snaps = op->get_snaps();
+
+  SnapSet snapset;
+  if (poid.oid.snap == CEPH_NOSNAP) {
+    bufferlist bl;
+    int r = osd->store->getattr(info.pgid, poid, "snapset", bl);
+    if (r >= 0) {
+      bufferlist::iterator p = bl.begin();
+      ::decode(snapset, p);
+    } else {
+      dout(10) << " no \"snapset\" attr, r = " << r << " " << strerror(r) << dendl;
+    }
+  } else 
+    assert(poid.oid.snap == 0);   // no snapshotting.
 
   // set version in op, for benefit of client and our eventual reply
   op->set_version(av);
 
   dout(10) << "op_modify " << opname 
            << " " << poid.oid 
-           << " av " << av 
-          << " snaps=" << op->get_snaps()
-          << " follows_snap " << follows
            << " " << op->get_offset() << "~" << op->get_length()
+           << " av " << av 
+          << " snapc " << snapc
+          << " snapset " << snapset
            << dendl;  
 
   // are any peers missing this?
@@ -1279,7 +1299,7 @@ void ReplicatedPG::op_modify(MOSDOp *op)
 
   // issue replica writes
   tid_t rep_tid = osd->get_tid();
-  RepGather *repop = new_rep_gather(op, rep_tid, av, follows, op->get_snaps());
+  RepGather *repop = new_rep_gather(op, rep_tid, av, snapset, snapc);
   for (unsigned i=1; i<acting.size(); i++)
     issue_repop(repop, acting[i], now);
 
@@ -1289,7 +1309,7 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     prepare_transaction(repop->t, op->get_reqid(),
                        poid, op->get_op(), av,
                        op->get_offset(), op->get_length(), op->get_data(),
-                       follows, op->get_snaps(),
+                       snapset, snapc,
                        op->get_inc_lock(), peers_complete_thru);
   }
   
@@ -1337,7 +1357,7 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
     prepare_transaction(t, op->reqid,
                        op->poid, op->op, op->version,
                        op->offset, op->length, op->get_data(), 
-                       op->follows_snap, op->snaps,
+                       op->snapset, op->snapc,
                        op->inc_lock, op->pg_trim_to);
   }
   
@@ -1426,7 +1446,7 @@ void ReplicatedPG::pull(pobject_t poid)
   
   MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PULL,
                                   0, 0, 
-                                  osd->osdmap->get_epoch(), tid, 0, v, 0);
+                                  osd->osdmap->get_epoch(), tid, 0, v);
   osd->messenger->send_message(subop, osd->osdmap->get_inst(fromosd));
   
   // take note
@@ -1462,7 +1482,7 @@ void ReplicatedPG::push(pobject_t poid, int peer)
   // send
   osd_reqid_t rid;  // useless?
   MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PUSH, 0, bl.length(),
-                                  osd->osdmap->get_epoch(), osd->get_tid(), 0, v, 0);
+                                  osd->osdmap->get_epoch(), osd->get_tid(), 0, v);
   subop->set_data(bl);   // note: claims bl, set length above here!
   subop->attrset.swap(attrset);
   osd->messenger->send_message(subop, osd->osdmap->get_inst(peer));
index 9b2a7f426a02ed773dc7abd833d1f0021165537c..cc3cf174d802a0f1b1afde3c58d78a5a7a02486b 100644 (file)
@@ -44,19 +44,19 @@ public:
     set<int>         osds;
     eversion_t       at_version;
 
-    snapid_t follows_snap;
-    vector<snapid_t> snaps;
+    SnapSet snapset;
+    SnapContext snapc;
 
     eversion_t       pg_local_last_complete;
     map<int,eversion_t> pg_complete_thru;
     
     RepGather(MOSDOp *o, tid_t rt, eversion_t av, eversion_t lc,
-             snapid_t fs, vector<snapid_t> &sn) :
+             SnapSet& ss, SnapContext& sc) :
       op(o), rep_tid(rt),
       applied(false),
       sent_ack(false), sent_commit(false),
       at_version(av), 
-      follows_snap(fs), snaps(sn),
+      snapset(ss), snapc(sc),
       pg_local_last_complete(lc) { }
 
     bool can_send_ack() { 
@@ -86,7 +86,7 @@ protected:
   void put_rep_gather(RepGather*);
   void issue_repop(RepGather *repop, int dest, utime_t now);
   RepGather *new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv,
-                           snapid_t follows_snap, vector<snapid_t> &snaps);
+                           SnapSet& snapset, SnapContext& snapc);
   void repop_ack(RepGather *repop,
                  int result, bool commit,
                  int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
@@ -106,7 +106,7 @@ protected:
   void prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
                           pobject_t poid, int op, eversion_t at_version,
                           off_t offset, off_t length, bufferlist& bl,
-                          snapid_t follows_snap, vector<snapid_t>& snaps,
+                          SnapSet& snapset, SnapContext& snapc,
                           __u32 inc_lock, eversion_t trim_to);
   
   friend class C_OSD_ModifyCommit;
index 6c5ed8e2e9f51e8ec9399e4cd6c103ae38129f83..6ff05fd0972f707eaab3c5edde402856e29b5ba5 100644 (file)
@@ -411,4 +411,32 @@ inline ostream& operator<<(ostream& out, OSDSuperblock& sb)
 }
 
 
+// -------
+
+/*
+ * attached to object head.  describes most recent snap context, and
+ * set of existing clones.
+ */
+struct SnapSet {
+  snapid_t seq;
+  vector<snapid_t> snaps;
+  vector<snapid_t> clones;
+
+  void encode(bufferlist& bl) const {
+    ::encode(seq, bl);
+    ::encode(snaps, bl);
+    ::encode(clones, bl);
+  }
+  void decode(bufferlist::iterator& bl) {
+    ::decode(seq, bl);
+    ::decode(snaps, bl);
+    ::decode(clones, bl);
+  }
+};
+WRITE_CLASS_ENCODER(SnapSet)
+
+inline ostream& operator<<(ostream& out, const SnapSet& cs) {
+  return out << cs.seq << "=" << cs.snaps << "~" << cs.clones;
+}
+
 #endif