m_pending_ops++;
c->get();
C_AioNotify *ctx = new C_AioNotify(this, c);
- m_client->get_watch_notify()->aio_notify(m_client, oid, bl, timeout_ms, pbl,
- ctx);
+ m_client->get_watch_notify()->aio_notify(m_client, m_pool_id, oid, bl,
+ timeout_ms, pbl, ctx);
}
int TestIoCtxImpl::aio_operate(const std::string& oid, TestObjectOperationImpl &ops,
if (m_client->is_blacklisted()) {
m_client->get_aio_finisher()->queue(ctx, -EBLACKLISTED);
} else {
- m_client->get_watch_notify()->aio_watch(m_client, o, get_instance_id(),
- handle, nullptr, watch_ctx, ctx);
+ m_client->get_watch_notify()->aio_watch(m_client, m_pool_id, o,
+ get_instance_id(), handle, nullptr,
+ watch_ctx, ctx);
}
return 0;
}
return -EBLACKLISTED;
}
- return m_client->get_watch_notify()->list_watchers(o, out_watchers);
+ return m_client->get_watch_notify()->list_watchers(m_pool_id, o,
+ out_watchers);
}
int TestIoCtxImpl::notify(const std::string& o, bufferlist& bl,
return -EBLACKLISTED;
}
- return m_client->get_watch_notify()->notify(m_client, o, bl, timeout_ms, pbl);
+ return m_client->get_watch_notify()->notify(m_client, m_pool_id, o, bl,
+ timeout_ms, pbl);
}
void TestIoCtxImpl::notify_ack(const std::string& o, uint64_t notify_id,
uint64_t handle, bufferlist& bl) {
- m_client->get_watch_notify()->notify_ack(m_client, o, notify_id, handle,
- m_client->get_instance_id(), bl);
+ m_client->get_watch_notify()->notify_ack(m_client, m_pool_id, o, notify_id,
+ handle, m_client->get_instance_id(),
+ bl);
}
int TestIoCtxImpl::operate(const std::string& oid, TestObjectOperationImpl &ops) {
return -EBLACKLISTED;
}
- return m_client->get_watch_notify()->watch(m_client, o, get_instance_id(),
- handle, ctx, ctx2);
+ return m_client->get_watch_notify()->watch(m_client, m_pool_id, o,
+ get_instance_id(), handle, ctx,
+ ctx2);
}
int TestIoCtxImpl::execute_operation(const std::string& oid,
ctx.wait();
}
-int TestWatchNotify::list_watchers(const std::string& o,
+int TestWatchNotify::list_watchers(int64_t pool_id, const std::string& o,
std::list<obj_watch_t> *out_watchers) {
Mutex::Locker lock(m_lock);
- SharedWatcher watcher = get_watcher(o);
+ SharedWatcher watcher = get_watcher(pool_id, o);
out_watchers->clear();
for (TestWatchNotify::WatchHandles::iterator it =
rados_client->get_aio_finisher()->queue(on_finish);
}
-int TestWatchNotify::watch(TestRadosClient *rados_client,
- const std::string& o, uint64_t gid,
- uint64_t *handle, librados::WatchCtx *ctx,
- librados::WatchCtx2 *ctx2) {
+int TestWatchNotify::watch(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& o, uint64_t gid, uint64_t *handle,
+ librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) {
C_SaferCond cond;
- aio_watch(rados_client, o, gid, handle, ctx, ctx2, &cond);
+ aio_watch(rados_client, pool_id, o, gid, handle, ctx, ctx2, &cond);
return cond.wait();
}
-void TestWatchNotify::aio_watch(TestRadosClient *rados_client,
+void TestWatchNotify::aio_watch(TestRadosClient *rados_client, int64_t pool_id,
const std::string& o, uint64_t gid,
uint64_t *handle,
librados::WatchCtx *watch_ctx,
librados::WatchCtx2 *watch_ctx2,
Context *on_finish) {
auto ctx = new FunctionContext([=](int) {
- execute_watch(rados_client, o, gid, handle, watch_ctx, watch_ctx2,
- on_finish);
+ execute_watch(rados_client, pool_id, o, gid, handle, watch_ctx,
+ watch_ctx2, on_finish);
});
rados_client->get_aio_finisher()->queue(ctx);
}
rados_client->get_aio_finisher()->queue(ctx);
}
-void TestWatchNotify::aio_notify(TestRadosClient *rados_client,
+void TestWatchNotify::aio_notify(TestRadosClient *rados_client, int64_t pool_id,
const std::string& oid, const bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl,
Context *on_notify) {
auto ctx = new FunctionContext([=](int) {
- execute_notify(rados_client, oid, bl, pbl, on_notify);
+ execute_notify(rados_client, pool_id, oid, bl, pbl, on_notify);
});
rados_client->get_aio_finisher()->queue(ctx);
}
-int TestWatchNotify::notify(TestRadosClient *rados_client,
- const std::string& oid, const bufferlist& bl,
+int TestWatchNotify::notify(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& oid, bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl) {
C_SaferCond cond;
- aio_notify(rados_client, oid, bl, timeout_ms, pbl, &cond);
+ aio_notify(rados_client, pool_id, oid, bl, timeout_ms, pbl, &cond);
return cond.wait();
}
-void TestWatchNotify::notify_ack(TestRadosClient *rados_client,
+void TestWatchNotify::notify_ack(TestRadosClient *rados_client, int64_t pool_id,
const std::string& o, uint64_t notify_id,
uint64_t handle, uint64_t gid,
bufferlist& bl) {
<< ", gid=" << gid << dendl;
Mutex::Locker lock(m_lock);
WatcherID watcher_id = std::make_pair(gid, handle);
- ack_notify(rados_client, o, notify_id, watcher_id, bl);
- finish_notify(rados_client, o, notify_id);
+ ack_notify(rados_client, pool_id, o, notify_id, watcher_id, bl);
+ finish_notify(rados_client, pool_id, o, notify_id);
}
void TestWatchNotify::execute_watch(TestRadosClient *rados_client,
- const std::string& o, uint64_t gid,
- uint64_t *handle, librados::WatchCtx *ctx,
+ int64_t pool_id, const std::string& o,
+ uint64_t gid, uint64_t *handle,
+ librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2,
Context* on_finish) {
CephContext *cct = rados_client->cct();
m_lock.Lock();
- SharedWatcher watcher = get_watcher(o);
+ SharedWatcher watcher = get_watcher(pool_id, o);
WatchHandle watch_handle;
watch_handle.rados_client = rados_client;
}
TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
- const std::string& oid) {
+ int64_t pool_id, const std::string& oid) {
assert(m_lock.is_locked());
- SharedWatcher &watcher = m_file_watchers[oid];
+ SharedWatcher &watcher = m_file_watchers[{pool_id, oid}];
if (!watcher) {
watcher.reset(new Watcher());
}
}
void TestWatchNotify::execute_notify(TestRadosClient *rados_client,
- const std::string &oid,
+ int64_t pool_id, const std::string &oid,
const bufferlist &bl, bufferlist *pbl,
Context *on_notify) {
CephContext *cct = rados_client->cct();
ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
- SharedWatcher watcher = get_watcher(oid);
+ SharedWatcher watcher = get_watcher(pool_id, oid);
SharedNotifyHandle notify_handle(new NotifyHandle());
notify_handle->rados_client = rados_client;
m_async_op_tracker.start_op();
uint64_t notifier_id = rados_client->get_instance_id();
watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext(
- [this, oid, bl, notify_id, watch_handle, notifier_id](int r) {
+ [this, pool_id, oid, bl, notify_id, watch_handle, notifier_id](int r) {
bufferlist notify_bl;
notify_bl.append(bl);
watch_handle.watch_ctx->notify(0, 0, notify_bl);
// auto ack old-style watch/notify clients
- ack_notify(watch_handle.rados_client, oid, notify_id,
+ ack_notify(watch_handle.rados_client, pool_id, oid, notify_id,
{watch_handle.gid, watch_handle.handle}, bufferlist());
}
}
watcher->notify_handles[notify_id] = notify_handle;
- finish_notify(rados_client, oid, notify_id);
+ finish_notify(rados_client, pool_id, oid, notify_id);
m_lock.Unlock();
}
-void TestWatchNotify::ack_notify(TestRadosClient *rados_client,
- const std::string &oid,
- uint64_t notify_id,
+void TestWatchNotify::ack_notify(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string &oid, uint64_t notify_id,
const WatcherID &watcher_id,
const bufferlist &bl) {
CephContext *cct = rados_client->cct();
<< ", WatcherID=" << watcher_id << dendl;
assert(m_lock.is_locked());
- SharedWatcher watcher = get_watcher(oid);
+ SharedWatcher watcher = get_watcher(pool_id, oid);
NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
if (it == watcher->notify_handles.end()) {
}
void TestWatchNotify::finish_notify(TestRadosClient *rados_client,
- const std::string &oid,
+ int64_t pool_id, const std::string &oid,
uint64_t notify_id) {
CephContext *cct = rados_client->cct();
ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
assert(m_lock.is_locked());
- SharedWatcher watcher = get_watcher(oid);
+ SharedWatcher watcher = get_watcher(pool_id, oid);
NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
if (it == watcher->notify_handles.end()) {
notify_handle->on_notify, 0);
watcher->notify_handles.erase(notify_id);
if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
- m_file_watchers.erase(oid);
+ m_file_watchers.erase({pool_id, oid});
}
}
TestWatchNotify();
- int list_watchers(const std::string& o,
+ int list_watchers(int64_t pool_id, const std::string& o,
std::list<obj_watch_t> *out_watchers);
void aio_flush(TestRadosClient *rados_client, Context *on_finish);
- void aio_watch(TestRadosClient *rados_client, const std::string& o,
- uint64_t gid, uint64_t *handle, librados::WatchCtx *watch_ctx,
- librados::WatchCtx2 *watch_ctx2, Context *on_finish);
+ void aio_watch(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& o, uint64_t gid, uint64_t *handle,
+ librados::WatchCtx *watch_ctx, librados::WatchCtx2 *watch_ctx2,
+ Context *on_finish);
void aio_unwatch(TestRadosClient *rados_client, uint64_t handle,
Context *on_finish);
- void aio_notify(TestRadosClient *rados_client, const std::string& oid,
- const bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl,
- Context *on_notify);
+ void aio_notify(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& oid, const bufferlist& bl,
+ uint64_t timeout_ms, bufferlist *pbl, Context *on_notify);
void flush(TestRadosClient *rados_client);
- int notify(TestRadosClient *rados_client, const std::string& o,
- const bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl);
- void notify_ack(TestRadosClient *rados_client, const std::string& o,
- uint64_t notify_id, uint64_t handle, uint64_t gid,
- bufferlist& bl);
- int watch(TestRadosClient *rados_client, const std::string& o, uint64_t gid,
- uint64_t *handle, librados::WatchCtx *ctx,
- librados::WatchCtx2 *ctx2);
+ int notify(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& o, bufferlist& bl, uint64_t timeout_ms,
+ bufferlist *pbl);
+ void notify_ack(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& o, uint64_t notify_id, uint64_t handle,
+ uint64_t gid, bufferlist& bl);
+
+ int watch(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& o, uint64_t gid, uint64_t *handle,
+ librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2);
int unwatch(TestRadosClient *rados_client, uint64_t handle);
void blacklist(uint32_t nonce);
private:
-
- typedef std::map<std::string, SharedWatcher> FileWatchers;
+ typedef std::pair<int64_t, std::string> PoolFile;
+ typedef std::map<PoolFile, SharedWatcher> FileWatchers;
uint64_t m_handle = 0;
uint64_t m_notify_id = 0;
FileWatchers m_file_watchers;
- SharedWatcher get_watcher(const std::string& oid);
+ SharedWatcher get_watcher(int64_t pool_id, const std::string& oid);
- void execute_watch(TestRadosClient *rados_client, const std::string& o,
- uint64_t gid, uint64_t *handle,
+ void execute_watch(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string& o, uint64_t gid, uint64_t *handle,
librados::WatchCtx *watch_ctx,
librados::WatchCtx2 *watch_ctx2,
Context *on_finish);
void execute_unwatch(TestRadosClient *rados_client, uint64_t handle,
Context *on_finish);
- void execute_notify(TestRadosClient *rados_client, const std::string &oid,
- const bufferlist &bl, bufferlist *pbl,
- Context *on_notify);
- void ack_notify(TestRadosClient *rados_client, const std::string &oid,
- uint64_t notify_id, const WatcherID &watcher_id,
- const bufferlist &bl);
- void finish_notify(TestRadosClient *rados_client, const std::string &oid,
- uint64_t notify_id);
+ void execute_notify(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string &oid, const bufferlist &bl,
+ bufferlist *pbl, Context *on_notify);
+ void ack_notify(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string &oid, uint64_t notify_id,
+ const WatcherID &watcher_id, const bufferlist &bl);
+ void finish_notify(TestRadosClient *rados_client, int64_t pool_id,
+ const std::string &oid, uint64_t notify_id);
};
} // namespace librados