From d66b52e10383dbd99853ab7a268ffeb696a2604f Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 24 Sep 2010 10:41:55 -0700 Subject: [PATCH] osd: watch infrastructure third attempt --- src/Makefile.am | 1 + src/osd/OSD.cc | 4 +++ src/osd/OSD.h | 5 ++++ src/osd/ReplicatedPG.cc | 23 +++++++++++++++- src/osd/ReplicatedPG.h | 1 + src/osd/Watch.h | 61 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 src/osd/Watch.h diff --git a/src/Makefile.am b/src/Makefile.am index f448e65cd7916..87b3652c44075 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -509,6 +509,7 @@ libosd_a_SOURCES = \ osd/Ager.cc \ osd/OSD.cc \ osd/OSDCaps.cc \ + osd/Watch.cc \ common/ClassHandler.cc # osd/RAID4PG.cc diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index cc476d7db68e9..dbd626af356c8 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -18,6 +18,7 @@ #include "OSD.h" #include "OSDMap.h" +#include "Watch.h" #include "os/FileStore.h" @@ -454,6 +455,7 @@ int OSD::init() Mutex::Locker lock(osd_lock); timer.init(); + watch = new Watch(); // mount. dout(2) << "mounting " << dev_path << " " << (journal_path ? journal_path : "(no journal)") << dendl; @@ -741,6 +743,8 @@ int OSD::shutdown() monc->shutdown(); + delete watch; + return r; } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index f229e52f494a6..53f28ee15731e 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -99,6 +99,8 @@ class MLog; class MClass; class MOSDPGMissing; +class Watch; + extern const coll_t meta_coll; class OSD : public Dispatcher { @@ -464,6 +466,9 @@ private: void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy=false); +protected: + Watch *watch; /* notify-watch handler */ + protected: // -- classes -- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 68de93bbd4ed6..410116a703ef8 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -31,6 +31,8 @@ #include "messages/MOSDPing.h" #include "messages/MWatchNotify.h" +#include "Watch.h" + #include "config.h" #define DOUT_SUBSYS osd @@ -1086,21 +1088,32 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, map::iterator iter; map::iterator oi_iter; + Watch::Notification *notif = new Watch::Notification(ctx->reqid.name); + if (!notif) { + result = -ENOMEM; + break; + } + osd->watch->add_notification(notif); for (oi_iter = oi.watchers.begin(); oi_iter != oi.watchers.end(); oi_iter++) { watch_info_t& w = oi_iter->second; dout(0) << "oi->watcher: " << oi_iter->first << " ver=" << w.ver << " cookie=" << w.cookie << dendl; iter = obc->watchers.find(oi_iter->first); + if (/* w.ver < ver && */ iter != obc->watchers.end()) { /* found a session for registered watcher */ OSD::Session *session = iter->second; dout(0) << " found session, sending notification" << dendl; + notif->add_watcher(oi_iter->first, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race + MWatchNotify *notify_msg = new MWatchNotify(w.cookie, w.ver); osd->client_messenger->send_message(notify_msg, session->con); } else { + notif->add_watcher(oi_iter->first, Watch::WATCHER_PENDING); dout(0) << " session was not found" << dendl; } + } } @@ -1119,11 +1132,19 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, oi_iter = oi.watchers.find(ctx->op->get_source()); if (oi_iter == oi.watchers.end()) { dout(0) << "couldn't find watcher" << dendl; - // result = -EINVAL; break; } watch_info_t& wi = oi_iter->second; wi.ver = op.watch.ver; + + map::iterator pending_iter = + obc->pending_watchers.find(ctx->op->get_source()); + if (pending_iter != obc->pending_watchers.end()) { + obc->pending_watchers.erase(pending_iter); + if (obc->pending_watchers.empty()) { + dout(0) << "got the last reply from pending watchers, can send response now" << dendl; + } + } } break; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 3019246c876a7..47c55b0200f56 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -235,6 +235,7 @@ public: int unstable_writes, readers, writers_waiting, readers_waiting; map watchers; + map pending_watchers; ObjectContext(const sobject_t& s, const object_locator_t& ol) : ref(0), registered(false), obs(s, ol), diff --git a/src/osd/Watch.h b/src/osd/Watch.h new file mode 100644 index 0000000000000..3d5e2a93fd549 --- /dev/null +++ b/src/osd/Watch.h @@ -0,0 +1,61 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef CEPH_WATCH_H +#define CEPH_WATCH_H + +#include + +#include "OSD.h" +#include "config.h" + +/* keeps track and accounts sessions, watchers and notifiers */ +class Watch { + +public: + enum WatcherState { + WATCHER_PENDING, + WATCHER_NOTIFIED, + }; + + struct Notification { + std::map watchers; + entity_name_t name; + + void add_watcher(const entity_name_t& name, WatcherState state) { watchers[name] = state; } + + Notification(entity_name_t& n) { name = n; } + }; + +private: + std::multimap notifs; + std::multimap wtn; /* watchers to notifications */ + + std::map ste; /* sessions to entities */ + std::map ets; /* entities to sessions */ + +public: + + Watch() {} + + void register_session(OSD::Session *session, entity_name_t& name); + void remove_session(OSD::Session *session); + void add_notification(Notification *notif); + bool ack_notification(entity_name_t& watcher, Notification *notif); +}; + + + +#endif -- 2.39.5