]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD,ReplicatedPG: queue pushes in a op_tp workqueue
authorSamuel Just <sam.just@inktank.com>
Sat, 14 Sep 2013 07:45:00 +0000 (00:45 -0700)
committerSamuel Just <sam.just@inktank.com>
Thu, 26 Sep 2013 18:24:28 +0000 (11:24 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/ReplicatedPG.cc

index 85cf817adc5449af03134b6fdf1b7d697f89f8e9..822596cd99731eb96dbad49db0b5703fe5fa2ec6 100644 (file)
@@ -170,6 +170,7 @@ OSDService::OSDService(OSD *osd) :
   scrub_wq(osd->scrub_wq),
   scrub_finalize_wq(osd->scrub_finalize_wq),
   rep_scrub_wq(osd->rep_scrub_wq),
+  push_wq("push_wq", cct->_conf->osd_recovery_thread_timeout, &osd->recovery_tp),
   class_handler(osd->class_handler),
   publish_lock("OSDService::publish_lock"),
   pre_publish_lock("OSDService::pre_publish_lock"),
index 17256d61702d83602142476fb0ffb2a57acd0130..f906573e5adcfbd6310754f042cdb6340c3a3d2e 100644 (file)
@@ -307,6 +307,7 @@ public:
   ThreadPool::WorkQueue<PG> &scrub_wq;
   ThreadPool::WorkQueue<PG> &scrub_finalize_wq;
   ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
+  GenContextWQ push_wq;
   ClassHandler  *&class_handler;
 
   void dequeue_pg(PG *pg, list<OpRequestRef> *dequeued);
index 8c1e8a30352fbc335e6b28d3e70daf1f1b7d5dd9..449994fadf0751efae7b14f64235f3c07f347d7e 100644 (file)
@@ -1603,14 +1603,14 @@ void ReplicatedBackend::_do_push(OpRequestRef op)
   get_parent()->queue_transaction(t);
 }
 
-struct C_ReplicatedBackend_OnPullComplete : Context {
+struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
   ReplicatedBackend *bc;
   list<pair<hobject_t, ObjectContextRef> > to_continue;
   int priority;
   C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
     : bc(bc), priority(priority) {}
 
-  void finish(int) {
+  void finish(ThreadPool::TPHandle &handle) {
     ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
     for (list<pair<hobject_t, ObjectContextRef> >::iterator i =
           to_continue.begin();
@@ -1620,6 +1620,7 @@ struct C_ReplicatedBackend_OnPullComplete : Context {
        bc->get_parent()->on_global_recover(
          i->first);
       }
+      handle.reset_tp_timeout();
     }
     bc->run_recovery_op(h, priority);
   }
@@ -1648,7 +1649,9 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
        m->get_priority());
     c->to_continue.swap(to_continue);
     t->register_on_complete(
-      get_parent()->bless_context(c));
+      new C_QueueInWQ(
+       &osd->push_wq,
+       get_parent()->bless_gencontext(c)));
   }
   replies.erase(replies.end() - 1);
 
@@ -6992,7 +6995,9 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
          op->request->get_priority());
       c->to_continue.swap(to_continue);
       t->register_on_complete(
-       get_parent()->bless_context(c));
+       new C_QueueInWQ(
+         &osd->push_wq,
+         get_parent()->bless_gencontext(c)));
     }
     run_recovery_op(h, op->request->get_priority());
   } else {