}
};
+// Completion that chains a cache flush behind the draining of in-flight
+// async ops: used by ImageCtx::flush(Context*), which hands this to
+// flush_async_operations().  on_safe is only completed once the cache
+// flush it kicks off here has finished.
+struct C_FlushCache : public Context {
+ ImageCtx *image_ctx;
+ Context *on_safe;
+
+ C_FlushCache(ImageCtx *_image_ctx, Context *_on_safe)
+ : image_ctx(_image_ctx), on_safe(_on_safe) {
+ }
+ virtual void finish(int r) {
+ // successful cache flush indicates all IO is now safe
+ // NOTE(review): r is ignored — the cache flush is started
+ // unconditionally; confirm async-op flush cannot fail meaningfully
+ RWLock::RLocker owner_locker(image_ctx->owner_lock);
+ image_ctx->flush_cache(on_safe);
+ }
+};
+
+// Completion for cache invalidation: runs after a cache flush attempt,
+// purges cached data on blacklisting (or on any error when
+// purge_on_error is set), then releases all clean objects.  on_finish
+// receives 0 if the cache was fully released, -EBUSY if bytes remain.
+struct C_InvalidateCache : public Context {
+ ImageCtx *image_ctx;
+ bool purge_on_error;  // discard cached data if the flush failed
+ bool reentrant_safe;  // true: complete on_finish inline; false: via op_work_queue
+ Context *on_finish;
+
+ C_InvalidateCache(ImageCtx *_image_ctx, bool _purge_on_error,
+ bool _reentrant_safe, Context *_on_finish)
+ : image_ctx(_image_ctx), purge_on_error(_purge_on_error),
+ reentrant_safe(_reentrant_safe), on_finish(_on_finish) {
+ }
+ virtual void finish(int r) {
+ // invoked with cache_lock already held (asserted below), so the
+ // purge_set/release_set calls are safe
+ assert(image_ctx->cache_lock.is_locked());
+ CephContext *cct = image_ctx->cct;
+
+ if (r == -EBLACKLISTED) {
+ // blacklisted clients can never write back dirty data; drop it
+ lderr(cct) << "Blacklisted during flush! Purging cache..." << dendl;
+ image_ctx->object_cacher->purge_set(image_ctx->object_set);
+ } else if (r != 0 && purge_on_error) {
+ lderr(cct) << "invalidate cache encountered error "
+ << cpp_strerror(r) << " !Purging cache..." << dendl;
+ image_ctx->object_cacher->purge_set(image_ctx->object_set);
+ } else if (r != 0) {
+ lderr(cct) << "flush_cache returned " << r << dendl;
+ }
+
+ // release whatever is clean; any remaining bytes are dirty data
+ // that could not be flushed or purged
+ loff_t unclean = image_ctx->object_cacher->release_set(
+ image_ctx->object_set);
+ if (unclean == 0) {
+ r = 0;
+ } else {
+ lderr(cct) << "could not release all objects from cache: "
+ << unclean << " bytes remain" << dendl;
+ r = -EBUSY;
+ }
+
+ if (reentrant_safe) {
+ on_finish->complete(r);
+ } else {
+ // defer to the work queue to avoid completing while holding locks
+ image_ctx->op_work_queue->queue(on_finish, r);
+ }
+ }
+
+};
+
+// Trampoline that re-queues on_finish onto the image's op_work_queue,
+// so the wrapped context always runs in a clean thread context rather
+// than inline in the caller's (possibly lock-holding) thread.
+struct C_AsyncCallback : public Context {
+ ImageCtx *image_ctx;
+ Context *on_finish;
+ C_AsyncCallback(ImageCtx *image_ctx, Context *on_finish)
+ : image_ctx(image_ctx), on_finish(on_finish) {
+ }
+ virtual void finish(int r) {
+ image_ctx->op_work_queue->queue(on_finish, r);
+ }
+};
+
+// File-local helper: complete on_finish once all in-flight async ops
+// have drained.  If ops are pending, on_finish is attached as a flush
+// context to the oldest op (completed when it drains); otherwise it is
+// completed inline immediately with 0.
+void _flush_async_operations(ImageCtx *ictx, Context *on_finish) {
+ {
+ Mutex::Locker async_ops_locker(ictx->async_ops_lock);
+ if (!ictx->async_ops.empty()) {
+ ldout(ictx->cct, 20) << "flush async operations: " << on_finish << " "
+ << "count=" << ictx->async_ops.size() << dendl;
+ // front() is the oldest op; flushing behind it covers all newer ones
+ ictx->async_ops.front()->add_flush_context(on_finish);
+ return;
+ }
+ }
+ // no pending ops: complete inline (outside async_ops_lock)
+ on_finish->complete(0);
+}
+
} // anonymous namespace
ImageCtx::ImageCtx(const string &image_name, const string &image_id,
}
}
- void ImageCtx::flush_cache_aio(Context *onfinish) {
+ // Synchronous cache flush: blocks until all dirty cache data is
+ // written back.  Returns the flush result.  Caller must hold
+ // owner_lock (asserted by the async overload this delegates to).
+ int ImageCtx::flush_cache() {
+ C_SaferCond cond_ctx;
+ flush_cache(&cond_ctx);
+
+ ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
+ int r = cond_ctx.wait();
+ ldout(cct, 20) << "finished flushing cache" << dendl;
+
+ return r;
+ }
+
+ // Asynchronous cache flush: onfinish is completed once the flush of
+ // the image's object set is done.  Caller must hold owner_lock.
+ void ImageCtx::flush_cache(Context *onfinish) {
assert(owner_lock.is_locked());
cache_lock.Lock();
object_cacher->flush_set(object_set, onfinish);
cache_lock.Unlock();
}
- int ImageCtx::flush_cache() {
- int r = 0;
- Mutex mylock("librbd::ImageCtx::flush_cache");
- Cond cond;
- bool done;
- Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
- flush_cache_aio(onfinish);
- mylock.Lock();
- while (!done) {
- ldout(cct, 20) << "waiting for cache to be flushed" << dendl;
- cond.Wait(mylock);
- }
- mylock.Unlock();
- ldout(cct, 20) << "finished flushing cache" << dendl;
- return r;
- }
-
- void ImageCtx::shutdown_cache() {
+ // Drain async ops, invalidate (flush + release) the cache, then stop
+ // the object cacher.  Now returns the invalidation result so callers
+ // can detect a failed flush at shutdown.
+ int ImageCtx::shutdown_cache() {
flush_async_operations();
RWLock::RLocker owner_locker(owner_lock);
- invalidate_cache(true);
+ int r = invalidate_cache(true);
object_cacher->stop();
+ return r;
}
int ImageCtx::invalidate_cache(bool purge_on_error) {
- int result;
- C_SaferCond ctx;
- invalidate_cache(&ctx);
- result = ctx.wait();
-
- if (result && purge_on_error) {
- cache_lock.Lock();
- if (object_cacher != NULL) {
- lderr(cct) << "invalidate cache met error " << cpp_strerror(result) << " !Purging cache..." << dendl;
- object_cacher->purge_set(object_set);
- }
- cache_lock.Unlock();
+ // drain in-flight async ops before touching the cache
+ flush_async_operations();
+ if (object_cacher == NULL) {
+ return 0;
}
+ // release clean objects up front; the C_InvalidateCache completion
+ // handles dirty data after the flush (purging on error as requested)
+ cache_lock.Lock();
+ object_cacher->release_set(object_set);
+ cache_lock.Unlock();
+
+ C_SaferCond ctx;
+ // reentrant_safe=true: the completion may run inline under cache_lock
+ flush_cache(new C_InvalidateCache(this, purge_on_error, true, &ctx));
+
+ // block until the cache has been flushed and fully released
+ int result = ctx.wait();
return result;
}
object_cacher->release_set(object_set);
cache_lock.Unlock();
- flush_cache_aio(new FunctionContext(boost::bind(
- &ImageCtx::invalidate_cache_completion, this, _1, on_finish)));
- }
-
- void ImageCtx::invalidate_cache_completion(int r, Context *on_finish) {
- assert(cache_lock.is_locked());
- if (r == -EBLACKLISTED) {
- lderr(cct) << "Blacklisted during flush! Purging cache..." << dendl;
- object_cacher->purge_set(object_set);
- } else if (r != 0) {
- lderr(cct) << "flush_cache returned " << r << dendl;
- }
-
- loff_t unclean = object_cacher->release_set(object_set);
- if (unclean == 0) {
- r = 0;
- } else {
- lderr(cct) << "could not release all objects from cache: "
- << unclean << " bytes remain" << dendl;
- r = -EBUSY;
- }
-
- op_work_queue->queue(on_finish, r);
+ flush_cache(new C_InvalidateCache(this, false, false, on_finish));
}
void ImageCtx::clear_nonexistence_cache() {
void ImageCtx::flush_async_operations() {
C_SaferCond ctx;
- flush_async_operations(&ctx);
+ // call the file-local helper directly: we block right here, so the
+ // C_AsyncCallback work-queue bounce used by the async overload is
+ // unnecessary
+ _flush_async_operations(this, &ctx);
ctx.wait();
}
void ImageCtx::flush_async_operations(Context *on_finish) {
- Mutex::Locker l(async_ops_lock);
- if (async_ops.empty()) {
- on_finish->complete(0);
- return;
+ // complete context in clean thread context: C_AsyncCallback bounces
+ // on_finish through op_work_queue so it never runs inline under
+ // async_ops_lock
+ _flush_async_operations(this, new C_AsyncCallback(this, on_finish));
+ }
+
+ // Synchronously flush all pending writes: first drain in-flight
+ // async ops, then flush the cache (if enabled).  Returns the cache
+ // flush result, 0 otherwise.  Caller must hold owner_lock.
+ // NOTE(review): unlike the removed _flush(), the cacheless path no
+ // longer issues data_ctx.aio_flush() — confirm librados flushing is
+ // covered elsewhere.
+ int ImageCtx::flush() {
+ assert(owner_lock.is_locked());
+
+ flush_async_operations();
+ if (object_cacher != NULL) {
+ int r = flush_cache();
+ if (r < 0) {
+ return r;
+ }
}
+ return 0;
+ }
- ldout(cct, 20) << "flush async operations: " << on_finish << " "
- << "count=" << async_ops.size() << dendl;
- async_ops.front()->add_flush_context(on_finish);
+ // Asynchronous flush: on_safe is completed after all in-flight async
+ // ops have drained and, if caching is enabled, after the subsequent
+ // cache flush (via C_FlushCache).  Caller must hold owner_lock.
+ void ImageCtx::flush(Context *on_safe) {
+ assert(owner_lock.is_locked());
+ if (object_cacher != NULL) {
+ // flush cache after completing all in-flight AIO ops
+ on_safe = new C_FlushCache(this, on_safe);
+ }
+ flush_async_operations(on_safe);
}
void ImageCtx::cancel_async_requests() {
}
RWLock::WLocker md_locker(ictx->md_lock);
- r = _flush(ictx);
+ r = ictx->flush();
if (r < 0) {
return r;
}
} // release snap_lock and cache_lock
if (new_snap) {
- _flush(ictx);
+ ictx->flush();
}
ictx->refresh_lock.Lock();
// writes might create new snapshots. Rolling back will replace
// the current version, so we have to invalidate that too.
RWLock::WLocker md_locker(ictx->md_lock);
- ictx->flush_async_operations();
r = ictx->invalidate_cache();
if (r < 0) {
return r;
// get -EROFS for writes
RWLock::RLocker owner_locker(ictx->owner_lock);
RWLock::WLocker md_locker(ictx->md_lock);
- ictx->flush_cache();
+ ictx->flush();
}
int r = _snap_set(ictx, snap_name);
if (r < 0) {
// ensure previous writes are visible to listsnaps
{
RWLock::RLocker owner_locker(ictx->owner_lock);
- _flush(ictx);
+ ictx->flush();
}
int r = ictx_check(ictx);
C_AioWrite *flush_ctx = new C_AioWrite(cct, c);
c->add_request();
- ictx->flush_async_operations(flush_ctx);
+ ictx->flush(flush_ctx);
c->init_time(ictx, AIO_TYPE_FLUSH);
- C_AioWrite *req_comp = new C_AioWrite(cct, c);
- c->add_request();
- if (ictx->object_cacher) {
- ictx->flush_cache_aio(req_comp);
- } else {
- librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(req_comp, NULL, rados_ctx_cb);
- ictx->data_ctx.aio_flush_async(rados_completion);
- rados_completion->release();
- }
c->finish_adding_requests(cct);
c->put();
ictx->perfcounter->inc(l_librbd_aio_flush);
ictx->user_flushed();
{
RWLock::RLocker owner_locker(ictx->owner_lock);
- r = _flush(ictx);
+ r = ictx->flush();
}
ictx->perfcounter->inc(l_librbd_flush);
return r;
}
- int _flush(ImageCtx *ictx)
- {
- assert(ictx->owner_lock.is_locked());
- CephContext *cct = ictx->cct;
- int r;
- // flush any outstanding writes
- if (ictx->object_cacher) {
- r = ictx->flush_cache();
- } else {
- r = ictx->data_ctx.aio_flush();
- ictx->flush_async_operations();
- }
-
- if (r)
- lderr(cct) << "_flush " << ictx << " r = " << r << dendl;
-
- return r;
- }
-
int invalidate_cache(ImageCtx *ictx)
{
CephContext *cct = ictx->cct;
return r;
}
- ictx->flush_async_operations();
-
RWLock::RLocker owner_locker(ictx->owner_lock);
RWLock::WLocker md_locker(ictx->md_lock);
r = ictx->invalidate_cache();