git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: Move log version pointer updates to PeeringState
author Samuel Just <sjust@redhat.com>
Sat, 13 Apr 2019 00:12:45 +0000 (17:12 -0700)
committer sjust@redhat.com <sjust@redhat.com>
Wed, 1 May 2019 18:22:27 +0000 (11:22 -0700)
Signed-off-by: sjust@redhat.com <sjust@redhat.com>
src/osd/PG.cc
src/osd/PG.h
src/osd/PeeringState.cc
src/osd/PeeringState.h
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h

index a2f616e663dbae2cb6e156ceac0a21ab06a58f19..06fd8340463b74cd7d77243b674d2418048bbba8 100644 (file)
@@ -188,13 +188,7 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   pg_whoami(recovery_state.pg_whoami),
   info(recovery_state.info),
   pg_log(recovery_state.pg_log),
-  last_update_ondisk(recovery_state.last_update_ondisk),
-  last_complete_ondisk(recovery_state.last_complete_ondisk),
-  last_update_applied(recovery_state.last_update_applied),
   peer_info(recovery_state.peer_info),
-  peer_last_complete_ondisk(recovery_state.peer_last_complete_ondisk),
-  min_last_complete_ondisk(recovery_state.min_last_complete_ondisk),
-  pg_trim_to(recovery_state.pg_trim_to),
   missing_loc(recovery_state.missing_loc),
   pg_id(p),
   coll(p),
@@ -332,8 +326,6 @@ void PG::update_object_snap_mapping(
 /******* PG ***********/
 void PG::clear_primary_state()
 {
-  last_update_ondisk = eversion_t();
-
   projected_log = PGLog::IndexedLog();
 
   snap_trimq.clear();
@@ -2814,7 +2806,8 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
         break;
 
       case PG::Scrubber::WAIT_LAST_UPDATE:
-        if (last_update_applied < scrubber.subset_last_update) {
+        if (recovery_state.get_last_update_applied() <
+         scrubber.subset_last_update) {
           // will be requeued by op_applied
           dout(15) << "wait for EC read/modify/writes to queue" << dendl;
           done = true;
@@ -2842,7 +2835,8 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
         break;
 
       case PG::Scrubber::BUILD_MAP:
-        ceph_assert(last_update_applied >= scrubber.subset_last_update);
+        ceph_assert(recovery_state.get_last_update_applied() >=
+         scrubber.subset_last_update);
 
         // build my own scrub map
        if (scrub_preempted) {
@@ -2898,7 +2892,8 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
         break;
 
       case PG::Scrubber::COMPARE_MAPS:
-        ceph_assert(last_update_applied >= scrubber.subset_last_update);
+        ceph_assert(recovery_state.get_last_update_applied() >=
+         scrubber.subset_last_update);
         ceph_assert(scrubber.waiting_on_whom.empty());
 
         scrub_compare_maps();
index f38ba46ad0437d2133148876f422af8eba7a4088..1469862fa9f937131981532dbcfa00c6c8a7ab82 100644 (file)
@@ -185,13 +185,7 @@ protected:
   pg_shard_t pg_whoami;
   pg_info_t &info;
   PGLog &pg_log;
-  eversion_t &last_update_ondisk;
-  eversion_t &last_complete_ondisk;
-  eversion_t &last_update_applied;
   map<pg_shard_t, pg_info_t> &peer_info;
-  map<pg_shard_t,eversion_t> &peer_last_complete_ondisk;
-  eversion_t &min_last_complete_ondisk;
-  eversion_t &pg_trim_to;
   MissingLoc &missing_loc;
 
 public:
@@ -998,10 +992,6 @@ protected:
 
   bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const;
 
-  virtual void calc_trim_to() = 0;
-
-  virtual void calc_trim_to_aggressive() = 0;
-
   struct PGLogEntryHandler : public PGLog::LogEntryHandler {
     PG *pg;
     ObjectStore::Transaction *t;
@@ -1377,10 +1367,6 @@ protected:
   bool delete_needs_sleep = false;
 
 protected:
-  bool hard_limit_pglog() const {
-    return (get_osdmap()->test_flag(CEPH_OSDMAP_PGLOG_HARDLIMIT));
-  }
-
   bool state_test(uint64_t m) const { return recovery_state.state_test(m); }
   void state_set(uint64_t m) { recovery_state.state_set(m); }
   void state_clear(uint64_t m) { recovery_state.state_clear(m); }
index 4f2fac332f5aff4af22ed0be75d10c0b5f86d9d1..0802f5dc3725ca3a9c818c1cb7f5aa1895f84b9e 100644 (file)
@@ -11,6 +11,7 @@
 #include "messages/MRecoveryReserve.h"
 #include "messages/MOSDScrubReserve.h"
 #include "messages/MOSDPGInfo.h"
+#include "messages/MOSDPGTrim.h"
 
 #define dout_context cct
 #define dout_subsys ceph_subsys_osd
@@ -797,6 +798,7 @@ void PeeringState::clear_primary_state()
 
   clear_recovery_state();
 
+  last_update_ondisk = eversion_t();
   pl->clear_primary_state();
 }
 
@@ -3743,6 +3745,136 @@ void PeeringState::pre_submit_op(
   }
 }
 
+void PeeringState::recovery_committed_to(eversion_t version)
+{
+  psdout(10) << __func__ << " version " << version
+            << " now ondisk" << dendl;
+  last_complete_ondisk = version;
+
+  if (last_complete_ondisk == info.last_update) {
+    if (!is_primary()) {
+      // Either we are a replica or backfill target.
+      // we are fully up to date.  tell the primary!
+      pl->send_cluster_message(
+       get_primary().osd,
+       new MOSDPGTrim(
+         get_osdmap_epoch(),
+         spg_t(info.pgid.pgid, primary.shard),
+         last_complete_ondisk),
+       get_osdmap_epoch());
+    } else {
+      calc_min_last_complete_ondisk();
+    }
+  }
+}
+
+void PeeringState::complete_write(eversion_t v, eversion_t lc)
+{
+  last_update_ondisk = v;
+  last_complete_ondisk = lc;
+  calc_min_last_complete_ondisk();
+}
+
+void PeeringState::calc_trim_to()
+{
+  size_t target = cct->_conf->osd_min_pg_log_entries;
+  if (is_degraded() ||
+      state_test(PG_STATE_RECOVERING |
+                 PG_STATE_RECOVERY_WAIT |
+                 PG_STATE_BACKFILLING |
+                 PG_STATE_BACKFILL_WAIT |
+                 PG_STATE_BACKFILL_TOOFULL)) {
+    target = cct->_conf->osd_max_pg_log_entries;
+  }
+
+  eversion_t limit = std::min(
+    min_last_complete_ondisk,
+    pg_log.get_can_rollback_to());
+  if (limit != eversion_t() &&
+      limit != pg_trim_to &&
+      pg_log.get_log().approx_size() > target) {
+    size_t num_to_trim = std::min(pg_log.get_log().approx_size() - target,
+                             cct->_conf->osd_pg_log_trim_max);
+    if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
+        cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
+      return;
+    }
+    list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
+    eversion_t new_trim_to;
+    for (size_t i = 0; i < num_to_trim; ++i) {
+      new_trim_to = it->version;
+      ++it;
+      if (new_trim_to > limit) {
+        new_trim_to = limit;
+        psdout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl;
+        break;
+      }
+    }
+    psdout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl;
+    pg_trim_to = new_trim_to;
+    assert(pg_trim_to <= pg_log.get_head());
+    assert(pg_trim_to <= min_last_complete_ondisk);
+  }
+}
+
+void PeeringState::calc_trim_to_aggressive()
+{
+  size_t target = cct->_conf->osd_min_pg_log_entries;
+  if (is_degraded() ||
+      state_test(PG_STATE_RECOVERING |
+                PG_STATE_RECOVERY_WAIT |
+                PG_STATE_BACKFILLING |
+                PG_STATE_BACKFILL_WAIT |
+                PG_STATE_BACKFILL_TOOFULL)) {
+    target = cct->_conf->osd_max_pg_log_entries;
+  }
+  // limit pg log trimming up to the can_rollback_to value
+  eversion_t limit = std::min(
+    pg_log.get_head(),
+    pg_log.get_can_rollback_to());
+  psdout(10) << __func__ << " limit = " << limit << dendl;
+
+  if (limit != eversion_t() &&
+      limit != pg_trim_to &&
+      pg_log.get_log().approx_size() > target) {
+    psdout(10) << __func__ << " approx pg log length =  "
+             << pg_log.get_log().approx_size() << dendl;
+    uint64_t num_to_trim = std::min<uint64_t>(pg_log.get_log().approx_size() - target,
+                                              cct->_conf->osd_pg_log_trim_max);
+    psdout(10) << __func__ << " num_to_trim =  " << num_to_trim << dendl;
+    if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
+       cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
+      return;
+    }
+    auto it = pg_log.get_log().log.begin(); // oldest log entry
+    auto rit = pg_log.get_log().log.rbegin();
+    eversion_t by_n_to_keep; // start from tail
+    eversion_t by_n_to_trim = eversion_t::max(); // start from head
+    for (size_t i = 0; it != pg_log.get_log().log.end(); ++it, ++rit) {
+      i++;
+      if (i > target && by_n_to_keep == eversion_t()) {
+        by_n_to_keep = rit->version;
+      }
+      if (i >= num_to_trim && by_n_to_trim == eversion_t::max()) {
+        by_n_to_trim = it->version;
+      }
+      if (by_n_to_keep != eversion_t() &&
+          by_n_to_trim != eversion_t::max()) {
+        break;
+      }
+    }
+
+    if (by_n_to_keep == eversion_t()) {
+      return;
+    }
+
+    pg_trim_to = std::min({by_n_to_keep, by_n_to_trim, limit});
+    psdout(10) << __func__ << " pg_trim_to now " << pg_trim_to << dendl;
+    ceph_assert(pg_trim_to <= pg_log.get_head());
+  }
+}
+
+
 /*------------ Peering State Machine----------------*/
 #undef dout_prefix
 #define dout_prefix (context< PeeringMachine >().dpp->gen_prefix(*_dout) \
index 8af0d6ed90a2cac8c3b17017ede84611525c6862..1211c62d313330abba0520b88d654501260a94ad 100644 (file)
@@ -1422,6 +1422,9 @@ public:
 
   void add_log_entry(const pg_log_entry_t& e, bool applied);
 
+  void calc_trim_to();
+  void calc_trim_to_aggressive();
+
 public:
   PeeringState(
     CephContext *cct,
@@ -1525,6 +1528,14 @@ public:
     bool transaction_applied,
     bool async);
 
+  void update_trim_to() {
+    bool hard_limit = (get_osdmap()->test_flag(CEPH_OSDMAP_PGLOG_HARDLIMIT));
+    if (hard_limit)
+      calc_trim_to_aggressive();
+    else
+      calc_trim_to();
+  }
+
   void pre_submit_op(
     const hobject_t &hoid,
     const vector<pg_log_entry_t>& logv,
@@ -1555,6 +1566,24 @@ public:
     const hobject_t &oid,
     eversion_t version);
 
+  void update_peer_last_complete_ondisk(
+    pg_shard_t fromosd,
+    eversion_t lcod) {
+    peer_last_complete_ondisk[fromosd] = lcod;
+  }
+
+  void update_last_complete_ondisk(
+    eversion_t lcod) {
+    last_complete_ondisk = lcod;
+  }
+
+  void recovery_committed_to(eversion_t version);
+
+  void complete_write(eversion_t v, eversion_t lc);
+  void local_write_applied(eversion_t v) {
+    last_update_applied = v;
+  }
+
   void dump_history(Formatter *f) const {
     state_history.dump(f);
   }
@@ -1815,6 +1844,14 @@ public:
     return min_last_complete_ondisk;
   }
 
+  eversion_t get_pg_trim_to() const {
+    return pg_trim_to;
+  }
+
+  eversion_t get_last_update_applied() const {
+    return last_update_applied;
+  }
+
   bool debug_has_dirty_state() const {
     return dirty_info || dirty_big_info;
   }
index 84e99eb9a40005cc7365c9ea756a7535936d3cd1..2dec89ee12b89e5f51c024b4bd5fe34f8023e761 100644 (file)
@@ -1533,105 +1533,6 @@ int PrimaryLogPG::do_scrub_ls(MOSDOp *m, OSDOp *osd_op)
   return r;
 }
 
-void PrimaryLogPG::calc_trim_to()
-{
-  size_t target = cct->_conf->osd_min_pg_log_entries;
-  if (is_degraded() ||
-      state_test(PG_STATE_RECOVERING |
-                 PG_STATE_RECOVERY_WAIT |
-                 PG_STATE_BACKFILLING |
-                 PG_STATE_BACKFILL_WAIT |
-                 PG_STATE_BACKFILL_TOOFULL)) {
-    target = cct->_conf->osd_max_pg_log_entries;
-  }
-
-  eversion_t limit = std::min(
-    min_last_complete_ondisk,
-    pg_log.get_can_rollback_to());
-  if (limit != eversion_t() &&
-      limit != pg_trim_to &&
-      pg_log.get_log().approx_size() > target) {
-    size_t num_to_trim = std::min(pg_log.get_log().approx_size() - target,
-                             cct->_conf->osd_pg_log_trim_max);
-    if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
-        cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
-      return;
-    }
-    list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
-    eversion_t new_trim_to;
-    for (size_t i = 0; i < num_to_trim; ++i) {
-      new_trim_to = it->version;
-      ++it;
-      if (new_trim_to > limit) {
-        new_trim_to = limit;
-        dout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl;
-        break;
-      }
-    }
-    dout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl;
-    pg_trim_to = new_trim_to;
-    assert(pg_trim_to <= pg_log.get_head());
-    assert(pg_trim_to <= min_last_complete_ondisk);
-  }
-}
-
-void PrimaryLogPG::calc_trim_to_aggressive()
-{
-  size_t target = cct->_conf->osd_min_pg_log_entries;
-  if (is_degraded() ||
-      state_test(PG_STATE_RECOVERING |
-                PG_STATE_RECOVERY_WAIT |
-                PG_STATE_BACKFILLING |
-                PG_STATE_BACKFILL_WAIT |
-                PG_STATE_BACKFILL_TOOFULL)) {
-    target = cct->_conf->osd_max_pg_log_entries;
-  }
-  // limit pg log trimming up to the can_rollback_to value
-  eversion_t limit = std::min(
-    pg_log.get_head(),
-    pg_log.get_can_rollback_to());
-  dout(10) << __func__ << " limit = " << limit << dendl;
-
-  if (limit != eversion_t() &&
-      limit != pg_trim_to &&
-      pg_log.get_log().approx_size() > target) {
-    dout(10) << __func__ << " approx pg log length =  "
-             << pg_log.get_log().approx_size() << dendl;
-    uint64_t num_to_trim = std::min<uint64_t>(pg_log.get_log().approx_size() - target,
-                                              cct->_conf->osd_pg_log_trim_max);
-    dout(10) << __func__ << " num_to_trim =  " << num_to_trim << dendl;
-    if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
-       cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
-      return;
-    }
-    auto it = pg_log.get_log().log.begin(); // oldest log entry
-    auto rit = pg_log.get_log().log.rbegin();
-    eversion_t by_n_to_keep; // start from tail
-    eversion_t by_n_to_trim = eversion_t::max(); // start from head
-    for (size_t i = 0; it != pg_log.get_log().log.end(); ++it, ++rit) {
-      i++;
-      if (i > target && by_n_to_keep == eversion_t()) {
-        by_n_to_keep = rit->version;
-      }
-      if (i >= num_to_trim && by_n_to_trim == eversion_t::max()) {
-        by_n_to_trim = it->version;
-      }
-      if (by_n_to_keep != eversion_t() &&
-          by_n_to_trim != eversion_t::max()) {
-        break;
-      }
-    }
-
-    if (by_n_to_keep == eversion_t()) {
-      return;
-    }
-
-    pg_trim_to = std::min({by_n_to_keep, by_n_to_trim, limit});
-    dout(10) << __func__ << " pg_trim_to now " << pg_trim_to << dendl;
-    ceph_assert(pg_trim_to <= pg_log.get_head());
-  }
-}
-
 PrimaryLogPG::PrimaryLogPG(OSDService *o, OSDMapRef curmap,
                           const PGPool &_pool,
                           const map<string,string>& ec_profile, spg_t p) :
@@ -3944,10 +3845,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
   ceph_assert(op->may_write() || op->may_cache());
 
   // trim log?
-  if (hard_limit_pglog())
-    calc_trim_to_aggressive();
-  else
-    calc_trim_to();
+  recovery_state.update_trim_to();
 
   // verify that we are doing this in order?
   if (cct->_conf->osd_debug_op_order && m->get_source().is_client() &&
@@ -10344,8 +10242,7 @@ void PrimaryLogPG::repop_all_committed(RepGather *repop)
   repop->all_committed = true;
   if (!repop->rep_aborted) {
     if (repop->v != eversion_t()) {
-      last_update_ondisk = repop->v;
-      last_complete_ondisk = repop->pg_local_last_complete;
+      recovery_state.complete_write(repop->v, repop->pg_local_last_complete);
     }
     eval_repop(repop);
   }
@@ -10356,10 +10253,11 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
   dout(10) << "op_applied version " << applied_version << dendl;
   ceph_assert(applied_version != eversion_t());
   ceph_assert(applied_version <= info.last_update);
-  last_update_applied = applied_version;
+  recovery_state.local_write_applied(applied_version);
   if (is_primary()) {
     if (scrubber.active) {
-      if (last_update_applied >= scrubber.subset_last_update) {
+      if (recovery_state.get_last_update_applied() >=
+       scrubber.subset_last_update) {
        requeue_scrub(ops_blocked_by_scrub());
       }
     } else {
@@ -10403,7 +10301,6 @@ void PrimaryLogPG::eval_repop(RepGather *repop)
     }
 
     publish_stats_to_osd();
-    recovery_state.calc_min_last_complete_ondisk();
 
     dout(10) << " removing " << *repop << dendl;
     ceph_assert(!repop_queue.empty());
@@ -10460,8 +10357,8 @@ void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
     ctx->delta_stats,
     ctx->at_version,
     std::move(ctx->op_t),
-    pg_trim_to,
-    min_last_complete_ondisk,
+    recovery_state.get_pg_trim_to(),
+    recovery_state.get_min_last_complete_ondisk(),
     ctx->log,
     ctx->updated_hset_history,
     on_all_commit,
@@ -10553,10 +10450,7 @@ void PrimaryLogPG::simple_opc_submit(OpContextUPtr ctx)
   dout(20) << __func__ << " " << repop << dendl;
   issue_repop(repop, ctx.get());
   eval_repop(repop);
-  if (hard_limit_pglog())
-    calc_trim_to_aggressive();
-  else
-    calc_trim_to();
+  recovery_state.update_trim_to();
   repop->put();
 }
 
@@ -10595,8 +10489,8 @@ void PrimaryLogPG::submit_log_entries(
       ObjectStore::Transaction t;
       eversion_t old_last_update = info.last_update;
       recovery_state.merge_new_log_entries(
-       entries, t, pg_trim_to, min_last_complete_ondisk);
-
+       entries, t, recovery_state.get_pg_trim_to(),
+       recovery_state.get_min_last_complete_ondisk());
 
       set<pg_shard_t> waiting_on;
       for (set<pg_shard_t>::const_iterator i = get_acting_recovery_backfill().begin();
@@ -10615,8 +10509,8 @@ void PrimaryLogPG::submit_log_entries(
            get_osdmap_epoch(),
            get_last_peering_reset(),
            repop->rep_tid,
-           pg_trim_to,
-           min_last_complete_ondisk);
+           recovery_state.get_pg_trim_to(),
+           recovery_state.get_min_last_complete_ondisk());
          osd->send_message_osd_cluster(
            peer.osd, m, get_osdmap_epoch());
          waiting_on.insert(peer);
@@ -10671,10 +10565,7 @@ void PrimaryLogPG::submit_log_entries(
       op_applied(info.last_update);
     });
 
-  if (hard_limit_pglog())
-    calc_trim_to_aggressive();
-  else
-    calc_trim_to();
+  recovery_state.update_trim_to();
 }
 
 void PrimaryLogPG::cancel_log_updates()
@@ -11484,27 +11375,10 @@ void PrimaryLogPG::_committed_pushed_object(
 {
   lock();
   if (!pg_has_reset_since(epoch)) {
-    dout(10) << __func__ << " last_complete " << last_complete << " now ondisk" << dendl;
-    last_complete_ondisk = last_complete;
-
-    if (last_complete_ondisk == info.last_update) {
-      if (!is_primary()) {
-        // Either we are a replica or backfill target.
-       // we are fully up to date.  tell the primary!
-       osd->send_message_osd_cluster(
-         get_primary().osd,
-         new MOSDPGTrim(
-           get_osdmap_epoch(),
-           spg_t(info.pgid.pgid, get_primary().shard),
-           last_complete_ondisk),
-         get_osdmap_epoch());
-      } else {
-       recovery_state.calc_min_last_complete_ondisk();
-      }
-    }
-
+    recovery_state.recovery_committed_to(last_complete);
   } else {
-    dout(10) << __func__ << " pg has changed, not touching last_complete_ondisk" << dendl;
+    dout(10) << __func__
+            << " pg has changed, not touching last_complete_ondisk" << dendl;
   }
 
   unlock();
index 7953989902ec88def726196934228b18c7a16e58..884db99fe716eba9209158cf78e1f6aa7b204b99 100644 (file)
@@ -482,12 +482,12 @@ public:
   void update_peer_last_complete_ondisk(
     pg_shard_t fromosd,
     eversion_t lcod) override {
-    peer_last_complete_ondisk[fromosd] = lcod;
+    recovery_state.update_peer_last_complete_ondisk(fromosd, lcod);
   }
 
   void update_last_complete_ondisk(
     eversion_t lcod) override {
-    last_complete_ondisk = lcod;
+    recovery_state.update_last_complete_ondisk(lcod);
   }
 
   void update_stats(
@@ -1382,8 +1382,6 @@ protected:
                    unsigned split_bits) override;
   void apply_and_flush_repops(bool requeue);
 
-  void calc_trim_to() override;
-  void calc_trim_to_aggressive() override;
   int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr);
   int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr);