stat_lock("OSD::stat_lock"),
finished_lock("OSD::finished_lock"),
admin_ops_hook(NULL),
+ historic_ops_hook(NULL),
op_queue_len(0),
op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200),
return 0;
}
+/**
+ * Admin socket hook that asks the owning OSD for its historic-ops report
+ * and returns the rendered text to the admin socket caller.
+ */
+class HistoricOpsSocketHook : public AdminSocketHook {
+  OSD *osd;
+public:
+  HistoricOpsSocketHook(OSD *owner) : osd(owner) {}
+
+  // Neither command nor args is consulted; the hook always produces the
+  // full report and returns true.
+  bool call(std::string command, std::string args, bufferlist& out) {
+    stringstream report;
+    osd->dump_historic_ops(report);
+    out.append(report);
+    return true;
+  }
+};
+
+
class OpsFlightSocketHook : public AdminSocketHook {
OSD *osd;
public:
AdminSocket *admin_socket = cct->get_admin_socket();
r = admin_socket->register_command("dump_ops_in_flight", admin_ops_hook,
"show the ops currently in flight");
+ historic_ops_hook = new HistoricOpsSocketHook(this);
+ r = admin_socket->register_command("dump_historic_ops", historic_ops_hook,
+ "show slowest recent ops");
assert(r == 0);
return 0;
cct->get_admin_socket()->unregister_command("dump_ops_in_flight");
delete admin_ops_hook;
+ delete historic_ops_hook;
admin_ops_hook = NULL;
+ historic_ops_hook = NULL;
recovery_tp.stop();
dout(10) << "recovery tp stopped" << dendl;
return *_dout << "--OSD::tracker-- ";
}
+/**
+ * Add a finished op to both indexes (arrival time and duration), then run
+ * cleanup(now) so the history stays within its configured limits.
+ */
+void OpHistory::insert(utime_t now, OpRequest *op) {
+  const OpRequest *finished = op;
+  arrived.insert(make_pair(finished->get_arrived(), finished));
+  duration.insert(make_pair(finished->get_duration(), finished));
+  cleanup(now);
+}
+
+/**
+ * Drop and free history entries that fall outside the retention limits.
+ *
+ * Pass 1: remove ops that arrived more than
+ * g_conf->osd_op_history_duration before `now`, oldest first.
+ * Pass 2: while more than g_conf->osd_op_history_size ops remain, remove
+ * the one with the smallest duration.
+ *
+ * The key used to find an op in the other index is read from the op
+ * itself, so each op is unlinked from both indexes first and deleted only
+ * afterwards; deleting it earlier would dereference freed memory.
+ */
+void OpHistory::cleanup(utime_t now) {
+  while (arrived.size() &&
+         now - arrived.begin()->first >
+         (double)(g_conf->osd_op_history_duration)) {
+    const OpRequest *expired = arrived.begin()->second;
+    duration.erase(make_pair(expired->get_duration(), expired));
+    arrived.erase(arrived.begin());
+    delete expired;
+  }
+
+  while (duration.size() > g_conf->osd_op_history_size) {
+    const OpRequest *shortest = duration.begin()->second;
+    arrived.erase(make_pair(shortest->get_arrived(), shortest));
+    duration.erase(duration.begin());
+    delete shortest;
+  }
+}
+
+/**
+ * Prune the history via cleanup(now), then emit it through f as an
+ * "OpHistory" object: the two retention settings followed by an "Ops"
+ * array with one "Op" object per retained op, walked in the order of the
+ * arrival index.
+ */
+void OpHistory::dump_ops(utime_t now, Formatter *f)
+{
+  typedef set<pair<utime_t, const OpRequest *> >::const_iterator arrival_iter;
+
+  cleanup(now);
+
+  f->open_object_section("OpHistory");
+  f->dump_int("num to keep", g_conf->osd_op_history_size);
+  f->dump_int("duration to keep", g_conf->osd_op_history_duration);
+
+  f->open_array_section("Ops");
+  arrival_iter cur = arrived.begin();
+  while (cur != arrived.end()) {
+    const OpRequest *op = cur->second;
+    f->open_object_section("Op");
+    op->dump(now, f);
+    f->close_section();  // Op
+    ++cur;
+  }
+  f->close_section();  // Ops
+
+  f->close_section();  // OpHistory
+}
+
+/**
+ * Write the op history to ss as JSON. ops_in_flight_lock is held while the
+ * history is pruned, formatted and flushed.
+ */
+void OpTracker::dump_historic_ops(ostream &ss)
+{
+  JSONFormatter formatter(true);
+  Mutex::Locker locker(ops_in_flight_lock);
+  history.dump_ops(ceph_clock_now(g_ceph_context), &formatter);
+  formatter.flush(ss);
+}
+
void OpTracker::dump_ops_in_flight(ostream &ss)
{
JSONFormatter jf(true);
jf.open_array_section("ops"); // list of OpRequests
utime_t now = ceph_clock_now(g_ceph_context);
for (xlist<OpRequest*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
- stringstream name;
- Message *m = (*p)->request;
- m->print(name);
jf.open_object_section("op");
- jf.dump_string("description", name.str().c_str()); // this OpRequest
- jf.dump_stream("received_at") << (*p)->received_time;
- jf.dump_float("age", now - (*p)->received_time);
- jf.dump_string("flag_point", (*p)->state_string());
- if (m->get_orig_source().is_client()) {
- jf.open_object_section("client_info");
- stringstream client_name;
- client_name << m->get_orig_source();
- jf.dump_string("client", client_name.str());
- jf.dump_int("tid", m->get_tid());
- jf.close_section(); // client_info
- }
+ (*p)->dump(now, &jf);
jf.close_section(); // this OpRequest
}
jf.close_section(); // list of OpRequests
ops_in_flight.back()->seq = seq++;
}
-void OpTracker::unregister_inflight_op(xlist<OpRequest*>::item *i)
+// Remove a finished op from the in-flight list and hand it to the op
+// history. The whole body runs under ops_in_flight_lock.
+// history.insert() prunes the history and may delete ops it holds, so the
+// caller must not use or free i after this returns.
+void OpTracker::unregister_inflight_op(OpRequest *i)
{
Mutex::Locker locker(ops_in_flight_lock);
- assert(i->get_list() == &ops_in_flight);
- i->remove_myself();
+ assert(i->xitem.get_list() == &ops_in_flight);
+ utime_t now = ceph_clock_now(g_ceph_context);
+ i->xitem.remove_myself();
+ // NOTE(review): presumably drops the message payload so the retained op
+ // stays small — confirm against Message::clear_data().
+ i->request->clear_data();
+ history.insert(now, i);
}
bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)
return warning_vector.size();
}
+/**
+ * Emit this op's fields into f: description (the request's printed form),
+ * received_at, age relative to now, duration, flag_point, a client_info
+ * object when the request's original source is a client, and an array of
+ * the recorded (time, event) pairs.
+ *
+ * NOTE(review): events is read here without taking lock, while
+ * mark_event() appends to it under lock — confirm callers cannot race.
+ */
+void OpRequest::dump(utime_t now, Formatter *f) const
+{
+  typedef list<pair<utime_t, string> >::const_iterator event_iter;
+
+  Message *msg = request;
+  stringstream desc;
+  msg->print(desc);
+  f->dump_string("description", desc.str().c_str()); // this OpRequest
+  f->dump_stream("received_at") << received_time;
+  f->dump_float("age", now - received_time);
+  f->dump_float("duration", get_duration());
+  f->dump_string("flag_point", state_string());
+
+  if (msg->get_orig_source().is_client()) {
+    stringstream client_name;
+    client_name << msg->get_orig_source();
+    f->open_object_section("client_info");
+    f->dump_string("client", client_name.str());
+    f->dump_int("tid", msg->get_tid());
+    f->close_section(); // client_info
+  }
+
+  f->open_array_section("events");
+  for (event_iter ev = events.begin(); ev != events.end(); ++ev) {
+    f->open_object_section("event");
+    f->dump_stream("time") << ev->first;
+    f->dump_string("event", ev->second);
+    f->close_section(); // event
+  }
+  f->close_section(); // events
+}
+
void OpTracker::mark_event(OpRequest *op, const string &dest)
{
utime_t now = ceph_clock_now(g_ceph_context);
+// Records a final "done" event on op, then passes it to the tracker, which
+// takes it off the in-flight list and stores it in the op history.
+// NOTE(review): presumably invoked as the OpRequestRef deleter — confirm.
void OpTracker::RemoveOnDelete::operator()(OpRequest *op) {
op->mark_event("done");
- tracker->unregister_inflight_op(&(op->xitem));
- delete op;
+ tracker->unregister_inflight_op(op);
+ // Do not delete op, unregister_inflight_op took control
}
OpRequestRef OpTracker::create_request(Message *ref)
+// Append (current time, event) to this op's event list, then forward the
+// event to the tracker.
void OpRequest::mark_event(const string &event)
{
+ // The timestamp is taken before the lock is acquired.
+ utime_t now = ceph_clock_now(g_ceph_context);
+ {
+ // Scoped so the lock is released before calling into the tracker.
+ Mutex::Locker l(lock);
+ events.push_back(make_pair(now, event));
+ }
tracker->mark_event(this, event);
}
#include "common/TrackedOp.h"
#include "osd/osd_types.h"
+class OpRequest;
+/**
+ * Keeps recently completed ops, indexed two ways: by arrival time and by
+ * duration. insert() stores a raw pointer that cleanup() later deletes, so
+ * an op passed to insert() belongs to the history from then on.
+ * NOTE(review): no destructor is declared here, so ops still held when the
+ * history is destroyed look like they are never freed — confirm.
+ */
+class OpHistory {
+public:
+  // Take ownership of op, index it, then prune against the limits.
+  void insert(utime_t now, OpRequest *op);
+  // Prune, then write the retained ops to f.
+  void dump_ops(utime_t now, Formatter *f);
+
+private:
+  set<pair<utime_t, const OpRequest *> > arrived;   // keyed by arrival time
+  set<pair<double, const OpRequest *> > duration;   // keyed by op duration
+
+  // Remove and free entries that are outside the configured limits.
+  void cleanup(utime_t now);
+};
+
class OpRequest;
typedef std::tr1::shared_ptr<OpRequest> OpRequestRef;
class OpTracker {
uint64_t seq;
Mutex ops_in_flight_lock;
xlist<OpRequest *> ops_in_flight;
+ OpHistory history;
public:
OpTracker() : seq(0), ops_in_flight_lock("OpTracker mutex") {}
void dump_ops_in_flight(std::ostream& ss);
+ void dump_historic_ops(std::ostream& ss);
void register_inflight_op(xlist<OpRequest*>::item *i);
- void unregister_inflight_op(xlist<OpRequest*>::item *i);
+ void unregister_inflight_op(OpRequest *i);
+
/**
* Look for Ops which are too old, and insert warning
* strings for each Op that is too old.
*/
struct OpRequest : public TrackedOp {
friend class OpTracker;
+ friend class OpHistory;
Message *request;
xlist<OpRequest*>::item xitem;
utime_t received_time;
uint8_t warn_interval_multiplier;
+ utime_t get_arrived() const {
+ return received_time;
+ }
+ double get_duration() const {
+ return events.size() ?
+ (events.rbegin()->first - received_time) :
+ 0.0;
+ }
+ void dump(utime_t now, Formatter *f) const;
private:
+ list<pair<utime_t, string> > events;
+ Mutex lock;
OpTracker *tracker;
osd_reqid_t reqid;
uint8_t hit_flag_points;
OpRequest(Message *req, OpTracker *tracker) :
request(req), xitem(this),
warn_interval_multiplier(1),
+ lock("OpRequest::lock"),
tracker(tracker),
seq(0) {
received_time = request->get_recv_stamp();
bool currently_started() { return latest_flag_point & flag_started; }
bool currently_sub_op_sent() { return latest_flag_point & flag_sub_op_sent; }
- const char *state_string() {
+ const char *state_string() const {
switch(latest_flag_point) {
case flag_queued_for_pg: return "queued for pg";
case flag_reached_pg: return "reached pg";