]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: send notify response from reset handler if needed
authorYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 27 Oct 2010 23:51:35 +0000 (16:51 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 24 Nov 2010 00:49:46 +0000 (16:49 -0800)
src/osd/OSD.cc
src/osd/OSD.h
src/osd/ReplicatedPG.cc

index 63693b11daa76f7e51dcb9ee8dee59897601297d..f0d0a8d36bfecc1dc0ddf12f9d78c84d4b1ec225 100644 (file)
@@ -1620,6 +1620,21 @@ void OSD::put_object_context(void *_obc, ceph_object_layout& layout)
   pg->unlock();
 }
 
+void OSD::ack_notification(entity_name_t& name, void *_notif)
+{
+  Watch::Notification *notif = (Watch::Notification *)_notif;
+  if (watch->ack_notification(name, notif)) {
+       dout(0) << "got the last reply from pending watchers, can send response now" << dendl;
+       MWatchNotify *reply = notif->reply; // new MWatchNotify(notif->cookie, wi.ver, notif->id, WATCH_NOTIFY_COMPLETE);
+       client_messenger->send_message(reply, notif->session->con);
+       notif->session->put();
+       watch->remove_notification(notif);
+       if (notif->timeout)
+         watch_timer.cancel_event(notif->timeout);
+       delete notif;
+  }
+}
+
 bool OSD::ms_handle_reset(Connection *con)
 {
   dout(0) << "OSD::ms_handle_reset()" << dendl;
@@ -1668,6 +1683,19 @@ bool OSD::ms_handle_reset(Connection *con)
     put_object_context(obc, oiter->second);
   }
 
+  watch_lock.Lock();
+
+  map<void *, entity_name_t>::iterator notif_iter;
+  for (notif_iter = session->notifs.begin(); notif_iter != session->notifs.end(); ++notif_iter) {
+    Watch::Notification *notif = (Watch::Notification *)notif_iter->first;
+    entity_name_t& dest = notif_iter->second;
+    dout(0) << "ms_handle_reset: ack notification for notif=" << (void *)notif << " entity=" << dest << dendl;
+    ack_notification(dest, notif);
+  }
+  session->notifs.clear();
+
+  watch_lock.Unlock();
+
   return true;
 }
 
index efb8d0f62ceda1fceaded1e50709011ab132930a..6eacc6adb76aa2f8ae5b7560fd00396db8aae1cc 100644 (file)
@@ -216,9 +216,18 @@ public:
     epoch_t last_sent_epoch;
     Connection *con;
     std::map<void *, ceph_object_layout> watches;
+    std::map<void *, entity_name_t> notifs;
 
     Session() : last_sent_epoch(0), con(0) {}
     ~Session() { if (con) con->put(); }
+    void add_notif(void *n, entity_name_t& name) {
+      notifs[n] = name;
+    }
+    void del_notif(void *n) {
+      std::map<void *, entity_name_t>::iterator iter = notifs.find(n);
+      if (iter != notifs.end())
+        notifs.erase(iter);
+    }
   };
 
 private:
@@ -987,6 +996,7 @@ public:
   PG *lookup_lock_pg(pg_t pgid);
   ReplicatedPG *get_pg(void *_obc, ceph_object_layout& layout);
   void put_object_context(void *_obc, ceph_object_layout& layout);
+  void ack_notification(entity_name_t& peer_addr, void *notif);
   Mutex watch_lock;
   SafeTimer watch_timer;
   void handle_notify_timeout(void *notif);
index 7434077d83e5e8c8e444a8c276f5ee57c08f6d9f..cdcc22c2eb5d048ea2ff1550ec26a91a0449427a 100644 (file)
@@ -1129,6 +1129,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
            session = iter->second;
            dout(0) << " found session, sending notification" << dendl;
            notif->add_watcher(oi_iter->first, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
+            entity_name_t name = oi_iter->first;
+            session->add_notif(notif, name);
 
            MWatchNotify *notify_msg = new MWatchNotify(w.cookie, w.ver, notif->id, WATCH_NOTIFY);
            osd->client_messenger->send_message(notify_msg, session->con);
@@ -1145,7 +1147,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
           obc->ref++;
           notif->obc = obc;
          notif->timeout = new Watch::C_NotifyTimeout(osd, notif);
-         osd->watch_timer.add_event_after(5.0, notif->timeout);
+         osd->watch_timer.add_event_after(5.0, notif->timeout); /* FIXME: use a configurable timeout here */
        }
        osd->watch_lock.Unlock();
       }
@@ -1176,18 +1178,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
          result = -EINVAL;
          break;
        }
+       OSD::Session *session = (OSD::Session *)ctx->op->get_connection()->get_priv();
+        session->del_notif(notif);
 
        entity_name_t source = ctx->op->get_source();
-       if (osd->watch->ack_notification(source, notif)) {
-         dout(0) << "got the last reply from pending watchers, can send response now" << dendl;
-         MWatchNotify *reply = notif->reply; // new MWatchNotify(notif->cookie, wi.ver, notif->id, WATCH_NOTIFY_COMPLETE);
-         osd->client_messenger->send_message(reply, notif->session->con);
-         notif->session->put();
-         osd->watch->remove_notification(notif);
-         if (notif->timeout)
-           osd->watch_timer.cancel_event(notif->timeout);
-         delete notif;
-       }
+        osd->ack_notification(source, notif);
        osd->watch_lock.Unlock();
       }
       break;