#include "tools/rbd_mirror/ImageDeleter.h"
#include "tools/rbd_mirror/ServiceDaemon.h"
#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/Throttler.h"
#include "tools/rbd_mirror/Types.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
void SetUp() override {
TestFixture::SetUp();
+ m_image_deletion_throttler.reset(
+ new rbd::mirror::Throttler<>(g_ceph_context,
+ "rbd_mirror_concurrent_image_deletions"));
+
m_service_daemon.reset(new rbd::mirror::ServiceDaemon<>(g_ceph_context,
_rados, m_threads));
librbd::api::Mirror<>::mode_set(m_local_io_ctx, RBD_MIRROR_MODE_IMAGE);
- m_deleter = new rbd::mirror::ImageDeleter<>(m_local_io_ctx, m_threads,
- m_service_daemon.get());
+ m_deleter = new rbd::mirror::ImageDeleter<>(
+ m_local_io_ctx, m_threads, m_image_deletion_throttler.get(),
+ m_service_daemon.get());
m_local_image_id = librbd::util::generate_image_id(m_local_io_ctx);
librbd::ImageOptions image_opts;
librbd::RBD rbd;
std::string m_local_image_id;
+ std::unique_ptr<rbd::mirror::Throttler<>> m_image_deletion_throttler;
std::unique_ptr<rbd::mirror::ServiceDaemon<>> m_service_daemon;
rbd::mirror::ImageDeleter<> *m_deleter;
};
static ImageDeleter* create(
librados::IoCtx &ioctx, Threads<librbd::MockTestImageCtx> *threads,
+ Throttler<librbd::MockTestImageCtx> *image_deletion_throttler,
ServiceDaemon<librbd::MockTestImageCtx> *service_daemon) {
ceph_assert(s_instance != nullptr);
return s_instance;
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr);
+ "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr, nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr);
+ "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr, nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr);
+ "remote mirror uuid", m_mock_threads, nullptr, nullptr, nullptr, nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", m_mock_threads, nullptr, &mock_service_daemon,
- nullptr);
+ "remote mirror uuid", m_mock_threads, nullptr, nullptr,
+ &mock_service_daemon, nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
MockNamespaceReplayer namespace_replayer(
{}, m_local_io_ctx, m_remote_io_ctx, "local mirror uuid",
- "remote mirror uuid", m_mock_threads, nullptr, &mock_service_daemon,
- nullptr);
+ "remote mirror uuid", m_mock_threads, nullptr, nullptr,
+ &mock_service_daemon, nullptr);
C_SaferCond on_init;
namespace_replayer.init(&on_init);
const std::string &remote_mirror_uuid,
Threads<librbd::MockTestImageCtx> *threads,
Throttler<librbd::MockTestImageCtx> *image_sync_throttler,
+ Throttler<librbd::MockTestImageCtx> *image_deletion_throttler,
ServiceDaemon<librbd::MockTestImageCtx> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler) {
ceph_assert(s_instances.count(name));
#include "librbd/Utils.h"
#include "ImageDeleter.h"
#include "tools/rbd_mirror/Threads.h"
+#include "tools/rbd_mirror/Throttler.h"
#include "tools/rbd_mirror/image_deleter/RemoveRequest.h"
#include "tools/rbd_mirror/image_deleter/TrashMoveRequest.h"
#include "tools/rbd_mirror/image_deleter/TrashWatcher.h"
namespace rbd {
namespace mirror {
+using librbd::util::create_async_context_callback;
+
namespace {
class ImageDeleterAdminSocketCommand {
};
template <typename I>
-ImageDeleter<I>::ImageDeleter(librados::IoCtx& local_io_ctx,
- Threads<librbd::ImageCtx>* threads,
- ServiceDaemon<librbd::ImageCtx>* service_daemon)
+ImageDeleter<I>::ImageDeleter(
+ librados::IoCtx& local_io_ctx, Threads<librbd::ImageCtx>* threads,
+ Throttler<librbd::ImageCtx>* image_deletion_throttler,
+ ServiceDaemon<librbd::ImageCtx>* service_daemon)
: m_local_io_ctx(local_io_ctx), m_threads(threads),
+ m_image_deletion_throttler(image_deletion_throttler),
m_service_daemon(service_daemon), m_trash_listener(this),
m_lock(ceph::make_mutex(
librbd::util::unique_lock_name("rbd::mirror::ImageDeleter::m_lock",
delete m_asok_hook;
m_asok_hook = nullptr;
+ m_image_deletion_throttler->drain(m_local_io_ctx.get_namespace(),
+ -ESTALE);
+
shut_down_trash_watcher(on_finish);
}
template <typename I>
void ImageDeleter<I>::cancel_all_deletions(Context* on_finish) {
+ m_image_deletion_throttler->drain(m_local_io_ctx.get_namespace(),
+ -ECANCELED);
{
std::lock_guard locker{m_lock};
// wake up any external state machines waiting on deletions
void ImageDeleter<I>::remove_images() {
dout(10) << dendl;
- auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
- uint64_t max_concurrent_deletions = cct->_conf.get_val<uint64_t>(
- "rbd_mirror_concurrent_image_deletions");
-
std::lock_guard locker{m_lock};
- while (true) {
- if (!m_running || m_delete_queue.empty() ||
- m_in_flight_delete_queue.size() >= max_concurrent_deletions) {
- return;
- }
+ while (m_running && !m_delete_queue.empty()) {
DeleteInfoRef delete_info = m_delete_queue.front();
m_delete_queue.pop_front();
ceph_assert(delete_info);
- remove_image(delete_info);
+
+ auto on_start = create_async_context_callback(
+ m_threads->work_queue, new FunctionContext(
+ [this, delete_info](int r) {
+ if (r < 0) {
+ notify_on_delete(delete_info->image_id, r);
+ return;
+ }
+ remove_image(delete_info);
+ }));
+
+ m_image_deletion_throttler->start_op(m_local_io_ctx.get_namespace(),
+ delete_info->image_id, on_start);
}
}
template <typename I>
void ImageDeleter<I>::remove_image(DeleteInfoRef delete_info) {
dout(10) << "info=" << *delete_info << dendl;
- ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ std::lock_guard locker{m_lock};
m_in_flight_delete_queue.push_back(delete_info);
m_async_op_tracker.start_op();
int r) {
dout(10) << "info=" << *delete_info << ", r=" << r << dendl;
+ m_image_deletion_throttler->finish_op(m_local_io_ctx.get_namespace(),
+ delete_info->image_id);
{
std::lock_guard locker{m_lock};
ceph_assert(ceph_mutex_is_locked(m_lock));
template <typename> class ServiceDaemon;
template <typename> class Threads;
+template <typename> class Throttler;
namespace image_deleter { template <typename> struct TrashWatcher; }
template <typename ImageCtxT = librbd::ImageCtx>
class ImageDeleter {
public:
- static ImageDeleter* create(librados::IoCtx& local_io_ctx,
- Threads<librbd::ImageCtx>* threads,
- ServiceDaemon<librbd::ImageCtx>* service_daemon) {
- return new ImageDeleter(local_io_ctx, threads, service_daemon);
+ static ImageDeleter* create(
+ librados::IoCtx& local_io_ctx, Threads<librbd::ImageCtx>* threads,
+ Throttler<librbd::ImageCtx>* image_deletion_throttler,
+ ServiceDaemon<librbd::ImageCtx>* service_daemon) {
+ return new ImageDeleter(local_io_ctx, threads, image_deletion_throttler,
+ service_daemon);
}
ImageDeleter(librados::IoCtx& local_io_ctx,
Threads<librbd::ImageCtx>* threads,
+ Throttler<librbd::ImageCtx>* image_deletion_throttler,
ServiceDaemon<librbd::ImageCtx>* service_daemon);
ImageDeleter(const ImageDeleter&) = delete;
librados::IoCtx& m_local_io_ctx;
Threads<librbd::ImageCtx>* m_threads;
+ Throttler<librbd::ImageCtx>* m_image_deletion_throttler;
ServiceDaemon<librbd::ImageCtx>* m_service_daemon;
image_deleter::TrashWatcher<ImageCtxT>* m_trash_watcher = nullptr;
librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx,
const std::string &local_mirror_uuid, const std::string &remote_mirror_uuid,
Threads<I> *threads, Throttler<I> *image_sync_throttler,
- ServiceDaemon<I> *service_daemon,
+ Throttler<I> *image_deletion_throttler, ServiceDaemon<I> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler) :
m_local_mirror_uuid(local_mirror_uuid),
m_remote_mirror_uuid(remote_mirror_uuid),
m_threads(threads), m_image_sync_throttler(image_sync_throttler),
+ m_image_deletion_throttler(image_deletion_throttler),
m_service_daemon(service_daemon),
m_cache_manager_handler(cache_manager_handler),
m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
handle_init_image_deleter(r, on_finish);
});
m_image_deleter.reset(ImageDeleter<I>::create(m_local_io_ctx, m_threads,
+ m_image_deletion_throttler,
m_service_daemon));
m_image_deleter->init(create_async_context_callback(
m_threads->work_queue, on_finish));
const std::string &remote_mirror_uuid,
Threads<ImageCtxT> *threads,
Throttler<ImageCtxT> *image_sync_throttler,
+ Throttler<ImageCtxT> *image_deletion_throttler,
ServiceDaemon<ImageCtxT> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler) {
return new NamespaceReplayer(name, local_ioctx, remote_ioctx,
local_mirror_uuid, remote_mirror_uuid, threads,
- image_sync_throttler, service_daemon,
- cache_manager_handler);
+ image_sync_throttler, image_deletion_throttler,
+ service_daemon, cache_manager_handler);
}
NamespaceReplayer(const std::string &name,
const std::string &remote_mirror_uuid,
Threads<ImageCtxT> *threads,
Throttler<ImageCtxT> *image_sync_throttler,
+ Throttler<ImageCtxT> *image_deletion_throttler,
ServiceDaemon<ImageCtxT> *service_daemon,
journal::CacheManagerHandler *cache_manager_handler);
NamespaceReplayer(const NamespaceReplayer&) = delete;
std::string m_remote_mirror_uuid;
Threads<ImageCtxT> *m_threads;
Throttler<ImageCtxT> *m_image_sync_throttler;
+ Throttler<ImageCtxT> *m_image_deletion_throttler;
ServiceDaemon<ImageCtxT> *m_service_daemon;
journal::CacheManagerHandler *m_cache_manager_handler;
m_image_sync_throttler.reset(
Throttler<I>::create(cct, "rbd_mirror_concurrent_image_syncs"));
+ m_image_deletion_throttler.reset(
+ Throttler<I>::create(cct, "rbd_mirror_concurrent_image_deletions"));
+
m_default_namespace_replayer.reset(NamespaceReplayer<I>::create(
"", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
- m_threads, m_image_sync_throttler.get(), m_service_daemon,
- m_cache_manager_handler));
+ m_threads, m_image_sync_throttler.get(), m_image_deletion_throttler.get(),
+ m_service_daemon, m_cache_manager_handler));
C_SaferCond on_init;
m_default_namespace_replayer->init(&on_init);
m_default_namespace_replayer.reset();
m_image_sync_throttler.reset();
+ m_image_deletion_throttler.reset();
m_local_rados.reset();
m_remote_rados.reset();
for (auto &name : mirroring_namespaces) {
auto namespace_replayer = NamespaceReplayer<I>::create(
name, m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
- m_threads, m_image_sync_throttler.get(), m_service_daemon,
+ m_threads, m_image_sync_throttler.get(),
+ m_image_deletion_throttler.get(), m_service_daemon,
m_cache_manager_handler);
auto on_init = new FunctionContext(
[this, namespace_replayer, name, &mirroring_namespaces,
f->close_section(); // sync_throttler
}
+ if (m_image_deletion_throttler) {
+ f->open_object_section("deletion_throttler");
+ m_image_deletion_throttler->print_status(f, ss);
+ f->close_section(); // deletion_throttler
+ }
+
m_default_namespace_replayer->print_status(f, ss);
f->open_array_section("namespaces");
std::unique_ptr<LeaderWatcher<ImageCtxT>> m_leader_watcher;
std::unique_ptr<Throttler<ImageCtxT>> m_image_sync_throttler;
+ std::unique_ptr<Throttler<ImageCtxT>> m_image_deletion_throttler;
};
} // namespace mirror