From adbb545903b6e283cc4fbeeaf5fdf503fc49d75e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 12 Nov 2010 09:37:13 -0800 Subject: [PATCH] osd: some notify simplifications and FIXMEs Signed-off-by: Sage Weil --- src/osd/OSD.cc | 39 ++++++++++++----------- src/osd/ReplicatedPG.cc | 68 +++++++++++++++++------------------------ src/osd/Watch.cc | 41 ------------------------- src/osd/Watch.h | 30 ++++++++++-------- 4 files changed, 66 insertions(+), 112 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index ce31783afc852..4147de404b01a 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1624,16 +1624,17 @@ void OSD::put_object_context(void *_obc, pg_t pgid) void OSD::ack_notification(entity_name_t& name, void *_notif) { + assert(watch_lock.is_locked()); Watch::Notification *notif = (Watch::Notification *)_notif; if (watch->ack_notification(name, notif)) { - dout(0) << "got the last reply from pending watchers, can send response now" << dendl; - MWatchNotify *reply = notif->reply; // new MWatchNotify(notif->cookie, wi.ver, notif->id, WATCH_NOTIFY_COMPLETE); - client_messenger->send_message(reply, notif->session->con); - notif->session->put(); - watch->remove_notification(notif); - if (notif->timeout) - watch_timer.cancel_event(notif->timeout); - delete notif; + dout(0) << "got the last reply from pending watchers, can send response now" << dendl; + MWatchNotify *reply = notif->reply; + client_messenger->send_message(reply, notif->session->con); + notif->session->put(); + watch->remove_notification(notif); + if (notif->timeout) + watch_timer.cancel_event(notif->timeout); + delete notif; } } @@ -1688,6 +1689,9 @@ bool OSD::ms_handle_reset(Connection *con) put_object_context(obc, oiter->second); } + // FIXME: do we really want to _cancel_ notifications here? + // shouldn't they time out in the usual way? because this person + // might/should immediately reconnect... watch_lock.Lock(); for (map::iterator notif_iter = session->notifs.begin(); notif_iter != session->notifs.end(); @@ -1705,35 +1709,34 @@ bool OSD::ms_handle_reset(Connection *con) void OSD::handle_notify_timeout(void *_notif) { + assert(watch_lock.is_locked()); Watch::Notification *notif = (Watch::Notification *)_notif; - - dout(0) << "OSD::handle_notify_timeout" << dendl; + dout(0) << "OSD::handle_notify_timeout notif " << notif->id << dendl; ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)notif->obc; notif->timeout = NULL; /* need to do it under the watch_lock, so we're not getting cancelled somewhere else */ watch_lock.Unlock(); /* drop lock to change locking order */ - obc->lock.Lock(); watch_lock.Lock(); - std::map::iterator notif_iter; - for (notif_iter = notif->watchers.begin(); notif_iter != notif->watchers.end(); ++notif_iter) { + for (std::map::iterator notif_iter = notif->watchers.begin(); + notif_iter != notif->watchers.end(); + ++notif_iter) { map::iterator witer = obc->watchers.find(notif_iter->first); if (witer != obc->watchers.end()) - obc->watchers.erase(witer); + obc->watchers.erase(witer); // FIXME: hmm? notify timeout may be different than watch timeout? } obc->lock.Unlock(); watch_lock.Unlock(); /* put_object_context takes osd->lock */ - + ReplicatedPG *pg = (ReplicatedPG *)lookup_lock_raw_pg(notif->pgid); put_object_context(obc, notif->pgid); pg->do_complete_notify(notif); - + watch_lock.Lock(); - -/* exiting with watch_lock held */ + /* exiting with watch_lock held */ } void OSD::send_boot() diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 4888c6212b92b..3562d50ddae96 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1130,48 +1130,38 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, case CEPH_OSD_OP_NOTIFY: { dout(0) << "CEPH_OSD_OP_NOTIFY" << dendl; - ObjectContext *obc = ctx->obc; dout(0) << "ctx->obc=" << (void *)obc << dendl; - map::iterator iter; - map::iterator oi_iter; OSD::Session *session = (OSD::Session *)ctx->op->get_connection()->get_priv(); - session->get(); + // give the session reference to notif. Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, op.watch.cookie); - if (!notif) { - result = -ENOMEM; - break; - } notif->pgid = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); osd->watch_lock.Lock(); osd->watch->add_notification(notif); - entity_name_t myname = ctx->op->get_source(); - - for (oi_iter = oi.watchers.begin(); - oi_iter != oi.watchers.end(); oi_iter++) { - watch_info_t& w = oi_iter->second; - dout(0) << "oi->watcher: " << oi_iter->first << " ver=" << w.ver << " cookie=" << w.cookie << dendl; - iter = obc->watchers.find(oi_iter->first); - - if (/* w.ver < ver && */ iter != obc->watchers.end() && - iter->first != myname) { // don't notify the originator of this message - /* found a session for registered watcher */ - session = iter->second; - dout(0) << " found session, sending notification" << dendl; - notif->add_watcher(oi_iter->first, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race - entity_name_t name = oi_iter->first; - session->add_notif(notif, name); - - MWatchNotify *notify_msg = new MWatchNotify(w.cookie, w.ver, notif->id, WATCH_NOTIFY); - osd->client_messenger->send_message(notify_msg, session->con); - } else { - /* FIXME.. need to add watcher and time out accordingly */ - // notif->add_watcher(oi_iter->first, Watch::WATCHER_PENDING); - dout(0) << " session was not found" << dendl; - } + // connected + for (map::iterator p = obc->watchers.begin(); + p != obc->watchers.end(); + p++) { + entity_name_t name = p->first; + OSD::Session *s = p->second; + watch_info_t& w = obc->obs.oi.watchers[p->first]; + + notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race + s->add_notif(notif, name); + + MWatchNotify *notify_msg = new MWatchNotify(w.cookie, w.ver, notif->id, WATCH_NOTIFY); + osd->client_messenger->send_message(notify_msg, s->con); + } + + // unconnected + for (map::iterator p = obc->unconnected_watchers.begin(); + p != obc->unconnected_watchers.end(); + p++) { + entity_name_t name = p->first; + // notif->add_watcher(name, Watch::WATCHER_PENDING); } notif->reply = new MWatchNotify(op.watch.cookie, op.watch.ver, notif->id, WATCH_NOTIFY_COMPLETE); @@ -1189,33 +1179,31 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, case CEPH_OSD_OP_NOTIFY_ACK: { - dout(0) << "CEPH_OSD_OP_NOTIFY_ACK" << dendl; - ObjectContext *obc = ctx->obc; + entity_name_t source = ctx->op->get_source(); + dout(0) << "CEPH_OSD_OP_NOTIFY_ACK" << dendl; dout(0) << "ctx->obc=" << (void *)obc << dendl; - map::iterator iter; - map::iterator oi_iter; - osd->watch_lock.Lock(); - - oi_iter = oi.watchers.find(ctx->op->get_source()); + map::iterator oi_iter = oi.watchers.find(source); if (oi_iter == oi.watchers.end()) { dout(0) << "couldn't find watcher" << dendl; break; } watch_info_t& wi = oi_iter->second; wi.ver = op.watch.ver; + // FIXME: this gets lost without t.nop(). Watch::Notification *notif = osd->watch->get_notif(op.watch.cookie); if (!notif) { result = -EINVAL; break; } + OSD::Session *session = (OSD::Session *)ctx->op->get_connection()->get_priv(); session->del_notif(notif); + session->put(); - entity_name_t source = ctx->op->get_source(); osd->ack_notification(source, notif); osd->watch_lock.Unlock(); } diff --git a/src/osd/Watch.cc b/src/osd/Watch.cc index 71db309cc3474..126c46bfd0ba1 100644 --- a/src/osd/Watch.cc +++ b/src/osd/Watch.cc @@ -8,47 +8,6 @@ #include "config.h" -void Watch::add_notification(Notification *notif) -{ - notif->id = ++notif_id; - - notifs.insert(pair(notif->name, notif)); - itn[notif->id] = notif; - - map::iterator iter = notif->watchers.begin(); - for (; iter != notif->watchers.end(); ++iter) { - wtn.insert(pair(iter->first, notif)); - } -} - -void Watch::remove_notification(Notification *notif) -{ - map::iterator iter = itn.find(notif->id); - if (iter != itn.end()) - itn.erase(iter); - - map::iterator witer; - for (witer = notif->watchers.begin(); witer != notif->watchers.end(); ++iter) { - const entity_name_t& watcher = witer->first; - - multimap::iterator niter = wtn.find(watcher); - for (; niter != wtn.end(); ++niter) { - if (niter->second == notif) { - wtn.erase(niter); - break; - } - } - } - - multimap::iterator niter = notifs.find(notif->name); - for (; niter != wtn.end(); ++niter) { - if (niter->second == notif) { - notifs.erase(niter); - break; - } - } -} - bool Watch::ack_notification(entity_name_t& watcher, Notification *notif) { map::iterator iter = notif->watchers.find(watcher); diff --git a/src/osd/Watch.h b/src/osd/Watch.h index 15fd751f811ec..ab3c0272009f1 100644 --- a/src/osd/Watch.h +++ b/src/osd/Watch.h @@ -44,7 +44,9 @@ public: void *obc; pg_t pgid; - void add_watcher(const entity_name_t& name, WatcherState state) { watchers[name] = state; } + void add_watcher(const entity_name_t& name, WatcherState state) { + watchers[name] = state; + } Notification(entity_name_t& n, OSD::Session *s, uint64_t c) : name(n), session(s), cookie(c) { } }; @@ -58,26 +60,28 @@ public: }; private: - std::multimap notifs; - std::multimap wtn; /* watchers to notifications */ - std::map itn; /* notif_id to notifications */ + std::map notifs; /* notif_id to notifications */ public: - Watch() : notif_id(0) {} - void register_session(OSD::Session *session, entity_name_t& name); - void remove_session(OSD::Session *session); - void add_notification(Notification *notif); - void remove_notification(Notification *notif); - bool ack_notification(entity_name_t& watcher, Notification *notif); - + void add_notification(Notification *notif) { + notif->id = ++notif_id; + notifs[notif->id] = notif; + } Notification *get_notif(uint64_t id) { - map::iterator iter = itn.find(id); - if (iter != itn.end()) + map::iterator iter = notifs.find(id); + if (iter != notifs.end()) return iter->second; return NULL; } + void remove_notification(Notification *notif) { + map::iterator iter = notifs.find(notif->id); + if (iter != notifs.end()) + notifs.erase(iter); + } + + bool ack_notification(entity_name_t& watcher, Notification *notif); }; -- 2.39.5