rados_client->get_aio_finisher()->queue(on_finish);
}
+int TestWatchNotify::watch(TestRadosClient *rados_client,
+ const std::string& o, uint64_t gid,
+ uint64_t *handle, librados::WatchCtx *ctx,
+ librados::WatchCtx2 *ctx2) {
+ C_SaferCond cond;
+ aio_watch(rados_client, o, gid, handle, ctx, ctx2, &cond);
+ return cond.wait();
+}
+
void TestWatchNotify::aio_watch(TestRadosClient *rados_client,
const std::string& o, uint64_t gid,
uint64_t *handle,
- librados::WatchCtx2 *watch_ctx,
+ librados::WatchCtx *watch_ctx,
+ librados::WatchCtx2 *watch_ctx2,
Context *on_finish) {
- int r = watch(rados_client, o, gid, handle, nullptr, watch_ctx);
- rados_client->get_aio_finisher()->queue(on_finish, r);
+ auto ctx = new FunctionContext([=](int) {
+ execute_watch(rados_client, o, gid, handle, watch_ctx, watch_ctx2,
+ on_finish);
+ });
+ rados_client->get_aio_finisher()->queue(ctx);
+}
+
+int TestWatchNotify::unwatch(TestRadosClient *rados_client,
+ uint64_t handle) {
+ C_SaferCond ctx;
+ aio_unwatch(rados_client, handle, &ctx);
+ return ctx.wait();
}
void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client,
uint64_t handle, Context *on_finish) {
- unwatch(rados_client, handle);
- rados_client->get_aio_finisher()->queue(on_finish);
+ auto ctx = new FunctionContext([this, rados_client, handle, on_finish](int) {
+ execute_unwatch(rados_client, handle, on_finish);
+ });
+ rados_client->get_aio_finisher()->queue(ctx);
}
void TestWatchNotify::aio_notify(TestRadosClient *rados_client,
finish_notify(rados_client, o, notify_id);
}
-int TestWatchNotify::watch(TestRadosClient *rados_client,
- const std::string& o, uint64_t gid,
- uint64_t *handle, librados::WatchCtx *ctx,
- librados::WatchCtx2 *ctx2) {
+void TestWatchNotify::execute_watch(TestRadosClient *rados_client,
+ const std::string& o, uint64_t gid,
+ uint64_t *handle, librados::WatchCtx *ctx,
+ librados::WatchCtx2 *ctx2,
+ Context* on_finish) {
CephContext *cct = rados_client->cct();
- Mutex::Locker lock(m_lock);
+ m_lock.Lock();
SharedWatcher watcher = get_watcher(o);
WatchHandle watch_handle;
ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle
<< dendl;
- return 0;
+ m_lock.Unlock();
+
+ on_finish->complete(0);
}
-int TestWatchNotify::unwatch(TestRadosClient *rados_client,
- uint64_t handle) {
+void TestWatchNotify::execute_unwatch(TestRadosClient *rados_client,
+ uint64_t handle, Context* on_finish) {
CephContext *cct = rados_client->cct();
ldout(cct, 20) << "handle=" << handle << dendl;
- Mutex::Locker locker(m_lock);
- for (FileWatchers::iterator it = m_file_watchers.begin();
- it != m_file_watchers.end(); ++it) {
- SharedWatcher watcher = it->second;
-
- WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
- if (w_it != watcher->watch_handles.end()) {
- watcher->watch_handles.erase(w_it);
- if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
- m_file_watchers.erase(it);
+ {
+ Mutex::Locker locker(m_lock);
+ for (FileWatchers::iterator it = m_file_watchers.begin();
+ it != m_file_watchers.end(); ++it) {
+ SharedWatcher watcher = it->second;
+
+ WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
+ if (w_it != watcher->watch_handles.end()) {
+ watcher->watch_handles.erase(w_it);
+ if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
+ m_file_watchers.erase(it);
+ }
+ break;
}
- break;
}
}
- return 0;
+ on_finish->complete(0);
}
TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
void aio_flush(TestRadosClient *rados_client, Context *on_finish);
void aio_watch(TestRadosClient *rados_client, const std::string& o,
- uint64_t gid, uint64_t *handle, librados::WatchCtx2 *watch_ctx,
- Context *on_finish);
+ uint64_t gid, uint64_t *handle, librados::WatchCtx *watch_ctx,
+ librados::WatchCtx2 *watch_ctx2, Context *on_finish);
void aio_unwatch(TestRadosClient *rados_client, uint64_t handle,
Context *on_finish);
void aio_notify(TestRadosClient *rados_client, const std::string& oid,
SharedWatcher get_watcher(const std::string& oid);
+ void execute_watch(TestRadosClient *rados_client, const std::string& o,
+ uint64_t gid, uint64_t *handle,
+ librados::WatchCtx *watch_ctx,
+ librados::WatchCtx2 *watch_ctx2,
+ Context *on_finish);
+ void execute_unwatch(TestRadosClient *rados_client, uint64_t handle,
+ Context *on_finish);
+
void execute_notify(TestRadosClient *rados_client, const std::string &oid,
bufferlist &bl, uint64_t notify_id);
void ack_notify(TestRadosClient *rados_client, const std::string &oid,