]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: add projected_log to do client dup detection on not yet committed log entries
authorSamuel Just <sjust@redhat.com>
Fri, 28 Oct 2016 01:38:02 +0000 (18:38 -0700)
committerSamuel Just <sjust@redhat.com>
Thu, 17 Nov 2016 18:41:34 +0000 (10:41 -0800)
Log entries don't get added to the log for ECBackend until reads are
done, yet we still want any other requests with the same id to wait.

ReplicatedPG::update_range should consider the projected log as well.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/PG.cc
src/osd/PG.h
src/osd/PGLog.h
src/osd/ReplicatedPG.cc

index 4b43a141956ffd9317eb01c9c12329c45ea19ee4..0ab50db70771a45e0e37cc3d39b904986f875371 100644 (file)
@@ -944,6 +944,7 @@ void PG::clear_primary_state()
   min_last_complete_ondisk = eversion_t();
   pg_trim_to = eversion_t();
   might_have_unfound.clear();
+  projected_log = PGLog::IndexedLog();
 
   last_update_ondisk = eversion_t();
 
@@ -2470,6 +2471,18 @@ void PG::update_heartbeat_peers()
     osd->need_heartbeat_peer_update();
 }
 
+
+bool PG::check_in_progress_op(
+  const osd_reqid_t &r,
+  eversion_t *replay_version,
+  version_t *user_version,
+  int *return_code) const
+{
+  return (
+    projected_log.get_request(r, replay_version, user_version, return_code) ||
+    pg_log.get_log().get_request(r, replay_version, user_version, return_code));
+}
+
 void PG::_update_calc_stats()
 {
   info.stats.version = info.last_update;
@@ -3062,6 +3075,12 @@ void PG::append_log(
       pg_log.roll_forward(&handler);
     }
   }
+  auto last = logv.rbegin();
+  if (is_primary() && last != logv.rend()) {
+    projected_log.skip_can_rollback_to_to_head();
+    projected_log.trim(last->version, nullptr);
+  }
+
   if (transaction_applied && roll_forward_to > pg_log.get_can_rollback_to()) {
     pg_log.roll_forward_to(
       roll_forward_to,
@@ -4194,16 +4213,28 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
         }
 
         // walk the log to find the latest update that affects our chunk
-        scrubber.subset_last_update = pg_log.get_tail();
-        for (list<pg_log_entry_t>::const_reverse_iterator p = pg_log.get_log().log.rbegin();
-             p != pg_log.get_log().log.rend();
-             ++p) {
+        scrubber.subset_last_update = eversion_t();
+       for (auto p = projected_log.log.rbegin();
+            p != projected_log.log.rend();
+            ++p) {
           if (cmp(p->soid, scrubber.start, get_sort_bitwise()) >= 0 &&
              cmp(p->soid, scrubber.end, get_sort_bitwise()) < 0) {
             scrubber.subset_last_update = p->version;
             break;
-          }
-        }
+         }
+       }
+       if (scrubber.subset_last_update == eversion_t()) {
+         for (list<pg_log_entry_t>::const_reverse_iterator p =
+                pg_log.get_log().log.rbegin();
+              p != pg_log.get_log().log.rend();
+              ++p) {
+           if (cmp(p->soid, scrubber.start, get_sort_bitwise()) >= 0 &&
+               cmp(p->soid, scrubber.end, get_sort_bitwise()) < 0) {
+             scrubber.subset_last_update = p->version;
+             break;
+           }
+         }
+       }
 
         // ask replicas to wait until
         // last_update_applied >= scrubber.subset_last_update and then scan
index 22ab92f38abe51bc68faea615bc2879e30bf1fe5..27255f398732af1b3f16d9613d2556c4ec05b221 100644 (file)
@@ -2191,6 +2191,12 @@ public:
     PerfCounters *logger = NULL);
   void write_if_dirty(ObjectStore::Transaction& t);
 
+  PGLog::IndexedLog projected_log;
+  bool check_in_progress_op(
+    const osd_reqid_t &r,
+    eversion_t *replay_version,
+    version_t *user_version,
+    int *return_code) const;
   eversion_t projected_last_update;
   eversion_t get_next_version() const {
     eversion_t at_version(
index eff487db2efa2ebb220e83dbcaa36acd34d0874e..837f69f450ddad757aae7cdef46a140e73c37904 100644 (file)
@@ -186,6 +186,21 @@ public:
       return divergent;
     }
 
+    template <typename T>
+    void scan_log_after(
+      const eversion_t &bound, ///< [in] scan entries > bound
+      T &&f) const {
+      auto iter = log.rbegin();
+      while (iter != log.rend() && iter->version > bound)
+       ++iter;
+
+      while (true) {
+       if (iter == log.rbegin())
+         break;
+       f(*(--iter));
+      }
+    }
+
     /****/
     void claim_log_and_clear_rollback_info(const pg_log_t& o) {
       // we must have already trimmed the old entries
index 6fdadd310d6ca8727338ca7711dbeac0b8b26073..773dd8e7deb21561e338ab7525f3c26d9fc1d6d6 100644 (file)
@@ -1950,7 +1950,7 @@ void ReplicatedPG::do_op(OpRequestRef& op)
     eversion_t replay_version;
     version_t user_version;
     int return_code = 0;
-    bool got = pg_log.get_log().get_request(
+    bool got = check_in_progress_op(
       m->get_reqid(), &replay_version, &user_version, &return_code);
     if (got) {
       dout(3) << __func__ << " dup " << m->get_reqid()
@@ -8573,6 +8573,9 @@ void ReplicatedPG::issue_repop(RepGather *repop, OpContext *ctx)
     assert(ctx->at_version >= projected_last_update);
     projected_last_update = ctx->at_version;
   }
+  for (auto &&entry: ctx->log) {
+    projected_log.add(entry);
+  }
   pgbackend->submit_transaction(
     soid,
     ctx->delta_stats,
@@ -11358,11 +11361,11 @@ void ReplicatedPG::update_range(
     scan_range(local_min, local_max, bi, handle);
   }
 
-  if (bi->version >= info.last_update) {
+  if (bi->version >= projected_last_update) {
     dout(10) << __func__<< ": bi is current " << dendl;
-    assert(bi->version == info.last_update);
+    assert(bi->version == projected_last_update);
   } else if (bi->version >= info.log_tail) {
-    if (pg_log.get_log().empty()) {
+    if (pg_log.get_log().empty() && projected_log.empty()) {
       /* Because we don't move log_tail on split, the log might be
        * empty even if log_tail != last_update.  However, the only
        * way to get here with an empty log is if log_tail is actually
@@ -11372,41 +11375,36 @@ void ReplicatedPG::update_range(
       assert(bi->version == eversion_t());
       return;
     }
-    assert(!pg_log.get_log().empty());
+
     dout(10) << __func__<< ": bi is old, (" << bi->version
-            << ") can be updated with log" << dendl;
-    list<pg_log_entry_t>::const_iterator i =
-      pg_log.get_log().log.end();
-    --i;
-    while (i != pg_log.get_log().log.begin() &&
-           i->version > bi->version) {
-      --i;
-    }
-    if (i->version == bi->version)
-      ++i;
+            << ") can be updated with log to projected_last_update "
+            << projected_last_update << dendl;
 
-    assert(i != pg_log.get_log().log.end());
-    dout(10) << __func__ << ": updating from version " << i->version
-            << dendl;
-    for (; i != pg_log.get_log().log.end(); ++i) {
-      const hobject_t &soid = i->soid;
+    auto func = [&](const pg_log_entry_t &e) {
+      dout(10) << __func__ << ": updating from version " << e.version
+               << dendl;
+      const hobject_t &soid = e.soid;
       if (cmp(soid, bi->begin, get_sort_bitwise()) >= 0 &&
          cmp(soid, bi->end, get_sort_bitwise()) < 0) {
-       if (i->is_update()) {
-         dout(10) << __func__ << ": " << i->soid << " updated to version "
-                  << i->version << dendl;
-         bi->objects.erase(i->soid);
+       if (e.is_update()) {
+         dout(10) << __func__ << ": " << e.soid << " updated to version "
+                  << e.version << dendl;
+         bi->objects.erase(e.soid);
          bi->objects.insert(
            make_pair(
-             i->soid,
-             i->version));
-       } else if (i->is_delete()) {
-         dout(10) << __func__ << ": " << i->soid << " removed" << dendl;
-         bi->objects.erase(i->soid);
-       }
-      }
-    }
-    bi->version = info.last_update;
+             e.soid,
+             e.version));
+       } else if (e.is_delete()) {
+         dout(10) << __func__ << ": " << e.soid << " removed" << dendl;
+         bi->objects.erase(e.soid);
+       }
+      }
+    };
+    dout(10) << "scanning pg log first" << dendl;
+    pg_log.get_log().scan_log_after(bi->version, func);
+    dout(10) << "scanning projected log" << dendl;
+    projected_log.scan_log_after(bi->version, func);
+    bi->version = projected_last_update;
   } else {
     assert(0 == "scan_range should have raised bi->version past log_tail");
   }