From 7b5e923cd0cee3329f56dcca72cfd745b836bbde Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 14 Dec 2010 17:00:49 -0800 Subject: [PATCH] osd: send pending notification for reconnected watcher --- src/osd/OSD.cc | 36 ++++++++++++++++++++++++------------ src/osd/OSD.h | 3 ++- src/osd/ReplicatedPG.cc | 26 ++++++++++++++++++-------- src/osd/ReplicatedPG.h | 3 ++- 4 files changed, 46 insertions(+), 22 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index aae8c18d533eb..5e9af33531068 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1634,20 +1634,29 @@ void OSD::put_object_context(void *_obc, pg_t pgid) pg->unlock(); } -void OSD::ack_notification(entity_name_t& name, void *_notif) +void OSD::complete_notify(void *_notif, void *_obc) +{ + ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)_obc; + Watch::Notification *notif = (Watch::Notification *)_notif; + dout(0) << "got the last reply from pending watchers, can send response now" << dendl; + MWatchNotify *reply = notif->reply; + client_messenger->send_message(reply, notif->session->con); + notif->session->put(); + watch->remove_notification(notif); + if (notif->timeout) + watch_timer.cancel_event(notif->timeout); + map::iterator iter = obc->notifs.find(notif); + if (iter != obc->notifs.end()) + obc->notifs.erase(iter); + delete notif; +} + +void OSD::ack_notification(entity_name_t& name, void *_notif, void *_obc) { assert(watch_lock.is_locked()); Watch::Notification *notif = (Watch::Notification *)_notif; - if (watch->ack_notification(name, notif)) { - dout(0) << "got the last reply from pending watchers, can send response now" << dendl; - MWatchNotify *reply = notif->reply; - client_messenger->send_message(reply, notif->session->con); - notif->session->put(); - watch->remove_notification(notif); - if (notif->timeout) - watch_timer.cancel_event(notif->timeout); - delete notif; - } + if (watch->ack_notification(name, notif)) + complete_notify(notif, _obc); } bool OSD::ms_handle_reset(Connection *con) @@ -1701,6 +1710,7 @@ bool OSD::ms_handle_reset(Connection *con) put_object_context(obc, oiter->second); } +#if 0 // FIXME: do we really want to _cancel_ notifications here? // shouldn't they time out in the usual way? because this person // might/should immediately reconnect... @@ -1715,6 +1725,7 @@ bool OSD::ms_handle_reset(Connection *con) } session->notifs.clear(); watch_lock.Unlock(); +#endif return true; } @@ -1744,8 +1755,9 @@ void OSD::handle_notify_timeout(void *_notif) watch_lock.Unlock(); /* put_object_context takes osd->lock */ ReplicatedPG *pg = (ReplicatedPG *)lookup_lock_raw_pg(notif->pgid); + pg_t pgid = notif->pgid; + pg->do_complete_notify(notif, obc); put_object_context(obc, notif->pgid); - pg->do_complete_notify(notif); watch_lock.Lock(); /* exiting with watch_lock held */ diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 2e81c03c22c26..776c1dd6305d4 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -995,7 +995,8 @@ public: void put_object_context(void *_obc, pg_t pgid); - void ack_notification(entity_name_t& peer_addr, void *notif); + void complete_notify(void *notif, void *obc); + void ack_notification(entity_name_t& peer_addr, void *notif, void *obc); Mutex watch_lock; SafeTimer watch_timer; void handle_notify_timeout(void *notif); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 55ea7671a102b..555e34144cf28 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -826,12 +826,9 @@ void ReplicatedPG::dump_watchers(ObjectContext *obc) } } -void ReplicatedPG::do_complete_notify(Watch::Notification *notif) +void ReplicatedPG::do_complete_notify(Watch::Notification *notif, ObjectContext *obc) { - osd->client_messenger->send_message(notif->reply, notif->session->con); - notif->session->put(); - osd->watch->remove_notification(notif); - delete notif; + osd->complete_notify((void *)notif, obc); } @@ -1189,13 +1186,14 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, p != obc->unconnected_watchers.end(); p++) { entity_name_t name = p->first; - // notif->add_watcher(name, Watch::WATCHER_PENDING); + notif->add_watcher(name, Watch::WATCHER_PENDING); } notif->reply = new MWatchNotify(op.watch.cookie, op.watch.ver, notif->id, WATCH_NOTIFY_COMPLETE); if (notif->watchers.empty()) { - do_complete_notify(notif); + do_complete_notify(notif, obc); } else { + obc->notifs[notif] = true; obc->ref++; notif->obc = obc; notif->timeout = new Watch::C_NotifyTimeout(osd, notif); @@ -1232,7 +1230,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, session->del_notif(notif); session->put(); - osd->ack_notification(source, notif); + osd->ack_notification(source, notif, obc); osd->watch_lock.Unlock(); } break; @@ -1460,6 +1458,18 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, map::iterator un_iter = obc->unconnected_watchers.find(entity); if (un_iter != obc->unconnected_watchers.end()) obc->unconnected_watchers.erase(un_iter); + + map::iterator niter; + for (niter = obc->notifs.begin(); niter != obc->notifs.end(); ++niter) { + Watch::Notification *notif = niter->first; + map::iterator iter = notif->watchers.find(entity); + if (iter != notif->watchers.end()) { + /* there is a pending notification for this watcher, we should resend it anyway + even if we already sent it as it might not have received it */ + MWatchNotify *notify_msg = new MWatchNotify(w.cookie, w.ver, notif->id, WATCH_NOTIFY); + osd->client_messenger->send_message(notify_msg, session->con); + } + } register_object_context(obc); } else { map::iterator oi_iter = oi.watchers.find(entity); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 1850f35ee1040..0915a38bca178 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -239,6 +239,7 @@ public: // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers. map watchers; map unconnected_watchers; + map notifs; /* ObjectContext(const sobject_t& s, const object_locator_t& ol) : ref(0), registered(false), obs(s, ol), @@ -529,7 +530,7 @@ protected: int recover_replicas(int max); void dump_watchers(ObjectContext *obc); - void do_complete_notify(Watch::Notification *notif); + void do_complete_notify(Watch::Notification *notif, ObjectContext *obc); struct RepModify { ReplicatedPG *pg; -- 2.39.5