osd: misc fixes
author Sage Weil <sage@redhat.com>
Wed, 17 Jan 2018 16:23:15 +0000 (10:23 -0600)
committer Sage Weil <sage@redhat.com>
Wed, 4 Apr 2018 13:26:50 +0000 (08:26 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/ECBackend.cc
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PGBackend.h
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h
src/osd/ReplicatedBackend.cc

src/osd/ECBackend.cc
index 09edb5e89be6a1c200fcb8759013ea236d172148..ea0e1a2db3417c8a499214a785c1b7f97f67f38c 100644 (file)
@@ -1323,7 +1323,7 @@ void ECBackend::filter_read_op(
 
   if (op.in_progress.empty()) {
     get_parent()->schedule_recovery_work(
-      get_parent()->bless_gencontext(
+      get_parent()->bless_unlocked_gencontext(
        new FinishReadOp(this, op.tid)));
   }
 }
src/osd/OSD.cc
index 77449bd1f0400a509ec97c075bf4752a8338dcdf..bb813010d8b43f13c05b8fc046ee6d2a29e5d69c 100644 (file)
@@ -2655,7 +2655,7 @@ int OSD::init()
   clear_temp_objects();
 
   // initialize osdmap references in sharded wq
-  op_shardedwq.prune_pg_waiters(osdmap, whoami);
+  op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami);
 
   // load up pgs (as they previously existed)
   load_pgs();
@@ -3810,6 +3810,9 @@ PGRef OSD::_open_pg(
     pg_map_size = pg_map.size();
     pg->get("PGMap");  // because it's in pg_map
     service.pg_add_epoch(pg->pg_id, createmap->get_epoch());
+
+    // make sure we register any splits that happened between when the pg
+    // was created and our latest map.
     service.init_splits_between(pgid, createmap, servicemap);
   }
   return pg;
@@ -6566,9 +6569,13 @@ void OSD::ms_fast_dispatch(Message *m)
   case MSG_OSD_RECOVERY_RESERVE:
     {
       MOSDPeeringOp *pm = static_cast<MOSDPeeringOp*>(m);
-      return enqueue_peering_evt(
-       pm->get_spg(),
-       PGPeeringEventRef(pm->get_event()));
+      if (require_osd_peer(pm)) {
+       enqueue_peering_evt(
+         pm->get_spg(),
+         PGPeeringEventRef(pm->get_event()));
+      }
+      pm->put();
+      return;
     }
   }
 
@@ -7999,7 +8006,7 @@ void OSD::consume_map()
 
   // remove any PGs which we no longer host from the session waiting_for_pg lists
   dout(20) << __func__ << " checking waiting_for_pg" << dendl;
-  op_shardedwq.prune_pg_waiters(osdmap, whoami);
+  op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami);
 
   service.maybe_inject_dispatch_delay();
 
@@ -8507,6 +8514,7 @@ void OSD::handle_fast_pg_create(MOSDPGCreate2 *m)
 {
   dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
   if (!require_mon_peer(m)) {
+    m->put();
     return;
   }
   for (auto& p : m->pgs) {
@@ -8542,11 +8550,16 @@ void OSD::handle_fast_pg_create(MOSDPGCreate2 *m)
            true)
          )));
   }
+  m->put();
 }
 
 void OSD::handle_fast_pg_query(MOSDPGQuery *m)
 {
   dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
+  if (!require_osd_peer(m)) {
+    m->put();
+    return;
+  }
   int from = m->get_source().num();
   for (auto& p : m->pg_list) {
     enqueue_peering_evt(
@@ -8562,11 +8575,16 @@ void OSD::handle_fast_pg_query(MOSDPGQuery *m)
          false))
       );
   }
+  m->put();
 }
 
 void OSD::handle_fast_pg_notify(MOSDPGNotify* m)
 {
   dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
+  if (!require_osd_peer(m)) {
+    m->put();
+    return;
+  }
   int from = m->get_source().num();
   for (auto& p : m->get_pg_list()) {
     spg_t pgid(p.first.info.pgid.pgid, p.first.to);
@@ -8590,11 +8608,16 @@ void OSD::handle_fast_pg_notify(MOSDPGNotify* m)
            false)
          )));
   }
+  m->put();
 }
 
 void OSD::handle_fast_pg_info(MOSDPGInfo* m)
 {
   dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
+  if (!require_osd_peer(m)) {
+    m->put();
+    return;
+  }
   int from = m->get_source().num();
   for (auto& p : m->pg_list) {
     enqueue_peering_evt(
@@ -8608,11 +8631,16 @@ void OSD::handle_fast_pg_info(MOSDPGInfo* m)
            p.first.epoch_sent)))
       );
   }
+  m->put();
 }
 
 void OSD::handle_fast_pg_remove(MOSDPGRemove *m)
 {
   dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
+  if (!require_osd_peer(m)) {
+    m->put();
+    return;
+  }
   for (auto& pgid : m->pg_list) {
     enqueue_peering_evt(
       pgid,
@@ -8621,11 +8649,16 @@ void OSD::handle_fast_pg_remove(MOSDPGRemove *m)
          m->get_epoch(), m->get_epoch(),
          PG::DeleteStart())));
   }
+  m->put();
 }
 
 void OSD::handle_fast_force_recovery(MOSDForceRecovery *m)
 {
   dout(10) << __func__ << " " << *m << dendl;
+  if (!require_mon_or_mgr_peer(m)) {
+    m->put();
+    return;
+  }
   epoch_t epoch = get_osdmap()->get_epoch();
   for (auto pgid : m->forced_pgs) {
     if (m->options & OFR_BACKFILL) {
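
Each fast-dispatch peering handler above now follows the same guard-and-release pattern: validate the sender first, then drop the message reference on every exit path. Below is a minimal, self-contained sketch of that pattern; RefCountedMsg, from_osd, and handle_fast_event are simplified stand-ins invented for illustration, not the real Ceph Message or OSD types.

// Sketch of the guard-and-release pattern used in the handlers above.
// RefCountedMsg and require_osd_peer() are simplified stand-ins.
#include <atomic>
#include <iostream>

struct RefCountedMsg {
  std::atomic<int> nref{1};
  bool from_osd = false;          // pretend "is the sender an OSD?"
  void get() { ++nref; }
  void put() { if (--nref == 0) delete this; }
};

static bool require_osd_peer(RefCountedMsg *m) {
  // the real helper also logs and inspects the connection's entity type
  return m->from_osd;
}

static void handle_fast_event(RefCountedMsg *m) {
  if (!require_osd_peer(m)) {
    m->put();                     // drop our ref even when we bail out
    return;
  }
  std::cout << "enqueue peering event(s)\n";
  m->put();                       // handler owns the ref; always release it
}

int main() {
  auto *good = new RefCountedMsg{};
  good->from_osd = true;
  auto *bad = new RefCountedMsg{};  // rejected, but still freed
  handle_fast_event(good);
  handle_fast_event(bad);
}
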
@@ -9479,7 +9512,7 @@ void OSD::ShardedOpWQ::prime_splits(const set<spg_t>& pgs)
   }
 }
 
-void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami)
+void OSD::ShardedOpWQ::prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami)
 {
   unsigned pushes_to_free = 0;
   bool queued = false;
@@ -9489,28 +9522,33 @@ void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami)
     auto p = sdata->pg_slots.begin();
     while (p != sdata->pg_slots.end()) {
       ShardData::pg_slot& slot = p->second;
+      if (slot.pending_nopg_epoch &&
+         slot.pending_nopg_epoch <= osdmap->get_epoch()) {
+       dout(20) << __func__ << "  " << p->first
+                << " pending_nopg_epoch " << slot.pending_nopg_epoch
+                << " < " << osdmap->get_epoch() << ", requeueing" << dendl;
+       assert(slot.waiting_for_pg);
+       assert(!slot.to_process.empty());
+       for (auto& q : slot.to_process) {
+         pushes_to_free += q.get_reserved_pushes();
+       }
+       for (auto i = slot.to_process.rbegin();
+            i != slot.to_process.rend();
+            ++i) {
+         sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
+       }
+       slot.to_process.clear();
+       slot.waiting_for_pg = false;
+       slot.pending_nopg_epoch = 0;
+       ++slot.requeue_seq;
+       queued = true;
+       ++p;
+       continue;
+      }
       if (!slot.to_process.empty() && slot.num_running == 0) {
        if (osdmap->is_up_acting_osd_shard(p->first, whoami)) {
-         if (slot.pending_nopg) {
-           dout(20) << __func__ << "  " << p->first << " maps to us, pending create,"
-                    << " requeuing" << dendl;
-           for (auto& q : slot.to_process) {
-             pushes_to_free += q.get_reserved_pushes();
-           }
-           for (auto i = slot.to_process.rbegin();
-                i != slot.to_process.rend();
-                ++i) {
-             sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
-           }
-           slot.to_process.clear();
-           slot.waiting_for_pg = false;
-           slot.pending_nopg = false;
-           ++slot.requeue_seq;
-           queued = true;
-         } else {
-           dout(20) << __func__ << "  " << p->first << " maps to us, keeping"
-                    << dendl;
-         }
+         dout(20) << __func__ << "  " << p->first << " maps to us, keeping"
+                  << dendl;
          ++p;
          continue;
        }
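
The hunk above replaces the boolean pending_nopg flag with pending_nopg_epoch, so parked items are only requeued once the shard has consumed a map at least that new. A rough, self-contained model of that wake-up logic follows; Item, Slot, and prune_or_wake are simplified stand-ins, not the actual ShardedOpWQ types.

// Sketch of "wake waiters once the map is new enough".
// Types are simplified stand-ins for the ShardedOpWQ structures.
#include <deque>
#include <iostream>
#include <map>
#include <string>

using epoch_t = unsigned;

struct Item { std::string name; epoch_t map_epoch; };

struct Slot {
  std::deque<Item> to_process;     // parked items, oldest first
  bool waiting_for_pg = false;
  epoch_t pending_nopg_epoch = 0;  // smallest epoch any parked item needs
  unsigned requeue_seq = 0;        // bumped so racing workers bail out
};

// Called whenever a newer OSDMap is consumed.
void prune_or_wake(std::map<std::string, Slot>& slots, epoch_t cur_epoch,
                   std::deque<Item>& run_queue) {
  for (auto& [pgid, slot] : slots) {
    if (slot.pending_nopg_epoch && slot.pending_nopg_epoch <= cur_epoch) {
      // Requeue at the front, preserving the original order.
      for (auto i = slot.to_process.rbegin(); i != slot.to_process.rend(); ++i)
        run_queue.push_front(std::move(*i));
      slot.to_process.clear();
      slot.waiting_for_pg = false;
      slot.pending_nopg_epoch = 0;
      ++slot.requeue_seq;
      std::cout << pgid << ": woke waiters at epoch " << cur_epoch << "\n";
    }
  }
}

int main() {
  std::map<std::string, Slot> slots;
  slots["1.0"].to_process.push_back({"create-pg", 42});
  slots["1.0"].waiting_for_pg = true;
  slots["1.0"].pending_nopg_epoch = 42;

  std::deque<Item> run_queue;
  prune_or_wake(slots, 41, run_queue);   // map too old: nothing happens
  prune_or_wake(slots, 42, run_queue);   // map caught up: item requeued
  std::cout << "runnable items: " << run_queue.size() << "\n";
}
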
@@ -9616,13 +9654,13 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     dout(30) << __func__ << " " << token
             << " to_process " << slot.to_process
             << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl;
+    bool can_wait = item.requires_pg() && !item.creates_pg();
     slot.to_process.push_back(std::move(item));
     // note the requeue seq now...
     requeue_seq = slot.requeue_seq;
-    if (slot.waiting_for_pg) {
-      // save ourselves a bit of effort
+    if (slot.waiting_for_pg && can_wait) {
       dout(20) << __func__ << slot.to_process.back()
-              << " queued, waiting_for_pg" << dendl;
+              << " queued, already waiting_for_pg" << dendl;
       sdata->sdata_op_ordering_lock.Unlock();
       return;
     }
@@ -9663,7 +9701,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   --slot.num_running;
 
   if (slot.to_process.empty()) {
-    // raced with wake_pg_waiters or prune_pg_waiters
+    // raced with wake_pg_waiters or prune_or_wake_pg_waiters
     dout(20) << __func__ << " " << token
             << " nothing queued" << dendl;
     if (pg) {
@@ -9716,15 +9754,31 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     OSDMapRef osdmap = sdata->waiting_for_pg_osdmap;
     const PGCreateInfo *create_info = qi.creates_pg();
     if (qi.get_map_epoch() > osdmap->get_epoch()) {
-      dout(20) << __func__ << " " << token
-              << " no pg, item epoch is "
-              << qi.get_map_epoch() << " > " << osdmap->get_epoch()
-              << ", will wait on " << qi << dendl;
       if (!!create_info || !qi.requires_pg()) {
-       slot.pending_nopg = true;
+       if (!slot.pending_nopg_epoch ||
+           slot.pending_nopg_epoch > qi.get_map_epoch()) {
+         slot.pending_nopg_epoch = qi.get_map_epoch();
+       }
+       dout(20) << __func__ << " " << token
+                << " no pg, item epoch is "
+                << qi.get_map_epoch() << " > " << osdmap->get_epoch()
+                << ", will wait on " << qi
+                << ", pending_nopg_epoch now "
+                << slot.pending_nopg_epoch << dendl;
+      } else {
+       dout(20) << __func__ << " " << token
+                << " no pg, item epoch is "
+                << qi.get_map_epoch() << " > " << osdmap->get_epoch()
+                << ", will wait on " << qi << dendl;
       }
       slot.to_process.push_front(std::move(qi));
       slot.waiting_for_pg = true;
+    } else if (!qi.requires_pg()) {
+      // for pg-less events, we run them under the ordering lock, since
+      // we don't have the pg lock to keep them ordered.
+      qi.run(osd, pg, tp_handle);
+      sdata->sdata_op_ordering_lock.Unlock();
+      return;
     } else if (osdmap->is_up_acting_osd_shard(token, osd->whoami)) {
       if (osd->service.splitting(token)) {
        dout(20) << __func__ << " " << token
@@ -9748,12 +9802,6 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
          }
          dout(20) << __func__ << " ignored create on " << qi << dendl;
        }
-      } else if (!qi.requires_pg()) {
-       // for pg-less events, we run them under the ordering lock, since
-       // we don't have the pg lock to keep them ordered.
-       qi.run(osd, pg, tp_handle);
-       sdata->sdata_op_ordering_lock.Unlock();
-       return;
       } else {
        dout(20) << __func__ << " " << token
                 << " no pg, should exist, will wait on " << qi << dendl;
src/osd/OSD.h
index be71d8e59acc6cd5b2e668f85f534f4909c34335..cb526076b510ea2023841ec27b5dd320a05114c3 100644 (file)
@@ -1584,8 +1584,8 @@ private:
        /// to_process.  cleared by prune_pg_waiters.
        bool waiting_for_pg = false;
 
-       /// one or more queued items doesn't need a pg
-       bool pending_nopg = false;
+       /// one or more queued items doesn't need a pg, only a map >= this
+       epoch_t pending_nopg_epoch = 0;
 
        /// incremented by wake_pg_waiters; indicates racing _process threads
        /// should bail out (their op has been requeued)
@@ -1594,7 +1594,7 @@ private:
 
       /// map of slots for each spg_t.  maintains ordering of items dequeued
       /// from pqueue while _process thread drops shard lock to acquire the
-      /// pg lock.  slots are removed only by prune_pg_waiters.
+      /// pg lock.  slots are removed only by prune_or_wake_pg_waiters.
       unordered_map<spg_t,pg_slot> pg_slots;
 
       /// priority queue
@@ -1681,7 +1681,7 @@ private:
     void prime_splits(const set<spg_t>& pgs);
 
     /// prune ops (and possibly pg_slots) for pgs that shouldn't be here
-    void prune_pg_waiters(OSDMapRef osdmap, int whoami);
+    void prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami);
 
     /// clear cached PGRef on pg deletion
     void clear_pg_pointer(PG *pg);
@@ -1876,7 +1876,8 @@ public:
 
 protected:
   PGRef _open_pg(
-    OSDMapRef createmap, OSDMapRef servicemap,
+    OSDMapRef createmap,   ///< map pg is created in
+    OSDMapRef servicemap,  ///< latest service map
     spg_t pg);
   PG *_open_lock_pg(
     OSDMapRef createmap,
src/osd/PG.cc
index 0721d747d7586c8762a506684b95bc1447d9c4aa..320f3a320252aeec1140ca98097573866e42ca30 100644 (file)
@@ -6324,6 +6324,8 @@ void PG::do_peering_event(PGPeeringEventRef evt, RecoveryCtx *rctx)
   } else {
     recovery_state.handle_event(evt, rctx);
   }
+  // write_if_dirty regardless of path above to ensure we capture any work
+  // done by OSD::advance_pg().
   write_if_dirty(*rctx->transaction);
 }
 
src/osd/PGBackend.h
index 70c967d0d9efcd4403f4bf997c16b20dcd18571e..2f9b53c377ce65259c82c31a17729aad97f3119b 100644 (file)
@@ -142,6 +142,8 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
      virtual Context *bless_context(Context *c) = 0;
      virtual GenContext<ThreadPool::TPHandle&> *bless_gencontext(
        GenContext<ThreadPool::TPHandle&> *c) = 0;
+     virtual GenContext<ThreadPool::TPHandle&> *bless_unlocked_gencontext(
+       GenContext<ThreadPool::TPHandle&> *c) = 0;
 
      virtual void send_message(int to_osd, Message *m) = 0;
      virtual void queue_transaction(
src/osd/PrimaryLogPG.cc
index 9198238d3843949e17228b8e304f293ef551549f..ba57f0d4f2a01b39197c86aca5122b611d3cd3a8 100644 (file)
@@ -137,6 +137,33 @@ GenContext<ThreadPool::TPHandle&> *PrimaryLogPG::bless_gencontext(
     this, c, get_osdmap()->get_epoch());
 }
 
+template <typename T>
+class PrimaryLogPG::UnlockedBlessedGenContext : public GenContext<T> {
+  PrimaryLogPGRef pg;
+  unique_ptr<GenContext<T>> c;
+  epoch_t e;
+public:
+  UnlockedBlessedGenContext(PrimaryLogPG *pg, GenContext<T> *c, epoch_t e)
+    : pg(pg), c(c), e(e) {}
+  void finish(T t) override {
+    if (pg->pg_has_reset_since(e))
+      c.reset();
+    else
+      c.release()->complete(t);
+  }
+  bool sync_finish(T t) {
+    // we assume here all blessed/wrapped Contexts can complete synchronously.
+    c.release()->complete(t);
+    return true;
+  }
+};
+
+GenContext<ThreadPool::TPHandle&> *PrimaryLogPG::bless_unlocked_gencontext(
+  GenContext<ThreadPool::TPHandle&> *c) {
+  return new UnlockedBlessedGenContext<ThreadPool::TPHandle&>(
+    this, c, get_osdmap()->get_epoch());
+}
+
 class PrimaryLogPG::BlessedContext : public Context {
   PrimaryLogPGRef pg;
   unique_ptr<Context> c;
@@ -159,7 +186,6 @@ public:
   }
 };
 
-
 Context *PrimaryLogPG::bless_context(Context *c) {
   return new BlessedContext(this, c, get_osdmap()->get_epoch());
 }
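
For contrast with the existing locked wrappers, here is a minimal model of what "blessing" a continuation does: both flavours drop the wrapped callback if the PG has reset since the wrapper was created, but the unlocked variant added by this commit performs that check without taking the PG lock, leaving locking to the caller. PGStub, Cont, has_reset_since, and the *Cont structs below are simplified stand-ins, not the real PG or GenContext classes.

// Illustrative model of the locked vs. unlocked "bless" wrappers.
// PGStub, the mutex use, and Cont are simplified stand-ins for the Ceph types.
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>

using epoch_t = unsigned;
using Cont = std::function<void()>;

struct PGStub {
  std::mutex lock;
  epoch_t last_reset_epoch = 0;
  bool has_reset_since(epoch_t e) const { return last_reset_epoch > e; }
};

// Locked flavour: takes the PG lock before checking for a reset.
struct BlessedCont {
  std::shared_ptr<PGStub> pg; Cont c; epoch_t e;
  void finish() {
    std::lock_guard<std::mutex> g(pg->lock);
    if (!pg->has_reset_since(e)) c();
  }
};

// Unlocked flavour (the counterpart this commit adds):
// same reset check, but the caller is responsible for any locking.
struct UnlockedBlessedCont {
  std::shared_ptr<PGStub> pg; Cont c; epoch_t e;
  void finish() {
    if (!pg->has_reset_since(e)) c();
  }
};

int main() {
  auto pg = std::make_shared<PGStub>();
  UnlockedBlessedCont u{pg, []{ std::cout << "recovery work runs\n"; }, 10};
  u.finish();                       // pg has not reset since epoch 10 -> runs
  pg->last_reset_epoch = 20;
  UnlockedBlessedCont stale{pg, []{ std::cout << "never printed\n"; }, 10};
  stale.finish();                   // dropped: pg reset after epoch 10
}
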
src/osd/PrimaryLogPG.h
index 504adafcf0280fb7136711fc4cc75999fab1e3ee..b2d3c860062db8b0136973689c25a3b9baa510a3 100644 (file)
@@ -299,11 +299,14 @@ public:
                             Context *on_complete) override;
 
   template<class T> class BlessedGenContext;
+  template<class T> class UnlockedBlessedGenContext;
   class BlessedContext;
   Context *bless_context(Context *c) override;
 
   GenContext<ThreadPool::TPHandle&> *bless_gencontext(
     GenContext<ThreadPool::TPHandle&> *c) override;
+  GenContext<ThreadPool::TPHandle&> *bless_unlocked_gencontext(
+    GenContext<ThreadPool::TPHandle&> *c) override;
     
   void send_message(int to_osd, Message *m) override {
     osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch());
src/osd/ReplicatedBackend.cc
index 5adaa4af058073c050fe2eb20846a4a1277c2d4b..2ad145988f0a433b8ee4b1b1702cff24b810fd7a 100644 (file)
@@ -821,7 +821,7 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
     t.register_on_complete(
       new PG_RecoveryQueueAsync(
        get_parent(),
-       get_parent()->bless_gencontext(c)));
+       get_parent()->bless_unlocked_gencontext(c)));
   }
   replies.erase(replies.end() - 1);