int flags) {
// TODO ignoring snap_context and flags for now
ops.get();
- m_client->add_aio_operation(oid, boost::bind(
+ m_client->add_aio_operation(oid, true, boost::bind(
&TestIoCtxImpl::execute_aio_operations, this, oid, &ops,
reinterpret_cast<bufferlist*>(NULL)), c);
return 0;
bufferlist *pbl) {
// TODO ignoring flags for now
ops.get();
- m_client->add_aio_operation(oid, boost::bind(
+ m_client->add_aio_operation(oid, true, boost::bind(
&TestIoCtxImpl::execute_aio_operations, this, oid, &ops, pbl), c);
return 0;
}
int TestIoCtxImpl::operate(const std::string& oid, TestObjectOperationImpl &ops) {
AioCompletionImpl *comp = new AioCompletionImpl();
- int ret = aio_operate(oid, ops, comp, NULL, 0);
- if (ret == 0) {
- comp->wait_for_safe();
- ret = comp->get_return_value();
- }
+
+ ops.get();
+ m_client->add_aio_operation(oid, false, boost::bind(
+ &TestIoCtxImpl::execute_aio_operations, this, oid, &ops,
+ reinterpret_cast<bufferlist*>(NULL)), comp);
+
+ comp->wait_for_safe();
+ int ret = comp->get_return_value();
comp->put();
return ret;
}
int TestIoCtxImpl::operate_read(const std::string& oid, TestObjectOperationImpl &ops,
bufferlist *pbl) {
AioCompletionImpl *comp = new AioCompletionImpl();
- int ret = aio_operate_read(oid, ops, comp, 0, pbl);
- if (ret == 0) {
- comp->wait_for_complete();
- ret = comp->get_return_value();
- }
+
+ ops.get();
+ m_client->add_aio_operation(oid, false, boost::bind(
+ &TestIoCtxImpl::execute_aio_operations, this, oid, &ops, pbl), comp);
+
+ comp->wait_for_complete();
+ int ret = comp->get_return_value();
comp->put();
return ret;
}
}
int TestMemIoCtxImpl::aio_remove(const std::string& oid, AioCompletionImpl *c) {
- m_client->add_aio_operation(oid, boost::bind(&TestMemIoCtxImpl::remove, this, oid),
+ m_client->add_aio_operation(oid, true,
+ boost::bind(&TestMemIoCtxImpl::remove, this, oid),
c);
return 0;
}
class AioFunctionContext : public Context {
public:
AioFunctionContext(const TestRadosClient::AioFunction &callback,
- AioCompletionImpl *c)
- : m_callback(callback), m_comp(c)
+ Finisher *finisher, AioCompletionImpl *c)
+ : m_callback(callback), m_finisher(finisher), m_comp(c)
{
if (m_comp != NULL) {
m_comp->get();
virtual void finish(int r) {
int ret = m_callback();
if (m_comp != NULL) {
- finish_aio_completion(m_comp, ret);
+ if (m_finisher != NULL) {
+ m_finisher->queue(new FunctionContext(boost::bind(
+ &finish_aio_completion, m_comp, ret)));
+ } else {
+ finish_aio_completion(m_comp, ret);
+ }
}
}
private:
TestRadosClient::AioFunction m_callback;
+ Finisher *m_finisher;
AioCompletionImpl *m_comp;
};
{
get();
+ // simulate multiple OSDs
int concurrency = get_concurrency();
for (int i = 0; i < concurrency; ++i) {
m_finishers.push_back(new Finisher(m_cct));
m_finishers.back()->start();
}
+
+ // replicate AIO callback processing
+ m_aio_finisher = new Finisher(m_cct);
+ m_aio_finisher->start();
}
TestRadosClient::~TestRadosClient() {
m_finishers[i]->stop();
delete m_finishers[i];
}
+ m_aio_finisher->stop();
+ delete m_aio_finisher;
m_cct->put();
m_cct = NULL;
}
void TestRadosClient::add_aio_operation(const std::string& oid,
+ bool queue_callback,
const AioFunction &aio_function,
AioCompletionImpl *c) {
- AioFunctionContext *ctx = new AioFunctionContext(aio_function, c);
+ AioFunctionContext *ctx = new AioFunctionContext(
+ aio_function, queue_callback ? m_aio_finisher : NULL, c);
get_finisher(oid)->queue(ctx);
}
for (size_t i = 0; i < m_finishers.size(); ++i) {
AioFunctionContext *ctx = new AioFunctionContext(
boost::bind(&WaitForFlush::flushed, wait_for_flush),
- NULL);
+ m_aio_finisher, NULL);
m_finishers[i]->queue(ctx);
}
}
return m_watch_notify;
}
- void add_aio_operation(const std::string& oid,
+ void add_aio_operation(const std::string& oid, bool queue_callback,
const AioFunction &aio_function, AioCompletionImpl *c);
void flush_aio_operations();
void flush_aio_operations(AioCompletionImpl *c);
Finisher *get_finisher(const std::string& oid);
+ Finisher *m_aio_finisher;
std::vector<Finisher *> m_finishers;
boost::hash<std::string> m_hash;