git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: refactor push and pull
author Samuel Just <samuel.just@dreamhost.com>
Mon, 13 Feb 2012 19:49:42 +0000 (11:49 -0800)
committer Samuel Just <samuel.just@dreamhost.com>
Mon, 13 Feb 2012 20:07:39 +0000 (12:07 -0800)
Now, push progress is represented by ObjectRecoveryProgress.  In
particular, rather than tracking data_subset_pushing/data_subset_pulling,
we track the furthest offset before which the data will be consistent
once cloning is complete.  sub_op_push now separates the pull response
implementation from the replica push implementation.

Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.cc
src/osd/osd_types.h
src/test/encoding/types.h

index 8f2974ca24511ebe1b2e2ac19a4b74edb28211c8..60cee021f4f27ef1a9599ac30212bc609f22889a 100644 (file)
@@ -107,7 +107,7 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op)
   assert(g != missing.missing.end());
   const eversion_t &v(g->second.need);
 
-  map<hobject_t, pull_info_t>::const_iterator p = pulling.find(soid);
+  map<hobject_t, PullInfo>::const_iterator p = pulling.find(soid);
   if (p != pulling.end()) {
     dout(7) << "missing " << soid << " v " << v << ", already pulling." << dendl;
   }
@@ -3971,8 +3971,7 @@ int ReplicatedPG::pull(const hobject_t& soid, eversion_t v)
          << " from osd." << fromosd
          << dendl;
 
-  map<hobject_t, interval_set<uint64_t> > clone_subsets;
-  interval_set<uint64_t> data_subset;
+  ObjectRecoveryInfo recovery_info;
   bool need_size = false;
 
   // is this a snapped object?  if so, consult the snapset.. we may not need the entire object!
@@ -4008,66 +4007,37 @@ int ReplicatedPG::pull(const hobject_t& soid, eversion_t v)
     SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
     dout(10) << " snapset " << ssc->snapset << dendl;
     calc_clone_subsets(ssc->snapset, soid, missing, info.last_backfill,
-                      data_subset, clone_subsets);
+                      recovery_info.copy_subset,
+                      recovery_info.clone_subset);
     put_snapset_context(ssc);
     // FIXME: this may overestimate if we are pulling multiple clones in parallel...
-    dout(10) << " pulling " << data_subset << ", will clone " << clone_subsets
-            << dendl;
+    dout(10) << " pulling " << recovery_info << dendl;
   } else {
     // pulling head or unversioned object.
     // always pull the whole thing.
     need_size = true;
-    data_subset.insert(0, (uint64_t)-1);
+    recovery_info.copy_subset.insert(0, (uint64_t)-1);
+    recovery_info.size = ((uint64_t)-1);
   }
 
-  // only pull so much at a time
-  interval_set<uint64_t> pullsub;
-  pullsub.span_of(data_subset, 0, g_conf->osd_recovery_max_chunk);
 
-  // take note
-  assert(pulling.count(soid) == 0);
+  recovery_info.soid = soid;
+  recovery_info.version = v;
+  ObjectRecoveryProgress progress;
+  progress.data_complete = false;
+  progress.data_recovered_to = 0;
+  progress.first = true;
+  assert(!pulling.count(soid));
   pull_from_peer[fromosd].insert(soid);
-  pull_info_t& p = pulling[soid];
-  p.version = v;
-  p.from = fromosd;
-  p.data_subset = data_subset;
-  p.data_subset_pulling = pullsub;
-  p.need_size = need_size;
-
-  send_pull_op(soid, v, true, p.data_subset_pulling, fromosd);
-  
+  PullInfo &pi = pulling[soid];
+  pi.recovery_info = recovery_info;
+  pi.recovery_progress = progress;
+  send_pull(fromosd, recovery_info, progress);
+
   start_recovery_op(soid);
   return PULL_YES;
 }
 
-void ReplicatedPG::send_pull_op(const hobject_t& soid, eversion_t v, bool first,
-                               const interval_set<uint64_t>& data_subset, int fromosd)
-{
-  // send op
-  tid_t tid = osd->get_tid();
-  osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
-
-  dout(10) << "send_pull_op " << soid << " " << v
-          << " first=" << first
-          << " data " << data_subset << " from osd." << fromosd
-          << " tid " << tid << dendl;
-
-  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, CEPH_OSD_FLAG_ACK,
-                                  get_osdmap()->get_epoch(), tid, v);
-  subop->ops = vector<OSDOp>(1);
-  subop->ops[0].op.op = CEPH_OSD_OP_PULL;
-  subop->data_subset = data_subset;
-  subop->first = first;
-
-  // do not include clone_subsets in pull request; we will recalculate this
-  // when the object is pushed back.
-  //subop->clone_subsets.swap(clone_subsets);
-
-  osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(fromosd));
-
-  osd->logger->inc(l_osd_pull);
-}
-
 void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
 {
   tid_t tid = osd->get_tid();
@@ -4111,7 +4081,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in
       map<hobject_t, interval_set<uint64_t> > clone_subsets;
       if (size)
        clone_subsets[head].insert(0, size);
-      push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
+      push_start(obc, soid, peer, oi.version, data_subset, clone_subsets);
       return;
     }
 
@@ -4119,13 +4089,13 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in
     // we need the head (and current SnapSet) locally to do that.
     if (missing.is_missing(head)) {
       dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
-      return push_start(soid, peer);
+      return push_start(obc, soid, peer);
     }
     hobject_t snapdir = head;
     snapdir.snap = CEPH_SNAPDIR;
     if (missing.is_missing(snapdir)) {
       dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl;
-      return push_start(soid, peer);
+      return push_start(obc, soid, peer);
     }
     
     SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
@@ -4145,117 +4115,439 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in
     put_snapset_context(ssc);
   }
 
-  push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
+  push_start(obc, soid, peer, oi.version, data_subset, clone_subsets);
 }
 
-void ReplicatedPG::push_start(const hobject_t& soid, int peer)
+void ReplicatedPG::push_start(ObjectContext *obc,
+                             const hobject_t& soid, int peer)
 {
-  struct stat st;
-  int r = osd->store->stat(coll, soid, &st);
-  assert(r == 0);
-  uint64_t size = st.st_size;
-
-  bufferlist bl;
-  r = osd->store->getattr(coll, soid, OI_ATTR, bl);
-  object_info_t oi(bl);
-
   interval_set<uint64_t> data_subset;
+  data_subset.insert(0, obc->obs.oi.size);
   map<hobject_t, interval_set<uint64_t> > clone_subsets;
-  data_subset.insert(0, size);
 
-  push_start(soid, peer, size, oi.version, data_subset, clone_subsets);
+  push_start(obc, soid, peer, obc->obs.oi.version, data_subset, clone_subsets);
 }
 
-void ReplicatedPG::push_start(const hobject_t& soid, int peer,
-                             uint64_t size, eversion_t version,
-                             interval_set<uint64_t> &data_subset,
-                             map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+void ReplicatedPG::push_start(
+  ObjectContext *obc,
+  const hobject_t& soid, int peer,
+  eversion_t version,
+  interval_set<uint64_t> &data_subset,
+  map<hobject_t, interval_set<uint64_t> >& clone_subsets)
 {
   // take note.
-  push_info_t *pi = &pushing[soid][peer];
-  pi->size = size;
-  pi->version = version;
-  pi->data_subset = data_subset;
-  pi->clone_subsets = clone_subsets;
+  PushInfo &pi = pushing[soid][peer];
+  pi.recovery_info.size = obc->obs.oi.size;
+  pi.recovery_info.copy_subset = data_subset;
+  pi.recovery_info.clone_subset = clone_subsets;
+  pi.recovery_info.soid = soid;
+  pi.recovery_info.oi = obc->obs.oi;
+  pi.recovery_info.version = version;
+  pi.recovery_progress.first = true;
+  pi.recovery_progress.data_recovered_to = 0;
+  pi.recovery_progress.data_complete = 0;
+
+  ObjectRecoveryProgress new_progress;
+  send_push(peer, pi.recovery_info, pi.recovery_progress, &new_progress);
+  pi.recovery_progress = new_progress;
+}
+
+int ReplicatedPG::send_pull(int peer,
+                           ObjectRecoveryInfo recovery_info,
+                           ObjectRecoveryProgress progress)
+{
+  // send op
+  tid_t tid = osd->get_tid();
+  osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+
+  dout(10) << "send_pull_op " << recovery_info.soid << " "
+          << recovery_info.version
+          << " first=" << progress.first
+          << " data " << recovery_info.copy_subset
+          << " from osd." << peer
+          << " tid " << tid << dendl;
+
+  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid,
+                                  false, CEPH_OSD_FLAG_ACK,
+                                  get_osdmap()->get_epoch(), tid,
+                                  recovery_info.version);
+  subop->ops = vector<OSDOp>(1);
+  subop->ops[0].op.op = CEPH_OSD_OP_PULL;
+  subop->recovery_info = recovery_info;
+  subop->recovery_progress = progress;
 
-  pi->data_subset_pushing.span_of(pi->data_subset, 0, g_conf->osd_recovery_max_chunk);
-  bool complete = pi->data_subset_pushing == pi->data_subset;
+  osd->cluster_messenger->send_message(subop,
+                                      get_osdmap()->get_cluster_inst(peer));
 
-  dout(10) << "push_start " << soid << " size " << size << " data " << data_subset
-          << " cloning " << clone_subsets << dendl;    
-  send_push_op(soid, version, peer, size, true, complete, pi->data_subset_pushing, pi->clone_subsets);
+  osd->logger->inc(l_osd_pull);
+  return 0;
 }
 
+void ReplicatedPG::submit_push_data(
+  const ObjectRecoveryInfo &recovery_info,
+  bool first,
+  const interval_set<uint64_t> &intervals_included,
+  bufferlist data_included,
+  map<string, bufferptr> &attrs,
+  ObjectStore::Transaction *t)
+{
+  if (first) {
+    t->remove(coll_t::TEMP_COLL, recovery_info.soid);
+    t->touch(coll_t::TEMP_COLL, recovery_info.soid);
+  }
+  uint64_t off = 0;
+  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
+       p != intervals_included.end();
+       ++p) {
+    bufferlist bit;
+    bit.substr_of(data_included, off, p.get_len());
+    t->write(coll_t::TEMP_COLL, recovery_info.soid,
+            p.get_start(), p.get_len(), bit);
+    off += p.get_len();
+  }
 
-/*
- * push - send object to a peer
- */
+  t->setattrs(coll_t::TEMP_COLL, recovery_info.soid,
+             attrs);
+}
 
-int ReplicatedPG::send_push_op(const hobject_t& soid, eversion_t version, int peer, 
-                              uint64_t size, bool first, bool complete,
-                              interval_set<uint64_t> &data_subset,
-                              map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info,
+                                       ObjectStore::Transaction *t)
 {
-  // read data+attrs
-  bufferlist bl;
-  map<string,bufferptr> attrset;
+  remove_object_with_snap_hardlinks(*t, recovery_info.soid);
+  t->collection_add(coll, coll_t::TEMP_COLL, recovery_info.soid);
+  t->collection_remove(coll_t::TEMP_COLL, recovery_info.soid);
+  for (map<hobject_t, interval_set<uint64_t> >::const_iterator p =
+        recovery_info.clone_subset.begin();
+       p != recovery_info.clone_subset.end();
+       ++p) {
+    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
+        q != p->second.end();
+        ++q) {
+      dout(15) << " clone_range " << p->first << " "
+              << q.get_start() << "~" << q.get_len() << dendl;
+      t->clone_range(coll, p->first, recovery_info.soid,
+                    q.get_start(), q.get_len(), q.get_start());
+    }
+  }
+
+  if (recovery_info.soid.snap < CEPH_NOSNAP) {
+    if (recovery_info.oi.snaps.size()) {
+      coll_t lc = make_snap_collection(*t,
+                                      recovery_info.oi.snaps[0]);
+      t->collection_add(lc, coll, recovery_info.soid);
+      if (recovery_info.oi.snaps.size() > 1) {
+       coll_t hc = make_snap_collection(
+         *t,
+         recovery_info.oi.snaps[recovery_info.oi.snaps.size()-1]);
+       t->collection_add(hc, coll, recovery_info.soid);
+      }
+    }
+  }
+
+  if (missing.is_missing(recovery_info.soid) &&
+      missing.missing[recovery_info.soid].need > recovery_info.version) {
+    assert(is_primary());
+    pg_log_entry_t *latest = log.objects[recovery_info.soid];
+    if (latest->op == pg_log_entry_t::LOST_REVERT &&
+       latest->prior_version == recovery_info.version) {
+      dout(10) << " got old revert version " << recovery_info.version
+              << " for " << *latest << dendl;
+      recovery_info.version = latest->version;
+      // update the attr to the revert event version
+      recovery_info.oi.prior_version = recovery_info.oi.version;
+      recovery_info.oi.version = latest->version;
+      bufferlist bl;
+      ::encode(recovery_info.oi, bl);
+      t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
+    }
+  }
+  recover_got(recovery_info.soid, recovery_info.version);
+
+  // update pg
+  write_info(*t);
+}
+
+ObjectRecoveryInfo ReplicatedPG::recalc_subsets(ObjectRecoveryInfo recovery_info)
+{
+  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
+    return recovery_info;
+
+  SnapSetContext *ssc = get_snapset_context(recovery_info.soid.oid,
+                                           recovery_info.soid.get_key(),
+                                           recovery_info.soid.hash,
+                                           false);
+  ObjectRecoveryInfo new_info = recovery_info;
+  new_info.copy_subset.clear();
+  new_info.clone_subset.clear();
+  assert(ssc);
+  calc_clone_subsets(ssc->snapset, new_info.soid, missing, info.last_backfill,
+                    new_info.copy_subset, new_info.clone_subset);
+  put_snapset_context(ssc);
+  return new_info;
+}
+
+void ReplicatedPG::handle_pull_response(OpRequest *op)
+{
+  MOSDSubOp *m = (MOSDSubOp *)op->request;
+  bufferlist data;
+  m->claim_data(data);
+  interval_set<uint64_t> data_included = m->data_included;
+  dout(10) << "handle_push "
+          << m->recovery_info
+          << m->recovery_progress
+          << " data.size() is " << data.length()
+          << " data_included: " << data_included
+          << dendl;
+  if (m->version == eversion_t()) {
+    // replica doesn't have it!
+    _failed_push(op);
+    return;
+  }
+
+  hobject_t &hoid = m->recovery_info.soid;
+  assert((data_included.empty() && data.length() == 0) ||
+        (!data_included.empty() && data.length() > 0));
+
+
+  if (!pulling.count(hoid)) {
+    return;
+  }
+
+  PullInfo &pi = pulling[hoid];
+  if (pi.recovery_info.size == (uint64_t(-1))) {
+    pi.recovery_info.size = m->recovery_info.size;
+    pi.recovery_info.copy_subset.intersection_of(
+      m->recovery_info.copy_subset);
+  }
+
+  pi.recovery_info = recalc_subsets(pi.recovery_info);
+
+  interval_set<uint64_t> usable_intervals;
+  bufferlist usable_data;
+  trim_pushed_data(pi.recovery_info.copy_subset,
+                  data_included,
+                  data,
+                  &usable_intervals,
+                  &usable_data);
+  data_included = usable_intervals;
+  data.claim(usable_data);
+
+  bool first = pi.recovery_progress.first;
+  pi.recovery_progress = m->recovery_progress;
+
+  dout(10) << "new recovery_info: "
+          << pi.recovery_info
+          << ", new progress " << pi.recovery_progress
+          << dendl;
+
+  if (first) {
+    bufferlist oibl;
+    if (m->attrset.count(OI_ATTR)) {
+      oibl.push_back(m->attrset[OI_ATTR]);
+      ::decode(pi.recovery_info.oi, oibl);
+    } else {
+      assert(0);
+    }
+    bufferlist ssbl;
+    if (m->attrset.count(SS_ATTR)) {
+      ssbl.push_back(m->attrset[SS_ATTR]);
+      ::decode(pi.recovery_info.ss, ssbl);
+    } else {
+      assert(pi.recovery_info.soid.snap != CEPH_NOSNAP &&
+            pi.recovery_info.soid.snap != CEPH_SNAPDIR);
+    }
+  }
+
+  bool complete = pi.recovery_progress.data_recovered_to >=
+    (pi.recovery_info.copy_subset.empty() ?
+     0 : pi.recovery_info.copy_subset.range_end());
+
+  if (complete && !pi.recovery_progress.data_complete) {
+    dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted"
+           << dendl;
+    _failed_push(op);
+    return;
+  }
+
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  Context *onreadable = 0;
+  Context *onreadable_sync = 0;
+  submit_push_data(pi.recovery_info, first,
+                  data_included, data, m->attrset,
+                  t);
+
+  if (complete) {
+    submit_push_complete(pi.recovery_info, t);
+
+    ObjectContext *obc = get_object_context(hoid,
+                                           pi.recovery_info.oi.oloc,
+                                           true);
+    obc->ondisk_write_lock();
+    obc->obs.exists = true;
+    obc->obs.oi = pi.recovery_info.oi;
+
+    if (hoid.snap == CEPH_NOSNAP || hoid.snap == CEPH_SNAPDIR) {
+      obc->ssc->snapset = pi.recovery_info.ss;
+    }
+
+    onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc);
+    onreadable_sync = new C_OSD_OndiskWriteUnlock(obc);
+  } else {
+    onreadable = new ObjectStore::C_DeleteTransaction(t);
+  }
+
+  int r = osd->store->
+    queue_transaction(&osr, t,
+                     onreadable,
+                     new C_OSD_CommittedPushedObject(this, op,
+                                                     info.history.same_interval_since,
+                                                     info.last_complete),
+                     onreadable_sync);
+  assert(r == 0);
+
+  if (complete) {
+    finish_recovery_op(hoid);
+    pulling.erase(hoid);
+    pull_from_peer[m->get_source().num()].erase(hoid);
+    update_stats();
+    if (waiting_for_missing_object.count(hoid)) {
+      dout(20) << " kicking waiters on " << hoid << dendl;
+      osd->requeue_ops(this, waiting_for_missing_object[hoid]);
+      waiting_for_missing_object.erase(hoid);
+      if (missing.missing.size() == 0) {
+       osd->requeue_ops(this, waiting_for_all_missing);
+       waiting_for_all_missing.clear();
+      }
+    }
+  } else {
+    send_pull(m->get_source().num(), pi.recovery_info, pi.recovery_progress);
+  }
+}
+
+void ReplicatedPG::handle_push(OpRequest *op)
+{
+  MOSDSubOp *m = (MOSDSubOp *)op->request;
+  dout(10) << "handle_push "
+          << m->recovery_info
+          << m->recovery_progress
+          << dendl;
+  bufferlist data;
+  m->claim_data(data);
+  bool first = m->current_progress.first;
+  bool complete = m->recovery_progress.data_complete;
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  Context *onreadable = new ObjectStore::C_DeleteTransaction(t);
+  Context *onreadable_sync = 0;
+  submit_push_data(m->recovery_info,
+                  first,
+                  m->data_included,
+                  data,
+                  m->attrset,
+                  t);
+  if (complete)
+    submit_push_complete(m->recovery_info,
+                        t);
+
+  int r = osd->store->
+    queue_transaction(&osr, t,
+                     onreadable,
+                     new C_OSD_CommittedPushedObject(
+                       this, op,
+                       info.history.same_interval_since,
+                       info.last_complete),
+                     onreadable_sync);
+  assert(r == 0);
+
+  MOSDSubOpReply *reply = new MOSDSubOpReply(
+    m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+  assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
+  osd->cluster_messenger->send_message(reply, m->get_connection());
+}
+
+int ReplicatedPG::send_push(int peer,
+                           ObjectRecoveryInfo recovery_info,
+                           ObjectRecoveryProgress progress,
+                           ObjectRecoveryProgress *out_progress)
+{
+  ObjectRecoveryProgress new_progress = progress;
+
+  tid_t tid = osd->get_tid();
+  osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid,
+                                  false, 0, get_osdmap()->get_epoch(),
+                                  tid, recovery_info.version);
+
+  dout(7) << "send_push_op " << recovery_info.soid
+         << " v " << recovery_info.version
+         << " size " << recovery_info.size
+          << " to osd." << peer
+         << " recovery_info: " << recovery_info
+          << dendl;
+
+  subop->ops = vector<OSDOp>(1);
+  subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
+
+  if (progress.first) {
+    osd->store->getattrs(coll, recovery_info.soid, subop->attrset);
+
+    // Debug
+    bufferlist bv;
+    bv.push_back(subop->attrset[OI_ATTR]);
+    object_info_t oi(bv);
 
-  for (interval_set<uint64_t>::iterator p = data_subset.begin();
-       p != data_subset.end();
+    if (oi.version != recovery_info.version) {
+      osd->clog.error() << info.pgid << " push "
+                       << recovery_info.soid << " v "
+                       << recovery_info.version << " to osd." << peer
+                       << " failed because local copy is "
+                       << oi.version << "\n";
+      subop->put();
+      return -1;
+    }
+
+    new_progress.first = false;
+  }
+
+
+  subop->data_included.span_of(recovery_info.copy_subset,
+                              progress.data_recovered_to,
+                              g_conf->osd_recovery_max_chunk);
+
+  for (interval_set<uint64_t>::iterator p = subop->data_included.begin();
+       p != subop->data_included.end();
        ++p) {
     bufferlist bit;
-    osd->store->read(coll,
-                    soid, p.get_start(), p.get_len(), bit);
+    osd->store->read(coll, recovery_info.soid,
+                    p.get_start(), p.get_len(), bit);
     if (p.get_len() != bit.length()) {
       dout(10) << " extent " << p.get_start() << "~" << p.get_len()
-              << " is actually " << p.get_start() << "~" << bit.length() << dendl;
+              << " is actually " << p.get_start() << "~" << bit.length()
+              << dendl;
       p.set_len(bit.length());
+      new_progress.data_complete = true;
     }
-    bl.claim_append(bit);
+    subop->ops[0].indata.claim_append(bit);
   }
 
-  osd->store->getattrs(coll, soid, attrset);
+  if (!subop->data_included.empty())
+    new_progress.data_recovered_to = subop->data_included.range_end();
 
-  bufferlist bv;
-  bv.push_back(attrset[OI_ATTR]);
-  object_info_t oi(bv);
+  if (recovery_info.copy_subset.empty() ||
+      new_progress.data_recovered_to >= recovery_info.copy_subset.range_end())
+    new_progress.data_complete = true;
 
-  if (oi.version != version) {
-    osd->clog.error() << info.pgid << " push " << soid << " v " << version << " to osd." << peer
-                     << " failed because local copy is " << oi.version << "\n";
-    return -1;
-  }
-
-  // ok
-  dout(7) << "send_push_op " << soid << " v " << oi.version 
-         << " size " << size
-         << " subset " << data_subset
-          << " data " << bl.length()
-          << " to osd." << peer
-          << dendl;
 
   osd->logger->inc(l_osd_push);
-  osd->logger->inc(l_osd_push_outb, bl.length());
+  osd->logger->inc(l_osd_push_outb, subop->ops[0].indata.length());
   
   // send
-  tid_t tid = osd->get_tid();
-  osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
-  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0,
-                                  get_osdmap()->get_epoch(), tid, oi.version);
-  subop->oloc = oi.oloc;
-  subop->ops = vector<OSDOp>(1);
-  subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
-  //subop->ops[0].op.extent.offset = 0;
-  //subop->ops[0].op.extent.length = size;
-  subop->ops[0].indata = bl;
-  subop->data_subset = data_subset;
-  subop->clone_subsets = clone_subsets;
-  subop->attrset.swap(attrset);
-  subop->old_size = size;
-  subop->first = first;
-  subop->complete = complete;
+  subop->recovery_info = recovery_info;
+  subop->recovery_progress = new_progress;
+  subop->current_progress = progress;
   osd->cluster_messenger->
     send_message(subop, get_osdmap()->get_cluster_inst(peer));
+  if (out_progress)
+    *out_progress = new_progress;
   return 0;
 }
 
@@ -4292,27 +4584,22 @@ void ReplicatedPG::sub_op_push_reply(OpRequest *op)
     dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
             << dendl;
   } else {
-    push_info_t *pi = &pushing[soid][peer];
-
-    bool complete = false;
-    if (pi->data_subset.empty() ||
-       pi->data_subset.range_end() == pi->data_subset_pushing.range_end())
-      complete = true;
-
-    if (!complete) {
-      // push more
-      uint64_t from = pi->data_subset_pushing.range_end();
-      pi->data_subset_pushing.span_of(pi->data_subset, from, g_conf->osd_recovery_max_chunk);
-      dout(10) << " pushing more, " << pi->data_subset_pushing << " of " << pi->data_subset << dendl;
-      complete = pi->data_subset.range_end() == pi->data_subset_pushing.range_end();
-      send_push_op(soid, pi->version, peer, pi->size, false, complete,
-                  pi->data_subset_pushing, pi->clone_subsets);
+    PushInfo *pi = &pushing[soid][peer];
+
+    if (!pi->recovery_progress.data_complete) {
+      dout(10) << " pushing more from, "
+              << pi->recovery_progress.data_recovered_to
+              << " of " << pi->recovery_info.copy_subset << dendl;
+      ObjectRecoveryProgress new_progress;
+      send_push(
+       peer, pi->recovery_info, pi->recovery_progress, &new_progress);
+      pi->recovery_progress = new_progress;
     } else {
       // done!
       if (peer == backfill_target && backfills_in_flight.count(soid))
        backfills_in_flight.erase(soid);
       else
-       peer_missing[peer].got(soid, pi->version);
+       peer_missing[peer].got(soid, pi->recovery_info.version);
       
       pushing[soid].erase(peer);
       pi = NULL;
@@ -4381,24 +4668,23 @@ void ReplicatedPG::sub_op_pull(OpRequest *op)
                      << " but got " << cpp_strerror(-r) << "\n";
     send_push_op_blank(soid, m->get_source().num());
   } else {
-    uint64_t size = st.st_size;
-
-    bool complete = false;
-    if (!m->data_subset.empty() && m->data_subset.range_end() >= size)
-      complete = true;
-
-    // complete==true implies we are definitely complete.
-    // complete==false means nothing.  we don't know because the primary may
-    // not be pulling the entire object.
+    ObjectRecoveryInfo recovery_info = m->recovery_info;
+    ObjectRecoveryProgress progress = m->recovery_progress;
+    if (progress.first && recovery_info.size == ((uint64_t)-1)) {
+      // Adjust size and copy_subset
+      recovery_info.size = st.st_size;
+      recovery_info.copy_subset.clear();
+      if (st.st_size)
+       recovery_info.copy_subset.insert(0, st.st_size);
+      assert(recovery_info.clone_subset.empty());
+    }
 
-    r = send_push_op(soid, m->version, m->get_source().num(), size, m->first, complete,
-                    m->data_subset, m->clone_subsets);
+    r = send_push(m->get_source().num(), recovery_info, progress);
     if (r < 0)
       send_push_op_blank(soid, m->get_source().num());
   }
 
   log_subop_stats(op, 0, l_osd_sop_pull_lat);
-
   op->put();
 }
 
@@ -4478,353 +4764,54 @@ void ReplicatedPG::recover_got(hobject_t oid, eversion_t v)
   }
 }
 
-/** op_push
- * NOTE: called from opqueue.
- */
-void ReplicatedPG::sub_op_push(OpRequest *op)
+void ReplicatedPG::trim_pushed_data(
+  const interval_set<uint64_t> &copy_subset,
+  const interval_set<uint64_t> &intervals_received,
+  bufferlist data_received,
+  interval_set<uint64_t> *intervals_usable,
+  bufferlist *data_usable)
 {
-  MOSDSubOp *m = (MOSDSubOp*)op->request;
-  assert(m->get_header().type == MSG_OSD_SUBOP);
-
-  const hobject_t& soid = m->poid;
-  eversion_t v = m->version;
-  OSDOp& push = m->ops[0];
-
-  dout(7) << "op_push " 
-          << soid 
-          << " v " << v 
-         << " " << m->oloc
-         << " len " << push.op.extent.length
-         << " data_subset " << m->data_subset
-         << " clone_subsets " << m->clone_subsets
-         << " data len " << m->get_data().length()
-          << dendl;
-
-  if (v == eversion_t()) {
-    // replica doesn't have it!
-    _failed_push(op);
+  if (intervals_received.subset_of(copy_subset)) {
+    *intervals_usable = intervals_received;
+    *data_usable = data_received;
     return;
   }
 
-  op->mark_started();
-
-  interval_set<uint64_t> data_subset;
-  map<hobject_t, interval_set<uint64_t> > clone_subsets;
-
-  bufferlist data;
-  m->claim_data(data);
-
-  // we need these later, and they get clobbered by t.setattrs()
-  bufferlist oibl;
-  if (m->attrset.count(OI_ATTR))
-    oibl.push_back(m->attrset[OI_ATTR]);
-  bufferlist ssbl;
-  if (m->attrset.count(SS_ATTR))
-    ssbl.push_back(m->attrset[SS_ATTR]);
-
-  // determine data/clone subsets
-  data_subset = m->data_subset;
-  if (data_subset.empty() && push.op.extent.length && push.op.extent.length == data.length())
-    data_subset.insert(0, push.op.extent.length);
-  clone_subsets = m->clone_subsets;
-
-  pull_info_t *pi = 0;
-  bool first = m->first;
-  bool complete = m->complete;
-
-  // op->complete == true means we reached the end of the object (file size)
-  // op->complete == false means nothing; we may not have asked for the whole thing.
-
-  if (is_primary()) {
-    if (pulling.count(soid) == 0) {
-      dout(10) << " not pulling, ignoring" << dendl;
-      op->put();
-      return;
-    }
-    pi = &pulling[soid];
-    
-    // did we learn object size?
-    if (pi->need_size) {
-      dout(10) << " learned object size is " << m->old_size << dendl;
-      pi->data_subset.erase(m->old_size, (uint64_t)-1 - m->old_size);
-      pi->need_size = false;
-    }
-
-    if (soid.snap && soid.snap < CEPH_NOSNAP) {
-      // clone.  make sure we have enough data.
-      SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
-      assert(ssc);
-
-      clone_subsets.clear();   // forget what pusher said; recalculate cloning.
-
-      interval_set<uint64_t> data_needed;
-      calc_clone_subsets(ssc->snapset, soid, missing, info.last_backfill,
-                        data_needed, clone_subsets);
-      pi->data_subset = data_needed;
-      put_snapset_context(ssc);
-
-      interval_set<uint64_t> overlap;
-      overlap.intersection_of(data_subset, data_needed);
-      
-      dout(10) << "sub_op_push need " << data_needed << ", got " << data_subset
-              << ", overlap " << overlap << dendl;
-
-      // did we get more data than we need?
-      if (!data_subset.subset_of(data_needed)) {
-       interval_set<uint64_t> extra = data_subset;
-       interval_set<uint64_t> usable;
-       usable.intersection_of(extra, data_needed);
-       extra.subtract(usable);
-       dout(10) << " we got some extra: " << extra << dendl;
-
-       bufferlist result;
-       int off = 0;
-       for (interval_set<uint64_t>::const_iterator p = usable.begin();
-            p != usable.end();
-            ++p) {
-         interval_set<uint64_t> x;
-         x.insert(p.get_start(), p.get_len());
-         x.intersection_of(data_needed);
-         dout(20) << " data_subset object extent " << p.get_start() << "~" << p.get_len() << " need " << x << dendl;
-         if (!x.empty()) {
-           uint64_t first = x.begin().get_start();
-           uint64_t len = x.begin().get_len();
-           bufferlist sub;
-           int boff = off + (first - p.get_start());
-           dout(20) << "   keeping buffer extent " << boff << "~" << len << dendl;
-           sub.substr_of(data, boff, len);
-           result.claim_append(sub);
-         }
-         off += p.get_len();
-       }
-       data.claim(result);
-       data_subset.intersection_of(data_needed);
-       dout(20) << " new data len is " << data.length() << dendl;
-      }
-
-      // did we get everything we wanted?
-      if (pi->data_subset.empty()) {
-       complete = true;
-      } else if (data_subset.empty()) {
-       complete = false;
-      } else {
-       complete = pi->data_subset.range_end() == data_subset.range_end();
-      }
-
-      if (m->complete && !complete) {
-       dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted" << dendl;
-       _failed_push(op);
-       return;
-      }
-
-    } else {
-      // head|unversioned. for now, primary will _only_ pull data copies of the head (no cloning)
-      assert(m->clone_subsets.empty());
-    }
-  }
-  dout(15) << " data_subset " << data_subset
-          << " clone_subsets " << clone_subsets
-          << " first=" << first << " complete=" << complete
-          << dendl;
-
-  coll_t target;
-  if (first && complete)
-    target = coll;
-  else
-    target = coll_t::TEMP_COLL;
-
-  // write object and add it to the PG
-  ObjectStore::Transaction *t = new ObjectStore::Transaction;
-  Context *onreadable = 0;
-  Context *onreadable_sync = 0;
+  intervals_usable->intersection_of(copy_subset,
+                                   intervals_received);
 
-  if (first && complete && soid.snap != CEPH_NOSNAP)
-    remove_object_with_snap_hardlinks(*t, soid);
-  else if (first)
-    t->remove(target, soid);  // in case old version exists
-
-  // write data
-  uint64_t boff = 0;
-  for (interval_set<uint64_t>::const_iterator p = data_subset.begin();
-       p != data_subset.end();
+  uint64_t off = 0;
+  for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
+       p != intervals_received.end();
        ++p) {
-    bufferlist bit;
-    bit.substr_of(data, boff, p.get_len());
-    dout(15) << " write " << p.get_start() << "~" << p.get_len() << dendl;
-    t->write(target, soid, p.get_start(), p.get_len(), bit);
-    boff += p.get_len();
-  }
-  
-  if (complete) {
-    // Clear out old snapdir contents
-    if (!first) {
-      if (soid.snap != CEPH_NOSNAP) {
-       remove_object_with_snap_hardlinks(*t, soid);
-      } else {
-       t->remove(coll, soid);
-      }
-      t->collection_add(coll, target, soid);
-      t->collection_remove(target, soid);
-    }
-
-    // clone bits
-    for (map<hobject_t, interval_set<uint64_t> >::const_iterator p = clone_subsets.begin();
-        p != clone_subsets.end();
-        ++p)
-    {
-      for (interval_set<uint64_t>::const_iterator q = p->second.begin();
-          q != p->second.end();
-          ++q)
-      {
-       dout(15) << " clone_range " << p->first << " "
-                << q.get_start() << "~" << q.get_len() << dendl;
-       t->clone_range(coll, p->first, soid,
-                      q.get_start(), q.get_len(), q.get_start());
-      }
-    }
-
-    if (data_subset.empty())
-      t->touch(coll, soid);
-
-    t->setattrs(coll, soid, m->attrset);
-    if (soid.snap && soid.snap < CEPH_NOSNAP &&
-       m->attrset.count(OI_ATTR)) {
-      bufferlist bl;
-      bl.push_back(m->attrset[OI_ATTR]);
-      object_info_t oi(bl);
-      if (oi.snaps.size()) {
-       coll_t lc = make_snap_collection(*t, oi.snaps[0]);
-       t->collection_add(lc, coll, soid);
-       if (oi.snaps.size() > 1) {
-         coll_t hc = make_snap_collection(*t, oi.snaps[oi.snaps.size()-1]);
-         t->collection_add(hc, coll, soid);
-       }
-      }
-    }
-
-    bool revert = false;
-    if (missing.is_missing(soid) && missing.missing[soid].need > v) {
-      pg_log_entry_t *latest = log.objects[soid];
-      if (latest->op == pg_log_entry_t::LOST_REVERT &&
-         latest->prior_version == v) {
-       dout(10) << " got old revert version " << v << " for " << *latest << dendl;
-       revert = true;
-       v = latest->version;
-      }
+    interval_set<uint64_t> x;
+    x.insert(p.get_start(), p.get_len());
+    x.intersection_of(copy_subset);
+    for (interval_set<uint64_t>::const_iterator q = x.begin();
+        q != x.end();
+        ++q) {
+      bufferlist sub;
+      uint64_t data_off = off + (q.get_start() - p.get_start());
+      sub.substr_of(data_received, data_off, q.get_len());
+      data_usable->claim_append(sub);
     }
-
-    recover_got(soid, v);
-
-    // update pg
-    write_info(*t);
-
-    // track ObjectContext
-    if (is_primary()) {
-      dout(10) << " setting up obc for " << soid << dendl;
-      ObjectContext *obc = get_object_context(soid, m->oloc, true);
-      assert(obc->registered);
-      obc->ondisk_write_lock();
-      
-      obc->obs.exists = true;
-      obc->obs.oi.decode(oibl);
-
-      if (revert) {
-       // update the attr to the revert event version
-       obc->obs.oi.prior_version = obc->obs.oi.version;
-       obc->obs.oi.version = v;
-       bufferlist bl;
-       ::encode(obc->obs.oi, bl);
-       t->setattr(coll, soid, OI_ATTR, bl);
-      }
-      
-      // suck in snapset context?
-      SnapSetContext *ssc = obc->ssc;
-      if (ssbl.length()) {
-       bufferlist::iterator sp = ssbl.begin();
-       ssc->snapset.decode(sp);
-      }
-
-      onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc);
-      onreadable_sync = new C_OSD_OndiskWriteUnlock(obc);
-    } else {
-      onreadable = new ObjectStore::C_DeleteTransaction(t);
-    }
-
-  } else {
-    onreadable = new ObjectStore::C_DeleteTransaction(t);
+    off += p.get_len();
   }
+}
 
-  // apply to disk!
-  int r = osd->store->queue_transaction(&osr, t,
-                                       onreadable,
-                                       new C_OSD_CommittedPushedObject(this, op,
-                                                                       info.history.same_interval_since,
-                                                                       info.last_complete),
-                                       onreadable_sync);
-  assert(r == 0);
-
+/** op_push
+ * NOTE: called from opqueue.
+ */
+void ReplicatedPG::sub_op_push(OpRequest *op)
+{
+  op->mark_started();
   if (is_primary()) {
-    assert(pi);
-
-    if (complete) {
-      // close out pull op
-      pulling.erase(soid);
-      pull_from_peer[pi->from].erase(soid);
-      finish_recovery_op(soid);
-      
-      update_stats();
-    } else {
-      // pull more
-      pi->data_subset_pulling.span_of(pi->data_subset, data_subset.empty() ? 0 : data_subset.range_end(),
-                                     g_conf->osd_recovery_max_chunk);
-      dout(10) << " pulling more, " << pi->data_subset_pulling << " of " << pi->data_subset << dendl;
-      send_pull_op(soid, v, false, pi->data_subset_pulling, pi->from);
-    }
-
-
-    /*
-    if (is_active()) {
-      // are others missing this too?  (only if we're active.. skip
-      // this part if we're still repeering, it'll just confuse us)
-      for (unsigned i=1; i<acting.size(); i++) {
-       int peer = acting[i];
-       assert(peer_missing.count(peer));
-       if (peer_missing[peer].is_missing(soid)) {
-         push_to_replica(soid, peer);  // ok, push it, and they (will) have it now.
-         start_recovery_op(soid);
-       }
-      }
-    }
-    */
-
+    handle_pull_response(op);
   } else {
-    // ack if i'm a replica and being pushed to.
-    MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
-    assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
-    osd->cluster_messenger->send_message(reply, m->get_connection());
-  }
-
-  if (complete) {
-    // kick waiters
-    if (waiting_for_missing_object.count(soid)) {
-      dout(20) << " kicking waiters on " << soid << dendl;
-      osd->requeue_ops(this, waiting_for_missing_object[soid]);
-      waiting_for_missing_object.erase(soid);
-      if (missing.missing.size() == 0) {
-       osd->requeue_ops(this, waiting_for_all_missing);
-       waiting_for_all_missing.clear();
-      }
-    } else {
-      dout(20) << " no waiters on " << soid << dendl;
-      /*for (hash_map<hobject_t,list<class Message*> >::iterator p = waiting_for_missing_object.begin();
-        p != waiting_for_missing_object.end();
-        p++)
-      dout(20) << "   " << p->first << dendl;
-    */
-    }
+    handle_push(op);
   }
-    
-  op->put();  // at the end... soid is a ref to op->soid!
+  op->put();
+  return;
 }
 
 void ReplicatedPG::_failed_push(OpRequest *op)
index 41ec3e567cdab6c0187120e20ec16484428c3eea..8c636fc7397b5ef82459d8cfe793f974016de603 100644 (file)
@@ -543,17 +543,43 @@ protected:
   }
   void put_snapset_context(SnapSetContext *ssc);
 
-
-
-  
+  // push
+  struct PushInfo {
+    int in_progress;
+    ObjectRecoveryProgress recovery_progress;
+    ObjectRecoveryInfo recovery_info;
+  };
+  map<hobject_t, map<int, PushInfo> > pushing;
   // pull
-  struct pull_info_t {
-    eversion_t version;
-    int from;
-    bool need_size;
-    interval_set<uint64_t> data_subset, data_subset_pulling;
+  struct PullInfo {
+    ObjectRecoveryProgress recovery_progress;
+    ObjectRecoveryInfo recovery_info;
   };
-  map<hobject_t, pull_info_t> pulling;
+  map<hobject_t, PullInfo> pulling;
+
+  ObjectRecoveryInfo recalc_subsets(ObjectRecoveryInfo recovery_info);
+  static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
+                              const interval_set<uint64_t> &intervals_received,
+                              bufferlist data_received,
+                              interval_set<uint64_t> *intervals_usable,
+                              bufferlist *data_usable);
+  void handle_pull_response(OpRequest *op);
+  void handle_push(OpRequest *op);
+  int send_push(int peer,
+               ObjectRecoveryInfo recovery_info,
+               ObjectRecoveryProgress progress,
+               ObjectRecoveryProgress *out_progress = 0);
+  int send_pull(int peer,
+               ObjectRecoveryInfo recovery_info,
+               ObjectRecoveryProgress progress);
+  void submit_push_data(const ObjectRecoveryInfo &recovery_info,
+                       bool first,
+                       const interval_set<uint64_t> &intervals_included,
+                       bufferlist data_included,
+                       map<string, bufferptr> &attrs,
+                       ObjectStore::Transaction *t);
+  void submit_push_complete(ObjectRecoveryInfo &recovery_info,
+                           ObjectStore::Transaction *t);
 
   /*
    * Backfill
@@ -578,15 +604,6 @@ protected:
   // Reverse mapping from osd peer to objects being pulled from that peer
   map<int, set<hobject_t> > pull_from_peer;
 
-  // push
-  struct push_info_t {
-    uint64_t size;
-    eversion_t version;
-    interval_set<uint64_t> data_subset, data_subset_pushing;
-    map<hobject_t, interval_set<uint64_t> > clone_subsets;
-  };
-  map<hobject_t, map<int, push_info_t> > pushing;
-
   int recover_object_replicas(const hobject_t& soid, eversion_t v);
   void calc_head_subsets(ObjectContext *obc, SnapSet& snapset, const hobject_t& head,
                         pg_missing_t& missing,
@@ -598,15 +615,13 @@ protected:
                          interval_set<uint64_t>& data_subset,
                          map<hobject_t, interval_set<uint64_t> >& clone_subsets);
   void push_to_replica(ObjectContext *obc, const hobject_t& oid, int dest);
-  void push_start(const hobject_t& oid, int dest);
-  void push_start(const hobject_t& soid, int peer,
-                 uint64_t size, eversion_t version,
+  void push_start(ObjectContext *obc,
+                 const hobject_t& oid, int dest);
+  void push_start(ObjectContext *obc,
+                 const hobject_t& soid, int peer,
+                 eversion_t version,
                  interval_set<uint64_t> &data_subset,
                  map<hobject_t, interval_set<uint64_t> >& clone_subsets);
-  int send_push_op(const hobject_t& oid, eversion_t version, int dest,
-                  uint64_t size, bool first, bool complete,
-                  interval_set<uint64_t>& data_subset, 
-                  map<hobject_t, interval_set<uint64_t> >& clone_subsets);
   void send_push_op_blank(const hobject_t& soid, int peer);
 
   void finish_degraded_object(const hobject_t& oid);
@@ -614,8 +629,6 @@ protected:
   // Cancels/resets pulls from peer
   void check_recovery_op_pulls(const OSDMapRef map);
   int pull(const hobject_t& oid, eversion_t v);
-  void send_pull_op(const hobject_t& soid, eversion_t v, bool first, const interval_set<uint64_t>& data_subset, int fromosd);
-
 
   // low level ops
 
index 7b458a865662a8ec433a8a3a03bc3dd522e07fc5..d6457eccaebf5b1079478a121f1e9706c5fc6a37 100644 (file)
@@ -2003,20 +2003,24 @@ ostream& operator<<(ostream& out, const object_info_t& oi)
 // -- ObjectRecovery --
 void ObjectRecoveryProgress::encode(bufferlist &bl) const
 {
+  ENCODE_START(1, 1, bl);
   ::encode(first, bl);
   ::encode(data_complete, bl);
   ::encode(data_recovered_to, bl);
   ::encode(omap_recovered_to, bl);
   ::encode(omap_complete, bl);
+  ENCODE_FINISH(bl);
 }
 
 void ObjectRecoveryProgress::decode(bufferlist::iterator &bl)
 {
+  DECODE_START(1, bl);
   ::decode(first, bl);
   ::decode(data_complete, bl);
   ::decode(data_recovered_to, bl);
   ::decode(omap_recovered_to, bl);
   ::decode(omap_complete, bl);
+  DECODE_FINISH(bl);
 }
 
 ostream &operator<<(ostream &out, const ObjectRecoveryProgress &prog)
@@ -2024,6 +2028,22 @@ ostream &operator<<(ostream &out, const ObjectRecoveryProgress &prog)
   return prog.print(out);
 }
 
+void ObjectRecoveryProgress::generate_test_instances(
+  list<ObjectRecoveryProgress*>& o)
+{
+  o.push_back(new ObjectRecoveryProgress);
+  o.back()->first = false;
+  o.back()->data_complete = true;
+  o.back()->omap_complete = true;
+  o.back()->data_recovered_to = 100;
+
+  o.push_back(new ObjectRecoveryProgress);
+  o.back()->first = true;
+  o.back()->data_complete = false;
+  o.back()->omap_complete = false;
+  o.back()->data_recovered_to = 0;
+}
+
 ostream &ObjectRecoveryProgress::print(ostream &out) const
 {
   return out << "ObjectRecoveryProgress("
@@ -2035,10 +2055,18 @@ ostream &ObjectRecoveryProgress::print(ostream &out) const
             << ")";
 }
 
+void ObjectRecoveryProgress::dump(Formatter *f) const
+{
+  f->dump_int("first?", first);
+  f->dump_int("data_complete?", data_complete);
+  f->dump_unsigned("data_recovered_to", data_recovered_to);
+  f->dump_int("omap_complete?", omap_complete);
+  f->dump_string("omap_recovered_to", omap_recovered_to);
+}
+
 void ObjectRecoveryInfo::encode(bufferlist &bl) const
 {
-  __u8 v = 0;
-  ::encode(v, bl);
+  ENCODE_START(1, 1, bl);
   ::encode(soid, bl);
   ::encode(version, bl);
   ::encode(size, bl);
@@ -2046,12 +2074,12 @@ void ObjectRecoveryInfo::encode(bufferlist &bl) const
   ::encode(ss, bl);
   ::encode(copy_subset, bl);
   ::encode(clone_subset, bl);
+  ENCODE_FINISH(bl);
 }
 
 void ObjectRecoveryInfo::decode(bufferlist::iterator &bl)
 {
-  __u8 v;
-  ::decode(v, bl);
+  DECODE_START(1, bl);
   ::decode(soid, bl);
   ::decode(version, bl);
   ::decode(size, bl);
@@ -2059,6 +2087,36 @@ void ObjectRecoveryInfo::decode(bufferlist::iterator &bl)
   ::decode(ss, bl);
   ::decode(copy_subset, bl);
   ::decode(clone_subset, bl);
+  DECODE_FINISH(bl);
+}
+
+void ObjectRecoveryInfo::generate_test_instances(
+  list<ObjectRecoveryInfo*>& o)
+{
+  o.push_back(new ObjectRecoveryInfo);
+  o.back()->soid = hobject_t(sobject_t("key", CEPH_NOSNAP));
+  o.back()->version = eversion_t(0,0);
+  o.back()->size = 100;
+}
+
+
+void ObjectRecoveryInfo::dump(Formatter *f) const
+{
+  f->dump_stream("object") << soid;
+  f->dump_stream("at_version") << version;
+  f->dump_stream("size") << size;
+  {
+    f->open_object_section("object_info");
+    oi.dump(f);
+    f->close_section();
+  }
+  {
+    f->open_object_section("snapset");
+    ss.dump(f);
+    f->close_section();
+  }
+  f->dump_stream("copy_subset") << copy_subset;
+  f->dump_stream("clone_subset") << clone_subset;
 }
 
 ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf)
index 342e63c630e5c880d237811ff47169f08dcc2a0d..a019394a89fe0100fb871921c99ab5122235516b 100644 (file)
@@ -1620,9 +1620,11 @@ struct ObjectRecoveryProgress {
   string omap_recovered_to;
   bool omap_complete;
 
+  static void generate_test_instances(list<ObjectRecoveryProgress*>& o);
   void encode(bufferlist &bl) const;
   void decode(bufferlist::iterator &bl);
   ostream &print(ostream &out) const;
+  void dump(Formatter *f) const;
 };
 WRITE_CLASS_ENCODER(ObjectRecoveryProgress)
 ostream& operator<<(ostream& out, const ObjectRecoveryProgress &prog);
@@ -1637,9 +1639,11 @@ struct ObjectRecoveryInfo {
   interval_set<uint64_t> copy_subset;
   map<hobject_t, interval_set<uint64_t> > clone_subset;
 
+  static void generate_test_instances(list<ObjectRecoveryInfo*>& o);
   void encode(bufferlist &bl) const;
   void decode(bufferlist::iterator &bl);
   ostream &print(ostream &out) const;
+  void dump(Formatter *f) const;
 };
 WRITE_CLASS_ENCODER(ObjectRecoveryInfo)
 ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf);
index c7e68078b62ea46c76d95b843244f99289f89bb6..fd98f4005f008456f0b2a1ae959dda93ad2dd9c0 100644 (file)
@@ -50,6 +50,8 @@ TYPE(pg_create_t)
 TYPE(watch_info_t)
 TYPE(object_info_t)
 TYPE(SnapSet)
+TYPE(ObjectRecoveryInfo)
+TYPE(ObjectRecoveryProgress)
 TYPE(ScrubMap::object)
 TYPE(ScrubMap)