this, "librbd::io_work_queue",
cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
thread_pool);
- io_object_dispatcher = new io::ObjectDispatcher<>(this);
+ io_object_dispatcher = new io::ObjectDispatcher<ImageCtx>(this);
if (cct->_conf.get_val<bool>("rbd_auto_exclusive_lock_until_manual_request")) {
exclusive_lock_policy = new exclusive_lock::AutomaticPolicy(this);
class AsyncOperation;
template <typename> class CopyupRequest;
template <typename> class ImageRequestWQ;
- template <typename> class ObjectDispatcher;
+ struct ObjectDispatcherInterface;
}
namespace journal { struct Policy; }
xlist<operation::ResizeRequest<ImageCtx>*> resize_reqs;
io::ImageRequestWQ<ImageCtx> *io_work_queue;
- io::ObjectDispatcher<ImageCtx> *io_object_dispatcher = nullptr;
+ io::ObjectDispatcherInterface *io_object_dispatcher = nullptr;
ContextWQ *op_work_queue;
#include "librbd/ImageCtx.h"
#include "librbd/io/ImageRequestWQ.h"
#include "librbd/io/ObjectDispatchSpec.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/journal/CreateRequest.h"
#include "librbd/journal/DemoteRequest.h"
#include "librbd/journal/ObjectDispatch.h"
on_finish = create_async_context_callback(m_image_ctx, on_finish);
// inject our handler into the object dispatcher chain
- m_image_ctx.io_object_dispatcher->register_object_dispatch(
+ m_image_ctx.io_object_dispatcher->register_dispatch(
journal::ObjectDispatch<I>::create(&m_image_ctx, this));
std::lock_guard locker{m_lock};
auto ctx = new LambdaContext([on_finish, r](int _) {
on_finish->complete(r);
});
- m_image_ctx.io_object_dispatcher->shut_down_object_dispatch(
+ m_image_ctx.io_object_dispatcher->shut_down_dispatch(
io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
});
on_finish = create_async_context_callback(m_image_ctx, on_finish);
#include "librbd/Utils.h"
#include "librbd/cache/ObjectCacherWriteback.h"
#include "librbd/io/ObjectDispatchSpec.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/io/Types.h"
#include "librbd/io/Utils.h"
#include "osd/osd_types.h"
if (m_max_dirty > 0) {
m_image_ctx->disable_zero_copy = true;
}
- m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
+ m_image_ctx->io_object_dispatcher->register_dispatch(this);
}
template <typename I>
bool writethrough_until_flush);
~ObjectCacherObjectDispatch() override;
- io::ObjectDispatchLayer get_object_dispatch_layer() const override {
+ io::ObjectDispatchLayer get_dispatch_layer() const override {
return io::OBJECT_DISPATCH_LAYER_CACHE;
}
#include "librbd/Utils.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ObjectDispatchSpec.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/io/ReadResult.h"
#include "include/ceph_assert.h"
#include "librbd/Journal.h"
#include "librbd/Utils.h"
#include "librbd/io/ObjectDispatchSpec.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/io/Utils.h"
#include "librbd/cache/ParentCacheObjectDispatch.h"
#include "osd/osd_types.h"
m_connecting.store(true);
create_cache_session(create_session_ctx, false);
- m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
+ m_image_ctx->io_object_dispatcher->register_dispatch(this);
m_initialized = true;
}
ParentCacheObjectDispatch(ImageCtxT* image_ctx);
~ParentCacheObjectDispatch() override;
- io::ObjectDispatchLayer get_object_dispatch_layer() const override {
+ io::ObjectDispatchLayer get_dispatch_layer() const override {
return io::OBJECT_DISPATCH_LAYER_PARENT_CACHE;
}
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/io/ObjectDispatchSpec.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
if (m_init_max_dirty > 0) {
m_image_ctx->disable_zero_copy = true;
}
- m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
+ m_image_ctx->io_object_dispatcher->register_dispatch(this);
}
template <typename I>
bool writethrough_until_flush);
~WriteAroundObjectDispatch() override;
- io::ObjectDispatchLayer get_object_dispatch_layer() const override {
+ io::ObjectDispatchLayer get_dispatch_layer() const override {
return io::OBJECT_DISPATCH_LAYER_CACHE;
}
#include "librbd/ObjectMap.h"
#include "librbd/Utils.h"
#include "librbd/io/ImageRequestWQ.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageDispatchSpec.h"
#include "librbd/io/ImageRequestWQ.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#include "librbd/Utils.h"
#include "librbd/image/CloseRequest.h"
#include "librbd/image/OpenRequest.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"
#include "librbd/io/ImageRequestWQ.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/io/ObjectRequest.h"
#include "librbd/io/ReadResult.h"
#include "librbd/journal/Types.h"
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_DISPATCHER_H
+#define CEPH_LIBRBD_IO_DISPATCHER_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "common/ceph_mutex.h"
+#include "common/dout.h"
+#include "common/AsyncOpTracker.h"
+#include "librbd/Utils.h"
+#include "librbd/io/Types.h"
+#include <map>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::Dispatcher: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+template <typename ImageCtxT, typename DispatchInterfaceT>
+class Dispatcher : public DispatchInterfaceT {
+public:
+ typedef typename DispatchInterfaceT::Dispatch Dispatch;
+ typedef typename DispatchInterfaceT::DispatchLayer DispatchLayer;
+ typedef typename DispatchInterfaceT::DispatchSpec DispatchSpec;
+
+ Dispatcher(ImageCtxT* image_ctx)
+ : m_image_ctx(image_ctx),
+ m_lock(ceph::make_shared_mutex(
+ librbd::util::unique_lock_name("librbd::io::Dispatcher::lock",
+ this))) {
+ }
+
+ virtual ~Dispatcher() {
+ ceph_assert(m_dispatches.empty());
+ }
+
+ void shut_down(Context* on_finish) override {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 5) << dendl;
+
+ std::map<DispatchLayer, DispatchMeta> dispatches;
+ {
+ std::unique_lock locker{m_lock};
+ std::swap(dispatches, m_dispatches);
+ }
+
+ for (auto it : dispatches) {
+ shut_down_dispatch(it.second, &on_finish);
+ }
+ on_finish->complete(0);
+ }
+
+ void register_dispatch(Dispatch* dispatch) override {
+ auto cct = m_image_ctx->cct;
+ auto type = dispatch->get_dispatch_layer();
+ ldout(cct, 5) << "dispatch_layer=" << type << dendl;
+
+ std::unique_lock locker{m_lock};
+
+ auto result = m_dispatches.insert(
+ {type, {dispatch, new AsyncOpTracker()}});
+ ceph_assert(result.second);
+ }
+
+ void shut_down_dispatch(DispatchLayer dispatch_layer,
+ Context* on_finish) override {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 5) << "dispatch_layer=" << dispatch_layer << dendl;
+
+ DispatchMeta dispatch_meta;
+ {
+ std::unique_lock locker{m_lock};
+ auto it = m_dispatches.find(dispatch_layer);
+ ceph_assert(it != m_dispatches.end());
+
+ dispatch_meta = it->second;
+ m_dispatches.erase(it);
+ }
+
+ shut_down_dispatch(dispatch_meta, &on_finish);
+ on_finish->complete(0);
+ }
+
+ void send(DispatchSpec* dispatch_spec) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "dispatch_spec=" << dispatch_spec << dendl;
+
+ auto dispatch_layer = dispatch_spec->dispatch_layer;
+
+ // apply the IO request to all layers -- this method will be re-invoked
+ // by the dispatch layer if continuing / restarting the IO
+ while (true) {
+ m_lock.lock_shared();
+ dispatch_layer = dispatch_spec->dispatch_layer;
+ auto it = m_dispatches.upper_bound(dispatch_layer);
+ if (it == m_dispatches.end()) {
+ // the request is complete if handled by all layers
+ dispatch_spec->dispatch_result = DISPATCH_RESULT_COMPLETE;
+ m_lock.unlock_shared();
+ break;
+ }
+
+ auto& dispatch_meta = it->second;
+ auto dispatch = dispatch_meta.dispatch;
+ auto async_op_tracker = dispatch_meta.async_op_tracker;
+ dispatch_spec->dispatch_result = DISPATCH_RESULT_INVALID;
+
+ // prevent recursive locking back into the dispatcher while handling IO
+ async_op_tracker->start_op();
+ m_lock.unlock_shared();
+
+ // advance to next layer in case we skip or continue
+ dispatch_spec->dispatch_layer = dispatch->get_dispatch_layer();
+
+ bool handled = send_dispatch(dispatch, dispatch_spec);
+ async_op_tracker->finish_op();
+
+ // handled ops will resume when the dispatch ctx is invoked
+ if (handled) {
+ return;
+ }
+ }
+
+ // skipped through to the last layer
+ dispatch_spec->dispatcher_ctx.complete(0);
+ }
+
+protected:
+ struct DispatchMeta {
+ Dispatch* dispatch = nullptr;
+ AsyncOpTracker* async_op_tracker = nullptr;
+
+ DispatchMeta() {
+ }
+ DispatchMeta(Dispatch* dispatch, AsyncOpTracker* async_op_tracker)
+ : dispatch(dispatch), async_op_tracker(async_op_tracker) {
+ }
+ };
+
+ ImageCtxT* m_image_ctx;
+
+ ceph::shared_mutex m_lock;
+ std::map<DispatchLayer, DispatchMeta> m_dispatches;
+
+ virtual bool send_dispatch(Dispatch* dispatch,
+ DispatchSpec* dispatch_spec) = 0;
+
+private:
+ void shut_down_dispatch(DispatchMeta& dispatch_meta,
+ Context** on_finish) {
+ auto dispatch = dispatch_meta.dispatch;
+ auto async_op_tracker = dispatch_meta.async_op_tracker;
+
+ auto ctx = *on_finish;
+ ctx = new LambdaContext(
+ [dispatch, async_op_tracker, ctx](int r) {
+ delete dispatch;
+ delete async_op_tracker;
+
+ ctx->complete(r);
+ });
+ ctx = new LambdaContext([dispatch, ctx](int r) {
+ dispatch->shut_down(ctx);
+ });
+ *on_finish = new LambdaContext([async_op_tracker, ctx](int r) {
+ async_op_tracker->wait_for_ops(ctx);
+ });
+ }
+
+};
+
+} // namespace io
+} // namespace librbd
+
+#undef dout_subsys
+#undef dout_prefix
+#define dout_prefix *_dout
+
+#endif // CEPH_LIBRBD_IO_DISPATCHER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_DISPATCHER_INTERFACE_H
+#define CEPH_LIBRBD_IO_DISPATCHER_INTERFACE_H
+
+#include "include/int_types.h"
+
+struct Context;
+
+namespace librbd {
+namespace io {
+
+template <typename DispatchT>
+struct DispatcherInterface {
+public:
+ typedef DispatchT Dispatch;
+ typedef typename DispatchT::DispatchLayer DispatchLayer;
+ typedef typename DispatchT::DispatchSpec DispatchSpec;
+
+ virtual ~DispatcherInterface() {
+ }
+
+ virtual void shut_down(Context* on_finish) = 0;
+
+ virtual void register_dispatch(Dispatch* dispatch) = 0;
+ virtual void shut_down_dispatch(DispatchLayer dispatch_layer,
+ Context* on_finish) = 0;
+
+ virtual void send(DispatchSpec* dispatch_spec) = 0;
+};
+
+} // namespace io
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_IO_DISPATCHER_INTERFACE_H
#include "librbd/io/AsyncOperation.h"
#include "librbd/io/ObjectDispatchInterface.h"
#include "librbd/io/ObjectDispatchSpec.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/io/Utils.h"
#include "librbd/journal/Types.h"
#include "include/rados/librados.hpp"
public:
ObjectDispatch(ImageCtxT* image_ctx);
- ObjectDispatchLayer get_object_dispatch_layer() const override {
+ ObjectDispatchLayer get_dispatch_layer() const override {
return OBJECT_DISPATCH_LAYER_CORE;
}
namespace io {
struct AioCompletion;
+struct ObjectDispatchInterface;
+struct ObjectDispatchSpec;
struct ObjectDispatchInterface {
+ typedef ObjectDispatchInterface Dispatch;
+ typedef ObjectDispatchLayer DispatchLayer;
+ typedef ObjectDispatchSpec DispatchSpec;
+
virtual ~ObjectDispatchInterface() {
}
- virtual ObjectDispatchLayer get_object_dispatch_layer() const = 0;
+ virtual ObjectDispatchLayer get_dispatch_layer() const = 0;
virtual void shut_down(Context* on_finish) = 0;
#include "librbd/io/ObjectDispatchSpec.h"
#include "include/Context.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include <boost/variant.hpp>
namespace librbd {
C_Dispatcher dispatcher_ctx;
ObjectDispatcherInterface* object_dispatcher;
- ObjectDispatchLayer object_dispatch_layer;
+ ObjectDispatchLayer dispatch_layer;
int object_dispatch_flags = 0;
DispatchResult dispatch_result = DISPATCH_RESULT_INVALID;
Request&& request, int op_flags,
const ZTracer::Trace& parent_trace, Context* on_finish)
: dispatcher_ctx(this, on_finish), object_dispatcher(object_dispatcher),
- object_dispatch_layer(object_dispatch_layer), request(std::move(request)),
+ dispatch_layer(object_dispatch_layer), request(std::move(request)),
op_flags(op_flags), parent_trace(parent_trace) {
}
void complete(int r) override {
while (true) {
object_dispatcher->m_lock.lock_shared();
- auto it = object_dispatcher->m_object_dispatches.upper_bound(
+ auto it = object_dispatcher->m_dispatches.upper_bound(
object_dispatch_layer);
- if (it == object_dispatcher->m_object_dispatches.end()) {
+ if (it == object_dispatcher->m_dispatches.end()) {
object_dispatcher->m_lock.unlock_shared();
Context::complete(r);
return;
}
auto& object_dispatch_meta = it->second;
- auto object_dispatch = object_dispatch_meta.object_dispatch;
+ auto object_dispatch = object_dispatch_meta.dispatch;
// prevent recursive locking back into the dispatcher while handling IO
object_dispatch_meta.async_op_tracker->start_op();
object_dispatcher->m_lock.unlock_shared();
// next loop should start after current layer
- object_dispatch_layer = object_dispatch->get_object_dispatch_layer();
+ object_dispatch_layer = object_dispatch->get_dispatch_layer();
auto handled = execute(object_dispatch, this);
object_dispatch_meta.async_op_tracker->finish_op();
template <typename I>
ObjectDispatcher<I>::ObjectDispatcher(I* image_ctx)
- : m_image_ctx(image_ctx),
- m_lock(ceph::make_shared_mutex(
- librbd::util::unique_lock_name("librbd::io::ObjectDispatcher::lock",
- this))) {
+ : Dispatcher<I, ObjectDispatcherInterface>(image_ctx) {
// configure the core object dispatch handler on startup
auto object_dispatch = new ObjectDispatch(image_ctx);
- m_object_dispatches[object_dispatch->get_object_dispatch_layer()] =
- {object_dispatch, new AsyncOpTracker()};
-}
-
-template <typename I>
-ObjectDispatcher<I>::~ObjectDispatcher() {
- ceph_assert(m_object_dispatches.empty());
-}
-
-template <typename I>
-void ObjectDispatcher<I>::shut_down(Context* on_finish) {
- auto cct = m_image_ctx->cct;
- ldout(cct, 5) << dendl;
-
- std::map<ObjectDispatchLayer, ObjectDispatchMeta> object_dispatches;
- {
- std::unique_lock locker{m_lock};
- std::swap(object_dispatches, m_object_dispatches);
- }
-
- for (auto it : object_dispatches) {
- shut_down_object_dispatch(it.second, &on_finish);
- }
- on_finish->complete(0);
-}
-
-template <typename I>
-void ObjectDispatcher<I>::register_object_dispatch(
- ObjectDispatchInterface* object_dispatch) {
- auto cct = m_image_ctx->cct;
- auto type = object_dispatch->get_object_dispatch_layer();
- ldout(cct, 5) << "object_dispatch_layer=" << type << dendl;
-
- std::unique_lock locker{m_lock};
- ceph_assert(type < OBJECT_DISPATCH_LAYER_LAST);
-
- auto result = m_object_dispatches.insert(
- {type, {object_dispatch, new AsyncOpTracker()}});
- ceph_assert(result.second);
-}
-
-template <typename I>
-void ObjectDispatcher<I>::shut_down_object_dispatch(
- ObjectDispatchLayer object_dispatch_layer, Context* on_finish) {
- auto cct = m_image_ctx->cct;
- ldout(cct, 5) << "object_dispatch_layer=" << object_dispatch_layer << dendl;
- ceph_assert(object_dispatch_layer + 1 < OBJECT_DISPATCH_LAYER_LAST);
-
- ObjectDispatchMeta object_dispatch_meta;
- {
- std::unique_lock locker{m_lock};
- auto it = m_object_dispatches.find(object_dispatch_layer);
- ceph_assert(it != m_object_dispatches.end());
-
- object_dispatch_meta = it->second;
- m_object_dispatches.erase(it);
- }
-
- shut_down_object_dispatch(object_dispatch_meta, &on_finish);
- on_finish->complete(0);
-}
-
-template <typename I>
-void ObjectDispatcher<I>::shut_down_object_dispatch(
- ObjectDispatchMeta& object_dispatch_meta, Context** on_finish) {
- auto object_dispatch = object_dispatch_meta.object_dispatch;
- auto async_op_tracker = object_dispatch_meta.async_op_tracker;
-
- Context* ctx = *on_finish;
- ctx = new LambdaContext(
- [object_dispatch, async_op_tracker, ctx](int r) {
- delete object_dispatch;
- delete async_op_tracker;
-
- ctx->complete(r);
- });
- ctx = new LambdaContext([object_dispatch, ctx](int r) {
- object_dispatch->shut_down(ctx);
- });
- *on_finish = new LambdaContext([async_op_tracker, ctx](int r) {
- async_op_tracker->wait_for_ops(ctx);
- });
+ this->register_dispatch(object_dispatch);
}
template <typename I>
void ObjectDispatcher<I>::invalidate_cache(Context* on_finish) {
- auto cct = m_image_ctx->cct;
+ auto image_ctx = this->m_image_ctx;
+ auto cct = image_ctx->cct;
ldout(cct, 5) << dendl;
- on_finish = util::create_async_context_callback(*m_image_ctx, on_finish);
+ on_finish = util::create_async_context_callback(*image_ctx, on_finish);
auto ctx = new C_InvalidateCache(this, on_finish);
ctx->complete(0);
}
template <typename I>
void ObjectDispatcher<I>::reset_existence_cache(Context* on_finish) {
- auto cct = m_image_ctx->cct;
+ auto image_ctx = this->m_image_ctx;
+ auto cct = image_ctx->cct;
ldout(cct, 5) << dendl;
- on_finish = util::create_async_context_callback(*m_image_ctx, on_finish);
+ on_finish = util::create_async_context_callback(*image_ctx, on_finish);
auto ctx = new C_ResetExistenceCache(this, on_finish);
ctx->complete(0);
}
void ObjectDispatcher<I>::extent_overwritten(
uint64_t object_no, uint64_t object_off, uint64_t object_len,
uint64_t journal_tid, uint64_t new_journal_tid) {
- auto cct = m_image_ctx->cct;
+ auto cct = this->m_image_ctx->cct;
ldout(cct, 20) << object_no << " " << object_off << "~" << object_len
<< dendl;
- for (auto it : m_object_dispatches) {
+ std::shared_lock locker{this->m_lock};
+ for (auto it : this->m_dispatches) {
auto& object_dispatch_meta = it.second;
- auto object_dispatch = object_dispatch_meta.object_dispatch;
+ auto object_dispatch = object_dispatch_meta.dispatch;
object_dispatch->extent_overwritten(object_no, object_off, object_len,
journal_tid, new_journal_tid);
}
}
template <typename I>
-void ObjectDispatcher<I>::send(ObjectDispatchSpec* object_dispatch_spec) {
- auto cct = m_image_ctx->cct;
- ldout(cct, 20) << "object_dispatch_spec=" << object_dispatch_spec << dendl;
-
- auto object_dispatch_layer = object_dispatch_spec->object_dispatch_layer;
- ceph_assert(object_dispatch_layer + 1 < OBJECT_DISPATCH_LAYER_LAST);
-
- // apply the IO request to all layers -- this method will be re-invoked
- // by the dispatch layer if continuing / restarting the IO
- while (true) {
- m_lock.lock_shared();
- object_dispatch_layer = object_dispatch_spec->object_dispatch_layer;
- auto it = m_object_dispatches.upper_bound(object_dispatch_layer);
- if (it == m_object_dispatches.end()) {
- // the request is complete if handled by all layers
- object_dispatch_spec->dispatch_result = DISPATCH_RESULT_COMPLETE;
- m_lock.unlock_shared();
- break;
- }
-
- auto& object_dispatch_meta = it->second;
- auto object_dispatch = object_dispatch_meta.object_dispatch;
- object_dispatch_spec->dispatch_result = DISPATCH_RESULT_INVALID;
-
- // prevent recursive locking back into the dispatcher while handling IO
- object_dispatch_meta.async_op_tracker->start_op();
- m_lock.unlock_shared();
-
- // advance to next layer in case we skip or continue
- object_dispatch_spec->object_dispatch_layer =
- object_dispatch->get_object_dispatch_layer();
-
- bool handled = boost::apply_visitor(
- SendVisitor{object_dispatch, object_dispatch_spec},
- object_dispatch_spec->request);
- object_dispatch_meta.async_op_tracker->finish_op();
-
- // handled ops will resume when the dispatch ctx is invoked
- if (handled) {
- return;
- }
- }
-
- // skipped through to the last layer
- object_dispatch_spec->dispatcher_ctx.complete(0);
+bool ObjectDispatcher<I>::send_dispatch(
+ ObjectDispatchInterface* object_dispatch,
+ ObjectDispatchSpec* object_dispatch_spec) {
+ return boost::apply_visitor(
+ SendVisitor{object_dispatch, object_dispatch_spec},
+ object_dispatch_spec->request);
}
} // namespace io
#include "include/int_types.h"
#include "common/ceph_mutex.h"
+#include "librbd/io/Dispatcher.h"
+#include "librbd/io/ObjectDispatchInterface.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/io/Types.h"
#include <map>
-struct AsyncOpTracker;
struct Context;
namespace librbd {
namespace io {
-struct ObjectDispatchInterface;
-struct ObjectDispatchSpec;
-
-struct ObjectDispatcherInterface {
-public:
- virtual ~ObjectDispatcherInterface() {
- }
-
-private:
- friend class ObjectDispatchSpec;
-
- virtual void send(ObjectDispatchSpec* object_dispatch_spec) = 0;
-};
-
template <typename ImageCtxT = ImageCtx>
-class ObjectDispatcher : public ObjectDispatcherInterface {
+class ObjectDispatcher
+ : public Dispatcher<ImageCtxT, ObjectDispatcherInterface> {
public:
ObjectDispatcher(ImageCtxT* image_ctx);
- ~ObjectDispatcher();
-
- void shut_down(Context* on_finish);
- void register_object_dispatch(ObjectDispatchInterface* object_dispatch);
- void shut_down_object_dispatch(ObjectDispatchLayer object_dispatch_layer,
- Context* on_finish);
-
- void invalidate_cache(Context* on_finish);
- void reset_existence_cache(Context* on_finish);
+ void invalidate_cache(Context* on_finish) override;
+ void reset_existence_cache(Context* on_finish) override;
void extent_overwritten(
uint64_t object_no, uint64_t object_off, uint64_t object_len,
- uint64_t journal_tid, uint64_t new_journal_tid);
-
-private:
- struct ObjectDispatchMeta {
- ObjectDispatchInterface* object_dispatch = nullptr;
- AsyncOpTracker* async_op_tracker = nullptr;
+ uint64_t journal_tid, uint64_t new_journal_tid) override;
- ObjectDispatchMeta() {
- }
- ObjectDispatchMeta(ObjectDispatchInterface* object_dispatch,
- AsyncOpTracker* async_op_tracker)
- : object_dispatch(object_dispatch), async_op_tracker(async_op_tracker) {
- }
- };
+protected:
+ bool send_dispatch(ObjectDispatchInterface* object_dispatch,
+ ObjectDispatchSpec* object_dispatch_spec) override;
+private:
struct C_LayerIterator;
struct C_InvalidateCache;
struct C_ResetExistenceCache;
struct SendVisitor;
- ImageCtxT* m_image_ctx;
-
- ceph::shared_mutex m_lock;
- std::map<ObjectDispatchLayer, ObjectDispatchMeta> m_object_dispatches;
-
- void send(ObjectDispatchSpec* object_dispatch_spec);
-
- void shut_down_object_dispatch(ObjectDispatchMeta& object_dispatch_meta,
- Context** on_finish);
-
};
} // namespace io
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_OBJECT_DISPATCHER_INTERFACE_H
+#define CEPH_LIBRBD_IO_OBJECT_DISPATCHER_INTERFACE_H
+
+#include "include/int_types.h"
+#include "librbd/io/DispatcherInterface.h"
+#include "librbd/io/ObjectDispatchInterface.h"
+
+struct Context;
+
+namespace librbd {
+namespace io {
+
+struct ObjectDispatcherInterface
+ : public DispatcherInterface<ObjectDispatchInterface> {
+public:
+ virtual void invalidate_cache(Context* on_finish) = 0;
+ virtual void reset_existence_cache(Context* on_finish) = 0;
+
+ virtual void extent_overwritten(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ uint64_t journal_tid, uint64_t new_journal_tid) = 0;
+};
+
+} // namespace io
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_IO_OBJECT_DISPATCHER_INTERFACE_H
ldout(cct, 5) << dendl;
// add ourself to the IO object dispatcher chain
- m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
+ m_image_ctx->io_object_dispatcher->register_dispatch(this);
}
template <typename I>
SimpleSchedulerObjectDispatch(ImageCtxT* image_ctx);
~SimpleSchedulerObjectDispatch() override;
- ObjectDispatchLayer get_object_dispatch_layer() const override {
+ ObjectDispatchLayer get_dispatch_layer() const override {
return OBJECT_DISPATCH_LAYER_SCHEDULER;
}
#include <map>
#include <vector>
+struct Context;
+
namespace librbd {
namespace io {
#include "librbd/Journal.h"
#include "librbd/Utils.h"
#include "librbd/io/ObjectDispatchSpec.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
ObjectDispatch(ImageCtxT* image_ctx, Journal<ImageCtxT>* journal);
- io::ObjectDispatchLayer get_object_dispatch_layer() const override {
+ io::ObjectDispatchLayer get_dispatch_layer() const override {
return io::OBJECT_DISPATCH_LAYER_JOURNAL;
}
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageDispatchSpec.h"
#include "librbd/io/ImageRequestWQ.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/operation/TrimRequest.h"
#include "common/dout.h"
#include "common/errno.h"
#include "librbd/ObjectMap.h"
#include "librbd/Utils.h"
#include "librbd/io/ImageRequestWQ.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/operation/ResizeRequest.h"
#include "osdc/Striper.h"
#include <boost/lambda/bind.hpp>
#include "librbd/ObjectMap.h"
#include "librbd/Utils.h"
#include "librbd/io/ObjectDispatchSpec.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "common/ContextCompletion.h"
#include "common/dout.h"
#include "common/errno.h"
void expect_io_object_dispatcher_register_state(MockParentImageCache& mparent_image_cache,
int ret_val) {
auto& expect = EXPECT_CALL((*(mparent_image_cache.get_image_ctx()->io_object_dispatcher)),
- register_object_dispatch(_));
+ register_dispatch(_));
expect.WillOnce(WithArg<0>(Invoke([ret_val, &mparent_image_cache]
(io::ObjectDispatchInterface* object_dispatch) {
mock_parent_image_cache->init();
cond.wait();
- ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
+ ASSERT_EQ(mock_parent_image_cache->get_dispatch_layer(),
io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
ASSERT_EQ(mock_parent_image_cache->get_state(), true);
expect_cache_session_state(*mock_parent_image_cache, true);
mock_parent_image_cache->init();
// initialization fails.
- ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
+ ASSERT_EQ(mock_parent_image_cache->get_dispatch_layer(),
io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
ASSERT_EQ(mock_parent_image_cache->get_state(), true);
ASSERT_EQ(mock_parent_image_cache->get_cache_client()->is_session_work(), false);
mock_parent_image_cache->init();
cond.wait();
- ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
+ ASSERT_EQ(mock_parent_image_cache->get_dispatch_layer(),
io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
ASSERT_EQ(mock_parent_image_cache->get_state(), true);
expect_cache_session_state(*mock_parent_image_cache, true);
mock_parent_image_cache->init();
conn_cond.wait();
- ASSERT_EQ(mock_parent_image_cache->get_object_dispatch_layer(),
+ ASSERT_EQ(mock_parent_image_cache->get_dispatch_layer(),
io::OBJECT_DISPATCH_LAYER_PARENT_CACHE);
ASSERT_EQ(mock_parent_image_cache->get_state(), true);
expect_cache_session_state(*mock_parent_image_cache, true);
MockObjectDispatch() : lock("MockObjectDispatch::lock", true, false) {
}
- MOCK_CONST_METHOD0(get_object_dispatch_layer, ObjectDispatchLayer());
+ MOCK_CONST_METHOD0(get_dispatch_layer, ObjectDispatchLayer());
MOCK_METHOD1(shut_down, void(Context*));
public:
MOCK_METHOD1(shut_down, void(Context*));
- MOCK_METHOD1(register_object_dispatch, void(ObjectDispatchInterface*));
- MOCK_METHOD2(shut_down_object_dispatch, void(ObjectDispatchLayer, Context*));
+ MOCK_METHOD1(register_dispatch, void(ObjectDispatchInterface*));
+ MOCK_METHOD2(shut_down_dispatch, void(ObjectDispatchLayer, Context*));
MOCK_METHOD2(flush, void(FlushSource, Context*));
MOCK_METHOD1(invalidate_cache, void(Context*));
- MOCK_METHOD1(reset_existance_cache, void(Context*));
+ MOCK_METHOD1(reset_existence_cache, void(Context*));
MOCK_METHOD5(extent_overwritten, void(uint64_t, uint64_t, uint64_t, uint64_t,
uint64_t));
MOCK_METHOD1(send, void(ObjectDispatchSpec*));
-
};
} // namespace io
.WillOnce(CompleteContext(0, static_cast<ContextWQ*>(NULL)));
}
- void expect_register_object_dispatch(MockImageCtx& mock_image_ctx,
- MockObjectDispatch& mock_object_dispatch) {
+ void expect_register_dispatch(MockImageCtx& mock_image_ctx,
+ MockObjectDispatch& mock_object_dispatch) {
EXPECT_CALL(*mock_image_ctx.io_object_dispatcher,
- register_object_dispatch(&mock_object_dispatch));
+ register_dispatch(&mock_object_dispatch));
}
- void expect_shut_down_object_dispatch(MockImageCtx& mock_image_ctx) {
+ void expect_shut_down_dispatch(MockImageCtx& mock_image_ctx) {
EXPECT_CALL(*mock_image_ctx.io_object_dispatcher,
- shut_down_object_dispatch(io::OBJECT_DISPATCH_LAYER_JOURNAL, _))
+ shut_down_dispatch(io::OBJECT_DISPATCH_LAYER_JOURNAL, _))
.WillOnce(WithArg<1>(CompleteContext(0, mock_image_ctx.image_ctx->op_work_queue)));
}
expect_op_work_queue(mock_image_ctx);
InSequence seq;
- expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+ expect_register_dispatch(mock_image_ctx, mock_object_dispatch);
expect_construct_journaler(mock_journaler);
expect_open_journaler(mock_image_ctx, mock_journaler, mock_open_request,
primary, 0);
MockJournal *mock_journal,
::journal::MockJournaler &mock_journaler) {
expect_stop_append(mock_journaler, 0);
- expect_shut_down_object_dispatch(mock_image_ctx);
+ expect_shut_down_dispatch(mock_image_ctx);
ASSERT_EQ(0, when_close(mock_journal));
}
InSequence seq;
MockObjectDispatch mock_object_dispatch;
- expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+ expect_register_dispatch(mock_image_ctx, mock_object_dispatch);
::journal::MockJournaler mock_journaler;
MockJournalOpenRequest mock_open_request;
expect_stop_append(mock_journaler, 0);
expect_shut_down_journaler(mock_journaler);
- expect_shut_down_object_dispatch(mock_image_ctx);
+ expect_shut_down_dispatch(mock_image_ctx);
ASSERT_EQ(0, when_close(mock_journal));
}
InSequence seq;
MockObjectDispatch mock_object_dispatch;
- expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+ expect_register_dispatch(mock_image_ctx, mock_object_dispatch);
::journal::MockJournaler mock_journaler;
MockJournalOpenRequest mock_open_request;
InSequence seq;
MockObjectDispatch mock_object_dispatch;
- expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+ expect_register_dispatch(mock_image_ctx, mock_object_dispatch);
::journal::MockJournaler mock_journaler;
MockJournalOpenRequest mock_open_request;
expect_stop_append(mock_journaler, 0);
expect_shut_down_journaler(mock_journaler);
- expect_shut_down_object_dispatch(mock_image_ctx);
+ expect_shut_down_dispatch(mock_image_ctx);
ASSERT_EQ(0, when_close(mock_journal));
}
InSequence seq;
MockObjectDispatch mock_object_dispatch;
- expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+ expect_register_dispatch(mock_image_ctx, mock_object_dispatch);
::journal::MockJournaler mock_journaler;
MockJournalOpenRequest mock_open_request;
expect_stop_append(mock_journaler, 0);
expect_shut_down_journaler(mock_journaler);
- expect_shut_down_object_dispatch(mock_image_ctx);
+ expect_shut_down_dispatch(mock_image_ctx);
ASSERT_EQ(0, when_close(mock_journal));
}
InSequence seq;
MockObjectDispatch mock_object_dispatch;
- expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+ expect_register_dispatch(mock_image_ctx, mock_object_dispatch);
::journal::MockJournaler mock_journaler;
MockJournalOpenRequest mock_open_request;
expect_stop_append(mock_journaler, -EINVAL);
expect_shut_down_journaler(mock_journaler);
- expect_shut_down_object_dispatch(mock_image_ctx);
+ expect_shut_down_dispatch(mock_image_ctx);
ASSERT_EQ(-EINVAL, when_close(mock_journal));
}
InSequence seq;
MockObjectDispatch mock_object_dispatch;
- expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+ expect_register_dispatch(mock_image_ctx, mock_object_dispatch);
::journal::MockJournaler mock_journaler;
MockJournalOpenRequest mock_open_request;
expect_stop_append(mock_journaler, -EINVAL);
expect_shut_down_journaler(mock_journaler);
- expect_shut_down_object_dispatch(mock_image_ctx);
+ expect_shut_down_dispatch(mock_image_ctx);
ASSERT_EQ(-EINVAL, when_close(mock_journal));
}
InSequence seq;
MockObjectDispatch mock_object_dispatch;
- expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+ expect_register_dispatch(mock_image_ctx, mock_object_dispatch);
::journal::MockJournaler mock_journaler;
MockJournalOpenRequest mock_open_request;
expect_stop_append(mock_journaler, 0);
expect_shut_down_journaler(mock_journaler);
- expect_shut_down_object_dispatch(mock_image_ctx);
+ expect_shut_down_dispatch(mock_image_ctx);
ASSERT_EQ(0, when_close(mock_journal));
}
InSequence seq;
MockObjectDispatch mock_object_dispatch;
- expect_register_object_dispatch(mock_image_ctx, mock_object_dispatch);
+ expect_register_dispatch(mock_image_ctx, mock_object_dispatch);
::journal::MockJournaler mock_journaler;
MockJournalOpenRequest mock_open_request;
expect_stop_append(mock_journaler, 0);
expect_shut_down_journaler(mock_journaler);
- expect_shut_down_object_dispatch(mock_image_ctx);
+ expect_shut_down_dispatch(mock_image_ctx);
ASSERT_EQ(0, when_close(mock_journal));
}