namespace librados {
-TestWatchNotify::TestWatchNotify(CephContext *cct)
- : m_cct(cct), m_finisher(new Finisher(cct)), m_handle(), m_notify_id(),
- m_file_watcher_lock("librados::TestWatchNotify::m_file_watcher_lock"),
+TestWatchNotify::TestWatchNotify(CephContext *cct, Finisher *finisher)
+ : m_cct(cct), m_finisher(finisher), m_handle(), m_notify_id(),
+ m_lock("librados::TestWatchNotify::m_lock"),
m_pending_notifies(0) {
m_cct->get();
- m_finisher->start();
}
TestWatchNotify::~TestWatchNotify() {
- m_finisher->stop();
- delete m_finisher;
m_cct->put();
}
-TestWatchNotify::NotifyHandle::NotifyHandle()
- : pbl(NULL), pending_responses(),
- lock("TestWatchNotify::NotifyHandle::lock") {
-}
-
-TestWatchNotify::Watcher::Watcher()
- : lock("TestWatchNotify::Watcher::lock") {
-}
-
void TestWatchNotify::flush() {
- Mutex::Locker file_watcher_locker(m_file_watcher_lock);
+ Mutex::Locker locker(m_lock);
while (m_pending_notifies > 0) {
- m_file_watcher_cond.Wait(m_file_watcher_lock);
+ m_file_watcher_cond.Wait(m_lock);
}
}
int TestWatchNotify::list_watchers(const std::string& o,
std::list<obj_watch_t> *out_watchers) {
+ Mutex::Locker lock(m_lock);
SharedWatcher watcher = get_watcher(o);
- RWLock::RLocker l(watcher->lock);
out_watchers->clear();
for (TestWatchNotify::WatchHandles::iterator it =
it != watcher->watch_handles.end(); ++it) {
obj_watch_t obj;
strcpy(obj.addr, ":/0");
- obj.watcher_id = static_cast<int64_t>(it->second.instance_id);
+ obj.watcher_id = static_cast<int64_t>(it->second.gid);
obj.cookie = it->second.handle;
obj.timeout_seconds = 30;
out_watchers->push_back(obj);
void TestWatchNotify::aio_notify(const std::string& oid, bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl,
Context *on_notify) {
- SharedWatcher watcher = get_watcher(oid);
- RWLock::WLocker watcher_locker(watcher->lock);
- Mutex::Locker file_watcher_lock(m_file_watcher_lock);
+ Mutex::Locker lock(m_lock);
++m_pending_notifies;
uint64_t notify_id = ++m_notify_id;
+ SharedWatcher watcher = get_watcher(oid);
+
SharedNotifyHandle notify_handle(new NotifyHandle());
notify_handle->pbl = pbl;
-
+ notify_handle->on_notify = on_notify;
+ for (auto &watch_handle_pair : watcher->watch_handles) {
+ WatchHandle &watch_handle = watch_handle_pair.second;
+ notify_handle->pending_watcher_ids.insert(std::make_pair(
+ watch_handle.gid, watch_handle.handle));
+ }
watcher->notify_handles[notify_id] = notify_handle;
FunctionContext *ctx = new FunctionContext(
- boost::bind(&TestWatchNotify::execute_notify, this,
- oid, bl, notify_id, on_notify));
+ boost::bind(&TestWatchNotify::execute_notify, this, oid, bl, notify_id));
m_finisher->queue(ctx);
}
void TestWatchNotify::notify_ack(const std::string& o, uint64_t notify_id,
uint64_t handle, uint64_t gid,
bufferlist& bl) {
- SharedWatcher watcher = get_watcher(o);
-
- RWLock::RLocker l(watcher->lock);
- NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
- if (it == watcher->notify_handles.end()) {
- return;
- }
-
- bufferlist response;
- response.append(bl);
-
- SharedNotifyHandle notify_handle = it->second;
- Mutex::Locker l2(notify_handle->lock);
- --notify_handle->pending_responses;
- notify_handle->notify_responses[std::make_pair(gid, handle)] = response;
- notify_handle->cond.Signal();
+ Mutex::Locker lock(m_lock);
+ WatcherID watcher_id = std::make_pair(gid, handle);
+ ack_notify(o, notify_id, watcher_id, bl);
+ finish_notify(o, notify_id);
}
-int TestWatchNotify::watch(const std::string& o, uint64_t instance_id,
+int TestWatchNotify::watch(const std::string& o, uint64_t gid,
uint64_t *handle, librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2) {
+ Mutex::Locker lock(m_lock);
SharedWatcher watcher = get_watcher(o);
- RWLock::WLocker l(watcher->lock);
WatchHandle watch_handle;
- watch_handle.instance_id = instance_id;
+ watch_handle.gid = gid;
watch_handle.handle = ++m_handle;
watch_handle.watch_ctx = ctx;
watch_handle.watch_ctx2 = ctx2;
}
int TestWatchNotify::unwatch(uint64_t handle) {
-
- SharedWatcher watcher;
- {
- Mutex::Locker l(m_file_watcher_lock);
- for (FileWatchers::iterator it = m_file_watchers.begin();
- it != m_file_watchers.end(); ++it) {
- if (it->second->watch_handles.find(handle) !=
- it->second->watch_handles.end()) {
- watcher = it->second;
- break;
+ Mutex::Locker locker(m_lock);
+ for (FileWatchers::iterator it = m_file_watchers.begin();
+ it != m_file_watchers.end(); ++it) {
+ SharedWatcher watcher = it->second;
+
+ WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
+ if (w_it != watcher->watch_handles.end()) {
+ watcher->watch_handles.erase(w_it);
+ if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
+ m_file_watchers.erase(it);
}
+ break;
}
}
-
- if (watcher) {
- RWLock::WLocker l(watcher->lock);
- watcher->watch_handles.erase(handle);
- }
return 0;
}
TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
const std::string& oid) {
- Mutex::Locker l(m_file_watcher_lock);
- return _get_watcher(oid);
-}
-
-TestWatchNotify::SharedWatcher TestWatchNotify::_get_watcher(
- const std::string& oid) {
+ assert(m_lock.is_locked());
SharedWatcher &watcher = m_file_watchers[oid];
if (!watcher) {
watcher.reset(new Watcher());
}
void TestWatchNotify::execute_notify(const std::string &oid,
- bufferlist &bl, uint64_t notify_id,
- Context *on_notify) {
- WatchHandles watch_handles;
- SharedNotifyHandle notify_handle;
-
- {
- SharedWatcher watcher = get_watcher(oid);
- RWLock::RLocker l(watcher->lock);
-
- NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
- if (n_it == watcher->notify_handles.end()) {
- return;
- }
+ bufferlist &bl, uint64_t notify_id) {
+ Mutex::Locker lock(m_lock);
+ SharedWatcher watcher = get_watcher(oid);
+ WatchHandles &watch_handles = watcher->watch_handles;
- watch_handles = watcher->watch_handles;
- notify_handle = n_it->second;
+ NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
+ if (n_it == watcher->notify_handles.end()) {
+ return;
}
- utime_t timeout;
- timeout.set_from_double(ceph_clock_now(m_cct) + 15);
-
- for (WatchHandles::iterator w_it = watch_handles.begin();
- w_it != watch_handles.end(); ++w_it) {
- WatchHandle &watch_handle = w_it->second;
+ SharedNotifyHandle notify_handle = n_it->second;
+ WatcherIDs watcher_ids(notify_handle->pending_watcher_ids);
+ for (WatcherIDs::iterator w_id_it = watcher_ids.begin();
+ w_id_it != watcher_ids.end(); ++w_id_it) {
+ WatcherID watcher_id = *w_id_it;
+ WatchHandles::iterator w_it = watch_handles.find(watcher_id.second);
+ if (w_it == watch_handles.end()) {
+ // client disconnected before notification processed
+ notify_handle->pending_watcher_ids.erase(watcher_id);
+ } else {
+ WatchHandle &watch_handle = w_it->second;
+ assert(watch_handle.gid == watcher_id.first);
+ assert(watch_handle.handle == watcher_id.second);
+
+ bufferlist notify_bl;
+ notify_bl.append(bl);
+
+ m_lock.Unlock();
+ if (watch_handle.watch_ctx2 != NULL) {
+ watch_handle.watch_ctx2->handle_notify(notify_id, w_it->first, 0,
+ notify_bl);
+ } else if (watch_handle.watch_ctx != NULL) {
+ watch_handle.watch_ctx->notify(0, 0, notify_bl);
+ }
+ m_lock.Lock();
- bufferlist notify_bl;
- notify_bl.append(bl);
- if (watch_handle.watch_ctx2 != NULL) {
- {
- Mutex::Locker l2(notify_handle->lock);
- ++notify_handle->pending_responses;
+ if (watch_handle.watch_ctx2 == NULL) {
+ // auto ack old-style watch/notify clients
+ ack_notify(oid, notify_id, watcher_id, bufferlist());
}
- watch_handle.watch_ctx2->handle_notify(notify_id, w_it->first, 0,
- notify_bl);
- } else if (watch_handle.watch_ctx != NULL) {
- watch_handle.watch_ctx->notify(0, 0, notify_bl);
}
}
- {
- Mutex::Locker l2(notify_handle->lock);
- while (notify_handle->pending_responses > 0) {
- notify_handle->cond.WaitUntil(notify_handle->lock, timeout);
- }
- if (notify_handle->pbl != NULL) {
- ::encode(notify_handle->notify_responses, *notify_handle->pbl);
- }
+ finish_notify(oid, notify_id);
+}
+
+void TestWatchNotify::ack_notify(const std::string &oid,
+ uint64_t notify_id,
+ const WatcherID &watcher_id,
+ const bufferlist &bl) {
+ assert(m_lock.is_locked());
+ SharedWatcher watcher = get_watcher(oid);
+
+ NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
+ if (it == watcher->notify_handles.end()) {
+ return;
+ }
+
+ bufferlist response;
+ response.append(bl);
+
+ SharedNotifyHandle notify_handle = it->second;
+ notify_handle->notify_responses[watcher_id] = response;
+ notify_handle->pending_watcher_ids.erase(watcher_id);
+}
+
+void TestWatchNotify::finish_notify(const std::string &oid,
+ uint64_t notify_id) {
+ assert(m_lock.is_locked());
+ SharedWatcher watcher = get_watcher(oid);
+
+ NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
+ if (it == watcher->notify_handles.end()) {
+ return;
}
- on_notify->complete(0);
+ SharedNotifyHandle notify_handle = it->second;
+ if (!notify_handle->pending_watcher_ids.empty()) {
+ return;
+ }
+
+ if (notify_handle->pbl != NULL) {
+ ::encode(notify_handle->notify_responses, *notify_handle->pbl);
+ ::encode(notify_handle->pending_watcher_ids, *notify_handle->pbl);
+ }
+
+ m_lock.Unlock();
+ notify_handle->on_notify->complete(0);
+ m_lock.Lock();
+
+ watcher->notify_handles.erase(notify_id);
+ if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
+ m_file_watchers.erase(oid);
+ }
- Mutex::Locker file_watcher_locker(m_file_watcher_lock);
if (--m_pending_notifies == 0) {
m_file_watcher_cond.Signal();
}
-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_TEST_WATCH_NOTIFY_H
#include "include/rados/librados.hpp"
#include "common/Cond.h"
#include "common/Mutex.h"
-#include "common/RWLock.h"
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <list>
class TestWatchNotify : boost::noncopyable {
public:
+ typedef std::pair<uint64_t, uint64_t> WatcherID;
+ typedef std::set<WatcherID> WatcherIDs;
typedef std::map<std::pair<uint64_t, uint64_t>, bufferlist> NotifyResponses;
struct NotifyHandle {
- NotifyHandle();
+ WatcherIDs pending_watcher_ids;
NotifyResponses notify_responses;
- bufferlist *pbl;
- size_t pending_responses;
- Mutex lock;
- Cond cond;
+ bufferlist *pbl = nullptr;
+ Context *on_notify = nullptr;
};
typedef boost::shared_ptr<NotifyHandle> SharedNotifyHandle;
typedef std::map<uint64_t, SharedNotifyHandle> NotifyHandles;
struct WatchHandle {
- uint64_t instance_id;
+ uint64_t gid;
uint64_t handle;
librados::WatchCtx* watch_ctx;
librados::WatchCtx2* watch_ctx2;
typedef std::map<uint64_t, WatchHandle> WatchHandles;
struct Watcher {
- Watcher();
WatchHandles watch_handles;
NotifyHandles notify_handles;
- RWLock lock;
};
typedef boost::shared_ptr<Watcher> SharedWatcher;
- TestWatchNotify(CephContext *cct);
+ TestWatchNotify(CephContext *cct, Finisher *finisher);
~TestWatchNotify();
void flush();
uint64_t timeout_ms, bufferlist *pbl);
void notify_ack(const std::string& o, uint64_t notify_id,
uint64_t handle, uint64_t gid, bufferlist& bl);
- int watch(const std::string& o, uint64_t instance_id, uint64_t *handle,
+ int watch(const std::string& o, uint64_t gid, uint64_t *handle,
librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2);
int unwatch(uint64_t handle);
uint64_t m_handle;
uint64_t m_notify_id;
- Mutex m_file_watcher_lock;
- Cond m_file_watcher_cond;
+ Mutex m_lock;
uint64_t m_pending_notifies;
+ Cond m_file_watcher_cond;
FileWatchers m_file_watchers;
SharedWatcher get_watcher(const std::string& oid);
- SharedWatcher _get_watcher(const std::string& oid);
- void execute_notify(const std::string &oid, bufferlist &bl,
- uint64_t notify_id, Context *on_notify);
+ void execute_notify(const std::string &oid, bufferlist &bl,
+ uint64_t notify_id);
+ void ack_notify(const std::string &oid, uint64_t notify_id,
+ const WatcherID &watcher_id, const bufferlist &bl);
+ void finish_notify(const std::string &oid, uint64_t notify_id);
};
} // namespace librados