]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: remove replay_queue
authorSage Weil <sage@redhat.com>
Tue, 20 Dec 2016 20:46:47 +0000 (15:46 -0500)
committerSage Weil <sage@redhat.com>
Thu, 29 Dec 2016 15:30:07 +0000 (10:30 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h

index b18e9a4b6b1901f618be013ca1da4f0b22605566..70f39fde5b49951609185248042d771f9b4a8e90 100644 (file)
@@ -1716,7 +1716,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     cct->_conf->osd_command_thread_timeout,
     cct->_conf->osd_command_thread_suicide_timeout,
     &command_tp),
-  replay_queue_lock("OSD::replay_queue_lock"),
   remove_wq(
     cct,
     store,
@@ -4385,10 +4384,6 @@ void OSD::tick()
     start_boot();
   }
 
-  if (is_active()) {
-    check_replay_queue();
-  }
-
   do_waiters();
 
   tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this));
@@ -8448,44 +8443,6 @@ void OSD::_remove_pg(PG *pg)
 // =========================================================
 // RECOVERY
 
-/*
- * caller holds osd_lock
- */
-void OSD::check_replay_queue()
-{
-  assert(osd_lock.is_locked());
-
-  utime_t now = ceph_clock_now();
-  list< pair<spg_t,utime_t> > pgids;
-  replay_queue_lock.Lock();
-  while (!replay_queue.empty() &&
-        replay_queue.front().second <= now) {
-    pgids.push_back(replay_queue.front());
-    replay_queue.pop_front();
-  }
-  replay_queue_lock.Unlock();
-
-  for (list< pair<spg_t,utime_t> >::iterator p = pgids.begin(); p != pgids.end(); ++p) {
-    spg_t pgid = p->first;
-    pg_map_lock.get_read();
-    if (pg_map.count(pgid)) {
-      PG *pg = _lookup_lock_pg_with_map_lock_held(pgid);
-      pg_map_lock.unlock();
-      dout(10) << "check_replay_queue " << *pg << dendl;
-      if ((pg->is_active() || pg->is_activating()) &&
-         pg->is_replay() &&
-          pg->is_primary() &&
-          pg->replay_until == p->second) {
-       pg->replay_queued_ops();
-      }
-      pg->unlock();
-    } else {
-      pg_map_lock.unlock();
-      dout(10) << "check_replay_queue pgid " << pgid << " (not found)" << dendl;
-    }
-  }
-}
-
 void OSDService::_maybe_queue_recovery() {
   assert(recovery_lock.is_locked_by_me());
   uint64_t available_pushes;
index e07e7804a8983b28a9d8bc7afee866bb2db6281b..d404d0254e5c1e889ed99fe56c290fc4d51adca8 100644 (file)
@@ -2311,10 +2311,6 @@ protected:
   void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
                   ThreadPool::TPHandle &handle);
 
-  Mutex replay_queue_lock;
-  list< pair<spg_t, utime_t > > replay_queue;
-  
-  void check_replay_queue();
 
   // -- scrubbing --
   void sched_scrub();
index 111f8287e6c8279834127c0e54b0924c4d21bad6..6ae86eb2a956fa81ecf0278e10ec3c8c7c25ab5a 100644 (file)
@@ -1921,41 +1921,6 @@ void PG::queue_op(OpRequestRef& op)
   }
 }
 
-void PG::replay_queued_ops()
-{
-  assert(is_replay());
-  assert(is_active() || is_activating());
-  eversion_t c = info.last_update;
-  list<OpRequestRef> replay;
-  dout(10) << "replay_queued_ops" << dendl;
-  state_clear(PG_STATE_REPLAY);
-
-  for (map<eversion_t,OpRequestRef>::iterator p = replay_queue.begin();
-       p != replay_queue.end();
-       ++p) {
-    if (p->first.version != c.version+1) {
-      dout(10) << "activate replay " << p->first
-              << " skipping " << c.version+1 - p->first.version 
-              << " ops"
-              << dendl;      
-      c = p->first;
-    }
-    dout(10) << "activate replay " << p->first << " "
-             << *p->second->get_req() << dendl;
-    replay.push_back(p->second);
-  }
-  replay_queue.clear();
-  if (is_active()) {
-    requeue_ops(replay);
-    requeue_ops(waiting_for_active);
-    assert(waiting_for_peered.empty());
-  } else {
-    waiting_for_active.splice(waiting_for_active.begin(), replay);
-  }
-
-  publish_stats_to_osd();
-}
-
 void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch)
 {
   lock();
@@ -2241,24 +2206,6 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
   }
 }
 
-static void split_replay_queue(
-  map<eversion_t, OpRequestRef> *from,
-  map<eversion_t, OpRequestRef> *to,
-  unsigned match,
-  unsigned bits)
-{
-  for (map<eversion_t, OpRequestRef>::iterator i = from->begin();
-       i != from->end();
-       ) {
-    if (OSD::split_request(i->second, match, bits)) {
-      to->insert(*i);
-      from->erase(i++);
-    } else {
-      ++i;
-    }
-  }
-}
-
 void PG::split_ops(PG *child, unsigned split_bits) {
   unsigned match = child->info.pgid.ps();
   assert(waiting_for_all_missing.empty());
@@ -2268,7 +2215,6 @@ void PG::split_ops(PG *child, unsigned split_bits) {
   assert(waiting_for_ack.empty());
   assert(waiting_for_ondisk.empty());
   assert(waiting_for_active.empty());
-  split_replay_queue(&replay_queue, &(child->replay_queue), match, split_bits);
 
   osd->dequeue_pg(this, &waiting_for_peered);
 
@@ -5099,15 +5045,6 @@ void PG::start_peering_interval(
     if (was_old_primary != is_primary()) {
       state_clear(PG_STATE_CLEAN);
       clear_publish_stats();
-       
-      // take replay queue waiters
-      list<OpRequestRef> ls;
-      for (map<eversion_t,OpRequestRef>::iterator it = replay_queue.begin();
-          it != replay_queue.end();
-          ++it)
-       ls.push_back(it->second);
-      replay_queue.clear();
-      requeue_ops(ls);
     }
 
     on_role_change();
index 461fc18be8b961c12acaf45de82eb6214a88eee6..5c5c4aea79c473b7893debc1f8a8cb55710b4c13 100644 (file)
@@ -856,7 +856,6 @@ protected:
   map<eversion_t,
       list<pair<OpRequestRef, version_t> > > waiting_for_ack, waiting_for_ondisk;
 
-  map<eversion_t,OpRequestRef>   replay_queue;
   void split_ops(PG *child, unsigned split_bits);
 
   void requeue_object_waiters(map<hobject_t, list<OpRequestRef>, hobject_t::BitwiseComparator>& m);
@@ -1032,7 +1031,6 @@ public:
   bool choose_acting(pg_shard_t &auth_log_shard,
                     bool *history_les_bound);
   void build_might_have_unfound();
-  void replay_queued_ops();
   void activate(
     ObjectStore::Transaction& t,
     epoch_t activation_epoch,