From b305153166f49eebdedf6c20617105be0815652c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 12 Nov 2010 08:29:47 -0800 Subject: [PATCH] osd: fix up WATCH Separate various paths: registering new watch, reconnecting to existing watch, removing watch, etc. Signed-off-by: Sage Weil --- src/osd/ReplicatedPG.cc | 107 +++++++++++++++++++++++++--------------- src/osd/ReplicatedPG.h | 1 + src/osd/osd_types.h | 8 +++ 3 files changed, 75 insertions(+), 41 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index ff97bcd906f6a..546c6ffc21429 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -780,6 +780,24 @@ int ReplicatedPG::do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr) } } +void ReplicatedPG::dump_watchers(ObjectContext *obc) +{ + assert(osd->watch_lock.is_locked()); + + dout(0) << "dump_watchers " << obc->obs.oi.soid << " " << obc->obs.oi << dendl; + for (map::iterator iter = obc->watchers.begin(); + iter != obc->watchers.end(); + ++iter) + dout(0) << " * obc->watcher: " << iter->first << " session=" << iter->second << dendl; + + for (map::iterator oi_iter = obc->obs.oi.watchers.begin(); + oi_iter != obc->obs.oi.watchers.end(); + oi_iter++) { + watch_info_t& w = oi_iter->second; + dout(0) << " * oi->watcher: " << oi_iter->first << " ver=" << w.ver << " cookie=" << w.cookie << dendl; + } +} + void ReplicatedPG::do_complete_notify(Watch::Notification *notif) { osd->client_messenger->send_message(notif->reply, notif->session->con); @@ -1371,13 +1389,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, uint64_t cookie = op.watch.cookie; uint64_t ver = op.watch.ver; bool do_watch = op.watch.flag & 1; - entity_name_t entity = ctx->reqid.name; + ObjectContext *obc = ctx->obc; - OSD::Session *session; - ObjectContext *obc = ctx->obc; - dout(0) << "ctx->obc=" << (void *)obc << " cookie=" << cookie << " ver=" << ver << " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl; - dout(0) << "oi.user_version=" << oi.user_version.version << dendl; + dout(0) << "watch: ctx->obc=" << (void *)obc << " cookie=" << cookie << " ver=" << ver + << " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl; + dout(0) << "watch: oi.user_version=" << oi.user_version.version << dendl; if (do_watch) { if (ver < oi.user_version.version) { @@ -1390,54 +1407,62 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, } } - session = (OSD::Session *)ctx->op->get_connection()->get_priv(); + OSD::Session *session = (OSD::Session *)ctx->op->get_connection()->get_priv(); osd->watch_lock.Lock(); - map::iterator iter = obc->watchers.find(entity); - - /* is there an old watch from the same client? if so, discard it*/ - if (iter != obc->watchers.end()) { - session = iter->second; - session->put(); - put_object_context(obc); - } - + watch_info_t w = {cookie, ver}; if (do_watch) { - obc->ref++; - register_object_context(obc); - watch_info_t w = {cookie, ver}; - oi.watchers[entity] = w; - - obc->watchers[entity] = session; + if (oi.watchers.count(entity) && oi.watchers[entity] == w) { + dout(10) << " found existing watch " << w << " by " << entity << " session " << session << dendl; + } else { + dout(10) << " registered new watch " << w << " by " << entity << " session " << session << dendl; + oi.watchers[entity] = w; + t.nop(); // make sure update the object_info on disk! + } - session->get(); - session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); + if (iter == obc->watchers.end()) { + dout(10) << " connected to watch " << w << " by " << entity << " session " << session << dendl; + obc->watchers[entity] = session; + session->get(); + session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); + obc->ref++; + } else if (iter->second == session) { + // already there + dout(10) << " already connected to watch " << w << " by " << entity << " session " << session << dendl; + } else { + // weird: same entity, different session. + dout(10) << " reconnected (with different session!) watch " << w << " by " << entity << " session " << session << dendl; + iter->second->watches.erase(obc); + iter->second->put(); + iter->second = session; + session->get(); + session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); + } + register_object_context(obc); } else { - obc->watchers.erase(entity); - map::iterator oi_iter = oi.watchers.find(entity); - if (oi_iter != oi.watchers.end()) + if (oi_iter != oi.watchers.end()) { + dout(10) << " removed watch " << oi_iter->second << " by " << entity << " session " << session << dendl; oi.watchers.erase(entity); + t.nop(); - /* we don't session->put() here because we already did it above */ + if (iter != obc->watchers.end()) { + obc->watchers.erase(iter); + session->watches.erase(obc); + put_object_context(obc); + iter->second->put(); + } + } else { + dout(10) << " can't remove: no watch by " << entity << " session " << session << dendl; + assert(iter == obc->watchers.end()); + } } - - - for (iter = obc->watchers.begin(); iter != obc->watchers.end(); ++iter) { - dout(0) << "* obc->watcher: " << iter->first << " session=" << iter->second << " obc->obs.oi=" << obc->obs.oi << dendl; - } - - map::iterator oi_iter; - for (oi_iter = oi.watchers.begin(); - oi_iter != oi.watchers.end(); oi_iter++) { - watch_info_t& w = oi_iter->second; - dout(0) << "* oi->watcher: " << oi_iter->first << " ver=" << w.ver << " cookie=" << w.cookie << dendl; - } + dump_watchers(obc); osd->watch_lock.Unlock(); - t.nop(); - } + session->put(); + } break; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 4cf34eb07abe5..e95670baf1fd2 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -522,6 +522,7 @@ protected: int recover_primary(int max); int recover_replicas(int max); + void dump_watchers(ObjectContext *obc); void do_complete_notify(Watch::Notification *notif); struct RepModify { diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index eac8619a5a1bc..6db21315add0a 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1304,6 +1304,14 @@ struct watch_info_t { }; WRITE_CLASS_ENCODER(watch_info_t) +static inline bool operator==(const watch_info_t& l, const watch_info_t& r) { + return l.cookie == r.cookie && l.ver == r.ver; +} + +static inline ostream& operator<<(ostream& out, const watch_info_t& w) { + return out << w.cookie << '@' << w.ver; +} + struct object_info_t { sobject_t soid; object_locator_t oloc; -- 2.39.5