git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Watch/Notify: rework watch/notify
author Samuel Just <sam.just@inktank.com>
Fri, 15 Feb 2013 18:45:47 +0000 (10:45 -0800)
committer Samuel Just <sam.just@inktank.com>
Wed, 20 Feb 2013 21:29:20 +0000 (13:29 -0800)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/Watch.cc
src/osd/Watch.h
src/osd/osd_types.cc
src/osd/osd_types.h

index b20e6d690f244bb9f738d3c461289f658fefb498..66f976b1c154825917ec7aa5767cbc68fc74398a 100644 (file)
@@ -168,7 +168,6 @@ OSDService::OSDService(OSD *osd) :
   scrubs_active(0),
   watch_lock("OSD::watch_lock"),
   watch_timer(osd->client_messenger->cct, watch_lock),
-  watch(NULL),
   backfill_request_lock("OSD::backfill_request_lock"),
   backfill_request_timer(g_ceph_context, backfill_request_lock, false),
   last_tid(0),
@@ -252,15 +251,12 @@ void OSDService::shutdown()
   watch_lock.Lock();
   watch_timer.shutdown();
   watch_lock.Unlock();
-
-  delete watch;
 }
 
 void OSDService::init()
 {
   reserver_finisher.start();
   watch_timer.init();
-  watch = new Watch();
 }
 
 ObjectStore *OSD::create_object_store(const std::string &dev, const std::string &jdev)
@@ -2571,152 +2567,17 @@ void OSD::ms_handle_connect(Connection *con)
   }
 }
 
-void OSD::put_object_context(void *_obc, pg_t pgid)
-{
-  ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)_obc;
-  ReplicatedPG *pg = (ReplicatedPG *)lookup_lock_raw_pg(pgid);
-  // If pg is being deleted, (which is the only case in which
-  // it will be NULL) it will clean up its object contexts itself
-  if (pg) {
-    pg->put_object_context(obc);
-    pg->unlock();
-  }
-}
-
-void OSD::complete_notify(void *_notif, void *_obc)
-{
-  ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)_obc;
-  Watch::Notification *notif = (Watch::Notification *)_notif;
-  dout(10) << "complete_notify " << notif << " got the last reply from pending watchers, can send response now" << dendl;
-  MWatchNotify *reply = notif->reply;
-  client_messenger->send_message(reply, notif->session->con);
-  notif->session->put();
-  notif->session->con->put();
-  service.watch->remove_notification(notif);
-  if (notif->timeout)
-    service.watch_timer.cancel_event(notif->timeout);
-  map<Watch::Notification *, bool>::iterator iter = obc->notifs.find(notif);
-  if (iter != obc->notifs.end())
-    obc->notifs.erase(iter);
-  delete notif;
-}
-
-void OSD::ack_notification(entity_name_t& name, void *_notif, void *_obc, ReplicatedPG *pg)
-{
-  assert(service.watch_lock.is_locked());
-  pg->assert_locked();
-  Watch::Notification *notif = (Watch::Notification *)_notif;
-  dout(10) << "ack_notification " << name << " notif " << notif << " id " << notif->id << dendl;
-  if (service.watch->ack_notification(name, notif)) {
-    complete_notify(notif, _obc);
-    pg->put_object_context(static_cast<ReplicatedPG::ObjectContext *>(_obc));
-  }
-}
-
-void OSD::handle_watch_timeout(void *obc,
-                              ReplicatedPG *pg,
-                              entity_name_t entity,
-                              utime_t expire)
-{
-  // watch_lock is inside pg->lock; handle_watch_timeout checks for the race.
-  service.watch_lock.Unlock();
-  pg->lock();
-  service.watch_lock.Lock();
-
-  pg->handle_watch_timeout(obc, entity, expire);
-  pg->unlock();
-  pg->put();
-}
-
-void OSD::disconnect_session_watches(Session *session)
-{
-  // get any watched obc's
-  map<ReplicatedPG::ObjectContext *, pg_t> obcs;
-  service.watch_lock.Lock();
-  for (map<void *, pg_t>::iterator iter = session->watches.begin(); iter != session->watches.end(); ++iter) {
-    ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)iter->first;
-    obcs[obc] = iter->second;
-  }
-  service.watch_lock.Unlock();
-
-  for (map<ReplicatedPG::ObjectContext *, pg_t>::iterator oiter = obcs.begin(); oiter != obcs.end(); ++oiter) {
-    ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)oiter->first;
-    dout(10) << "obc=" << (void *)obc << dendl;
-
-    ReplicatedPG *pg = static_cast<ReplicatedPG *>(lookup_lock_raw_pg(oiter->second));
-    if (!pg) {
-      /* pg removed between watch_unlock.Unlock() and now, all related
-       * watch structures would have been cleaned up in remove_watchers_and_notifies
-       */
-      continue; 
-    }
-    service.watch_lock.Lock();
-
-    if (!session->watches.count((void*)obc)) {
-      // Raced with watch removal, obc is invalid
-      service.watch_lock.Unlock();
-      pg->unlock();
-      continue;
-    }
-
-    /* NOTE! fix this one, should be able to just lookup entity name,
-       however, we currently only keep EntityName on the session and not
-       entity_name_t. */
-    map<entity_name_t, Session *>::iterator witer = obc->watchers.begin();
-    while (1) {
-      while (witer != obc->watchers.end() && witer->second == session) {
-        dout(10) << "removing watching session entity_name=" << session->entity_name
-               << " from " << obc->obs.oi << dendl;
-       entity_name_t entity = witer->first;
-       watch_info_t& w = obc->obs.oi.watchers[entity];
-       utime_t expire = ceph_clock_now(g_ceph_context);
-       expire += w.timeout_seconds;
-       pg->register_unconnected_watcher(obc, entity, expire);
-       dout(10) << " disconnected watch " << w << " by " << entity << " session " << session
-                << ", expires " << expire << dendl;
-        obc->watchers.erase(witer++);
-       pg->put_object_context(obc);
-       session->con->put();
-       session->put();
-      }
-      if (witer == obc->watchers.end())
-        break;
-      ++witer;
-    }
-    service.watch_lock.Unlock();
-    pg->unlock();
-  }
-}
-
 bool OSD::ms_handle_reset(Connection *con)
 {
   dout(1) << "OSD::ms_handle_reset()" << dendl;
   OSD::Session *session = (OSD::Session *)con->get_priv();
   if (!session)
     return false;
-  disconnect_session_watches(session);
+  session->wstate.reset();
   session->put();
   return true;
 }
 
-void OSD::handle_notify_timeout(void *_notif)
-{
-  assert(service.watch_lock.is_locked());
-  Watch::Notification *notif = (Watch::Notification *)_notif;
-  dout(10) << "OSD::handle_notify_timeout notif " << notif << " id " << notif->id << dendl;
-
-  ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)notif->obc;
-
-  pg_t pgid = notif->pgid;
-
-  complete_notify(_notif, obc);
-  service.watch_lock.Unlock(); /* drop lock to change locking order */
-
-  put_object_context(obc, pgid);
-  service.watch_lock.Lock();
-  /* exiting with watch_lock held */
-}
-
 struct C_OSD_GetVersion : public Context {
   OSD *osd;
   uint64_t oldest, newest;
index 1837195d33953bc4fe45995fa3346c21813955cf..25e1eee13ad0d7801b4a2d2bc28ac310234567dd 100644 (file)
@@ -48,6 +48,7 @@ using namespace std;
 #include <ext/hash_set>
 using namespace __gnu_cxx;
 
+#include "Watch.h"
 #include "common/shared_cache.hpp"
 #include "common/simple_cache.hpp"
 #include "common/sharedptr_registry.hpp"
@@ -295,7 +296,11 @@ public:
   // -- Watch --
   Mutex watch_lock;
   SafeTimer watch_timer;
-  Watch *watch;
+  uint64_t next_notif_id;
+  uint64_t get_next_id(epoch_t cur_epoch) {
+    Mutex::Locker l(watch_lock);
+    return (((uint64_t)cur_epoch) << 32) | ((uint64_t)(next_notif_id++));
+  }
 
   // -- Backfill Request Scheduling --
   Mutex backfill_request_lock;
@@ -526,7 +531,7 @@ public:
     int64_t auid;
     epoch_t last_sent_epoch;
     Connection *con;
-    std::map<void *, pg_t> watches;
+    WatchConState wstate;
 
     Session() : auid(-1), last_sent_epoch(0), con(0) {}
   };
@@ -1468,17 +1473,6 @@ public:
 
   int init_op_flags(OpRequestRef op);
 
-
-  void put_object_context(void *_obc, pg_t pgid);
-  void complete_notify(void *notif, void *obc);
-  void ack_notification(entity_name_t& peer_addr, void *notif, void *obc,
-                       ReplicatedPG *pg);
-  void handle_notify_timeout(void *notif);
-  void disconnect_session_watches(Session *session);
-  void handle_watch_timeout(void *obc,
-                           ReplicatedPG *pg,
-                           entity_name_t entity,
-                           utime_t expire);
   OSDService service;
   friend class OSDService;
 };
index 39bc6b5ab984ffe90d27687f2f8a5b5cc6d7582e..ed85c4e2946e40d41b9e8ac8d33581819f6afdef 100644 (file)
@@ -1918,16 +1918,6 @@ public:
   virtual void on_activate() = 0;
   virtual void on_flushed() = 0;
   virtual void on_shutdown() = 0;
-  virtual void remove_watchers_and_notifies() = 0;
-
-  virtual void register_unconnected_watcher(void *obc,
-                                           entity_name_t entity,
-                                           utime_t expire) = 0;
-  virtual void unregister_unconnected_watcher(void *obc,
-                                             entity_name_t entity) = 0;
-  virtual void handle_watch_timeout(void *obc,
-                                   entity_name_t entity,
-                                   utime_t expire) = 0;
 };
 
 WRITE_CLASS_ENCODER(PG::OndiskLog)
index ffa1a93845ba0f5011117138256d7c4fe4436e4d..2e4ea0286d087927a1cb106a8153b021ed1e6efe 100644 (file)
@@ -1567,94 +1567,6 @@ int ReplicatedPG::do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr)
   }
 }
 
-void ReplicatedPG::dump_watchers(ObjectContext *obc)
-{
-  assert(osd->watch_lock.is_locked());
-  
-  dout(10) << "dump_watchers " << obc->obs.oi.soid << " " << obc->obs.oi << dendl;
-  for (map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.begin(); 
-       iter != obc->watchers.end();
-       ++iter)
-    dout(10) << " * obc->watcher: " << iter->first << " session=" << iter->second << dendl;
-  
-  for (map<entity_name_t, watch_info_t>::iterator oi_iter = obc->obs.oi.watchers.begin();
-       oi_iter != obc->obs.oi.watchers.end();
-       oi_iter++) {
-    watch_info_t& w = oi_iter->second;
-    dout(10) << " * oi->watcher: " << oi_iter->first << " cookie=" << w.cookie << dendl;
-  }
-}
-
-void ReplicatedPG::remove_watcher(ObjectContext *obc, entity_name_t entity)
-{
-  assert_locked();
-  assert(osd->watch_lock.is_locked());
-  dout(10) << "remove_watcher " << *obc << " " << entity << dendl;
-  map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.find(entity);
-  assert(iter != obc->watchers.end());
-  OSD::Session *session = iter->second;
-  dout(10) << "remove_watcher removing session " << session << dendl;
-
-  obc->watchers.erase(iter);
-  assert(session->watches.count(obc));
-  session->watches.erase(obc);
-
-  put_object_context(obc);
-  session->con->put();
-  session->put();
-}
-
-void ReplicatedPG::remove_notify(ObjectContext *obc, Watch::Notification *notif)
-{
-  assert_locked();
-  assert(osd->watch_lock.is_locked());
-  map<Watch::Notification *, bool>::iterator niter = obc->notifs.find(notif);
-
-  // Cancel notification
-  if (notif->timeout)
-    osd->watch_timer.cancel_event(notif->timeout);
-  osd->watch->remove_notification(notif);
-
-  assert(niter != obc->notifs.end());
-
-  niter->first->session->con->put();
-  niter->first->session->put();
-  obc->notifs.erase(niter);
-
-  put_object_context(obc);
-  delete notif;
-}
-
-void ReplicatedPG::remove_watchers_and_notifies()
-{
-  assert_locked();
-
-  dout(10) << "remove_watchers" << dendl;
-
-  osd->watch_lock.Lock();
-  for (map<hobject_t, ObjectContext*>::iterator oiter = object_contexts.begin();
-       oiter != object_contexts.end();
-       ) {
-    map<hobject_t, ObjectContext *>::iterator iter = oiter++;
-    ObjectContext *obc = iter->second;
-    obc->ref++;
-    for (map<entity_name_t, OSD::Session *>::iterator witer = obc->watchers.begin();
-        witer != obc->watchers.end();
-        remove_watcher(obc, (witer++)->first)) ;
-    for (map<entity_name_t,Watch::C_WatchTimeout*>::iterator iter = obc->unconnected_watchers.begin();
-        iter != obc->unconnected_watchers.end();
-        ) {
-      map<entity_name_t,Watch::C_WatchTimeout*>::iterator i = iter++;
-      unregister_unconnected_watcher(obc, i->first);
-    }
-    for (map<Watch::Notification *, bool>::iterator niter = obc->notifs.begin();
-        niter != obc->notifs.end();
-        remove_notify(obc, (niter++)->first)) ;
-    put_object_context(obc);
-  }
-  osd->watch_lock.Unlock();
-}
-
 // ========================================================================
 // low level osd ops
 
@@ -2329,20 +2241,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_NOTIFY_ACK:
       {
-       osd->watch_lock.Lock();
-       entity_name_t source = ctx->op->request->get_source();
-       map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(source);
-       Watch::Notification *notif = osd->watch->get_notif(op.watch.cookie);
-       if (oi_iter != oi.watchers.end() && notif) {
-         ctx->notify_acks.push_back(op.watch.cookie);
-       } else {
-         if (!notif)
-           dout(10) << " no pending notify for cookie " << op.watch.cookie << dendl;
-         else
-           dout(10) << " not registered as a watcher" << dendl;
-         result = -EINVAL;
-       }
-       osd->watch_lock.Unlock();
+       ctx->notify_acks.push_back(op.watch.cookie);
       }
       break;
 
@@ -2557,27 +2456,27 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
        watch_info_t w(cookie, 30);  // FIXME: where does the timeout come from?
        if (do_watch) {
-         if (oi.watchers.count(entity) && oi.watchers[entity] == w) {
+         if (oi.watchers.count(make_pair(cookie, entity))) {
            dout(10) << " found existing watch " << w << " by " << entity << dendl;
          } else {
            dout(10) << " registered new watch " << w << " by " << entity << dendl;
-           oi.watchers[entity] = w;
+           oi.watchers[make_pair(cookie, entity)] = w;
            t.nop();  // make sure update the object_info on disk!
          }
          ctx->watch_connect = true;
          ctx->watch_info = w;
          assert(obc->registered);
         } else {
-         map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(entity);
+         map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter =
+           oi.watchers.find(make_pair(cookie, entity));
          if (oi_iter != oi.watchers.end()) {
-           dout(10) << " removed watch " << oi_iter->second << " by " << entity << dendl;
-            oi.watchers.erase(entity);
+           dout(10) << " removed watch " << oi_iter->second << " by "
+                    << entity << dendl;
+            oi.watchers.erase(oi_iter);
            t.nop();  // update oi on disk
 
            ctx->watch_disconnect = true;
-
-           // FIXME: trigger notifies?
-
+           ctx->watch_info = w;
          } else {
            dout(10) << " can't remove: no watch by " << entity << dendl;
          }
@@ -3340,154 +3239,88 @@ void ReplicatedPG::add_interval_usage(interval_set<uint64_t>& s, object_stat_sum
 
 void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
 {
-  if (ctx->watch_connect || ctx->watch_disconnect ||
-      !ctx->notifies.empty() || !ctx->notify_acks.empty()) {
-    OSD::Session *session = (OSD::Session *)ctx->op->request->get_connection()->get_priv();
-    assert(session);
-    ObjectContext *obc = ctx->obc;
-    object_info_t& oi = ctx->new_obs.oi;
-    hobject_t& soid = oi.soid;
-    entity_name_t entity = ctx->reqid.name;
+  ConnectionRef conn(ctx->op->request->get_connection());
+  boost::intrusive_ptr<OSD::Session> session(
+    (OSD::Session *)conn->get_priv());
+  entity_name_t entity = ctx->reqid.name;
 
-    dout(10) << "do_osd_op_effects applying watch/notify effects on session " << session << dendl;
-
-    osd->watch_lock.Lock();
-    dump_watchers(obc);
-    
-    map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.find(entity);
-    if (ctx->watch_connect) {
-      watch_info_t w = ctx->watch_info;
-
-      if (iter == obc->watchers.end()) {
-       dout(10) << " connected to " << w << " by " << entity << " session " << session << dendl;
-       obc->watchers[entity] = session;
-       session->con->get();
-       session->get();
-       session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
-       obc->ref++;
-      } else if (iter->second == session) {
-       // already there
-       dout(10) << " already connected to " << w << " by " << entity
-                << " session " << session << dendl;
-      } else {
-       // weird: same entity, different session.
-       dout(10) << " reconnected (with different session!) watch " << w << " by " << entity
-                << " session " << session << " (was " << iter->second << ")" << dendl;
-       session->con->get();
-       session->get();
-
-       iter->second->watches.erase(obc);
-       iter->second->con->put();
-       iter->second->put();
-
-       iter->second = session;
-       session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
-      }
-      map<entity_name_t,Watch::C_WatchTimeout*>::iterator un_iter =
-       obc->unconnected_watchers.find(entity);
-      if (un_iter != obc->unconnected_watchers.end()) {
-       unregister_unconnected_watcher(obc, un_iter->first);
-      }
+  dout(15) << "do_osd_op_effects on session " << session.get() << dendl;
 
-      map<Watch::Notification *, bool>::iterator niter;
-      for (niter = obc->notifs.begin(); niter != obc->notifs.end(); ++niter) {
-       Watch::Notification *notif = niter->first;
-       map<entity_name_t, Watch::WatcherState>::iterator iter = notif->watchers.find(entity);
-       if (iter != notif->watchers.end()) {
-         /* there is a pending notification for this watcher, we should resend it anyway
-            even if we already sent it as it might not have received it */
-         MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
-         osd->send_message_osd_client(notify_msg, session->con);
-       }
-      }
-    }
-    
-    if (ctx->watch_disconnect) {
-      if (iter != obc->watchers.end()) {
-       remove_watcher(obc, entity);
-      } else {
-       assert(obc->unconnected_watchers.count(entity));
-       unregister_unconnected_watcher(obc, entity);
-      }
-
-      // ack any pending notifies
-      map<Watch::Notification *, bool>::iterator p = obc->notifs.begin();
-      while (p != obc->notifs.end()) {
-       Watch::Notification *notif = p->first;
-       entity_name_t by = entity;
-       p++;
-       assert(notif->obc == obc);
-       dout(10) << " acking pending notif " << notif->id << " by " << by << dendl;
-       // TODOSAM: osd->osd-> not good
-       osd->osd->ack_notification(entity, notif, obc, this);
-      }
+  if (ctx->watch_connect) {
+    pair<uint64_t, entity_name_t> watcher(ctx->watch_info.cookie, entity);
+    dout(15) << "do_osd_op_effects applying watch connect on session "
+            << session.get() << " watcher " << watcher << dendl;
+    WatchRef watch;
+    if (ctx->obc->watchers.count(watcher)) {
+      dout(15) << "do_osd_op_effects found existing watch watcher " << watcher
+              << dendl;
+      watch = ctx->obc->watchers[watcher];
+    } else {
+      dout(15) << "do_osd_op_effects new watcher " << watcher
+              << dendl;
+      watch = Watch::makeWatchRef(
+       this, osd, ctx->obc, ctx->watch_info.timeout_seconds,
+       ctx->watch_info.cookie, entity);
+      ctx->obc->watchers.insert(
+       make_pair(
+         watcher,
+         watch));
+    }
+    watch->connect(conn);
+  }
+
+  if (ctx->watch_disconnect) {
+    pair<uint64_t, entity_name_t> watcher(ctx->watch_info.cookie, entity);
+    dout(15) << "do_osd_op_effects applying watch disconnect on session "
+            << session.get() << " and watcher " << watcher << dendl;
+    if (ctx->obc->watchers.count(watcher)) {
+      WatchRef watch = ctx->obc->watchers[watcher];
+      dout(10) << "do_osd_op_effects applying disconnect found watcher "
+              << watcher << dendl;
+      ctx->obc->watchers.erase(watcher);
+      watch->remove();
+    } else {
+      dout(10) << "do_osd_op_effects failed to find watcher "
+              << watcher << dendl;
     }
+  }
 
-    for (list<notify_info_t>::iterator p = ctx->notifies.begin();
-        p != ctx->notifies.end();
-        ++p) {
-
-      dout(10) << " " << *p << dendl;
-
-      Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl);
-      session->get();  // notif got a reference
-      session->con->get();
-      notif->pgid = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
-
-      osd->watch->add_notification(notif);
-      dout(20) << " notify id " << notif->id << dendl;
-
-      // connected
-      for (map<entity_name_t, watch_info_t>::iterator i = obc->obs.oi.watchers.begin();
-          i != obc->obs.oi.watchers.end();
-          ++i) {
-       map<entity_name_t, OSD::Session*>::iterator q = obc->watchers.find(i->first);
-       if (q != obc->watchers.end()) {
-         entity_name_t name = q->first;
-         OSD::Session *s = q->second;
-         watch_info_t& w = obc->obs.oi.watchers[q->first];
-
-         notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
-
-         MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
-         osd->send_message_osd_client(notify_msg, s->con);
-       } else {
-         // unconnected
-         entity_name_t name = i->first;
-         notif->add_watcher(name, Watch::WATCHER_PENDING);
-       }
-      }
-
-      notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl);
-      if (notif->watchers.empty()) {
-       // TODOSAM: osd->osd-> not good
-       osd->osd->complete_notify(notif, obc);
-      } else {
-       obc->notifs[notif] = true;
-       obc->ref++;
-       notif->obc = obc;
-       // TODOSAM: osd->osd not good
-       notif->timeout = new Watch::C_NotifyTimeout(osd->osd, notif);
-       osd->watch_timer.add_event_after(p->timeout, notif->timeout);
-      }
+  for (list<notify_info_t>::iterator p = ctx->notifies.begin();
+       p != ctx->notifies.end();
+       ++p) {
+    dout(10) << "do_osd_op_effects, notify " << *p << dendl;
+    NotifyRef notif(
+      Notify::makeNotifyRef(
+       conn,
+       ctx->obc->watchers.size(),
+       p->bl,
+       p->timeout,
+       p->cookie,
+       osd->get_next_id(get_osdmap()->get_epoch()),
+       ctx->obc->obs.oi.user_version.version,
+       osd));
+    for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i =
+          ctx->obc->watchers.begin();
+        i != ctx->obc->watchers.end();
+        ++i) {
+      dout(10) << "starting notify on watch " << i->first << dendl;
+      i->second->start_notify(notif);
     }
+    notif->init();
+  }
 
-    for (list<uint64_t>::iterator p = ctx->notify_acks.begin(); p != ctx->notify_acks.end(); ++p) {
-      uint64_t cookie = *p;
-      
-      dout(10) << " notify_ack " << cookie << dendl;
-      map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(entity);
-      assert(oi_iter != oi.watchers.end());
-
-      Watch::Notification *notif = osd->watch->get_notif(cookie);
-      assert(notif);
-
-      // TODOSAM: osd->osd-> not good
-      osd->osd->ack_notification(entity, notif, obc, this);
+  for (list<uint64_t>::iterator p = ctx->notify_acks.begin();
+       p != ctx->notify_acks.end();
+       ++p) {
+    dout(10) << "notify_ack " << *p << dendl;
+    for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i =
+          ctx->obc->watchers.begin();
+        i != ctx->obc->watchers.end();
+        ++i) {
+      if (i->first.second != entity) continue;
+      dout(10) << "acking notify on watch " << i->first << dendl;
+      i->second->notify_ack(*p);
     }
-
-    osd->watch_lock.Unlock();
-    session->put();
   }
 }
 
@@ -4115,113 +3948,39 @@ void ReplicatedPG::populate_obc_watchers(ObjectContext *obc)
          log.objects[obc->obs.oi.soid]->reverting_to == obc->obs.oi.version));
 
   dout(10) << "populate_obc_watchers " << obc->obs.oi.soid << dendl;
-  if (!obc->obs.oi.watchers.empty()) {
-    Mutex::Locker l(osd->watch_lock);
-    assert(obc->unconnected_watchers.size() == 0);
-    assert(obc->watchers.size() == 0);
-    // populate unconnected_watchers
-    utime_t now = ceph_clock_now(g_ceph_context);
-    for (map<entity_name_t, watch_info_t>::iterator p = obc->obs.oi.watchers.begin();
-        p != obc->obs.oi.watchers.end();
-        p++) {
-      utime_t expire = now;
-      expire += p->second.timeout_seconds;
-      dout(10) << "  unconnected watcher " << p->first << " will expire " << expire << dendl;
-      register_unconnected_watcher(obc, p->first, expire);
-    }
-  }
-}
-
-void ReplicatedPG::unregister_unconnected_watcher(void *_obc,
-                                                 entity_name_t entity)
-{
-  ObjectContext *obc = static_cast<ObjectContext *>(_obc);
-
-  /* If we failed to cancel the event, the event will fire and the obc
-   * ref and the pg ref will be taken care of */
-  if (osd->watch_timer.cancel_event(obc->unconnected_watchers[entity])) {
-    put_object_context(obc);
-    put();
+  assert(obc->watchers.size() == 0);
+  // populate unconnected_watchers
+  utime_t now = ceph_clock_now(g_ceph_context);
+  for (map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator p =
+       obc->obs.oi.watchers.begin();
+       p != obc->obs.oi.watchers.end();
+       p++) {
+    utime_t expire = now;
+    expire += p->second.timeout_seconds;
+    dout(10) << "  unconnected watcher " << p->first << " will expire " << expire << dendl;
+    WatchRef watch(
+      Watch::makeWatchRef(
+       this, osd, obc, p->second.timeout_seconds, p->first.first, p->first.second));
+    watch->disconnect();
+    obc->watchers.insert(
+      make_pair(
+       make_pair(p->first.first, p->first.second),
+       watch));
   }
-  obc->unconnected_watchers.erase(entity);
 }
 
-void ReplicatedPG::register_unconnected_watcher(void *_obc,
-                                               entity_name_t entity,
-                                               utime_t expire)
+void ReplicatedPG::handle_watch_timeout(WatchRef watch)
 {
-  ObjectContext *obc = static_cast<ObjectContext *>(_obc);
-  pg_t pgid = info.pgid;
-  pgid.set_ps(obc->obs.oi.soid.hash);
-  get();
-  obc->ref++;
-  Watch::C_WatchTimeout *cb = new Watch::C_WatchTimeout(osd->osd,
-                                                       static_cast<void *>(obc),
-                                                       this,
-                                                       entity, expire);
-  osd->watch_timer.add_event_at(expire, cb);
-  obc->unconnected_watchers[entity] = cb;
-}
-
-void ReplicatedPG::handle_watch_timeout(void *_obc,
-                                       entity_name_t entity,
-                                       utime_t expire)
-{
-  dout(10) << "handle_watch_timeout obc " << _obc << dendl;
-  struct HandleWatchTimeout : public Context {
-    epoch_t cur_epoch;
-    boost::intrusive_ptr<ReplicatedPG> pg;
-    void *obc;
-    entity_name_t entity;
-    utime_t expire;
-    HandleWatchTimeout(
-      epoch_t cur_epoch,
-      ReplicatedPG *pg,
-      void *obc,
-      entity_name_t entity,
-      utime_t expire) : cur_epoch(cur_epoch),
-                       pg(pg), obc(obc), entity(entity), expire(expire) {
-      assert(pg->is_locked());
-      static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
-    }
-    void finish(int) {
-      assert(pg->is_locked());
-      if (cur_epoch < pg->last_peering_reset)
-       return;
-      // handle_watch_timeout gets its own ref
-      static_cast<ReplicatedPG::ObjectContext*>(obc)->get();
-      pg->handle_watch_timeout(obc, entity, expire);
-    }
-    ~HandleWatchTimeout() {
-      assert(pg->is_locked());
-      pg->put_object_context(static_cast<ReplicatedPG::ObjectContext*>(obc));
-    }
-  };
-
-  ObjectContext *obc = static_cast<ObjectContext *>(_obc);
-
-  if (obc->unconnected_watchers.count(entity) == 0 ||
-      (obc->unconnected_watchers[entity] &&
-       obc->unconnected_watchers[entity]->expire != expire)) {
-     /* If obc->unconnected_watchers[entity] == NULL we know at least that
-      * the watcher for obc,entity should expire.  We might not have been
-      * the intended Context*, but that's ok since the intended one will
-      * take this branch and assume it raced. */
-    dout(10) << "handle_watch_timeout must have raced, no/wrong unconnected_watcher "
-            << entity << dendl;
-    put_object_context(obc);
-    return;
-  }
+  ObjectContext *obc = watch->get_obc(); // handle_watch_timeout owns this ref
+  dout(10) << "handle_watch_timeout obc " << obc << dendl;
 
   if (is_degraded_object(obc->obs.oi.soid)) {
     callbacks_for_degraded_object[obc->obs.oi.soid].push_back(
-      new HandleWatchTimeout(get_osdmap()->get_epoch(),
-                            this, _obc, entity, expire)
+      watch->get_delayed_cb()
       );
     dout(10) << "handle_watch_timeout waiting for degraded on obj "
             << obc->obs.oi.soid
             << dendl;
-    obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
     put_object_context(obc); // callback got its own ref
     return;
   }
@@ -4230,15 +3989,16 @@ void ReplicatedPG::handle_watch_timeout(void *_obc,
     dout(10) << "handle_watch_timeout waiting for scrub on obj "
             << obc->obs.oi.soid
             << dendl;
-    scrubber.add_callback(new HandleWatchTimeout(get_osdmap()->get_epoch(),
-                                                this, _obc, entity, expire));
-    obc->unconnected_watchers[entity] = 0; // Callback in progress, but not this one!
-    put_object_context(obc); // callback got its own ref
+    scrubber.add_callback(
+      watch->get_delayed_cb() // This callback!
+      );
+    put_object_context(obc);
     return;
   }
 
-  obc->unconnected_watchers.erase(entity);
-  obc->obs.oi.watchers.erase(entity);
+  obc->watchers.erase(make_pair(watch->get_cookie(), watch->get_entity()));
+  obc->obs.oi.watchers.erase(make_pair(watch->get_cookie(), watch->get_entity()));
+  watch->remove();
 
   vector<OSDOp> ops;
   tid_t rep_tid = osd->get_tid();
@@ -4283,7 +4043,7 @@ void ReplicatedPG::handle_watch_timeout(void *_obc,
   eval_repop(repop);
 }
 
-ReplicatedPG::ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_t& oid)
+ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_t& oid)
 {
   map<hobject_t, ObjectContext*>::iterator p = object_contexts.find(oid);
   if (p != object_contexts.end())
@@ -4291,7 +4051,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_
   return NULL;
 }
 
-ReplicatedPG::ObjectContext *ReplicatedPG::create_object_context(const object_info_t& oi,
+ObjectContext *ReplicatedPG::create_object_context(const object_info_t& oi,
                                                                 SnapSetContext *ssc)
 {
   ObjectContext *obc = new ObjectContext(oi, false, ssc);
@@ -4302,7 +4062,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::create_object_context(const object_in
   return obc;
 }
 
-ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
+ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
                                                              const object_locator_t& oloc,
                                                              bool can_create)
 {
@@ -4354,7 +4114,24 @@ ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& s
 
 void ReplicatedPG::context_registry_on_change()
 {
-  remove_watchers_and_notifies();
+  // Discard every watch on every object context.  Each context is pinned
+  // with get() before its watches are discarded and is only released in the
+  // second loop below, once iteration over object_contexts has finished.
+  // NOTE(review): presumably this guards against put_object_context() (called
+  // from Watch::discard_state()) dropping the last ref mid-iteration -- confirm.
+  list<ObjectContext *> contexts;
+  for (map<hobject_t, ObjectContext*>::iterator i = object_contexts.begin();
+       i != object_contexts.end();
+       ++i) {
+    i->second->get();
+    contexts.push_back(i->second);
+    // The loop "increment" erases the current entry after it has been
+    // discarded and advances j in the same expression.
+    for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator j =
+          i->second->watchers.begin();
+        j != i->second->watchers.end();
+        i->second->watchers.erase(j++)) {
+      j->second->discard();
+    }
+  }
+  // Release the refs taken above, emptying the temporary list as we go.
+  for (list<ObjectContext *>::iterator i = contexts.begin();
+       i != contexts.end();
+       contexts.erase(i++)) {
+    put_object_context(*i);
+  }
 }
 
 
@@ -4529,7 +4306,7 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t *
     pgstat->stats.cat_sum[oi.category].add(stat);
 }
 
-ReplicatedPG::SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid)
+SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid)
 {
   SnapSetContext *ssc = new SnapSetContext(oid);
   dout(10) << "create_snapset_context " << ssc << " " << ssc->oid << dendl;
@@ -4538,10 +4315,10 @@ ReplicatedPG::SnapSetContext *ReplicatedPG::create_snapset_context(const object_
   return ssc;
 }
 
-ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
-                                                               const string& key,
-                                                               ps_t seed,
-                                                               bool can_create)
+SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
+                                                 const string& key,
+                                                 ps_t seed,
+                                                 bool can_create)
 {
   SnapSetContext *ssc;
   map<object_t, SnapSetContext*>::iterator p = snapset_contexts.find(oid);
@@ -6028,7 +5805,7 @@ eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid)
 
 /* Mark an object as lost
  */
-ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
+ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
                                                            const hobject_t &oid, eversion_t version,
                                                            utime_t mtime, int what)
 {
@@ -6065,7 +5842,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transac
 
 struct C_PG_MarkUnfoundLost : public Context {
   ReplicatedPG *pg;
-  list<ReplicatedPG::ObjectContext*> obcs;
+  list<ObjectContext*> obcs;
   C_PG_MarkUnfoundLost(ReplicatedPG *p) : pg(p) {
     pg->get();
   }
@@ -6246,14 +6023,14 @@ void ReplicatedPG::on_removal()
 {
   dout(10) << "on_removal" << dendl;
   apply_and_flush_repops(false);
-  remove_watchers_and_notifies();
+  context_registry_on_change();
 }
 
 void ReplicatedPG::on_shutdown()
 {
   dout(10) << "on_shutdown" << dendl;
   apply_and_flush_repops(false);
-  remove_watchers_and_notifies();
+  context_registry_on_change();
 }
 
 void ReplicatedPG::on_flushed()
index 397d8cfc63e93cfbf3ed80442200cc16131395c1..47ee76e5aa3353a28c0dfcb8965afa204246d0ec 100644 (file)
@@ -60,6 +60,7 @@ public:
 
 class ReplicatedPG : public PG {
   friend class OSD;
+  friend class Watch;
 public:  
 
   /*
@@ -424,14 +425,9 @@ protected:
   map<object_t, SnapSetContext*> snapset_contexts;
 
   void populate_obc_watchers(ObjectContext *obc);
-  void register_unconnected_watcher(void *obc,
-                                   entity_name_t entity,
-                                   utime_t expire);
-  void unregister_unconnected_watcher(void *obc,
-                                     entity_name_t entity);
-  void handle_watch_timeout(void *obc,
-                           entity_name_t entity,
-                           utime_t expire);
+public:
+  void handle_watch_timeout(WatchRef watch);
+protected:
 
   ObjectContext *lookup_object_context(const hobject_t& soid) {
     if (object_contexts.count(soid)) {
@@ -725,11 +721,6 @@ protected:
   void send_remove_op(const hobject_t& oid, eversion_t v, int peer);
 
 
-  void dump_watchers(ObjectContext *obc);
-  void remove_watcher(ObjectContext *obc, entity_name_t entity);
-  void remove_notify(ObjectContext *obc, Watch::Notification *notif);
-  void remove_watchers_and_notifies();
-
   struct RepModify {
     ReplicatedPG *pg;
     OpRequestRef op;
@@ -1020,20 +1011,6 @@ public:
   void on_shutdown();
 };
 
-
-inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectState& obs)
-{
-  out << obs.oi.soid;
-  if (!obs.exists)
-    out << "(dne)";
-  return out;
-}
-
-inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectContext& obc)
-{
-  return out << "obc(" << obc.obs << ")";
-}
-
 inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
 {
   out << "repgather(" << &repop
index da90cef4103dc1b126db558727af855c3bf41610..1ee5f35b5e341de29c3263bdbda944393d2b06b6 100644 (file)
@@ -1,7 +1,8 @@
-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 #include "PG.h"
 
 #include "include/types.h"
+#include "messages/MWatchNotify.h"
 
 #include <map>
 
 
 #include "common/config.h"
 
-bool Watch::ack_notification(entity_name_t& watcher, Notification *notif)
+/// A Context that can additionally be told it has been canceled.
+struct CancelableContext : public Context {
+  /// Mark the callback as canceled.  The implementations in this file set a
+  /// flag that their completion path checks before doing any work.
+  virtual void cancel() = 0;
+};
+
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+
+/// Emits the Notify's debug prefix onto the log stream (hooked up through
+/// the dout_prefix macro) and hands the stream back for further output.
+static ostream& _prefix(std::ostream* _dout, Notify *notify)
+{
+  std::ostream& out = *_dout;
+  out << notify->gen_dbg_prefix();
+  return out;
+}
+
+/// Constructs a Notify that expects num_watchers watchers to complete.
+/// The notify starts neither complete nor discarded, and with no timeout
+/// callback registered (cb is NULL until register_cb() runs).
+Notify::Notify(
+  ConnectionRef client,
+  unsigned num_watchers,
+  bufferlist &payload,
+  uint32_t timeout,
+  uint64_t cookie,
+  uint64_t notify_id,
+  uint64_t version,
+  OSDService *osd)
+  : client(client),
+    in_progress_watchers(num_watchers),
+    complete(false),
+    discarded(false),
+    payload(payload),
+    timeout(timeout),
+    cookie(cookie),
+    notify_id(notify_id),
+    version(version),
+    osd(osd),
+    cb(NULL),
+    lock("Notify::lock") {}
+
+/// Factory for Notify objects.  Allocates the Notify, wraps it in a
+/// NotifyRef and stores a weak self-reference in the object (set_self) so
+/// it can later produce NotifyRefs to itself via self.lock().
+NotifyRef Notify::makeNotifyRef(
+  ConnectionRef client,
+  unsigned num_watchers,
+  bufferlist &payload,
+  uint32_t timeout,
+  uint64_t cookie,
+  uint64_t notify_id,
+  uint64_t version,
+  OSDService *osd) {
+  Notify *raw = new Notify(client, num_watchers, payload, timeout,
+                           cookie, notify_id, version, osd);
+  NotifyRef ref(raw);
+  ref->set_self(ref);
+  return ref;
+}
+
+/**
+ * Timer callback that times out a Notify.
+ *
+ * finish() releases osd->watch_lock on entry and re-acquires it before
+ * returning, so it expects to be invoked with that lock held (watch_timer
+ * is constructed with watch_lock).
+ */
+class NotifyTimeoutCB : public CancelableContext {
+  NotifyRef notif;
+  bool canceled; // protected by notif lock
+public:
+  // canceled must start out false: finish() reads it, and cancel() is the
+  // only place that ever sets it.  Leaving it uninitialized could make a
+  // live timeout be skipped at random.
+  NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {}
+  void finish(int) {
+    // Drop watch_lock before taking notif->lock: Notify::register_cb() and
+    // Notify::unregister_cb() acquire watch_lock while holding notif->lock.
+    notif->osd->watch_lock.Unlock();
+    notif->lock.Lock();
+    if (!canceled)
+      notif->do_timeout(); // drops lock
+    else
+      notif->lock.Unlock();
+    notif->osd->watch_lock.Lock();
+  }
+  /// Marks the callback canceled; caller must hold notif->lock.
+  void cancel() {
+    assert(notif->lock.is_locked_by_me());
+    canceled = true;
+  }
+};
+
+void Notify::do_timeout()
 {
-  map<entity_name_t, WatcherState>::iterator iter = notif->watchers.find(watcher);
+  assert(lock.is_locked_by_me());
+  dout(10) << "timeout" << dendl;
+  cb = NULL;
+  if (is_discarded()) {
+    lock.Unlock();
+    return;
+  }
 
-  if (iter == notif->watchers.end()) // client was not suppose to ack this notification
-    return false;
+  in_progress_watchers = 0; // we give up TODO: we should return an error code
+  maybe_complete_notify();
+  assert(complete);
+  set<WatchRef> _watchers;
+  _watchers.swap(watchers);
+  lock.Unlock();
 
-  notif->watchers.erase(iter);
+  for (set<WatchRef>::iterator i = _watchers.begin();
+       i != _watchers.end();
+       ++i) {
+    boost::intrusive_ptr<ReplicatedPG> pg((*i)->get_pg());
+    pg->lock();
+    if (!(*i)->is_discarded()) {
+      (*i)->cancel_notify(self.lock());
+    }
+    pg->unlock();
+  }
+}
 
-  return notif->watchers.empty(); // true if there are no more watchers
+/// Creates a NotifyTimeoutCB for this notify and schedules it on
+/// osd->watch_timer to fire after `timeout`.  The caller must hold this
+/// Notify's lock; osd->watch_lock is taken and released internally.
+void Notify::register_cb()
+{
+  assert(lock.is_locked_by_me());
+  {
+    osd->watch_lock.Lock();
+    cb = new NotifyTimeoutCB(self.lock());
+    osd->watch_timer.add_event_after(
+      timeout,
+      cb);
+    osd->watch_lock.Unlock();
+  }
+}
+
+/// Cancels the pending timeout callback, if one is registered.  The caller
+/// must hold this Notify's lock.
+void Notify::unregister_cb()
+{
+  assert(lock.is_locked_by_me());
+  if (!cb)
+    return;
+  // Flag the callback before removing it from the timer: its finish()
+  // checks the flag after taking our lock and then skips do_timeout().
+  cb->cancel();
+  {
+    osd->watch_lock.Lock();
+    osd->watch_timer.cancel_event(cb);
+    cb = NULL;
+    osd->watch_lock.Unlock();
+  }
+}
+
+/// Adds `watch` to the set of watchers tracked by this notify (under lock).
+void Notify::start_watcher(WatchRef watch)
+{
+  Mutex::Locker l(lock);
+  dout(10) << "start_watcher" << dendl;
+  watchers.insert(watch);
+}
+
+/// Records that `watch` is done with this notify: removes it from the
+/// tracked set, decrements in_progress_watchers and completes the notify
+/// when none remain.  Does nothing if the notify is already discarded or
+/// complete.
+void Notify::complete_watcher(WatchRef watch)
+{
+  Mutex::Locker l(lock);
+  dout(10) << "complete_watcher" << dendl;
+  if (is_discarded())
+    return;
+  assert(in_progress_watchers > 0);
+  watchers.erase(watch);
+  --in_progress_watchers;
+  maybe_complete_notify();
+}
+
+/// If no watchers are still in progress, sends the MWatchNotify reply to
+/// the notifying client, cancels the timeout callback and marks the notify
+/// complete.  Otherwise only logs the current count.
+void Notify::maybe_complete_notify()
+{
+  dout(10) << "maybe_complete_notify -- "
+          << in_progress_watchers
+          << " in progress watchers " << dendl;
+  if (!in_progress_watchers) {
+    MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
+                                        WATCH_NOTIFY, payload));
+    osd->send_message_osd_client(reply, client.get());
+    unregister_cb();
+    complete = true;
+  }
+}
+
+/// Abandons this notify: marks it discarded, cancels the timeout callback
+/// and drops the refs held on all tracked watchers.
+void Notify::discard()
+{
+  Mutex::Locker l(lock);
+  discarded = true;
+  unregister_cb();
+  watchers.clear();
+}
+
+/// Arms the timeout callback and completes the notify right away if there
+/// are no watchers to wait for.  The final assert checks that the expected
+/// watcher count matches the number of watchers registered via
+/// start_watcher().
+void Notify::init()
+{
+  Mutex::Locker l(lock);
+  register_cb();
+  maybe_complete_notify();
+  assert(in_progress_watchers == watchers.size());
+}
+
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, watch.get())
+
+/// Emits the Watch's debug prefix onto the log stream (hooked up through
+/// the dout_prefix macro) and hands the stream back for further output.
+static ostream& _prefix(std::ostream* _dout, Watch *watch)
+{
+  std::ostream& out = *_dout;
+  out << watch->gen_dbg_prefix();
+  return out;
+}
+
+/**
+ * Timer callback created by Watch::register_cb() for a watch timeout.
+ *
+ * complete() is overridden instead of finish() and deletes the callback
+ * itself, so that the WatchRef it holds is released while the pg lock is
+ * held.  complete() releases osd->watch_lock on entry and re-acquires it
+ * before returning.
+ */
+class HandleWatchTimeout : public CancelableContext {
+  WatchRef watch;
+public:
+  bool canceled; // protected by watch->pg->lock
+  HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
+  void cancel() {
+    canceled = true;
+  }
+  void finish(int) { assert(0); /* not used */ }
+  void complete(int) {
+    dout(10) << "HandleWatchTimeout" << dendl;
+    // Copy pg and osd into locals: `delete this` below destroys the watch
+    // member, but both are still needed afterwards to unlock/relock.
+    boost::intrusive_ptr<ReplicatedPG> pg(watch->pg);
+    OSDService *osd(watch->osd);
+    osd->watch_lock.Unlock();
+    pg->lock();
+    watch->cb = NULL;
+    if (!watch->is_discarded() && !canceled)
+      watch->pg->handle_watch_timeout(watch);
+    delete this; // ~Watch requires pg lock!
+    pg->unlock();
+    osd->watch_lock.Lock();
+  }
+};
+
+/**
+ * Callback handed out by Watch::get_delayed_cb() for a watch timeout that
+ * has to be deferred.  Unlike HandleWatchTimeout it takes no locks itself:
+ * finish() asserts that the pg lock is already held by the caller.
+ */
+class HandleDelayedWatchTimeout : public CancelableContext {
+  WatchRef watch;
+public:
+  bool canceled;
+  HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {}
+  void cancel() {
+    canceled = true;
+  }
+  void finish(int) {
+    dout(10) << "HandleWatchTimeoutDelayed" << dendl;
+    assert(watch->pg->is_locked());
+    watch->cb = NULL;
+    if (!watch->is_discarded() && !canceled)
+      watch->pg->handle_watch_timeout(watch);
+  }
+};
+
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+
+/// Builds the log prefix for this Watch: the PG's prefix, the
+/// (cookie, entity) key and the obc ref count (-1 when obc is NULL).
+string Watch::gen_dbg_prefix() {
+  stringstream prefix;
+  prefix << pg->gen_prefix();
+  prefix << " -- Watch(";
+  prefix << make_pair(cookie, entity);
+  prefix << ", obc->ref=" << (obc ? obc->ref : -1);
+  prefix << ") ";
+  return prefix.str();
+}
+
+/// Constructs a Watch for (cookie, entity) on obc.  Takes a ref on obc
+/// (obc->get()); the watch starts with no timeout callback and not
+/// discarded.
+Watch::Watch(
+  ReplicatedPG *pg,
+  OSDService *osd,
+  ObjectContext *obc,
+  uint32_t timeout,
+  uint64_t cookie,
+  entity_name_t entity)
+  : cb(NULL),
+    osd(osd),
+    pg(pg),
+    obc(obc),
+    timeout(timeout),
+    cookie(cookie),
+    entity(entity),
+    discarded(false) {
+  obc->get();
+  dout(10) << "Watch()" << dendl;
+}
+
+/// Destructor.  Asserts that the obc ref and the connection were already
+/// released, which discard_state() does on behalf of remove()/discard().
+Watch::~Watch() {
+  dout(10) << "~Watch" << dendl;
+  // users must have called remove() or discard() prior to this point
+  assert(!obc);
+  assert(!conn);
+}
+
+bool Watch::connected() { return conn; }
+
+Context *Watch::get_delayed_cb()
+{
+  assert(!cb);
+  cb = new HandleDelayedWatchTimeout(self.lock());
+  return cb;
 }
 
-void Watch::C_NotifyTimeout::finish(int r)
+ObjectContext *Watch::get_obc()
 {
-  osd->handle_notify_timeout(notif);
+  assert(obc);
+  obc->get();
+  return obc;
 }
 
-void Watch::C_WatchTimeout::finish(int r)
+void Watch::register_cb()
 {
-  osd->handle_watch_timeout(obc, static_cast<ReplicatedPG *>(pg), entity,
-                           expire);
+  Mutex::Locker l(osd->watch_lock);
+  dout(15) << "registering callback, timeout: " << timeout << dendl;
+  cb = new HandleWatchTimeout(self.lock());
+  osd->watch_timer.add_event_after(
+    timeout,
+    cb);
 }
 
+/// Cancels the outstanding timeout callback, if any: flags it canceled,
+/// asks watch_timer (under osd->watch_lock) to drop it, and clears cb.
+void Watch::unregister_cb()
+{
+  dout(15) << "unregister_cb" << dendl;
+  if (!cb)
+    return;
+  dout(15) << "actually registered, cancelling" << dendl;
+  cb->cancel();
+  {
+    Mutex::Locker l(osd->watch_lock);
+    osd->watch_timer.cancel_event(cb); // harmless if not registered with timer
+  }
+  cb = NULL;
+}
+
+/// Attaches this watch to connection `con`: records the connection, adds
+/// the watch to the session's WatchConState, re-sends every in-progress
+/// notify over the new connection and cancels the timeout callback.
+void Watch::connect(ConnectionRef con)
+{
+  dout(10) << "connecting" << dendl;
+  conn = con;
+  // get_priv() result is released with put() once the watch is registered.
+  OSD::Session* sessionref(static_cast<OSD::Session*>(con->get_priv()));
+  sessionref->wstate.addWatch(self.lock());
+  sessionref->put();
+  for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
+       i != in_progress_notifies.end();
+       ++i) {
+    send_notify(i->second);
+  }
+  unregister_cb();
+}
+
+/// Drops the connection reference and registers the timeout callback.
+void Watch::disconnect()
+{
+  dout(10) << "disconnect" << dendl;
+  conn = ConnectionRef();
+  register_cb();
+}
+
+/// Discards this watch: every in-progress notify is discarded as well,
+/// then the watch's own state is torn down via discard_state().
+void Watch::discard()
+{
+  dout(10) << "discard" << dendl;
+  for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
+       i != in_progress_notifies.end();
+       ++i) {
+    i->second->discard();
+  }
+  discard_state();
+}
+
+/// Common teardown for discard() and remove().  Requires the pg lock and a
+/// watch that has not been discarded yet.  Drops all in-progress notifies,
+/// cancels the timeout callback, marks the watch discarded, detaches it
+/// from the session's WatchConState if connected, and releases the obc ref.
+void Watch::discard_state()
+{
+  assert(pg->is_locked());
+  assert(!discarded);
+  assert(obc);
+  in_progress_notifies.clear();
+  unregister_cb();
+  discarded = true;
+  if (conn) {
+    OSD::Session* sessionref(static_cast<OSD::Session*>(conn->get_priv()));
+    sessionref->wstate.removeWatch(self.lock());
+    sessionref->put();
+    conn = ConnectionRef();
+  }
+  pg->put_object_context(obc);
+  obc = NULL;
+}
+
+bool Watch::is_discarded()
+{
+  return discarded;
+}
+
+/// Removes this watch: it is counted as complete on every in-progress
+/// notify (so those notifies no longer wait for it), then its state is
+/// torn down via discard_state().
+void Watch::remove()
+{
+  dout(10) << "remove" << dendl;
+  for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
+       i != in_progress_notifies.end();
+       ++i) {
+    i->second->complete_watcher(self.lock());
+  }
+  discard_state();
+}
+
+/// Starts tracking `notif` on this watch.  The notify id must not already
+/// be in progress.  Registers this watch with the notify and, if currently
+/// connected, sends the notify message immediately.
+void Watch::start_notify(NotifyRef notif)
+{
+  dout(10) << "start_notify " << notif->notify_id << dendl;
+  assert(in_progress_notifies.find(notif->notify_id) ==
+        in_progress_notifies.end());
+  in_progress_notifies[notif->notify_id] = notif;
+  notif->start_watcher(self.lock());
+  if (connected())
+    send_notify(notif);
+}
+
+/// Stops tracking `notif` on this watch (erased by notify id).
+void Watch::cancel_notify(NotifyRef notif)
+{
+  dout(10) << "cancel_notify " << notif->notify_id << dendl;
+  in_progress_notifies.erase(notif->notify_id);
+}
+
+/// Builds an MWatchNotify for `notif` (using this watch's cookie) and sends
+/// it over conn.  The callers in this file only invoke it once conn is set.
+void Watch::send_notify(NotifyRef notif)
+{
+  dout(10) << "send_notify" << dendl;
+  MWatchNotify *notify_msg = new MWatchNotify(
+    cookie, notif->version, notif->notify_id,
+    WATCH_NOTIFY, notif->payload);
+  osd->send_message_osd_client(notify_msg, conn.get());
+}
+
+/// Handles an ack for notify_id: marks this watcher complete on the notify
+/// and stops tracking it.  Acks for unknown notify ids are ignored.
+void Watch::notify_ack(uint64_t notify_id)
+{
+  dout(10) << "notify_ack" << dendl;
+  map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id);
+  if (i != in_progress_notifies.end()) {
+    i->second->complete_watcher(self.lock());
+    in_progress_notifies.erase(i);
+  }
+}
+
+/// Factory for Watch objects.  Allocates the Watch, wraps it in a WatchRef
+/// and stores a weak self-reference in the object (set_self) so it can
+/// later produce WatchRefs to itself via self.lock().
+WatchRef Watch::makeWatchRef(
+  ReplicatedPG *pg, OSDService *osd,
+  ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity)
+{
+  Watch *raw = new Watch(pg, osd, obc, timeout, cookie, entity);
+  WatchRef ref(raw);
+  ref->set_self(ref);
+  return ref;
+}
+
+/// Adds `watch` to the set of watches for this connection (under lock).
+void WatchConState::addWatch(WatchRef watch)
+{
+  Mutex::Locker l(lock);
+  watches.insert(watch);
+}
+
+/// Removes `watch` from the set of watches for this connection (under lock).
+void WatchConState::removeWatch(WatchRef watch)
+{
+  Mutex::Locker l(lock);
+  watches.erase(watch);
+}
+
+/// Disconnects every watch registered on this connection.  The set is
+/// swapped into a local under `lock` first, so the per-watch work below
+/// (which takes each watch's pg lock) runs without holding `lock`.
+void WatchConState::reset()
+{
+  set<WatchRef> _watches;
+  {
+    Mutex::Locker l(lock);
+    _watches.swap(watches);
+  }
+  for (set<WatchRef>::iterator i = _watches.begin();
+       i != _watches.end();
+       ++i) {
+    boost::intrusive_ptr<ReplicatedPG> pg((*i)->get_pg());
+    pg->lock();
+    // Watches that were discarded in the meantime are skipped.
+    if (!(*i)->is_discarded()) {
+      (*i)->disconnect();
+    }
+    pg->unlock();
+  }
+}
index cb48de4d426a7b4e644d3b55a860323186776e7b..089350f35bbab1b7832fb271ed62b34c76e837db 100644 (file)
  * Foundation.  See file COPYING.
  * 
  */
-
-
 #ifndef CEPH_WATCH_H
 #define CEPH_WATCH_H
 
-#include <map>
+#include <boost/intrusive_ptr.hpp>
+#include <tr1/memory>
+#include <set>
 
-#include "OSD.h"
-#include "common/config.h"
+#include "msg/Messenger.h"
+#include "include/Context.h"
+#include "common/Mutex.h"
+
+/// Per-watcher notification state, moved here from the old Watch class.
+/// NOTE(review): no use of this enum is visible in the new Watch/Notify
+/// code in this change -- confirm it is still needed.
+enum WatcherState {
+  WATCHER_PENDING,
+  WATCHER_NOTIFIED,
+};
 
+class OSDService;
+class ReplicatedPG;
+void intrusive_ptr_add_ref(ReplicatedPG *pg);
+void intrusive_ptr_release(ReplicatedPG *pg);
+class ObjectContext;
 class MWatchNotify;
 
-/* keeps track and accounts sessions, watchers and notifiers */
-class Watch {
-  uint64_t notif_id;
+class Watch;
+typedef std::tr1::shared_ptr<Watch> WatchRef;
+typedef std::tr1::weak_ptr<Watch> WWatchRef;
 
-public:
-  enum WatcherState {
-    WATCHER_PENDING,
-    WATCHER_NOTIFIED,
-  };
-
-  struct Notification {
-    std::map<entity_name_t, WatcherState> watchers;
-    entity_name_t name;
-    uint64_t id;
-    OSD::Session *session;
-    uint64_t cookie;
-    MWatchNotify *reply;
-    Context *timeout;
-    void *obc;
-    pg_t pgid;
-    bufferlist bl;
-
-    void add_watcher(const entity_name_t& name, WatcherState state) {
-      watchers[name] = state;
-    }
-
-    Notification(entity_name_t& n, OSD::Session *s, uint64_t c, bufferlist& b)
-      : name(n), id(0), session(s), cookie(c), reply(0), timeout(0),
-       obc(0), bl(b) { }
-  };
-
-  class C_NotifyTimeout : public Context {
-    OSD *osd;
-    Notification *notif;
-  public:
-    C_NotifyTimeout(OSD *_osd, Notification *_notif) : osd(_osd), notif(_notif) {}
-    void finish(int r);
-  };
-
-  class C_WatchTimeout : public Context {
-    OSD *osd;
-    void *obc;
-    void *pg;
-    entity_name_t entity;
-  public:
-    utime_t expire;
-    C_WatchTimeout(OSD *_osd, void *_obc, void *_pg,
-                  entity_name_t _entity, utime_t _expire) :
-      osd(_osd), obc(_obc), pg(_pg), entity(_entity), expire(_expire) {}
-    void finish(int r);
-  };
-
-private:
-  std::map<uint64_t, Notification *> notifs; /* notif_id to notifications */
+class Notify;
+typedef std::tr1::shared_ptr<Notify> NotifyRef;
+typedef std::tr1::weak_ptr<Notify> WNotifyRef;
 
-public:
-  Watch() : notif_id(0) {}
+class CancelableContext;
 
-  void add_notification(Notification *notif) {
-    notif->id = ++notif_id;
-    notifs[notif->id] = notif;
+/**
+ * Notify tracks the progress of a particular notify
+ *
+ * References are held by Watch and the timeout callback.
+ */
+class NotifyTimeoutCB;
+class Notify {
+  friend class NotifyTimeoutCB;
+  friend class Watch;
+  WNotifyRef self;
+  ConnectionRef client;
+  unsigned in_progress_watchers;
+  bool complete;
+  bool discarded;
+  set<WatchRef> watchers;
+
+  bufferlist payload;
+  uint32_t timeout;
+  uint64_t cookie;
+  uint64_t notify_id;
+  uint64_t version;
+
+  OSDService *osd;
+  CancelableContext *cb;
+  Mutex lock;
+
+
+  /// true if this notify is being discarded
+  bool is_discarded() {
+    return discarded || complete;
   }
-  Notification *get_notif(uint64_t id) {
-    map<uint64_t, Notification *>::iterator iter = notifs.find(id);
-    if (iter != notifs.end())
-      return iter->second;
-    return NULL;
+
+  /// Sends notify completion if in_progress_watchers == 0
+  void maybe_complete_notify();
+
+  /// Called on Notify timeout
+  void do_timeout();
+
+  Notify(
+    ConnectionRef client,
+    unsigned num_watchers,
+    bufferlist &payload,
+    uint32_t timeout,
+    uint64_t cookie,
+    uint64_t notify_id,
+    uint64_t version,
+    OSDService *osd);
+
+  /// registers a timeout callback with the watch_timer
+  void register_cb();
+
+  /// removes the timeout callback, called on completion or cancellation
+  void unregister_cb();
+public:
+  string gen_dbg_prefix() {
+    stringstream ss;
+    ss << "Notify(" << make_pair(cookie, notify_id) << " "
+       << " in_progress_watchers=" << in_progress_watchers
+       << ") ";
+    return ss.str();
   }
-  void remove_notification(Notification *notif) {
-    map<uint64_t, Notification *>::iterator iter = notifs.find(notif->id);
-    if (iter != notifs.end())
-      notifs.erase(iter);
+  void set_self(NotifyRef _self) {
+    self = _self;
   }
+  static NotifyRef makeNotifyRef(
+    ConnectionRef client,
+    unsigned num_watchers,
+    bufferlist &payload,
+    uint32_t timeout,
+    uint64_t cookie,
+    uint64_t notify_id,
+    uint64_t version,
+    OSDService *osd);
+
+  /// Call after creation to initialize
+  void init();
+
+  /// Called once per watcher prior to init()
+  void start_watcher(
+    WatchRef watcher ///< [in] watcher to start tracking
+    );
 
-  bool ack_notification(entity_name_t& watcher, Notification *notif);
+  /// Called once per NotifyAck
+  void complete_watcher(
+    WatchRef watcher ///< [in] watcher to complete
+    );
+
+  /// Called when the notify is canceled due to a new peering interval
+  void discard();
 };
 
+/**
+ * Watch is a mapping between a Connection and an ObjectContext
+ *
+ * References are held by ObjectContext and the timeout callback
+ */
+class HandleWatchTimeout;
+class HandleDelayedWatchTimeout;
+class Watch {
+  WWatchRef self;
+  friend class HandleWatchTimeout;
+  friend class HandleDelayedWatchTimeout;
+  ConnectionRef conn;
+  CancelableContext *cb;
+
+  OSDService *osd;
+  boost::intrusive_ptr<ReplicatedPG> pg;
+  ObjectContext *obc;
+
+  std::map<uint64_t, NotifyRef> in_progress_notifies;
+
+  uint32_t timeout;
+  uint64_t cookie;
+  entity_name_t entity;
+  bool discarded;
 
+  Watch(
+    ReplicatedPG *pg, OSDService *osd,
+    ObjectContext *obc, uint32_t timeout,
+    uint64_t cookie, entity_name_t entity);
+
+  /// Registers the timeout callback with watch_timer
+  void register_cb();
+
+  /// Unregisters the timeout callback
+  void unregister_cb();
+
+  /// send a Notify message when connected for notif
+  void send_notify(NotifyRef notif);
+
+  /// Cleans up state on discard or remove (including Connection state, obc)
+  void discard_state();
+public:
+  /// NOTE: must be called with pg lock held
+  ~Watch();
+
+  string gen_dbg_prefix();
+  static WatchRef makeWatchRef(
+    ReplicatedPG *pg, OSDService *osd,
+    ObjectContext *obc, uint32_t timeout, uint64_t cookie, entity_name_t entity);
+  void set_self(WatchRef _self) {
+    self = _self;
+  }
+
+  /// Returns the PG this watch belongs to.  No manual ref is handed out:
+  /// the returned intrusive_ptr manages its own reference.
+  boost::intrusive_ptr<ReplicatedPG> get_pg() { return pg; }
+
+  /// Grants a ref count!
+  ObjectContext *get_obc();
+  uint64_t get_cookie() const { return cookie; }
+  entity_name_t get_entity() const { return entity; }
+
+  /// Generates context for use if watch timeout is delayed by scrub or recovery
+  Context *get_delayed_cb();
+
+  /// True if currently connected
+  bool connected();
+
+  /// Transitions Watch to connected, unregister_cb, resends pending Notifies
+  void connect(
+    ConnectionRef con ///< [in] Reference to new connection
+    );
+
+  /// Transitions watch to disconnected, register_cb
+  void disconnect();
+
+  /// Called if Watch state is discarded due to new peering interval
+  void discard();
+
+  /// True if removed or discarded
+  bool is_discarded();
+
+  /// Called on unwatch
+  void remove();
+
+  /// Adds notif as in-progress notify
+  void start_notify(
+    NotifyRef notif ///< [in] Reference to new in-progress notify
+    );
+
+  /// Removes timed out notify
+  void cancel_notify(
+    NotifyRef notif ///< [in] notify which timed out
+    );
+
+  /// Call when notify_ack received on notify_id
+  void notify_ack(
+    uint64_t notify_id ///< [in] id of acked notify
+    );
+};
+
+/**
+ * Holds refs to the Watch structures corresponding to a connection.
+ * The refs are WatchRef (shared_ptr), i.e. owning, not weak.
+ * Lives in the OSD::Session object of an OSD connection
+ */
+class WatchConState {
+  Mutex lock;                 ///< protects watches
+  std::set<WatchRef> watches; ///< watches currently attached to the connection
+public:
+  WatchConState() : lock("WatchConState") {}
+
+  /// Add a watch
+  void addWatch(
+    WatchRef watch ///< [in] Ref to new watch object
+    );
+
+  /// Remove a watch
+  void removeWatch(
+    WatchRef watch ///< [in] Ref to watch object to remove
+    );
+
+  /// Called on session reset, disconnects watchers
+  void reset();
+};
 
 #endif
index 2136191bc198b87a9b12e74b02b228cb0de1cce0..6953f182000b92f7e4128bfb623592e0d7a1bd37 100644 (file)
@@ -2491,7 +2491,14 @@ ps_t object_info_t::legacy_object_locator_to_ps(const object_t &oid,
 
 void object_info_t::encode(bufferlist& bl) const
 {
-  ENCODE_START(10, 8, bl);
+  map<entity_name_t, watch_info_t> old_watchers;
+  for (map<pair<uint64_t, entity_name_t>, watch_info_t>::const_iterator i =
+        watchers.begin();
+       i != watchers.end();
+       ++i) {
+    old_watchers.insert(make_pair(i->first.second, i->second));
+  }
+  ENCODE_START(11, 8, bl);
   ::encode(soid, bl);
   ::encode(oloc, bl);
   ::encode(category, bl);
@@ -2507,15 +2514,17 @@ void object_info_t::encode(bufferlist& bl) const
   ::encode(truncate_seq, bl);
   ::encode(truncate_size, bl);
   ::encode(lost, bl);
-  ::encode(watchers, bl);
+  ::encode(old_watchers, bl);
   ::encode(user_version, bl);
   ::encode(uses_tmap, bl);
+  ::encode(watchers, bl);
   ENCODE_FINISH(bl);
 }
 
 void object_info_t::decode(bufferlist::iterator& bl)
 {
-  DECODE_START_LEGACY_COMPAT_LEN(10, 8, 8, bl);
+  DECODE_START_LEGACY_COMPAT_LEN(11, 8, 8, bl);
+  map<entity_name_t, watch_info_t> old_watchers;
   if (struct_v >= 2 && struct_v <= 5) {
     sobject_t obj;
     ::decode(obj, bl);
@@ -2549,7 +2558,7 @@ void object_info_t::decode(bufferlist::iterator& bl)
   else
     lost = false;
   if (struct_v >= 4) {
-    ::decode(watchers, bl);
+    ::decode(old_watchers, bl);
     ::decode(user_version, bl);
   }
   if (struct_v >= 9)
@@ -2558,6 +2567,17 @@ void object_info_t::decode(bufferlist::iterator& bl)
     uses_tmap = true;
   if (struct_v < 10)
     soid.pool = oloc.pool;
+  if (struct_v >= 11) {
+    ::decode(watchers, bl);
+  } else {
+    for (map<entity_name_t, watch_info_t>::iterator i = old_watchers.begin();
+        i != old_watchers.end();
+        ++i) {
+      watchers.insert(
+       make_pair(
+         make_pair(i->second.cookie, i->first), i->second));
+    }
+  }
   DECODE_FINISH(bl);
 }
 
@@ -2584,9 +2604,10 @@ void object_info_t::dump(Formatter *f) const
   f->dump_unsigned("truncate_seq", truncate_seq);
   f->dump_unsigned("truncate_size", truncate_size);
   f->open_object_section("watchers");
-  for (map<entity_name_t,watch_info_t>::const_iterator p = watchers.begin(); p != watchers.end(); ++p) {
+  for (map<pair<uint64_t, entity_name_t>,watch_info_t>::const_iterator p =
+         watchers.begin(); p != watchers.end(); ++p) {
     stringstream ss;
-    ss << p->first;
+    ss << p->first.second;
     f->open_object_section(ss.str().c_str());
     p->second.dump(f);
     f->close_section();
index 13099bd1bfec7477ff132d74aa72686f03750ef2..008fe8e9ded40c96ca51692301f1ef4755381346 100644 (file)
@@ -27,7 +27,7 @@
 #include "common/snap_types.h"
 #include "common/Formatter.h"
 #include "os/hobject.h"
-
+#include "Watch.h"
 
 #define CEPH_OSD_ONDISK_MAGIC "ceph osd volume v026"
 
@@ -1728,7 +1728,6 @@ static inline ostream& operator<<(ostream& out, const notify_info_t& n) {
 }
 
 
-
 struct object_info_t {
   hobject_t soid;
   object_locator_t oloc;
@@ -1748,7 +1747,7 @@ struct object_info_t {
   uint64_t truncate_seq, truncate_size;
 
 
-  map<entity_name_t, watch_info_t> watchers;
+  map<pair<uint64_t, entity_name_t>, watch_info_t> watchers;
   bool uses_tmap;
 
   void copy_user_bits(const object_info_t& other);
@@ -1823,9 +1822,7 @@ public:
   set<ObjectContext*> blocking;   // objects whose writes we block
 
   // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
-  map<entity_name_t, OSD::Session *> watchers;
-  map<entity_name_t, Watch::C_WatchTimeout *> unconnected_watchers;
-  map<Watch::Notification *, bool> notifs;
+  map<pair<uint64_t, entity_name_t>, WatchRef> watchers;
 
   ObjectContext(const object_info_t &oi_, bool exists_, SnapSetContext *ssc_)
     : ref(0), registered(false), obs(oi_, exists_), ssc(ssc_),