git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
FileStore: do not time out threads while they're waiting for op throttle 886/head
author Greg Farnum <greg@inktank.com>
Wed, 4 Dec 2013 01:02:24 +0000 (17:02 -0800)
committer Greg Farnum <greg@inktank.com>
Wed, 4 Dec 2013 01:02:24 +0000 (17:02 -0800)
In order to support this, we pass TPHandle* through the ObjectStore
interface, and then if we have one we suspend the timeouts while
waiting to get our op/byte throttles in the FileStore.
The OSD uses this when doing PG splits.

Signed-off-by: Greg Farnum <greg@inktank.com>
src/os/FileStore.cc
src/os/FileStore.h
src/os/ObjectStore.h
src/osd/OSD.cc
src/osd/OSD.h

index 89a55b393db7cfa872ca2b69ea27ae8fd1b30d92..db033c6e39b1f2fb69604e0aaa59954e2771852c 100644 (file)
@@ -1564,7 +1564,7 @@ void FileStore::queue_op(OpSequencer *osr, Op *o)
   op_wq.queue(osr);
 }
 
-void FileStore::op_queue_reserve_throttle(Op *o)
+void FileStore::op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle)
 {
   // Do not call while holding the journal lock!
   uint64_t max_ops = m_filestore_queue_max_ops;
@@ -1586,7 +1586,11 @@ void FileStore::op_queue_reserve_throttle(Op *o)
              && (op_queue_bytes + o->bytes) > max_bytes)) {
       dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops << " ops || "
              << op_queue_bytes + o->bytes << " > " << max_bytes << dendl;
+      if (handle)
+       handle->suspend_tp_timeout();
       op_throttle_cond.Wait(op_throttle_lock);
+      if (handle)
+       handle->reset_tp_timeout();
     }
 
     op_queue_len++;
@@ -1671,7 +1675,8 @@ struct C_JournaledAhead : public Context {
 };
 
 int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
-                                 TrackedOpRef osd_op)
+                                 TrackedOpRef osd_op,
+                                 ThreadPool::TPHandle *handle)
 {
   Context *onreadable;
   Context *ondisk;
@@ -1699,7 +1704,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
 
   if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
     Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
-    op_queue_reserve_throttle(o);
+    op_queue_reserve_throttle(o, handle);
     journal->throttle();
     uint64_t op_num = submit_manager.op_submit_start();
     o->op = op_num;
index c489fdd57960900b6aa4be5544f3da36c20ef877..23b588096564936ada81e57351c2e8ec526a1178 100644 (file)
@@ -305,7 +305,7 @@ private:
               Context *onreadable, Context *onreadable_sync,
               TrackedOpRef osd_op);
   void queue_op(OpSequencer *osr, Op *o);
-  void op_queue_reserve_throttle(Op *o);
+  void op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle = NULL);
   void op_queue_release_throttle(Op *o);
   void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
   friend struct C_JournaledAhead;
@@ -378,7 +378,8 @@ public:
     ThreadPool::TPHandle *handle);
 
   int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
-                        TrackedOpRef op = TrackedOpRef());
+                        TrackedOpRef op = TrackedOpRef(),
+                        ThreadPool::TPHandle *handle = NULL);
 
   /**
    * set replay guard xattr on given file
index 6494290b54112d130be64ca55e98400db1e87c56..b0be3c83acadd68da0a45c65ca6c07139193a41a 100644 (file)
@@ -19,6 +19,7 @@
 #include "include/types.h"
 #include "osd/osd_types.h"
 #include "common/TrackedOp.h"
+#include "common/WorkQueue.h"
 #include "ObjectMap.h"
 
 #include <errno.h>
@@ -793,34 +794,40 @@ public:
   }
   unsigned apply_transactions(Sequencer *osr, list<Transaction*>& tls, Context *ondisk=0);
 
-  int queue_transaction(Sequencer *osr, Transaction* t) {
+  int queue_transaction(Sequencer *osr, Transaction* t,
+                        ThreadPool::TPHandle *handle = NULL) {
     list<Transaction *> tls;
     tls.push_back(t);
-    return queue_transactions(osr, tls, new C_DeleteTransaction(t));
+    return queue_transactions(osr, tls, new C_DeleteTransaction(t),
+                              NULL, NULL, TrackedOpRef(), handle);
   }
 
   int queue_transaction(Sequencer *osr, Transaction *t, Context *onreadable, Context *ondisk=0,
                                Context *onreadable_sync=0,
-                               TrackedOpRef op = TrackedOpRef()) {
+                               TrackedOpRef op = TrackedOpRef(),
+                               ThreadPool::TPHandle *handle = NULL) {
     list<Transaction*> tls;
     tls.push_back(t);
-    return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync, op);
+    return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync,
+                              op, handle);
   }
 
   int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
                         Context *onreadable, Context *ondisk=0,
                         Context *onreadable_sync=0,
-                        TrackedOpRef op = TrackedOpRef()) {
+                        TrackedOpRef op = TrackedOpRef(),
+                        ThreadPool::TPHandle *handle = NULL) {
     assert(!tls.empty());
     tls.back()->register_on_applied(onreadable);
     tls.back()->register_on_commit(ondisk);
     tls.back()->register_on_applied_sync(onreadable_sync);
-    return queue_transactions(osr, tls, op);
+    return queue_transactions(osr, tls, op, handle);
   }
 
   virtual int queue_transactions(
     Sequencer *osr, list<Transaction*>& tls,
-    TrackedOpRef op = TrackedOpRef()) = 0;
+    TrackedOpRef op = TrackedOpRef(),
+    ThreadPool::TPHandle *handle = NULL) = 0;
 
 
   int queue_transactions(
index 11f1c84a13b86f828cf263ad46d59ad7fd433d54..e4aff735c41858b24b5cdf65fe3fc69359387e97 100644 (file)
@@ -5994,13 +5994,15 @@ PG::RecoveryCtx OSD::create_context()
   return rctx;
 }
 
-void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg)
+void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
+                                       ThreadPool::TPHandle *handle)
 {
   if (!ctx.transaction->empty()) {
     ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
     int tr = store->queue_transaction(
       pg->osr.get(),
-      ctx.transaction, ctx.on_applied, ctx.on_safe);
+      ctx.transaction, ctx.on_applied, ctx.on_safe, NULL,
+      TrackedOpRef(), handle);
     assert(tr == 0);
     ctx.transaction = new ObjectStore::Transaction;
     ctx.on_applied = new C_Contexts(cct);
@@ -6025,7 +6027,8 @@ bool OSD::compat_must_dispatch_immediately(PG *pg)
   return false;
 }
 
-void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap)
+void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+                           ThreadPool::TPHandle *handle)
 {
   if (service.get_osdmap()->is_up(whoami)) {
     do_notifies(*ctx.notify_list, curmap);
@@ -6045,7 +6048,8 @@ void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap)
     ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
     int tr = store->queue_transaction(
       pg->osr.get(),
-      ctx.transaction, ctx.on_applied, ctx.on_safe);
+      ctx.transaction, ctx.on_applied, ctx.on_safe, NULL, TrackedOpRef(),
+      handle);
     assert(tr == 0);
   }
 }
@@ -7250,17 +7254,17 @@ void OSD::process_peering_events(
       split_pgs.clear();
     }
     if (compat_must_dispatch_immediately(pg)) {
-      dispatch_context(rctx, pg, curmap);
+      dispatch_context(rctx, pg, curmap, &handle);
       rctx = create_context();
     } else {
-      dispatch_context_transaction(rctx, pg);
+      dispatch_context_transaction(rctx, pg, &handle);
     }
     pg->unlock();
     handle.reset_tp_timeout();
   }
   if (need_up_thru)
     queue_want_up_thru(same_interval_since);
-  dispatch_context(rctx, 0, curmap);
+  dispatch_context(rctx, 0, curmap, &handle);
 
   service.send_pg_temp();
 }
index 11ad2b89399be1c7b5be2e060e24e6b0440b67b2..29a1f875c0100cac9e335fa06e4cbe140a0928fc 100644 (file)
@@ -1318,8 +1318,10 @@ protected:
   // -- generic pg peering --
   PG::RecoveryCtx create_context();
   bool compat_must_dispatch_immediately(PG *pg);
-  void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap);
-  void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg);
+  void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+                        ThreadPool::TPHandle *handle = NULL);
+  void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
+                                    ThreadPool::TPHandle *handle = NULL);
   void do_notifies(map< int,vector<pair<pg_notify_t, pg_interval_map_t> > >& notify_list,
                   OSDMapRef map);
   void do_queries(map< int, map<pg_t,pg_query_t> >& query_map,