From 3ed784c996415ad5cb24e108ccff5c8c20e363d8 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 23 Mar 2012 12:48:19 -0700 Subject: [PATCH] osd/: add mark_event to OpRequest and move tracking into OpTracker Signed-off-by: Samuel Just --- src/Makefile.am | 3 +- src/common/config_opts.h | 1 + src/osd/OSD.cc | 77 +++-------------------------- src/osd/OSD.h | 11 +---- src/osd/OpRequest.cc | 103 +++++++++++++++++++++++++++++++++++++++ src/osd/OpRequest.h | 48 ++++++++++++++---- src/osd/ReplicatedPG.cc | 6 +++ 7 files changed, 160 insertions(+), 89 deletions(-) create mode 100644 src/osd/OpRequest.cc diff --git a/src/Makefile.am b/src/Makefile.am index 4440d046fde2..741a47be2be3 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1069,7 +1069,8 @@ libosd_la_SOURCES = \ osd/OSD.cc \ osd/OSDCaps.cc \ osd/Watch.cc \ - osd/ClassHandler.cc + osd/ClassHandler.cc \ + osd/OpRequest.cc libosd_la_CXXFLAGS= ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS} libosd_la_LIBADD = libglobal.la noinst_LTLIBRARIES += libosd.la diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 3a8ae15ee317..236697ac01d3 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -57,6 +57,7 @@ OPTION(debug_journaler, OPT_INT, 0) OPTION(debug_objectcacher, OPT_INT, 0) OPTION(debug_client, OPT_INT, 0) OPTION(debug_osd, OPT_INT, 0) +OPTION(debug_optracker, OPT_INT, 10) OPTION(debug_objclass, OPT_INT, 0) OPTION(debug_filestore, OPT_INT, 1) OPTION(debug_journal, OPT_INT, 1) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index bf904eeffb56..a8dab77ddc46 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -553,7 +553,6 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, heartbeat_dispatcher(this), stat_lock("OSD::stat_lock"), finished_lock("OSD::finished_lock"), - ops_in_flight_lock("OSD::ops_in_flight_lock"), admin_ops_hook(NULL), op_queue_len(0), op_wq(this, g_conf->osd_op_thread_timeout, &op_tp), @@ -1872,77 +1871,15 @@ void OSD::tick() void OSD::check_ops_in_flight() { - ops_in_flight_lock.Lock(); - if (ops_in_flight.size()) { - utime_t now = ceph_clock_now(g_ceph_context); - utime_t too_old = now; - too_old -= g_conf->osd_op_complaint_time; - dout(1) << "ops_in_flight.size: " << ops_in_flight.size() - << "; oldest is " << now - ops_in_flight.front()->received_time - << " seconds old" << dendl; - xlist::iterator i = ops_in_flight.begin(); - while (!i.end() && (*i)->received_time < too_old) { - // exponential backoff of warning intervals - if ( ( (*i)->received_time + - (g_conf->osd_op_complaint_time * - (*i)->warn_interval_multiplier) )< now) { - stringstream ss; - ss << "old request " << *((*i)->request) << " received at " - << (*i)->received_time << " currently " << (*i)->state_string(); - clog.warn(ss); - (*i)->warn_interval_multiplier *= 2; - } - ++i; - } - } - ops_in_flight_lock.Unlock(); + stringstream ss; + if (op_tracker.check_ops_in_flight(ss)) + clog.warn(ss); + return; } void OSD::dump_ops_in_flight(ostream& ss) { - JSONFormatter jf(true); - Mutex::Locker locker(ops_in_flight_lock); - jf.open_object_section("ops_in_flight"); // overall dump - jf.dump_int("num_ops", ops_in_flight.size()); - jf.open_array_section("ops"); // list of OpRequests - utime_t now = ceph_clock_now(g_ceph_context); - for (xlist::iterator p = ops_in_flight.begin(); !p.end(); ++p) { - stringstream name; - Message *m = (*p)->request; - m->print(name); - jf.open_object_section("op"); - jf.dump_string("description", name.str().c_str()); // this OpRequest - jf.dump_stream("received_at") << (*p)->received_time; - jf.dump_float("age", now - (*p)->received_time); - jf.dump_string("flag_point", (*p)->state_string()); - if (m->get_orig_source().is_client()) { - jf.open_object_section("client_info"); - stringstream client_name; - client_name << m->get_orig_source(); - jf.dump_string("client", client_name.str()); - jf.dump_int("tid", m->get_tid()); - jf.close_section(); // client_info - } - jf.close_section(); // this OpRequest - } - jf.close_section(); // list of OpRequests - jf.close_section(); // overall dump - jf.flush(ss); -} - -void OSD::register_inflight_op(xlist::item *i) -{ - ops_in_flight_lock.Lock(); - ops_in_flight.push_back(i); - ops_in_flight_lock.Unlock(); -} - -void OSD::unregister_inflight_op(xlist::item *i) -{ - ops_in_flight_lock.Lock(); - assert(i->get_list() == &ops_in_flight); - i->remove_myself(); - ops_in_flight_lock.Unlock(); + op_tracker.dump_ops_in_flight(ss); } // ========================================= @@ -2960,8 +2897,8 @@ void OSD::_dispatch(Message *m) default: { - OpRequest *op = new OpRequest(m, this); - register_inflight_op(&op->xitem); + OpRequest *op = new OpRequest(m, &op_tracker); + op->mark_event("waiting_for_osdmap"); // no map? starting up? if (!osdmap) { dout(7) << "no OSDMap, not booted" << dendl; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index d48d7c0bd259..f6aaa708e33e 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -45,6 +45,7 @@ using namespace std; #include using namespace __gnu_cxx; +#include "OpRequest.h" #define CEPH_OSD_PROTOCOL 10 /* cluster internal */ @@ -120,7 +121,6 @@ class ReplicatedPG; class AuthAuthorizeHandlerRegistry; -class OpRequest; class OpsFlightSocketHook; extern const coll_t meta_coll; @@ -331,16 +331,9 @@ private: void do_waiters(); // -- op tracking -- - xlist ops_in_flight; - /** This is an inner lock that is taken by the following three - * functions without regard for what locks the callers hold. It - * protects the xlist, but not the OpRequests. */ - Mutex ops_in_flight_lock; - void register_inflight_op(xlist::item *i); + OpTracker op_tracker; void check_ops_in_flight(); - void unregister_inflight_op(xlist::item *i); void dump_ops_in_flight(ostream& ss); - friend struct OpRequest; friend class OpsFlightSocketHook; OpsFlightSocketHook *admin_ops_hook; diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc new file mode 100644 index 000000000000..fda2f3d46a8f --- /dev/null +++ b/src/osd/OpRequest.cc @@ -0,0 +1,103 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + +#include "OpRequest.h" +#include "common/Formatter.h" +#include +#include "common/debug.h" +#include "common/config.h" + +#define DOUT_SUBSYS optracker +#undef dout_prefix +#define dout_prefix _prefix(_dout) + +static ostream& _prefix(std::ostream* _dout) +{ + return *_dout << "--OSD::tracker-- "; +} + +void OpTracker::dump_ops_in_flight(ostream &ss) +{ + JSONFormatter jf(true); + Mutex::Locker locker(ops_in_flight_lock); + jf.open_object_section("ops_in_flight"); // overall dump + jf.dump_int("num_ops", ops_in_flight.size()); + jf.open_array_section("ops"); // list of OpRequests + utime_t now = ceph_clock_now(g_ceph_context); + for (xlist::iterator p = ops_in_flight.begin(); !p.end(); ++p) { + stringstream name; + Message *m = (*p)->request; + m->print(name); + jf.open_object_section("op"); + jf.dump_string("description", name.str().c_str()); // this OpRequest + jf.dump_stream("received_at") << (*p)->received_time; + jf.dump_float("age", now - (*p)->received_time); + jf.dump_string("flag_point", (*p)->state_string()); + if (m->get_orig_source().is_client()) { + jf.open_object_section("client_info"); + stringstream client_name; + client_name << m->get_orig_source(); + jf.dump_string("client", client_name.str()); + jf.dump_int("tid", m->get_tid()); + jf.close_section(); // client_info + } + jf.close_section(); // this OpRequest + } + jf.close_section(); // list of OpRequests + jf.close_section(); // overall dump + jf.flush(ss); +} + +void OpTracker::register_inflight_op(xlist::item *i) +{ + Mutex::Locker locker(ops_in_flight_lock); + ops_in_flight.push_back(i); + ops_in_flight.back()->seq = seq++; +} + +void OpTracker::unregister_inflight_op(xlist::item *i) +{ + Mutex::Locker locker(ops_in_flight_lock); + assert(i->get_list() == &ops_in_flight); + i->remove_myself(); +} + +bool OpTracker::check_ops_in_flight(ostream &out) +{ + Mutex::Locker locker(ops_in_flight_lock); + if (!ops_in_flight.size()) + return false; + + utime_t now = ceph_clock_now(g_ceph_context); + utime_t too_old = now; + too_old -= g_conf->osd_op_complaint_time; + + dout(10) << "ops_in_flight.size: " << ops_in_flight.size() + << "; oldest is " << now - ops_in_flight.front()->received_time + << " seconds old" << dendl; + xlist::iterator i = ops_in_flight.begin(); + while (!i.end() && (*i)->received_time < too_old) { + // exponential backoff of warning intervals + if ( ( (*i)->received_time + + (g_conf->osd_op_complaint_time * + (*i)->warn_interval_multiplier) )< now) { + out << "old request " << *((*i)->request) << " received at " + << (*i)->received_time << " currently " << (*i)->state_string(); + (*i)->warn_interval_multiplier *= 2; + } + ++i; + } + return !i.end(); +} + +void OpTracker::mark_event(OpRequest *op, const string &dest) +{ + Mutex::Locker locker(ops_in_flight_lock); + utime_t now = ceph_clock_now(g_ceph_context); + dout(1) << "seq: " << op->seq << ", time: " << now << ", event: " << dest + << " " << *op << dendl; +} + +void OpRequest::mark_event(const string &event) +{ + tracker->mark_event(this, event); +} diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index dfeddd62c8f6..1b7f7073d1fc 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -13,6 +13,27 @@ #ifndef OPREQUEST_H_ #define OPREQUEST_H_ +#include +#include +#include +#include "common/Mutex.h" +#include "include/xlist.h" +#include "msg/Message.h" + +class OpRequest; +class OpTracker { + uint64_t seq; + Mutex ops_in_flight_lock; + xlist ops_in_flight; + +public: + OpTracker() : seq(0), ops_in_flight_lock("OpTracker mutex") {} + void dump_ops_in_flight(std::ostream& ss); + void register_inflight_op(xlist::item *i); + void unregister_inflight_op(xlist::item *i); + bool check_ops_in_flight(std::ostream &out); + void mark_event(OpRequest *op, const string &evt); +}; /** * The OpRequest takes in a Message* and takes over a single reference @@ -22,14 +43,16 @@ * the way you used to pass around the Message. */ struct OpRequest : public RefCountedObject { + friend class OpTracker; Message *request; xlist::item xitem; utime_t received_time; uint8_t warn_interval_multiplier; private: - OSD *osd; + OpTracker *tracker; uint8_t hit_flag_points; uint8_t latest_flag_point; + uint64_t seq; static const uint8_t flag_queued_for_pg=1 << 0; static const uint8_t flag_reached_pg = 1 << 1; static const uint8_t flag_delayed = 1 << 2; @@ -37,17 +60,18 @@ private: static const uint8_t flag_sub_op_sent = 1 << 4; public: - OpRequest() : request(NULL), xitem(this) {} - OpRequest(Message *req, OSD *o) : request(req), xitem(this), - warn_interval_multiplier(1), - osd(o) { + OpRequest(Message *req, OpTracker *tracker) : + request(req), xitem(this), + warn_interval_multiplier(1), + tracker(tracker), + seq(0) { received_time = request->get_recv_stamp(); + tracker->register_inflight_op(&xitem); } ~OpRequest() { - osd->unregister_inflight_op(&xitem); - if (request) { - request->put(); - } + tracker->unregister_inflight_op(&xitem); + assert(request); + request->put(); } bool been_queued_for_pg() { return hit_flag_points & flag_queued_for_pg; } @@ -74,10 +98,12 @@ public: } void mark_queued_for_pg() { + mark_event("queued_for_pg"); hit_flag_points |= flag_queued_for_pg; latest_flag_point = flag_queued_for_pg; } void mark_reached_pg() { + mark_event("reached_pg"); hit_flag_points |= flag_reached_pg; latest_flag_point = flag_reached_pg; } @@ -86,13 +112,17 @@ public: latest_flag_point = flag_delayed; } void mark_started() { + mark_event("started"); hit_flag_points |= flag_started; latest_flag_point = flag_started; } void mark_sub_op_sent() { + mark_event("sub_op_sent"); hit_flag_points |= flag_sub_op_sent; latest_flag_point = flag_sub_op_sent; } + + void mark_event(const string &new_state); }; #endif /* OPREQUEST_H_ */ diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 4f641fd3e68a..45d60a8255e5 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -3284,6 +3284,7 @@ void ReplicatedPG::op_applied(RepGather *repop) { lock(); dout(10) << "op_applied " << *repop << dendl; + repop->ctx->op->mark_event("op_applied"); // discard my reference to the buffer if (repop->ctx->op) @@ -3337,6 +3338,7 @@ void ReplicatedPG::op_applied(RepGather *repop) void ReplicatedPG::op_commit(RepGather *repop) { lock(); + repop->ctx->op->mark_event("op_commit"); if (repop->aborted) { dout(10) << "op_commit " << *repop << " -- aborted" << dendl; @@ -3599,6 +3601,7 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type, << dendl; if (ack_type & CEPH_OSD_FLAG_ONDISK) { + repop->ctx->op->mark_event("sub_op_commit_rec"); // disk if (repop->waitfor_disk.count(fromosd)) { repop->waitfor_disk.erase(fromosd); @@ -3612,6 +3615,7 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type, repop->waitfor_ack.erase(fromosd);*/ } else { // ack + repop->ctx->op->mark_event("sub_op_applied_rec"); repop->waitfor_ack.erase(fromosd); } @@ -4159,6 +4163,7 @@ void ReplicatedPG::sub_op_modify(OpRequest *op) void ReplicatedPG::sub_op_modify_applied(RepModify *rm) { lock(); + rm->op->mark_event("sub_op_applied"); dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl; MOSDSubOp *m = (MOSDSubOp*)rm->op->request; assert(m->get_header().type == MSG_OSD_SUBOP); @@ -4197,6 +4202,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm) void ReplicatedPG::sub_op_modify_commit(RepModify *rm) { lock(); + rm->op->mark_event("sub_op_commit"); // send commit. dout(10) << "sub_op_modify_commit on op " << *rm->op->request -- 2.47.3