]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Objecter: make watch_check time reflect async delivery
authorSage Weil <sage@redhat.com>
Mon, 17 Nov 2014 19:28:21 +0000 (11:28 -0800)
committerSage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:34:05 +0000 (10:34 -0800)
When a librados user calls watch_check(), the age they get back should
establish a timestamp for which:

 1) we know that the watch was registered at least through this time,
 2) we have received and processed notifies sent as of this time,
 3) we have processed any async error events as of this time.

We already accomplish 1 by updating watch_valid_thru based on the time
the ping is sent (not received).

This patch gets us 2 and 3 by using the MIN of watch_valid_thru and the
oldest queued async event for the watch_check result.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 52885791aab01d06ef42deae461c1b83d6301807..856486cf8e9e95ff2e49f4fd0707225e42c86650 100644 (file)
@@ -495,9 +495,11 @@ struct C_DoWatchError : public Context {
   int err;
   C_DoWatchError(Objecter::LingerOp *i, int r) : info(i), err(r) {
     info->get();
+    info->queued_async();
   }
   void finish(int r) {
     info->watch_context->handle_error(info->linger_id, err);
+    info->finished_async();
     info->put();
   }
 };
@@ -583,7 +585,13 @@ void Objecter::_linger_ping(LingerOp *info, int r, utime_t sent,
 int Objecter::linger_check(LingerOp *info)
 {
   RWLock::WLocker wl(rwlock);
-  utime_t age = ceph_clock_now(NULL) - info->watch_valid_thru;
+  Mutex::Locker l(info->watch_lock);
+
+  utime_t stamp = info->watch_valid_thru;
+  if (!info->watch_pending_async.empty())
+    stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
+  utime_t age = ceph_clock_now(NULL) - stamp;
+
   ldout(cct, 10) << __func__ << " " << info->linger_id
                 << " err " << info->last_error
                 << " age " << age << dendl;
@@ -726,6 +734,7 @@ struct C_DoWatchNotify : public Context {
   C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
     : objecter(o), info(i), msg(m) {
     info->get();
+    info->queued_async();
     msg->get();
   }
   void finish(int r) {
@@ -800,6 +809,7 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
   }
 
  out:
+  info->finished_async();
   info->put();
   m->put();
 }
index eeb7fe8cff41d349681e799488e11e3abc345305..942eefdd9dbebe5a1adedc25e2ceab0085b04d04 100644 (file)
@@ -1480,6 +1480,10 @@ public:
     Mutex watch_lock;
     Cond watch_cond;
 
+    // queue of pending async operations, with the timestamp of
+    // when they were queued.
+    list<utime_t> watch_pending_async;
+
     uint32_t register_gen;
     bool registered;
     bool canceled;
@@ -1497,6 +1501,16 @@ public:
     ceph_tid_t ping_tid;
     epoch_t map_dne_bound;
 
+    void queued_async() {
+      Mutex::Locker l(watch_lock);
+      watch_pending_async.push_back(ceph_clock_now(NULL));
+    }
+    void finished_async() {
+      Mutex::Locker l(watch_lock);
+      assert(!watch_pending_async.empty());
+      watch_pending_async.pop_front();
+    }
+
     LingerOp() : linger_id(0),
                 target(object_t(), object_locator_t(), 0),
                 snap(CEPH_NOSNAP),