git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: refactor push code
author Sage Weil <sage@newdream.net>
Mon, 19 Jul 2010 23:22:52 +0000 (16:22 -0700)
committer Sage Weil <sage@newdream.net>
Mon, 19 Jul 2010 23:22:52 +0000 (16:22 -0700)
- send_push_op() does a push, nothing else
- push_start() starts a primary->replica push, tracks state
- push_to_replica() ensures we push head first, calculates cloning, etc.

src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 43bccbb2729b725e8970df4696bc330f902be965..e450c696a63ac78a41c7ff6f27d8d57ecce59325 100644 (file)
@@ -3127,22 +3127,15 @@ void ReplicatedPG::send_pull_op(const sobject_t& soid, eversion_t v,
  * intelligently push an object to a replica.  make use of existing
  * clones/heads and dup data ranges where possible.
  */
-void ReplicatedPG::push_to_replica(const sobject_t& soid, int peer)
+void ReplicatedPG::push_to_replica(ObjectContext *obc, const sobject_t& soid, int peer)
 {
-  dout(10) << "push_to_replica " << soid << " osd" << peer << dendl;
+  const object_info_t& oi = obc->obs.oi;
+  uint64_t size = obc->obs.oi.size;
+
+  dout(10) << "push_to_replica " << soid << " v" << oi.version << " size " << size << " to osd" << peer << dendl;
 
-  // get size
-  struct stat st;
-  int r = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
-  assert(r == 0);
-  
   map<sobject_t, interval_set<uint64_t> > clone_subsets;
   interval_set<uint64_t> data_subset;
-
-  bufferlist bv;
-  r = osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, OI_ATTR, bv);
-  assert(r >= 0);
-  object_info_t oi(bv);
   
   // are we doing a clone on the replica?
   if (soid.snap && soid.snap < CEPH_NOSNAP) {  
@@ -3155,9 +3148,9 @@ void ReplicatedPG::push_to_replica(const sobject_t& soid, int peer)
               << ", pushing " << soid << " attrs as a clone op" << dendl;
       interval_set<uint64_t> data_subset;
       map<sobject_t, interval_set<uint64_t> > clone_subsets;
-      if (st.st_size)
-       clone_subsets[head].insert(0, st.st_size);
-      push(soid, peer, data_subset, clone_subsets);
+      if (size)
+       clone_subsets[head].insert(0, size);
+      push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
       return;
     }
 
@@ -3165,13 +3158,13 @@ void ReplicatedPG::push_to_replica(const sobject_t& soid, int peer)
     // we need the head (and current SnapSet) to do that.
     if (missing.is_missing(head)) {
       dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
-      return push(soid, peer);
+      return push_start(soid, peer);
     }
     sobject_t snapdir = head;
     snapdir.snap = CEPH_SNAPDIR;
     if (missing.is_missing(snapdir)) {
       dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl;
-      return push(snapdir, peer);
+      return push_start(snapdir, peer);
     }
     
     SnapSetContext *ssc = get_snapset_context(soid.oid, false);
@@ -3188,51 +3181,69 @@ void ReplicatedPG::push_to_replica(const sobject_t& soid, int peer)
     put_snapset_context(ssc);
   }
 
-  dout(10) << "push_to_replica " << soid << " pushing " << data_subset
+  push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
+}
+
+void ReplicatedPG::push_start(const sobject_t& soid, int peer)
+{
+  struct stat st;
+  int r = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
+  assert(r == 0);
+  uint64_t size = st.st_size;
+
+  bufferlist bl;
+  r = osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, OI_ATTR, bl);
+  object_info_t oi(bl);
+
+  interval_set<uint64_t> data_subset;
+  map<sobject_t, interval_set<uint64_t> > clone_subsets;
+  data_subset.insert(0, size);
+
+  push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
+}
+
+void ReplicatedPG::push_start(const sobject_t& soid, int peer,
+                             uint64_t size, eversion_t version,
+                             interval_set<uint64_t> &data_subset,
+                             map<sobject_t, interval_set<uint64_t> >& clone_subsets)
+{
+  // take note.
+  push_info_t *pi = &pushing[soid][peer];
+  pi->version = version;
+  pi->data_subset = data_subset;
+  pi->clone_subsets = clone_subsets;
+  //pi->data_subset_pushing.span_of(pi->data_subset, 0, g_conf.osd_recovery_max_chunk);
+
+  dout(10) << "push_start " << soid << " size " << size << " data " << data_subset
           << " cloning " << clone_subsets << dendl;    
-  push(soid, peer, data_subset, clone_subsets);
+  send_push_op(soid, peer, size, pi->data_subset, pi->clone_subsets);
 }
 
+
 /*
  * push - send object to a peer
  */
-void ReplicatedPG::push(const sobject_t& soid, int peer)
-{
-  interval_set<uint64_t> subset;
-  map<sobject_t, interval_set<uint64_t> > clone_subsets;
-  push(soid, peer, subset, clone_subsets);
-}
 
-void ReplicatedPG::push(const sobject_t& soid, int peer, 
-                       interval_set<uint64_t> &data_subset,
-                       map<sobject_t, interval_set<uint64_t> >& clone_subsets)
+void ReplicatedPG::send_push_op(const sobject_t& soid, int peer, 
+                               uint64_t size,
+                               interval_set<uint64_t> &data_subset,
+                               map<sobject_t, interval_set<uint64_t> >& clone_subsets)
 {
   // read data+attrs
   bufferlist bl;
   map<string,bufferptr> attrset;
-  uint64_t size;
-
-  if (data_subset.size() || clone_subsets.size()) {
-    struct stat st;
-    int r = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
-    assert(r == 0);
-    size = st.st_size;
-
-    for (map<uint64_t,uint64_t>::iterator p = data_subset.m.begin();
-        p != data_subset.m.end();
-        p++) {
-      bufferlist bit;
-      osd->store->read(coll_t::build_pg_coll(info.pgid), soid, p->first, p->second, bit);
-      if (p->second != bit.length()) {
-       dout(10) << " extent " << p->first << "~" << p->second
-                << " is actually " << p->first << "~" << bit.length() << dendl;
-       p->second = bit.length();
-      }
-      bl.claim_append(bit);
+
+  for (map<uint64_t,uint64_t>::iterator p = data_subset.m.begin();
+       p != data_subset.m.end();
+       p++) {
+    bufferlist bit;
+    osd->store->read(coll_t::build_pg_coll(info.pgid), soid, p->first, p->second, bit);
+    if (p->second != bit.length()) {
+      dout(10) << " extent " << p->first << "~" << p->second
+              << " is actually " << p->first << "~" << bit.length() << dendl;
+      p->second = bit.length();
     }
-  } else {
-    osd->store->read(coll_t::build_pg_coll(info.pgid), soid, 0, 0, bl);
-    size = bl.length();
+    bl.claim_append(bit);
   }
 
   osd->store->getattrs(coll_t::build_pg_coll(info.pgid), soid, attrset);
@@ -3242,8 +3253,8 @@ void ReplicatedPG::push(const sobject_t& soid, int peer,
   object_info_t oi(bv);
 
   // ok
-  dout(7) << "push " << soid << " v " << oi.version 
-         << " size " << size
+  dout(7) << "send_push_op " << soid << " v " << oi.version 
+         << " size " << size
          << " subset " << data_subset
           << " data " << bl.length()
           << " to osd" << peer
@@ -3258,19 +3269,14 @@ void ReplicatedPG::push(const sobject_t& soid, int peer,
                                   osd->osdmap->get_epoch(), osd->get_tid(), oi.version);
   subop->ops = vector<OSDOp>(1);
   subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
-  subop->ops[0].op.extent.offset = 0;
-  subop->ops[0].op.extent.length = size;
+  //subop->ops[0].op.extent.offset = 0;
+  //subop->ops[0].op.extent.length = size;
   subop->ops[0].data = bl;
   subop->data_subset.swap(data_subset);
   subop->clone_subsets.swap(clone_subsets);
   subop->attrset.swap(attrset);
   subop->old_size = size;
   osd->messenger->send_message(subop, osd->osdmap->get_inst(peer));
-  
-  if (is_primary()) {
-    pushing[soid].insert(peer);
-    peer_missing[peer].got(soid, oi.version);
-  }
 }
 
 void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
@@ -3286,18 +3292,21 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
             << dendl;
   } else if (pushing[soid].count(peer) == 0) {
     dout(10) << "huh, i wasn't pushing " << soid << " to osd" << peer
-            << ", only " << pushing[soid]
             << dendl;
   } else {
-    pushing[soid].erase(peer);
+    push_info_t *pi = &pushing[soid][peer];
 
-    if (peer_missing.count(peer) == 0 ||
-        peer_missing[peer].num_missing() == 0) 
+    peer_missing[peer].got(soid, pi->version);
+    if (peer_missing[peer].num_missing() == 0) 
       uptodate_set.insert(peer);
 
+    pushing[soid].erase(peer);
+    pi = NULL;
+
     update_stats();
 
     if (pushing[soid].empty()) {
+      pushing.erase(soid);
       dout(10) << "pushed " << soid << " to all replicas" << dendl;
       finish_recovery_op(soid);
       if (waiting_for_degraded_object.count(soid)) {
@@ -3306,7 +3315,7 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
       }
     } else {
       dout(10) << "pushed " << soid << ", still waiting for push ack from " 
-              << pushing[soid] << dendl;
+              << pushing[soid].size() << " others" << dendl;
     }
   }
   reply->put();
@@ -3328,7 +3337,12 @@ void ReplicatedPG::sub_op_pull(MOSDSubOp *op)
 
   assert(!is_primary());  // we should be a replica or stray.
 
-  push(soid, op->get_source().num(), op->data_subset, op->clone_subsets);
+  struct stat st;
+  int r = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
+  assert(r == 0);
+  uint64_t size = st.st_size;
+
+  send_push_op(soid, op->get_source().num(), size, op->data_subset, op->clone_subsets);
   op->put();
 }
 
@@ -3901,11 +3915,9 @@ int ReplicatedPG::recover_object_replicas(const sobject_t& soid)
 
   dout(10) << "recover_object_replicas " << soid << dendl;
 
-  ObjectContext *obc = lookup_object_context(soid);
-  if (obc) {
-    dout(10) << " ondisk_read_lock for " << soid << dendl;
-    obc->ondisk_read_lock();
-  }
+  ObjectContext *obc = get_object_context(soid);
+  dout(10) << " ondisk_read_lock for " << soid << dendl;
+  obc->ondisk_read_lock();
   
   start_recovery_op(soid);
   started++;
@@ -3914,15 +3926,14 @@ int ReplicatedPG::recover_object_replicas(const sobject_t& soid)
   for (unsigned i=1; i<acting.size(); i++) {
     int peer = acting[i];
     if (peer_missing.count(peer) &&
-       peer_missing[peer].is_missing(soid)) 
-      push_to_replica(soid, peer);
+       peer_missing[peer].is_missing(soid)) {
+      push_to_replica(obc, soid, peer);
+    }
   }
   
-  if (obc) {
-    dout(10) << " ondisk_read_unlock on " << soid << dendl;
-    obc->ondisk_read_unlock();
-    put_object_context(obc);
-  }
+  dout(10) << " ondisk_read_unlock on " << soid << dendl;
+  obc->ondisk_read_unlock();
+  put_object_context(obc);
 
   return started;
 }
index 29eda2721564180b79ba36e17fefd79bc21c1e97..9a74a868b5df77ad76f692587698ff0b13e5883f 100644 (file)
@@ -456,7 +456,12 @@ protected:
   map<sobject_t, pull_info_t> pulling;
 
   // push
-  map<sobject_t, set<int> > pushing;
+  struct push_info_t {
+    eversion_t version;
+    interval_set<uint64_t> data_subset;
+    map<sobject_t, interval_set<uint64_t> > clone_subsets;
+  };
+  map<sobject_t, map<int, push_info_t> > pushing;
 
   int recover_object_replicas(const sobject_t& soid);
   void calc_head_subsets(SnapSet& snapset, const sobject_t& head,
@@ -466,10 +471,17 @@ protected:
   void calc_clone_subsets(SnapSet& snapset, const sobject_t& poid, Missing& missing,
                          interval_set<uint64_t>& data_subset,
                          map<sobject_t, interval_set<uint64_t> >& clone_subsets);
-  void push_to_replica(const sobject_t& oid, int dest);
-  void push(const sobject_t& oid, int dest);
-  void push(const sobject_t& oid, int dest, interval_set<uint64_t>& data_subset, 
-           map<sobject_t, interval_set<uint64_t> >& clone_subsets);
+  void push_to_replica(ObjectContext *obc, const sobject_t& oid, int dest);
+  void push_start(const sobject_t& oid, int dest);
+  void push_start(const sobject_t& soid, int peer,
+                 uint64_t size, eversion_t version,
+                 interval_set<uint64_t> &data_subset,
+                 map<sobject_t, interval_set<uint64_t> >& clone_subsets);
+  void send_push_op(const sobject_t& oid, int dest,
+                   uint64_t size,
+                   interval_set<uint64_t>& data_subset, 
+                   map<sobject_t, interval_set<uint64_t> >& clone_subsets);
+
   bool pull(const sobject_t& oid);
   void send_pull_op(const sobject_t& soid, eversion_t v, const interval_set<uint64_t>& data_subset, int fromosd);