pg->unlock();
}
+void OSD::ack_notification(entity_name_t& name, void *_notif)
+{
+ Watch::Notification *notif = (Watch::Notification *)_notif;
+ if (watch->ack_notification(name, notif)) {
+ dout(0) << "got the last reply from pending watchers, can send response now" << dendl;
+ MWatchNotify *reply = notif->reply; // new MWatchNotify(notif->cookie, wi.ver, notif->id, WATCH_NOTIFY_COMPLETE);
+ client_messenger->send_message(reply, notif->session->con);
+ notif->session->put();
+ watch->remove_notification(notif);
+ if (notif->timeout)
+ watch_timer.cancel_event(notif->timeout);
+ delete notif;
+ }
+}
+
bool OSD::ms_handle_reset(Connection *con)
{
dout(0) << "OSD::ms_handle_reset()" << dendl;
put_object_context(obc, oiter->second);
}
+ watch_lock.Lock();
+
+ map<void *, entity_name_t>::iterator notif_iter;
+ for (notif_iter = session->notifs.begin(); notif_iter != session->notifs.end(); ++notif_iter) {
+ Watch::Notification *notif = (Watch::Notification *)notif_iter->first;
+ entity_name_t& dest = notif_iter->second;
+ dout(0) << "ms_handle_reset: ack notification for notif=" << (void *)notif << " entity=" << dest << dendl;
+ ack_notification(dest, notif);
+ }
+ session->notifs.clear();
+
+ watch_lock.Unlock();
+
return true;
}
epoch_t last_sent_epoch;
Connection *con;
std::map<void *, ceph_object_layout> watches;
+ std::map<void *, entity_name_t> notifs;
Session() : last_sent_epoch(0), con(0) {}
~Session() { if (con) con->put(); }
+ void add_notif(void *n, entity_name_t& name) {
+ notifs[n] = name;
+ }
+ void del_notif(void *n) {
+ std::map<void *, entity_name_t>::iterator iter = notifs.find(n);
+ if (iter != notifs.end())
+ notifs.erase(iter);
+ }
};
private:
PG *lookup_lock_pg(pg_t pgid);
ReplicatedPG *get_pg(void *_obc, ceph_object_layout& layout);
void put_object_context(void *_obc, ceph_object_layout& layout);
+ void ack_notification(entity_name_t& peer_addr, void *notif);
Mutex watch_lock;
SafeTimer watch_timer;
void handle_notify_timeout(void *notif);
session = iter->second;
dout(0) << " found session, sending notification" << dendl;
notif->add_watcher(oi_iter->first, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
+ entity_name_t name = oi_iter->first;
+ session->add_notif(notif, name);
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, w.ver, notif->id, WATCH_NOTIFY);
osd->client_messenger->send_message(notify_msg, session->con);
obc->ref++;
notif->obc = obc;
notif->timeout = new Watch::C_NotifyTimeout(osd, notif);
- osd->watch_timer.add_event_after(5.0, notif->timeout);
+ osd->watch_timer.add_event_after(5.0, notif->timeout); /* FIXME: use a configurable timeout here */
}
osd->watch_lock.Unlock();
}
result = -EINVAL;
break;
}
+ OSD::Session *session = (OSD::Session *)ctx->op->get_connection()->get_priv();
+ session->del_notif(notif);
entity_name_t source = ctx->op->get_source();
- if (osd->watch->ack_notification(source, notif)) {
- dout(0) << "got the last reply from pending watchers, can send response now" << dendl;
- MWatchNotify *reply = notif->reply; // new MWatchNotify(notif->cookie, wi.ver, notif->id, WATCH_NOTIFY_COMPLETE);
- osd->client_messenger->send_message(reply, notif->session->con);
- notif->session->put();
- osd->watch->remove_notification(notif);
- if (notif->timeout)
- osd->watch_timer.cancel_event(notif->timeout);
- delete notif;
- }
+ osd->ack_notification(source, notif);
osd->watch_lock.Unlock();
}
break;