pg->unlock();
}
-void OSD::ack_notification(entity_name_t& name, void *_notif)
+void OSD::complete_notify(void *_notif, void *_obc)
+{
+ ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)_obc;
+ Watch::Notification *notif = (Watch::Notification *)_notif;
+ dout(0) << "got the last reply from pending watchers, can send response now" << dendl;
+ MWatchNotify *reply = notif->reply;
+ client_messenger->send_message(reply, notif->session->con);
+ notif->session->put();
+ watch->remove_notification(notif);
+ if (notif->timeout)
+ watch_timer.cancel_event(notif->timeout);
+ map<Watch::Notification *, bool>::iterator iter = obc->notifs.find(notif);
+ if (iter != obc->notifs.end())
+ obc->notifs.erase(iter);
+ delete notif;
+}
+
+void OSD::ack_notification(entity_name_t& name, void *_notif, void *_obc)
{
assert(watch_lock.is_locked());
Watch::Notification *notif = (Watch::Notification *)_notif;
- if (watch->ack_notification(name, notif)) {
- dout(0) << "got the last reply from pending watchers, can send response now" << dendl;
- MWatchNotify *reply = notif->reply;
- client_messenger->send_message(reply, notif->session->con);
- notif->session->put();
- watch->remove_notification(notif);
- if (notif->timeout)
- watch_timer.cancel_event(notif->timeout);
- delete notif;
- }
+ if (watch->ack_notification(name, notif))
+ complete_notify(notif, _obc);
}
bool OSD::ms_handle_reset(Connection *con)
put_object_context(obc, oiter->second);
}
+#if 0
// FIXME: do we really want to _cancel_ notifications here?
// shouldn't they time out in the usual way? because this person
// might/should immediately reconnect...
}
session->notifs.clear();
watch_lock.Unlock();
+#endif
return true;
}
watch_lock.Unlock(); /* put_object_context takes osd->lock */
ReplicatedPG *pg = (ReplicatedPG *)lookup_lock_raw_pg(notif->pgid);
+ pg_t pgid = notif->pgid;
+ pg->do_complete_notify(notif, obc);
put_object_context(obc, notif->pgid);
- pg->do_complete_notify(notif);
watch_lock.Lock();
/* exiting with watch_lock held */
void put_object_context(void *_obc, pg_t pgid);
- void ack_notification(entity_name_t& peer_addr, void *notif);
+ void complete_notify(void *notif, void *obc);
+ void ack_notification(entity_name_t& peer_addr, void *notif, void *obc);
Mutex watch_lock;
SafeTimer watch_timer;
void handle_notify_timeout(void *notif);
}
}
-void ReplicatedPG::do_complete_notify(Watch::Notification *notif)
+void ReplicatedPG::do_complete_notify(Watch::Notification *notif, ObjectContext *obc)
{
- osd->client_messenger->send_message(notif->reply, notif->session->con);
- notif->session->put();
- osd->watch->remove_notification(notif);
- delete notif;
+ osd->complete_notify((void *)notif, obc);
}
p != obc->unconnected_watchers.end();
p++) {
entity_name_t name = p->first;
- // notif->add_watcher(name, Watch::WATCHER_PENDING);
+ notif->add_watcher(name, Watch::WATCHER_PENDING);
}
notif->reply = new MWatchNotify(op.watch.cookie, op.watch.ver, notif->id, WATCH_NOTIFY_COMPLETE);
if (notif->watchers.empty()) {
- do_complete_notify(notif);
+ do_complete_notify(notif, obc);
} else {
+ obc->notifs[notif] = true;
obc->ref++;
notif->obc = obc;
notif->timeout = new Watch::C_NotifyTimeout(osd, notif);
session->del_notif(notif);
session->put();
- osd->ack_notification(source, notif);
+ osd->ack_notification(source, notif, obc);
osd->watch_lock.Unlock();
}
break;
map<entity_name_t, utime_t>::iterator un_iter = obc->unconnected_watchers.find(entity);
if (un_iter != obc->unconnected_watchers.end())
obc->unconnected_watchers.erase(un_iter);
+
+ map<Watch::Notification *, bool>::iterator niter;
+ for (niter = obc->notifs.begin(); niter != obc->notifs.end(); ++niter) {
+ Watch::Notification *notif = niter->first;
+ map<entity_name_t, Watch::WatcherState>::iterator iter = notif->watchers.find(entity);
+ if (iter != notif->watchers.end()) {
+ /* there is a pending notification for this watcher, we should resend it anyway
+ even if we already sent it as it might not have received it */
+ MWatchNotify *notify_msg = new MWatchNotify(w.cookie, w.ver, notif->id, WATCH_NOTIFY);
+ osd->client_messenger->send_message(notify_msg, session->con);
+ }
+ }
register_object_context(obc);
} else {
map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(entity);
// any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
map<entity_name_t, OSD::Session *> watchers;
map<entity_name_t, utime_t> unconnected_watchers;
+ map<Watch::Notification *, bool> notifs;
/* ObjectContext(const sobject_t& s, const object_locator_t& ol) :
ref(0), registered(false), obs(s, ol),
int recover_replicas(int max);
void dump_watchers(ObjectContext *obc);
- void do_complete_notify(Watch::Notification *notif);
+ void do_complete_notify(Watch::Notification *notif, ObjectContext *obc);
struct RepModify {
ReplicatedPG *pg;