]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: use PGBackend::call_write_ordered to submit log entries in commit order
authorSamuel Just <sjust@redhat.com>
Thu, 3 Nov 2016 00:38:13 +0000 (17:38 -0700)
committerSamuel Just <sjust@redhat.com>
Thu, 17 Nov 2016 18:41:33 +0000 (10:41 -0800)
Without this change, we might submit new log entries for marking objects
unfound in a way that causes replicas to process them out-of-order with
pending writes with lower version numbers.  That would be bad.  Instead,
add an interface to allow an arbitrary callback to be called after any
previously submitted transaction commit, but before any subsequently
submitted operations commit.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/PGBackend.h
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc

index 418c2c08a88f60902c006730c94b31435df485bf..d4e0091ca735b219ce2588ce4c4d6579ce437224 100644 (file)
@@ -401,6 +401,8 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
      OpRequestRef op                      ///< [in] op
      ) = 0;
 
+   /// submit callback to be called in order with pending writes
+   virtual void call_write_ordered(std::function<void(void)> &&cb) = 0;
 
    void try_stash(
      const hobject_t &hoid,
index 6b8df0a5d34011f0316c488ea0af9bc66996804e..b9431cc0a8d01f8b6a34eed4474af5f3f2e3c365 100644 (file)
@@ -342,6 +342,13 @@ private:
 public:
   friend class C_OSD_OnOpCommit;
   friend class C_OSD_OnOpApplied;
+
+  void call_write_ordered(std::function<void(void)> &&cb) override {
+    // ReplicatedBackend submits writes inline in submit_transaction, so
+    // we can just call the callback.
+    cb();
+  }
+
   void submit_transaction(
     const hobject_t &hoid,
     const object_stat_sum_t &delta_stats,
index aa250028212583fa15dc7982a1436da8013649d2..a3d324938832e8d36ac875a871a2124fa1eee46a 100644 (file)
@@ -8665,121 +8665,133 @@ void ReplicatedPG::simple_opc_submit(OpContextUPtr ctx)
 void ReplicatedPG::submit_log_entries(
   const mempool::osd::list<pg_log_entry_t> &entries,
   ObcLockManager &&manager,
-  boost::optional<std::function<void(void)> > &&on_complete,
+  boost::optional<std::function<void(void)> > &&_on_complete,
   OpRequestRef op)
 {
   dout(10) << __func__ << " " << entries << dendl;
   assert(is_primary());
 
-  ObjectStore::Transaction t;
-
-  eversion_t old_last_update = info.last_update;
-  merge_new_log_entries(entries, t);
+  if (!entries.empty()) {
+    assert(entries.rbegin()->version >= projected_last_update);
+    projected_last_update = entries.rbegin()->version;
+  }
 
   boost::intrusive_ptr<RepGather> repop;
-  set<pg_shard_t> waiting_on;
+  boost::optional<std::function<void(void)> > on_complete;
   if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
     repop = new_repop(
       std::move(manager),
       std::move(op),
-      std::move(on_complete));
-  }
-  for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
-       i != actingbackfill.end();
-       ++i) {
-    pg_shard_t peer(*i);
-    if (peer == pg_whoami) continue;
-    assert(peer_missing.count(peer));
-    assert(peer_info.count(peer));
-    if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
-      assert(repop);
-      MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
-       entries,
-       spg_t(info.pgid.pgid, i->shard),
-       pg_whoami.shard,
-       get_osdmap()->get_epoch(),
-       repop->rep_tid);
-      osd->send_message_osd_cluster(
-       peer.osd, m, get_osdmap()->get_epoch());
-      waiting_on.insert(peer);
-    } else {
-      MOSDPGLog *m = new MOSDPGLog(
-       peer.shard, pg_whoami.shard,
-       info.last_update.epoch,
-       info);
-      m->log.log = entries;
-      m->log.tail = old_last_update;
-      m->log.head = info.last_update;
-      osd->send_message_osd_cluster(
-       peer.osd, m, get_osdmap()->get_epoch());
-    }
+      std::move(_on_complete));
+  } else {
+    on_complete = std::move(_on_complete);
   }
-  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
-    ceph_tid_t rep_tid = repop->rep_tid;
-    waiting_on.insert(pg_whoami);
-    log_entry_update_waiting_on.insert(
-      make_pair(
-       rep_tid,
-       LogUpdateCtx{std::move(repop), std::move(waiting_on)}
-       ));
-    struct OnComplete : public Context {
-      ReplicatedPGRef pg;
-      ceph_tid_t rep_tid;
-      epoch_t epoch;
-      OnComplete(
-       ReplicatedPGRef pg,
-       ceph_tid_t rep_tid,
-       epoch_t epoch)
-       : pg(pg), rep_tid(rep_tid), epoch(epoch) {}
-      void finish(int) override {
-       pg->lock();
-       if (!pg->pg_has_reset_since(epoch)) {
-         auto it = pg->log_entry_update_waiting_on.find(rep_tid);
-         assert(it != pg->log_entry_update_waiting_on.end());
-         auto it2 = it->second.waiting_on.find(pg->pg_whoami);
-         assert(it2 != it->second.waiting_on.end());
-         it->second.waiting_on.erase(it2);
-         if (it->second.waiting_on.empty()) {
-           pg->repop_all_applied(it->second.repop.get());
-           pg->repop_all_committed(it->second.repop.get());
-           pg->log_entry_update_waiting_on.erase(it);
+
+  pgbackend->call_write_ordered(
+    [this, entries, repop, on_complete]() {
+      ObjectStore::Transaction t;
+      eversion_t old_last_update = info.last_update;
+      merge_new_log_entries(entries, t);
+
+
+      set<pg_shard_t> waiting_on;
+      for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
+          i != actingbackfill.end();
+          ++i) {
+       pg_shard_t peer(*i);
+       if (peer == pg_whoami) continue;
+       assert(peer_missing.count(peer));
+       assert(peer_info.count(peer));
+       if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+         assert(repop);
+         MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
+           entries,
+           spg_t(info.pgid.pgid, i->shard),
+           pg_whoami.shard,
+           get_osdmap()->get_epoch(),
+           repop->rep_tid);
+         osd->send_message_osd_cluster(
+           peer.osd, m, get_osdmap()->get_epoch());
+         waiting_on.insert(peer);
+       } else {
+         MOSDPGLog *m = new MOSDPGLog(
+           peer.shard, pg_whoami.shard,
+           info.last_update.epoch,
+           info);
+         m->log.log = entries;
+         m->log.tail = old_last_update;
+         m->log.head = info.last_update;
+         osd->send_message_osd_cluster(
+           peer.osd, m, get_osdmap()->get_epoch());
+       }
+      }
+      if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+       ceph_tid_t rep_tid = repop->rep_tid;
+       waiting_on.insert(pg_whoami);
+       log_entry_update_waiting_on.insert(
+         make_pair(
+           rep_tid,
+           LogUpdateCtx{std::move(repop), std::move(waiting_on)}
+           ));
+       struct OnComplete : public Context {
+         ReplicatedPGRef pg;
+         ceph_tid_t rep_tid;
+         epoch_t epoch;
+         OnComplete(
+           ReplicatedPGRef pg,
+           ceph_tid_t rep_tid,
+           epoch_t epoch)
+           : pg(pg), rep_tid(rep_tid), epoch(epoch) {}
+         void finish(int) override {
+           pg->lock();
+           if (!pg->pg_has_reset_since(epoch)) {
+             auto it = pg->log_entry_update_waiting_on.find(rep_tid);
+             assert(it != pg->log_entry_update_waiting_on.end());
+             auto it2 = it->second.waiting_on.find(pg->pg_whoami);
+             assert(it2 != it->second.waiting_on.end());
+             it->second.waiting_on.erase(it2);
+             if (it->second.waiting_on.empty()) {
+               pg->repop_all_applied(it->second.repop.get());
+               pg->repop_all_committed(it->second.repop.get());
+               pg->log_entry_update_waiting_on.erase(it);
+             }
+           }
+           pg->unlock();
          }
+       };
+       t.register_on_complete(
+         new OnComplete{this, rep_tid, get_osdmap()->get_epoch()});
+      } else {
+       if (on_complete) {
+         struct OnComplete : public Context {
+           ReplicatedPGRef pg;
+           std::function<void(void)> on_complete;
+           epoch_t epoch;
+           OnComplete(
+             ReplicatedPGRef pg,
+             const std::function<void(void)> &on_complete,
+             epoch_t epoch)
+             : pg(pg),
+               on_complete(std::move(on_complete)),
+               epoch(epoch) {}
+           void finish(int) override {
+             pg->lock();
+             if (!pg->pg_has_reset_since(epoch))
+               on_complete();
+             pg->unlock();
+           }
+         };
+         t.register_on_complete(
+           new OnComplete{
+             this, *on_complete, get_osdmap()->get_epoch()
+               });
        }
-       pg->unlock();
       }
-    };
-    t.register_on_complete(
-      new OnComplete{this, rep_tid, get_osdmap()->get_epoch()});
-  } else {
-    if (on_complete) {
-      struct OnComplete : public Context {
-       ReplicatedPGRef pg;
-       std::function<void(void)> on_complete;
-       epoch_t epoch;
-       OnComplete(
-         ReplicatedPGRef pg,
-         std::function<void(void)> &&on_complete,
-         epoch_t epoch)
-         : pg(pg),
-           on_complete(std::move(on_complete)),
-           epoch(epoch) {}
-       void finish(int) override {
-         pg->lock();
-         if (!pg->pg_has_reset_since(epoch))
-           on_complete();
-         pg->unlock();
-       }
-      };
-      t.register_on_complete(
-       new OnComplete{
-         this, std::move(*on_complete), get_osdmap()->get_epoch()
-         });
-    }
-  }
-  t.register_on_applied(
-    new C_OSD_OnApplied{this, get_osdmap()->get_epoch(), info.last_update});
-  int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL);
-  assert(r == 0);
+      t.register_on_applied(
+       new C_OSD_OnApplied{this, get_osdmap()->get_epoch(), info.last_update});
+      int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL);
+      assert(r == 0);
+    });
 }
 
 void ReplicatedPG::cancel_log_updates()
@@ -9900,8 +9912,8 @@ void ReplicatedPG::mark_all_unfound_lost(
          reply->set_tid(tid);
          con->send_message(reply);
        }
-      }
-      ));
+      }),
+    OpRequestRef());
 }
 
 void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits)