From: Sage Weil Date: Fri, 19 Dec 2014 19:48:27 +0000 (-0800) Subject: librados: add rados_watch_flush() call X-Git-Tag: v0.91~14 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7de1b4d4ff624feffa5da27c884bb4863828e2dd;p=ceph.git librados: add rados_watch_flush() call Add a call so that callers can make sure all queued callbacks have completed before shutting down the ioctx. This avoids a segv triggered by the LibRadosWatchNotifyPPTests/LibRadosWatchNotifyPP.WatchNotify2Timeout/1 test due to the ioctx being destroyed when the in-progress callback does a notify_ack. Signed-off-by: Sage Weil (cherry picked from commit 4ebd4b4280cb048547842351f41b38658fb21a6e) --- diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index 8a91ae35c789..acc3ced8f0eb 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -2103,6 +2103,19 @@ CEPH_RADOS_API int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id, uint64_t cookie, const char *buf, int buf_len); +/** + * Flush watch/notify callbacks + * + * This call will block until all pending watch/notify callbacks have + * been executed and the queue is empty. It should usually be called + * after shutting down any watches before shutting down the ioctx or + * librados to ensure that any callbacks do not misuse the ioctx (for + * example by calling rados_notify_ack after the ioctx has been + * destroyed). + * + * @param cluster the cluster handle + */ +CEPH_RADOS_API int rados_watch_flush(rados_t cluster); /** @} Watch/Notify */ diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index 1309dbbfedee..e3e206d9500a 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -1051,6 +1051,7 @@ namespace librados config_t cct(); int connect(); void shutdown(); + int watch_flush(); int conf_read_file(const char * const path) const; int conf_parse_argv(int argc, const char ** argv) const; int conf_parse_argv_remainder(int argc, const char ** argv, diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 69af44a6ce01..c2986fb4332c 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -308,6 +308,14 @@ void librados::RadosClient::shutdown() ldout(cct, 1) << "shutdown" << dendl; } +int librados::RadosClient::watch_flush() +{ + ldout(cct, 10) << __func__ << " enter" << dendl; + objecter->linger_callback_flush(); + ldout(cct, 10) << __func__ << " exit" << dendl; + return 0; +} + uint64_t librados::RadosClient::get_instance_id() { return instance_id; diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h index b3aa1e168d80..34530ee431c8 100644 --- a/src/librados/RadosClient.h +++ b/src/librados/RadosClient.h @@ -81,6 +81,8 @@ public: int connect(); void shutdown(); + int watch_flush(); + uint64_t get_instance_id(); int wait_for_latest_osdmap(); diff --git a/src/librados/librados.cc b/src/librados/librados.cc index 8a98bac61f75..f96c5bcfb822 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -1867,6 +1867,13 @@ librados::config_t librados::Rados::cct() return (config_t)client->cct; } +int librados::Rados::watch_flush() +{ + if (!client) + return -EINVAL; + return client->watch_flush(); +} + void librados::Rados::shutdown() { if (!client) @@ -3918,6 +3925,15 @@ extern "C" int rados_notify_ack(rados_ioctx_t io, const char *o, return retval; } +extern "C" int rados_watch_flush(rados_t cluster) +{ + tracepoint(librados, rados_watch_flush_enter, cluster); + librados::RadosClient *client = (librados::RadosClient *)cluster; + int retval = client->watch_flush(); + tracepoint(librados, rados_watch_flush_exit, retval); + return retval; +} + extern "C" int rados_set_alloc_hint(rados_ioctx_t io, const char *o, uint64_t expected_object_size, uint64_t expected_write_size) diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index c11b0929d320..339ed8427100 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -490,9 +490,11 @@ void Objecter::_linger_commit(LingerOp *info, int r) } struct C_DoWatchError : public Context { + Objecter *objecter; Objecter::LingerOp *info; int err; - C_DoWatchError(Objecter::LingerOp *i, int r) : info(i), err(r) { + C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r) + : objecter(o), info(i), err(r) { info->get(); info->_queued_async(); } @@ -500,6 +502,7 @@ struct C_DoWatchError : public Context { info->watch_context->handle_error(info->get_cookie(), err); info->finished_async(); info->put(); + objecter->_linger_callback_finish(); } }; @@ -522,8 +525,10 @@ void Objecter::_linger_reconnect(LingerOp *info, int r) if (!info->last_error) { r = _normalize_watch_error(r); info->last_error = r; - if (info->watch_context) - finisher->queue(new C_DoWatchError(info, r)); + if (info->watch_context) { + finisher->queue(new C_DoWatchError(this, info, r)); + _linger_callback_queue(); + } } info->watch_lock.put_write(); } @@ -584,8 +589,10 @@ void Objecter::_linger_ping(LingerOp *info, int r, utime_t sent, } else if (r < 0 && !info->last_error) { r = _normalize_watch_error(r); info->last_error = r; - if (info->watch_context) - finisher->queue(new C_DoWatchError(info, r)); + if (info->watch_context) { + finisher->queue(new C_DoWatchError(this, info, r)); + _linger_callback_queue(); + } } } else { ldout(cct, 20) << " ignoring old gen" << dendl; @@ -767,8 +774,10 @@ void Objecter::handle_watch_notify(MWatchNotify *m) if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) { if (!info->last_error) { info->last_error = -ENOTCONN; - if (info->watch_context) - finisher->queue(new C_DoWatchError(info, -ENOTCONN)); + if (info->watch_context) { + finisher->queue(new C_DoWatchError(this, info, -ENOTCONN)); + _linger_callback_queue(); + } } } else if (!info->is_watch) { // notify completion; we can do this inline since we know the only user @@ -778,6 +787,7 @@ void Objecter::handle_watch_notify(MWatchNotify *m) info->on_notify_finish->complete(m->return_code); } else { finisher->queue(new C_DoWatchNotify(this, info, m)); + _linger_callback_queue(); } } @@ -786,10 +796,7 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m) ldout(cct, 10) << __func__ << " " << *m << dendl; rwlock.get_read(); - if (!initialized.read()) { - rwlock.put_read(); - goto out; - } + assert(initialized.read()); if (info->canceled) { rwlock.put_read(); @@ -814,6 +821,7 @@ void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m) info->finished_async(); info->put(); m->put(); + _linger_callback_finish(); } bool Objecter::ms_dispatch(Message *m) diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index d17b1ed92b94..09691135b2ae 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1648,6 +1648,9 @@ public: map linger_ops; // we use this just to confirm a cookie is valid before dereferencing the ptr set linger_ops_set; + int num_linger_callbacks; + Mutex linger_callback_lock; + Cond linger_callback_cond; map poolstat_ops; map statfs_ops; @@ -1713,6 +1716,25 @@ public: void _linger_ping(LingerOp *info, int r, utime_t sent, uint32_t register_gen); int _normalize_watch_error(int r); + void _linger_callback_queue() { + Mutex::Locker l(linger_callback_lock); + ++num_linger_callbacks; + } + void _linger_callback_finish() { + Mutex::Locker l(linger_callback_lock); + if (--num_linger_callbacks == 0) + linger_callback_cond.SignalAll(); + assert(num_linger_callbacks >= 0); + } + friend class C_DoWatchError; +public: + void linger_callback_flush() { + Mutex::Locker l(linger_callback_lock); + while (num_linger_callbacks > 0) + linger_callback_cond.Wait(linger_callback_lock); + } + +private: void _check_op_pool_dne(Op *op, bool session_locked); void _send_op_map_check(Op *op); void _op_cancel_map_check(Op *op); @@ -1795,6 +1817,8 @@ public: timer(cct, timer_lock, false), logger(NULL), tick_event(NULL), m_request_state_hook(NULL), + num_linger_callbacks(0), + linger_callback_lock("Objecter::linger_callback_lock"), num_homeless_ops(0), homeless_session(new OSDSession(cct, -1)), mon_timeout(mon_timeout), diff --git a/src/test/librados/watch_notify.cc b/src/test/librados/watch_notify.cc index 4916ad32a130..80a69df295fc 100644 --- a/src/test/librados/watch_notify.cc +++ b/src/test/librados/watch_notify.cc @@ -500,6 +500,7 @@ TEST_F(LibRadosWatchNotify, WatchNotify2Timeout) { ASSERT_GT(rados_watch_check(ioctx, handle), 0); rados_unwatch2(ioctx, handle); + rados_watch_flush(cluster); } TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) { @@ -521,10 +522,15 @@ TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) { ASSERT_EQ(watches.size(), 1u); ASSERT_EQ(0u, notify_cookies.size()); bufferlist bl2, bl_reply; + std::cout << " trying..." << std::endl; ASSERT_EQ(-ETIMEDOUT, ioctx.notify2(notify_oid, bl2, 1000 /* 1s */, &bl_reply)); + std::cout << " timed out" << std::endl; ASSERT_GT(ioctx.watch_check(handle), 0); ioctx.unwatch2(handle); + std::cout << " flushing" << std::endl; + cluster.watch_flush(); + std::cout << " flushed" << std::endl; } // -- diff --git a/src/tracing/librados.tp b/src/tracing/librados.tp index fa82aba7ea85..83e5d89779ef 100644 --- a/src/tracing/librados.tp +++ b/src/tracing/librados.tp @@ -2344,6 +2344,22 @@ TRACEPOINT_EVENT(librados, rados_notify_ack_exit, ) ) +TRACEPOINT_EVENT(librados, rados_watch_flush_enter, + TP_ARGS( + rados_t, cluster), + TP_FIELDS( + ctf_integer_hex(rados_t, cluster, cluster) + ) +) + +TRACEPOINT_EVENT(librados, rados_watch_flush_exit, + TP_ARGS( + int, retval), + TP_FIELDS( + ctf_integer(int, retval, retval) + ) +) + TRACEPOINT_EVENT(librados, rados_set_alloc_hint_enter, TP_ARGS( rados_ioctx_t, ioctx,