]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: split send_push into build_push_op and send_push_op
authorSamuel Just <sam.just@inktank.com>
Wed, 12 Jun 2013 22:50:55 +0000 (15:50 -0700)
committerSamuel Just <sam.just@inktank.com>
Mon, 8 Jul 2013 23:43:30 +0000 (16:43 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index f01425b4414cf05727cf065bc8985d50bc689766..f0f959fa16be93f3369bed22209c51090659560d 100644 (file)
@@ -5723,45 +5723,48 @@ void ReplicatedPG::handle_push(OpRequestRef op)
 
 int ReplicatedPG::send_push(int prio, int peer,
                            const ObjectRecoveryInfo &recovery_info,
-                           ObjectRecoveryProgress progress,
+                           const ObjectRecoveryProgress &progress,
                            ObjectRecoveryProgress *out_progress)
 {
-  ObjectRecoveryProgress new_progress = progress;
+  PushOp op;
+  int r = build_push_op(recovery_info, progress, out_progress, &op);
+  if (r < 0)
+    return r;
+  return send_push_op(prio, peer, op);
+}
 
-  tid_t tid = osd->get_tid();
-  osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
-  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid,
-                                  false, 0, get_osdmap()->get_epoch(),
-                                  tid, recovery_info.version);
-  subop->set_priority(prio);
+int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info,
+                               const ObjectRecoveryProgress &progress,
+                               ObjectRecoveryProgress *out_progress,
+                               PushOp *out_op)
+{
+  ObjectRecoveryProgress _new_progress;
+  if (!out_progress)
+    out_progress = &_new_progress;
+  ObjectRecoveryProgress &new_progress = *out_progress;
+  new_progress = progress;
 
   dout(7) << "send_push_op " << recovery_info.soid
          << " v " << recovery_info.version
          << " size " << recovery_info.size
-          << " to osd." << peer
          << " recovery_info: " << recovery_info
           << dendl;
 
-  subop->ops = vector<OSDOp>(1);
-  subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
-
   if (progress.first) {
-    osd->store->omap_get_header(coll, recovery_info.soid, &subop->omap_header);
-    osd->store->getattrs(coll, recovery_info.soid, subop->attrset);
+    osd->store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
+    osd->store->getattrs(coll, recovery_info.soid, out_op->attrset);
 
     // Debug
     bufferlist bv;
-    bv.push_back(subop->attrset[OI_ATTR]);
+    bv.push_back(out_op->attrset[OI_ATTR]);
     object_info_t oi(bv);
 
     if (oi.version != recovery_info.version) {
       osd->clog.error() << info.pgid << " push "
                        << recovery_info.soid << " v "
-                       << recovery_info.version << " to osd." << peer
                        << " failed because local copy is "
                        << oi.version << "\n";
-      subop->put();
-      return -1;
+      return -EINVAL;
     }
 
     new_progress.first = false;
@@ -5775,10 +5778,10 @@ int ReplicatedPG::send_push(int prio, int peer,
     for (iter->lower_bound(progress.omap_recovered_to);
         iter->valid();
         iter->next()) {
-      if (!subop->omap_entries.empty() &&
+      if (!out_op->omap_entries.empty() &&
          available <= (iter->key().size() + iter->value().length()))
        break;
-      subop->omap_entries.insert(make_pair(iter->key(), iter->value()));
+      out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
 
       if ((iter->key().size() + iter->value().length()) <= available)
        available -= (iter->key().size() + iter->value().length());
@@ -5792,15 +5795,15 @@ int ReplicatedPG::send_push(int prio, int peer,
   }
 
   if (available > 0) {
-    subop->data_included.span_of(recovery_info.copy_subset,
+    out_op->data_included.span_of(recovery_info.copy_subset,
                                 progress.data_recovered_to,
                                 available);
   } else {
-    subop->data_included.clear();
+    out_op->data_included.clear();
   }
 
-  for (interval_set<uint64_t>::iterator p = subop->data_included.begin();
-       p != subop->data_included.end();
+  for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
+       p != out_op->data_included.end();
        ++p) {
     bufferlist bit;
     osd->store->read(coll, recovery_info.soid,
@@ -5812,30 +5815,54 @@ int ReplicatedPG::send_push(int prio, int peer,
       p.set_len(bit.length());
       new_progress.data_complete = true;
     }
-    subop->ops[0].indata.claim_append(bit);
+    out_op->data.claim_append(bit);
   }
 
-  if (!subop->data_included.empty())
-    new_progress.data_recovered_to = subop->data_included.range_end();
+  if (!out_op->data_included.empty())
+    new_progress.data_recovered_to = out_op->data_included.range_end();
 
   if (new_progress.is_complete(recovery_info)) {
     new_progress.data_complete = true;
     info.stats.stats.sum.num_objects_recovered++;
   }
 
-  info.stats.stats.sum.num_keys_recovered += subop->omap_entries.size();
-  info.stats.stats.sum.num_bytes_recovered += subop->ops[0].indata.length();
+  info.stats.stats.sum.num_keys_recovered += out_op->omap_entries.size();
+  info.stats.stats.sum.num_bytes_recovered += out_op->data.length();
 
   osd->logger->inc(l_osd_push);
-  osd->logger->inc(l_osd_push_outb, subop->ops[0].indata.length());
+  osd->logger->inc(l_osd_push_outb, out_op->data.length());
   
   // send
-  subop->recovery_info = recovery_info;
-  subop->recovery_progress = new_progress;
-  subop->current_progress = progress;
+  out_op->version = recovery_info.version;
+  out_op->soid = recovery_info.soid;
+  out_op->recovery_info = recovery_info;
+  out_op->after_progress = new_progress;
+  out_op->before_progress = progress;
+  return 0;
+}
+
+int ReplicatedPG::send_push_op(int prio, int peer, PushOp &pop)
+{
+  tid_t tid = osd->get_tid();
+  osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
+  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, pop.soid,
+                                  false, 0, get_osdmap()->get_epoch(),
+                                  tid, pop.recovery_info.version);
+  subop->ops = vector<OSDOp>(1);
+  subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
+
+  subop->set_priority(prio);
+  subop->version = pop.version;
+  subop->ops[0].indata.claim(pop.data);
+  subop->data_included.swap(pop.data_included);
+  subop->omap_header.claim(pop.omap_header);
+  subop->omap_entries.swap(pop.omap_entries);
+  subop->attrset.swap(pop.attrset);
+  subop->recovery_info = pop.recovery_info;
+  subop->current_progress = pop.before_progress;
+  subop->recovery_progress = pop.after_progress;
+
   osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
-  if (out_progress)
-    *out_progress = new_progress;
   return 0;
 }
 
index c8b6e15bb843b67a2ed09dd5b34ea3901d349590..bc44dee0cff36e893cf54faa2b1ef9de5734d9ed 100644 (file)
@@ -560,8 +560,15 @@ protected:
   void handle_push(OpRequestRef op);
   int send_push(int priority, int peer,
                const ObjectRecoveryInfo& recovery_info,
-               ObjectRecoveryProgress progress,
+               const ObjectRecoveryProgress &progress,
                ObjectRecoveryProgress *out_progress = 0);
+  int build_push_op(const ObjectRecoveryInfo &recovery_info,
+                   const ObjectRecoveryProgress &progress,
+                   ObjectRecoveryProgress *out_progress,
+                   PushOp *out_op);
+  int send_push_op(int priority, int peer,
+                  PushOp &pop);
+    
   int send_pull(int priority, int peer,
                const ObjectRecoveryInfo& recovery_info,
                ObjectRecoveryProgress progress);