}
void ImageWatcher::flush_aio_operations() {
+ C_SaferCond *ctx = new C_SaferCond();
+ flush_aio_operations(ctx);
+ ctx->wait();
+}
+
+void ImageWatcher::flush_aio_operations(Context *ctx) {
Mutex::Locker l(m_aio_request_lock);
- while (m_retrying_aio_requests || !m_aio_requests.empty()) {
- ldout(m_image_ctx.cct, 20) << "flushing aio operations: "
- << "retrying=" << m_retrying_aio_requests << ","
- << "count=" << m_aio_requests.size() << dendl;
- m_aio_request_cond.Wait(m_aio_request_lock);
+ if (!m_retrying_aio_requests && m_aio_requests.empty()) {
+ ctx->complete(0);
+ return;
}
+
+ ldout(m_image_ctx.cct, 20) << "pending flush: " << ctx << " "
+ << "retrying=" << m_retrying_aio_requests << ", "
+ << "count=" << m_aio_requests.size() << dendl;
+ m_aio_flush_contexts.push_back(ctx);
}
int ImageWatcher::try_lock() {
Mutex::Locker l(m_aio_request_lock);
m_retrying_aio_requests = false;
- m_aio_request_cond.Signal();
-}
+ while (!m_aio_flush_contexts.empty()) {
+ Context *flush_ctx = m_aio_flush_contexts.front();
+ m_aio_flush_contexts.pop_front();
-void ImageWatcher::cancel_aio_requests(int result) {
- Mutex::Locker l(m_aio_request_lock);
- for (std::vector<AioRequest>::iterator iter = m_aio_requests.begin();
- iter != m_aio_requests.end(); ++iter) {
- AioCompletion *c = iter->second;
- c->get();
- c->lock.Lock();
- c->rval = result;
- c->lock.Unlock();
- c->finish_adding_requests(m_image_ctx.cct);
- c->put();
+ ldout(m_image_ctx.cct, 20) << "completed flush: " << flush_ctx << dendl;
+ flush_ctx->complete(0);
}
- m_aio_requests.clear();
- m_aio_request_cond.Signal();
}
void ImageWatcher::cancel_async_requests(int result) {
#ifndef CEPH_LIBRBD_IMAGE_WATCHER_H
#define CEPH_LIBRBD_IMAGE_WATCHER_H
-#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RWLock.h"
+#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "include/rbd/librbd.hpp"
#include <set>
#include "include/assert.h"
class entity_name_t;
-class Context;
class Finisher;
class SafeTimer;
bool has_pending_aio_operations();
void flush_aio_operations();
+ void flush_aio_operations(Context *ctx);
int try_lock();
int request_lock(const boost::function<int(AioCompletion*)>& restart_op,
std::set<RemoteAsyncRequest> m_async_progress;
Mutex m_aio_request_lock;
- Cond m_aio_request_cond;
+ std::list<Context *> m_aio_flush_contexts;
std::vector<AioRequest> m_aio_requests;
bool m_retrying_aio_requests;
Context *m_retry_aio_context;
void finalize_retry_aio_requests();
void retry_aio_requests();
- void cancel_aio_requests(int result);
void cancel_async_requests(int result);
uint64_t encode_async_request(bufferlist &bl);
ictx->user_flushed();
c->get();
- c->add_request();
c->init_time(ictx, AIO_TYPE_FLUSH);
+
+ if (ictx->image_watcher != NULL) {
+ C_AioWrite *flush_ctx = new C_AioWrite(cct, c);
+ c->add_request();
+ ictx->image_watcher->flush_aio_operations(flush_ctx);
+ }
+
C_AioWrite *req_comp = new C_AioWrite(cct, c);
+ c->add_request();
if (ictx->object_cacher) {
ictx->flush_cache_aio(req_comp);
} else {