]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD,ReplicatedPG: expire and cleanup unconnected watchers
authorSamuel Just <samuel.just@dreamhost.com>
Mon, 3 Oct 2011 20:29:47 +0000 (13:29 -0700)
committerSamuel Just <samuel.just@dreamhost.com>
Fri, 7 Oct 2011 23:07:09 +0000 (16:07 -0700)
During handle_notify_timeout or ms_handle_reset, watchers are now marked
unconnected via pg->register_unconnected_watcher. A safe timer event has
been added to trigger OSD::handle_watch_timeout.
remove_watchers_and_notifies (called on role change) cleans up these
events before peering.

Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/Watch.cc
src/osd/Watch.h

index 7a4e86f32ad385f2ab545876bf5de999ae824d37..d86153b1cc500fa3504e18915beb759cab0ef4d4 100644 (file)
@@ -1890,6 +1890,7 @@ void OSD::complete_notify(void *_notif, void *_obc)
   MWatchNotify *reply = notif->reply;
   client_messenger->send_message(reply, notif->session->con);
   notif->session->put();
+  notif->session->con->put();
   watch->remove_notification(notif);
   if (notif->timeout)
     watch_timer.cancel_event(notif->timeout);
@@ -1910,15 +1911,19 @@ void OSD::ack_notification(entity_name_t& name, void *_notif, void *_obc, Replic
   }
 }
 
-bool OSD::ms_handle_reset(Connection *con)
+void OSD::handle_watch_timeout(void *obc,
+                              ReplicatedPG *pg,
+                              entity_name_t entity,
+                              utime_t expire)
 {
-  dout(0) << "OSD::ms_handle_reset()" << dendl;
-  OSD::Session *session = (OSD::Session *)con->get_priv();
-  if (!session)
-    return false;
-
-  dout(0) << "OSD::ms_handle_reset() s=" << (void *)session << dendl;
+  pg->lock();
+  pg->handle_watch_timeout(obc, entity, expire);
+  pg->unlock();
+  pg->put();
+}
 
+void OSD::disconnect_session_watches(Session *session)
+{
   // get any watched obc's
   map<ReplicatedPG::ObjectContext *, pg_t> obcs;
   watch_lock.Lock();
@@ -1932,6 +1937,8 @@ bool OSD::ms_handle_reset(Connection *con)
     ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)oiter->first;
     dout(0) << "obc=" << (void *)obc << dendl;
 
+    ReplicatedPG *pg = static_cast<ReplicatedPG *>(lookup_lock_raw_pg(oiter->second));
+    assert(pg);
     obc->lock.Lock();
     watch_lock.Lock();
     /* NOTE! fix this one, should be able to just lookup entity name,
@@ -1946,10 +1953,11 @@ bool OSD::ms_handle_reset(Connection *con)
        watch_info_t& w = obc->obs.oi.watchers[entity];
        utime_t expire = ceph_clock_now(g_ceph_context);
        expire += w.timeout_seconds;
-       obc->unconnected_watchers[entity] = expire;
+       pg->register_unconnected_watcher(obc, entity, expire);
        dout(10) << " disconnected watch " << w << " by " << entity << " session " << session
                 << ", expires " << expire << dendl;
         obc->watchers.erase(witer++);
+       session->put();
       }
       if (witer == obc->watchers.end())
         break;
@@ -1957,27 +1965,20 @@ bool OSD::ms_handle_reset(Connection *con)
     }
     watch_lock.Unlock();
     obc->lock.Unlock();
+    pg->put_object_context(obc);
     /* now drop a reference to that obc */
-    put_object_context(obc, oiter->second);
+    pg->unlock();
   }
+}
 
-#if 0
-  // FIXME: do we really want to _cancel_ notifications here?
-  // shouldn't they time out in the usual way?  because this person
-  // might/should immediately reconnect...
-  watch_lock.Lock();
-  for (map<void *, entity_name_t>::iterator notif_iter = session->notifs.begin();
-       notif_iter != session->notifs.end();
-       ++notif_iter) {
-    Watch::Notification *notif = (Watch::Notification *)notif_iter->first;
-    entity_name_t& dest = notif_iter->second;
-    dout(0) << "ms_handle_reset: ack notification for notif=" << (void *)notif << " entity=" << dest << dendl;
-    ack_notification(dest, notif);
-  }
-  session->notifs.clear();
-  watch_lock.Unlock();
-#endif
-
+bool OSD::ms_handle_reset(Connection *con)
+{
+  dout(0) << "OSD::ms_handle_reset()" << dendl;
+  OSD::Session *session = (OSD::Session *)con->get_priv();
+  if (!session)
+    return false;
+  dout(0) << "OSD::ms_handle_reset() s=" << (void *)session << dendl;
+  disconnect_session_watches(session);
   session->put();
   return true;
 }
index f24c609b35998893eb1716ad4c6a2dbf19acf5d8..046e8976b30605d5d523631f73e2706f936809c3 100644 (file)
@@ -112,7 +112,6 @@ class MOSDPGMissing;
 
 class Watch;
 class Notification;
-class ObjectContext;
 class ReplicatedPG;
 
 extern const coll_t meta_coll;
@@ -1034,6 +1033,11 @@ public:
   Mutex watch_lock;
   SafeTimer watch_timer;
   void handle_notify_timeout(void *notif);
+  void disconnect_session_watches(Session *session);
+  void handle_watch_timeout(void *obc,
+                           ReplicatedPG *pg,
+                           entity_name_t entity,
+                           utime_t expire);
 };
 
 //compatibility of the executable
index f664d76b2b4f265243eb1d48629cabd884007f5a..604a7faad7baefb23074662779f638cc8ae9bc66 100644 (file)
@@ -1685,6 +1685,8 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
   if (!is_replay()) {
     osd->take_waiters(waiting_for_active);
   }
+
+  on_activate();
 }
 
 
index 38e22d5d094fa2d42e54540422bcc85370852da5..191524e42537fd90cf2e52d18b8433ced84e88d4 100644 (file)
@@ -1664,8 +1664,18 @@ public:
   virtual void on_osd_failure(int osd) = 0;
   virtual void on_role_change() = 0;
   virtual void on_change() = 0;
+  virtual void on_activate() = 0;
   virtual void on_shutdown() = 0;
   virtual void remove_watchers_and_notifies() = 0;
+
+  virtual void register_unconnected_watcher(void *obc,
+                                           entity_name_t entity,
+                                           utime_t expire) = 0;
+  virtual void unregister_unconnected_watcher(void *obc,
+                                             entity_name_t entity) = 0;
+  virtual void handle_watch_timeout(void *obc,
+                                   entity_name_t entity,
+                                   utime_t expire) = 0;
 };
 
 //WRITE_CLASS_ENCODER(PG::Info::History)
index 5d613557c20aaed240c679339ab0f643debd44d8..159534d50ec88e387d888401d05c670e338b8da9 100644 (file)
@@ -1128,6 +1128,7 @@ void ReplicatedPG::remove_notify(ObjectContext *obc, Watch::Notification *notif)
   assert(niter != obc->notifs.end());
 
   niter->first->session->put();
+  niter->first->session->con->put();
   obc->notifs.erase(niter);
 
   put_object_context(obc);
@@ -1145,12 +1146,20 @@ void ReplicatedPG::remove_watchers_and_notifies()
        ) {
     map<hobject_t, ObjectContext *>::iterator iter = oiter++;
     ObjectContext *obc = iter->second;
+    obc->ref++;
     for (map<entity_name_t, OSD::Session *>::iterator witer = obc->watchers.begin();
         witer != obc->watchers.end();
         remove_watcher(obc, (witer++)->first));
+    for (map<entity_name_t, Context *>::iterator iter = obc->unconnected_watchers.begin();
+        iter != obc->unconnected_watchers.end();
+      ) {
+      map<entity_name_t, Context *>::iterator i = iter++;
+      unregister_unconnected_watcher(obc, i->first);
+    }
     for (map<Watch::Notification *, bool>::iterator niter = obc->notifs.begin();
         niter != obc->notifs.end();
         remove_notify(obc, (niter++)->first));
+    put_object_context(obc);
   }
   osd->watch_lock.Unlock();
 }
@@ -2420,10 +2429,12 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
        session->get();
        session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
       }
-      map<entity_name_t, utime_t>::iterator un_iter = obc->unconnected_watchers.find(entity);
-      if (un_iter != obc->unconnected_watchers.end())
-       obc->unconnected_watchers.erase(un_iter);
-      
+      map<entity_name_t, Context *>::iterator un_iter =
+       obc->unconnected_watchers.find(entity);
+      if (un_iter != obc->unconnected_watchers.end()) {
+       unregister_unconnected_watcher(obc, un_iter->first);
+      }
+
       map<Watch::Notification *, bool>::iterator niter;
       for (niter = obc->notifs.begin(); niter != obc->notifs.end(); ++niter) {
        Watch::Notification *notif = niter->first;
@@ -2442,7 +2453,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
        remove_watcher(obc, entity);
       } else {
        assert(obc->unconnected_watchers.count(entity));
-       obc->unconnected_watchers.erase(entity);
+       unregister_unconnected_watcher(obc, entity);
       }
     }
 
@@ -2454,34 +2465,32 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
 
       Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl);
       session->get();  // notif got a reference
+      session->con->get();
       notif->pgid = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
 
       osd->watch->add_notification(notif);
 
       // connected
-      for (map<entity_name_t, OSD::Session*>::iterator q = obc->watchers.begin();
-          q != obc->watchers.end();
-          q++) {
-       entity_name_t name = q->first;
-       OSD::Session *s = q->second;
-       watch_info_t& w = obc->obs.oi.watchers[q->first];
-
-       notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
-       s->add_notif(notif, name);
-
-       MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
-       osd->client_messenger->send_message(notify_msg, s->con);
-      }
+      for (map<entity_name_t, watch_info_t>::iterator i = obc->obs.oi.watchers.begin();
+          i != obc->obs.oi.watchers.end();
+          ++i) {
+       map<entity_name_t, OSD::Session*>::iterator q = obc->watchers.find(i->first);
+       if (q != obc->watchers.end()) {
+         entity_name_t name = q->first;
+         OSD::Session *s = q->second;
+         watch_info_t& w = obc->obs.oi.watchers[q->first];
+
+         notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
+         s->add_notif(notif, name);
 
-      // unconnected
-      utime_t now = ceph_clock_now(g_ceph_context);
-      for (map<entity_name_t, utime_t>::iterator q = obc->unconnected_watchers.begin();
-          q != obc->unconnected_watchers.end();
-          q++) {
-       entity_name_t name = q->first;
-       utime_t expire = q->second;
-       if (now < expire)
-         notif->add_watcher(name, Watch::WATCHER_PENDING); /* FIXME: should we remove expired unconnected? probably yes */
+         MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
+         osd->client_messenger->send_message(notify_msg, s->con);
+       } else {
+         // unconnected
+         utime_t now = ceph_clock_now(g_ceph_context);
+         entity_name_t name = i->first;
+         notif->add_watcher(name, Watch::WATCHER_PENDING);
+       }
       }
 
       notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl);
@@ -3020,7 +3029,14 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
 
 void ReplicatedPG::populate_obc_watchers(ObjectContext *obc)
 {
+  if (!is_active() || is_degraded_object(obc->obs.oi.soid) ||
+      is_missing_object(obc->obs.oi.soid))
+    return;
+
   if (!obc->obs.oi.watchers.empty()) {
+    Mutex::Locker l(osd->watch_lock);
+    assert(obc->unconnected_watchers.size() == 0);
+    assert(obc->watchers.size() == 0);
     // populate unconnected_watchers
     utime_t now = ceph_clock_now(g_ceph_context);
     for (map<entity_name_t, watch_info_t>::iterator p = obc->obs.oi.watchers.begin();
@@ -3029,11 +3045,89 @@ void ReplicatedPG::populate_obc_watchers(ObjectContext *obc)
       utime_t expire = now;
       expire += p->second.timeout_seconds;
       dout(10) << "  unconnected watcher " << p->first << " will expire " << expire << dendl;
-      obc->unconnected_watchers[p->first] = expire;
+      register_unconnected_watcher(obc, p->first, expire);
     }
   }
 }
 
+void ReplicatedPG::unregister_unconnected_watcher(void *_obc,
+                                                 entity_name_t entity)
+{
+  ObjectContext *obc = static_cast<ObjectContext *>(_obc);
+  osd->watch_timer.cancel_event(obc->unconnected_watchers[entity]);
+  obc->unconnected_watchers.erase(entity);
+  put_object_context(obc);
+  put();
+}
+
+void ReplicatedPG::register_unconnected_watcher(void *_obc,
+                                               entity_name_t entity,
+                                               utime_t expire)
+{
+  ObjectContext *obc = static_cast<ObjectContext *>(_obc);
+  pg_t pgid = info.pgid;
+  pgid.set_ps(obc->obs.oi.soid.hash);
+  get();
+  obc->ref++;
+  Context *cb = new Watch::C_WatchTimeout(osd,
+                                         static_cast<void *>(obc),
+                                         this,
+                                         entity, expire);
+  osd->watch_timer.add_event_at(expire, cb);
+  obc->unconnected_watchers[entity] = cb;
+}
+
+void ReplicatedPG::handle_watch_timeout(void *_obc,
+                                       entity_name_t entity,
+                                       utime_t expire)
+{
+  ObjectContext *obc = static_cast<ObjectContext *>(_obc);
+  obc->unconnected_watchers.erase(entity);
+  obc->obs.oi.watchers.erase(entity);
+
+  vector<OSDOp> ops;
+  tid_t rep_tid = osd->get_tid();
+  osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
+  OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, obc->ssc, this);
+  ctx->mtime = ceph_clock_now(g_ceph_context);
+
+  ctx->at_version.epoch = osd->osdmap->get_epoch();
+  ctx->at_version.version = log.head.version + 1;
+
+  entity_inst_t nobody;
+
+  /* Currently, mode.try_write always returns true.  If this changes, we will
+   * need to delay the repop accordingly */
+  assert(mode.try_write(nobody));
+  RepGather *repop = new_repop(ctx, obc, rep_tid);
+
+  ObjectStore::Transaction *t = &ctx->op_t;
+
+  ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, obc->obs.oi.soid,
+                               ctx->at_version,
+                               obc->obs.oi.version,
+                               osd_reqid_t(), ctx->mtime));
+
+  eversion_t old_last_update = log.head;
+  bool old_exists = repop->obc->obs.exists;
+  uint64_t old_size = repop->obc->obs.oi.size;
+  eversion_t old_version = repop->obc->obs.oi.version;
+
+  obc->obs.oi.prior_version = old_version;
+  obc->obs.oi.version = ctx->at_version;
+  bufferlist bl;
+  ::encode(obc->obs.oi, bl);
+  t->setattr(coll, obc->obs.oi.soid, OI_ATTR, bl);
+
+  ctx->at_version.version++;
+
+  append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
+
+  // obc ref swallowed by repop!
+  issue_repop(repop, repop->ctx->mtime, old_last_update, old_exists,
+             old_size, old_version);
+  eval_repop(repop);
+}
 
 ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
                                                              const object_locator_t& oloc,
@@ -3954,6 +4048,11 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
          osd->take_waiters(waiting_for_degraded_object[soid]);
          waiting_for_degraded_object.erase(soid);
        }
+       map<hobject_t, ObjectContext *>::iterator i =
+         object_contexts.find(soid);
+       if (i != object_contexts.end()) {
+         populate_obc_watchers(i->second);
+       }
       } else {
        dout(10) << "pushed " << soid << ", still waiting for push ack from " 
                 << pushing[soid].size() << " others" << dendl;
@@ -4291,8 +4390,6 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
       
       obc->obs.exists = true;
       obc->obs.oi.decode(oibl);
-
-      populate_obc_watchers(obc);
       
       // suck in snapset context?
       SnapSetContext *ssc = obc->ssc;
@@ -4446,6 +4543,15 @@ void ReplicatedPG::on_shutdown()
   remove_watchers_and_notifies();
 }
 
+void ReplicatedPG::on_activate()
+{
+  for (map<hobject_t, ObjectContext *>::iterator i = object_contexts.begin();
+       i != object_contexts.end();
+       ++i) {
+    populate_obc_watchers(i->second);
+  }
+}
+
 void ReplicatedPG::on_change()
 {
   dout(10) << "on_change" << dendl;
@@ -4462,6 +4568,8 @@ void ReplicatedPG::on_change()
     state_clear(PG_STATE_REPAIR);
   }
 
+  context_registry_on_change();
+
   // take object waiters
   take_object_waiters(waiting_for_missing_object);
   take_object_waiters(waiting_for_degraded_object);
@@ -4485,8 +4593,6 @@ void ReplicatedPG::on_role_change()
        p++)
     osd->take_waiters(p->second);
   waiting_for_ondisk.clear();
-
-  context_registry_on_change();
 }
 
 
index d2970dd51207b141d6010beae0f4666512269ef9..23ccf548fd2d40cc0cd7ea76f9e14dc6e58329c5 100644 (file)
@@ -273,7 +273,7 @@ public:
 
     // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
     map<entity_name_t, OSD::Session *> watchers;
-    map<entity_name_t, utime_t> unconnected_watchers;
+    map<entity_name_t, Context *> unconnected_watchers;
     map<Watch::Notification *, bool> notifs;
 
     ObjectContext(const object_info_t &oi_, bool exists_, SnapSetContext *ssc_)
@@ -487,6 +487,14 @@ protected:
   map<object_t, SnapSetContext*> snapset_contexts;
 
   void populate_obc_watchers(ObjectContext *obc);
+  void register_unconnected_watcher(void *obc,
+                                   entity_name_t entity,
+                                   utime_t expire);
+  void unregister_unconnected_watcher(void *obc,
+                                     entity_name_t entity);
+  void handle_watch_timeout(void *obc,
+                           entity_name_t entity,
+                           utime_t expire);
 
   ObjectContext *lookup_object_context(const hobject_t& soid) {
     if (object_contexts.count(soid)) {
@@ -801,6 +809,7 @@ public:
   void on_acker_change();
   void on_role_change();
   void on_change();
+  void on_activate();
   void on_shutdown();
 };
 
index 6a01f30e6d207539279d064e1645fe4223764982..da90cef4103dc1b126db558727af855c3bf41610 100644 (file)
@@ -28,3 +28,9 @@ void Watch::C_NotifyTimeout::finish(int r)
   osd->handle_notify_timeout(notif);
 }
 
+void Watch::C_WatchTimeout::finish(int r)
+{
+  osd->handle_watch_timeout(obc, static_cast<ReplicatedPG *>(pg), entity,
+                           expire);
+}
+
index 7a0760cec8a783b452c446a8e406803a6844fe12..fa1bb99e6bffa566a0eee9f3073a4d0ce181c1c8 100644 (file)
@@ -60,6 +60,19 @@ public:
     void finish(int r);
   };
 
+  class C_WatchTimeout : public Context {
+    OSD *osd;
+    void *obc;
+    void *pg;
+    entity_name_t entity;
+    utime_t expire;
+  public:
+    C_WatchTimeout(OSD *_osd, void *_obc, void *_pg,
+                  entity_name_t _entity, utime_t _expire) :
+      osd(_osd), obc(_obc), pg(_pg), entity(_entity), expire(_expire) {}
+    void finish(int r);
+  };
+
 private:
   std::map<uint64_t, Notification *> notifs; /* notif_id to notifications */