From: Somnath Roy Date: Tue, 9 Sep 2014 01:44:48 +0000 (-0700) Subject: OpTracker: Sharding logic is implemented to improve performance X-Git-Tag: v0.86~43^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3fac790133e26f5bf141324aa660c13188183b89;p=ceph.git OpTracker: Sharding logic is implemented to improve performance Enabling OpTracking is degrading performance significantly. The main reason for this is the contention in the ops_in_flight_lock during registration/unregistration. Sharding this lock and hence the xlist (storing the ops) helped to boost the performance. Sharding is been done based on the op seq number % num_shards. atomic64_t is used as data type to protect the global sequence number of OpTracker. Introduced a lock in the OpHistory to protect its data structure as ealier it was protected by global ops_in_flight_lock. This is only accessed during unregistration and thus not contended. Signed-off-by: Somnath Roy --- diff --git a/src/common/TrackedOp.cc b/src/common/TrackedOp.cc index 11863a480194..8590c588f2de 100644 --- a/src/common/TrackedOp.cc +++ b/src/common/TrackedOp.cc @@ -30,6 +30,7 @@ static ostream& _prefix(std::ostream* _dout) void OpHistory::on_shutdown() { + Mutex::Locker history_lock(ops_history_lock); arrived.clear(); duration.clear(); shutdown = true; @@ -39,6 +40,8 @@ void OpHistory::insert(utime_t now, TrackedOpRef op) { if (shutdown) return; + + Mutex::Locker history_lock(ops_history_lock); duration.insert(make_pair(op->get_duration(), op)); arrived.insert(make_pair(op->get_initiated(), op)); cleanup(now); @@ -65,6 +68,7 @@ void OpHistory::cleanup(utime_t now) void OpHistory::dump_ops(utime_t now, Formatter *f) { + Mutex::Locker history_lock(ops_history_lock); cleanup(now); f->open_object_section("OpHistory"); f->dump_int("num to keep", history_size); @@ -86,24 +90,29 @@ void OpHistory::dump_ops(utime_t now, Formatter *f) void OpTracker::dump_historic_ops(Formatter *f) { - Mutex::Locker locker(ops_in_flight_lock); utime_t now = ceph_clock_now(cct); history.dump_ops(now, f); } void OpTracker::dump_ops_in_flight(Formatter *f) { - Mutex::Locker locker(ops_in_flight_lock); f->open_object_section("ops_in_flight"); // overall dump - f->dump_int("num_ops", ops_in_flight.size()); + uint64_t total_ops_in_flight = 0; f->open_array_section("ops"); // list of TrackedOps utime_t now = ceph_clock_now(cct); - for (xlist::iterator p = ops_in_flight.begin(); !p.end(); ++p) { - f->open_object_section("op"); - (*p)->dump(now, f); - f->close_section(); // this TrackedOp + for (uint32_t i = 0; i < num_optracker_shards; i++) { + ShardedTrackingData* sdata = sharded_in_flight_list[i]; + assert(NULL != sdata); + Mutex::Locker locker(sdata->ops_in_flight_lock_sharded); + for (xlist::iterator p = sdata->ops_in_flight_sharded.begin(); !p.end(); ++p) { + f->open_object_section("op"); + (*p)->dump(now, f); + f->close_section(); // this TrackedOp + total_ops_in_flight++; + } } f->close_section(); // list of TrackedOps + f->dump_int("num_ops", total_ops_in_flight); f->close_section(); // overall dump } @@ -111,9 +120,16 @@ void OpTracker::register_inflight_op(xlist::item *i) { if (!tracking_enabled) return; - Mutex::Locker locker(ops_in_flight_lock); - ops_in_flight.push_back(i); - ops_in_flight.back()->seq = seq++; + + uint64_t current_seq = seq.inc(); + uint32_t shard_index = current_seq % num_optracker_shards; + ShardedTrackingData* sdata = sharded_in_flight_list[shard_index]; + assert(NULL != sdata); + { + Mutex::Locker locker(sdata->ops_in_flight_lock_sharded); + sdata->ops_in_flight_sharded.push_back(i); + sdata->ops_in_flight_sharded.back()->seq = current_seq; + } } void OpTracker::unregister_inflight_op(TrackedOp *i) @@ -121,63 +137,93 @@ void OpTracker::unregister_inflight_op(TrackedOp *i) // caller checks; assert(tracking_enabled); - Mutex::Locker locker(ops_in_flight_lock); - assert(i->xitem.get_list() == &ops_in_flight); + uint32_t shard_index = i->seq % num_optracker_shards; + ShardedTrackingData* sdata = sharded_in_flight_list[shard_index]; + assert(NULL != sdata); + { + Mutex::Locker locker(sdata->ops_in_flight_lock_sharded); + assert(i->xitem.get_list() == &sdata->ops_in_flight_sharded); + i->xitem.remove_myself(); + } utime_t now = ceph_clock_now(cct); - i->xitem.remove_myself(); history.insert(now, TrackedOpRef(i)); } bool OpTracker::check_ops_in_flight(std::vector &warning_vector) { - Mutex::Locker locker(ops_in_flight_lock); - if (!ops_in_flight.size()) // this covers tracking_enabled, too - return false; - utime_t now = ceph_clock_now(cct); utime_t too_old = now; too_old -= complaint_time; + utime_t oldest_op; + uint64_t total_ops_in_flight = 0; + bool got_first_op = false; + + for (uint32_t i = 0; i < num_optracker_shards; i++) { + ShardedTrackingData* sdata = sharded_in_flight_list[i]; + assert(NULL != sdata); + Mutex::Locker locker(sdata->ops_in_flight_lock_sharded); + if (!sdata->ops_in_flight_sharded.empty()) { + utime_t oldest_op_tmp = sdata->ops_in_flight_sharded.front()->get_initiated(); + if (!got_first_op) { + oldest_op = oldest_op_tmp; + got_first_op = true; + } else if (oldest_op_tmp < oldest_op) { + oldest_op = oldest_op_tmp; + } + } + total_ops_in_flight += sdata->ops_in_flight_sharded.size(); + } + + if (0 == total_ops_in_flight) + return false; - utime_t oldest_secs = now - ops_in_flight.front()->get_initiated(); + utime_t oldest_secs = now - oldest_op; - dout(10) << "ops_in_flight.size: " << ops_in_flight.size() + dout(10) << "ops_in_flight.size: " << total_ops_in_flight << "; oldest is " << oldest_secs << " seconds old" << dendl; if (oldest_secs < complaint_time) return false; - xlist::iterator i = ops_in_flight.begin(); warning_vector.reserve(log_threshold + 1); int slow = 0; // total slow int warned = 0; // total logged - while (!i.end() && (*i)->get_initiated() < too_old) { - slow++; + for (uint32_t iter = 0; iter < num_optracker_shards; iter++) { + ShardedTrackingData* sdata = sharded_in_flight_list[iter]; + assert(NULL != sdata); + Mutex::Locker locker(sdata->ops_in_flight_lock_sharded); + if (sdata->ops_in_flight_sharded.empty()) + continue; + xlist::iterator i = sdata->ops_in_flight_sharded.begin(); + while (!i.end() && (*i)->get_initiated() < too_old) { + slow++; - // exponential backoff of warning intervals - if (((*i)->get_initiated() + + // exponential backoff of warning intervals + if (((*i)->get_initiated() + (complaint_time * (*i)->warn_interval_multiplier)) < now) { // will warn - if (warning_vector.empty()) - warning_vector.push_back(""); - warned++; - if (warned > log_threshold) - break; - - utime_t age = now - (*i)->get_initiated(); - stringstream ss; - ss << "slow request " << age << " seconds old, received at " - << (*i)->get_initiated() << ": "; - (*i)->_dump_op_descriptor_unlocked(ss); - ss << " currently " - << ((*i)->current.size() ? (*i)->current : (*i)->state_string()); - warning_vector.push_back(ss.str()); - - // only those that have been shown will backoff - (*i)->warn_interval_multiplier *= 2; + if (warning_vector.empty()) + warning_vector.push_back(""); + warned++; + if (warned > log_threshold) + break; + + utime_t age = now - (*i)->get_initiated(); + stringstream ss; + ss << "slow request " << age << " seconds old, received at " + << (*i)->get_initiated() << ": "; + (*i)->_dump_op_descriptor_unlocked(ss); + ss << " currently " + << ((*i)->current.size() ? (*i)->current : (*i)->state_string()); + warning_vector.push_back(ss.str()); + + // only those that have been shown will backoff + (*i)->warn_interval_multiplier *= 2; + } + ++i; } - ++i; } // only summarize if we warn about any. if everything has backed @@ -194,28 +240,34 @@ bool OpTracker::check_ops_in_flight(std::vector &warning_vector) void OpTracker::get_age_ms_histogram(pow2_hist_t *h) { - Mutex::Locker locker(ops_in_flight_lock); - h->clear(); utime_t now = ceph_clock_now(NULL); unsigned bin = 30; uint32_t lb = 1 << (bin-1); // lower bound for this bin int count = 0; - for (xlist::iterator i = ops_in_flight.begin(); !i.end(); ++i) { - utime_t age = now - (*i)->get_initiated(); - uint32_t ms = (long)(age * 1000.0); - if (ms >= lb) { - count++; - continue; - } - if (count) - h->set_bin(bin, count); - while (lb > ms) { - bin--; - lb >>= 1; + + for (uint32_t iter = 0; iter < num_optracker_shards; iter++) { + ShardedTrackingData* sdata = sharded_in_flight_list[iter]; + assert(NULL != sdata); + Mutex::Locker locker(sdata->ops_in_flight_lock_sharded); + + for (xlist::iterator i = sdata->ops_in_flight_sharded.begin(); + !i.end(); ++i) { + utime_t age = now - (*i)->get_initiated(); + uint32_t ms = (long)(age * 1000.0); + if (ms >= lb) { + count++; + continue; + } + if (count) + h->set_bin(bin, count); + while (lb > ms) { + bin--; + lb >>= 1; + } + count = 1; } - count = 1; } if (count) h->set_bin(bin, count); diff --git a/src/common/TrackedOp.h b/src/common/TrackedOp.h index b08c2aa6f398..5db7fd02b04a 100644 --- a/src/common/TrackedOp.h +++ b/src/common/TrackedOp.h @@ -29,13 +29,14 @@ class OpTracker; class OpHistory { set > arrived; set > duration; + Mutex ops_history_lock; void cleanup(utime_t now); bool shutdown; uint32_t history_size; uint32_t history_duration; public: - OpHistory() : shutdown(false), + OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false), history_size(0), history_duration(0) {} ~OpHistory() { assert(arrived.empty()); @@ -59,9 +60,15 @@ class OpTracker { }; friend class RemoveOnDelete; friend class OpHistory; - uint64_t seq; - Mutex ops_in_flight_lock; - xlist ops_in_flight; + atomic64_t seq; + struct ShardedTrackingData { + Mutex ops_in_flight_lock_sharded; + xlist ops_in_flight_sharded; + ShardedTrackingData(string lock_name): + ops_in_flight_lock_sharded(lock_name.c_str()) {} + }; + vector sharded_in_flight_list; + uint32_t num_optracker_shards; OpHistory history; float complaint_time; int log_threshold; @@ -70,9 +77,19 @@ class OpTracker { public: bool tracking_enabled; CephContext *cct; - OpTracker(CephContext *cct_, bool tracking) : seq(0), ops_in_flight_lock("OpTracker mutex"), - complaint_time(0), log_threshold(0), - tracking_enabled(tracking), cct(cct_) {} + OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards) : seq(0), + num_optracker_shards(num_shards), + complaint_time(0), log_threshold(0), + tracking_enabled(tracking), cct(cct_) { + + for (uint32_t i = 0; i < num_optracker_shards; i++) { + char lock_name[32] = {0}; + snprintf(lock_name, sizeof(lock_name), "%s:%d", "OpTracker::ShardedLock", i); + ShardedTrackingData* one_shard = new ShardedTrackingData(lock_name); + sharded_in_flight_list.push_back(one_shard); + } + } + void set_complaint_and_threshold(float time, int threshold) { complaint_time = time; log_threshold = threshold; @@ -100,11 +117,14 @@ public: utime_t time = ceph_clock_now(g_ceph_context)); void on_shutdown() { - Mutex::Locker l(ops_in_flight_lock); history.on_shutdown(); } ~OpTracker() { - assert(ops_in_flight.empty()); + while (!sharded_in_flight_list.empty()) { + assert((sharded_in_flight_list.back())->ops_in_flight_sharded.empty()); + delete sharded_in_flight_list.back(); + sharded_in_flight_list.pop_back(); + } } template diff --git a/src/common/config_opts.h b/src/common/config_opts.h index b4486699c932..b7e1b3154fda 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -576,6 +576,7 @@ OPTION(osd_debug_verify_stray_on_activate, OPT_BOOL, false) OPTION(osd_debug_skip_full_check_in_backfill_reservation, OPT_BOOL, false) OPTION(osd_debug_reject_backfill_probability, OPT_DOUBLE, 0) OPTION(osd_enable_op_tracker, OPT_BOOL, true) // enable/disable OSD op tracking +OPTION(osd_num_op_tracker_shard, OPT_U32, 32) // The number of shards for holding the ops OPTION(osd_op_history_size, OPT_U32, 20) // Max number of completed ops to track OPTION(osd_op_history_duration, OPT_U32, 600) // Oldest completed op to track OPTION(osd_target_transaction_size, OPT_INT, 30) // to adjust various transactions that batch smaller items diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 1247c5771a82..0788f66a9dda 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -102,7 +102,8 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) : messenger(m), monc(mc), log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS), - op_tracker(cct, m->cct->_conf->mds_enable_op_tracker), + op_tracker(cct, m->cct->_conf->mds_enable_op_tracker, + m->cct->_conf->osd_num_op_tracker_shard), finisher(cct), sessionmap(this), progress_thread(this), diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 91f8bba3ddd9..ff90b4f61fac 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1660,7 +1660,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, heartbeat_thread(this), heartbeat_dispatcher(this), finished_lock("OSD::finished_lock"), - op_tracker(cct, cct->_conf->osd_enable_op_tracker), + op_tracker(cct, cct->_conf->osd_enable_op_tracker, + cct->_conf->osd_num_op_tracker_shard), test_ops_hook(NULL), op_shardedwq(cct->_conf->osd_op_num_shards, this, cct->_conf->osd_op_thread_timeout, &osd_op_tp),