From: Sage Weil Date: Sat, 8 Nov 2014 03:45:21 +0000 (-0800) Subject: osd/Watch: set timeout event for new watchers X-Git-Tag: v0.91~124 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=26e2fc73210a14d6b61ac8bff83c68bceb2cfac1;p=ceph.git osd/Watch: set timeout event for new watchers For a new watcher, we set the timeout event on connect, and reset it when we get a ping. For an old watcher, we continue to set the timeout on disconnect() and rely on the msgr Connection reset. Signed-off-by: Sage Weil --- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 228b526ad98b..2b8c8825ceae 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -4312,14 +4312,15 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) oi.watchers[make_pair(cookie, entity)] = w; t->nop(); // make sure update the object_info on disk! } - ctx->watch_connects.push_back(w); + bool will_ping = (op.watch.op == CEPH_OSD_WATCH_OP_WATCH); + ctx->watch_connects.push_back(make_pair(w, will_ping)); } else if (op.watch.op == CEPH_OSD_WATCH_OP_RECONNECT) { if (!oi.watchers.count(make_pair(cookie, entity))) { result = -ENOTCONN; break; } dout(10) << " found existing watch " << w << " by " << entity << dendl; - ctx->watch_connects.push_back(w); + ctx->watch_connects.push_back(make_pair(w, true)); } else if (op.watch.op == CEPH_OSD_WATCH_OP_PING) { if (!oi.watchers.count(make_pair(cookie, entity))) { result = -ENOTCONN; @@ -4332,6 +4333,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; } dout(10) << " found existing watch " << w << " by " << entity << dendl; + p->second->got_ping(ceph_clock_now(NULL)); result = 0; } else if (op.watch.op == CEPH_OSD_WATCH_OP_UNWATCH) { map, watch_info_t>::iterator oi_iter = @@ -5375,10 +5377,10 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) dout(15) << "do_osd_op_effects on session " << session.get() << dendl; - for (list::iterator i = ctx->watch_connects.begin(); + for (list >::iterator i = ctx->watch_connects.begin(); i != ctx->watch_connects.end(); ++i) { - pair watcher(i->cookie, entity); + pair watcher(i->first.cookie, entity); dout(15) << "do_osd_op_effects applying watch connect on session " << session.get() << " watcher " << watcher << dendl; WatchRef watch; @@ -5390,14 +5392,14 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) dout(15) << "do_osd_op_effects new watcher " << watcher << dendl; watch = Watch::makeWatchRef( - this, osd, ctx->obc, i->timeout_seconds, - i->cookie, entity, conn->get_peer_addr()); + this, osd, ctx->obc, i->first.timeout_seconds, + i->first.cookie, entity, conn->get_peer_addr()); ctx->obc->watchers.insert( make_pair( watcher, watch)); } - watch->connect(conn); + watch->connect(conn, i->second); } for (list::iterator i = ctx->watch_disconnects.begin(); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 452661a05883..5f2ce6491db1 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -441,7 +441,7 @@ public: bool ignore_cache; ///< true if IGNORE_CACHE flag is set // side effects - list watch_connects; + list > watch_connects; ///< new watch + will_ping flag list watch_disconnects; list notifies; struct NotifyAck { diff --git a/src/osd/Watch.cc b/src/osd/Watch.cc index 51afac16cee0..36a8d09e443b 100644 --- a/src/osd/Watch.cc +++ b/src/osd/Watch.cc @@ -290,6 +290,7 @@ Watch::Watch( timeout(timeout), cookie(cookie), addr(addr), + will_ping(false), entity(entity), discarded(false) { dout(10) << "Watch()" << dendl; @@ -315,6 +316,7 @@ void Watch::register_cb() { Mutex::Locker l(osd->watch_lock); dout(15) << "registering callback, timeout: " << timeout << dendl; + assert(cb == NULL); cb = new HandleWatchTimeout(self.lock()); osd->watch_timer.add_event_after( timeout, @@ -335,10 +337,19 @@ void Watch::unregister_cb() cb = NULL; } -void Watch::connect(ConnectionRef con) +void Watch::got_ping(utime_t t) +{ + last_ping = t; + assert(conn); + unregister_cb(); + register_cb(); +} + +void Watch::connect(ConnectionRef con, bool _will_ping) { dout(10) << "connecting" << dendl; conn = con; + will_ping = _will_ping; OSD::Session* sessionref(static_cast(con->get_priv())); if (sessionref) { sessionref->wstate.addWatch(self.lock()); @@ -349,14 +360,20 @@ void Watch::connect(ConnectionRef con) send_notify(i->second); } } - unregister_cb(); + if (will_ping) { + last_ping = ceph_clock_now(NULL); + register_cb(); + } else { + unregister_cb(); + } } void Watch::disconnect() { dout(10) << "disconnect" << dendl; conn = ConnectionRef(); - register_cb(); + if (!will_ping) + register_cb(); } void Watch::discard() @@ -407,9 +424,20 @@ void Watch::remove() void Watch::start_notify(NotifyRef notif) { - dout(10) << "start_notify " << notif->notify_id << dendl; assert(in_progress_notifies.find(notif->notify_id) == in_progress_notifies.end()); + if (will_ping) { + utime_t cutoff = ceph_clock_now(NULL); + cutoff.sec_ref() -= timeout; + if (last_ping < cutoff) { + dout(10) << __func__ << " " << notif->notify_id + << " last_ping " << last_ping << " < cutoff " << cutoff + << ", disconnecting" << dendl; + disconnect(); + return; + } + } + dout(10) << "start_notify " << notif->notify_id << dendl; in_progress_notifies[notif->notify_id] = notif; notif->start_watcher(self.lock()); if (connected()) @@ -434,6 +462,8 @@ void Watch::send_notify(NotifyRef notif) void Watch::send_failed_notify(Notify *notif) { + if (!conn) + return; bufferlist empty; MWatchNotify *reply(new MWatchNotify(cookie, notif->version, notif->notify_id, CEPH_WATCH_EVENT_FAILED_NOTIFY, empty)); diff --git a/src/osd/Watch.h b/src/osd/Watch.h index 2e199c5535ba..7176af628a2c 100644 --- a/src/osd/Watch.h +++ b/src/osd/Watch.h @@ -165,10 +165,13 @@ class Watch { std::map in_progress_notifies; // Could have watch_info_t here, but this file includes osd_types.h - uint32_t timeout; + uint32_t timeout; ///< timeout in seconds uint64_t cookie; entity_addr_t addr; + bool will_ping; ///< is client new enough to ping the watch + utime_t last_ping; ///< last cilent ping + entity_name_t entity; bool discarded; @@ -190,6 +193,12 @@ public: /// Unregisters the timeout callback void unregister_cb(); + /// note receipt of a ping + void got_ping(utime_t t); + utime_t get_last_ping() const { + return last_ping; + } + /// send a failed notify message void send_failed_notify(Notify *notif); @@ -226,7 +235,8 @@ public: /// Transitions Watch to connected, unregister_cb, resends pending Notifies void connect( - ConnectionRef con ///< [in] Reference to new connection + ConnectionRef con, ///< [in] Reference to new connection + bool will_ping ///< [in] client is new and will send pings ); /// Transitions watch to disconnected, register_cb