return ctx->aio_remove(oid, c->pc);
}
+int IoCtx::aio_watch(const std::string& o, AioCompletion *c, uint64_t *handle,
+ librados::WatchCtx2 *watch_ctx) {
+ TestIoCtxImpl *ctx = reinterpret_cast<TestIoCtxImpl*>(io_ctx_impl);
+ return ctx->aio_watch(o, c->pc, handle, watch_ctx);
+}
+
+int IoCtx::aio_unwatch(uint64_t handle, AioCompletion *c) {
+ TestIoCtxImpl *ctx = reinterpret_cast<TestIoCtxImpl*>(io_ctx_impl);
+ return ctx->aio_unwatch(handle, c->pc);
+}
+
config_t IoCtx::cct() {
TestIoCtxImpl *ctx = reinterpret_cast<TestIoCtxImpl*>(io_ctx_impl);
return reinterpret_cast<config_t>(ctx->get_rados_client()->cct());
return new AioCompletion(c);
}
+int Rados::aio_watch_flush(AioCompletion* c) {
+ TestRadosClient *impl = reinterpret_cast<TestRadosClient*>(client);
+ return impl->aio_watch_flush(c->pc);
+}
+
int Rados::blacklist_add(const std::string& client_address,
uint32_t expire_seconds) {
TestRadosClient *impl = reinterpret_cast<TestRadosClient*>(client);
return 0;
}
+int TestIoCtxImpl::aio_watch(const std::string& o, AioCompletionImpl *c,
+ uint64_t *handle, librados::WatchCtx2 *watch_ctx) {
+ m_pending_ops.inc();
+ c->get();
+ C_AioNotify *ctx = new C_AioNotify(this, c);
+ m_client->get_watch_notify().aio_watch(o, get_instance_id(), handle,
+ watch_ctx, ctx);
+ return 0;
+}
+
+int TestIoCtxImpl::aio_unwatch(uint64_t handle, AioCompletionImpl *c) {
+ m_pending_ops.inc();
+ c->get();
+ C_AioNotify *ctx = new C_AioNotify(this, c);
+ m_client->get_watch_notify().aio_unwatch(handle, ctx);
+ return 0;
+}
+
int TestIoCtxImpl::exec(const std::string& oid, TestClassHandler *handler,
const char *cls, const char *method,
bufferlist& inbl, bufferlist* outbl,
AioCompletionImpl *c, int flags,
bufferlist *pbl);
virtual int aio_remove(const std::string& oid, AioCompletionImpl *c) = 0;
-
+ virtual int aio_watch(const std::string& o, AioCompletionImpl *c,
+ uint64_t *handle, librados::WatchCtx2 *ctx);
+ virtual int aio_unwatch(uint64_t handle, AioCompletionImpl *c);
virtual int append(const std::string& oid, const bufferlist &bl,
const SnapContext &snapc) = 0;
virtual int assert_exists(const std::string &oid) = 0;
}
}
+int TestRadosClient::aio_watch_flush(AioCompletionImpl *c) {
+ c->get();
+ Context *ctx = new FunctionContext(boost::bind(
+ &TestRadosClient::finish_aio_completion, this, c, _1));
+ get_watch_notify().aio_flush(ctx);
+ return 0;
+}
+
void TestRadosClient::finish_aio_completion(AioCompletionImpl *c, int r) {
librados::finish_aio_completion(c, r);
}
virtual int64_t pool_lookup(const std::string &name) = 0;
virtual int pool_reverse_lookup(int64_t id, std::string *name) = 0;
+ virtual int aio_watch_flush(AioCompletionImpl *c);
virtual int watch_flush() = 0;
virtual int blacklist_add(const std::string& client_address,
return 0;
}
+void TestWatchNotify::aio_flush(Context *on_finish) {
+ m_finisher->queue(on_finish);
+}
+
+void TestWatchNotify::aio_watch(const std::string& o, uint64_t gid,
+ uint64_t *handle,
+ librados::WatchCtx2 *watch_ctx,
+ Context *on_finish) {
+ int r = watch(o, gid, handle, nullptr, watch_ctx);
+ m_finisher->queue(on_finish, r);
+}
+
+void TestWatchNotify::aio_unwatch(uint64_t handle, Context *on_finish) {
+ unwatch(handle);
+ m_finisher->queue(on_finish);
+}
+
void TestWatchNotify::aio_notify(const std::string& oid, bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl,
Context *on_notify) {
TestWatchNotify(CephContext *cct, Finisher *finisher);
~TestWatchNotify();
- void flush();
int list_watchers(const std::string& o,
std::list<obj_watch_t> *out_watchers);
+
+ void aio_flush(Context *on_finish);
+ void aio_watch(const std::string& o, uint64_t gid, uint64_t *handle,
+ librados::WatchCtx2 *watch_ctx, Context *on_finish);
+ void aio_unwatch(uint64_t handle, Context *on_finish);
void aio_notify(const std::string& oid, bufferlist& bl, uint64_t timeout_ms,
bufferlist *pbl, Context *on_notify);
+
+ void flush();
int notify(const std::string& o, bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl);
void notify_ack(const std::string& o, uint64_t notify_id,