From: Jason Dillaman Date: Thu, 28 Jan 2016 19:35:54 +0000 (-0500) Subject: librados_test_stub: watch/notify now behaves similar to librados X-Git-Tag: v9.2.1~2^2~1 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=2be722a3e944436deff7a8ae513e22b02fd31237;p=ceph.git librados_test_stub: watch/notify now behaves similar to librados Notifications are executed via the same librados AIO callback thread, so it's now possible to catch deadlock. Signed-off-by: Jason Dillaman (cherry picked from commit 0a3822f1559ba3fe3def6a65883b9c6c7c5a33fe) --- diff --git a/src/test/librados_test_stub/TestRadosClient.cc b/src/test/librados_test_stub/TestRadosClient.cc index 46437ac8657e3..c75f79e286dab 100644 --- a/src/test/librados_test_stub/TestRadosClient.cc +++ b/src/test/librados_test_stub/TestRadosClient.cc @@ -84,7 +84,8 @@ private: TestRadosClient::TestRadosClient(CephContext *cct) : m_cct(cct->get()), - m_watch_notify(m_cct), + m_aio_finisher(new Finisher(m_cct)), + m_watch_notify(m_cct, m_aio_finisher), m_transaction_lock("TestRadosClient::m_transaction_lock") { get(); @@ -97,7 +98,6 @@ TestRadosClient::TestRadosClient(CephContext *cct) } // replicate AIO callback processing - m_aio_finisher = new Finisher(m_cct); m_aio_finisher->start(); } diff --git a/src/test/librados_test_stub/TestWatchNotify.cc b/src/test/librados_test_stub/TestWatchNotify.cc index 14a43bc58cb1c..f40eea8357841 100644 --- a/src/test/librados_test_stub/TestWatchNotify.cc +++ b/src/test/librados_test_stub/TestWatchNotify.cc @@ -9,23 +9,19 @@ namespace librados { -TestWatchNotify::TestWatchNotify(CephContext *cct) - : m_cct(cct), m_finisher(new Finisher(cct)), m_handle(), m_notify_id(), +TestWatchNotify::TestWatchNotify(CephContext *cct, Finisher *finisher) + : m_cct(cct), m_finisher(finisher), m_handle(), m_notify_id(), m_file_watcher_lock("librados::TestWatchNotify::m_file_watcher_lock"), m_pending_notifies(0) { m_cct->get(); - m_finisher->start(); } TestWatchNotify::~TestWatchNotify() { - m_finisher->stop(); - delete m_finisher; m_cct->put(); } TestWatchNotify::NotifyHandle::NotifyHandle() - : pbl(NULL), pending_responses(), - lock("TestWatchNotify::NotifyHandle::lock") { + : lock("TestWatchNotify::NotifyHandle::lock") { } TestWatchNotify::Watcher::Watcher() @@ -50,7 +46,7 @@ int TestWatchNotify::list_watchers(const std::string& o, it != watcher->watch_handles.end(); ++it) { obj_watch_t obj; strcpy(obj.addr, ":/0"); - obj.watcher_id = static_cast(it->second.instance_id); + obj.watcher_id = static_cast(it->second.gid); obj.cookie = it->second.handle; obj.timeout_seconds = 30; out_watchers->push_back(obj); @@ -60,35 +56,64 @@ int TestWatchNotify::list_watchers(const std::string& o, int TestWatchNotify::notify(const std::string& oid, bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) { - Mutex lock("TestRadosClient::watcher_notify::lock"); - Cond cond; - bool done = false; + uint64_t notify_id; + { + Mutex::Locker file_watcher_locker(m_file_watcher_lock); + ++m_pending_notifies; + notify_id = ++m_notify_id; + } + SharedNotifyHandle notify_handle(new NotifyHandle()); { SharedWatcher watcher = get_watcher(oid); - RWLock::WLocker l(watcher->lock); - { - Mutex::Locker l2(m_file_watcher_lock); - ++m_pending_notifies; - uint64_t notify_id = ++m_notify_id; + RWLock::WLocker watcher_locker(watcher->lock); + + WatchHandles watch_handles = watcher->watch_handles; + for (WatchHandles::iterator w_it = watch_handles.begin(); + w_it != watch_handles.end(); ++w_it) { + WatchHandle &watch_handle = w_it->second; + + Mutex::Locker notify_handle_locker(notify_handle->lock); + notify_handle->pending_watcher_ids.insert(std::make_pair( + watch_handle.gid, watch_handle.handle)); + } + + watcher->notify_handles[notify_id] = notify_handle; + + FunctionContext *ctx = new FunctionContext( + boost::bind(&TestWatchNotify::execute_notify, this, oid, bl, notify_id)); + m_finisher->queue(ctx); + } + + { + utime_t timeout; + timeout.set_from_double(ceph_clock_now(m_cct) + (timeout_ms / 1000.0)); - SharedNotifyHandle notify_handle(new NotifyHandle()); - notify_handle->pbl = pbl; + Mutex::Locker notify_locker(notify_handle->lock); + while (!notify_handle->pending_watcher_ids.empty()) { + notify_handle->cond.WaitUntil(notify_handle->lock, timeout); + } - watcher->notify_handles[notify_id] = notify_handle; + if (pbl != NULL) { + ::encode(notify_handle->notify_responses, *pbl); + ::encode(notify_handle->pending_watcher_ids, *pbl); + } + } - FunctionContext *ctx = new FunctionContext( - boost::bind(&TestWatchNotify::execute_notify, this, - oid, bl, notify_id, &lock, &cond, &done)); - m_finisher->queue(ctx); + SharedWatcher watcher = get_watcher(oid); + Mutex::Locker file_watcher_locker(m_file_watcher_lock); + { + RWLock::WLocker watcher_locker(watcher->lock); + + watcher->notify_handles.erase(notify_id); + if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { + m_file_watchers.erase(oid); } } - lock.Lock(); - while (!done) { - cond.Wait(lock); + if (--m_pending_notifies == 0) { + m_file_watcher_cond.Signal(); } - lock.Unlock(); return 0; } @@ -107,20 +132,24 @@ void TestWatchNotify::notify_ack(const std::string& o, uint64_t notify_id, response.append(bl); SharedNotifyHandle notify_handle = it->second; - Mutex::Locker l2(notify_handle->lock); - --notify_handle->pending_responses; - notify_handle->notify_responses[std::make_pair(gid, handle)] = response; - notify_handle->cond.Signal(); + Mutex::Locker notify_handle_locker(notify_handle->lock); + + WatcherID watcher_id = std::make_pair(gid, handle); + notify_handle->notify_responses[watcher_id] = response; + notify_handle->pending_watcher_ids.erase(watcher_id); + if (notify_handle->pending_watcher_ids.empty()) { + notify_handle->cond.Signal(); + } } -int TestWatchNotify::watch(const std::string& o, uint64_t instance_id, +int TestWatchNotify::watch(const std::string& o, uint64_t gid, uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) { SharedWatcher watcher = get_watcher(o); RWLock::WLocker l(watcher->lock); WatchHandle watch_handle; - watch_handle.instance_id = instance_id; + watch_handle.gid = gid; watch_handle.handle = ++m_handle; watch_handle.watch_ctx = ctx; watch_handle.watch_ctx2 = ctx2; @@ -131,24 +160,21 @@ int TestWatchNotify::watch(const std::string& o, uint64_t instance_id, } int TestWatchNotify::unwatch(uint64_t handle) { - - SharedWatcher watcher; - { - Mutex::Locker l(m_file_watcher_lock); - for (FileWatchers::iterator it = m_file_watchers.begin(); - it != m_file_watchers.end(); ++it) { - if (it->second->watch_handles.find(handle) != - it->second->watch_handles.end()) { - watcher = it->second; - break; + Mutex::Locker l(m_file_watcher_lock); + for (FileWatchers::iterator it = m_file_watchers.begin(); + it != m_file_watchers.end(); ++it) { + SharedWatcher watcher = it->second; + RWLock::WLocker watcher_locker(watcher->lock); + + WatchHandles::iterator w_it = watcher->watch_handles.find(handle); + if (w_it != watcher->watch_handles.end()) { + watcher->watch_handles.erase(w_it); + if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { + m_file_watchers.erase(it); } + break; } } - - if (watcher) { - RWLock::WLocker l(watcher->lock); - watcher->watch_handles.erase(handle); - } return 0; } @@ -168,65 +194,55 @@ TestWatchNotify::SharedWatcher TestWatchNotify::_get_watcher( } void TestWatchNotify::execute_notify(const std::string &oid, - bufferlist &bl, uint64_t notify_id, - Mutex *lock, Cond *cond, - bool *done) { - WatchHandles watch_handles; - SharedNotifyHandle notify_handle; - - { - SharedWatcher watcher = get_watcher(oid); - RWLock::RLocker l(watcher->lock); + bufferlist &bl, uint64_t notify_id) { + SharedWatcher watcher = get_watcher(oid); + RWLock::RLocker watcher_locker(watcher->lock); + WatchHandles &watch_handles = watcher->watch_handles; - NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id); - if (n_it == watcher->notify_handles.end()) { - return; - } - - watch_handles = watcher->watch_handles; - notify_handle = n_it->second; + NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id); + if (n_it == watcher->notify_handles.end()) { + return; } - utime_t timeout; - timeout.set_from_double(ceph_clock_now(m_cct) + 15); - - for (WatchHandles::iterator w_it = watch_handles.begin(); - w_it != watch_handles.end(); ++w_it) { - WatchHandle &watch_handle = w_it->second; - - bufferlist notify_bl; - notify_bl.append(bl); - if (watch_handle.watch_ctx2 != NULL) { - { - Mutex::Locker l2(notify_handle->lock); - ++notify_handle->pending_responses; + SharedNotifyHandle notify_handle = n_it->second; + Mutex::Locker notify_handle_locker(notify_handle->lock); + + WatcherIDs watcher_ids(notify_handle->pending_watcher_ids); + for (WatcherIDs::iterator w_id_it = watcher_ids.begin(); + w_id_it != watcher_ids.end(); ++w_id_it) { + WatcherID watcher_id = *w_id_it; + WatchHandles::iterator w_it = watch_handles.find(watcher_id.second); + if (w_it == watch_handles.end()) { + notify_handle->pending_watcher_ids.erase(watcher_id); + } else { + WatchHandle &watch_handle = w_it->second; + assert(watch_handle.gid == watcher_id.first); + assert(watch_handle.handle == watcher_id.second); + + bufferlist notify_bl; + notify_bl.append(bl); + + notify_handle->lock.Unlock(); + watcher->lock.put_read(); + if (watch_handle.watch_ctx2 != NULL) { + watch_handle.watch_ctx2->handle_notify(notify_id, w_it->first, 0, + notify_bl); + } else if (watch_handle.watch_ctx != NULL) { + watch_handle.watch_ctx->notify(0, 0, notify_bl); } - watch_handle.watch_ctx2->handle_notify(notify_id, w_it->first, 0, - notify_bl); - } else if (watch_handle.watch_ctx != NULL) { - watch_handle.watch_ctx->notify(0, 0, notify_bl); - } - } + watcher->lock.get_read(); + notify_handle->lock.Lock(); - { - Mutex::Locker l2(notify_handle->lock); - while (notify_handle->pending_responses > 0) { - notify_handle->cond.WaitUntil(notify_handle->lock, timeout); - } - if (notify_handle->pbl != NULL) { - ::encode(notify_handle->notify_responses, *notify_handle->pbl); + if (watch_handle.watch_ctx2 == NULL) { + // auto ack old-style watch/notify clients + notify_handle->notify_responses[watcher_id] = bufferlist(); + notify_handle->pending_watcher_ids.erase(watcher_id); + } } } - Mutex::Locker l3(*lock); - *done = true; - cond->Signal(); - - { - Mutex::Locker file_watcher_locker(m_file_watcher_lock); - if (--m_pending_notifies == 0) { - m_file_watcher_cond.Signal(); - } + if (notify_handle->pending_watcher_ids.empty()) { + notify_handle->cond.Signal(); } } diff --git a/src/test/librados_test_stub/TestWatchNotify.h b/src/test/librados_test_stub/TestWatchNotify.h index 1761302bbf37b..7ff4b1afaaadf 100644 --- a/src/test/librados_test_stub/TestWatchNotify.h +++ b/src/test/librados_test_stub/TestWatchNotify.h @@ -21,13 +21,14 @@ namespace librados { class TestWatchNotify : boost::noncopyable { public: + typedef std::pair WatcherID; + typedef std::set WatcherIDs; typedef std::map, bufferlist> NotifyResponses; struct NotifyHandle { NotifyHandle(); + WatcherIDs pending_watcher_ids; NotifyResponses notify_responses; - bufferlist *pbl; - size_t pending_responses; Mutex lock; Cond cond; }; @@ -35,7 +36,7 @@ public: typedef std::map NotifyHandles; struct WatchHandle { - uint64_t instance_id; + uint64_t gid; uint64_t handle; librados::WatchCtx* watch_ctx; librados::WatchCtx2* watch_ctx2; @@ -51,7 +52,7 @@ public: }; typedef boost::shared_ptr SharedWatcher; - TestWatchNotify(CephContext *cct); + TestWatchNotify(CephContext *cct, Finisher *finisher); ~TestWatchNotify(); void flush(); @@ -61,7 +62,7 @@ public: uint64_t timeout_ms, bufferlist *pbl); void notify_ack(const std::string& o, uint64_t notify_id, uint64_t handle, uint64_t gid, bufferlist& bl); - int watch(const std::string& o, uint64_t instance_id, uint64_t *handle, + int watch(const std::string& o, uint64_t gid, uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2); int unwatch(uint64_t handle); @@ -84,7 +85,7 @@ private: SharedWatcher get_watcher(const std::string& oid); SharedWatcher _get_watcher(const std::string& oid); void execute_notify(const std::string &oid, bufferlist &bl, - uint64_t notify_id, Mutex *lock, Cond *cond, bool *done); + uint64_t notify_id); };