git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD,PG: Move PG-accessible methods, objects to OSDService
author Samuel Just <sam.just@inktank.com>
Thu, 14 Jun 2012 02:05:47 +0000 (19:05 -0700)
committer Samuel Just <sam.just@inktank.com>
Thu, 5 Jul 2012 17:14:59 +0000 (10:14 -0700)
In order to clarify data structure locking, PGs will now access the
OSDService rather than the OSD directly.  Over time, more structures will
be moved to the OSDService.  osd_lock can no longer be held while pg
locks are held.

Signed-off-by: Samuel Just <sam.just@inktank.com>
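
The core of the change is the publish/get pattern on OSDService
(publish_superblock/get_superblock, publish_map/get_osdmap): the OSD publishes
shared state under OSDService::publish_lock, and PGs read a consistent snapshot
through their OSDService pointer without ever taking osd_lock.  A rough
standalone sketch of that pattern (using std::mutex and stub types in place of
Ceph's Mutex and OSDMapRef) looks roughly like this:

#include <memory>
#include <mutex>

struct OSDMapStub { unsigned epoch = 0; };                 // stand-in for OSDMap
typedef std::shared_ptr<const OSDMapStub> OSDMapRefStub;   // stand-in for OSDMapRef

class ServiceSketch {
  std::mutex publish_lock;   // guards only the published references
  OSDMapRefStub osdmap;      // most recently published map
public:
  void publish_map(OSDMapRefStub m) {   // OSD thread, e.g. from handle_osd_map
    std::lock_guard<std::mutex> l(publish_lock);
    osdmap = m;
  }
  OSDMapRefStub get_osdmap() {          // PG threads; osd_lock is never taken
    std::lock_guard<std::mutex> l(publish_lock);
    return osdmap;                      // ref copy keeps this epoch's map alive
  }
};

Returning the reference by value means a PG can keep using the map it read even
if the OSD publishes a newer epoch a moment later.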
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 2575057dc2fa358c0e8ea8bdd83f8dad9bb391ce..a2972bf126a092bc61fcdd8a67d080e764b79562 100644 (file)
 
 #define dout_subsys ceph_subsys_osd
 #undef dout_prefix
-#define dout_prefix _prefix(*_dout, whoami, osdmap)
+#define dout_prefix _prefix(*_dout, whoami, get_osdmap())
 
 static ostream& _prefix(std::ostream* _dout, int whoami, OSDMapRef osdmap) {
-  return *_dout << "osd." << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " ";
+  return *_dout << "osd." << whoami << " "
+               << (osdmap ? osdmap->get_epoch():0)
+               << " ";
 }
 
 const coll_t coll_t::META_COLL("meta");
@@ -136,6 +138,53 @@ static CompatSet get_osd_compat_set() {
                   ceph_osd_feature_incompat);
 }
 
+OSDService::OSDService(OSD *osd) :
+  osd(osd),
+  whoami(osd->whoami), store(osd->store), clog(osd->clog),
+  pg_recovery_stats(osd->pg_recovery_stats),
+  cluster_messenger(osd->cluster_messenger),
+  client_messenger(osd->client_messenger),
+  logger(osd->logger),
+  monc(osd->monc),
+  op_wq(osd->op_wq),
+  peering_wq(osd->peering_wq),
+  recovery_wq(osd->recovery_wq),
+  snap_trim_wq(osd->snap_trim_wq),
+  scrub_wq(osd->scrub_wq),
+  scrub_finalize_wq(osd->scrub_finalize_wq),
+  remove_wq(osd->remove_wq),
+  rep_scrub_wq(osd->rep_scrub_wq),
+  class_handler(osd->class_handler),
+  publish_lock("OSDService::publish_lock"),
+  sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
+  scrubs_active(0),
+  watch_lock(osd->watch_lock),
+  watch_timer(osd->watch_timer),
+  watch(osd->watch),
+  last_tid(0),
+  tid_lock("OSDService::tid_lock"),
+  pg_temp_lock("OSDService::pg_temp_lock"),
+  map_cache_lock("OSDService::map_lock"),
+  map_cache(g_conf->osd_map_cache_size),
+  map_bl_cache(g_conf->osd_map_cache_size),
+  map_bl_inc_cache(g_conf->osd_map_cache_size)
+{}
+
+void OSDService::need_heartbeat_peer_update()
+{
+  osd->need_heartbeat_peer_update();
+}
+
+void OSDService::pg_stat_queue_enqueue(PG *pg)
+{
+  osd->pg_stat_queue_enqueue(pg);
+}
+
+void OSDService::pg_stat_queue_dequeue(PG *pg)
+{
+  osd->pg_stat_queue_dequeue(pg);
+}
+
 ObjectStore *OSD::create_object_store(const std::string &dev, const std::string &jdev)
 {
   struct stat st;
@@ -647,32 +696,24 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
   map_lock("OSD::map_lock"),
   peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
-  map_cache_lock("OSD::map_cache_lock"),
-  map_cache(g_conf->osd_map_cache_size),
-  map_bl_cache(g_conf->osd_map_cache_bl_size),
-  map_bl_inc_cache(g_conf->osd_map_cache_bl_inc_size),
   outstanding_pg_stats(false),
   up_thru_wanted(0), up_thru_pending(0),
   pg_stat_queue_lock("OSD::pg_stat_queue_lock"),
   osd_stat_updated(false),
   pg_stat_tid(0), pg_stat_tid_flushed(0),
-  last_tid(0),
-  tid_lock("OSD::tid_lock"),
   command_wq(this, g_conf->osd_command_thread_timeout, &command_tp),
   recovery_ops_active(0),
   recovery_wq(this, g_conf->osd_recovery_thread_timeout, &recovery_tp),
   remove_list_lock("OSD::remove_list_lock"),
   replay_queue_lock("OSD::replay_queue_lock"),
   snap_trim_wq(this, g_conf->osd_snap_trim_thread_timeout, &disk_tp),
-  sched_scrub_lock("OSD::sched_scrub_lock"),
-  scrubs_pending(0),
-  scrubs_active(0),
   scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
   scrub_finalize_wq(this, g_conf->osd_scrub_finalize_thread_timeout, &op_tp),
   rep_scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
   remove_wq(this, g_conf->osd_remove_thread_timeout, &disk_tp),
   watch_lock("OSD::watch_lock"),
-  watch_timer(external_messenger->cct, watch_lock)
+  watch_timer(external_messenger->cct, watch_lock),
+  service(this)
 {
   monc->set_messenger(client_messenger);
 }
@@ -756,6 +797,7 @@ int OSD::init()
     delete store;
     return -EINVAL;
   }
+  service.publish_superblock(superblock);
 
   class_handler = new ClassHandler();
   cls_initialize(class_handler);
@@ -1187,7 +1229,7 @@ PG *OSD::_open_lock_pg(pg_t pgid, bool no_lockdep_check, bool hold_map_lock)
   hobject_t logoid = make_pg_log_oid(pgid);
   hobject_t infooid = make_pg_biginfo_oid(pgid);
   if (osdmap->get_pg_type(pgid) == pg_pool_t::TYPE_REP)
-    pg = new ReplicatedPG(this, osdmap, pool, pgid, logoid, infooid);
+    pg = new ReplicatedPG(&service, osdmap, pool, pgid, logoid, infooid);
   else 
     assert(0);
 
@@ -1315,7 +1357,7 @@ void OSD::load_pgs()
     // read pg state, log
     pg->read_state(store);
 
-    reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
+    service.reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
 
     // generate state for current mapping
     osdmap->pg_to_up_acting_osds(pgid, pg->up, pg->acting);
@@ -1835,7 +1877,7 @@ void OSD::tick()
   // periodically kick recovery work queue
   recovery_tp.kick();
   
-  if (scrub_should_schedule()) {
+  if (service.scrub_should_schedule()) {
     sched_scrub();
   }
 
@@ -1929,7 +1971,7 @@ void OSD::do_mon_report()
 
   // do any pending reports
   send_alive();
-  send_pg_temp();
+  service.send_pg_temp();
   send_failures();
   send_pg_stats(now);
 }
@@ -1943,7 +1985,7 @@ void OSD::ms_handle_connect(Connection *con)
       start_boot();
     } else {
       send_alive();
-      send_pg_temp();
+      service.send_pg_temp();
       send_failures();
       send_pg_stats(ceph_clock_now(g_ceph_context));
     }
@@ -2179,13 +2221,15 @@ void OSD::send_alive()
   }
 }
 
-void OSD::queue_want_pg_temp(pg_t pgid, vector<int>& want)
+void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
 {
+  Mutex::Locker l(pg_temp_lock);
   pg_temp_wanted[pgid] = want;
 }
 
-void OSD::send_pg_temp()
+void OSDService::send_pg_temp()
 {
+  Mutex::Locker l(pg_temp_lock);
   if (pg_temp_wanted.empty())
     return;
   dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
@@ -3010,12 +3054,15 @@ void OSD::handle_scrub(MOSDScrub *m)
   m->put();
 }
 
-bool OSD::scrub_should_schedule()
+bool OSDService::scrub_should_schedule()
 {
   double loadavgs[1];
 
+  // TODOSAM: is_active should be conveyed to OSDService
+  /*
   if (!is_active())
     return false;
+  */
 
   if (getloadavg(loadavgs, 1) != 1) {
     dout(10) << "scrub_should_schedule couldn't read loadavgs\n" << dendl;
@@ -3050,18 +3097,13 @@ void OSD::sched_scrub()
 
   dout(20) << "sched_scrub" << dendl;
 
-  pair<utime_t,pg_t> pos;
   utime_t max = ceph_clock_now(g_ceph_context);
   max -= g_conf->osd_scrub_max_interval;
   
-  sched_scrub_lock.Lock();
-
   //dout(20) << " " << last_scrub_pg << dendl;
 
-  set< pair<utime_t,pg_t> >::iterator p = last_scrub_pg.begin();
-  while (p != last_scrub_pg.end()) {
-    //dout(10) << "pos is " << *p << dendl;
-    pos = *p;
+  pair<utime_t, pg_t> pos;
+  while (service.next_scrub_stamp(pos, &pos)) {
     utime_t t = pos.first;
     pg_t pgid = pos.second;
 
@@ -3072,30 +3114,19 @@ void OSD::sched_scrub()
     }
 
     dout(10) << " on " << t << " " << pgid << dendl;
-    sched_scrub_lock.Unlock();
     PG *pg = _lookup_lock_pg(pgid);
     if (pg) {
       if (pg->is_active() && !pg->sched_scrub()) {
        pg->unlock();
-       sched_scrub_lock.Lock();
        break;
       }
       pg->unlock();
     }
-    sched_scrub_lock.Lock();
-
-    // next!
-    p = last_scrub_pg.lower_bound(pos);
-    //dout(10) << "lb is " << *p << dendl;
-    if (p != last_scrub_pg.end())
-      p++;
   }    
-  sched_scrub_lock.Unlock();
-
   dout(20) << "sched_scrub done" << dendl;
 }
 
-bool OSD::inc_scrubs_pending()
+bool OSDService::inc_scrubs_pending()
 {
   bool result = false;
 
@@ -3113,7 +3144,7 @@ bool OSD::inc_scrubs_pending()
   return result;
 }
 
-void OSD::dec_scrubs_pending()
+void OSDService::dec_scrubs_pending()
 {
   sched_scrub_lock.Lock();
   dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
@@ -3123,7 +3154,7 @@ void OSD::dec_scrubs_pending()
   sched_scrub_lock.Unlock();
 }
 
-void OSD::dec_scrubs_active()
+void OSDService::dec_scrubs_active()
 {
   sched_scrub_lock.Lock();
   dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
@@ -3331,6 +3362,7 @@ void OSD::handle_osd_map(MOSDMap *m)
        note_down_osd(*p);
     
     osdmap = newmap;
+    service.publish_map(newmap);
 
     superblock.current_epoch = cur;
     advance_map(t, fin);
@@ -3423,6 +3455,7 @@ void OSD::handle_osd_map(MOSDMap *m)
     shutdown();
     return;
   }
+  service.publish_superblock(superblock);
 
   clear_map_bl_cache_pins();
   map_lock.put_write();
@@ -3697,63 +3730,63 @@ void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool la
   }
 }
 
-bool OSD::_get_map_bl(epoch_t e, bufferlist& bl)
+bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
 {
   bool found = map_bl_cache.lookup(e, &bl);
   if (found)
     return true;
   found = store->read(
-    coll_t::META_COLL, get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+    coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
   if (found)
     _add_map_bl(e, bl);
   return found;
 }
 
-bool OSD::get_inc_map_bl(epoch_t e, bufferlist& bl)
+bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
 {
   Mutex::Locker l(map_cache_lock);
   bool found = map_bl_inc_cache.lookup(e, &bl);
   if (found)
     return true;
   found = store->read(
-    coll_t::META_COLL, get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+    coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
   if (found)
     _add_map_inc_bl(e, bl);
   return found;
 }
 
-void OSD::_add_map_bl(epoch_t e, bufferlist& bl)
+void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
 {
   dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
   map_bl_cache.add(e, bl);
 }
 
-void OSD::_add_map_inc_bl(epoch_t e, bufferlist& bl)
+void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
 {
   dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
   map_bl_inc_cache.add(e, bl);
 }
 
-void OSD::pin_map_inc_bl(epoch_t e, bufferlist &bl)
+void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
 {
   Mutex::Locker l(map_cache_lock);
   map_bl_inc_cache.pin(e, bl);
 }
 
-void OSD::pin_map_bl(epoch_t e, bufferlist &bl)
+void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
 {
   Mutex::Locker l(map_cache_lock);
   map_bl_cache.pin(e, bl);
 }
 
-void OSD::clear_map_bl_cache_pins()
+void OSDService::clear_map_bl_cache_pins()
 {
   Mutex::Locker l(map_cache_lock);
   map_bl_inc_cache.clear_pinned();
   map_bl_cache.clear_pinned();
 }
 
-OSDMapRef OSD::_add_map(OSDMap *o)
+OSDMapRef OSDService::_add_map(OSDMap *o)
 {
   epoch_t e = o->get_epoch();
 
@@ -3768,7 +3801,7 @@ OSDMapRef OSD::_add_map(OSDMap *o)
   return l;
 }
 
-OSDMapRef OSD::get_map(epoch_t epoch)
+OSDMapRef OSDService::get_map(epoch_t epoch)
 {
   Mutex::Locker l(map_cache_lock);
   OSDMapRef retval = map_cache.lookup(epoch);
@@ -4715,7 +4748,7 @@ void OSD::_remove_pg(PG *pg)
   // remove from map
   pg_map.erase(pgid);
   pg->put(); // since we've taken it out of map
-  unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
+  service.unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
 
   _put_pool(pg->pool);
 
@@ -4769,7 +4802,7 @@ void OSD::check_replay_queue()
 }
 
 
-bool OSD::queue_for_recovery(PG *pg)
+bool OSDService::queue_for_recovery(PG *pg)
 {
   bool b = recovery_wq.queue(pg);
   if (b)
@@ -4924,12 +4957,12 @@ void OSD::defer_recovery(PG *pg)
 // =========================================================
 // OPS
 
-void OSD::reply_op_error(OpRequestRef op, int err)
+void OSDService::reply_op_error(OpRequestRef op, int err)
 {
   reply_op_error(op, err, eversion_t());
 }
 
-void OSD::reply_op_error(OpRequestRef op, int err, eversion_t v)
+void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v)
 {
   MOSDOp *m = (MOSDOp*)op->request;
   assert(m->get_header().type == CEPH_MSG_OSD_OP);
@@ -4944,7 +4977,7 @@ void OSD::reply_op_error(OpRequestRef op, int err, eversion_t v)
   msgr->send_message(reply, m->get_connection());
 }
 
-void OSD::handle_misdirected_op(PG *pg, OpRequestRef op)
+void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
 {
   MOSDOp *m = (MOSDOp*)op->request;
   assert(m->get_header().type == CEPH_MSG_OSD_OP);
@@ -4990,14 +5023,14 @@ void OSD::handle_op(OpRequestRef op)
   if (m->get_oid().name.size() > MAX_CEPH_OBJECT_NAME_LEN) {
     dout(4) << "handle_op '" << m->get_oid().name << "' is longer than "
            << MAX_CEPH_OBJECT_NAME_LEN << " bytes!" << dendl;
-    reply_op_error(op, -ENAMETOOLONG);
+    service.reply_op_error(op, -ENAMETOOLONG);
     return;
   }
 
   // blacklisted?
   if (osdmap->is_blacklisted(m->get_source_addr())) {
     dout(4) << "handle_op " << m->get_source_addr() << " is blacklisted" << dendl;
-    reply_op_error(op, -EBLACKLISTED);
+    service.reply_op_error(op, -EBLACKLISTED);
     return;
   }
 
@@ -5007,7 +5040,7 @@ void OSD::handle_op(OpRequestRef op)
 
   int r = init_op_flags(m);
   if (r) {
-    reply_op_error(op, r);
+    service.reply_op_error(op, r);
     return;
   }
 
@@ -5015,13 +5048,13 @@ void OSD::handle_op(OpRequestRef op)
     // full?
     if (osdmap->test_flag(CEPH_OSDMAP_FULL) &&
        !m->get_source().is_mds()) {  // FIXME: we'll exclude mds writes for now.
-      reply_op_error(op, -ENOSPC);
+      service.reply_op_error(op, -ENOSPC);
       return;
     }
 
     // invalid?
     if (m->get_snapid() != CEPH_NOSNAP) {
-      reply_op_error(op, -EINVAL);
+      service.reply_op_error(op, -EINVAL);
       return;
     }
 
@@ -5029,7 +5062,7 @@ void OSD::handle_op(OpRequestRef op)
     if (g_conf->osd_max_write_size &&
        m->get_data_len() > g_conf->osd_max_write_size << 20) {
       // journal can't hold commit!
-      reply_op_error(op, -OSD_WRITETOOBIG);
+      service.reply_op_error(op, -OSD_WRITETOOBIG);
       return;
     }
   }
@@ -5070,7 +5103,7 @@ void OSD::handle_op(OpRequestRef op)
       dout(7) << "dropping request; client will resend when they get new map" << dendl;
     } else {
       dout(7) << "we are invalid target" << dendl;
-      handle_misdirected_op(NULL, op);
+      service.handle_misdirected_op(NULL, op);
     }
     return;
   } else if (!op_has_sufficient_caps(pg, m)) {
@@ -5223,16 +5256,22 @@ PG *OSD::OpWQ::_dequeue()
   return pg;
 }
 
-void OSD::queue_for_peering(PG *pg)
+void OSDService::queue_for_peering(PG *pg)
 {
   peering_wq.queue(pg);
 }
 
-void OSD:: queue_for_op(PG *pg)
+void OSDService::queue_for_op(PG *pg)
 {
   op_wq.queue(pg);
 }
 
+void OSDService::queue_for_removal(epoch_t epoch, int osdnum, pg_t pgid) {
+  osd->remove_list_lock.Lock();
+  osd->remove_list[epoch][osdnum].push_back(pgid);
+  osd->remove_list_lock.Unlock();
+}
+
 void OSD::process_peering_event(PG *pg)
 {
   map< int, map<pg_t, pg_query_t> > query_map;
@@ -5280,42 +5319,8 @@ void OSD::process_peering_event(PG *pg)
     do_notifies(notify_list);
     do_queries(query_map);
     do_infos(info_map);
-    send_pg_temp();
   }
-}
-
-/*
- * requeue ops at _front_ of queue.  these are previously queued
- * operations that need to get requeued ahead of anything the dispatch
- * thread is currently chewing on so as not to violate ordering from
- * the clients' perspective.
- */
-void OSD::requeue_ops(PG *pg, list<OpRequestRef>& ls)
-{
-  dout(15) << *pg << " requeue_ops " << ls << dendl;
-  assert(pg->is_locked());
-
-  // you can't call this on pg->op_queue!
-  assert(&ls != &pg->op_queue);
-
-  // set current queue contents aside..
-  list<OpRequestRef> orig_queue;
-  orig_queue.swap(pg->op_queue);
-
-  // grab whole list at once, in case methods we call below start adding things
-  // back on the list reference we were passed!
-  list<OpRequestRef> q;
-  q.swap(ls);
-
-  // requeue old items, now at front.
-  while (!q.empty()) {
-    OpRequestRef op = q.front();
-    q.pop_front();
-    enqueue_op(pg, op);
-  }
-
-  // put orig queue contents back in line, after the stuff we requeued.
-  pg->op_queue.splice(pg->op_queue.end(), orig_queue);
+  service.send_pg_temp();
 }
 
 /*
index 7ab8de231a7dbf02f73bdd6f65608064cd30830c..4999878890998e8fd7d96385c2dc3ec7538ece7c 100644 (file)
@@ -128,6 +128,167 @@ class OpsFlightSocketHook;
 
 extern const coll_t meta_coll;
 
+class OSD;
+class OSDService {
+public:
+  OSD *osd;
+  const int whoami;
+  ObjectStore *&store;
+  LogClient &clog;
+  PGRecoveryStats &pg_recovery_stats;
+  Messenger *&cluster_messenger;
+  Messenger *&client_messenger;
+  PerfCounters *&logger;
+  MonClient   *&monc;
+  ThreadPool::WorkQueue<PG> &op_wq;
+  ThreadPool::WorkQueue<PG> &peering_wq;
+  ThreadPool::WorkQueue<PG> &recovery_wq;
+  ThreadPool::WorkQueue<PG> &snap_trim_wq;
+  ThreadPool::WorkQueue<PG> &scrub_wq;
+  ThreadPool::WorkQueue<PG> &scrub_finalize_wq;
+  ThreadPool::WorkQueue<PG> &remove_wq;
+  ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
+  ClassHandler  *&class_handler;
+
+  // -- superblock --
+  Mutex publish_lock;
+  OSDSuperblock superblock;
+  OSDSuperblock get_superblock() {
+    Mutex::Locker l(publish_lock);
+    return superblock;
+  }
+  void publish_superblock(OSDSuperblock block) {
+    Mutex::Locker l(publish_lock);
+    superblock = block;
+  }
+  OSDMapRef osdmap;
+  OSDMapRef get_osdmap() {
+    Mutex::Locker l(publish_lock);
+    return osdmap;
+  }
+  void publish_map(OSDMapRef map) {
+    Mutex::Locker l(publish_lock);
+    osdmap = map;
+  }
+
+
+  int get_nodeid() const { return whoami; }
+
+  // -- scrub scheduling --
+  Mutex sched_scrub_lock;
+  int scrubs_pending;
+  int scrubs_active;
+  set< pair<utime_t,pg_t> > last_scrub_pg;
+
+  bool scrub_should_schedule();
+
+  void reg_last_pg_scrub(pg_t pgid, utime_t t) {
+    Mutex::Locker l(sched_scrub_lock);
+    last_scrub_pg.insert(pair<utime_t,pg_t>(t, pgid));
+  }
+  void unreg_last_pg_scrub(pg_t pgid, utime_t t) {
+    Mutex::Locker l(sched_scrub_lock);
+    pair<utime_t,pg_t> p(t, pgid);
+    assert(last_scrub_pg.count(p));
+    last_scrub_pg.erase(p);
+  }
+  bool next_scrub_stamp(pair<utime_t, pg_t> after,
+                       pair<utime_t, pg_t> *out) {
+    Mutex::Locker l(sched_scrub_lock);
+    if (last_scrub_pg.size() == 0) return false;
+    set< pair<utime_t, pg_t> >::iterator iter = last_scrub_pg.lower_bound(after);
+    if (iter == last_scrub_pg.end()) return false;
+    ++iter;
+    if (iter == last_scrub_pg.end()) return false;
+    *out = *iter;
+    return true;
+  }
+
+  bool inc_scrubs_pending();
+  void dec_scrubs_pending();
+  void dec_scrubs_active();
+
+  void reply_op_error(OpRequestRef op, int err);
+  void reply_op_error(OpRequestRef op, int err, eversion_t v);
+  void handle_misdirected_op(PG *pg, OpRequestRef op);
+
+  // -- Watch --
+  Mutex &watch_lock;
+  SafeTimer &watch_timer;
+  Watch *watch;
+
+  // -- tids --
+  // for ops i issue
+  tid_t last_tid;
+  Mutex tid_lock;
+  tid_t get_tid() {
+    tid_t t;
+    tid_lock.Lock();
+    t = ++last_tid;
+    tid_lock.Unlock();
+    return t;
+  }
+
+  // -- pg_temp --
+  Mutex pg_temp_lock;
+  map<pg_t, vector<int> > pg_temp_wanted;
+  void queue_want_pg_temp(pg_t pgid, vector<int>& want);
+  void send_pg_temp();
+
+  void queue_for_peering(PG *pg);
+  void queue_for_op(PG *pg);
+  bool queue_for_recovery(PG *pg);
+  bool queue_for_snap_trim(PG *pg) {
+    return snap_trim_wq.queue(pg);
+  }
+  bool queue_for_scrub(PG *pg) {
+    return scrub_wq.queue(pg);
+  }
+  void queue_for_removal(epoch_t epoch, int osd, pg_t pgid);
+
+  // osd map cache (past osd maps)
+  Mutex map_cache_lock;
+  SharedLRU<epoch_t, OSDMap> map_cache;
+  SimpleLRU<epoch_t, bufferlist> map_bl_cache;
+  SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
+
+
+  OSDMapRef get_map(epoch_t e);
+  OSDMapRef add_map(OSDMap *o) {
+    Mutex::Locker l(map_cache_lock);
+    return _add_map(o);
+  }
+  OSDMapRef _add_map(OSDMap *o);
+
+  void add_map_bl(epoch_t e, bufferlist& bl) {
+    Mutex::Locker l(map_cache_lock);
+    return _add_map_bl(e, bl);
+  }
+  void pin_map_bl(epoch_t e, bufferlist &bl);
+  void _add_map_bl(epoch_t e, bufferlist& bl);
+  bool get_map_bl(epoch_t e, bufferlist& bl) {
+    Mutex::Locker l(map_cache_lock);
+    return _get_map_bl(e, bl);
+  }
+  bool _get_map_bl(epoch_t e, bufferlist& bl);
+
+  void add_map_inc_bl(epoch_t e, bufferlist& bl) {
+    Mutex::Locker l(map_cache_lock);
+    return _add_map_inc_bl(e, bl);
+  }
+  void pin_map_inc_bl(epoch_t e, bufferlist &bl);
+  void _add_map_inc_bl(epoch_t e, bufferlist& bl);
+  bool get_inc_map_bl(epoch_t e, bufferlist& bl);
+
+  void clear_map_bl_cache_pins();
+
+  void need_heartbeat_peer_update();
+
+  void pg_stat_queue_enqueue(PG *pg);
+  void pg_stat_queue_dequeue(PG *pg);
+
+  OSDService(OSD *osd);
+};
 class OSD : public Dispatcher {
   /** OSD **/
 protected:
@@ -363,7 +524,6 @@ private:
   } op_wq;
 
   void enqueue_op(PG *pg, OpRequestRef op);
-  void requeue_ops(PG *pg, list<OpRequestRef>& ls);
   void dequeue_op(PG *pg);
   static void static_dequeueop(OSD *o, PG *pg) {
     o->dequeue_op(pg);
@@ -404,8 +564,6 @@ private:
     }
   } peering_wq;
 
-  void queue_for_peering(PG *pg);
-  void queue_for_op(PG *pg);
   void process_peering_event(PG *pg);
 
   friend class PG;
@@ -416,6 +574,9 @@ private:
 
   // -- osd map --
   OSDMapRef       osdmap;
+  OSDMapRef get_osdmap() {
+    return osdmap;
+  }
   utime_t         had_map_since;
   RWLock          map_lock;
   list<OpRequestRef>  waiting_for_osdmap;
@@ -441,41 +602,34 @@ private:
   void activate_map();
 
   // osd map cache (past osd maps)
-  Mutex map_cache_lock;
-  SharedLRU<epoch_t, OSDMap> map_cache;
-  SimpleLRU<epoch_t, bufferlist> map_bl_cache;
-  SimpleLRU<epoch_t, bufferlist> map_bl_inc_cache;
-
-
-  OSDMapRef get_map(epoch_t e);
+  OSDMapRef get_map(epoch_t e) {
+    return service.get_map(e);
+  }
   OSDMapRef add_map(OSDMap *o) {
-    Mutex::Locker l(map_cache_lock);
-    return _add_map(o);
+    return service.add_map(o);
   }
-  OSDMapRef _add_map(OSDMap *o);
-
   void add_map_bl(epoch_t e, bufferlist& bl) {
-    Mutex::Locker l(map_cache_lock);
-    return _add_map_bl(e, bl);
+    return service.add_map_bl(e, bl);
+  }
+  void pin_map_bl(epoch_t e, bufferlist &bl) {
+    return service.pin_map_bl(e, bl);
   }
-  void pin_map_bl(epoch_t e, bufferlist &bl);
-  void _add_map_bl(epoch_t e, bufferlist& bl);
   bool get_map_bl(epoch_t e, bufferlist& bl) {
-    Mutex::Locker l(map_cache_lock);
-    return _get_map_bl(e, bl);
+    return service.get_map_bl(e, bl);
   }
-  bool _get_map_bl(epoch_t e, bufferlist& bl);
-
   void add_map_inc_bl(epoch_t e, bufferlist& bl) {
-    Mutex::Locker l(map_cache_lock);
-    return _add_map_inc_bl(e, bl);
+    return service.add_map_inc_bl(e, bl);
+  }
+  void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
+    return service.pin_map_inc_bl(e, bl);
+  }
+  bool get_inc_map_bl(epoch_t e, bufferlist& bl) {
+    return service.get_inc_map_bl(e, bl);
+  }
+  void clear_map_bl_cache_pins() {
+    service.clear_map_bl_cache_pins();
   }
-  void pin_map_inc_bl(epoch_t e, bufferlist &bl);
-  void _add_map_inc_bl(epoch_t e, bufferlist& bl);
-  bool get_inc_map_bl(epoch_t e, bufferlist& bl);
 
-  void clear_map_bl_cache_pins();
-  
   MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to);
   void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy=false);
   void send_map(MOSDMap *m, const entity_inst_t& inst, bool lazy);
@@ -579,12 +733,6 @@ protected:
   void queue_want_up_thru(epoch_t want);
   void send_alive();
 
-  // -- pg_temp --
-  map<pg_t, vector<int> > pg_temp_wanted;
-
-  void queue_want_pg_temp(pg_t pgid, vector<int>& want);
-  void send_pg_temp();
-
   // -- failures --
   set<int> failure_queue;
   map<int,entity_inst_t> failure_pending;
@@ -632,22 +780,10 @@ protected:
     pg_stat_queue_lock.Unlock();
   }
 
-
-  // -- tids --
-  // for ops i issue
-  tid_t               last_tid;
-
-  Mutex tid_lock;
   tid_t get_tid() {
-    tid_t t;
-    tid_lock.Lock();
-    t = ++last_tid;
-    tid_lock.Unlock();
-    return t;
+    return service.get_tid();
   }
 
-
-
   // -- generic pg peering --
   void do_notifies(map< int,vector<pair<pg_notify_t, pg_interval_map_t> > >& notify_list);
   void do_queries(map< int, map<pg_t,pg_query_t> >& query_map);
@@ -788,7 +924,6 @@ protected:
     }
   } recovery_wq;
 
-  bool queue_for_recovery(PG *pg);
   void start_recovery_op(PG *pg, const hobject_t& soid);
   void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
   void defer_recovery(PG *pg);
@@ -798,12 +933,6 @@ protected:
   Mutex remove_list_lock;
   map<epoch_t, map<int, vector<pg_t> > > remove_list;
 
-  void queue_for_removal(epoch_t epoch, int osd, pg_t pgid) {
-    remove_list_lock.Lock();
-    remove_list[epoch][osd].push_back(pgid);
-    remove_list_lock.Unlock();
-  }
-
   // replay / delayed pg activation
   Mutex replay_queue_lock;
   list< pair<pg_t, utime_t > > replay_queue;
@@ -848,31 +977,9 @@ protected:
     }
   } snap_trim_wq;
 
-  // -- scrub scheduling --
-  Mutex sched_scrub_lock;
-  int scrubs_pending;
-  int scrubs_active;
-  set< pair<utime_t,pg_t> > last_scrub_pg;
-
-  bool scrub_should_schedule();
-  void sched_scrub();
-
-  void reg_last_pg_scrub(pg_t pgid, utime_t t) {
-    Mutex::Locker l(sched_scrub_lock);
-    last_scrub_pg.insert(pair<utime_t,pg_t>(t, pgid));
-  }
-  void unreg_last_pg_scrub(pg_t pgid, utime_t t) {
-    Mutex::Locker l(sched_scrub_lock);
-    pair<utime_t,pg_t> p(t, pgid);
-    assert(last_scrub_pg.count(p));
-    last_scrub_pg.erase(p);
-  }
-
-  bool inc_scrubs_pending();
-  void dec_scrubs_pending();
-  void dec_scrubs_active();
 
   // -- scrubbing --
+  void sched_scrub();
   xlist<PG*> scrub_queue;
 
 
@@ -1112,10 +1219,6 @@ public:
 
   void handle_signal(int signum);
 
-  void reply_op_error(OpRequestRef op, int r);
-  void reply_op_error(OpRequestRef op, int r, eversion_t v);
-  void handle_misdirected_op(PG *pg, OpRequestRef op);
-
   void handle_rep_scrub(MOSDRepScrub *m);
   void handle_scrub(class MOSDScrub *m);
   void handle_osd_ping(class MOSDPing *m);
@@ -1146,6 +1249,8 @@ public:
                            ReplicatedPG *pg,
                            entity_name_t entity,
                            utime_t expire);
+  OSDService service;
+  friend class OSDService;
 };
 
 //compatibility of the executable
index 902a43a7a8c1b90e15021cdaf0808700137792cd..efbe3dcdb5d29991a2b9d0f8b13a5ab2d76e0506 100644 (file)
@@ -584,7 +584,7 @@ bool PG::search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing
       map<hobject_t, list<OpRequestRef> >::iterator wmo =
        waiting_for_missing_object.find(soid);
       if (wmo != waiting_for_missing_object.end()) {
-       osd->requeue_ops(this, wmo->second);
+       requeue_ops(wmo->second);
       }
       stats_updated = true;
       missing_loc[soid].insert(fromosd);
@@ -724,7 +724,7 @@ void PG::generate_past_intervals()
 
   epoch_t cur_epoch = MAX(MAX(info.history.epoch_created,
                              info.history.last_epoch_clean),
-                         osd->superblock.oldest_map);
+                         osd->get_superblock().oldest_map);
   OSDMapRef last_map, cur_map;
   if (cur_epoch >= end_epoch) {
     dout(10) << __func__ << " start epoch " << cur_epoch
@@ -1219,9 +1219,11 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
     dout(10) << "activate starting replay interval for " << pool->info.crash_replay_interval
             << " until " << replay_until << dendl;
     state_set(PG_STATE_REPLAY);
-    osd->replay_queue_lock.Lock();
-    osd->replay_queue.push_back(pair<pg_t,utime_t>(info.pgid, replay_until));
-    osd->replay_queue_lock.Unlock();
+
+    // TODOSAM: osd->osd-> is no good
+    osd->osd->replay_queue_lock.Lock();
+    osd->osd->replay_queue.push_back(pair<pg_t,utime_t>(info.pgid, replay_until));
+    osd->osd->replay_queue_lock.Unlock();
   }
 
   // twiddle pg state
@@ -1423,7 +1425,7 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
 
   // waiters
   if (!is_replay()) {
-    osd->requeue_ops(this, waiting_for_active);
+    requeue_ops(waiting_for_active);
   }
 
   on_activate();
@@ -1509,8 +1511,8 @@ void PG::replay_queued_ops()
     replay.push_back(p->second);
   }
   replay_queue.clear();
-  osd->requeue_ops(this, replay);
-  osd->requeue_ops(this, waiting_for_active);
+  requeue_ops(replay);
+  requeue_ops(waiting_for_active);
 
   update_stats();
 }
@@ -1576,7 +1578,7 @@ void PG::all_activated_and_committed()
 
 void PG::queue_snap_trim()
 {
-  if (osd->snap_trim_wq.queue(this))
+  if (osd->queue_for_snap_trim(this))
     dout(10) << "queue_snap_trim -- queuing" << dendl;
   else
     dout(10) << "queue_snap_trim -- already trimming" << dendl;
@@ -1589,7 +1591,7 @@ bool PG::queue_scrub()
     return false;
   }
   state_set(PG_STATE_SCRUBBING);
-  osd->scrub_wq.queue(this);
+  osd->queue_for_scrub(this);
   return true;
 }
 
@@ -1677,7 +1679,8 @@ void PG::start_recovery_op(const hobject_t& soid)
   assert(recovering_oids.count(soid) == 0);
   recovering_oids.insert(soid);
 #endif
-  osd->start_recovery_op(this, soid);
+  // TODOSAM: osd->osd-> not good
+  osd->osd->start_recovery_op(this, soid);
 }
 
 void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
@@ -1693,13 +1696,15 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
   assert(recovering_oids.count(soid));
   recovering_oids.erase(soid);
 #endif
-  osd->finish_recovery_op(this, soid, dequeue);
+  // TODOSAM: osd->osd-> not good
+  osd->osd->finish_recovery_op(this, soid, dequeue);
 }
 
 
 void PG::defer_recovery()
 {
-  osd->defer_recovery(this);
+  // TODOSAM: osd->osd-> not good
+  osd->osd->defer_recovery(this);
 }
 
 void PG::clear_recovery_state() 
@@ -2548,10 +2553,19 @@ void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m)
   for (map<hobject_t, list<OpRequestRef> >::iterator it = m.begin();
        it != m.end();
        it++)
-    osd->requeue_ops(this, it->second);
+    requeue_ops(it->second);
   m.clear();
 }
 
+void PG::requeue_ops(list<OpRequestRef> &ls)
+{
+  dout(15) << " requeue_ops " << ls << dendl;
+  assert(&ls != &op_queue);
+  size_t requeue_size = ls.size();
+  op_queue.splice(op_queue.begin(), ls, ls.begin(), ls.end());
+  for (size_t i = 0; i < requeue_size; ++i) osd->queue_for_op(this);
+}
+
 
 // ==========================================================================================
 // SCRUB
@@ -3139,7 +3153,7 @@ void PG::scrub_clear_state()
   // active -> nothing.
   osd->dec_scrubs_active();
 
-  osd->requeue_ops(this, waiting_for_active);
+  requeue_ops(waiting_for_active);
 
   finalizing_scrub = false;
   scrub_block_writes = false;
@@ -3503,7 +3517,7 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch)
 
   dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
 
-  osd->_share_map_outgoing(get_osdmap()->get_cluster_inst(from));
+  osd->osd->_share_map_outgoing(get_osdmap()->get_cluster_inst(from));
   osd->cluster_messenger->send_message(mlog, 
                                       get_osdmap()->get_cluster_inst(from));
 }
@@ -3733,13 +3747,13 @@ void PG::start_peering_interval(const OSDMapRef lastmap,
           it++)
        ls.push_back(it->second);
       replay_queue.clear();
-      osd->requeue_ops(this, ls);
+      requeue_ops(ls);
     }
 
     on_role_change();
 
     // take active waiters
-    osd->requeue_ops(this, waiting_for_active);
+    requeue_ops(waiting_for_active);
 
     // new primary?
     if (role == 0) {
@@ -4014,7 +4028,7 @@ void PG::queue_op(OpRequestRef op)
 void PG::take_waiters()
 {
   dout(10) << "take_waiters" << dendl;
-  osd->requeue_ops(this, op_waiters);
+  requeue_ops(op_waiters);
   for (list<CephPeeringEvtRef>::iterator i = peering_waiters.begin();
        i != peering_waiters.end();
        ++i) osd->queue_for_peering(this);
index 8188075f432148f35955d73ee6dc86f68bd0424f..051bd7401ca484ab0d67aca839148a051c5d15c0 100644 (file)
@@ -56,8 +56,8 @@ using namespace __gnu_cxx;
 
 //#define DEBUG_RECOVERY_OIDS   // track set of recovering oids explicitly, to find counting bugs
 
-
 class OSD;
+class OSDService;
 class MOSDOp;
 class MOSDSubOp;
 class MOSDSubOpReply;
@@ -336,7 +336,7 @@ public:
 
   /*** PG ****/
 protected:
-  OSD *osd;
+  OSDService *osd;
   OSDMapRef osdmap_ref;
   PGPool *pool;
 
@@ -617,6 +617,7 @@ protected:
   map<eversion_t,OpRequestRef>   replay_queue;
 
   void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
+  void requeue_ops(list<OpRequestRef> &l);
 
   // stats
   Mutex pg_stats_lock;
@@ -1320,8 +1321,8 @@ public:
   } recovery_state;
 
 
- public:  
-  PG(OSD *o, OSDMapRef curmap,
+ public:
+  PG(OSDService *o, OSDMapRef curmap,
      PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid) :
     osd(o), osdmap_ref(curmap), pool(_pool),
     _lock("PG::_lock"),
index fd67f38dcb4e3622612869c6782e5b81da8965c5..fbb7e1f5bb31105fcbd53f1f8044caaed1827f1b 100644 (file)
@@ -581,7 +581,7 @@ void ReplicatedPG::calc_trim_to()
   }
 }
 
-ReplicatedPG::ReplicatedPG(OSD *o, OSDMapRef curmap,
+ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
                           PGPool *_pool, pg_t p, const hobject_t& oid,
                           const hobject_t& ioid) :
   PG(o, curmap, _pool, p, oid, ioid), temp_created(false),
@@ -3087,7 +3087,8 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
        if (notif->obc == obc) {
          dout(10) << " acking pending notif " << notif->id << " by " << by << dendl;
          session->del_notif(notif);
-         osd->ack_notification(entity, notif, obc, this);
+         // TODOSAM: osd->osd-> not good
+         osd->osd->ack_notification(entity, notif, obc, this);
        }
       }
     }
@@ -3129,12 +3130,14 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
 
       notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl);
       if (notif->watchers.empty()) {
-       osd->complete_notify(notif, obc);
+       // TODOSAM: osd->osd-> not good
+       osd->osd->complete_notify(notif, obc);
       } else {
        obc->notifs[notif] = true;
        obc->ref++;
        notif->obc = obc;
-       notif->timeout = new Watch::C_NotifyTimeout(osd, notif);
+       // TODOSAM: osd->osd not good
+       notif->timeout = new Watch::C_NotifyTimeout(osd->osd, notif);
        osd->watch_timer.add_event_after(p->timeout, notif->timeout);
       }
     }
@@ -3150,7 +3153,8 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
       assert(notif);
 
       session->del_notif(notif);
-      osd->ack_notification(entity, notif, obc, this);
+      // TODOSAM: osd->osd-> not good
+      osd->osd->ack_notification(entity, notif, obc, this);
     }
 
     osd->watch_lock.Unlock();
@@ -3448,7 +3452,7 @@ void ReplicatedPG::op_commit(RepGather *repop)
     
     last_update_ondisk = repop->v;
     if (waiting_for_ondisk.count(repop->v)) {
-      osd->requeue_ops(this, waiting_for_ondisk[repop->v]);
+      requeue_ops(waiting_for_ondisk[repop->v]);
       waiting_for_ondisk.erase(repop->v);
     }
 
@@ -3767,7 +3771,7 @@ void ReplicatedPG::register_unconnected_watcher(void *_obc,
   pgid.set_ps(obc->obs.oi.soid.hash);
   get();
   obc->ref++;
-  Watch::C_WatchTimeout *cb = new Watch::C_WatchTimeout(osd,
+  Watch::C_WatchTimeout *cb = new Watch::C_WatchTimeout(osd->osd,
                                                        static_cast<void *>(obc),
                                                        this,
                                                        entity, expire);
@@ -4021,7 +4025,7 @@ void ReplicatedPG::put_object_context(ObjectContext *obc)
           << obc->ref << " -> " << (obc->ref-1) << dendl;
 
   if (mode.wake) {
-    osd->requeue_ops(this, mode.waiting);
+    requeue_ops(mode.waiting);
     for (list<Cond*>::iterator p = mode.waiting_cond.begin(); p != mode.waiting_cond.end(); p++)
       (*p)->Signal();
     mode.wake = false;
@@ -4971,10 +4975,10 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
     update_stats();
     if (waiting_for_missing_object.count(hoid)) {
       dout(20) << " kicking waiters on " << hoid << dendl;
-      osd->requeue_ops(this, waiting_for_missing_object[hoid]);
+      requeue_ops(waiting_for_missing_object[hoid]);
       waiting_for_missing_object.erase(hoid);
       if (missing.missing.size() == 0) {
-       osd->requeue_ops(this, waiting_for_all_missing);
+       requeue_ops(waiting_for_all_missing);
        waiting_for_all_missing.clear();
       }
     }
@@ -5190,7 +5194,7 @@ void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
        dout(10) << "pushed " << soid << " to all replicas" << dendl;
        finish_recovery_op(soid);
        if (waiting_for_degraded_object.count(soid)) {
-         osd->requeue_ops(this, waiting_for_degraded_object[soid]);
+         requeue_ops(waiting_for_degraded_object[soid]);
          waiting_for_degraded_object.erase(soid);
        }
        finish_degraded_object(soid);
@@ -5478,7 +5482,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transac
   map<hobject_t, list<OpRequestRef> >::iterator wmo =
     waiting_for_missing_object.find(oid);
   if (wmo != waiting_for_missing_object.end()) {
-    osd->requeue_ops(this, wmo->second);
+    requeue_ops(wmo->second);
   }
 
   // Add log entry
@@ -5617,7 +5621,7 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs)
   lock();
   dout(10) << "_finish_mark_all_unfound_lost " << dendl;
 
-  osd->requeue_ops(this, waiting_for_all_missing);
+  requeue_ops(waiting_for_all_missing);
   waiting_for_all_missing.clear();
 
   while (!obcs.empty()) {
@@ -5706,10 +5710,10 @@ void ReplicatedPG::on_change()
   for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_degraded_object.begin();
        p != waiting_for_degraded_object.end();
        waiting_for_degraded_object.erase(p++)) {
-    osd->requeue_ops(this, p->second);
+    requeue_ops(p->second);
     finish_degraded_object(p->first);
   }
-  osd->requeue_ops(this, waiting_for_all_missing);
+  requeue_ops(waiting_for_all_missing);
   waiting_for_all_missing.clear();
 
   // clear pushing/pulling maps
@@ -5729,7 +5733,7 @@ void ReplicatedPG::on_role_change()
   for (map<eversion_t, list<OpRequestRef> >::iterator p = waiting_for_ondisk.begin();
        p != waiting_for_ondisk.end();
        p++)
-    osd->requeue_ops(this, p->second);
+    requeue_ops(p->second);
   waiting_for_ondisk.clear();
 }
 
@@ -6236,8 +6240,7 @@ int ReplicatedPG::recover_backfill(int max)
       to_remove[pbi.begin] = pbi.objects.begin()->second;
       // Object was degraded, but won't be recovered
       if (waiting_for_degraded_object.count(pbi.begin)) {
-       osd->requeue_ops(
-         this,
+       requeue_ops(
          waiting_for_degraded_object[pbi.begin]);
        waiting_for_degraded_object.erase(pbi.begin);
       }
@@ -6259,8 +6262,7 @@ int ReplicatedPG::recover_backfill(int max)
                 << pbi.objects.begin()->second << dendl;
        // Object was degraded, but won't be recovered
        if (waiting_for_degraded_object.count(pbi.begin)) {
-         osd->requeue_ops(
-           this,
+         requeue_ops(
            waiting_for_degraded_object[pbi.begin]);
          waiting_for_degraded_object.erase(pbi.begin);
        }
index 2192d0c187c6bb335a359a4c075450bb06da63d3..8bd5105550c16130db070926ae82bb12238be726 100644 (file)
@@ -788,7 +788,7 @@ protected:
   int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);
 
 public:
-  ReplicatedPG(OSD *o, OSDMapRef curmap,
+  ReplicatedPG(OSDService *o, OSDMapRef curmap,
               PGPool *_pool, pg_t p, const hobject_t& oid,
               const hobject_t& ioid);
   ~ReplicatedPG() {}