]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: push in chunks
authorSage Weil <sage@newdream.net>
Tue, 20 Jul 2010 19:30:53 +0000 (12:30 -0700)
committerSage Weil <sage@newdream.net>
Tue, 20 Jul 2010 19:30:53 +0000 (12:30 -0700)
Signed-off-by: Sage Weil <sage@newdream.net>
src/messages/MOSDSubOp.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index e4aecf8b114ca5d73a524fb8e46971df6fc32dc6..aacd023857bd0ab6206af8c48c6dec5e1a18ca8c 100644 (file)
@@ -64,7 +64,9 @@ public:
   interval_set<uint64_t> data_subset;
   map<sobject_t, interval_set<uint64_t> > clone_subsets;
 
- virtual void decode_payload() {
+  bool first, complete;
+
+  virtual void decode_payload() {
     bufferlist::iterator p = payload.begin();
     ::decode(map_epoch, p);
     ::decode(reqid, p);
@@ -96,9 +98,16 @@ public:
     ::decode(attrset, p);
     ::decode(data_subset, p);
     ::decode(clone_subsets, p);
+    
+    if (header.version >= 2) {
+      ::decode(first, p);
+      ::decode(complete, p);
+    }
   }
 
   virtual void encode_payload() {
+    header.version = 2;
+
     ::encode(map_epoch, payload);
     ::encode(reqid, payload);
     ::encode(pgid, payload);
@@ -131,6 +140,8 @@ public:
       header.data_off = ops[0].op.extent.offset;
     else
       header.data_off = 0;
+    ::encode(first, payload);
+    ::encode(complete, payload);
   }
 
 
@@ -144,7 +155,8 @@ public:
     acks_wanted(aw),
     noop(noop_),   
     old_exists(false), old_size(0),
-    version(v)
+    version(v),
+    first(false), complete(false)
   {
     memset(&peer_stat, 0, sizeof(peer_stat));
     set_tid(rtid);
@@ -162,6 +174,10 @@ public:
        << " " << ops;
     if (noop)
       out << " (NOOP)";
+    if (first)
+      out << " first";
+    if (complete)
+      out << " complete";
     out << " v " << version
        << " snapset=" << snapset << " snapc=" << snapc;    
     if (!data_subset.empty()) out << " subset " << data_subset;
index e450c696a63ac78a41c7ff6f27d8d57ecce59325..00e77651ff68e3aca9a9e8d819514c186d243a8c 100644 (file)
@@ -3209,14 +3209,17 @@ void ReplicatedPG::push_start(const sobject_t& soid, int peer,
 {
   // take note.
   push_info_t *pi = &pushing[soid][peer];
+  pi->size = size;
   pi->version = version;
   pi->data_subset = data_subset;
   pi->clone_subsets = clone_subsets;
-  //pi->data_subset_pushing.span_of(pi->data_subset, 0, g_conf.osd_recovery_max_chunk);
+
+  pi->data_subset_pushing.span_of(pi->data_subset, 0, g_conf.osd_recovery_max_chunk);
+  bool complete = pi->data_subset_pushing == pi->data_subset;
 
   dout(10) << "push_start " << soid << " size " << size << " data " << data_subset
           << " cloning " << clone_subsets << dendl;    
-  send_push_op(soid, peer, size, pi->data_subset, pi->clone_subsets);
+  send_push_op(soid, peer, size, true, complete, pi->data_subset_pushing, pi->clone_subsets);
 }
 
 
@@ -3225,7 +3228,7 @@ void ReplicatedPG::push_start(const sobject_t& soid, int peer,
  */
 
 void ReplicatedPG::send_push_op(const sobject_t& soid, int peer, 
-                               uint64_t size,
+                               uint64_t size, bool first, bool complete,
                                interval_set<uint64_t> &data_subset,
                                map<sobject_t, interval_set<uint64_t> >& clone_subsets)
 {
@@ -3272,10 +3275,12 @@ void ReplicatedPG::send_push_op(const sobject_t& soid, int peer,
   //subop->ops[0].op.extent.offset = 0;
   //subop->ops[0].op.extent.length = size;
   subop->ops[0].data = bl;
-  subop->data_subset.swap(data_subset);
-  subop->clone_subsets.swap(clone_subsets);
+  subop->data_subset = data_subset;
+  subop->clone_subsets = clone_subsets;
   subop->attrset.swap(attrset);
   subop->old_size = size;
+  subop->first = first;
+  subop->complete = complete;
   osd->messenger->send_message(subop, osd->osdmap->get_inst(peer));
 }
 
@@ -3296,26 +3301,41 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
   } else {
     push_info_t *pi = &pushing[soid][peer];
 
-    peer_missing[peer].got(soid, pi->version);
-    if (peer_missing[peer].num_missing() == 0) 
-      uptodate_set.insert(peer);
-
-    pushing[soid].erase(peer);
-    pi = NULL;
-
-    update_stats();
-
-    if (pushing[soid].empty()) {
-      pushing.erase(soid);
-      dout(10) << "pushed " << soid << " to all replicas" << dendl;
-      finish_recovery_op(soid);
-      if (waiting_for_degraded_object.count(soid)) {
-       osd->take_waiters(waiting_for_degraded_object[soid]);
-       waiting_for_degraded_object.erase(soid);
-      }
+    bool complete = false;
+    if (pi->data_subset.empty() ||
+       pi->data_subset.end() == pi->data_subset_pushing.end())
+      complete = true;
+
+    if (!complete) {
+      // push more
+      uint64_t from = pi->data_subset_pushing.end();
+      dout(10) << " pushing more, " << pi->data_subset << " from " << from << dendl;
+      pi->data_subset_pushing.span_of(pi->data_subset, from, g_conf.osd_recovery_max_chunk);
+      complete = pi->data_subset.end() == pi->data_subset_pushing.end();
+      send_push_op(soid, peer, pi->size, false, complete, pi->data_subset_pushing, pi->clone_subsets);
     } else {
-      dout(10) << "pushed " << soid << ", still waiting for push ack from " 
-              << pushing[soid].size() << " others" << dendl;
+      // done!
+      peer_missing[peer].got(soid, pi->version);
+      if (peer_missing[peer].num_missing() == 0) 
+       uptodate_set.insert(peer);
+      
+      pushing[soid].erase(peer);
+      pi = NULL;
+      
+      update_stats();
+      
+      if (pushing[soid].empty()) {
+       pushing.erase(soid);
+       dout(10) << "pushed " << soid << " to all replicas" << dendl;
+       finish_recovery_op(soid);
+       if (waiting_for_degraded_object.count(soid)) {
+         osd->take_waiters(waiting_for_degraded_object[soid]);
+         waiting_for_degraded_object.erase(soid);
+       }
+      } else {
+       dout(10) << "pushed " << soid << ", still waiting for push ack from " 
+                << pushing[soid].size() << " others" << dendl;
+      }
     }
   }
   reply->put();
@@ -3342,7 +3362,7 @@ void ReplicatedPG::sub_op_pull(MOSDSubOp *op)
   assert(r == 0);
   uint64_t size = st.st_size;
 
-  send_push_op(soid, op->get_source().num(), size, op->data_subset, op->clone_subsets);
+  send_push_op(soid, op->get_source().num(), size, op->first, op->complete, op->data_subset, op->clone_subsets);
   op->put();
 }
 
@@ -3431,8 +3451,8 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
   clone_subsets = op->clone_subsets;
 
   pull_info_t *pi = 0;
-  bool first = true;
-  bool complete = true;
+  bool first = op->first;
+  bool complete = op->complete;
   if (is_primary()) {
     if (pulling.count(soid) == 0) {
       dout(10) << " not pulling, ignoring" << dendl;
@@ -3500,9 +3520,8 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
     }
 
     if (pi->data_subset.empty()) {
-      first = complete = true;
+      complete = true;
     } else {
-      first = pi->data_subset.start() == data_subset.start();
       complete = pi->data_subset.end() == data_subset.end();
     }
   }
@@ -3591,6 +3610,10 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
        dout(10) << " log.complete_to = " << log.complete_to->version << dendl;
     }
 
+    // update pg
+    write_info(*t);
+
+
     // track ObjectContext
     if (is_primary()) {
       dout(10) << " setting up obc for " << soid << dendl;
@@ -3606,12 +3629,12 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
     } else {
       onreadable = new ObjectStore::C_DeleteTransaction(t);
     }
+
   } else {
     onreadable = new ObjectStore::C_DeleteTransaction(t);
   }
 
   // apply to disk!
-  write_info(*t);
   int r = osd->store->queue_transaction(&osr, t,
                                        onreadable,
                                        new C_OSD_Commit(this, info.history.same_acting_since,
index 9a74a868b5df77ad76f692587698ff0b13e5883f..65dbcab0b4c337caace179c7b1aa1f59e7042775 100644 (file)
@@ -457,8 +457,9 @@ protected:
 
   // push
   struct push_info_t {
+    uint64_t size;
     eversion_t version;
-    interval_set<uint64_t> data_subset;
+    interval_set<uint64_t> data_subset, data_subset_pushing;
     map<sobject_t, interval_set<uint64_t> > clone_subsets;
   };
   map<sobject_t, map<int, push_info_t> > pushing;
@@ -478,7 +479,7 @@ protected:
                  interval_set<uint64_t> &data_subset,
                  map<sobject_t, interval_set<uint64_t> >& clone_subsets);
   void send_push_op(const sobject_t& oid, int dest,
-                   uint64_t size,
+                   uint64_t size, bool first, bool complete,
                    interval_set<uint64_t>& data_subset, 
                    map<sobject_t, interval_set<uint64_t> >& clone_subsets);