git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/Watch: set timeout event for new watchers
author: Sage Weil <sage@redhat.com>
Sat, 8 Nov 2014 03:45:21 +0000 (19:45 -0800)
committer: Sage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:32:38 +0000 (10:32 -0800)
For a new watcher, we set the timeout event on connect, and reset it when
we get a ping.

For an old watcher, we continue to set the timeout on disconnect() and
rely on the msgr Connection reset.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/Watch.cc
src/osd/Watch.h

index 228b526ad98b90f2a7e071f3dd8061947ca3ead3..2b8c8825ceaef7214d55659c27cfaae997bd9f1b 100644 (file)
@@ -4312,14 +4312,15 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            oi.watchers[make_pair(cookie, entity)] = w;
            t->nop();  // make sure update the object_info on disk!
          }
-         ctx->watch_connects.push_back(w);
+         bool will_ping = (op.watch.op == CEPH_OSD_WATCH_OP_WATCH);
+         ctx->watch_connects.push_back(make_pair(w, will_ping));
         } else if (op.watch.op == CEPH_OSD_WATCH_OP_RECONNECT) {
          if (!oi.watchers.count(make_pair(cookie, entity))) {
            result = -ENOTCONN;
            break;
          }
          dout(10) << " found existing watch " << w << " by " << entity << dendl;
-         ctx->watch_connects.push_back(w);
+         ctx->watch_connects.push_back(make_pair(w, true));
         } else if (op.watch.op == CEPH_OSD_WATCH_OP_PING) {
          if (!oi.watchers.count(make_pair(cookie, entity))) {
            result = -ENOTCONN;
@@ -4332,6 +4333,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            break;
          }
          dout(10) << " found existing watch " << w << " by " << entity << dendl;
+         p->second->got_ping(ceph_clock_now(NULL));
          result = 0;
         } else if (op.watch.op == CEPH_OSD_WATCH_OP_UNWATCH) {
          map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter =
@@ -5375,10 +5377,10 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
 
   dout(15) << "do_osd_op_effects on session " << session.get() << dendl;
 
-  for (list<watch_info_t>::iterator i = ctx->watch_connects.begin();
+  for (list<pair<watch_info_t,bool> >::iterator i = ctx->watch_connects.begin();
        i != ctx->watch_connects.end();
        ++i) {
-    pair<uint64_t, entity_name_t> watcher(i->cookie, entity);
+    pair<uint64_t, entity_name_t> watcher(i->first.cookie, entity);
     dout(15) << "do_osd_op_effects applying watch connect on session "
             << session.get() << " watcher " << watcher << dendl;
     WatchRef watch;
@@ -5390,14 +5392,14 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
       dout(15) << "do_osd_op_effects new watcher " << watcher
               << dendl;
       watch = Watch::makeWatchRef(
-       this, osd, ctx->obc, i->timeout_seconds,
-       i->cookie, entity, conn->get_peer_addr());
+       this, osd, ctx->obc, i->first.timeout_seconds,
+       i->first.cookie, entity, conn->get_peer_addr());
       ctx->obc->watchers.insert(
        make_pair(
          watcher,
          watch));
     }
-    watch->connect(conn);
+    watch->connect(conn, i->second);
   }
 
   for (list<watch_info_t>::iterator i = ctx->watch_disconnects.begin();
index 452661a058832d9610786dc415021694e855f3c5..5f2ce6491db1f972401942ec426be09f0e337476 100644 (file)
@@ -441,7 +441,7 @@ public:
     bool ignore_cache;    ///< true if IGNORE_CACHE flag is set
 
     // side effects
-    list<watch_info_t> watch_connects;
+    list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
     list<watch_info_t> watch_disconnects;
     list<notify_info_t> notifies;
     struct NotifyAck {
index 51afac16cee0e6ab6cc0cf7ad1fd892e3a57e6fb..36a8d09e443b64935e4b09395068e935e5ccd016 100644 (file)
@@ -290,6 +290,7 @@ Watch::Watch(
     timeout(timeout),
     cookie(cookie),
     addr(addr),
+    will_ping(false),
     entity(entity),
     discarded(false) {
   dout(10) << "Watch()" << dendl;
@@ -315,6 +316,7 @@ void Watch::register_cb()
 {
   Mutex::Locker l(osd->watch_lock);
   dout(15) << "registering callback, timeout: " << timeout << dendl;
+  assert(cb == NULL);
   cb = new HandleWatchTimeout(self.lock());
   osd->watch_timer.add_event_after(
     timeout,
@@ -335,10 +337,19 @@ void Watch::unregister_cb()
   cb = NULL;
 }
 
-void Watch::connect(ConnectionRef con)
+void Watch::got_ping(utime_t t)
+{
+  last_ping = t;
+  assert(conn);
+  unregister_cb();
+  register_cb();
+}
+
+void Watch::connect(ConnectionRef con, bool _will_ping)
 {
   dout(10) << "connecting" << dendl;
   conn = con;
+  will_ping = _will_ping;
   OSD::Session* sessionref(static_cast<OSD::Session*>(con->get_priv()));
   if (sessionref) {
     sessionref->wstate.addWatch(self.lock());
@@ -349,14 +360,20 @@ void Watch::connect(ConnectionRef con)
       send_notify(i->second);
     }
   }
-  unregister_cb();
+  if (will_ping) {
+    last_ping = ceph_clock_now(NULL);
+    register_cb();
+  } else {
+    unregister_cb();
+  }
 }
 
 void Watch::disconnect()
 {
   dout(10) << "disconnect" << dendl;
   conn = ConnectionRef();
-  register_cb();
+  if (!will_ping)
+    register_cb();
 }
 
 void Watch::discard()
@@ -407,9 +424,20 @@ void Watch::remove()
 
 void Watch::start_notify(NotifyRef notif)
 {
-  dout(10) << "start_notify " << notif->notify_id << dendl;
   assert(in_progress_notifies.find(notif->notify_id) ==
         in_progress_notifies.end());
+  if (will_ping) {
+    utime_t cutoff = ceph_clock_now(NULL);
+    cutoff.sec_ref() -= timeout;
+    if (last_ping < cutoff) {
+      dout(10) << __func__ << " " << notif->notify_id
+              << " last_ping " << last_ping << " < cutoff " << cutoff
+              << ", disconnecting" << dendl;
+      disconnect();
+      return;
+    }
+  }
+  dout(10) << "start_notify " << notif->notify_id << dendl;
   in_progress_notifies[notif->notify_id] = notif;
   notif->start_watcher(self.lock());
   if (connected())
@@ -434,6 +462,8 @@ void Watch::send_notify(NotifyRef notif)
 
 void Watch::send_failed_notify(Notify *notif)
 {
+  if (!conn)
+    return;
   bufferlist empty;
   MWatchNotify *reply(new MWatchNotify(cookie, notif->version, notif->notify_id,
                                       CEPH_WATCH_EVENT_FAILED_NOTIFY, empty));
index 2e199c5535ba06477ae42468aa6c20ce9b0bf8de..7176af628a2c67445056189c21b107970aebbcb3 100644 (file)
@@ -165,10 +165,13 @@ class Watch {
   std::map<uint64_t, NotifyRef> in_progress_notifies;
 
   // Could have watch_info_t here, but this file includes osd_types.h
-  uint32_t timeout;
+  uint32_t timeout; ///< timeout in seconds
   uint64_t cookie;
   entity_addr_t addr;
 
+  bool will_ping;    ///< is client new enough to ping the watch
+  utime_t last_ping; ///< last client ping
+
   entity_name_t entity;
   bool discarded;
 
@@ -190,6 +193,12 @@ public:
   /// Unregisters the timeout callback
   void unregister_cb();
 
+  /// note receipt of a ping
+  void got_ping(utime_t t);
+  utime_t get_last_ping() const {
+    return last_ping;
+  }
+
   /// send a failed notify message
   void send_failed_notify(Notify *notif);
 
@@ -226,7 +235,8 @@ public:
 
   /// Transitions Watch to connected, unregister_cb, resends pending Notifies
   void connect(
-    ConnectionRef con ///< [in] Reference to new connection
+    ConnectionRef con, ///< [in] Reference to new connection
+    bool will_ping     ///< [in] client is new and will send pings
     );
 
   /// Transitions watch to disconnected, register_cb