void OpHistory::on_shutdown()
{
+ Mutex::Locker history_lock(ops_history_lock);
arrived.clear();
duration.clear();
shutdown = true;
{
if (shutdown)
return;
+
+ Mutex::Locker history_lock(ops_history_lock);
duration.insert(make_pair(op->get_duration(), op));
arrived.insert(make_pair(op->get_initiated(), op));
cleanup(now);
void OpHistory::dump_ops(utime_t now, Formatter *f)
{
+ Mutex::Locker history_lock(ops_history_lock);
cleanup(now);
f->open_object_section("OpHistory");
f->dump_int("num to keep", history_size);
void OpTracker::dump_historic_ops(Formatter *f)
{
- Mutex::Locker locker(ops_in_flight_lock);
utime_t now = ceph_clock_now(cct);
history.dump_ops(now, f);
}
void OpTracker::dump_ops_in_flight(Formatter *f)
{
- Mutex::Locker locker(ops_in_flight_lock);
f->open_object_section("ops_in_flight"); // overall dump
- f->dump_int("num_ops", ops_in_flight.size());
+ uint64_t total_ops_in_flight = 0;
f->open_array_section("ops"); // list of TrackedOps
utime_t now = ceph_clock_now(cct);
- for (xlist<TrackedOp*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
- f->open_object_section("op");
- (*p)->dump(now, f);
- f->close_section(); // this TrackedOp
+ for (uint32_t i = 0; i < num_optracker_shards; i++) {
+ ShardedTrackingData* sdata = sharded_in_flight_list[i];
+ assert(NULL != sdata);
+ Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
+ for (xlist<TrackedOp*>::iterator p = sdata->ops_in_flight_sharded.begin(); !p.end(); ++p) {
+ f->open_object_section("op");
+ (*p)->dump(now, f);
+ f->close_section(); // this TrackedOp
+ total_ops_in_flight++;
+ }
}
f->close_section(); // list of TrackedOps
+ f->dump_int("num_ops", total_ops_in_flight);
f->close_section(); // overall dump
}
{
if (!tracking_enabled)
return;
- Mutex::Locker locker(ops_in_flight_lock);
- ops_in_flight.push_back(i);
- ops_in_flight.back()->seq = seq++;
+
+ uint64_t current_seq = seq.inc();
+ uint32_t shard_index = current_seq % num_optracker_shards;
+ ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
+ assert(NULL != sdata);
+ {
+ Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
+ sdata->ops_in_flight_sharded.push_back(i);
+ sdata->ops_in_flight_sharded.back()->seq = current_seq;
+ }
}
void OpTracker::unregister_inflight_op(TrackedOp *i)
// caller checks;
assert(tracking_enabled);
- Mutex::Locker locker(ops_in_flight_lock);
- assert(i->xitem.get_list() == &ops_in_flight);
+ uint32_t shard_index = i->seq % num_optracker_shards;
+ ShardedTrackingData* sdata = sharded_in_flight_list[shard_index];
+ assert(NULL != sdata);
+ {
+ Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
+ assert(i->xitem.get_list() == &sdata->ops_in_flight_sharded);
+ i->xitem.remove_myself();
+ }
utime_t now = ceph_clock_now(cct);
- i->xitem.remove_myself();
history.insert(now, TrackedOpRef(i));
}
bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)
{
- Mutex::Locker locker(ops_in_flight_lock);
- if (!ops_in_flight.size()) // this covers tracking_enabled, too
- return false;
-
utime_t now = ceph_clock_now(cct);
utime_t too_old = now;
too_old -= complaint_time;
+ utime_t oldest_op;
+ uint64_t total_ops_in_flight = 0;
+ bool got_first_op = false;
+
+ for (uint32_t i = 0; i < num_optracker_shards; i++) {
+ ShardedTrackingData* sdata = sharded_in_flight_list[i];
+ assert(NULL != sdata);
+ Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
+ if (!sdata->ops_in_flight_sharded.empty()) {
+ utime_t oldest_op_tmp = sdata->ops_in_flight_sharded.front()->get_initiated();
+ if (!got_first_op) {
+ oldest_op = oldest_op_tmp;
+ got_first_op = true;
+ } else if (oldest_op_tmp < oldest_op) {
+ oldest_op = oldest_op_tmp;
+ }
+ }
+ total_ops_in_flight += sdata->ops_in_flight_sharded.size();
+ }
+
+ if (0 == total_ops_in_flight)
+ return false;
- utime_t oldest_secs = now - ops_in_flight.front()->get_initiated();
+ utime_t oldest_secs = now - oldest_op;
- dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
+ dout(10) << "ops_in_flight.size: " << total_ops_in_flight
<< "; oldest is " << oldest_secs
<< " seconds old" << dendl;
if (oldest_secs < complaint_time)
return false;
- xlist<TrackedOp*>::iterator i = ops_in_flight.begin();
warning_vector.reserve(log_threshold + 1);
int slow = 0; // total slow
int warned = 0; // total logged
- while (!i.end() && (*i)->get_initiated() < too_old) {
- slow++;
+ for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
+ ShardedTrackingData* sdata = sharded_in_flight_list[iter];
+ assert(NULL != sdata);
+ Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
+ if (sdata->ops_in_flight_sharded.empty())
+ continue;
+ xlist<TrackedOp*>::iterator i = sdata->ops_in_flight_sharded.begin();
+ while (!i.end() && (*i)->get_initiated() < too_old) {
+ slow++;
- // exponential backoff of warning intervals
- if (((*i)->get_initiated() +
+ // exponential backoff of warning intervals
+ if (((*i)->get_initiated() +
(complaint_time * (*i)->warn_interval_multiplier)) < now) {
// will warn
- if (warning_vector.empty())
- warning_vector.push_back("");
- warned++;
- if (warned > log_threshold)
- break;
-
- utime_t age = now - (*i)->get_initiated();
- stringstream ss;
- ss << "slow request " << age << " seconds old, received at "
- << (*i)->get_initiated() << ": ";
- (*i)->_dump_op_descriptor_unlocked(ss);
- ss << " currently "
- << ((*i)->current.size() ? (*i)->current : (*i)->state_string());
- warning_vector.push_back(ss.str());
-
- // only those that have been shown will backoff
- (*i)->warn_interval_multiplier *= 2;
+ if (warning_vector.empty())
+ warning_vector.push_back("");
+ warned++;
+ if (warned > log_threshold)
+ break;
+
+ utime_t age = now - (*i)->get_initiated();
+ stringstream ss;
+ ss << "slow request " << age << " seconds old, received at "
+ << (*i)->get_initiated() << ": ";
+ (*i)->_dump_op_descriptor_unlocked(ss);
+ ss << " currently "
+ << ((*i)->current.size() ? (*i)->current : (*i)->state_string());
+ warning_vector.push_back(ss.str());
+
+ // only those that have been shown will backoff
+ (*i)->warn_interval_multiplier *= 2;
+ }
+ ++i;
}
- ++i;
}
// only summarize if we warn about any. if everything has backed
void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
{
- Mutex::Locker locker(ops_in_flight_lock);
-
h->clear();
utime_t now = ceph_clock_now(NULL);
unsigned bin = 30;
uint32_t lb = 1 << (bin-1); // lower bound for this bin
int count = 0;
- for (xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) {
- utime_t age = now - (*i)->get_initiated();
- uint32_t ms = (long)(age * 1000.0);
- if (ms >= lb) {
- count++;
- continue;
- }
- if (count)
- h->set_bin(bin, count);
- while (lb > ms) {
- bin--;
- lb >>= 1;
+
+ for (uint32_t iter = 0; iter < num_optracker_shards; iter++) {
+ ShardedTrackingData* sdata = sharded_in_flight_list[iter];
+ assert(NULL != sdata);
+ Mutex::Locker locker(sdata->ops_in_flight_lock_sharded);
+
+ for (xlist<TrackedOp*>::iterator i = sdata->ops_in_flight_sharded.begin();
+ !i.end(); ++i) {
+ utime_t age = now - (*i)->get_initiated();
+ uint32_t ms = (long)(age * 1000.0);
+ if (ms >= lb) {
+ count++;
+ continue;
+ }
+ if (count)
+ h->set_bin(bin, count);
+ while (lb > ms) {
+ bin--;
+ lb >>= 1;
+ }
+ count = 1;
}
- count = 1;
}
if (count)
h->set_bin(bin, count);
class OpHistory {
set<pair<utime_t, TrackedOpRef> > arrived;
set<pair<double, TrackedOpRef> > duration;
+ Mutex ops_history_lock;
void cleanup(utime_t now);
bool shutdown;
uint32_t history_size;
uint32_t history_duration;
public:
- OpHistory() : shutdown(false),
+ OpHistory() : ops_history_lock("OpHistory::Lock"), shutdown(false),
history_size(0), history_duration(0) {}
~OpHistory() {
assert(arrived.empty());
};
friend class RemoveOnDelete;
friend class OpHistory;
- uint64_t seq;
- Mutex ops_in_flight_lock;
- xlist<TrackedOp *> ops_in_flight;
+ atomic64_t seq;
+ struct ShardedTrackingData {
+ Mutex ops_in_flight_lock_sharded;
+ xlist<TrackedOp *> ops_in_flight_sharded;
+ ShardedTrackingData(string lock_name):
+ ops_in_flight_lock_sharded(lock_name.c_str()) {}
+ };
+ vector<ShardedTrackingData*> sharded_in_flight_list;
+ uint32_t num_optracker_shards;
OpHistory history;
float complaint_time;
int log_threshold;
public:
bool tracking_enabled;
CephContext *cct;
- OpTracker(CephContext *cct_, bool tracking) : seq(0), ops_in_flight_lock("OpTracker mutex"),
- complaint_time(0), log_threshold(0),
- tracking_enabled(tracking), cct(cct_) {}
+ OpTracker(CephContext *cct_, bool tracking, uint32_t num_shards) : seq(0),
+ num_optracker_shards(num_shards),
+ complaint_time(0), log_threshold(0),
+ tracking_enabled(tracking), cct(cct_) {
+
+ for (uint32_t i = 0; i < num_optracker_shards; i++) {
+ char lock_name[32] = {0};
+ snprintf(lock_name, sizeof(lock_name), "%s:%d", "OpTracker::ShardedLock", i);
+ ShardedTrackingData* one_shard = new ShardedTrackingData(lock_name);
+ sharded_in_flight_list.push_back(one_shard);
+ }
+ }
+
void set_complaint_and_threshold(float time, int threshold) {
complaint_time = time;
log_threshold = threshold;
utime_t time = ceph_clock_now(g_ceph_context));
void on_shutdown() {
- Mutex::Locker l(ops_in_flight_lock);
history.on_shutdown();
}
~OpTracker() {
- assert(ops_in_flight.empty());
+ while (!sharded_in_flight_list.empty()) {
+ assert((sharded_in_flight_list.back())->ops_in_flight_sharded.empty());
+ delete sharded_in_flight_list.back();
+ sharded_in_flight_list.pop_back();
+ }
}
template <typename T, typename U>