]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: send pending notification for reconnected watcher
authorYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 15 Dec 2010 01:00:49 +0000 (17:00 -0800)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 15 Dec 2010 01:00:49 +0000 (17:00 -0800)
src/osd/OSD.cc
src/osd/OSD.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index aae8c18d533eb1a60a1708a80f6d73752d9f2301..5e9af335310686bdb5269eb31a91cb60326f6935 100644 (file)
@@ -1634,20 +1634,29 @@ void OSD::put_object_context(void *_obc, pg_t pgid)
   pg->unlock();
 }
 
-void OSD::ack_notification(entity_name_t& name, void *_notif)
+void OSD::complete_notify(void *_notif, void *_obc)
+{
+  ReplicatedPG::ObjectContext *obc = (ReplicatedPG::ObjectContext *)_obc;
+  Watch::Notification *notif = (Watch::Notification *)_notif;
+  dout(0) << "got the last reply from pending watchers, can send response now" << dendl;
+  MWatchNotify *reply = notif->reply;
+  client_messenger->send_message(reply, notif->session->con);
+  notif->session->put();
+  watch->remove_notification(notif);
+  if (notif->timeout)
+    watch_timer.cancel_event(notif->timeout);
+  map<Watch::Notification *, bool>::iterator iter = obc->notifs.find(notif);
+  if (iter != obc->notifs.end())
+    obc->notifs.erase(iter);
+  delete notif;
+}
+
+void OSD::ack_notification(entity_name_t& name, void *_notif, void *_obc)
 {
   assert(watch_lock.is_locked());
   Watch::Notification *notif = (Watch::Notification *)_notif;
-  if (watch->ack_notification(name, notif)) {
-    dout(0) << "got the last reply from pending watchers, can send response now" << dendl;
-    MWatchNotify *reply = notif->reply;
-    client_messenger->send_message(reply, notif->session->con);
-    notif->session->put();
-    watch->remove_notification(notif);
-    if (notif->timeout)
-      watch_timer.cancel_event(notif->timeout);
-    delete notif;
-  }
+  if (watch->ack_notification(name, notif))
+    complete_notify(notif, _obc);
 }
 
 bool OSD::ms_handle_reset(Connection *con)
@@ -1701,6 +1710,7 @@ bool OSD::ms_handle_reset(Connection *con)
     put_object_context(obc, oiter->second);
   }
 
+#if 0
   // FIXME: do we really want to _cancel_ notifications here?
   // shouldn't they time out in the usual way?  because this person
   // might/should immediately reconnect...
@@ -1715,6 +1725,7 @@ bool OSD::ms_handle_reset(Connection *con)
   }
   session->notifs.clear();
   watch_lock.Unlock();
+#endif
 
   return true;
 }
@@ -1744,8 +1755,9 @@ void OSD::handle_notify_timeout(void *_notif)
   watch_lock.Unlock(); /* put_object_context takes osd->lock */
   
   ReplicatedPG *pg = (ReplicatedPG *)lookup_lock_raw_pg(notif->pgid);
+  pg_t pgid = notif->pgid;
+  pg->do_complete_notify(notif, obc);
   put_object_context(obc, notif->pgid);
-  pg->do_complete_notify(notif);
   
   watch_lock.Lock();
   /* exiting with watch_lock held */
index 2e81c03c22c267e952d2f92ce2fdc50df7e3dc70..776c1dd6305d43f018bfea93ea318feeb37f8a9e 100644 (file)
@@ -995,7 +995,8 @@ public:
 
 
   void put_object_context(void *_obc, pg_t pgid);
-  void ack_notification(entity_name_t& peer_addr, void *notif);
+  void complete_notify(void *notif, void *obc);
+  void ack_notification(entity_name_t& peer_addr, void *notif, void *obc);
   Mutex watch_lock;
   SafeTimer watch_timer;
   void handle_notify_timeout(void *notif);
index 55ea7671a102b3485b5d1f831a07c0527deb2940..555e34144cf2887803696ba7a88354aea043372c 100644 (file)
@@ -826,12 +826,9 @@ void ReplicatedPG::dump_watchers(ObjectContext *obc)
   }
 }
 
-void ReplicatedPG::do_complete_notify(Watch::Notification *notif)
+void ReplicatedPG::do_complete_notify(Watch::Notification *notif, ObjectContext *obc)
 {
-  osd->client_messenger->send_message(notif->reply, notif->session->con);
-  notif->session->put();
-  osd->watch->remove_notification(notif);
-  delete notif;
+  osd->complete_notify((void *)notif, obc);
 }
 
 
@@ -1189,13 +1186,14 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
             p != obc->unconnected_watchers.end();
             p++) {
          entity_name_t name = p->first;
-         // notif->add_watcher(name, Watch::WATCHER_PENDING);
+         notif->add_watcher(name, Watch::WATCHER_PENDING);
        }
 
        notif->reply = new MWatchNotify(op.watch.cookie, op.watch.ver, notif->id, WATCH_NOTIFY_COMPLETE);
        if (notif->watchers.empty()) {
-          do_complete_notify(notif);
+          do_complete_notify(notif, obc);
        } else {
+         obc->notifs[notif] = true;
           obc->ref++;
           notif->obc = obc;
          notif->timeout = new Watch::C_NotifyTimeout(osd, notif);
@@ -1232,7 +1230,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
         session->del_notif(notif);
        session->put();
 
-        osd->ack_notification(source, notif);
+        osd->ack_notification(source, notif, obc);
        osd->watch_lock.Unlock();
       }
       break;
@@ -1460,6 +1458,18 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
           map<entity_name_t, utime_t>::iterator un_iter = obc->unconnected_watchers.find(entity);
           if (un_iter != obc->unconnected_watchers.end())
             obc->unconnected_watchers.erase(un_iter);
+
+         map<Watch::Notification *, bool>::iterator niter;
+          for (niter = obc->notifs.begin(); niter != obc->notifs.end(); ++niter) {
+            Watch::Notification *notif = niter->first;
+            map<entity_name_t, Watch::WatcherState>::iterator iter = notif->watchers.find(entity);
+            if (iter != notif->watchers.end()) {
+            /* there is a pending notification for this watcher, we should resend it anyway
+               even if we already sent it as it might not have received it */
+              MWatchNotify *notify_msg = new MWatchNotify(w.cookie, w.ver, notif->id, WATCH_NOTIFY);
+              osd->client_messenger->send_message(notify_msg, session->con);
+            }
+          }
          register_object_context(obc);
         } else {
          map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(entity);
index 1850f35ee1040843730d92e333e200376f20b1ab..0915a38bca178e5fb5b5a2473c38434cafa1bcd2 100644 (file)
@@ -239,6 +239,7 @@ public:
     // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
     map<entity_name_t, OSD::Session *> watchers;
     map<entity_name_t, utime_t> unconnected_watchers;
+    map<Watch::Notification *, bool> notifs;
 
     /*    ObjectContext(const sobject_t& s, const object_locator_t& ol) :
       ref(0), registered(false), obs(s, ol),
@@ -529,7 +530,7 @@ protected:
   int recover_replicas(int max);
 
   void dump_watchers(ObjectContext *obc);
-  void do_complete_notify(Watch::Notification *notif);
+  void do_complete_notify(Watch::Notification *notif, ObjectContext *obc);
 
   struct RepModify {
     ReplicatedPG *pg;