osd/OSD.cc \
osd/OSDCaps.cc \
osd/Watch.cc \
- osd/ClassHandler.cc
+ osd/ClassHandler.cc \
+ osd/OpRequest.cc
libosd_la_CXXFLAGS= ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS}
libosd_la_LIBADD = libglobal.la
noinst_LTLIBRARIES += libosd.la
OPTION(debug_objectcacher, OPT_INT, 0)
OPTION(debug_client, OPT_INT, 0)
OPTION(debug_osd, OPT_INT, 0)
+OPTION(debug_optracker, OPT_INT, 10)
OPTION(debug_objclass, OPT_INT, 0)
OPTION(debug_filestore, OPT_INT, 1)
OPTION(debug_journal, OPT_INT, 1)
heartbeat_dispatcher(this),
stat_lock("OSD::stat_lock"),
finished_lock("OSD::finished_lock"),
- ops_in_flight_lock("OSD::ops_in_flight_lock"),
admin_ops_hook(NULL),
op_queue_len(0),
op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
void OSD::check_ops_in_flight()
{
- ops_in_flight_lock.Lock();
- if (ops_in_flight.size()) {
- utime_t now = ceph_clock_now(g_ceph_context);
- utime_t too_old = now;
- too_old -= g_conf->osd_op_complaint_time;
- dout(1) << "ops_in_flight.size: " << ops_in_flight.size()
- << "; oldest is " << now - ops_in_flight.front()->received_time
- << " seconds old" << dendl;
- xlist<OpRequest*>::iterator i = ops_in_flight.begin();
- while (!i.end() && (*i)->received_time < too_old) {
- // exponential backoff of warning intervals
- if ( ( (*i)->received_time +
- (g_conf->osd_op_complaint_time *
- (*i)->warn_interval_multiplier) )< now) {
- stringstream ss;
- ss << "old request " << *((*i)->request) << " received at "
- << (*i)->received_time << " currently " << (*i)->state_string();
- clog.warn(ss);
- (*i)->warn_interval_multiplier *= 2;
- }
- ++i;
- }
- }
- ops_in_flight_lock.Unlock();
+  // The op tracker fills the stream and tells us whether it should be
+  // forwarded to the cluster log.
+  stringstream complaints;
+  const bool have_complaints = op_tracker.check_ops_in_flight(complaints);
+  if (have_complaints)
+    clog.warn(complaints);
}
void OSD::dump_ops_in_flight(ostream& ss)
{
- JSONFormatter jf(true);
- Mutex::Locker locker(ops_in_flight_lock);
- jf.open_object_section("ops_in_flight"); // overall dump
- jf.dump_int("num_ops", ops_in_flight.size());
- jf.open_array_section("ops"); // list of OpRequests
- utime_t now = ceph_clock_now(g_ceph_context);
- for (xlist<OpRequest*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
- stringstream name;
- Message *m = (*p)->request;
- m->print(name);
- jf.open_object_section("op");
- jf.dump_string("description", name.str().c_str()); // this OpRequest
- jf.dump_stream("received_at") << (*p)->received_time;
- jf.dump_float("age", now - (*p)->received_time);
- jf.dump_string("flag_point", (*p)->state_string());
- if (m->get_orig_source().is_client()) {
- jf.open_object_section("client_info");
- stringstream client_name;
- client_name << m->get_orig_source();
- jf.dump_string("client", client_name.str());
- jf.dump_int("tid", m->get_tid());
- jf.close_section(); // client_info
- }
- jf.close_section(); // this OpRequest
- }
- jf.close_section(); // list of OpRequests
- jf.close_section(); // overall dump
- jf.flush(ss);
-}
-
-void OSD::register_inflight_op(xlist<OpRequest*>::item *i)
-{
- ops_in_flight_lock.Lock();
- ops_in_flight.push_back(i);
- ops_in_flight_lock.Unlock();
-}
-
-void OSD::unregister_inflight_op(xlist<OpRequest*>::item *i)
-{
- ops_in_flight_lock.Lock();
- assert(i->get_list() == &ops_in_flight);
- i->remove_myself();
- ops_in_flight_lock.Unlock();
+ // The in-flight list and its lock now belong to the OpTracker, so the
+ // JSON dump is produced there and written to ss.
+ op_tracker.dump_ops_in_flight(ss);
}
// =========================================
default:
{
- OpRequest *op = new OpRequest(m, this);
- register_inflight_op(&op->xitem);
+ OpRequest *op = new OpRequest(m, &op_tracker);
+ op->mark_event("waiting_for_osdmap");
// no map? starting up?
if (!osdmap) {
dout(7) << "no OSDMap, not booted" << dendl;
#include <ext/hash_set>
using namespace __gnu_cxx;
+#include "OpRequest.h"
#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
class AuthAuthorizeHandlerRegistry;
-class OpRequest;
class OpsFlightSocketHook;
extern const coll_t meta_coll;
void do_waiters();
// -- op tracking --
- xlist<OpRequest*> ops_in_flight;
- /** This is an inner lock that is taken by the following three
- * functions without regard for what locks the callers hold. It
- * protects the xlist, but not the OpRequests. */
- Mutex ops_in_flight_lock;
- void register_inflight_op(xlist<OpRequest*>::item *i);
+ OpTracker op_tracker;
void check_ops_in_flight();
- void unregister_inflight_op(xlist<OpRequest*>::item *i);
void dump_ops_in_flight(ostream& ss);
- friend struct OpRequest;
friend class OpsFlightSocketHook;
OpsFlightSocketHook *admin_ops_hook;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "OpRequest.h"
+#include "common/Formatter.h"
+#include <iostream>
+#include "common/debug.h"
+#include "common/config.h"
+
+#define DOUT_SUBSYS optracker
+#undef dout_prefix
+#define dout_prefix _prefix(_dout)
+
+// Writes the tag that precedes every debug line emitted from this file
+// and hands the same stream back so the caller can keep appending.
+static ostream& _prefix(std::ostream* _dout)
+{
+  std::ostream &log = *_dout;
+  log << "--OSD::tracker-- ";
+  return log;
+}
+
+// Emit a JSON document describing every tracked op to ss.
+//
+// Layout: an "ops_in_flight" object holding "num_ops" and an "ops" array.
+// Each array entry carries the printed request, its receive time, its age
+// relative to now, and its current flag point; requests whose original
+// source is a client also get a "client_info" object (client name and tid).
+// The list lock is held for the whole walk.
+void OpTracker::dump_ops_in_flight(ostream &ss)
+{
+  JSONFormatter formatter(true);
+  Mutex::Locker l(ops_in_flight_lock);
+
+  formatter.open_object_section("ops_in_flight");
+  formatter.dump_int("num_ops", ops_in_flight.size());
+  formatter.open_array_section("ops");
+
+  utime_t now = ceph_clock_now(g_ceph_context);
+  xlist<OpRequest*>::iterator it = ops_in_flight.begin();
+  while (!it.end()) {
+    OpRequest *op = *it;
+    Message *msg = op->request;
+
+    stringstream description;
+    msg->print(description);
+
+    formatter.open_object_section("op");
+    formatter.dump_string("description", description.str().c_str());
+    formatter.dump_stream("received_at") << op->received_time;
+    formatter.dump_float("age", now - op->received_time);
+    formatter.dump_string("flag_point", op->state_string());
+
+    if (msg->get_orig_source().is_client()) {
+      stringstream client_name;
+      formatter.open_object_section("client_info");
+      client_name << msg->get_orig_source();
+      formatter.dump_string("client", client_name.str());
+      formatter.dump_int("tid", msg->get_tid());
+      formatter.close_section();  // client_info
+    }
+
+    formatter.close_section();    // op
+    ++it;
+  }
+
+  formatter.close_section();      // ops
+  formatter.close_section();      // ops_in_flight
+  formatter.flush(ss);
+}
+
+// Append the op owning list item i to the in-flight list and stamp it with
+// the next sequence number, all under the list lock.
+void OpTracker::register_inflight_op(xlist<OpRequest*>::item *i)
+{
+  Mutex::Locker l(ops_in_flight_lock);
+  ops_in_flight.push_back(i);
+
+  OpRequest *newest = ops_in_flight.back();
+  newest->seq = seq;
+  ++seq;
+}
+
+// Unlink list item i from the in-flight list under the list lock.
+// The item must currently be on this tracker's list.
+void OpTracker::unregister_inflight_op(xlist<OpRequest*>::item *i)
+{
+  Mutex::Locker l(ops_in_flight_lock);
+  assert(&ops_in_flight == i->get_list());
+  i->remove_myself();
+}
+
+/**
+ * Describe in-flight ops that have been outstanding for longer than
+ * osd_op_complaint_time.
+ *
+ * The list is walked from the front and the walk stops at the first op
+ * whose received_time is not older than the complaint threshold.  An op is
+ * reported only once its age exceeds osd_op_complaint_time multiplied by
+ * its warn_interval_multiplier; the multiplier is doubled after every
+ * report so repeated complaints about the same op back off exponentially.
+ *
+ * @param out  receives one "old request ..." entry per op that is due for
+ *             a warning; entries are separated by "; "
+ * @return true if at least one entry was written to out, false otherwise
+ *         (including when nothing is in flight)
+ */
+bool OpTracker::check_ops_in_flight(ostream &out)
+{
+  Mutex::Locker locker(ops_in_flight_lock);
+  if (!ops_in_flight.size())
+    return false;
+
+  utime_t now = ceph_clock_now(g_ceph_context);
+  utime_t too_old = now;
+  too_old -= g_conf->osd_op_complaint_time;
+
+  dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
+           << "; oldest is " << now - ops_in_flight.front()->received_time
+           << " seconds old" << dendl;
+
+  // Whether anything was written to 'out'.  The iterator position says
+  // nothing about that, so it must be tracked explicitly.
+  bool warned = false;
+
+  xlist<OpRequest*>::iterator i = ops_in_flight.begin();
+  while (!i.end() && (*i)->received_time < too_old) {
+    // exponential backoff of warning intervals
+    if ( ( (*i)->received_time +
+           (g_conf->osd_op_complaint_time *
+            (*i)->warn_interval_multiplier) ) < now) {
+      // All entries share one stream; keep them readable.
+      if (warned)
+        out << "; ";
+      out << "old request " << *((*i)->request) << " received at "
+          << (*i)->received_time << " currently " << (*i)->state_string();
+      warned = true;
+
+      // warn_interval_multiplier is a uint8_t: doubling 128 would wrap to
+      // 0, after which the op would be reported on every single check.
+      // Saturate at the largest power of two that fits instead.
+      if ((*i)->warn_interval_multiplier < 128)
+        (*i)->warn_interval_multiplier *= 2;
+    }
+    ++i;
+  }
+  return warned;
+}
+
+// Log, at debug level 1, that op has reached the event named by dest,
+// together with the op's sequence number and the current time.  The list
+// lock is held while the line is produced.
+void OpTracker::mark_event(OpRequest *op, const string &dest)
+{
+  Mutex::Locker l(ops_in_flight_lock);
+  utime_t stamp = ceph_clock_now(g_ceph_context);
+  dout(1) << "seq: " << op->seq
+          << ", time: " << stamp
+          << ", event: " << dest
+          << " " << *op
+          << dendl;
+}
+
+// Report a named event for this op to the tracker it was created with.
+void OpRequest::mark_event(const string &event)
+{
+  OpTracker *const owner = tracker;
+  owner->mark_event(this, event);
+}
#ifndef OPREQUEST_H_
#define OPREQUEST_H_
+#include <sstream>
+#include <stdint.h>
+#include <include/utime.h>
+#include "common/Mutex.h"
+#include "include/xlist.h"
+#include "msg/Message.h"
+
+// Declared with the same class-key as its definition below (a struct) so
+// the two declarations agree.
+struct OpRequest;
+
+/**
+ * Keeps the list of OpRequests that are currently in flight.
+ *
+ * OpRequests add themselves on construction and remove themselves on
+ * destruction.  Every member function takes ops_in_flight_lock itself, so
+ * callers do not need to hold it.
+ */
+class OpTracker {
+  uint64_t seq;                      ///< next sequence number to hand out
+  Mutex ops_in_flight_lock;          ///< taken by every member function
+  xlist<OpRequest *> ops_in_flight;  ///< tracked ops, appended on registration
+
+public:
+  OpTracker() : seq(0), ops_in_flight_lock("OpTracker mutex") {}
+
+  /// Write a JSON dump of all tracked ops to ss.
+  void dump_ops_in_flight(std::ostream& ss);
+
+  /// Append the op owning list item i and assign it a sequence number.
+  void register_inflight_op(xlist<OpRequest*>::item *i);
+
+  /// Remove list item i; it must be on this tracker's list.
+  void unregister_inflight_op(xlist<OpRequest*>::item *i);
+
+  /// Write complaints about ops older than osd_op_complaint_time to out.
+  bool check_ops_in_flight(std::ostream &out);
+
+  /// Log that op reached the event named evt.
+  void mark_event(OpRequest *op, const std::string &evt);
+};
/**
* The OpRequest takes in a Message* and takes over a single reference
* the way you used to pass around the Message.
*/
struct OpRequest : public RefCountedObject {
+ friend class OpTracker;
Message *request;
xlist<OpRequest*>::item xitem;
utime_t received_time;
uint8_t warn_interval_multiplier;
private:
- OSD *osd;
+ OpTracker *tracker;
uint8_t hit_flag_points;
uint8_t latest_flag_point;
+ uint64_t seq;
static const uint8_t flag_queued_for_pg=1 << 0;
static const uint8_t flag_reached_pg = 1 << 1;
static const uint8_t flag_delayed = 1 << 2;
static const uint8_t flag_sub_op_sent = 1 << 4;
public:
- OpRequest() : request(NULL), xitem(this) {}
- OpRequest(Message *req, OSD *o) : request(req), xitem(this),
- warn_interval_multiplier(1),
- osd(o) {
+ /**
+  * Wrap req and register the new op with tracker.
+  *
+  * The flag-point bytes start at zero: they are read by
+  * been_queued_for_pg() and modified with |= by the mark_*() helpers, so
+  * they must not start out indeterminate.  seq starts at 0 and is
+  * overwritten by OpTracker::register_inflight_op().
+  *
+  * @param req      message being tracked; must not be NULL
+  * @param tracker  tracker to register with; must outlive this op
+  */
+ OpRequest(Message *req, OpTracker *tracker) :
+ request(req), xitem(this),
+ warn_interval_multiplier(1),
+ tracker(tracker),
+ hit_flag_points(0),
+ latest_flag_point(0),
+ seq(0) {
received_time = request->get_recv_stamp();
+ tracker->register_inflight_op(&xitem);
}
~OpRequest() {
- osd->unregister_inflight_op(&xitem);
- if (request) {
- request->put();
- }
+ // The constructor registers xitem with the tracker, so take it off the
+ // tracker's list again before the op goes away.
+ tracker->unregister_inflight_op(&xitem);
+ // With the default constructor removed, an op is only ever built around a
+ // message, so request is asserted rather than tested before it is put().
+ assert(request);
+ request->put();
}
bool been_queued_for_pg() { return hit_flag_points & flag_queued_for_pg; }
}
void mark_queued_for_pg() {
+ mark_event("queued_for_pg");
hit_flag_points |= flag_queued_for_pg;
latest_flag_point = flag_queued_for_pg;
}
void mark_reached_pg() {
+ mark_event("reached_pg");
hit_flag_points |= flag_reached_pg;
latest_flag_point = flag_reached_pg;
}
latest_flag_point = flag_delayed;
}
void mark_started() {
+ mark_event("started");
hit_flag_points |= flag_started;
latest_flag_point = flag_started;
}
void mark_sub_op_sent() {
+ mark_event("sub_op_sent");
hit_flag_points |= flag_sub_op_sent;
latest_flag_point = flag_sub_op_sent;
}
+
+ void mark_event(const string &new_state);
};
#endif /* OPREQUEST_H_ */
{
lock();
dout(10) << "op_applied " << *repop << dendl;
+ repop->ctx->op->mark_event("op_applied");
// discard my reference to the buffer
if (repop->ctx->op)
void ReplicatedPG::op_commit(RepGather *repop)
{
lock();
+ repop->ctx->op->mark_event("op_commit");
if (repop->aborted) {
dout(10) << "op_commit " << *repop << " -- aborted" << dendl;
<< dendl;
if (ack_type & CEPH_OSD_FLAG_ONDISK) {
+ repop->ctx->op->mark_event("sub_op_commit_rec");
// disk
if (repop->waitfor_disk.count(fromosd)) {
repop->waitfor_disk.erase(fromosd);
repop->waitfor_ack.erase(fromosd);*/
} else {
// ack
+ repop->ctx->op->mark_event("sub_op_applied_rec");
repop->waitfor_ack.erase(fromosd);
}
void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
{
lock();
+ rm->op->mark_event("sub_op_applied");
dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl;
MOSDSubOp *m = (MOSDSubOp*)rm->op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
{
lock();
+ rm->op->mark_event("sub_op_commit");
// send commit.
dout(10) << "sub_op_modify_commit on op " << *rm->op->request