]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: gracefully handle slow 'ceph -w' clients
authorSage Weil <sage@inktank.com>
Mon, 18 Jun 2012 21:00:06 +0000 (14:00 -0700)
committerSage Weil <sage@inktank.com>
Mon, 18 Jun 2012 21:00:06 +0000 (14:00 -0700)
If we are sending log updates to a client (ceph -w), and they are far
enough behind to drop behind first_committed, include a friendly message
in their stream but continue.

Drop useless return value from _create_sub_incremental().  Assert that we
can read the state file.

Signed-off-by: Sage Weil <sage@inktank.com>
src/mon/LogMonitor.cc
src/mon/LogMonitor.h

index f3dc1fcd13ad6af154115deb1c2d8181dae2f958..468c5fe97486271163cc602d33a3e4ea5b7ae129 100644 (file)
@@ -364,21 +364,19 @@ void LogMonitor::check_sub(Subscription *s)
   } 
  
   int sub_level = types[s->type];
-  bool ret = true;
   MLog *mlog = new MLog(mon->monmap->fsid);
 
   if (s->next == 0) { 
     /* First timer, heh? */
-    ret = _create_sub_summary(mlog, sub_level);
+    bool ret = _create_sub_summary(mlog, sub_level);
+    if (!ret) {
+      dout(1) << __func__ << " ret = " << ret << dendl;
+      mlog->put();
+      return;
+    }
   } else {
     /* let us send you an incremental log... */
-    ret = _create_sub_incremental(mlog, sub_level, s->next);
-  }
-
-  if (!ret) {
-    dout(1) << __func__ << " ret = " << ret << dendl;
-    mlog->put();
-    return;
+    _create_sub_incremental(mlog, sub_level, s->next);
   }
 
   dout(1) << __func__ << " sending message to " << s->session->inst 
@@ -430,23 +428,30 @@ bool LogMonitor::_create_sub_summary(MLog *mlog, int level)
  *             since version @sv, inclusive.
  * @param level        The max log level of the messages the client is interested in.
  * @param sv   The version the client is looking for.
- * @return     'true' if we consider we successfully populated @mlog; 
- *             'false' otherwise.
  */
-bool LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
+void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
 {
   dout(10) << __func__ << " level " << level << " ver " << sv 
          << " cur summary ver " << summary.version << dendl; 
 
-  bool success = true;
+  if (sv < paxos->get_first_committed()) {
+    dout(10) << __func__ << " skipped from " << sv
+            << " to first_committed " << paxos->get_first_committed() << dendl;
+    LogEntry le;
+    le.stamp = ceph_clock_now(NULL);
+    le.type = CLOG_WARN;
+    ostringstream ss;
+    ss << "skipped log messages from " << sv << " to " << paxos->get_first_committed();
+    le.msg = ss.str();
+    mlog->entries.push_back(le);
+    sv = paxos->get_first_committed();
+  }
+
   version_t summary_ver = summary.version;
   while (sv <= summary_ver) {
     bufferlist bl;
-    success = paxos->read(sv, bl);
-    if (!success) {
-      dout(10) << __func__ << " paxos->read() unsuccessful" << dendl;
-      break;
-    }
+    bool success = paxos->read(sv, bl);
+    assert(success);
     bufferlist::iterator p = bl.begin();
     __u8 v;
     ::decode(v,p);
@@ -466,8 +471,6 @@ bool LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
   }
 
   dout(10) << __func__ << " incremental message ready (" 
-         << mlog->entries.size() << " entries)" << dendl;
-
-  return success;
+          << mlog->entries.size() << " entries)" << dendl;
 }
 
index df995792a7c24000cca9635d99123ea6d3a35eb9..a2ce987603fe8bdb6d5cefde9e957c49af4fe384 100644 (file)
@@ -60,7 +60,7 @@ private:
   bool prepare_command(MMonCommand *m);
 
   bool _create_sub_summary(MLog *mlog, int level);
-  bool _create_sub_incremental(MLog *mlog, int level, version_t sv);
+  void _create_sub_incremental(MLog *mlog, int level, version_t sv);
 
  public:
   LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }