ldout(cct, 20) << "enter" << dendl;
// block until we know no additional async notify callbacks will occur
- Mutex::Locker locker(m_lock);
- while (m_pending_notifies > 0) {
- m_file_watcher_cond.Wait(m_lock);
- }
+ C_SaferCond ctx;
+ m_async_op_tracker.wait_for_ops(&ctx);
+ ctx.wait();
}
int TestWatchNotify::list_watchers(const std::string& o,
}
void TestWatchNotify::aio_notify(TestRadosClient *rados_client,
- const std::string& oid, bufferlist& bl,
+ const std::string& oid, const bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl,
Context *on_notify) {
- CephContext *cct = rados_client->cct();
-
- Mutex::Locker lock(m_lock);
- ++m_pending_notifies;
- uint64_t notify_id = ++m_notify_id;
-
- ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
-
- SharedWatcher watcher = get_watcher(oid);
-
- SharedNotifyHandle notify_handle(new NotifyHandle());
- notify_handle->rados_client = rados_client;
- notify_handle->pbl = pbl;
- notify_handle->on_notify = on_notify;
- for (auto &watch_handle_pair : watcher->watch_handles) {
- WatchHandle &watch_handle = watch_handle_pair.second;
- notify_handle->pending_watcher_ids.insert(std::make_pair(
- watch_handle.gid, watch_handle.handle));
- }
- watcher->notify_handles[notify_id] = notify_handle;
-
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&TestWatchNotify::execute_notify, this, rados_client, oid, bl,
- notify_id));
+ auto ctx = new FunctionContext([=](int) {
+ execute_notify(rados_client, oid, bl, pbl, on_notify);
+ });
rados_client->get_aio_finisher()->queue(ctx);
}
int TestWatchNotify::notify(TestRadosClient *rados_client,
- const std::string& oid, bufferlist& bl,
+ const std::string& oid, const bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl) {
C_SaferCond cond;
aio_notify(rados_client, oid, bl, timeout_ms, pbl, &cond);
void TestWatchNotify::execute_notify(TestRadosClient *rados_client,
const std::string &oid,
- bufferlist &bl, uint64_t notify_id) {
+ const bufferlist &bl, bufferlist *pbl,
+ Context *on_notify) {
CephContext *cct = rados_client->cct();
- ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
+ m_lock.Lock();
+ uint64_t notify_id = ++m_notify_id;
+
+ ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
- Mutex::Locker lock(m_lock);
SharedWatcher watcher = get_watcher(oid);
+
+ SharedNotifyHandle notify_handle(new NotifyHandle());
+ notify_handle->rados_client = rados_client;
+ notify_handle->pbl = pbl;
+ notify_handle->on_notify = on_notify;
+
WatchHandles &watch_handles = watcher->watch_handles;
+ for (auto &watch_handle_pair : watch_handles) {
+ WatchHandle &watch_handle = watch_handle_pair.second;
+ notify_handle->pending_watcher_ids.insert(std::make_pair(
+ watch_handle.gid, watch_handle.handle));
- NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
- if (n_it == watcher->notify_handles.end()) {
- ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
- << ": not found" << dendl;
- return;
- }
+ m_async_op_tracker.start_op();
+ uint64_t notifier_id = rados_client->get_instance_id();
+ watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext(
+ [this, oid, bl, notify_id, watch_handle, notifier_id](int r) {
+ bufferlist notify_bl;
+ notify_bl.append(bl);
+
+ if (watch_handle.watch_ctx2 != NULL) {
+ watch_handle.watch_ctx2->handle_notify(notify_id,
+ watch_handle.handle,
+ notifier_id, notify_bl);
+ } else if (watch_handle.watch_ctx != NULL) {
+ watch_handle.watch_ctx->notify(0, 0, notify_bl);
+
+ // auto ack old-style watch/notify clients
+ ack_notify(watch_handle.rados_client, oid, notify_id,
+ {watch_handle.gid, watch_handle.handle}, bufferlist());
+ }
- SharedNotifyHandle notify_handle = n_it->second;
- WatcherIDs watcher_ids(notify_handle->pending_watcher_ids);
- for (WatcherIDs::iterator w_id_it = watcher_ids.begin();
- w_id_it != watcher_ids.end(); ++w_id_it) {
- WatcherID watcher_id = *w_id_it;
- WatchHandles::iterator w_it = watch_handles.find(watcher_id.second);
- if (w_it == watch_handles.end()) {
- // client disconnected before notification processed
- notify_handle->pending_watcher_ids.erase(watcher_id);
- } else {
- WatchHandle watch_handle = w_it->second;
- assert(watch_handle.gid == watcher_id.first);
- assert(watch_handle.handle == watcher_id.second);
-
- uint64_t notifier_id = rados_client->get_instance_id();
- watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext(
- [this, oid, bl, notify_id, watch_handle, notifier_id](int r) {
- bufferlist notify_bl;
- notify_bl.append(bl);
-
- if (watch_handle.watch_ctx2 != NULL) {
- watch_handle.watch_ctx2->handle_notify(notify_id,
- watch_handle.handle,
- notifier_id, notify_bl);
- } else if (watch_handle.watch_ctx != NULL) {
- watch_handle.watch_ctx->notify(0, 0, notify_bl);
-
- // auto ack old-style watch/notify clients
- ack_notify(watch_handle.rados_client, oid, notify_id,
- {watch_handle.gid, watch_handle.handle}, bufferlist());
- }
- }));
- }
+ m_async_op_tracker.finish_op();
+ }));
}
+ watcher->notify_handles[notify_id] = notify_handle;
finish_notify(rados_client, oid, notify_id);
-
- if (--m_pending_notifies == 0) {
- m_file_watcher_cond.Signal();
- }
+ m_lock.Unlock();
}
void TestWatchNotify::ack_notify(TestRadosClient *rados_client,
#define CEPH_TEST_WATCH_NOTIFY_H
#include "include/rados/librados.hpp"
-#include "common/Cond.h"
+#include "common/AsyncOpTracker.h"
#include "common/Mutex.h"
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
void aio_unwatch(TestRadosClient *rados_client, uint64_t handle,
Context *on_finish);
void aio_notify(TestRadosClient *rados_client, const std::string& oid,
- bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl,
+ const bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl,
Context *on_notify);
void flush(TestRadosClient *rados_client);
int notify(TestRadosClient *rados_client, const std::string& o,
- bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl);
+ const bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl);
void notify_ack(TestRadosClient *rados_client, const std::string& o,
uint64_t notify_id, uint64_t handle, uint64_t gid,
bufferlist& bl);
uint64_t m_notify_id = 0;
Mutex m_lock;
- uint64_t m_pending_notifies = 0;
+ AsyncOpTracker m_async_op_tracker;
- Cond m_file_watcher_cond;
FileWatchers m_file_watchers;
SharedWatcher get_watcher(const std::string& oid);
Context *on_finish);
void execute_notify(TestRadosClient *rados_client, const std::string &oid,
- bufferlist &bl, uint64_t notify_id);
+ const bufferlist &bl, bufferlist *pbl,
+ Context *on_notify);
void ack_notify(TestRadosClient *rados_client, const std::string &oid,
uint64_t notify_id, const WatcherID &watcher_id,
const bufferlist &bl);