]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: Update PGBackend users to project last_update and submit stat deltas
authorSamuel Just <sjust@redhat.com>
Fri, 21 Oct 2016 21:33:08 +0000 (14:33 -0700)
committerSamuel Just <sjust@redhat.com>
Thu, 17 Nov 2016 18:41:33 +0000 (10:41 -0800)
The RMW pipeline means that we don't start committing an update
immediately, so we can't update the log syncronously with
submit_transaction.  Thus, in order to pipeline writes, PG/ReplicatedPG
will need to project last_update and abstain from updating info
directly (updating info.stats was the only offender).

Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/PG.cc
src/osd/PG.h
src/osd/PGBackend.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 73c71bbdc9be4c4d3c82c408cfeea402efd9e033..030d4abce530190b2af758d892cb5bfa1a19bde8 100644 (file)
@@ -5115,6 +5115,8 @@ void PG::start_peering_interval(
   // pg->on_*
   on_change(t);
 
+  projected_last_update = eversion_t();
+
   assert(!deleting);
 
   // should we tell the primary we are here?
index ce79c69660fde77e288b4d127421d39501b55f6b..517970cc68f267b7dda7f709e3417b2246f2f903 100644 (file)
@@ -2192,11 +2192,14 @@ public:
     PerfCounters *logger = NULL);
   void write_if_dirty(ObjectStore::Transaction& t);
 
+  eversion_t projected_last_update;
   eversion_t get_next_version() const {
-    eversion_t at_version(get_osdmap()->get_epoch(),
-                         pg_log.get_head().version+1);
+    eversion_t at_version(
+      get_osdmap()->get_epoch(),
+      projected_last_update.version+1);
     assert(at_version > info.last_update);
     assert(at_version > pg_log.get_head());
+    assert(at_version > projected_last_update);
     return at_version;
   }
 
index c92d44fb8558d64b7bdd9e594fa61577818525d7..418c2c08a88f60902c006730c94b31435df485bf 100644 (file)
@@ -106,6 +106,11 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
      
      virtual void cancel_pull(const hobject_t &soid) = 0;
 
+     virtual void apply_stats(
+       const hobject_t &soid,
+       const object_stat_sum_t &delta_stats) = 0;
+
+
      /**
       * Bless a context
       *
@@ -380,6 +385,7 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
    /// execute implementation specific transaction
    virtual void submit_transaction(
      const hobject_t &hoid,               ///< [in] object
+     const object_stat_sum_t &delta_stats,///< [in] stat change
      const eversion_t &at_version,        ///< [in] version
      PGTransactionUPtr &&t,               ///< [in] trans to execute (move)
      const eversion_t &trim_to,           ///< [in] trim log to here
index 795e34dae3a56b2cfc3917211365c97e45f3ce72..1bd94c5cc523f6a42f1e46500e85bc38945ce5f2 100644 (file)
@@ -521,6 +521,7 @@ void generate_transaction(
 
 void ReplicatedBackend::submit_transaction(
   const hobject_t &soid,
+  const object_stat_sum_t &delta_stats,
   const eversion_t &at_version,
   PGTransactionUPtr &&_t,
   const eversion_t &trim_to,
@@ -534,6 +535,10 @@ void ReplicatedBackend::submit_transaction(
   osd_reqid_t reqid,
   OpRequestRef orig_op)
 {
+  parent->apply_stats(
+    soid,
+    delta_stats);
+
   vector<pg_log_entry_t> log_entries(_log_entries);
   ObjectStore::Transaction op_t;
   PGTransactionUPtr t(std::move(_t));
index 65a4c55d688b3731b103ea53c455f6404cd38608..6b8df0a5d34011f0316c488ea0af9bc66996804e 100644 (file)
@@ -344,6 +344,7 @@ public:
   friend class C_OSD_OnOpApplied;
   void submit_transaction(
     const hobject_t &hoid,
+    const object_stat_sum_t &delta_stats,
     const eversion_t &at_version,
     PGTransactionUPtr &&t,
     const eversion_t &trim_to,
index ca28a1d50ceb74d56058359311e3b2bcc6d10923..aa250028212583fa15dc7982a1436da8013649d2 100644 (file)
@@ -6901,25 +6901,30 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
     ctx->obc->ssc->snapset = ctx->new_snapset;
   }
 
-  apply_ctx_stats(ctx, scrub_ok);
+  apply_ctx_scrub_stats(ctx, scrub_ok);
 }
 
-void ReplicatedPG::apply_ctx_stats(OpContext *ctx, bool scrub_ok)
-{
-  info.stats.stats.add(ctx->delta_stats);
+void ReplicatedPG::apply_stats(
+  const hobject_t &soid,
+  const object_stat_sum_t &delta_stats) {
+
+  info.stats.stats.add(delta_stats);
 
-  const hobject_t& soid = ctx->obs->oi.soid;
   for (set<pg_shard_t>::iterator i = backfill_targets.begin();
        i != backfill_targets.end();
        ++i) {
     pg_shard_t bt = *i;
     pg_info_t& pinfo = peer_info[bt];
     if (cmp(soid, pinfo.last_backfill, get_sort_bitwise()) <= 0)
-      pinfo.stats.stats.add(ctx->delta_stats);
+      pinfo.stats.stats.add(delta_stats);
     else if (cmp(soid, last_backfill_started, get_sort_bitwise()) <= 0)
-      pending_backfill_updates[soid].stats.add(ctx->delta_stats);
+      pending_backfill_updates[soid].stats.add(delta_stats);
   }
+}
 
+void ReplicatedPG::apply_ctx_scrub_stats(OpContext *ctx, bool scrub_ok)
+{
+  const hobject_t& soid = ctx->obs->oi.soid;
   if (!scrub_ok && scrubber.active) {
     assert(cmp(soid, scrubber.start, get_sort_bitwise()) < 0 ||
           cmp(soid, scrubber.end, get_sort_bitwise()) >= 0);
@@ -8555,8 +8560,13 @@ void ReplicatedPG::issue_repop(RepGather *repop, OpContext *ctx)
     ctx->obc,
     ctx->clone_obc,
     unlock_snapset_obc ? ctx->snapset_obc : ObjectContextRef());
+  if (!(ctx->log.empty())) {
+    assert(ctx->at_version >= projected_last_update);
+    projected_last_update = ctx->at_version;
+  }
   pgbackend->submit_transaction(
     soid,
+    ctx->delta_stats,
     ctx->at_version,
     std::move(ctx->op_t),
     pg_trim_to,
@@ -9806,7 +9816,7 @@ void ReplicatedPG::mark_all_unfound_lost(
     missing_loc.get_needs_recovery().end();
 
   ObcLockManager manager;
-  eversion_t v = info.last_update;
+  eversion_t v = get_next_version();
   v.epoch = get_osdmap()->get_epoch();
   unsigned num_unfound = missing_loc.num_unfound();
   while (m != mend) {
@@ -9829,7 +9839,6 @@ void ReplicatedPG::mark_all_unfound_lost(
       prev = pick_newest_available(oid);
       if (prev > eversion_t()) {
        // log it
-       ++v.version;
        pg_log_entry_t e(
          pg_log_entry_t::LOST_REVERT, oid, v,
          m->second.need, 0, osd_reqid_t(), mtime, 0);
@@ -9839,13 +9848,13 @@ void ReplicatedPG::mark_all_unfound_lost(
        dout(10) << e << dendl;
 
        // we are now missing the new version; recovery code will sort it out.
+       ++v.version;
        ++m;
        break;
       }
 
     case pg_log_entry_t::LOST_DELETE:
       {
-       ++v.version;
        pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, v, m->second.need,
                         0, osd_reqid_t(), mtime, 0);
        if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
@@ -9858,6 +9867,7 @@ void ReplicatedPG::mark_all_unfound_lost(
        dout(10) << e << dendl;
        log_entries.push_back(e);
 
+       ++v.version;
        ++m;
       }
       break;
@@ -11518,7 +11528,7 @@ void ReplicatedPG::hit_set_remove_all()
     utime_t now = ceph_clock_now(cct);
     ctx->mtime = now;
     hit_set_trim(ctx, 0);
-    apply_ctx_stats(ctx.get());
+    apply_ctx_scrub_stats(ctx.get());
     simple_opc_submit(std::move(ctx));
   }
 
@@ -11737,7 +11747,7 @@ void ReplicatedPG::hit_set_persist()
 
   hit_set_trim(ctx, max);
 
-  apply_ctx_stats(ctx.get());
+  apply_ctx_scrub_stats(ctx.get());
   simple_opc_submit(std::move(ctx));
 }
 
@@ -13118,7 +13128,7 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&)
        pg->queue_snap_trim();
       });
 
-    pg->apply_ctx_stats(ctx.get());
+    pg->apply_ctx_scrub_stats(ctx.get());
 
     in_flight.insert(pos);
     pg->simple_opc_submit(std::move(ctx));
index 7b99a78773e38a5143d5dfff0a960d47045a5d71..88cf36ba2c81e1f68603e6912cb1fed11a6f77f2 100644 (file)
@@ -268,6 +268,9 @@ public:
     const object_stat_sum_t &stat_diff) override;
   void failed_push(const list<pg_shard_t> &from, const hobject_t &soid) override;
   void cancel_pull(const hobject_t &soid) override;
+  void apply_stats(
+    const hobject_t &soid,
+    const object_stat_sum_t &delta_stats) override;
 
   template<class T> class BlessedGenContext;
   class BlessedContext;
@@ -1100,8 +1103,9 @@ protected:
   void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv);
   void make_writeable(OpContext *ctx);
   void log_op_stats(OpContext *ctx);
-  void apply_ctx_stats(OpContext *ctx,
-                      bool scrub_ok=false); ///< true if we should skip scrub stat update
+  void apply_ctx_scrub_stats(
+    OpContext *ctx,
+    bool scrub_ok=false); ///< true if we should skip scrub stat update
 
   void write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi,
                                   interval_set<uint64_t>& modified, uint64_t offset,