]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
PG, PrimaryLogPG: trim log and rollback info for error log entries
authorJosh Durgin <jdurgin@redhat.com>
Thu, 8 Mar 2018 03:09:19 +0000 (22:09 -0500)
committerJosh Durgin <jdurgin@redhat.com>
Fri, 9 Mar 2018 22:54:08 +0000 (17:54 -0500)
Regular updates piggyback some osd state for this purpose with
MOSDRepOp[Reply]. Do the same thing for pure log entry updates (write
errors and lost/revert additions) via MOSDPGUpdateLogMissing[Reply].

Fixes: http://tracker.ceph.com/issues/22050
Signed-off-by: Josh Durgin <jdurgin@redhat.com>
qa/standalone/osd/repro_long_log.sh
src/messages/MOSDPGUpdateLogMissing.h
src/messages/MOSDPGUpdateLogMissingReply.h
src/osd/PG.cc
src/osd/PG.h
src/osd/PrimaryLogPG.cc

index 28afca6e03e3dca6136ec00973da5002ae763ac6..9fadc50609be623106bf3a0982c1b7556d637627 100755 (executable)
@@ -81,8 +81,8 @@ function do_repro_long_log() {
         rados -p test rm foo
     done
 
-    # this demonstrates the problem - it should fail
-    test_log_size $PGID 41 || return 1
+    # log should have been trimmed down to min_entries with one extra
+    test_log_size $PGID 21 || return 1
 
     if [ "$which" = "test1" ];
     then
@@ -92,10 +92,11 @@ function do_repro_long_log() {
     else
         PRIMARY=$(ceph pg $PGID query  | jq '.info.stats.up_primary')
         kill_daemons $dir TERM osd.$PRIMARY || return 1
-        CEPH_ARGS="--osd-max-pg-log-entries=30 --osd-pg-log-trim-max=5" ceph-objectstore-tool --data-path $dir/$PRIMARY --pgid $PGID --op trim-pg-log || return 1
+
+        CEPH_ARGS="--osd-max-pg-log-entries=2" ceph-objectstore-tool --data-path $dir/$PRIMARY --pgid $PGID --op trim-pg-log || return 1
         run_osd $dir $PRIMARY || return 1
         wait_for_clean || return 1
-        test_log_size $PGID 30 || return 1
+        test_log_size $PGID 2 || return 1
     fi
 }
 
index 652757a69aa069e538907f2f0791171de3bbe9d9..18ff5423664d147b7348749076265d3923789981 100644 (file)
@@ -20,7 +20,7 @@
 
 class MOSDPGUpdateLogMissing : public MOSDFastDispatchOp {
 
-  static const int HEAD_VERSION = 2;
+  static const int HEAD_VERSION = 3;
   static const int COMPAT_VERSION = 1;
 
 
@@ -30,6 +30,9 @@ public:
   shard_id_t from;
   ceph_tid_t rep_tid = 0;
   mempool::osd_pglog::list<pg_log_entry_t> entries;
+  // piggybacked osd/pg state
+  eversion_t pg_trim_to; // primary->replica: trim to here
+  eversion_t pg_roll_forward_to; // primary->replica: trim rollback info to here
 
   epoch_t get_epoch() const { return map_epoch; }
   spg_t get_pgid() const { return pgid; }
@@ -55,7 +58,9 @@ public:
     shard_id_t from,
     epoch_t epoch,
     epoch_t min_epoch,
-    ceph_tid_t rep_tid)
+    ceph_tid_t rep_tid,
+    eversion_t pg_trim_to,
+    eversion_t pg_roll_forward_to)
     : MOSDFastDispatchOp(MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION,
                         COMPAT_VERSION),
       map_epoch(epoch),
@@ -63,7 +68,10 @@ public:
       pgid(pgid),
       from(from),
       rep_tid(rep_tid),
-      entries(entries) {}
+      entries(entries),
+      pg_trim_to(pg_trim_to),
+      pg_roll_forward_to(pg_roll_forward_to)
+  {}
 
 private:
   ~MOSDPGUpdateLogMissing() override {}
@@ -74,7 +82,10 @@ public:
     out << "pg_update_log_missing(" << pgid << " epoch " << map_epoch
        << "/" << min_epoch
        << " rep_tid " << rep_tid
-       << " entries " << entries << ")";
+       << " entries " << entries
+       << " trim_to " << pg_trim_to
+       << " roll_forward_to " << pg_roll_forward_to
+       << ")";
   }
 
   void encode_payload(uint64_t features) override {
@@ -85,6 +96,8 @@ public:
     encode(rep_tid, payload);
     encode(entries, payload);
     encode(min_epoch, payload);
+    encode(pg_trim_to, payload);
+    encode(pg_roll_forward_to, payload);
   }
   void decode_payload() override {
     bufferlist::iterator p = payload.begin();
@@ -98,6 +111,10 @@ public:
     } else {
       min_epoch = map_epoch;
     }
+    if (header.version >= 3) {
+      decode(pg_trim_to, p);
+      decode(pg_roll_forward_to, p);
+    }
   }
 };
 
index e7d088a1fda9e686edcb71a36d0808c8356fa138..3e5b5e3b7c23b7f4748736b48ac30be723ce2aad 100644 (file)
@@ -20,7 +20,7 @@
 
 class MOSDPGUpdateLogMissingReply : public MOSDFastDispatchOp {
 
-  static const int HEAD_VERSION = 2;
+  static const int HEAD_VERSION = 3;
   static const int COMPAT_VERSION = 1;
 
 
@@ -29,6 +29,8 @@ public:
   spg_t pgid;
   shard_id_t from;
   ceph_tid_t rep_tid = 0;
+  // piggybacked osd state
+  eversion_t last_complete_ondisk;
 
   epoch_t get_epoch() const { return map_epoch; }
   spg_t get_pgid() const { return pgid; }
@@ -58,7 +60,8 @@ public:
     shard_id_t from,
     epoch_t epoch,
     epoch_t min_epoch,
-    ceph_tid_t rep_tid)
+    ceph_tid_t rep_tid,
+    eversion_t last_complete_ondisk)
     : MOSDFastDispatchOp(
         MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY,
         HEAD_VERSION,
@@ -67,7 +70,8 @@ public:
       min_epoch(min_epoch),
       pgid(pgid),
       from(from),
-      rep_tid(rep_tid)
+      rep_tid(rep_tid),
+      last_complete_ondisk(last_complete_ondisk)
     {}
 
 private:
@@ -78,7 +82,8 @@ public:
   void print(ostream& out) const override {
     out << "pg_update_log_missing_reply(" << pgid << " epoch " << map_epoch
        << "/" << min_epoch
-       << " rep_tid " << rep_tid << ")";
+       << " rep_tid " << rep_tid
+       << " lcod " << last_complete_ondisk << ")";
   }
 
   void encode_payload(uint64_t features) override {
@@ -88,6 +93,7 @@ public:
     encode(from, payload);
     encode(rep_tid, payload);
     encode(min_epoch, payload);
+    encode(last_complete_ondisk, payload);
   }
   void decode_payload() override {
     bufferlist::iterator p = payload.begin();
@@ -100,6 +106,9 @@ public:
     } else {
       min_epoch = map_epoch;
     }
+    if (header.version >= 3) {
+      decode(last_complete_ondisk, p);
+    }
   }
 };
 
index 175bd755dc42a15ddc2f770d62d9ef63039735ec..744cde166f23e199260e41110bd4fa7c8a14b1be 100644 (file)
@@ -5246,25 +5246,41 @@ void PG::share_pg_info()
 
 bool PG::append_log_entries_update_missing(
   const mempool::osd_pglog::list<pg_log_entry_t> &entries,
-  ObjectStore::Transaction &t)
+  ObjectStore::Transaction &t, boost::optional<eversion_t> trim_to,
+  boost::optional<eversion_t> roll_forward_to)
 {
   assert(!entries.empty());
   assert(entries.begin()->version > info.last_update);
 
   PGLogEntryHandler rollbacker{this, &t};
+  if (roll_forward_to) {
+    pg_log.roll_forward(&rollbacker);
+  }
   bool invalidate_stats =
     pg_log.append_new_log_entries(info.last_backfill,
                                  info.last_backfill_bitwise,
                                  entries,
                                  &rollbacker);
+
+  if (roll_forward_to && entries.rbegin()->soid > info.last_backfill) {
+    pg_log.roll_forward(&rollbacker);
+  }
+  if (roll_forward_to && *roll_forward_to > pg_log.get_can_rollback_to()) {
+    pg_log.roll_forward_to(*roll_forward_to, &rollbacker);
+    last_rollback_info_trimmed_to_applied = *roll_forward_to;
+  }
+
   info.last_update = pg_log.get_head();
 
   if (pg_log.get_missing().num_missing() == 0) {
     // advance last_complete since nothing else is missing!
     info.last_complete = info.last_update;
   }
-
   info.stats.stats_invalid = info.stats.stats_invalid || invalidate_stats;
+
+  dout(20) << __func__ << "trim_to bool = " << bool(trim_to) << " trim_to = " << (trim_to ? *trim_to : eversion_t()) << dendl;
+  if (trim_to)
+    pg_log.trim(*trim_to, info);
   dirty_info = true;
   write_if_dirty(t);
   return invalidate_stats;
@@ -5273,12 +5289,14 @@ bool PG::append_log_entries_update_missing(
 
 void PG::merge_new_log_entries(
   const mempool::osd_pglog::list<pg_log_entry_t> &entries,
-  ObjectStore::Transaction &t)
+  ObjectStore::Transaction &t,
+  boost::optional<eversion_t> trim_to,
+  boost::optional<eversion_t> roll_forward_to)
 {
   dout(10) << __func__ << " " << entries << dendl;
   assert(is_primary());
 
-  bool rebuild_missing = append_log_entries_update_missing(entries, t);
+  bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to);
   for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
        i != actingbackfill.end();
        ++i) {
index a167e75bbe0f66d7bc1951f5802d162683c86012..e091e458501b525cf5355cc8795aba7b2cb6d377 100644 (file)
@@ -2814,7 +2814,9 @@ protected:
 
   bool append_log_entries_update_missing(
     const mempool::osd_pglog::list<pg_log_entry_t> &entries,
-    ObjectStore::Transaction &t);
+    ObjectStore::Transaction &t,
+    boost::optional<eversion_t> trim_to,
+    boost::optional<eversion_t> roll_forward_to);
 
   /**
    * Merge entries updating missing as necessary on all
@@ -2822,7 +2824,9 @@ protected:
    */
   void merge_new_log_entries(
     const mempool::osd_pglog::list<pg_log_entry_t> &entries,
-    ObjectStore::Transaction &t);
+    ObjectStore::Transaction &t,
+    boost::optional<eversion_t> trim_to,
+    boost::optional<eversion_t> roll_forward_to);
 
   void reset_interval_flush();
   void start_peering_interval(
index dc194e0372998fe754e79754cacd61cd4b13104d..d60a9824a46523fa49f05dd4a838c7723d241343 100644 (file)
@@ -10156,7 +10156,7 @@ void PrimaryLogPG::submit_log_entries(
     [this, entries, repop, on_complete]() {
       ObjectStore::Transaction t;
       eversion_t old_last_update = info.last_update;
-      merge_new_log_entries(entries, t);
+      merge_new_log_entries(entries, t, pg_trim_to, min_last_complete_ondisk);
 
 
       set<pg_shard_t> waiting_on;
@@ -10175,7 +10175,9 @@ void PrimaryLogPG::submit_log_entries(
            pg_whoami.shard,
            get_osdmap()->get_epoch(),
            last_peering_reset,
-           repop->rep_tid);
+           repop->rep_tid,
+           pg_trim_to,
+           min_last_complete_ondisk);
          osd->send_message_osd_cluster(
            peer.osd, m, get_osdmap()->get_epoch());
          waiting_on.insert(peer);
@@ -10228,6 +10230,8 @@ void PrimaryLogPG::submit_log_entries(
       int r = osd->store->queue_transaction(ch, std::move(t), NULL);
       assert(r == 0);
     });
+
+  calc_trim_to();
 }
 
 void PrimaryLogPG::cancel_log_updates()
@@ -11162,7 +11166,16 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
     op->get_req());
   assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
   ObjectStore::Transaction t;
-  append_log_entries_update_missing(m->entries, t);
+  boost::optional<eversion_t> op_trim_to, op_roll_forward_to;
+  if (m->pg_trim_to != eversion_t())
+    op_trim_to = m->pg_trim_to;
+  if (m->pg_roll_forward_to != eversion_t())
+    op_roll_forward_to = m->pg_roll_forward_to;
+
+  dout(20) << __func__ << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl;
+
+  append_log_entries_update_missing(m->entries, t, op_trim_to, op_roll_forward_to);
+  eversion_t new_lcod = info.last_complete;
 
   Context *complete = new FunctionContext(
     [=](int) {
@@ -11170,13 +11183,15 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
        op->get_req());
       lock();
       if (!pg_has_reset_since(msg->get_epoch())) {
+       update_last_complete_ondisk(new_lcod);
        MOSDPGUpdateLogMissingReply *reply =
          new MOSDPGUpdateLogMissingReply(
            spg_t(info.pgid.pgid, primary_shard().shard),
            pg_whoami.shard,
            msg->get_epoch(),
            msg->min_epoch,
-           msg->get_tid());
+           msg->get_tid(),
+           new_lcod);
        reply->set_priority(CEPH_MSG_PRIO_HIGH);
        msg->get_connection()->send_message(reply);
       }
@@ -11216,6 +11231,9 @@ void PrimaryLogPG::do_update_log_missing_reply(OpRequestRef &op)
   if (it != log_entry_update_waiting_on.end()) {
     if (it->second.waiting_on.count(m->get_from())) {
       it->second.waiting_on.erase(m->get_from());
+      if (m->last_complete_ondisk != eversion_t()) {
+       update_peer_last_complete_ondisk(m->get_from(), m->last_complete_ondisk);
+      }
     } else {
       osd->clog->error()
        << info.pgid << " got reply "