}
}
- // TODO race condition between registering op and submitting to cache
- // (might not be flushed -- backport needed)
- C_AioRequest *flush_ctx = new C_AioRequest(cct, m_aio_comp);
- m_image_ctx.flush_async_operations(flush_ctx);
-
- m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH);
C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
- if (m_image_ctx.object_cacher != NULL) {
- m_image_ctx.flush_cache_aio(req_comp);
- } else {
- librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
- m_image_ctx.data_ctx.aio_flush_async(rados_completion);
- rados_completion->release();
- }
+ m_image_ctx.flush(req_comp);
+ m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_FLUSH);
m_aio_comp->finish_adding_requests(cct);
m_aio_comp->put();
#include "librbd/AsyncOperation.h"
#include "librbd/ImageCtx.h"
#include "common/dout.h"
+#include "common/WorkQueue.h"
#include "include/assert.h"
#define dout_subsys ceph_subsys_rbd
namespace librbd {
+namespace {
+
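+// completes pending flush contexts from the op work queue (under the
+// owner lock) rather than inline during async operation cleanup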
+struct C_CompleteFlushes : public Context {
+ ImageCtx *image_ctx;
+ std::list<Context *> flush_contexts;
+
+ C_CompleteFlushes(ImageCtx *image_ctx, std::list<Context *> &&flush_contexts)
+ : image_ctx(image_ctx), flush_contexts(std::move(flush_contexts)) {
+ }
+ virtual void finish(int r) {
+ RWLock::RLocker owner_locker(image_ctx->owner_lock);
+ while (!flush_contexts.empty()) {
+ Context *flush_ctx = flush_contexts.front();
+ flush_contexts.pop_front();
+
+ ldout(image_ctx->cct, 20) << "completed flush: " << flush_ctx << dendl;
+ flush_ctx->complete(0);
+ }
+ }
+};
+
+} // anonymous namespace
+
void AsyncOperation::start_op(ImageCtx &image_ctx) {
assert(m_image_ctx == NULL);
m_image_ctx = &image_ctx;
}
}
- while (!m_flush_contexts.empty()) {
- Context *flush_ctx = m_flush_contexts.front();
- m_flush_contexts.pop_front();
-
- ldout(m_image_ctx->cct, 20) << "completed flush: " << flush_ctx << dendl;
- flush_ctx->complete(0);
- }
+ C_CompleteFlushes *ctx = new C_CompleteFlushes(m_image_ctx,
+ std::move(m_flush_contexts));
+ m_image_ctx->op_work_queue->queue(ctx);
}
void AsyncOperation::add_flush_context(Context *on_finish) {
}
};
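+// flushes the object cacher once all in-flight async ops have completed;
+// on_safe fires when the cache flush itself finishes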
+struct C_FlushCache : public Context {
+ ImageCtx *image_ctx;
+ Context *on_safe;
+
+ C_FlushCache(ImageCtx *_image_ctx, Context *_on_safe)
+ : image_ctx(_image_ctx), on_safe(_on_safe) {
+ }
+ virtual void finish(int r) {
+ // successful cache flush indicates all IO is now safe
+ assert(image_ctx->owner_lock.is_locked());
+ image_ctx->flush_cache(on_safe);
+ }
+};
+
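+// completion for the cache flush issued during invalidation: purges the
+// cache if blacklisted (or on any error when purge_on_error is set), then
+// releases all objects, returning -EBUSY if dirty data remains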
+struct C_InvalidateCache : public Context {
+ ImageCtx *image_ctx;
+ bool purge_on_error;
+ bool reentrant_safe;
+ Context *on_finish;
+
+ C_InvalidateCache(ImageCtx *_image_ctx, bool _purge_on_error,
+ bool _reentrant_safe, Context *_on_finish)
+ : image_ctx(_image_ctx), purge_on_error(_purge_on_error),
+ reentrant_safe(_reentrant_safe), on_finish(_on_finish) {
+ }
+ virtual void finish(int r) {
+ assert(image_ctx->cache_lock.is_locked());
+ CephContext *cct = image_ctx->cct;
+
+ if (r == -EBLACKLISTED) {
+ lderr(cct) << "Blacklisted during flush! Purging cache..." << dendl;
+ image_ctx->object_cacher->purge_set(image_ctx->object_set);
+ } else if (r != 0 && purge_on_error) {
+ lderr(cct) << "invalidate cache encountered error "
+ << cpp_strerror(r) << ": purging cache..." << dendl;
+ image_ctx->object_cacher->purge_set(image_ctx->object_set);
+ } else if (r != 0) {
+ lderr(cct) << "flush_cache returned " << r << dendl;
+ }
+
+ loff_t unclean = image_ctx->object_cacher->release_set(
+ image_ctx->object_set);
+ if (unclean == 0) {
+ r = 0;
+ } else {
+ lderr(cct) << "could not release all objects from cache: "
+ << unclean << " bytes remain" << dendl;
+ r = -EBUSY;
+ }
+
+ if (reentrant_safe) {
+ on_finish->complete(r);
+ } else {
+ image_ctx->op_work_queue->queue(on_finish, r);
+ }
+ }
+};
+
} // anonymous namespace
const string ImageCtx::METADATA_CONF_PREFIX = "conf_";
}
}
- void ImageCtx::flush_cache_aio(Context *onfinish) {
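+ // synchronous flush: blocks until the object cacher reports all dirty
+ // data safely written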
+ int ImageCtx::flush_cache() {
+ C_SaferCond cond_ctx;
+ flush_cache(&cond_ctx);
+
+ ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
+ int r = cond_ctx.wait();
+ ldout(cct, 20) << "finished flushing cache" << dendl;
+
+ return r;
+ }
+
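+ // begin flushing the cache; onfinish fires once all dirty data is safe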
+ void ImageCtx::flush_cache(Context *onfinish) {
assert(owner_lock.is_locked());
cache_lock.Lock();
object_cacher->flush_set(object_set, onfinish);
cache_lock.Unlock();
}
- int ImageCtx::flush_cache() {
- int r = 0;
- Mutex mylock("librbd::ImageCtx::flush_cache");
- Cond cond;
- bool done;
- Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
- flush_cache_aio(onfinish);
- mylock.Lock();
- while (!done) {
- ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
- cond.Wait(mylock);
- }
- mylock.Unlock();
- ldout(cct, 20) << "finished flushing cache" << dendl;
- return r;
- }
-
int ImageCtx::shutdown_cache() {
flush_async_operations();
}
int ImageCtx::invalidate_cache(bool purge_on_error) {
- int result;
- C_SaferCond ctx;
- invalidate_cache(&ctx);
- result = ctx.wait();
- ldout(cct, 20) << "finished invalidating cache" << dendl;
-
- if (result && purge_on_error) {
- cache_lock.Lock();
- if (object_cacher != NULL) {
- lderr(cct) << "invalidate cache met error " << cpp_strerror(result) << " !Purging cache..." << dendl;
- object_cacher->purge_set(object_set);
- }
- cache_lock.Unlock();
+ flush_async_operations();
+ if (object_cacher == NULL) {
+ return 0;
}
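+ // release any clean objects up front; C_InvalidateCache handles the
+ // remainder once the flush completes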
+ cache_lock.Lock();
+ object_cacher->release_set(object_set);
+ cache_lock.Unlock();
+
+ C_SaferCond ctx;
+ flush_cache(new C_InvalidateCache(this, purge_on_error, true, &ctx));
+
+ int result = ctx.wait();
return result;
}
object_cacher->release_set(object_set);
cache_lock.Unlock();
- flush_cache_aio(new FunctionContext(boost::bind(
- &ImageCtx::invalidate_cache_completion, this, _1, on_finish)));
- }
-
- void ImageCtx::invalidate_cache_completion(int r, Context *on_finish) {
- assert(cache_lock.is_locked());
- if (r == -EBLACKLISTED) {
- lderr(cct) << "Blacklisted during flush! Purging cache..." << dendl;
- object_cacher->purge_set(object_set);
- } else if (r != 0) {
- lderr(cct) << "flush_cache returned " << r << dendl;
- }
-
- loff_t unclean = object_cacher->release_set(object_set);
- if (unclean == 0) {
- r = 0;
- } else {
- lderr(cct) << "could not release all objects from cache: "
- << unclean << " bytes remain" << dendl;
- r = -EBUSY;
- }
-
- op_work_queue->queue(on_finish, r);
+ flush_cache(new C_InvalidateCache(this, false, false, on_finish));
}
void ImageCtx::clear_nonexistence_cache() {
}
void ImageCtx::flush_async_operations(Context *on_finish) {
- Mutex::Locker l(async_ops_lock);
- if (async_ops.empty()) {
- on_finish->complete(0);
- return;
+ {
+ Mutex::Locker l(async_ops_lock);
+ if (!async_ops.empty()) {
+ ldout(cct, 20) << "flush async operations: " << on_finish << " "
+ << "count=" << async_ops.size() << dendl;
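+ // async_ops is ordered newest -> oldest; attaching the flush context to
+ // the front op delays it until all in-flight ops have completed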
+ async_ops.front()->add_flush_context(on_finish);
+ return;
+ }
}
+ on_finish->complete(0);
+ }
- ldout(cct, 20) << "flush async operations: " << on_finish << " "
- << "count=" << async_ops.size() << dendl;
- async_ops.front()->add_flush_context(on_finish);
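+ // synchronous flush: blocks until all in-flight AIO and dirty cache
+ // data have been safely committed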
+ int ImageCtx::flush() {
+ C_SaferCond cond_ctx;
+ flush(&cond_ctx);
+ return cond_ctx.wait();
+ }
+
+ void ImageCtx::flush(Context *on_safe) {
+ assert(owner_lock.is_locked());
+ if (object_cacher != NULL) {
+ // flush cache after completing all in-flight AIO ops
+ on_safe = new C_FlushCache(this, on_safe);
+ }
+ flush_async_operations(on_safe);
}
void ImageCtx::cancel_async_requests() {
uint64_t off, Context *onfinish, int fadvise_flags,
uint64_t journal_tid);
void user_flushed();
- void flush_cache_aio(Context *onfinish);
int flush_cache();
+ void flush_cache(Context *onfinish);
int shutdown_cache();
int invalidate_cache(bool purge_on_error=false);
void invalidate_cache(Context *on_finish);
- void invalidate_cache_completion(int r, Context *on_finish);
void clear_nonexistence_cache();
int register_watch();
void unregister_watch();
void flush_async_operations();
void flush_async_operations(Context *on_finish);
+ int flush();
+ void flush(Context *on_safe);
+
void cancel_async_requests();
void apply_metadata_confs();
notify_listeners_updated_lock(LOCK_UPDATE_STATE_RELEASING);
RWLock::WLocker md_locker(m_image_ctx.md_lock);
- r = librbd::_flush(&m_image_ctx);
+ r = m_image_ctx.flush();
if (r < 0) {
lderr(cct) << this << " failed to flush: " << cpp_strerror(r) << dendl;
goto err_cancel_unlock;
}
RWLock::WLocker md_locker(ictx->md_lock);
- r = _flush(ictx);
+ r = ictx->flush();
if (r < 0) {
return r;
}
RWLock::RLocker owner_locker(ictx->owner_lock);
RWLock::WLocker md_locker(ictx->md_lock);
- r = _flush(ictx);
+ r = ictx->flush();
if (r < 0) {
return r;
}
}
if (new_snap) {
- _flush(ictx);
+ ictx->flush();
}
return 0;
}
// writes might create new snapshots. Rolling back will replace
// the current version, so we have to invalidate that too.
RWLock::WLocker md_locker(ictx->md_lock);
- ictx->flush_async_operations();
r = ictx->invalidate_cache();
if (r < 0) {
return r;
}
ictx->cancel_async_requests();
- ictx->flush_async_operations();
-
- if (ictx->object_cacher) {
+ {
RWLock::RLocker owner_locker(ictx->owner_lock);
- r = _flush(ictx);
- if (r < 0) {
- return r;
- }
+ r = ictx->flush();
}
{
ictx->flush_async_operations();
ictx->readahead.wait_for_pending();
- int flush_r;
if (ictx->object_cacher) {
- flush_r = ictx->shutdown_cache(); // implicitly flushes
- } else {
- RWLock::RLocker owner_locker(ictx->owner_lock);
- flush_r = _flush(ictx);
- }
- if (flush_r< 0) {
- lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(flush_r)
- << dendl;
- if (r == 0) {
- r = flush_r;
+ int flush_r = ictx->shutdown_cache(); // implicitly flushes
+ if (flush_r < 0) {
+ lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(flush_r)
+ << dendl;
+ if (r == 0) {
+ r = flush_r;
+ }
}
}
// ensure previous writes are visible to listsnaps
{
RWLock::RLocker owner_locker(ictx->owner_lock);
- _flush(ictx);
+ ictx->flush();
}
int r = ictx_check(ictx);
ictx->user_flushed();
{
RWLock::RLocker owner_locker(ictx->owner_lock);
- r = _flush(ictx);
+ r = ictx->flush();
}
ictx->perfcounter->inc(l_librbd_flush);
return r;
}
- int _flush(ImageCtx *ictx)
- {
- assert(ictx->owner_lock.is_locked());
- CephContext *cct = ictx->cct;
- int r;
- // flush any outstanding writes
- if (ictx->object_cacher) {
- r = ictx->flush_cache();
- } else {
- r = ictx->data_ctx.aio_flush();
- ictx->flush_async_operations();
- }
-
- if (r)
- lderr(cct) << "_flush " << ictx << " r = " << r << dendl;
-
- return r;
- }
-
int invalidate_cache(ImageCtx *ictx)
{
CephContext *cct = ictx->cct;
return r;
}
- ictx->flush_async_operations();
-
RWLock::RLocker owner_locker(ictx->owner_lock);
RWLock::WLocker md_locker(ictx->md_lock);
r = ictx->invalidate_cache();
ProgressContext &prog_ctx);
int flush(ImageCtx *ictx);
- int _flush(ImageCtx *ictx);
int invalidate_cache(ImageCtx *ictx);
int metadata_list(ImageCtx *ictx, const string &last, uint64_t max, map<string, bufferlist> *pairs);
int metadata_get(ImageCtx *ictx, const std::string &key, std::string *value);