]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
workloadgen: Allow finer control over what the generator does.
authorJoao Eduardo Luis <jecluis@gmail.com>
Fri, 4 May 2012 23:42:26 +0000 (00:42 +0100)
committerJoao Eduardo Luis <joao.luis@inktank.com>
Tue, 8 May 2012 02:54:32 +0000 (19:54 -0700)
  Allow the user to have more control on:
    - the sizes of the data being written by the operations;
    - which operations are suppressed from execution;
    - view the throughput;
    - specify the periodicity of throughput output.

For the CLI options, '--help' should suffice.

Signed-off-by: Joao Eduardo Luis <jecluis@gmail.com>
src/test/filestore/workload_generator.cc
src/test/filestore/workload_generator.h

index 4734305d5d8bf330c9a32d5bf69fc3a0efb04450..f78121de1b1cc08f61c5147aada4b11faca182fa 100644 (file)
@@ -17,6 +17,9 @@
 #include <time.h>
 #include <stdlib.h>
 #include <signal.h>
+#include <cctype>
+#include <errno.h>
+#include <sys/time.h>
 #include "os/FileStore.h"
 #include "common/ceph_argparse.h"
 #include "global/global_init.h"
 
 #include "TestFileStoreState.h"
 
-void usage(const char *name);
+static const char *our_name = NULL;
+void usage();
 
 boost::scoped_ptr<WorkloadGenerator> wrkldgen;
 
 #define dout_subsys ceph_subsys_
 
+
 WorkloadGenerator::WorkloadGenerator(vector<const char*> args)
   : TestFileStoreState(NULL),
     m_num_ops(-1),
     m_destroy_coll_every_nr_runs(def_destroy_coll_every_nr_runs),
-    m_num_colls(def_num_colls)
+    m_num_colls(def_num_colls),
+    m_write_data_bytes(0), m_write_xattr_obj_bytes(0),
+    m_write_xattr_coll_bytes(0), m_write_pglog_bytes(0),
+    m_suppress_write_data(false), m_suppress_write_xattr_obj(false),
+    m_suppress_write_xattr_coll(false), m_suppress_write_log(false),
+    m_do_stats(false),
+    m_stats_written_data(0), m_stats_duration(0), m_stats_lock("Stats Lock"),
+    m_stats_show_secs(5)
 {
   int err = 0;
 
@@ -68,6 +80,59 @@ WorkloadGenerator::WorkloadGenerator(vector<const char*> args)
 
 }
 
+size_t WorkloadGenerator::_parse_size_or_die(std::string& val)
+{
+  size_t s = 0;
+  int multiplier = 0;
+  size_t i = 0;
+
+  if (val.empty()) // this should never happen, but catch it anyway.
+    goto die;
+
+
+  for (i = 0; i < val.length(); i++) {
+    if (!isdigit(val[i])) {
+      if (isalpha(val[i])) {
+        val[i] = tolower(val[i]);
+        switch (val[i]) {
+        case 'b': break;
+        case 'k': multiplier = 10; break;
+        case 'm': multiplier = 20; break;
+        case 'g': multiplier = 30; break;
+        default:
+          goto die;
+        }
+        val[i] = '\0';
+        break;
+      } else {
+        goto die;
+      }
+    }
+  }
+
+  s = strtoll(val.c_str(), NULL, 10) * (1 << multiplier);
+  return s;
+
+die:
+  usage();
+  exit(1);
+}
+
+void WorkloadGenerator::_suppress_ops_or_die(std::string& val)
+{
+  for (size_t i = 0; i < val.length(); i++) {
+    switch (val[i]) {
+    case 'c': m_suppress_write_xattr_coll = true; break;
+    case 'o': m_suppress_write_xattr_obj = true; break;
+    case 'l': m_suppress_write_log = true; break;
+    case 'd': m_suppress_write_data = true; break;
+    default:
+      usage();
+      exit(1);
+    }
+  }
+}
+
 void WorkloadGenerator::init_args(vector<const char*> args)
 {
   for (std::vector<const char*>::iterator i = args.begin(); i != args.end();) {
@@ -87,8 +152,28 @@ void WorkloadGenerator::init_args(vector<const char*> args)
     } else if (ceph_argparse_witharg(args, i, &val,
         "--test-num-ops", (char*) NULL)) {
       m_num_ops = strtoll(val.c_str(), NULL, 10);
+    } else if (ceph_argparse_witharg(args, i, &val,
+        "--test-write-data-size", (char*) NULL)) {
+      m_write_data_bytes = _parse_size_or_die(val);
+    } else if (ceph_argparse_witharg(args, i, &val,
+        "--test-write-xattr-obj-size", (char*) NULL)) {
+      m_write_xattr_obj_bytes = _parse_size_or_die(val);
+    } else if (ceph_argparse_witharg(args, i, &val,
+        "--test-write-xattr-coll-size", (char*) NULL)) {
+      m_write_xattr_coll_bytes = _parse_size_or_die(val);
+    } else if (ceph_argparse_witharg(args, i, &val,
+        "--test-write-pglog-size", (char*) NULL)) {
+      m_write_pglog_bytes = _parse_size_or_die(val);
+    } else if (ceph_argparse_witharg(args, i, &val,
+        "--test-suppress-ops", (char*) NULL)) {
+      _suppress_ops_or_die(val);
+    } else if (ceph_argparse_witharg(args, i, &val,
+        "--test-show-stats-period", (char*) NULL)) {
+      m_stats_show_secs = strtoll(val.c_str(), NULL, 10);
+    } else if (ceph_argparse_flag(args, i, "--test-show-stats", (char*) NULL)) {
+      m_do_stats = true;
     } else if (ceph_argparse_flag(args, i, "--help", (char*) NULL)) {
-      usage(NULL);
+      usage();
       exit(0);
     }
   }
@@ -145,61 +230,104 @@ void WorkloadGenerator::get_filled_byte_array(bufferlist& bl, size_t size)
 }
 
 void WorkloadGenerator::do_write_object(ObjectStore::Transaction *t,
-                                       coll_t coll, hobject_t obj)
+                                       coll_t coll, hobject_t obj,
+                                       C_StatState *stat)
 {
-  size_t bytes = get_random_byte_amount(min_write_bytes, max_write_bytes);
+  if (m_suppress_write_data) {
+    dout(0) << __func__ << " suppressed" << dendl;
+    return;
+  }
+
+  size_t size = m_write_data_bytes;
+  if (!size)
+    size = get_random_byte_amount(min_write_bytes, max_write_bytes);
+
   bufferlist bl;
-  get_filled_byte_array(bl, bytes);
+  get_filled_byte_array(bl, size);
 
   dout(0) << __func__ << " " << coll << "/" << obj
       << " size " << bl.length() << dendl;
+
+  if (m_do_stats && (stat != NULL))
+    stat->written_data += bl.length();
+
   t->write(coll, obj, 0, bl.length(), bl);
 }
 
 void WorkloadGenerator::do_setattr_object(ObjectStore::Transaction *t,
-                                         coll_t coll, hobject_t obj)
+                                         coll_t coll, hobject_t obj,
+                                         C_StatState *stat)
 {
-  size_t size;
-  size = get_random_byte_amount(min_xattr_obj_bytes, max_xattr_obj_bytes);
+  if (m_suppress_write_xattr_obj) {
+    dout(0) << __func__ << " suppressed" << dendl;
+    return;
+  }
+
+  size_t size = m_write_xattr_obj_bytes;
+  if (!size)
+    size = get_random_byte_amount(min_xattr_obj_bytes, max_xattr_obj_bytes);
 
   bufferlist bl;
   get_filled_byte_array(bl, size);
 
   dout(0) << __func__ << " " << coll << "/" << obj << " size " << size << dendl;
+
+  if (m_do_stats && (stat != NULL))
+      stat->written_data += bl.length();
+
   t->setattr(coll, obj, "objxattr", bl);
 }
 
 void WorkloadGenerator::do_setattr_collection(ObjectStore::Transaction *t,
-                                             coll_t coll)
+                                             coll_t coll, C_StatState *stat)
 {
-  size_t size;
-  size = get_random_byte_amount(min_xattr_coll_bytes, max_xattr_coll_bytes);
+  if (m_suppress_write_xattr_coll) {
+    dout(0) << __func__ << " suppressed" << dendl;
+    return;
+  }
+
+  size_t size = m_write_xattr_coll_bytes;
+  if (!size)
+    size = get_random_byte_amount(min_xattr_coll_bytes, max_xattr_coll_bytes);
 
   bufferlist bl;
   get_filled_byte_array(bl, size);
   dout(0) << __func__ << " coll " << coll << " size " << size << dendl;
+
+  if (m_do_stats && (stat != NULL))
+      stat->written_data += bl.length();
+
   t->collection_setattr(coll, "collxattr", bl);
 }
 
-//void WorkloadGenerator::do_append_log(ObjectStore::Transaction *t,
-//                                   coll_t coll)
 void WorkloadGenerator::do_append_log(ObjectStore::Transaction *t,
-                                      coll_entry_t *entry)
+                                      coll_entry_t *entry, C_StatState *stat)
 {
+  if (m_suppress_write_log) {
+    dout(0) << __func__ << " suppressed" << dendl;
+    return;
+  }
+
+  size_t size = (m_write_pglog_bytes ? m_write_pglog_bytes : log_append_bytes);
+
   bufferlist bl;
-  get_filled_byte_array(bl, log_append_bytes);
+  get_filled_byte_array(bl, size);
   hobject_t log_obj = entry->m_meta_obj;
 
   dout(0) << __func__ << " coll " << entry->m_coll << " "
       << META_COLL << " /" << log_obj << " (" << bl.length() << ")" << dendl;
 
+  if (m_do_stats && (stat != NULL))
+      stat->written_data += bl.length();
+
   uint64_t s = pg_log_size[entry->m_coll];
   t->write(META_COLL, log_obj, s, bl.length(), bl);
   pg_log_size[entry->m_coll] += bl.length();
 }
 
 void WorkloadGenerator::do_destroy_collection(ObjectStore::Transaction *t,
-                                             coll_entry_t *entry)
+                                             coll_entry_t *entry,
+                                             C_StatState *stat)
 {  
   m_nr_runs.set(0);
   entry->m_osr.flush();
@@ -218,7 +346,8 @@ void WorkloadGenerator::do_destroy_collection(ObjectStore::Transaction *t,
 }
 
 TestFileStoreState::coll_entry_t
-*WorkloadGenerator::do_create_collection(ObjectStore::Transaction *t)
+*WorkloadGenerator::do_create_collection(ObjectStore::Transaction *t,
+                                         C_StatState *stat)
 {
   coll_entry_t *entry = coll_create(m_next_coll_nr++);
   if (!entry) {
@@ -239,7 +368,16 @@ void WorkloadGenerator::run()
 {
   bool create_coll = false;
   int ops_run = 0;
+
+  struct timeval time_elapsed;
+  if (gettimeofday(&time_elapsed, NULL) < 0) {
+    dout(0) << __func__ << " gettimeofday error: " << strerror(errno) << dendl;
+    exit(1);
+  }
+
   do {
+    C_StatState *stat_state = NULL;
+
     if (m_num_ops && (ops_run == m_num_ops))
       break;
 
@@ -260,10 +398,51 @@ void WorkloadGenerator::run()
     bool destroy_collection = false;
     TestFileStoreState::coll_entry_t *entry = NULL;
 
+    int ret;
+    struct timeval tv_begin;
+    ret = gettimeofday(&tv_begin, NULL);
+    if (ret < 0) {
+      dout(0) << __func__ << " gettimeofday error: " << strerror(errno) << dendl;
+      exit(1);
+    }
+
+    if (m_do_stats) {
+      if (_diff_tvs(tv_begin, time_elapsed) > (m_stats_show_secs * 1000000)) {
+        m_stats_lock.Lock();
+        double duration = m_stats_duration / 1000000; // to secs
+        double throughput = (m_stats_written_data / duration);
+
+        dout(0) << __func__ << " written data: " << m_stats_written_data << " duration: " << m_stats_duration << dendl;
+
+        string unit;
+
+        // this is a bit ugly. an alternative should be pursued.
+        if (throughput < (1 << 10)) {
+          unit = "B";
+        } else if (throughput < (1 << 20)) {
+          unit = "KB";
+          throughput /= (1 << 10);
+        } else {
+          unit = "MB";
+          throughput /= (1 << 20);
+        }
+
+        dout(0) << "Throughput  " << throughput << " " << unit << "/s" << dendl;
+
+        m_stats_written_data = m_stats_duration = 0;
+
+        m_stats_lock.Unlock();
+
+        gettimeofday(&time_elapsed, NULL);
+      }
+
+      stat_state = new C_StatState(this, tv_begin);
+    }
+
     if (create_coll) {
       create_coll = false;
 
-      entry = do_create_collection(t);
+      entry = do_create_collection(t, stat_state);
       if (!entry) {
         dout(0) << __func__ << " something went terribly wrong creating coll" << dendl;
         break;
@@ -277,22 +456,28 @@ void WorkloadGenerator::run()
     entry = get_rnd_coll_entry(destroy_collection);
 
     if (destroy_collection) {
-      do_destroy_collection(t, entry);
+      do_destroy_collection(t, entry, stat_state);
       c = new C_OnDestroyed(this, t, entry);
       if (!m_num_ops)
         create_coll = true;
     } else {
       hobject_t *obj = get_rnd_obj(entry);
 
-      do_write_object(t, entry->m_coll, *obj);
-      do_setattr_object(t, entry->m_coll, *obj);
-      do_setattr_collection(t, entry->m_coll);
-      do_append_log(t, entry);
+      do_write_object(t, entry->m_coll, *obj, stat_state);
+      do_setattr_object(t, entry->m_coll, *obj, stat_state);
+      do_setattr_collection(t, entry->m_coll, stat_state);
+      do_append_log(t, entry, stat_state);
 
       c = new C_OnReadable(this, t);
     }
 
 queue_tx:
+
+    if (m_do_stats) {
+      Context *tmp = c;
+      c = new C_StatWrapper(stat_state, tmp);
+    }
+
     m_store->queue_transaction(&(entry->m_osr), t, c);
 
     inc_in_flight();
@@ -314,10 +499,9 @@ void WorkloadGenerator::print_results()
 
 }
 
-void usage(const char *name)
+void usage()
 {
-  if (name)
-    cout << "usage: " << name << "[options]" << std::endl;
+  cout << "usage: " << our_name << "[options]" << std::endl;
 
   cout << "\
 \n\
@@ -334,14 +518,35 @@ Test-specific Options:\n\
   --test-destroy-coll-per-N-trans VAL Set how many transactions to run before\n\
                                       destroying a collection.\n\
   --test-num-ops VAL                  Run a certain number of operations\n\
-                                      (a VAL of 0 runs the test forever)\
-    " << std::endl;
+                                      (a VAL of 0 runs the test forever)\n\
+   --test-suppress-ops OPS            Suppress ops specified in OPS\n\
+   --test-write-data-size SIZE        Specify SIZE for all data writes\n\
+   --test-write-xattr-obj-size SIZE   Specify SIZE for all xattrs on objects\n\
+   --test-write-xattr-coll-size SIZE  Specify SIZE for all xattrs on colls\n\
+   --test-write-pglog-size SIZE       Specify SIZE for all pglog writes\n\
+   --test-show-stats                  Show stats as we go\n\
+   --test-show-stats-period SECS      Show stats every SECS (default: 5)\n\
+\n\
+   SIZE is a numeric value that can be assumed as being bytes, or may be any\n\
+   other unit if specified: B or b, K or k, M or m, G or g.\n\
+      e.g., 1G = 1024M = 1048576k = 1073741824\n\
+\n\
+   OPS can be one or more of the following options:\n\
+      c    writes on collection's xattrs\n\
+      o    writes on object's xattr\n\
+      l    writes on pglog\n\
+      d    data writes on objects\n\
+\n\
+" << std::endl;
 }
 
 int main(int argc, const char *argv[])
 {
   vector<const char*> def_args;
   vector<const char*> args;
+
+  our_name = argv[0];
+
   def_args.push_back("--osd-journal-size");
   def_args.push_back("400");
   def_args.push_back("--osd-data");
index 544bba3a35d52ca75039795fbc0af0b36af721bd..8814df7ae51e3e620e95118ce6f418380c50526e 100644 (file)
 #include <boost/random/mersenne_twister.hpp>
 #include <boost/random/uniform_int.hpp>
 #include <map>
+#include <sys/time.h>
 
 #include "TestFileStoreState.h"
 
 typedef boost::mt11213b rngen_t;
 
 class WorkloadGenerator : public TestFileStoreState {
-public:
+ public:
   static const int max_in_flight = 50;
 
   static const int def_destroy_coll_every_nr_runs = 100;
@@ -42,7 +43,21 @@ public:
 
   static const size_t log_append_bytes = 1024;
 
-private:
+  struct C_StatState {
+    struct timeval tv_start;
+    unsigned int written_data;
+    WorkloadGenerator *wrkldgen;
+
+    C_StatState(WorkloadGenerator *state, struct timeval start)
+      : tv_start(start), written_data(0), wrkldgen(state) { }
+  };
+
+  static unsigned long long _diff_tvs(struct timeval& a, struct timeval& b) {
+    return (((a.tv_sec*1000000)+a.tv_usec) - ((b.tv_sec*1000000)+b.tv_usec));
+  }
+
+
+ protected:
   int m_num_ops;
   int m_destroy_coll_every_nr_runs;
   atomic_t m_nr_runs;
@@ -53,6 +68,27 @@ private:
 
   map<coll_t, uint64_t> pg_log_size;
 
+  size_t m_write_data_bytes;
+  size_t m_write_xattr_obj_bytes;
+  size_t m_write_xattr_coll_bytes;
+  size_t m_write_pglog_bytes;
+
+  bool m_suppress_write_data;
+  bool m_suppress_write_xattr_obj;
+  bool m_suppress_write_xattr_coll;
+  bool m_suppress_write_log;
+
+  bool m_do_stats;
+
+  size_t m_stats_written_data;
+  unsigned long long m_stats_duration;
+  Mutex m_stats_lock;
+  int m_stats_show_secs;
+
+ private:
+
+  void _suppress_ops_or_die(std::string& val);
+  size_t _parse_size_or_die(std::string& val);
   void init_args(vector<const char*> args);
 
   int get_uniform_random_value(int min, int max);
@@ -65,18 +101,22 @@ private:
   void get_filled_byte_array(bufferlist& bl, size_t size);
 
   void do_write_object(ObjectStore::Transaction *t,
-      coll_t coll, hobject_t obj);
+      coll_t coll, hobject_t obj, C_StatState *stat);
   void do_setattr_object(ObjectStore::Transaction *t,
-      coll_t coll, hobject_t obj);
-  void do_setattr_collection(ObjectStore::Transaction *t, coll_t coll);
-//  void do_append_log(ObjectStore::Transaction *t, coll_t coll);
-  void do_append_log(ObjectStore::Transaction *t, coll_entry_t *entry);
+      coll_t coll, hobject_t obj, C_StatState *stat);
+  void do_setattr_collection(ObjectStore::Transaction *t, coll_t coll,
+      C_StatState *stat);
+  void do_append_log(ObjectStore::Transaction *t, coll_entry_t *entry,
+      C_StatState *stat);
 
   bool should_destroy_collection() {
-    return ((int)m_nr_runs.read() >= m_destroy_coll_every_nr_runs);
+    return ((m_destroy_coll_every_nr_runs > 0) &&
+        ((int)m_nr_runs.read() >= m_destroy_coll_every_nr_runs));
   }
-  void do_destroy_collection(ObjectStore::Transaction *t, coll_entry_t *entry);
-  coll_entry_t *do_create_collection(ObjectStore::Transaction *t);
+  void do_destroy_collection(ObjectStore::Transaction *t, coll_entry_t *entry,
+      C_StatState *stat);
+  coll_entry_t *do_create_collection(ObjectStore::Transaction *t,
+      C_StatState *stat);
 
 public:
   WorkloadGenerator(vector<const char*> args);
@@ -113,6 +153,33 @@ public:
     }
   };
 
+  class C_StatWrapper : public Context {
+    C_StatState *stat_state;
+    Context *ctx;
+
+   public:
+    C_StatWrapper(C_StatState *state, Context *context)
+      : stat_state(state), ctx(context) { }
+
+    void finish(int r) {
+      ctx->finish(r);
+
+      struct timeval tv_end;
+      int ret = gettimeofday(&tv_end, NULL);
+      if (ret < 0) {
+        cout << "error on gettimeofday" << std::endl;
+        _exit(1);
+      }
+
+      unsigned long long usec_taken = _diff_tvs(tv_end, stat_state->tv_start);
+
+      stat_state->wrkldgen->m_stats_lock.Lock();
+      stat_state->wrkldgen->m_stats_duration += usec_taken;
+      stat_state->wrkldgen->m_stats_written_data += stat_state->written_data;
+      stat_state->wrkldgen->m_stats_lock.Unlock();
+    }
+  };
+
   void run(void);
   void print_results(void);
 };