]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
OpTracker: Sharding logic is implemented to improve performance
authorSomnath Roy <somnath.roy@sandisk.com>
Tue, 9 Sep 2014 01:44:48 +0000 (18:44 -0700)
committerSomnath Roy <somnath.roy@sandisk.com>
Thu, 11 Sep 2014 18:18:04 +0000 (11:18 -0700)
Enabling OpTracking is degrading performance significantly. The
main reason for this is the contention in the ops_in_flight_lock
during registration/unregistration. Sharding this lock and hence
the xlist (storing the ops) helped to boost the performance.
Sharding is been done based on the op seq number % num_shards.
atomic64_t is used as data type to protect the global sequence
number of OpTracker. Introduced a lock in the OpHistory to
protect its data structure as ealier it was protected by global
ops_in_flight_lock. This is only accessed during unregistration
and thus not contended.

Signed-off-by: Somnath Roy <somnath.roy@sandisk.com>
src/common/TrackedOp.cc
src/common/TrackedOp.h
src/common/config_opts.h
src/mds/MDS.cc
src/osd/OSD.cc

index 11863a48019416e31f294d07eaf52dcf116a2f6f..8590c588f2de2bcfc363ac96a4185cf711adc6b8 100644 (file)
@@ -30,6 +30,7 @@ static ostream& _prefix(std::ostream* _dout)
 
 void OpHistory::on_shutdown()
 {
+  Mutex::Locker history_lock(ops_history_lock);
   arrived.clear();
   duration.clear();
   shutdown = true;
@@ -39,6 +40,8 @@ void OpHistory::insert(utime_t now, TrackedOpRef op)
 {
   if (shutdown)
     return;
+
+  Mutex::Locker history_lock(ops_history_lock);
   duration.insert(make_pair(op->get_duration(), op));
   arrived.insert(make_pair(op->get_initiated(), op));
   cleanup(now);
@@ -65,6 +68,7 @@ void OpHistory::cleanup(utime_t now)
 
 void OpHistory::dump_ops(utime_t now, Formatter *f)
 {
+  Mutex::Locker history_lock(ops_history_lock);
   cleanup(now);
   f->open_object_section("OpHistory");
   f->dump_int("num to keep", history_size);
@@ -86,24 +90,29 @@ void OpHistory::dump_ops(utime_t now, Formatter *f)
 
 void OpTracker::dump_historic_ops(Formatter *f)
 {
-  Mutex::Locker locker(ops_in_flight_lock);
   utime_t now = ceph_clock_now(cct);
   history.dump_ops(now, f);
 }
 
 void OpTracker::dump_ops_in_flight(Formatter *f)
 {
-  Mutex::Locker locker(ops_in_flight_lock);
   f->open_object_section("ops_in_flight"); // overall dump
-  f->dump_int("num_ops", ops_in_flight.size());
+  uint64_t total_ops_in_flight = 0;
   f->open_array_section("ops"); // list of TrackedOps
   utime_t now = ceph_clock_now(cct);
-  for (xlist<TrackedOp*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
-    f->open_object_section("op");
-    (*p)->dump(now, f);
-    f->close_section(); // this TrackedOp
+  for (uint32_t i = 0; i < num_optracker_shards; i++) {
+    ShardedTrackingData* sdata = sharded_in_flight_list[i];
+    assert(NULL != sdata); 
+    Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);    
+    for (xlist<TrackedOp*>::iterator p = sdata->ops_in_flight_sharded.begin(); !p.end(); ++p) {
+      f->open_object_section("op");
+      (*p)->dump(now, f);
+      f->close_section(); // this TrackedOp
+      total_ops_in_flight++;
+    }
   }
   f->close_section(); // list of TrackedOps
+  f->dump_int("num_ops", total_ops_in_flight);
   f->close_section(); // overall dump
 }
 
@@ -111,9 +120,16 @@ void OpTracker::register_inflight_op(xlist<TrackedOp*>::item *i)
 {
   if (!tracking_enabled)
     return;
-  Mutex::Locker locker(ops_in_flight_lock);
-  ops_in_flight.push_back(i);
-  ops_in_flight.back()->seq = seq++;
+
+  uint64_t current_seq = seq.inc();
+  uint32_t shard_index = current_seq % num_optracker_shards;
+  ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
+  assert(NULL != sdata);
+  {
+    Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
+    sdata->ops_in_flight_sharded.push_back(i);
+    sdata->ops_in_flight_sharded.back()->seq = current_seq;
+  }
 }
 
 void OpTracker::unregister_inflight_op(TrackedOp *i)
@@ -121,63 +137,93 @@ void OpTracker::unregister_inflight_op(TrackedOp *i)
   // caller checks;
   assert(tracking_enabled);
 
-  Mutex::Locker locker(ops_in_flight_lock);
-  assert(i->xitem.get_list() == &ops_in_flight);
+  uint32_t shard_index = i->seq % num_optracker_shards;
+  ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
+  assert(NULL != sdata);
+  {
+    Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
+    assert(i->xitem.get_list() == &sdata->ops_in_flight_sharded);
+    i->xitem.remove_myself();
+  }
   utime_t now = ceph_clock_now(cct);
-  i->xitem.remove_myself();
   history.insert(now, TrackedOpRef(i));
 }
 
 bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)
 {
-  Mutex::Locker locker(ops_in_flight_lock);
-  if (!ops_in_flight.size()) // this covers tracking_enabled, too
-    return false;
-
   utime_t now = ceph_clock_now(cct);
   utime_t too_old = now;
   too_old -= complaint_time;
+  utime_t oldest_op;
+  uint64_t total_ops_in_flight = 0;
+  bool got_first_op = false;
+
+  for (uint32_t i = 0; i < num_optracker_shards; i++) {
+    ShardedTrackingData* sdata = sharded_in_flight_list[i];
+    assert(NULL != sdata);
+    Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
+    if (!sdata->ops_in_flight_sharded.empty()) {
+      utime_t oldest_op_tmp = sdata->ops_in_flight_sharded.front()->get_initiated();
+      if (!got_first_op) {
+        oldest_op = oldest_op_tmp;
+        got_first_op = true;
+      } else if (oldest_op_tmp < oldest_op) {
+        oldest_op = oldest_op_tmp;
+      }
+    } 
+    total_ops_in_flight += sdata->ops_in_flight_sharded.size();
+  }
+      
+  if (0 == total_ops_in_flight)
+    return false;
 
-  utime_t oldest_secs = now - ops_in_flight.front()->get_initiated();
+  utime_t oldest_secs = now - oldest_op;
 
-  dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
+  dout(10) << "ops_in_flight.size: " << total_ops_in_flight
            << "; oldest is " << oldest_secs
            << " seconds old" << dendl;
 
   if (oldest_secs < complaint_time)
     return false;
 
-  xlist<TrackedOp*>::iterator i = ops_in_flight.begin();
   warning_vector.reserve(log_threshold + 1);
 
   int slow = 0;     // total slow
   int warned = 0;   // total logged
-  while (!i.end() && (*i)->get_initiated() < too_old) {
-    slow++;
+  for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
+    ShardedTrackingData* sdata = sharded_in_flight_list[iter];
+    assert(NULL != sdata);
+    Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
+    if (sdata->ops_in_flight_sharded.empty())
+      continue;
+    xlist<TrackedOp*>::iterator i = sdata->ops_in_flight_sharded.begin();    
+    while (!i.end() && (*i)->get_initiated() < too_old) {
+      slow++;
 
-    // exponential backoff of warning intervals
-    if (((*i)->get_initiated() +
+      // exponential backoff of warning intervals
+      if (((*i)->get_initiated() +
         (complaint_time * (*i)->warn_interval_multiplier)) < now) {
       // will warn
-      if (warning_vector.empty())
-       warning_vector.push_back("");
-      warned++;
-      if (warned > log_threshold)
-        break;
-
-      utime_t age = now - (*i)->get_initiated();
-      stringstream ss;
-      ss << "slow request " << age << " seconds old, received at "
-         << (*i)->get_initiated() << ": ";
-      (*i)->_dump_op_descriptor_unlocked(ss);
-      ss << " currently "
-        << ((*i)->current.size() ? (*i)->current : (*i)->state_string());
-      warning_vector.push_back(ss.str());
-
-      // only those that have been shown will backoff
-      (*i)->warn_interval_multiplier *= 2;
+        if (warning_vector.empty())
+          warning_vector.push_back("");
+        warned++;
+        if (warned > log_threshold)
+          break;
+
+        utime_t age = now - (*i)->get_initiated();
+        stringstream ss;
+        ss << "slow request " << age << " seconds old, received at "
+           << (*i)->get_initiated() << ": ";
+        (*i)->_dump_op_descriptor_unlocked(ss);
+        ss << " currently "
+          << ((*i)->current.size() ? (*i)->current : (*i)->state_string());
+        warning_vector.push_back(ss.str());
+
+        // only those that have been shown will backoff
+        (*i)->warn_interval_multiplier *= 2;
+      }
+      ++i;
     }
-    ++i;
   }
 
   // only summarize if we warn about any.  if everything has backed
@@ -194,28 +240,34 @@ bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)
 
 void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
 {
-  Mutex::Locker locker(ops_in_flight_lock);
-
   h->clear();
 
   utime_t now = ceph_clock_now(NULL);
   unsigned bin = 30;
   uint32_t lb = 1 << (bin-1);  // lower bound for this bin
   int count = 0;
-  for (xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) {
-    utime_t age = now - (*i)->get_initiated();
-    uint32_t ms = (long)(age * 1000.0);
-    if (ms >= lb) {
-      count++;
-      continue;
-    }
-    if (count)
-      h->set_bin(bin, count);
-    while (lb > ms) {
-      bin--;
-      lb >>= 1;
+  
+  for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
+    ShardedTrackingData* sdata = sharded_in_flight_list[iter];
+    assert(NULL != sdata);
+    Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
+
+    for (xlist<TrackedOp*>::iterator i = sdata->ops_in_flight_sharded.begin();
+                                                               !i.end(); ++i) {
+      utime_t age = now - (*i)->get_initiated();
+      uint32_t ms = (long)(age * 1000.0);
+      if (ms >= lb) {
+        count++;
+        continue;
+      }
+      if (count)
+        h->set_bin(bin, count);
+      while (lb > ms) {
+        bin--;
+        lb >>= 1;
+      }
+      count = 1;
     }
-    count = 1;
   }
   if (count)
     h->set_bin(bin, count);
index b08c2aa6f398d36b9dcb5865f15d110d8e11a6b1..5db7fd02b04a221fbd4fac11d75d298efe23b796 100644 (file)
@@ -29,13 +29,14 @@ class OpTracker;
 class OpHistory {
   set<pair<utime_t, TrackedOpRef> > arrived;
   set<pair<double, TrackedOpRef> > duration;
+  Mutex ops_history_lock;
   void cleanup(utime_t now);
   bool shutdown;
   uint32_t history_size;
   uint32_t history_duration;
 
 public:
-  OpHistory() : shutdown(false),
+  OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false),
   history_size(0), history_duration(0) {}
   ~OpHistory() {
     assert(arrived.empty());
@@ -59,9 +60,15 @@ class OpTracker {
   };
   friend class RemoveOnDelete;
   friend class OpHistory;
-  uint64_t seq;
-  Mutex ops_in_flight_lock;
-  xlist<TrackedOp *> ops_in_flight;
+  atomic64_t seq;
+  struct ShardedTrackingData {
+    Mutex ops_in_flight_lock_sharded;
+    xlist<TrackedOp *> ops_in_flight_sharded;
+    ShardedTrackingData(string lock_name):
+        ops_in_flight_lock_sharded(lock_name.c_str()) {}
+  };
+  vector<ShardedTrackingData*> sharded_in_flight_list;
+  uint32_t num_optracker_shards;
   OpHistory history;
   float complaint_time;
   int log_threshold;
@@ -70,9 +77,19 @@ class OpTracker {
 public:
   bool tracking_enabled;
   CephContext *cct;
-  OpTracker(CephContext *cct_, bool tracking) : seq(0), ops_in_flight_lock("OpTracker mutex"),
-                                               complaint_time(0), log_threshold(0),
-                                               tracking_enabled(tracking), cct(cct_) {}
+  OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards) : seq(0), 
+                                     num_optracker_shards(num_shards),
+                                    complaint_time(0), log_threshold(0),
+                                    tracking_enabled(tracking), cct(cct_) {
+
+    for (uint32_t i = 0; i < num_optracker_shards; i++) {
+      char lock_name[32] = {0};
+      snprintf(lock_name, sizeof(lock_name), "%s:%d", "OpTracker::ShardedLock", i);
+      ShardedTrackingData* one_shard = new ShardedTrackingData(lock_name);
+      sharded_in_flight_list.push_back(one_shard);
+    }
+  }
+      
   void set_complaint_and_threshold(float time, int threshold) {
     complaint_time = time;
     log_threshold = threshold;
@@ -100,11 +117,14 @@ public:
                           utime_t time = ceph_clock_now(g_ceph_context));
 
   void on_shutdown() {
-    Mutex::Locker l(ops_in_flight_lock);
     history.on_shutdown();
   }
   ~OpTracker() {
-    assert(ops_in_flight.empty());
+    while (!sharded_in_flight_list.empty()) {
+      assert((sharded_in_flight_list.back())->ops_in_flight_sharded.empty());
+      delete sharded_in_flight_list.back();
+      sharded_in_flight_list.pop_back();
+    }    
   }
 
   template <typename T, typename U>
index b4486699c932f2e137a91d7d845e2617907b281c..b7e1b3154fda8ee0f8e2366d50a40a91ff619444 100644 (file)
@@ -576,6 +576,7 @@ OPTION(osd_debug_verify_stray_on_activate, OPT_BOOL, false)
 OPTION(osd_debug_skip_full_check_in_backfill_reservation, OPT_BOOL, false)
 OPTION(osd_debug_reject_backfill_probability, OPT_DOUBLE, 0)
 OPTION(osd_enable_op_tracker, OPT_BOOL, true) // enable/disable OSD op tracking
+OPTION(osd_num_op_tracker_shard, OPT_U32, 32) // The number of shards for holding the ops 
 OPTION(osd_op_history_size, OPT_U32, 20)    // Max number of completed ops to track
 OPTION(osd_op_history_duration, OPT_U32, 600) // Oldest completed op to track
 OPTION(osd_target_transaction_size, OPT_INT, 30)     // to adjust various transactions that batch smaller items
index 1247c5771a82b039a61d4a712e6b8936b6790103..0788f66a9dda326a848383542cec70cff9ca23eb 100644 (file)
@@ -102,7 +102,8 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) :
   messenger(m),
   monc(mc),
   log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
-  op_tracker(cct, m->cct->_conf->mds_enable_op_tracker),
+  op_tracker(cct, m->cct->_conf->mds_enable_op_tracker, 
+                     m->cct->_conf->osd_num_op_tracker_shard),
   finisher(cct),
   sessionmap(this),
   progress_thread(this),
index 91f8bba3ddd9a3ace19e77785bbf58de1bd73680..ff90b4f61fac43380b15f1c2524edc34f0d63a21 100644 (file)
@@ -1660,7 +1660,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   heartbeat_thread(this),
   heartbeat_dispatcher(this),
   finished_lock("OSD::finished_lock"),
-  op_tracker(cct, cct->_conf->osd_enable_op_tracker),
+  op_tracker(cct, cct->_conf->osd_enable_op_tracker, 
+                  cct->_conf->osd_num_op_tracker_shard),
   test_ops_hook(NULL),
   op_shardedwq(cct->_conf->osd_op_num_shards, this, 
     cct->_conf->osd_op_thread_timeout, &osd_op_tp),