oi.watchers[make_pair(cookie, entity)] = w;
t->nop(); // make sure update the object_info on disk!
}
- ctx->watch_connects.push_back(w);
+ bool will_ping = (op.watch.op == CEPH_OSD_WATCH_OP_WATCH);
+ ctx->watch_connects.push_back(make_pair(w, will_ping));
} else if (op.watch.op == CEPH_OSD_WATCH_OP_RECONNECT) {
if (!oi.watchers.count(make_pair(cookie, entity))) {
result = -ENOTCONN;
break;
}
dout(10) << " found existing watch " << w << " by " << entity << dendl;
- ctx->watch_connects.push_back(w);
+ ctx->watch_connects.push_back(make_pair(w, true));
} else if (op.watch.op == CEPH_OSD_WATCH_OP_PING) {
if (!oi.watchers.count(make_pair(cookie, entity))) {
result = -ENOTCONN;
break;
}
dout(10) << " found existing watch " << w << " by " << entity << dendl;
+ p->second->got_ping(ceph_clock_now(NULL));
result = 0;
} else if (op.watch.op == CEPH_OSD_WATCH_OP_UNWATCH) {
map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter =
dout(15) << "do_osd_op_effects on session " << session.get() << dendl;
- for (list<watch_info_t>::iterator i = ctx->watch_connects.begin();
+ for (list<pair<watch_info_t,bool> >::iterator i = ctx->watch_connects.begin();
i != ctx->watch_connects.end();
++i) {
- pair<uint64_t, entity_name_t> watcher(i->cookie, entity);
+ pair<uint64_t, entity_name_t> watcher(i->first.cookie, entity);
dout(15) << "do_osd_op_effects applying watch connect on session "
<< session.get() << " watcher " << watcher << dendl;
WatchRef watch;
dout(15) << "do_osd_op_effects new watcher " << watcher
<< dendl;
watch = Watch::makeWatchRef(
- this, osd, ctx->obc, i->timeout_seconds,
- i->cookie, entity, conn->get_peer_addr());
+ this, osd, ctx->obc, i->first.timeout_seconds,
+ i->first.cookie, entity, conn->get_peer_addr());
ctx->obc->watchers.insert(
make_pair(
watcher,
watch));
}
- watch->connect(conn);
+ watch->connect(conn, i->second);
}
for (list<watch_info_t>::iterator i = ctx->watch_disconnects.begin();
bool ignore_cache; ///< true if IGNORE_CACHE flag is set
// side effects
- list<watch_info_t> watch_connects;
+ list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
list<watch_info_t> watch_disconnects;
list<notify_info_t> notifies;
struct NotifyAck {
timeout(timeout),
cookie(cookie),
addr(addr),
+ will_ping(false),
entity(entity),
discarded(false) {
dout(10) << "Watch()" << dendl;
{
Mutex::Locker l(osd->watch_lock);
dout(15) << "registering callback, timeout: " << timeout << dendl;
+ assert(cb == NULL);
cb = new HandleWatchTimeout(self.lock());
osd->watch_timer.add_event_after(
timeout,
cb = NULL;
}
-void Watch::connect(ConnectionRef con)
+void Watch::got_ping(utime_t t)
+{
+ last_ping = t;
+ assert(conn);
+ unregister_cb();
+ register_cb();
+}
+
+void Watch::connect(ConnectionRef con, bool _will_ping)
{
dout(10) << "connecting" << dendl;
conn = con;
+ will_ping = _will_ping;
OSD::Session* sessionref(static_cast<OSD::Session*>(con->get_priv()));
if (sessionref) {
sessionref->wstate.addWatch(self.lock());
send_notify(i->second);
}
}
- unregister_cb();
+ if (will_ping) {
+ last_ping = ceph_clock_now(NULL);
+ register_cb();
+ } else {
+ unregister_cb();
+ }
}
void Watch::disconnect()
{
dout(10) << "disconnect" << dendl;
conn = ConnectionRef();
- register_cb();
+ if (!will_ping)
+ register_cb();
}
void Watch::discard()
void Watch::start_notify(NotifyRef notif)
{
- dout(10) << "start_notify " << notif->notify_id << dendl;
assert(in_progress_notifies.find(notif->notify_id) ==
in_progress_notifies.end());
+ if (will_ping) {
+ utime_t cutoff = ceph_clock_now(NULL);
+ cutoff.sec_ref() -= timeout;
+ if (last_ping < cutoff) {
+ dout(10) << __func__ << " " << notif->notify_id
+ << " last_ping " << last_ping << " < cutoff " << cutoff
+ << ", disconnecting" << dendl;
+ disconnect();
+ return;
+ }
+ }
+ dout(10) << "start_notify " << notif->notify_id << dendl;
in_progress_notifies[notif->notify_id] = notif;
notif->start_watcher(self.lock());
if (connected())
void Watch::send_failed_notify(Notify *notif)
{
+ if (!conn)
+ return;
bufferlist empty;
MWatchNotify *reply(new MWatchNotify(cookie, notif->version, notif->notify_id,
CEPH_WATCH_EVENT_FAILED_NOTIFY, empty));
std::map<uint64_t, NotifyRef> in_progress_notifies;
// Could have watch_info_t here, but this file includes osd_types.h
- uint32_t timeout;
+ uint32_t timeout; ///< timeout in seconds
uint64_t cookie;
entity_addr_t addr;
+ bool will_ping; ///< is client new enough to ping the watch
+ utime_t last_ping; ///< last cilent ping
+
entity_name_t entity;
bool discarded;
/// Unregisters the timeout callback
void unregister_cb();
+ /// note receipt of a ping
+ void got_ping(utime_t t);
+ utime_t get_last_ping() const {
+ return last_ping;
+ }
+
/// send a failed notify message
void send_failed_notify(Notify *notif);
/// Transitions Watch to connected, unregister_cb, resends pending Notifies
void connect(
- ConnectionRef con ///< [in] Reference to new connection
+ ConnectionRef con, ///< [in] Reference to new connection
+ bool will_ping ///< [in] client is new and will send pings
);
/// Transitions watch to disconnected, register_cb