]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
FileStore: support multiple ondisk finish and apply finisher 6486/head
authorXinze Chi <xinze@xsky.com>
Fri, 6 Nov 2015 15:46:33 +0000 (23:46 +0800)
committerXinze Chi <xinze@xsky.com>
Fri, 6 Nov 2015 23:13:41 +0000 (07:13 +0800)
ondisk finisher and apply finisher wouble be bottleneck in fast
ssd. Because apply finisher should free memory which may be slow.

Signed-off-by: Haomai Wang <haomai@xsky.com>
Signed-off-by: Xinze Chi <xinze@xsky.com>
src/common/config_opts.h
src/os/FileStore.cc
src/os/FileStore.h

index 9c1a3c2f6761cea8de3610bdc726eaeec4470680..87decaf2951d0ef382e998e98b2be664d7f267fc 100644 (file)
@@ -914,6 +914,8 @@ OPTION(filestore_update_to, OPT_INT, 1000)
 OPTION(filestore_blackhole, OPT_BOOL, false)     // drop any new transactions on the floor
 OPTION(filestore_fd_cache_size, OPT_INT, 128)    // FD lru size
 OPTION(filestore_fd_cache_shards, OPT_INT, 16)   // FD number of shards
+OPTION(filestore_ondisk_finisher_threads, OPT_INT, 1)
+OPTION(filestore_apply_finisher_threads, OPT_INT, 1)
 OPTION(filestore_dump_file, OPT_STR, "")         // file onto which store transaction dumps
 OPTION(filestore_kill_at, OPT_INT, 0)            // inject a failure at the n'th opportunity
 OPTION(filestore_inject_stall, OPT_INT, 0)       // artificially stall for N seconds in op queue thread
index 5b8c2b5edf39b586eb4e346a88b80f0227e32937..291d539df45b15c86e9fe1b8baf37a954315023c 100644 (file)
@@ -522,7 +522,6 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit
   basedir_fd(-1), current_fd(-1),
   backend(NULL),
   index_manager(do_update),
-  ondisk_finisher(g_ceph_context),
   lock("FileStore::lock"),
   force_sync(false), 
   sync_entry_timeo_lock("sync_entry_timeo_lock"),
@@ -530,9 +529,11 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit
   stop(false), sync_thread(this),
   fdcache(g_ceph_context),
   wbthrottle(g_ceph_context),
+  next_osr_id(0),
   throttle_ops(g_ceph_context, "filestore_ops",g_conf->filestore_queue_max_ops),
   throttle_bytes(g_ceph_context, "filestore_bytes",g_conf->filestore_queue_max_bytes),
-  op_finisher(g_ceph_context),
+  m_ondisk_finisher_num(g_conf->filestore_ondisk_finisher_threads),
+  m_apply_finisher_num(g_conf->filestore_apply_finisher_threads),
   op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"),
   op_wq(this, g_conf->filestore_op_thread_timeout,
        g_conf->filestore_op_thread_suicide_timeout, &op_tp),
@@ -567,6 +568,18 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit
   m_filestore_max_inline_xattrs(0)
 {
   m_filestore_kill_at.set(g_conf->filestore_kill_at);
+  for (int i = 0; i < m_ondisk_finisher_num; ++i) {
+    ostringstream oss;
+    oss << "filestore-ondisk-" << i;
+    Finisher *f = new Finisher(g_ceph_context, oss.str());
+    ondisk_finishers.push_back(f);
+  }
+  for (int i = 0; i < m_apply_finisher_num; ++i) {
+    ostringstream oss;
+    oss << "filestore-apply-" << i;
+    Finisher *f = new Finisher(g_ceph_context, oss.str());
+    apply_finishers.push_back(f);
+  }
 
   ostringstream oss;
   oss << basedir << "/current";
@@ -617,6 +630,14 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit
 
 FileStore::~FileStore()
 {
+  for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
+    delete *it;
+    *it = NULL;
+  }
+  for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
+    delete *it;
+    *it = NULL;
+  }
   g_ceph_context->_conf->remove_observer(this);
   g_ceph_context->get_perfcounters_collection()->remove(logger);
 
@@ -1632,8 +1653,12 @@ int FileStore::mount()
   journal_start();
 
   op_tp.start();
-  op_finisher.start();
-  ondisk_finisher.start();
+  for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
+    (*it)->start();
+  }
+  for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
+    (*it)->start();
+  }
 
   timer.init();
 
@@ -1723,8 +1748,12 @@ int FileStore::umount()
   if (!(generic_flags & SKIP_JOURNAL_REPLAY))
     journal_write_close();
 
-  op_finisher.stop();
-  ondisk_finisher.stop();
+  for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
+    (*it)->stop();
+  }
+  for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
+    (*it)->stop();
+  }
 
   if (fsid_fd >= 0) {
     VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
@@ -1895,10 +1924,10 @@ void FileStore::_finish_op(OpSequencer *osr)
     o->onreadable_sync->complete(0);
   }
   if (o->onreadable) {
-    op_finisher.queue(o->onreadable);
+    apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
   }
   if (!to_queue.empty()) {
-    op_finisher.queue(to_queue);
+    apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
   }
   delete o;
 }
@@ -1941,7 +1970,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
     osr = static_cast<OpSequencer *>(posr->p.get());
     dout(5) << "queue_transactions existing " << osr << " " << *osr << dendl;
   } else {
-    osr = new OpSequencer;
+    osr = new OpSequencer(next_osr_id.inc());
     osr->set_cct(g_ceph_context);
     osr->parent = posr;
     posr->p = osr;
@@ -2032,7 +2061,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
   if (onreadable_sync) {
     onreadable_sync->complete(r);
   }
-  op_finisher.queue(onreadable, r);
+  apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
 
   submit_manager.op_submit_finish(op);
   apply_manager.op_apply_finish(op);
@@ -2054,10 +2083,10 @@ void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
   // getting blocked behind an ondisk completion.
   if (ondisk) {
     dout(10) << " queueing ondisk " << ondisk << dendl;
-    ondisk_finisher.queue(ondisk);
+    ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
   }
   if (!to_queue.empty()) {
-    ondisk_finisher.queue(to_queue);
+    ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
   }
 }
 
@@ -3786,7 +3815,9 @@ void FileStore::_flush_op_queue()
   dout(10) << "_flush_op_queue draining op tp" << dendl;
   op_wq.drain();
   dout(10) << "_flush_op_queue waiting for apply finisher" << dendl;
-  op_finisher.wait_for_empty();
+  for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
+    (*it)->wait_for_empty();
+  }
 }
 
 /*
@@ -3810,7 +3841,9 @@ void FileStore::flush()
     if (journal)
       journal->flush();
     dout(10) << "flush draining ondisk finisher" << dendl;
-    ondisk_finisher.wait_for_empty();
+    for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
+      (*it)->wait_for_empty();
+    }
   }
 
   _flush_op_queue();
index 2a04da2ac1aa2637c0fa563d518f06b67f343664..de20354f6ac1b7fa1386170c4405e8919e024ba4 100644 (file)
@@ -152,8 +152,6 @@ private:
   // ObjectMap
   boost::scoped_ptr<ObjectMap> object_map;
   
-  Finisher ondisk_finisher;
-
   // helper fns
   int get_cdir(coll_t cid, char *s, int len);
   
@@ -201,6 +199,7 @@ private:
   public:
     Sequencer *parent;
     Mutex apply_lock;  // for apply mutual exclusion
+    int id;
     
     /// get_max_uncompleted
     bool _get_max_uncompleted(
@@ -315,10 +314,11 @@ private:
       }
     }
 
-    OpSequencer()
+    OpSequencer(int i)
       : qlock("FileStore::OpSequencer::qlock", false, false),
        parent(0),
-       apply_lock("FileStore::OpSequencer::apply_lock", false, false) {}
+       apply_lock("FileStore::OpSequencer::apply_lock", false, false),
+        id(i) {}
     ~OpSequencer() {
       assert(q.empty());
     }
@@ -333,9 +333,13 @@ private:
   FDCache fdcache;
   WBThrottle wbthrottle;
 
+  atomic_t next_osr_id;
   deque<OpSequencer*> op_queue;
   Throttle throttle_ops, throttle_bytes;
-  Finisher op_finisher;
+  const int m_ondisk_finisher_num;
+  const int m_apply_finisher_num;
+  vector<Finisher*> ondisk_finishers;
+  vector<Finisher*> apply_finishers;
 
   ThreadPool op_tp;
   struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {