]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados: add rados_watch_flush() call
authorSage Weil <sage@redhat.com>
Fri, 19 Dec 2014 19:48:27 +0000 (11:48 -0800)
committerSage Weil <sage@redhat.com>
Tue, 23 Dec 2014 01:41:28 +0000 (17:41 -0800)
Add a call so that callers can make sure all queued callbacks have
completed before shutting down the ioctx.  This avoids a segv triggered
by the LibRadosWatchNotifyPPTests/LibRadosWatchNotifyPP.WatchNotify2Timeout/1
test due to the ioctx being destroyed when the in-progress callback
does a notify_ack.

Signed-off-by: Sage Weil <sage@redhat.com>
(cherry picked from commit 4ebd4b4280cb048547842351f41b38658fb21a6e)

src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados/RadosClient.cc
src/librados/RadosClient.h
src/librados/librados.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/test/librados/watch_notify.cc
src/tracing/librados.tp

index 8a91ae35c7898fd04f3e4fdf09494d11c966ef7a..acc3ced8f0eb6968379f894571b221eb1e79ad6c 100644 (file)
@@ -2103,6 +2103,19 @@ CEPH_RADOS_API int rados_notify_ack(rados_ioctx_t io, const char *o,
                                    uint64_t notify_id, uint64_t cookie,
                                    const char *buf, int buf_len);
 
+/**
+ * Flush watch/notify callbacks
+ *
+ * This call will block until all pending watch/notify callbacks have
+ * been executed and the queue is empty.  It should usually be called
+ * after shutting down any watches before shutting down the ioctx or
+ * librados to ensure that any callbacks do not misuse the ioctx (for
+ * example by calling rados_notify_ack after the ioctx has been
+ * destroyed).
+ *
+ * @param cluster the cluster handle
+ */
+CEPH_RADOS_API int rados_watch_flush(rados_t cluster);
 
 /** @} Watch/Notify */
 
index 1309dbbfedeee7e74ff481c6471e96b44534a327..e3e206d9500afaa422c6937609952da99c482a4a 100644 (file)
@@ -1051,6 +1051,7 @@ namespace librados
     config_t cct();
     int connect();
     void shutdown();
+    int watch_flush();
     int conf_read_file(const char * const path) const;
     int conf_parse_argv(int argc, const char ** argv) const;
     int conf_parse_argv_remainder(int argc, const char ** argv,
index 69af44a6ce0134025ab33f1ccaeeee09bc13ba68..c2986fb4332cc9f9166078ea7f7f2d385ca19205 100644 (file)
@@ -308,6 +308,14 @@ void librados::RadosClient::shutdown()
   ldout(cct, 1) << "shutdown" << dendl;
 }
 
+int librados::RadosClient::watch_flush()
+{
+  ldout(cct, 10) << __func__ << " enter" << dendl;
+  objecter->linger_callback_flush();
+  ldout(cct, 10) << __func__ << " exit" << dendl;
+  return 0;
+}
+
 uint64_t librados::RadosClient::get_instance_id()
 {
   return instance_id;
index b3aa1e168d8061511a4a717953eda4f0e2dc2bc1..34530ee431c8df7ea14e68854cf7b2701694b3ee 100644 (file)
@@ -81,6 +81,8 @@ public:
   int connect();
   void shutdown();
 
+  int watch_flush();
+
   uint64_t get_instance_id();
 
   int wait_for_latest_osdmap();
index 8a98bac61f75c0f41e5ca0d71caa17f63ecbbf29..f96c5bcfb822685ab36010b74cc11abfc9b857b9 100644 (file)
@@ -1867,6 +1867,13 @@ librados::config_t librados::Rados::cct()
   return (config_t)client->cct;
 }
 
+int librados::Rados::watch_flush()
+{
+  if (!client)
+    return -EINVAL;
+  return client->watch_flush();
+}
+
 void librados::Rados::shutdown()
 {
   if (!client)
@@ -3918,6 +3925,15 @@ extern "C" int rados_notify_ack(rados_ioctx_t io, const char *o,
   return retval;
 }
 
+extern "C" int rados_watch_flush(rados_t cluster)
+{
+  tracepoint(librados, rados_watch_flush_enter, cluster);
+  librados::RadosClient *client = (librados::RadosClient *)cluster;
+  int retval = client->watch_flush();
+  tracepoint(librados, rados_watch_flush_exit, retval);
+  return retval;
+}
+
 extern "C" int rados_set_alloc_hint(rados_ioctx_t io, const char *o,
                                     uint64_t expected_object_size,
                                     uint64_t expected_write_size)
index c11b0929d320d57287be8950d1afd8c6b845be47..339ed8427100f05d5339dd40a59b90e6f9b82067 100644 (file)
@@ -490,9 +490,11 @@ void Objecter::_linger_commit(LingerOp *info, int r)
 }
 
 struct C_DoWatchError : public Context {
+  Objecter *objecter;
   Objecter::LingerOp *info;
   int err;
-  C_DoWatchError(Objecter::LingerOp *i, int r) : info(i), err(r) {
+  C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
+    : objecter(o), info(i), err(r) {
     info->get();
     info->_queued_async();
   }
@@ -500,6 +502,7 @@ struct C_DoWatchError : public Context {
     info->watch_context->handle_error(info->get_cookie(), err);
     info->finished_async();
     info->put();
+    objecter->_linger_callback_finish();
   }
 };
 
@@ -522,8 +525,10 @@ void Objecter::_linger_reconnect(LingerOp *info, int r)
     if (!info->last_error) {
       r = _normalize_watch_error(r);
       info->last_error = r;
-      if (info->watch_context)
-       finisher->queue(new C_DoWatchError(info, r));
+      if (info->watch_context) {
+       finisher->queue(new C_DoWatchError(this, info, r));
+       _linger_callback_queue();
+      }
     }
     info->watch_lock.put_write();
   }
@@ -584,8 +589,10 @@ void Objecter::_linger_ping(LingerOp *info, int r, utime_t sent,
     } else if (r < 0 && !info->last_error) {
       r = _normalize_watch_error(r);
       info->last_error = r;
-      if (info->watch_context)
-       finisher->queue(new C_DoWatchError(info, r));
+      if (info->watch_context) {
+       finisher->queue(new C_DoWatchError(this, info, r));
+       _linger_callback_queue();
+      }
     }
   } else {
     ldout(cct, 20) << " ignoring old gen" << dendl;
@@ -767,8 +774,10 @@ void Objecter::handle_watch_notify(MWatchNotify *m)
   if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
     if (!info->last_error) {
       info->last_error = -ENOTCONN;
-      if (info->watch_context)
-       finisher->queue(new C_DoWatchError(info, -ENOTCONN));
+      if (info->watch_context) {
+       finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
+       _linger_callback_queue();
+      }
     }
   } else if (!info->is_watch) {
     // notify completion; we can do this inline since we know the only user
@@ -778,6 +787,7 @@ void Objecter::handle_watch_notify(MWatchNotify *m)
     info->on_notify_finish->complete(m->return_code);
   } else {
     finisher->queue(new C_DoWatchNotify(this, info, m));
+    _linger_callback_queue();
   }
 }
 
@@ -786,10 +796,7 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
   ldout(cct, 10) << __func__ << " " << *m << dendl;
 
   rwlock.get_read();
-  if (!initialized.read()) {
-    rwlock.put_read();
-    goto out;
-  }
+  assert(initialized.read());
 
   if (info->canceled) {
     rwlock.put_read();
@@ -814,6 +821,7 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
   info->finished_async();
   info->put();
   m->put();
+  _linger_callback_finish();
 }
 
 bool Objecter::ms_dispatch(Message *m)
index d17b1ed92b94801890829bece5bfe74f883855f6..09691135b2ae2728c1a01c215265ec4fb350f0ea 100644 (file)
@@ -1648,6 +1648,9 @@ public:
   map<uint64_t, LingerOp*>  linger_ops;
   // we use this just to confirm a cookie is valid before dereferencing the ptr
   set<LingerOp*>            linger_ops_set;
+  int num_linger_callbacks;
+  Mutex linger_callback_lock;
+  Cond linger_callback_cond;
 
   map<ceph_tid_t,PoolStatOp*>    poolstat_ops;
   map<ceph_tid_t,StatfsOp*>      statfs_ops;
@@ -1713,6 +1716,25 @@ public:
   void _linger_ping(LingerOp *info, int r, utime_t sent, uint32_t register_gen);
   int _normalize_watch_error(int r);
 
+  void _linger_callback_queue() {
+    Mutex::Locker l(linger_callback_lock);
+    ++num_linger_callbacks;
+  }
+  void _linger_callback_finish() {
+    Mutex::Locker l(linger_callback_lock);
+    if (--num_linger_callbacks == 0)
+      linger_callback_cond.SignalAll();
+    assert(num_linger_callbacks >= 0);
+  }
+  friend class C_DoWatchError;
+public:
+  void linger_callback_flush() {
+    Mutex::Locker l(linger_callback_lock);
+    while (num_linger_callbacks > 0)
+      linger_callback_cond.Wait(linger_callback_lock);
+  }
+
+private:
   void _check_op_pool_dne(Op *op, bool session_locked);
   void _send_op_map_check(Op *op);
   void _op_cancel_map_check(Op *op);
@@ -1795,6 +1817,8 @@ public:
     timer(cct, timer_lock, false),
     logger(NULL), tick_event(NULL),
     m_request_state_hook(NULL),
+    num_linger_callbacks(0),
+    linger_callback_lock("Objecter::linger_callback_lock"),
     num_homeless_ops(0),
     homeless_session(new OSDSession(cct, -1)),
     mon_timeout(mon_timeout),
index 4916ad32a1302885e35890cfdfcf048b8b9f468e..80a69df295fcef703e774b7aea607477806cdc05 100644 (file)
@@ -500,6 +500,7 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Timeout) {
   ASSERT_GT(rados_watch_check(ioctx, handle), 0);
 
   rados_unwatch2(ioctx, handle);
+  rados_watch_flush(cluster);
 }
 
 TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
@@ -521,10 +522,15 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
   ASSERT_EQ(watches.size(), 1u);
   ASSERT_EQ(0u, notify_cookies.size());
   bufferlist bl2, bl_reply;
+  std::cout << " trying..." << std::endl;
   ASSERT_EQ(-ETIMEDOUT, ioctx.notify2(notify_oid, bl2, 1000 /* 1s */,
                                      &bl_reply));
+  std::cout << " timed out" << std::endl;
   ASSERT_GT(ioctx.watch_check(handle), 0);
   ioctx.unwatch2(handle);
+  std::cout << " flushing" << std::endl;
+  cluster.watch_flush();
+  std::cout << " flushed" << std::endl;
 }
 
 // --
index fa82aba7ea8596b71b34c91c6f4809513de11cf1..83e5d89779efebe572450a29463e9bd18d8ea783 100644 (file)
@@ -2344,6 +2344,22 @@ TRACEPOINT_EVENT(librados, rados_notify_ack_exit,
     )
 )
 
+TRACEPOINT_EVENT(librados, rados_watch_flush_enter,
+    TP_ARGS(
+        rados_t, cluster),
+    TP_FIELDS(
+        ctf_integer_hex(rados_t, cluster, cluster)
+    )
+)
+
+TRACEPOINT_EVENT(librados, rados_watch_flush_exit,
+    TP_ARGS(
+        int, retval),
+    TP_FIELDS(
+        ctf_integer(int, retval, retval)
+    )
+)
+
 TRACEPOINT_EVENT(librados, rados_set_alloc_hint_enter,
     TP_ARGS(
         rados_ioctx_t, ioctx,