namespace librados {
-TestWatchNotify::TestWatchNotify(CephContext *cct)
- : m_cct(cct), m_finisher(new Finisher(cct)), m_handle(), m_notify_id(),
+TestWatchNotify::TestWatchNotify(CephContext *cct, Finisher *finisher)
+ : m_cct(cct), m_finisher(finisher), m_handle(), m_notify_id(),
m_file_watcher_lock("librados::TestWatchNotify::m_file_watcher_lock"),
m_pending_notifies(0) {
m_cct->get();
- m_finisher->start();
}
TestWatchNotify::~TestWatchNotify() {
- m_finisher->stop();
- delete m_finisher;
m_cct->put();
}
TestWatchNotify::NotifyHandle::NotifyHandle()
- : pbl(NULL), pending_responses(),
- lock("TestWatchNotify::NotifyHandle::lock") {
+ : lock("TestWatchNotify::NotifyHandle::lock") {
}
TestWatchNotify::Watcher::Watcher()
it != watcher->watch_handles.end(); ++it) {
obj_watch_t obj;
strcpy(obj.addr, ":/0");
- obj.watcher_id = static_cast<int64_t>(it->second.instance_id);
+ obj.watcher_id = static_cast<int64_t>(it->second.gid);
obj.cookie = it->second.handle;
obj.timeout_seconds = 30;
out_watchers->push_back(obj);
int TestWatchNotify::notify(const std::string& oid, bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl) {
- Mutex lock("TestRadosClient::watcher_notify::lock");
- Cond cond;
- bool done = false;
+ uint64_t notify_id;
+ {
+ Mutex::Locker file_watcher_locker(m_file_watcher_lock);
+ ++m_pending_notifies;
+ notify_id = ++m_notify_id;
+ }
+ SharedNotifyHandle notify_handle(new NotifyHandle());
{
SharedWatcher watcher = get_watcher(oid);
- RWLock::WLocker l(watcher->lock);
- {
- Mutex::Locker l2(m_file_watcher_lock);
- ++m_pending_notifies;
- uint64_t notify_id = ++m_notify_id;
+ RWLock::WLocker watcher_locker(watcher->lock);
+
+ WatchHandles watch_handles = watcher->watch_handles;
+ for (WatchHandles::iterator w_it = watch_handles.begin();
+ w_it != watch_handles.end(); ++w_it) {
+ WatchHandle &watch_handle = w_it->second;
+
+ Mutex::Locker notify_handle_locker(notify_handle->lock);
+ notify_handle->pending_watcher_ids.insert(std::make_pair(
+ watch_handle.gid, watch_handle.handle));
+ }
+
+ watcher->notify_handles[notify_id] = notify_handle;
+
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&TestWatchNotify::execute_notify, this, oid, bl, notify_id));
+ m_finisher->queue(ctx);
+ }
+
+ {
+ utime_t timeout;
+ timeout.set_from_double(ceph_clock_now(m_cct) + (timeout_ms / 1000.0));
- SharedNotifyHandle notify_handle(new NotifyHandle());
- notify_handle->pbl = pbl;
+ Mutex::Locker notify_locker(notify_handle->lock);
+ while (!notify_handle->pending_watcher_ids.empty()) {
+ notify_handle->cond.WaitUntil(notify_handle->lock, timeout);
+ }
- watcher->notify_handles[notify_id] = notify_handle;
+ if (pbl != NULL) {
+ ::encode(notify_handle->notify_responses, *pbl);
+ ::encode(notify_handle->pending_watcher_ids, *pbl);
+ }
+ }
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&TestWatchNotify::execute_notify, this,
- oid, bl, notify_id, &lock, &cond, &done));
- m_finisher->queue(ctx);
+ SharedWatcher watcher = get_watcher(oid);
+ Mutex::Locker file_watcher_locker(m_file_watcher_lock);
+ {
+ RWLock::WLocker watcher_locker(watcher->lock);
+
+ watcher->notify_handles.erase(notify_id);
+ if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
+ m_file_watchers.erase(oid);
}
}
- lock.Lock();
- while (!done) {
- cond.Wait(lock);
+ if (--m_pending_notifies == 0) {
+ m_file_watcher_cond.Signal();
}
- lock.Unlock();
return 0;
}
response.append(bl);
SharedNotifyHandle notify_handle = it->second;
- Mutex::Locker l2(notify_handle->lock);
- --notify_handle->pending_responses;
- notify_handle->notify_responses[std::make_pair(gid, handle)] = response;
- notify_handle->cond.Signal();
+ Mutex::Locker notify_handle_locker(notify_handle->lock);
+
+ WatcherID watcher_id = std::make_pair(gid, handle);
+ notify_handle->notify_responses[watcher_id] = response;
+ notify_handle->pending_watcher_ids.erase(watcher_id);
+ if (notify_handle->pending_watcher_ids.empty()) {
+ notify_handle->cond.Signal();
+ }
}
-int TestWatchNotify::watch(const std::string& o, uint64_t instance_id,
+int TestWatchNotify::watch(const std::string& o, uint64_t gid,
uint64_t *handle, librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2) {
SharedWatcher watcher = get_watcher(o);
RWLock::WLocker l(watcher->lock);
WatchHandle watch_handle;
- watch_handle.instance_id = instance_id;
+ watch_handle.gid = gid;
watch_handle.handle = ++m_handle;
watch_handle.watch_ctx = ctx;
watch_handle.watch_ctx2 = ctx2;
}
int TestWatchNotify::unwatch(uint64_t handle) {
-
- SharedWatcher watcher;
- {
- Mutex::Locker l(m_file_watcher_lock);
- for (FileWatchers::iterator it = m_file_watchers.begin();
- it != m_file_watchers.end(); ++it) {
- if (it->second->watch_handles.find(handle) !=
- it->second->watch_handles.end()) {
- watcher = it->second;
- break;
+ Mutex::Locker l(m_file_watcher_lock);
+ for (FileWatchers::iterator it = m_file_watchers.begin();
+ it != m_file_watchers.end(); ++it) {
+ SharedWatcher watcher = it->second;
+ RWLock::WLocker watcher_locker(watcher->lock);
+
+ WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
+ if (w_it != watcher->watch_handles.end()) {
+ watcher->watch_handles.erase(w_it);
+ if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
+ m_file_watchers.erase(it);
}
+ break;
}
}
-
- if (watcher) {
- RWLock::WLocker l(watcher->lock);
- watcher->watch_handles.erase(handle);
- }
return 0;
}
}
void TestWatchNotify::execute_notify(const std::string &oid,
- bufferlist &bl, uint64_t notify_id,
- Mutex *lock, Cond *cond,
- bool *done) {
- WatchHandles watch_handles;
- SharedNotifyHandle notify_handle;
-
- {
- SharedWatcher watcher = get_watcher(oid);
- RWLock::RLocker l(watcher->lock);
+ bufferlist &bl, uint64_t notify_id) {
+ SharedWatcher watcher = get_watcher(oid);
+ RWLock::RLocker watcher_locker(watcher->lock);
+ WatchHandles &watch_handles = watcher->watch_handles;
- NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
- if (n_it == watcher->notify_handles.end()) {
- return;
- }
-
- watch_handles = watcher->watch_handles;
- notify_handle = n_it->second;
+ NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
+ if (n_it == watcher->notify_handles.end()) {
+ return;
}
- utime_t timeout;
- timeout.set_from_double(ceph_clock_now(m_cct) + 15);
-
- for (WatchHandles::iterator w_it = watch_handles.begin();
- w_it != watch_handles.end(); ++w_it) {
- WatchHandle &watch_handle = w_it->second;
-
- bufferlist notify_bl;
- notify_bl.append(bl);
- if (watch_handle.watch_ctx2 != NULL) {
- {
- Mutex::Locker l2(notify_handle->lock);
- ++notify_handle->pending_responses;
+ SharedNotifyHandle notify_handle = n_it->second;
+ Mutex::Locker notify_handle_locker(notify_handle->lock);
+
+ WatcherIDs watcher_ids(notify_handle->pending_watcher_ids);
+ for (WatcherIDs::iterator w_id_it = watcher_ids.begin();
+ w_id_it != watcher_ids.end(); ++w_id_it) {
+ WatcherID watcher_id = *w_id_it;
+ WatchHandles::iterator w_it = watch_handles.find(watcher_id.second);
+ if (w_it == watch_handles.end()) {
+ notify_handle->pending_watcher_ids.erase(watcher_id);
+ } else {
+ WatchHandle &watch_handle = w_it->second;
+ assert(watch_handle.gid == watcher_id.first);
+ assert(watch_handle.handle == watcher_id.second);
+
+ bufferlist notify_bl;
+ notify_bl.append(bl);
+
+ notify_handle->lock.Unlock();
+ watcher->lock.put_read();
+ if (watch_handle.watch_ctx2 != NULL) {
+ watch_handle.watch_ctx2->handle_notify(notify_id, w_it->first, 0,
+ notify_bl);
+ } else if (watch_handle.watch_ctx != NULL) {
+ watch_handle.watch_ctx->notify(0, 0, notify_bl);
}
- watch_handle.watch_ctx2->handle_notify(notify_id, w_it->first, 0,
- notify_bl);
- } else if (watch_handle.watch_ctx != NULL) {
- watch_handle.watch_ctx->notify(0, 0, notify_bl);
- }
- }
+ watcher->lock.get_read();
+ notify_handle->lock.Lock();
- {
- Mutex::Locker l2(notify_handle->lock);
- while (notify_handle->pending_responses > 0) {
- notify_handle->cond.WaitUntil(notify_handle->lock, timeout);
- }
- if (notify_handle->pbl != NULL) {
- ::encode(notify_handle->notify_responses, *notify_handle->pbl);
+ if (watch_handle.watch_ctx2 == NULL) {
+ // auto ack old-style watch/notify clients
+ notify_handle->notify_responses[watcher_id] = bufferlist();
+ notify_handle->pending_watcher_ids.erase(watcher_id);
+ }
}
}
- Mutex::Locker l3(*lock);
- *done = true;
- cond->Signal();
-
- {
- Mutex::Locker file_watcher_locker(m_file_watcher_lock);
- if (--m_pending_notifies == 0) {
- m_file_watcher_cond.Signal();
- }
+ if (notify_handle->pending_watcher_ids.empty()) {
+ notify_handle->cond.Signal();
}
}