]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: extract PGBackend::Listener recovery callbacks
authorSamuel Just <sam.just@inktank.com>
Fri, 30 Aug 2013 01:46:21 +0000 (18:46 -0700)
committerSamuel Just <sam.just@inktank.com>
Thu, 26 Sep 2013 18:24:25 +0000 (11:24 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/PGBackend.h
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index b17f0542d550c67e9217c91dce3149e79457b23d..27dbd91b80e74e72e7863fe2b20f24844e4916c1 100644 (file)
@@ -53,6 +53,7 @@
        const hobject_t &oid,
        const object_stat_sum_t &stat_diff,
        const ObjectRecoveryInfo &recovery_info,
+       ObjectContextRef obc,
        ObjectStore::Transaction *t
        ) = 0;
 
        const ObjectRecoveryInfo &recovery_info
        ) = 0;
 
+     virtual void begin_peer_recover(
+       int peer,
+       const hobject_t oid) = 0;
+
      virtual void failed_push(int from, const hobject_t &soid) = 0;
 
      /**
@@ -91,6 +96,7 @@
 
      virtual const map<hobject_t, set<int> > &get_missing_loc() = 0;
      virtual const map<int, pg_missing_t> &get_peer_missing() = 0;
+     virtual const map<int, pg_info_t> &get_peer_info() = 0;
      virtual const pg_missing_t &get_local_missing() = 0;
      virtual const PGLog &get_log() = 0;
      virtual bool pgb_is_primary() const = 0;
index bcd1239c6265657f04d082fc81e053a4118f89f9..4c20f2486509c36cf102d4266f302e2f757fb4d1 100644 (file)
@@ -22,7 +22,6 @@
 class ReplicatedBackend : public PGBackend {
   struct RPGHandle : public PGBackend::RecoveryHandle {
     map<int, vector<PushOp> > pushes;
-    map<int, vector<PushReplyOp> > push_replies;
     map<int, vector<PullOp> > pulls;
   };
 private:
index f7bcdd2949b113fadcb7e323d878e2a2b7748654..d413847455aecdaa6f6ce83d84e546957401856f 100644 (file)
@@ -79,6 +79,137 @@ PGLSFilter::~PGLSFilter()
 {
 }
 
+// ======================
+// PGBackend::Listener
+
+
+void ReplicatedPG::on_local_recover_start(
+  const hobject_t &oid,
+  ObjectStore::Transaction *t)
+{
+  pg_log.revise_have(oid, eversion_t());
+  remove_snap_mapped_object(*t, oid);
+  t->remove(coll, oid);
+}
+
+void ReplicatedPG::on_local_recover(
+  const hobject_t &hoid,
+  const object_stat_sum_t &stat_diff,
+  const ObjectRecoveryInfo &_recovery_info,
+  ObjectContextRef obc,
+  ObjectStore::Transaction *t
+  )
+{
+  ObjectRecoveryInfo recovery_info(_recovery_info);
+  if (recovery_info.soid.snap < CEPH_NOSNAP) {
+    assert(recovery_info.oi.snaps.size());
+    OSDriver::OSTransaction _t(osdriver.get_transaction(t));
+    set<snapid_t> snaps(
+      recovery_info.oi.snaps.begin(),
+      recovery_info.oi.snaps.end());
+    snap_mapper.add_oid(
+      recovery_info.soid,
+      snaps,
+      &_t);
+  }
+
+  if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+      pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) {
+    assert(is_primary());
+    const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
+    if (latest->op == pg_log_entry_t::LOST_REVERT &&
+       latest->reverting_to == recovery_info.version) {
+      dout(10) << " got old revert version " << recovery_info.version
+              << " for " << *latest << dendl;
+      recovery_info.version = latest->version;
+      // update the attr to the revert event version
+      recovery_info.oi.prior_version = recovery_info.oi.version;
+      recovery_info.oi.version = latest->version;
+      bufferlist bl;
+      ::encode(recovery_info.oi, bl);
+      t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
+    }
+  }
+
+  // keep track of active pushes for scrub
+  ++active_pushes;
+
+  recover_got(recovery_info.soid, recovery_info.version);
+
+  if (is_primary()) {
+    info.stats.stats.sum.add(stat_diff);
+
+    assert(obc);
+    obc->obs.exists = true;
+    obc->ondisk_write_lock();
+    obc->obs.oi = recovery_info.oi;  // may have been updated above
+
+
+    t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
+    t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+    t->register_on_complete(
+      new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch()));
+
+    publish_stats_to_osd();
+    if (waiting_for_missing_object.count(hoid)) {
+      dout(20) << " kicking waiters on " << hoid << dendl;
+      requeue_ops(waiting_for_missing_object[hoid]);
+      waiting_for_missing_object.erase(hoid);
+      if (pg_log.get_missing().missing.size() == 0) {
+       requeue_ops(waiting_for_all_missing);
+       waiting_for_all_missing.clear();
+      }
+    }
+  } else {
+    t->register_on_applied(
+      new C_OSD_AppliedRecoveredObjectReplica(this));
+
+  }
+
+  t->register_on_commit(
+    new C_OSD_CommittedPushedObject(
+      this,
+      get_osdmap()->get_epoch(),
+      info.last_complete));
+
+  // update pg
+  dirty_info = true;
+  write_if_dirty(*t);
+
+}
+
+void ReplicatedPG::on_global_recover(
+  const hobject_t &soid)
+{
+  publish_stats_to_osd();
+  pushing.erase(soid);
+  dout(10) << "pushed " << soid << " to all replicas" << dendl;
+  finish_recovery_op(soid);
+  if (waiting_for_degraded_object.count(soid)) {
+    requeue_ops(waiting_for_degraded_object[soid]);
+    waiting_for_degraded_object.erase(soid);
+  }
+  finish_degraded_object(soid);
+}
+
+void ReplicatedPG::on_peer_recover(
+  int peer,
+  const hobject_t &soid,
+  const ObjectRecoveryInfo &recovery_info)
+{
+  // done!
+  if (peer == backfill_target && backfills_in_flight.count(soid))
+    backfills_in_flight.erase(soid);
+  else
+    peer_missing[peer].got(soid, recovery_info.version);
+}
+
+void ReplicatedPG::begin_peer_recover(
+  int peer,
+  const hobject_t soid)
+{
+}
+
 // =======================
 // pg changes
 
@@ -6020,9 +6151,7 @@ void ReplicatedPG::submit_push_data(
   }
 
   if (first) {
-    pg_log.revise_have(recovery_info.soid, eversion_t());
-    remove_snap_mapped_object(*t, recovery_info.soid);
-    t->remove(coll, recovery_info.soid);
+    on_local_recover_start(recovery_info.soid, t);
     t->remove(get_temp_coll(t), recovery_info.soid);
     t->touch(target_coll, recovery_info.soid);
     t->omap_setheader(target_coll, recovery_info.soid, omap_header);
@@ -6072,41 +6201,6 @@ void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info,
                     q.get_start(), q.get_len(), q.get_start());
     }
   }
-
-  if (recovery_info.soid.snap < CEPH_NOSNAP) {
-    assert(recovery_info.oi.snaps.size());
-    OSDriver::OSTransaction _t(osdriver.get_transaction(t));
-    set<snapid_t> snaps(
-      recovery_info.oi.snaps.begin(),
-      recovery_info.oi.snaps.end());
-    snap_mapper.add_oid(
-      recovery_info.soid,
-      snaps,
-      &_t);
-  }
-
-  if (pg_log.get_missing().is_missing(recovery_info.soid) &&
-      pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) {
-    assert(is_primary());
-    const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
-    if (latest->op == pg_log_entry_t::LOST_REVERT &&
-       latest->reverting_to == recovery_info.version) {
-      dout(10) << " got old revert version " << recovery_info.version
-              << " for " << *latest << dendl;
-      recovery_info.version = latest->version;
-      // update the attr to the revert event version
-      recovery_info.oi.prior_version = recovery_info.oi.version;
-      recovery_info.oi.version = latest->version;
-      bufferlist bl;
-      ::encode(recovery_info.oi, bl);
-      t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
-    }
-  }
-  recover_got(recovery_info.soid, recovery_info.version);
-
-  // update pg
-  dirty_info = true;
-  write_if_dirty(*t);
 }
 
 ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recovery_info)
@@ -6215,51 +6309,11 @@ bool ReplicatedPG::handle_pull_response(
 
   info.stats.stats.sum.num_keys_recovered += pop.omap_entries.size();
 
-  if (complete) {
-    info.stats.stats.sum.num_objects_recovered++;
-
-    SnapSetContext *ssc;
-    if (hoid.snap == CEPH_NOSNAP || hoid.snap == CEPH_SNAPDIR) {
-      ssc = create_snapset_context(hoid.oid);
-      ssc->snapset = pi.recovery_info.ss;
-    } else {
-      ssc = get_snapset_context(hoid.oid, hoid.get_key(), hoid.hash, false,
-       hoid.get_namespace());
-      assert(ssc);
-    }
-    ObjectContextRef obc = create_object_context(pi.recovery_info.oi, ssc);
-    obc->obs.exists = true;
-
-    obc->ondisk_write_lock();
-
-    // keep track of active pushes for scrub
-    ++active_pushes;
-
-    t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
-    t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
-    t->register_on_complete(
-      new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch()));
-  }
-
-  t->register_on_commit(
-    new C_OSD_CommittedPushedObject(
-      this,
-      get_osdmap()->get_epoch(),
-      info.last_complete));
-
   if (complete) {
     pulling.erase(hoid);
     pull_from_peer[from].erase(hoid);
-    publish_stats_to_osd();
-    if (waiting_for_missing_object.count(hoid)) {
-      dout(20) << " kicking waiters on " << hoid << dendl;
-      requeue_ops(waiting_for_missing_object[hoid]);
-      waiting_for_missing_object.erase(hoid);
-      if (pg_log.get_missing().missing.size() == 0) {
-       requeue_ops(waiting_for_all_missing);
-       waiting_for_all_missing.clear();
-      }
-    }
+    info.stats.stats.sum.num_objects_recovered++;
+    on_local_recover(hoid, object_stat_sum_t(), pi.recovery_info, pi.obc, t);
     return false;
   } else {
     response->soid = pop.soid;
@@ -6293,12 +6347,7 @@ void ReplicatedPG::handle_push(
   bool complete = pop.after_progress.data_complete &&
     pop.after_progress.omap_complete;
 
-  // keep track of active pushes for scrub
-  ++active_pushes;
-
   response->soid = pop.recovery_info.soid;
-  t->register_on_applied(
-    new C_OSD_AppliedRecoveredObjectReplica(this));
   submit_push_data(pop.recovery_info,
                   first,
                   complete,
@@ -6309,11 +6358,13 @@ void ReplicatedPG::handle_push(
                   pop.omap_entries,
                   t);
 
-  t->register_on_commit(
-    new C_OSD_CommittedPushedObject(
-      this,
-      get_osdmap()->get_epoch(),
-      info.last_complete));
+  if (complete)
+    on_local_recover(
+      pop.recovery_info.soid,
+      object_stat_sum_t(),
+      pop.recovery_info,
+      ObjectContextRef(), // ok, is replica
+      t);
 }
 
 void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
@@ -6582,25 +6633,14 @@ bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
       return true;
     } else {
       // done!
-      if (peer == backfill_target && backfills_in_flight.count(soid))
-       backfills_in_flight.erase(soid);
-      else
-       peer_missing[peer].got(soid, pi->recovery_info.version);
+      on_peer_recover(peer, soid, pi->recovery_info);
       
       pushing[soid].erase(peer);
       pi = NULL;
       
-      publish_stats_to_osd();
       
       if (pushing[soid].empty()) {
-       pushing.erase(soid);
-       dout(10) << "pushed " << soid << " to all replicas" << dendl;
-       finish_recovery_op(soid);
-       if (waiting_for_degraded_object.count(soid)) {
-         requeue_ops(waiting_for_degraded_object[soid]);
-         waiting_for_degraded_object.erase(soid);
-       }
-       finish_degraded_object(soid);
+       on_global_recover(soid);
       } else {
        dout(10) << "pushed " << soid << ", still waiting for push ack from " 
                 << pushing[soid].size() << " others" << dendl;
@@ -6867,7 +6907,6 @@ void ReplicatedPG::sub_op_push(OpRequestRef op)
     t->register_on_complete(new C_OSD_SendMessageOnConn(
                             osd, reply, m->get_connection()));
   }
-  t->register_on_commit(new C_OnPushCommit(this, op));
   osd->store->queue_transaction(osr.get(), t);
   return;
 }
index 24f001b0fba6b21816e4a2c7e374f59f47bcc928..22ae47287d2b05e92ddaec6723fa30c23e985c1f 100644 (file)
@@ -130,19 +130,23 @@ public:
   /// Listener methods
   void on_local_recover_start(
     const hobject_t &oid,
-    ObjectStore::Transaction *t) {}
+    ObjectStore::Transaction *t);
   void on_local_recover(
     const hobject_t &oid,
     const object_stat_sum_t &stat_diff,
     const ObjectRecoveryInfo &recovery_info,
+    ObjectContextRef obc,
     ObjectStore::Transaction *t
-    ) {}
+    );
   void on_peer_recover(
     int peer,
     const hobject_t &oid,
-    const ObjectRecoveryInfo &recovery_info) {}
+    const ObjectRecoveryInfo &recovery_info);
+  void begin_peer_recover(
+    int peer,
+    const hobject_t oid);
   void on_global_recover(
-    const hobject_t &oid) {}
+    const hobject_t &oid);
   void failed_push(int from, const hobject_t &soid);
 
   template <typename T>
@@ -207,6 +211,9 @@ public:
   const map<int, pg_missing_t> &get_peer_missing() {
     return peer_missing;
   }
+  const map<int, pg_info_t> &get_peer_info() {
+    return peer_info;
+  }
   const pg_missing_t &get_local_missing() {
     return pg_log.get_missing();
   }