]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: instrument filestore, journal throughput and throttling
authorSage Weil <sage.weil@dreamhost.com>
Sat, 19 Feb 2011 04:49:37 +0000 (20:49 -0800)
committerSage Weil <sage@newdream.net>
Tue, 15 Mar 2011 19:43:29 +0000 (12:43 -0700)
Signed-off-by: Sage Weil <sage@newdream.net>
Conflicts:

src/os/FileJournal.cc
src/os/FileStore.cc
src/os/FileStore.h
src/os/JournalingObjectStore.cc

src/os/FileJournal.cc
src/os/FileJournal.h
src/os/FileStore.cc
src/os/FileStore.h
src/os/Journal.h
src/os/ObjectStore.h
src/osd/OSD.cc

index 03be3caff44ba9f4df9687907cb45672ec6e06ed..6fc132cf4dcc8f75a09e0f10fa6930b3e5ae94fc 100644 (file)
@@ -17,6 +17,8 @@
 #include "common/safe_io.h"
 #include "FileJournal.h"
 #include "include/color.h"
+#include "common/ProfLogger.h"
+#include "os/ObjectStore.h"
 
 #include <fcntl.h>
 #include <sstream>
@@ -597,9 +599,7 @@ int FileJournal::prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_
        // throw out what we have so far
        full_state = FULL_FULL;
        while (!writeq.empty()) {
-         dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl;
-         throttle_ops.put(1);
-         throttle_bytes.put(writeq.front().bl.length());
+         put_throttle(1, writeq.front().bl.length());
          writeq.pop_front();
        }  
        print_header();
@@ -874,13 +874,13 @@ void FileJournal::flush()
 {
   write_lock.Lock();
   while ((!writeq.empty() || writing) && !write_stop) {
-    dout(10) << "flush waiting for writeq to empty and writes to complete" << dendl;
+    dout(5) << "flush waiting for writeq to empty and writes to complete" << dendl;
     write_empty_cond.Wait(write_lock);
   }
   write_lock.Unlock();
-  dout(10) << "flush waiting for finisher" << dendl;
+  dout(5) << "flush waiting for finisher" << dendl;
   finisher->wait_for_empty();
-  dout(10) << "flush done" << dendl;
+  dout(5) << "flush done" << dendl;
 }
 
 
@@ -914,13 +914,7 @@ void FileJournal::write_thread_entry()
     assert(r == 0);
     do_write(bl);
     
-    dout(30) << "XXX throttle put " << orig_bytes << dendl;
-    uint64_t new_ops = throttle_ops.put(orig_ops);
-    uint64_t new_bytes = throttle_bytes.put(orig_bytes);
-    dout(10) << "write_thread throttle finished " << orig_ops << " ops and " 
-            << orig_bytes << " bytes, now "
-            << new_ops << " ops and " << new_bytes << " bytes"
-            << dendl;
+    put_throttle(orig_ops, orig_bytes);
   }
   write_empty_cond.Signal();
   write_lock.Unlock();
@@ -933,7 +927,7 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, Conte
   Mutex::Locker locker(write_lock);  // ** lock **
 
   // dump on queue
-  dout(10) << "submit_entry seq " << seq
+  dout(5) << "submit_entry seq " << seq
           << " len " << e.length()
           << " (" << oncommit << ")" << dendl;
 
@@ -946,6 +940,13 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, Conte
     throttle_ops.take(1);
     throttle_bytes.take(e.length());
 
+    if (logger) {
+      logger->set(l_os_jq_max_ops, throttle_ops.get_max());
+      logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
+      logger->set(l_os_jq_ops, throttle_ops.get_current());
+      logger->set(l_os_jq_bytes, throttle_bytes.get_current());
+    }
+
     writeq.push_back(write_item(seq, e, alignment));
     write_cond.Signal();
   } else {
@@ -981,16 +982,16 @@ void FileJournal::committed_thru(uint64_t seq)
   Mutex::Locker locker(write_lock);
 
   if (seq < last_committed_seq) {
-    dout(10) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
+    dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
     assert(seq >= last_committed_seq);
     return;
   }
   if (seq == last_committed_seq) {
-    dout(10) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
+    dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
     return;
   }
 
-  dout(10) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
+  dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
   last_committed_seq = seq;
 
   // adjust start pointer
@@ -1018,9 +1019,7 @@ void FileJournal::committed_thru(uint64_t seq)
     dout(15) << " dropping committed but unwritten seq " << writeq.front().seq 
             << " len " << writeq.front().bl.length()
             << dendl;
-    dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl;
-    throttle_ops.put(1);
-    throttle_bytes.put(writeq.front().bl.length());
+    put_throttle(1, writeq.front().bl.length());
     writeq.pop_front();  
   }
   
@@ -1030,6 +1029,25 @@ void FileJournal::committed_thru(uint64_t seq)
 }
 
 
+void FileJournal::put_throttle(uint64_t ops, uint64_t bytes)
+{
+  uint64_t new_ops = throttle_ops.put(ops);
+  uint64_t new_bytes = throttle_bytes.put(bytes);
+  dout(5) << "put_throttle finished " << ops << " ops and " 
+          << bytes << " bytes, now "
+          << new_ops << " ops and " << new_bytes << " bytes"
+          << dendl;
+
+  if (logger) {
+    logger->inc(l_os_j_ops, ops);
+    logger->inc(l_os_j_bytes, bytes);
+    logger->set(l_os_jq_ops, new_ops);
+    logger->set(l_os_jq_bytes, new_bytes);
+    logger->set(l_os_jq_max_ops, throttle_ops.get_max());
+    logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
+  }
+}
+
 void FileJournal::make_writeable()
 {
   _open(true);
index 86464754e8d2915c68140bfe06ec6b4b3f863f7e..6f516a3f0e44eacc6f447b0d8242a92a35e04345 100644 (file)
@@ -118,6 +118,8 @@ private:
   // throttle
   Throttle throttle_ops, throttle_bytes;
 
+  void put_throttle(uint64_t ops, uint64_t bytes);
+
   // write thread
   Mutex write_lock;
   Cond write_cond, write_empty_cond;
index 92c81870abf288c15abbcee1e739e94771f2a74e..f3f268a9b16d16e56f6d7bd7ecc3b9e3cf135cf3 100644 (file)
@@ -28,6 +28,7 @@
 #include "common/errno.h"
 #include "common/run_cmd.h"
 #include "common/safe_io.h"
+#include "common/ProfLogger.h"
 
 #define __STDC_FORMAT_MACROS
 #include <inttypes.h>
@@ -382,7 +383,8 @@ FileStore::FileStore(const char *base, const char *jdev) :
   stop(false), sync_thread(this),
   op_queue_len(0), op_queue_bytes(0), next_finish(0),
   op_tp("FileStore::op_tp", g_conf.filestore_op_threads), op_wq(this, &op_tp),
-  flusher_queue_len(0), flusher_thread(this)
+  flusher_queue_len(0), flusher_thread(this),
+  logger(NULL)
 {
   ostringstream oss;
   oss << basedir << "/current";
@@ -1397,6 +1399,7 @@ int FileStore::umount()
   flusher_thread.join();
 
   journal_stop();
+  stop_logger();
 
   op_finisher.stop();
   ondisk_finisher.stop();
@@ -1433,6 +1436,61 @@ int FileStore::umount()
 }
 
 
+void FileStore::start_logger(int whoami, utime_t tare)
+{
+  dout(10) << "start_logger" << dendl;
+  assert(!logger);
+
+  static ProfLogType fs_logtype(l_os_first, l_os_last);
+  static bool didit = false;
+  if (!didit) {
+    didit = true;
+
+    fs_logtype.add_inc(l_os_in_ops, "in_o");
+    //fs_logtype.add_inc(l_os_in_bytes, "in_b");
+    fs_logtype.add_inc(l_os_readable_ops, "or_o");
+    fs_logtype.add_inc(l_os_readable_bytes, "or_b");
+    //fs_logtype.add_inc(l_os_commit_bytes, "com_o");
+    //fs_logtype.add_inc(l_os_commit_bytes, "com_b");
+
+    fs_logtype.add_set(l_os_jq_max_ops, "jq_mo");
+    fs_logtype.add_set(l_os_jq_ops, "jq_o");
+    fs_logtype.add_inc(l_os_j_ops, "j_o");
+    fs_logtype.add_set(l_os_jq_max_bytes, "jq_mb");
+    fs_logtype.add_set(l_os_jq_bytes, "jq_b");
+    fs_logtype.add_inc(l_os_j_bytes, "j_b");
+    fs_logtype.add_set(l_os_oq_max_ops, "oq_mo");
+    fs_logtype.add_set(l_os_oq_ops, "oq_o");
+    fs_logtype.add_inc(l_os_ops, "o");
+    fs_logtype.add_set(l_os_oq_max_bytes, "oq_mb");
+    fs_logtype.add_set(l_os_oq_bytes, "oq_b");
+    fs_logtype.add_inc(l_os_bytes, "b");
+    fs_logtype.add_set(l_os_committing, "comitng");
+  }
+
+  char name[80];
+  snprintf(name, sizeof(name), "osd.%d.fs.log", whoami);
+  logger = new ProfLogger(name, (ProfLogType*)&fs_logtype);
+  journal->logger = logger;
+  logger_add(logger);  
+  logger_tare(tare);
+  logger_start();
+}
+
+void FileStore::stop_logger()
+{
+  dout(10) << "stop_logger" << dendl;
+  if (logger) {
+    journal->logger = NULL;
+    logger_remove(logger);
+    delete logger;
+    logger = NULL;
+  }
+}
+
+
+
+
 /// -----------------------------
 
 void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list<Transaction*>& tls,
@@ -1466,13 +1524,21 @@ void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list<Transaction*>&
   op_tp.lock();
 
   osr->queue(o);
+  _op_queue_throttle("queue_op");
 
   op_queue_len++;
   op_queue_bytes += bytes;
 
+  if (logger) {
+    logger->inc(l_os_ops);
+    logger->inc(l_os_bytes, bytes);
+    logger->set(l_os_oq_ops, op_queue_len);
+    logger->set(l_os_oq_bytes, op_queue_bytes);
+  }
+
   op_tp.unlock();
 
-  dout(10) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes"
+  dout(5) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes"
           << "   (queue has " << op_queue_len << " ops and " << op_queue_bytes << " bytes)"
           << dendl;
   op_wq.queue(osr);
@@ -1481,22 +1547,32 @@ void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list<Transaction*>&
 void FileStore::op_queue_throttle()
 {
   op_tp.lock();
+  _op_queue_throttle("op_queue_throttle");
+  op_tp.unlock();
+}
 
+void FileStore::_op_queue_throttle(const char *caller)
+{
   uint64_t max_ops = g_conf.filestore_queue_max_ops;
   uint64_t max_bytes = g_conf.filestore_queue_max_bytes;
+
   if (is_committing()) {
     max_ops += g_conf.filestore_queue_committing_max_ops;
     max_bytes += g_conf.filestore_queue_committing_max_bytes;
   }
 
+  if (logger) {
+    logger->set(l_os_oq_max_ops, max_ops);
+    logger->set(l_os_oq_max_bytes, max_bytes);
+  }
+
   while ((max_ops && op_queue_len >= max_ops) ||
         (max_bytes && op_queue_bytes >= max_bytes)) {
-    dout(2) << "op_queue_throttle waiting: "
+    dout(2) << caller << " waiting: "
             << op_queue_len << " > " << max_ops << " ops || "
             << op_queue_bytes << " > " << max_bytes << dendl;
     op_tp.wait(op_throttle_cond);
   }
-  op_tp.unlock();
 }
 
 void FileStore::_do_op(OpSequencer *osr)
@@ -1504,7 +1580,7 @@ void FileStore::_do_op(OpSequencer *osr)
   osr->apply_lock.Lock();
   Op *o = osr->peek_queue();
 
-  dout(10) << "_do_op " << o << " " << o->op << " osr " << osr << "/" << osr->parent << " start" << dendl;
+  dout(5) << "_do_op " << o << " " << o->op << " osr " << osr << "/" << osr->parent << " start" << dendl;
   int r = do_transactions(o->tls, o->op);
   op_apply_finish(o->op);
   dout(10) << "_do_op " << o << " " << o->op << " r = " << r
@@ -1527,6 +1603,11 @@ void FileStore::_finish_op(OpSequencer *osr)
   op_queue_bytes -= o->bytes;
   op_throttle_cond.Signal();
 
+  if (logger) {
+    logger->inc(l_os_readable_ops);
+    logger->inc(l_os_readable_bytes, o->bytes);
+  }
+
   if (next_finish == o->op) {
     dout(10) << "_finish_op " << o->op << " next_finish " << next_finish
             << " queueing " << o->onreadable << " doing " << o->onreadable_sync << dendl;
@@ -1593,12 +1674,17 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
     posr = &default_osr;
   if (posr->p) {
     osr = (OpSequencer *)posr->p;
-    dout(10) << "queue_transactions existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl;
+    dout(5) << "queue_transactions existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl;
   } else {
     osr = new OpSequencer;
     osr->parent = posr;
     posr->p = osr;
-    dout(10) << "queue_transactions new osr " << osr << "/" << osr->parent << dendl;
+    dout(5) << "queue_transactions new osr " << osr << "/" << osr->parent << dendl;
+  }
+
+  if (logger) {
+    logger->inc(l_os_in_ops);
+    //logger->inc(l_os_in_bytes, 1); 
   }
 
   if (journal && journal->is_writeable()) {
@@ -1612,7 +1698,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
       op_queue_throttle();   // make sure the journal isn't getting ahead of our op queue.
 
       uint64_t op = op_submit_start();
-      dout(10) << "queue_transactions (parallel) " << op << " " << tls << dendl;
+      dout(5) << "queue_transactions (parallel) " << op << " " << tls << dendl;
       
       _op_journal_transactions(tls, op, ondisk);
       
@@ -1628,7 +1714,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
       op_queue_throttle();   // make sure the journal isn't getting ahead of our op queue.
 
       uint64_t op = op_submit_start();
-      dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
+      dout(5) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
       osr->queue_journal(op);
       _op_journal_transactions(tls, op,
                               new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
@@ -1638,7 +1724,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
   }
 
   uint64_t op = op_submit_start();
-  dout(10) << "queue_transactions (trailing journal) " << op << " " << tls << dendl;
+  dout(5) << "queue_transactions (trailing journal) " << op << " " << tls << dendl;
 
   _op_apply_start(op);
   int r = do_transactions(tls, op);
@@ -1660,6 +1746,11 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
   op_submit_finish(op);
   op_apply_finish(op);
 
+  if (logger) {
+    logger->inc(l_os_readable_ops);
+    //fixme logger->inc(l_os_readable_bytes, 1);
+  }
+
   return r;
 }
 
@@ -1668,7 +1759,7 @@ void FileStore::_journaled_ahead(OpSequencer *osr, uint64_t op,
                                 Context *onreadable, Context *ondisk,
                                 Context *onreadable_sync)
 {
-  dout(10) << "_journaled_ahead " << op << " " << tls << dendl;
+  dout(5) << "_journaled_ahead " << op << " " << tls << dendl;
 
   op_queue_throttle();
 
@@ -2539,6 +2630,9 @@ void FileStore::sync_entry()
       timer.add_event_after(g_conf.filestore_commit_timeout, sync_entry_timeo);
       sync_entry_timeo_lock.Unlock();
 
+      if (logger)
+       logger->set(l_os_committing, 1);
+
       // make flusher stop flushing previously queued stuff
       sync_epoch++;
 
@@ -2613,6 +2707,9 @@ void FileStore::sync_entry()
       dout(10) << "sync_entry commit took " << done << dendl;
       commit_finish();
 
+      if (logger)
+       logger->set(l_os_committing, 0);
+
       // remove old snaps?
       if (do_snap) {
        while (snaps.size() > 2) {
index b7f261f63e9d8c1fd98e6657218a41e4f485f634..20af722639e0e45dbcdd8d9326b788aa8ea0a4cc 100644 (file)
@@ -34,6 +34,7 @@ using namespace std;
 #include <ext/hash_map>
 using namespace __gnu_cxx;
 
+
 // fake attributes in memory, if we need to.
 
 class FileStore : public JournalingObjectStore {
@@ -207,6 +208,7 @@ class FileStore : public JournalingObjectStore {
   void _finish_op(OpSequencer *o);
   void queue_op(OpSequencer *osr, uint64_t op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
   void op_queue_throttle();
+  void _op_queue_throttle(const char *caller = 0);
   void _journaled_ahead(OpSequencer *osr, uint64_t op, list<Transaction*> &tls,
                        Context *onreadable, Context *ondisk, Context *onreadable_sync);
   friend class C_JournaledAhead;
@@ -228,6 +230,13 @@ class FileStore : public JournalingObjectStore {
 
   int open_journal();
 
+
+  ProfLogger *logger;
+
+public:
+  void start_logger(int whoami, utime_t tare);
+  void stop_logger();
+
  public:
   FileStore(const char *base, const char *jdev);
 
index 3ffc56abbccf717291ffab8f5825e0479f107115..434f1b61b74a5008d3f6eaa3bb9944e18ebcebca 100644 (file)
 #include "include/Context.h"
 #include "common/Finisher.h"
 
+class ProfLogger;
+
 class Journal {
 protected:
   uint64_t fsid;
   Finisher *finisher;
+public:
+  ProfLogger *logger;
+protected:
   Cond *do_sync_cond;
   bool wait_on_full;
 
 public:
-  Journal(uint64_t f, Finisher *fin, Cond *c=0) : fsid(f), finisher(fin),
-                                              do_sync_cond(c),
-                                              wait_on_full(false) { }
+  Journal(uint64_t f, Finisher *fin, Cond *c=0) :
+    fsid(f), finisher(fin), logger(NULL),
+    do_sync_cond(c),
+    wait_on_full(false) { }
   virtual ~Journal() { }
 
   virtual int create() = 0;
index 8a9cd65533fb8c21d81dcd2feb6e81543401b217..2a94adcb41d916196cda4a7f268015c3c017a01c 100644 (file)
@@ -46,6 +46,32 @@ typedef uint64_t collection_list_handle_t;
  * low-level interface to the local OSD file system
  */
 
+class Logger;
+
+enum {
+  l_os_first = 84000,
+  l_os_in_ops,
+  l_os_in_bytes,
+  l_os_readable_ops,
+  l_os_readable_bytes,
+  l_os_commit_ops,
+  l_os_commit_bytes,
+  l_os_jq_max_ops,
+  l_os_jq_ops,
+  l_os_j_ops,
+  l_os_jq_max_bytes,
+  l_os_jq_bytes,
+  l_os_j_bytes,
+  l_os_oq_max_ops,
+  l_os_oq_ops,
+  l_os_ops,
+  l_os_oq_max_bytes,
+  l_os_oq_bytes,
+  l_os_bytes,
+  l_os_committing,
+  l_os_last,
+};
+
 
 static inline void encode(const map<string,bufferptr> *attrset, bufferlist &bl) {
   ::encode(*attrset, bl);
@@ -54,6 +80,7 @@ static inline void encode(const map<string,bufferptr> *attrset, bufferlist &bl)
 class ObjectStore {
 public:
 
+  Logger *logger;
 
   class FragmentationStat {
   public:
@@ -552,7 +579,7 @@ public:
 
 
  public:
-  ObjectStore() {}
+  ObjectStore() : logger(NULL) {}
   virtual ~ObjectStore() {}
 
   // mgmt
@@ -639,6 +666,8 @@ public:
   virtual void _fake_writes(bool b) {};
   virtual void _get_frag_stat(FragmentationStat& st) {};
   
+  virtual void start_logger(int whoami, utime_t tare) {};
+
 };
 
 
index bdd4bb1bd872cb94b2e9f7b3eaccd09fd0ec7d92..7fc43db2b571a89fcba18cfae39860826cb1d707 100644 (file)
@@ -700,6 +700,9 @@ void OSD::start_logger()
   logger_tare(osdmap->get_created());
   logger_start();
   logger_started = true;
+
+  // start the objectstore logger too
+  store->start_logger(whoami, osdmap->get_created());
 }
 
 int OSD::shutdown()