return 0;
}
+int IoCtx::aio_notify(const std::string& oid, AioCompletion *c, bufferlist& bl,
+ uint64_t timeout_ms, bufferlist *pbl) {
+ TestIoCtxImpl *ctx = reinterpret_cast<TestIoCtxImpl*>(io_ctx_impl);
+ ctx->aio_notify(oid, c->pc, bl, timeout_ms, pbl);
+ return 0;
+}
+
int IoCtx::aio_operate(const std::string& oid, AioCompletion *c,
ObjectReadOperation *op, bufferlist *pbl) {
return aio_operate(oid, c, op, 0, pbl);
m_client->flush_aio_operations(c);
}
+void TestIoCtxImpl::aio_notify(const std::string& oid, AioCompletionImpl *c,
+ bufferlist& bl, uint64_t timeout_ms,
+ bufferlist *pbl) {
+ m_pending_ops.inc();
+ c->get();
+ C_AioNotify *ctx = new C_AioNotify(this, c);
+ m_client->get_watch_notify().aio_notify(oid, bl, timeout_ms, pbl, ctx);
+}
+
int TestIoCtxImpl::aio_operate(const std::string& oid, TestObjectOperationImpl &ops,
AioCompletionImpl *c, SnapContext *snap_context,
int flags) {
return ret;
}
+void TestIoCtxImpl::handle_aio_notify_complete(AioCompletionImpl *c, int r) {
+ m_pending_ops.dec();
+
+ m_client->finish_aio_completion(c, r);
+}
+
} // namespace librados
#include "include/rados/librados.hpp"
#include "include/atomic.h"
+#include "include/Context.h"
#include "common/snap_types.h"
#include <boost/function.hpp>
#include <list>
virtual int aio_flush();
virtual void aio_flush_async(AioCompletionImpl *c);
+ virtual void aio_notify(const std::string& oid, AioCompletionImpl *c,
+ bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl);
virtual int aio_operate(const std::string& oid, TestObjectOperationImpl &ops,
AioCompletionImpl *c, SnapContext *snap_context,
int flags);
bufferlist *pbl, const SnapContext &snapc);
private:
+ struct C_AioNotify : public Context {
+ TestIoCtxImpl *io_ctx;
+ AioCompletionImpl *aio_comp;
+ C_AioNotify(TestIoCtxImpl *_io_ctx, AioCompletionImpl *_aio_comp)
+ : io_ctx(_io_ctx), aio_comp(_aio_comp) {
+ }
+ virtual void finish(int r) {
+ io_ctx->handle_aio_notify_complete(aio_comp, r);
+ }
+ };
TestRadosClient *m_client;
int64_t m_pool_id;
SnapContext m_snapc;
atomic_t m_refcount;
+ void handle_aio_notify_complete(AioCompletionImpl *aio_comp, int r);
};
} // namespace librados
}
}
+void TestRadosClient::finish_aio_completion(AioCompletionImpl *c, int r) {
+ librados::finish_aio_completion(c, r);
+}
+
Finisher *TestRadosClient::get_finisher(const std::string &oid) {
std::size_t h = m_hash(oid);
return m_finishers[h % m_finishers.size()];
void flush_aio_operations();
void flush_aio_operations(AioCompletionImpl *c);
+ void finish_aio_completion(AioCompletionImpl *c, int r);
+
protected:
virtual ~TestRadosClient();
return 0;
}
+void TestWatchNotify::aio_notify(const std::string& oid, bufferlist& bl,
+ uint64_t timeout_ms, bufferlist *pbl,
+ Context *on_notify) {
+ SharedWatcher watcher = get_watcher(oid);
+ RWLock::WLocker watcher_locker(watcher->lock);
+ Mutex::Locker file_watcher_lock(m_file_watcher_lock);
+ ++m_pending_notifies;
+ uint64_t notify_id = ++m_notify_id;
+
+ SharedNotifyHandle notify_handle(new NotifyHandle());
+ notify_handle->pbl = pbl;
+
+ watcher->notify_handles[notify_id] = notify_handle;
+
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&TestWatchNotify::execute_notify, this,
+ oid, bl, notify_id, on_notify));
+ m_finisher->queue(ctx);
+}
+
int TestWatchNotify::notify(const std::string& oid, bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl) {
- Mutex lock("TestRadosClient::watcher_notify::lock");
- Cond cond;
- bool done = false;
-
- {
- SharedWatcher watcher = get_watcher(oid);
- RWLock::WLocker l(watcher->lock);
- {
- Mutex::Locker l2(m_file_watcher_lock);
- ++m_pending_notifies;
- uint64_t notify_id = ++m_notify_id;
-
- SharedNotifyHandle notify_handle(new NotifyHandle());
- notify_handle->pbl = pbl;
-
- watcher->notify_handles[notify_id] = notify_handle;
-
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&TestWatchNotify::execute_notify, this,
- oid, bl, notify_id, &lock, &cond, &done));
- m_finisher->queue(ctx);
- }
- }
-
- lock.Lock();
- while (!done) {
- cond.Wait(lock);
- }
- lock.Unlock();
- return 0;
+ C_SaferCond cond;
+ aio_notify(oid, bl, timeout_ms, pbl, &cond);
+ return cond.wait();
}
void TestWatchNotify::notify_ack(const std::string& o, uint64_t notify_id,
void TestWatchNotify::execute_notify(const std::string &oid,
bufferlist &bl, uint64_t notify_id,
- Mutex *lock, Cond *cond,
- bool *done) {
+ Context *on_notify) {
WatchHandles watch_handles;
SharedNotifyHandle notify_handle;
}
}
- Mutex::Locker l3(*lock);
- *done = true;
- cond->Signal();
+ on_notify->complete(0);
- {
- Mutex::Locker file_watcher_locker(m_file_watcher_lock);
- if (--m_pending_notifies == 0) {
- m_file_watcher_cond.Signal();
- }
+ Mutex::Locker file_watcher_locker(m_file_watcher_lock);
+ if (--m_pending_notifies == 0) {
+ m_file_watcher_cond.Signal();
}
}
void flush();
int list_watchers(const std::string& o,
std::list<obj_watch_t> *out_watchers);
+ void aio_notify(const std::string& oid, bufferlist& bl, uint64_t timeout_ms,
+ bufferlist *pbl, Context *on_notify);
int notify(const std::string& o, bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl);
void notify_ack(const std::string& o, uint64_t notify_id,
SharedWatcher get_watcher(const std::string& oid);
SharedWatcher _get_watcher(const std::string& oid);
void execute_notify(const std::string &oid, bufferlist &bl,
- uint64_t notify_id, Mutex *lock, Cond *cond, bool *done);
+ uint64_t notify_id, Context *on_notify);
};