git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: add mark_event to OpRequest and move tracking into OpTracker
author: Samuel Just <samuel.just@dreamhost.com>
Fri, 23 Mar 2012 19:48:19 +0000 (12:48 -0700)
committer: Samuel Just <samuel.just@dreamhost.com>
Mon, 26 Mar 2012 16:36:33 +0000 (09:36 -0700)
Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/Makefile.am
src/common/config_opts.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OpRequest.cc [new file with mode: 0644]
src/osd/OpRequest.h
src/osd/ReplicatedPG.cc

index 4440d046fde211aad4683b07d9666ce7c953cd60..741a47be2be31d051f88a5c27db53f41c34106a8 100644 (file)
@@ -1069,7 +1069,8 @@ libosd_la_SOURCES = \
        osd/OSD.cc \
        osd/OSDCaps.cc \
        osd/Watch.cc \
-        osd/ClassHandler.cc
+       osd/ClassHandler.cc \
+       osd/OpRequest.cc
 libosd_la_CXXFLAGS= ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS}
 libosd_la_LIBADD = libglobal.la
 noinst_LTLIBRARIES += libosd.la
index 3a8ae15ee31717219727875f9eed9389cc2a123a..236697ac01d330c1c72b8ed7b19354fae0cea397 100644 (file)
@@ -57,6 +57,7 @@ OPTION(debug_journaler, OPT_INT, 0)
 OPTION(debug_objectcacher, OPT_INT, 0)
 OPTION(debug_client, OPT_INT, 0)
 OPTION(debug_osd, OPT_INT, 0)
+OPTION(debug_optracker, OPT_INT, 10)
 OPTION(debug_objclass, OPT_INT, 0)
 OPTION(debug_filestore, OPT_INT, 1)
 OPTION(debug_journal, OPT_INT, 1)
index bf904eeffb56b62bec560444d4b0f80236e9f254..a8dab77ddc46ae1a3421d7b5be1cce0de3083d28 100644 (file)
@@ -553,7 +553,6 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   heartbeat_dispatcher(this),
   stat_lock("OSD::stat_lock"),
   finished_lock("OSD::finished_lock"),
-  ops_in_flight_lock("OSD::ops_in_flight_lock"),
   admin_ops_hook(NULL),
   op_queue_len(0),
   op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
@@ -1872,77 +1871,15 @@ void OSD::tick()
 
 void OSD::check_ops_in_flight()
 {
-  ops_in_flight_lock.Lock();
-  if (ops_in_flight.size()) {
-    utime_t now = ceph_clock_now(g_ceph_context);
-    utime_t too_old = now;
-    too_old -= g_conf->osd_op_complaint_time;
-    dout(1) << "ops_in_flight.size: " << ops_in_flight.size()
-              << "; oldest is " << now - ops_in_flight.front()->received_time
-              << " seconds old" << dendl;
-    xlist<OpRequest*>::iterator i = ops_in_flight.begin();
-    while (!i.end() && (*i)->received_time < too_old) {
-      // exponential backoff of warning intervals
-      if ( ( (*i)->received_time +
-            (g_conf->osd_op_complaint_time *
-              (*i)->warn_interval_multiplier) )< now) {
-        stringstream ss;
-        ss << "old request " << *((*i)->request) << " received at "
-           << (*i)->received_time << " currently " << (*i)->state_string();
-        clog.warn(ss);
-        (*i)->warn_interval_multiplier *= 2;
-      }
-      ++i;
-    }
-  }
-  ops_in_flight_lock.Unlock();
+  stringstream ss;
+  if (op_tracker.check_ops_in_flight(ss))
+    clog.warn(ss);
+  return;
 }
 
 void OSD::dump_ops_in_flight(ostream& ss)
 {
-  JSONFormatter jf(true);
-  Mutex::Locker locker(ops_in_flight_lock);
-  jf.open_object_section("ops_in_flight"); // overall dump
-  jf.dump_int("num_ops", ops_in_flight.size());
-  jf.open_array_section("ops"); // list of OpRequests
-  utime_t now = ceph_clock_now(g_ceph_context);
-  for (xlist<OpRequest*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
-    stringstream name;
-    Message *m = (*p)->request;
-    m->print(name);
-    jf.open_object_section("op");
-    jf.dump_string("description", name.str().c_str()); // this OpRequest
-    jf.dump_stream("received_at") << (*p)->received_time;
-    jf.dump_float("age", now - (*p)->received_time);
-    jf.dump_string("flag_point", (*p)->state_string());
-    if (m->get_orig_source().is_client()) {
-      jf.open_object_section("client_info");
-      stringstream client_name;
-      client_name << m->get_orig_source();
-      jf.dump_string("client", client_name.str());
-      jf.dump_int("tid", m->get_tid());
-      jf.close_section(); // client_info
-    }
-    jf.close_section(); // this OpRequest
-  }
-  jf.close_section(); // list of OpRequests
-  jf.close_section(); // overall dump
-  jf.flush(ss);
-}
-
-void OSD::register_inflight_op(xlist<OpRequest*>::item *i)
-{
-  ops_in_flight_lock.Lock();
-  ops_in_flight.push_back(i);
-  ops_in_flight_lock.Unlock();
-}
-
-void OSD::unregister_inflight_op(xlist<OpRequest*>::item *i)
-{
-  ops_in_flight_lock.Lock();
-  assert(i->get_list() == &ops_in_flight);
-  i->remove_myself();
-  ops_in_flight_lock.Unlock();
+  op_tracker.dump_ops_in_flight(ss);
 }
 
 // =========================================
@@ -2960,8 +2897,8 @@ void OSD::_dispatch(Message *m)
 
   default:
     {
-      OpRequest *op = new OpRequest(m, this);
-      register_inflight_op(&op->xitem);
+      OpRequest *op = new OpRequest(m, &op_tracker);
+      op->mark_event("waiting_for_osdmap");
       // no map?  starting up?
       if (!osdmap) {
         dout(7) << "no OSDMap, not booted" << dendl;
index d48d7c0bd25980cec798430dde4950f23eb18e4b..f6aaa708e33e7ec4e01822b1926b101e9d117143 100644 (file)
@@ -45,6 +45,7 @@ using namespace std;
 #include <ext/hash_set>
 using namespace __gnu_cxx;
 
+#include "OpRequest.h"
 
 #define CEPH_OSD_PROTOCOL    10 /* cluster internal */
 
@@ -120,7 +121,6 @@ class ReplicatedPG;
 
 class AuthAuthorizeHandlerRegistry;
 
-class OpRequest;
 class OpsFlightSocketHook;
 
 extern const coll_t meta_coll;
@@ -331,16 +331,9 @@ private:
   void do_waiters();
   
   // -- op tracking --
-  xlist<OpRequest*> ops_in_flight;
-  /** This is an inner lock that is taken by the following three
-   * functions without regard for what locks the callers hold. It
-   * protects the xlist, but not the OpRequests. */
-  Mutex ops_in_flight_lock;
-  void register_inflight_op(xlist<OpRequest*>::item *i);
+  OpTracker op_tracker;
   void check_ops_in_flight();
-  void unregister_inflight_op(xlist<OpRequest*>::item *i);
   void dump_ops_in_flight(ostream& ss);
-  friend struct OpRequest;
   friend class OpsFlightSocketHook;
   OpsFlightSocketHook *admin_ops_hook;
 
diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc
new file mode 100644 (file)
index 0000000..fda2f3d
--- /dev/null
@@ -0,0 +1,103 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+
+#include "OpRequest.h"
+#include "common/Formatter.h"
+#include <iostream>
+#include "common/debug.h"
+#include "common/config.h"
+
+#define DOUT_SUBSYS optracker
+#undef dout_prefix
+#define dout_prefix _prefix(_dout)
+
+// Log-line prefix used by dout() in this file: writes the tracker tag to
+// the stream behind _dout and hands that same stream back for chaining.
+static ostream& _prefix(std::ostream* _dout)
+{
+  std::ostream &target = *_dout;
+  target << "--OSD::tracker-- ";
+  return target;
+}
+
+/**
+ * Write a JSON description of every in-flight op to ss.
+ *
+ * The output is one "ops_in_flight" object holding "num_ops" and an "ops"
+ * array; each array entry carries the printed request, its receive time,
+ * its age relative to now, its current flag point and, for requests whose
+ * original source is a client, a "client_info" object with the client name
+ * and tid.  ops_in_flight_lock is held while the list is walked.
+ */
+void OpTracker::dump_ops_in_flight(ostream &ss)
+{
+  JSONFormatter formatter(true);
+  Mutex::Locker guard(ops_in_flight_lock);
+
+  formatter.open_object_section("ops_in_flight"); // top-level object
+  formatter.dump_int("num_ops", ops_in_flight.size());
+  formatter.open_array_section("ops"); // one entry per OpRequest
+
+  utime_t now = ceph_clock_now(g_ceph_context);
+  xlist<OpRequest*>::iterator it = ops_in_flight.begin();
+  while (!it.end()) {
+    OpRequest *op = *it;
+    Message *msg = op->request;
+
+    // the request's own print() output serves as the description
+    stringstream description;
+    msg->print(description);
+
+    formatter.open_object_section("op");
+    formatter.dump_string("description", description.str().c_str());
+    formatter.dump_stream("received_at") << op->received_time;
+    formatter.dump_float("age", now - op->received_time);
+    formatter.dump_string("flag_point", op->state_string());
+
+    if (msg->get_orig_source().is_client()) {
+      stringstream client_name;
+      formatter.open_object_section("client_info");
+      client_name << msg->get_orig_source();
+      formatter.dump_string("client", client_name.str());
+      formatter.dump_int("tid", msg->get_tid());
+      formatter.close_section(); // client_info
+    }
+
+    formatter.close_section(); // op
+    ++it;
+  }
+
+  formatter.close_section(); // ops
+  formatter.close_section(); // ops_in_flight
+  formatter.flush(ss);
+}
+
+/**
+ * Append an op's list item to the in-flight list and stamp the op with the
+ * tracker's next sequence number.  Both steps happen under
+ * ops_in_flight_lock.
+ */
+void OpTracker::register_inflight_op(xlist<OpRequest*>::item *i)
+{
+  Mutex::Locker guard(ops_in_flight_lock);
+  ops_in_flight.push_back(i);
+
+  // the op just appended is now the list's last element
+  OpRequest *newest = ops_in_flight.back();
+  newest->seq = seq;
+  ++seq;
+}
+
+void OpTracker::unregister_inflight_op(xlist<OpRequest*>::item *i) // unlink i from the in-flight list
+{
+  Mutex::Locker locker(ops_in_flight_lock); // held until the function returns
+  assert(i->get_list() == &ops_in_flight); // i must currently be linked into this tracker's list
+  i->remove_myself();
+}
+
+/**
+ * Describe every in-flight op that is overdue for a (repeat) warning.
+ *
+ * The list is walked from the front and the walk stops at the first op
+ * received less than osd_op_complaint_time ago; this assumes the list is
+ * ordered oldest-first, as the "oldest is" debug line below also does.
+ * An op is reported once its age exceeds osd_op_complaint_time times its
+ * warn_interval_multiplier, and the multiplier is doubled after each
+ * report so repeat warnings back off exponentially.
+ *
+ * @param out  receives the warning text; multiple warnings are separated
+ *             by "; "
+ * @return true iff at least one warning was written to out, so the caller
+ *         only emits a cluster-log entry when there is something to say
+ */
+bool OpTracker::check_ops_in_flight(ostream &out)
+{
+  Mutex::Locker locker(ops_in_flight_lock);
+  if (!ops_in_flight.size())
+    return false;
+
+  utime_t now = ceph_clock_now(g_ceph_context);
+  utime_t too_old = now;
+  too_old -= g_conf->osd_op_complaint_time;
+
+  dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
+          << "; oldest is " << now - ops_in_flight.front()->received_time
+          << " seconds old" << dendl;
+
+  // Track whether anything was actually reported.  Returning "did the scan
+  // stop before the end of the list" loses the warnings when every op is
+  // overdue, and reports an empty message when old ops are still backing off.
+  bool warned = false;
+  for (xlist<OpRequest*>::iterator i = ops_in_flight.begin();
+       !i.end() && (*i)->received_time < too_old;
+       ++i) {
+    // exponential backoff of warning intervals
+    if ( ( (*i)->received_time +
+          (g_conf->osd_op_complaint_time *
+           (*i)->warn_interval_multiplier) )< now) {
+      if (warned)
+        out << "; ";
+      out << "old request " << *((*i)->request) << " received at "
+         << (*i)->received_time << " currently " << (*i)->state_string();
+      // NOTE(review): warn_interval_multiplier is a uint8_t, so the eighth
+      // doubling wraps it to 0 and the backoff stops; widening the field
+      // would be needed to change that.
+      (*i)->warn_interval_multiplier *= 2;
+      warned = true;
+    }
+  }
+  return warned;
+}
+
+void OpTracker::mark_event(OpRequest *op, const string &dest) // log a named event for op; dest is the event name
+{
+  Mutex::Locker locker(ops_in_flight_lock); // taken before the timestamp is read and held while logging
+  utime_t now = ceph_clock_now(g_ceph_context);
+  dout(1) << "seq: " << op->seq << ", time: " << now << ", event: " << dest // emitted at debug level 1
+         << " " << *op << dendl;
+}
+
+void OpRequest::mark_event(const string &event) // record a named event for this op
+{
+  tracker->mark_event(this, event); // tracker is the OpTracker handed to the constructor
+}
index dfeddd62c8f64db969beda0268a437d34f5fe989..1b7f7073d1fc6d74e3ef8d376bd603b7ba90b527 100644 (file)
 
 #ifndef OPREQUEST_H_
 #define OPREQUEST_H_
+#include <sstream>
+#include <stdint.h>
+#include <include/utime.h>
+#include "common/Mutex.h"
+#include "include/xlist.h"
+#include "msg/Message.h"
+
+class OpRequest;
+class OpTracker { // keeps the list of in-flight OpRequests and logs their events
+  uint64_t seq; // next sequence number assigned by register_inflight_op
+  Mutex ops_in_flight_lock; // taken by every member function below
+  xlist<OpRequest *> ops_in_flight; // ops are appended at the back on registration
+
+public:
+  OpTracker() : seq(0), ops_in_flight_lock("OpTracker mutex") {}
+  void dump_ops_in_flight(std::ostream& ss); // writes a JSON dump of all in-flight ops to ss
+  void register_inflight_op(xlist<OpRequest*>::item *i); // append i and give its op a sequence number
+  void unregister_inflight_op(xlist<OpRequest*>::item *i); // unlink i; asserts it is on this tracker's list
+  bool check_ops_in_flight(std::ostream &out); // writes "old request" warnings for overdue ops to out
+  void mark_event(OpRequest *op, const string &evt); // logs evt with op's sequence number and the current time
+};
 
 /**
  * The OpRequest takes in a Message* and takes over a single reference
  * the way you used to pass around the Message.
  */
 struct OpRequest : public RefCountedObject {
+  friend class OpTracker;
   Message *request;
   xlist<OpRequest*>::item xitem;
   utime_t received_time;
   uint8_t warn_interval_multiplier;
 private:
-  OSD *osd;
+  OpTracker *tracker;
   uint8_t hit_flag_points;
   uint8_t latest_flag_point;
+  uint64_t seq;
   static const uint8_t flag_queued_for_pg=1 << 0;
   static const uint8_t flag_reached_pg =  1 << 1;
   static const uint8_t flag_delayed =     1 << 2;
@@ -37,17 +60,18 @@ private:
   static const uint8_t flag_sub_op_sent = 1 << 4;
 
 public:
-  OpRequest() : request(NULL), xitem(this) {}
-  OpRequest(Message *req, OSD *o) : request(req), xitem(this),
-      warn_interval_multiplier(1),
-      osd(o) {
+  OpRequest(Message *req, OpTracker *tracker) :
+    request(req), xitem(this),
+    warn_interval_multiplier(1),
+    tracker(tracker),
+    seq(0) {
     received_time = request->get_recv_stamp();
+    tracker->register_inflight_op(&xitem);
   }
   ~OpRequest() {
-    osd->unregister_inflight_op(&xitem);
-    if (request) {
-      request->put();
-    }
+    tracker->unregister_inflight_op(&xitem);
+    assert(request);
+    request->put();
   }
 
   bool been_queued_for_pg() { return hit_flag_points & flag_queued_for_pg; }
@@ -74,10 +98,12 @@ public:
   }
 
   void mark_queued_for_pg() {
+    mark_event("queued_for_pg");
     hit_flag_points |= flag_queued_for_pg;
     latest_flag_point = flag_queued_for_pg;
   }
   void mark_reached_pg() {
+    mark_event("reached_pg");
     hit_flag_points |= flag_reached_pg;
     latest_flag_point = flag_reached_pg;
   }
@@ -86,13 +112,17 @@ public:
     latest_flag_point = flag_delayed;
   }
   void mark_started() {
+    mark_event("started");
     hit_flag_points |= flag_started;
     latest_flag_point = flag_started;
   }
   void mark_sub_op_sent() {
+    mark_event("sub_op_sent");
     hit_flag_points |= flag_sub_op_sent;
     latest_flag_point = flag_sub_op_sent;
   }
+
+  void mark_event(const string &new_state);
 };
 
 #endif /* OPREQUEST_H_ */
index 4f641fd3e68a672e39d62cebecbb9dd768b067db..45d60a8255e5c9eb8a49e410ac10acc31cdf6aa1 100644 (file)
@@ -3284,6 +3284,7 @@ void ReplicatedPG::op_applied(RepGather *repop)
 {
   lock();
   dout(10) << "op_applied " << *repop << dendl;
+  repop->ctx->op->mark_event("op_applied");
 
   // discard my reference to the buffer
   if (repop->ctx->op)
@@ -3337,6 +3338,7 @@ void ReplicatedPG::op_applied(RepGather *repop)
 void ReplicatedPG::op_commit(RepGather *repop)
 {
   lock();
+  repop->ctx->op->mark_event("op_commit");
 
   if (repop->aborted) {
     dout(10) << "op_commit " << *repop << " -- aborted" << dendl;
@@ -3599,6 +3601,7 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
            << dendl;
   
   if (ack_type & CEPH_OSD_FLAG_ONDISK) {
+    repop->ctx->op->mark_event("sub_op_commit_rec");
     // disk
     if (repop->waitfor_disk.count(fromosd)) {
       repop->waitfor_disk.erase(fromosd);
@@ -3612,6 +3615,7 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
     repop->waitfor_ack.erase(fromosd);*/
   } else {
     // ack
+    repop->ctx->op->mark_event("sub_op_applied_rec");
     repop->waitfor_ack.erase(fromosd);
   }
 
@@ -4159,6 +4163,7 @@ void ReplicatedPG::sub_op_modify(OpRequest *op)
 void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
 {
   lock();
+  rm->op->mark_event("sub_op_applied");
   dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl;
   MOSDSubOp *m = (MOSDSubOp*)rm->op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -4197,6 +4202,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
 void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
 {
   lock();
+  rm->op->mark_event("sub_op_commit");
 
   // send commit.
   dout(10) << "sub_op_modify_commit on op " << *rm->op->request