From 65f86fe5f5267f8256f4dcaf5b6d48bebea8d31a Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Tue, 30 Jun 2015 23:47:31 -0400 Subject: [PATCH] journal: add support for trimming committed journal objects As clients periodically mark payloads as committed, the trimmer will determine the new minimum object set and delete all objects before the new minimum. Signed-off-by: Jason Dillaman --- src/journal/JournalMetadata.cc | 78 ++++++++++-- src/journal/JournalMetadata.h | 32 ++++- src/journal/JournalTrimmer.cc | 223 +++++++++++++++++++++++++++++++++ src/journal/JournalTrimmer.h | 88 +++++++++++++ src/journal/Journaler.cc | 16 ++- src/journal/Journaler.h | 4 + src/journal/Makefile.am | 2 + 7 files changed, 426 insertions(+), 17 deletions(-) create mode 100644 src/journal/JournalTrimmer.cc create mode 100644 src/journal/JournalTrimmer.h diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index ab0538f878858..e5087e5ebc161 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -17,12 +17,15 @@ using namespace cls::journal; JournalMetadata::JournalMetadata(librados::IoCtx &ioctx, const std::string &oid, - const std::string &client_id) - : m_cct(NULL), m_oid(oid), m_client_id(client_id), m_order(0), - m_splay_width(0), m_initialized(false), m_timer(NULL), + const std::string &client_id, + double commit_interval) + : m_cct(NULL), m_oid(oid), m_client_id(client_id), + m_commit_interval(commit_interval), m_order(0), m_splay_width(0), + m_initialized(false), m_timer(NULL), m_timer_lock("JournalMetadata::m_timer_lock"), m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0), - m_update_notifications(0) { + m_update_notifications(0), m_commit_position_pending(false), + m_commit_position_ctx(NULL) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); } @@ -74,6 +77,7 @@ int JournalMetadata::init() { int JournalMetadata::register_client(const std::string &description) { assert(!m_client_id.empty()); + ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; int r = client::client_register(m_ioctx, m_oid, m_client_id, description); if (r < 0) { lderr(m_cct) << "failed to register journal client '" << m_client_id @@ -88,6 +92,7 @@ int JournalMetadata::register_client(const std::string &description) { int JournalMetadata::unregister_client() { assert(!m_client_id.empty()); + ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; int r = client::client_unregister(m_ioctx, m_oid, m_client_id); if (r < 0) { lderr(m_cct) << "failed to unregister journal client '" << m_client_id @@ -117,6 +122,9 @@ void JournalMetadata::remove_listener(Listener *listener) { void JournalMetadata::set_minimum_set(uint64_t object_set) { Mutex::Locker locker(m_lock); + + ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set + << ", new=" << object_set << dendl; if (m_minimum_set >= object_set) { return; } @@ -137,6 +145,9 @@ void JournalMetadata::set_minimum_set(uint64_t object_set) { void JournalMetadata::set_active_set(uint64_t object_set) { Mutex::Locker locker(m_lock); + + ldout(m_cct, 20) << __func__ << ": current=" << m_active_set + << ", new=" << object_set << dendl; if (m_active_set >= object_set) { return; } @@ -157,18 +168,25 @@ void JournalMetadata::set_active_set(uint64_t object_set) { void JournalMetadata::set_commit_position( const ObjectSetPosition &commit_position, Context *on_safe) { + assert(on_safe != NULL); + Mutex::Locker locker(m_lock); + ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position + << ", new=" << commit_position << dendl; + if (commit_position <= m_client.commit_position || + commit_position <= m_commit_position) { + on_safe->complete(-ESTALE); + return; + } - librados::ObjectWriteOperation op; - client::client_commit(&op, m_client_id, commit_position); + if (m_commit_position_ctx != NULL) { + m_commit_position_ctx->complete(-ESTALE); + } - C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_safe); - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(ctx, NULL, - utils::rados_ctx_callback); - int r = m_ioctx.aio_operate(m_oid, comp, &op); - assert(r == 0); - comp->release(); + m_client.commit_position = commit_position; + m_commit_position = commit_position; + m_commit_position_ctx = on_safe; + schedule_commit_task(); } void JournalMetadata::reserve_tid(const std::string &tag, uint64_t tid) { @@ -235,6 +253,33 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { } } +void JournalMetadata::schedule_commit_task() { + assert(m_lock.is_locked()); + + Mutex::Locker timer_locker(m_timer_lock); + if (!m_commit_position_pending) { + m_commit_position_pending = true; + m_timer->add_event_after(m_commit_interval, new C_CommitPositionTask(this)); + } +} + +void JournalMetadata::handle_commit_position_task() { + Mutex::Locker locker(m_lock); + + librados::ObjectWriteOperation op; + client::client_commit(&op, m_client_id, m_commit_position); + + C_NotifyUpdate *ctx = new C_NotifyUpdate(this, m_commit_position_ctx); + m_commit_position_ctx = NULL; + + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, NULL, + utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + assert(r == 0); + comp->release(); +} + void JournalMetadata::schedule_watch_reset() { Mutex::Locker locker(m_timer_lock); m_timer->add_event_after(0.1, new C_WatchReset(this)); @@ -269,6 +314,13 @@ void JournalMetadata::handle_watch_error(int err) { void JournalMetadata::notify_update() { ldout(m_cct, 10) << "notifying journal header update" << dendl; + bufferlist bl; + m_ioctx.notify2(m_oid, bl, 5000, NULL); +} + +void JournalMetadata::async_notify_update() { + ldout(m_cct, 10) << "async notifying journal header update" << dendl; + librados::AioCompletion *comp = librados::Rados::aio_create_completion(NULL, NULL, NULL); diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 88f7f914724c8..81a539576f4a8 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -39,7 +39,7 @@ public: }; JournalMetadata(librados::IoCtx &ioctx, const std::string &oid, - const std::string &client_id); + const std::string &client_id, double commit_interval); ~JournalMetadata(); int init(); @@ -50,6 +50,9 @@ public: int register_client(const std::string &description); int unregister_client(); + inline const std::string &get_client_id() const { + return m_client_id; + } inline uint8_t get_order() const { return m_order; } @@ -83,6 +86,11 @@ public: *commit_position = m_client.commit_position; } + void get_registered_clients(RegisteredClients *registered_clients) { + Mutex::Locker locker(m_lock); + *registered_clients = m_registered_clients; + } + inline uint64_t allocate_tid(const std::string &tag) { Mutex::Locker locker(m_lock); return m_allocated_tids[tag]++; @@ -91,6 +99,7 @@ public: bool get_last_allocated_tid(const std::string &tag, uint64_t *tid) const; void notify_update(); + void async_notify_update(); private: typedef std::map AllocatedTids; @@ -122,6 +131,17 @@ private: } }; + struct C_CommitPositionTask : public Context { + JournalMetadataPtr journal_metadata; + + C_CommitPositionTask(JournalMetadata *_journal_metadata) + : journal_metadata(_journal_metadata) {} + + virtual void finish(int r) { + journal_metadata->handle_commit_position_task(); + }; + }; + struct C_NotifyUpdate : public Context { JournalMetadataPtr journal_metadata; Context *on_safe; @@ -131,7 +151,7 @@ private: virtual void finish(int r) { if (r == 0) { - journal_metadata->notify_update(); + journal_metadata->async_notify_update(); } if (on_safe != NULL) { on_safe->complete(r); @@ -159,6 +179,7 @@ private: CephContext *m_cct; std::string m_oid; std::string m_client_id; + double m_commit_interval; uint8_t m_order; uint8_t m_splay_width; @@ -184,9 +205,16 @@ private: size_t m_update_notifications; Cond m_update_cond; + bool m_commit_position_pending; + ObjectSetPosition m_commit_position; + Context *m_commit_position_ctx; + void refresh(Context *on_finish); void handle_refresh_complete(C_Refresh *refresh, int r); + void schedule_commit_task(); + void handle_commit_position_task(); + void schedule_watch_reset(); void handle_watch_reset(); void handle_watch_notify(uint64_t notify_id, uint64_t cookie); diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc new file mode 100644 index 0000000000000..2b34873e5a272 --- /dev/null +++ b/src/journal/JournalTrimmer.cc @@ -0,0 +1,223 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalTrimmer.h" +#include "journal/Utils.h" +#include "common/Cond.h" +#include "common/errno.h" +#include + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "JournalTrimmer: " + +namespace journal { + +JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx, + const std::string &object_oid_prefix, + const JournalMetadataPtr &journal_metadata) + : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), + m_journal_metadata(journal_metadata), m_lock("JournalTrimmer::m_lock"), + m_pending_ops(0), m_remove_set_pending(false), m_remove_set(0), + m_remove_set_ctx(NULL) { + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast(m_ioctx.cct()); +} + +JournalTrimmer::~JournalTrimmer() { + wait_for_ops(); +} + +int JournalTrimmer::remove_objects() { + ldout(m_cct, 20) << __func__ << dendl; + wait_for_ops(); + + C_SaferCond ctx; + { + Mutex::Locker locker(m_lock); + JournalMetadata::RegisteredClients registered_clients; + m_journal_metadata->get_registered_clients(®istered_clients); + + if (registered_clients.size() == 0) { + return -EINVAL; + } else if (registered_clients.size() > 1 || m_remove_set_pending) { + return -EBUSY; + } + + m_remove_set = std::numeric_limits::max(); + m_remove_set_pending = true; + m_remove_set_ctx = &ctx; + + remove_set(m_journal_metadata->get_minimum_set()); + } + return ctx.wait(); +} + +void JournalTrimmer::update_commit_position( + const ObjectSetPosition &object_set_position) { + ldout(m_cct, 20) << __func__ << ": pos=" << object_set_position + << dendl; + + { + Mutex::Locker locker(m_lock); + start_op(); + } + + Context *ctx = new C_CommitPositionSafe(this, object_set_position); + m_journal_metadata->set_commit_position(object_set_position, ctx); +} + +void JournalTrimmer::start_op() { + assert(m_lock.is_locked()); + ++m_pending_ops; +} + +void JournalTrimmer::finish_op() { + assert(m_lock.is_locked()); + assert(m_pending_ops > 0); + if (--m_pending_ops == 0) { + m_pending_ops_cond.Signal(); + } +} + +void JournalTrimmer::wait_for_ops() { + Mutex::Locker locker(m_lock); + while (m_pending_ops > 0) { + m_pending_ops_cond.Wait(m_lock); + } +} + +void JournalTrimmer::trim_objects(uint64_t minimum_set) { + assert(m_lock.is_locked()); + + ldout(m_cct, 20) << __func__ << ": min_set=" << minimum_set << dendl; + if (minimum_set <= m_journal_metadata->get_minimum_set()) { + return; + } + + if (m_remove_set_pending) { + m_remove_set = MAX(m_remove_set, minimum_set); + return; + } + + m_remove_set = minimum_set; + m_remove_set_pending = true; + remove_set(m_journal_metadata->get_minimum_set()); +} + +void JournalTrimmer::remove_set(uint64_t object_set) { + assert(m_lock.is_locked()); + + start_op(); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + C_RemoveSet *ctx = new C_RemoveSet(this, object_set, splay_width); + + ldout(m_cct, 20) << __func__ << ": removing object set " << object_set + << dendl; + for (uint64_t object_number = object_set * splay_width; + object_number < (object_set + 1) * splay_width; + ++object_number) { + std::string oid = utils::get_object_name(m_object_oid_prefix, + object_number); + + ldout(m_cct, 20) << "removing journal object " << oid << dendl; + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, NULL, + utils::rados_ctx_callback); + int r = m_ioctx.aio_remove(oid, comp); + assert(r == 0); + comp->release(); + } + ctx->complete(-ENOENT); +} + +void JournalTrimmer::handle_commit_position_safe( + int r, const ObjectSetPosition &object_set_position) { + ldout(m_cct, 20) << __func__ << ": r=" << r << ", pos=" + << object_set_position << dendl; + + Mutex::Locker locker(m_lock); + if (r == 0) { + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint64_t object_set = object_set_position.object_number / splay_width; + + JournalMetadata::RegisteredClients registered_clients; + m_journal_metadata->get_registered_clients(®istered_clients); + + bool trim_permitted = true; + for (JournalMetadata::RegisteredClients::iterator it = + registered_clients.begin(); + it != registered_clients.end(); ++it) { + const JournalMetadata::Client &client = *it; + uint64_t client_object_set = client.commit_position.object_number / + splay_width; + if (client.id != m_journal_metadata->get_client_id() && + client_object_set < object_set) { + ldout(m_cct, 20) << "object set " << client_object_set << " still " + << "in-use by client " << client.id << dendl; + trim_permitted = false; + break; + } + } + + if (trim_permitted) { + trim_objects(object_set_position.object_number / splay_width); + } + } + finish_op(); +} + +void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) { + ldout(m_cct, 20) << __func__ << ": r=" << r << ", set=" << object_set << ", " + << "trim=" << m_remove_set << dendl; + + Mutex::Locker locker(m_lock); + m_remove_set_pending = false; + + if (r == 0 || (r == -ENOENT && m_remove_set_ctx == NULL)) { + // advance the minimum set to the next set + m_journal_metadata->set_minimum_set(object_set + 1); + uint64_t minimum_set = m_journal_metadata->get_minimum_set(); + + if (m_remove_set > minimum_set) { + m_remove_set_pending = true; + remove_set(minimum_set); + } + } else if (r == -ENOENT) { + // no objects within the set existed + r = 0; + } + + if (m_remove_set_ctx != NULL && !m_remove_set_pending) { + ldout(m_cct, 20) << "completing remove set context" << dendl; + m_remove_set_ctx->complete(r); + } + finish_op(); +} + +JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer, + uint64_t _object_set, + uint8_t _splay_width) + : journal_trimmer(_journal_trimmer), object_set(_object_set), + lock(utils::unique_lock_name("C_RemoveSet::lock", this)), + refs(_splay_width + 1), return_value(-ENOENT) { +} + +void JournalTrimmer::C_RemoveSet::complete(int r) { + lock.Lock(); + if (r < 0 && r != -ENOENT && return_value == -ENOENT) { + return_value = r; + } else if (r == 0 && return_value == -ENOENT) { + return_value = 0; + } + + if (--refs == 0) { + finish(return_value); + lock.Unlock(); + delete this; + } else { + lock.Unlock(); + } +} + +} // namespace journal diff --git a/src/journal/JournalTrimmer.h b/src/journal/JournalTrimmer.h new file mode 100644 index 0000000000000..1ed6486a8305c --- /dev/null +++ b/src/journal/JournalTrimmer.h @@ -0,0 +1,88 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_JOURNAL_TRIMMER_H +#define CEPH_JOURNAL_JOURNAL_TRIMMER_H + +#include "include/int_types.h" +#include "include/rados/librados.hpp" +#include "include/Context.h" +#include "common/Mutex.h" +#include "journal/JournalMetadata.h" +#include "cls/journal/cls_journal_types.h" + +namespace journal { + +class JournalTrimmer { +public: + typedef cls::journal::ObjectSetPosition ObjectSetPosition; + + JournalTrimmer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, + const JournalMetadataPtr &journal_metadata); + ~JournalTrimmer(); + + int remove_objects(); + void update_commit_position(const ObjectSetPosition &object_set_position); + +private: + struct C_CommitPositionSafe : public Context { + JournalTrimmer *journal_trimmer; + ObjectSetPosition object_set_position; + + C_CommitPositionSafe(JournalTrimmer *_journal_trimmer, + const ObjectSetPosition &_object_set_position) + : journal_trimmer(_journal_trimmer), + object_set_position(_object_set_position) {} + + virtual void complete(int r) { + finish(r); + } + virtual void finish(int r) { + journal_trimmer->handle_commit_position_safe(r, object_set_position); + } + }; + struct C_RemoveSet : public Context { + JournalTrimmer *journal_trimmer; + uint64_t object_set; + Mutex lock; + uint32_t refs; + int return_value; + + C_RemoveSet(JournalTrimmer *_journal_trimmer, uint64_t _object_set, + uint8_t _splay_width); + virtual void complete(int r); + virtual void finish(int r) { + journal_trimmer->handle_set_removed(r, object_set); + } + }; + + librados::IoCtx m_ioctx; + CephContext *m_cct; + std::string m_object_oid_prefix; + + JournalMetadataPtr m_journal_metadata; + + Mutex m_lock; + + size_t m_pending_ops; + Cond m_pending_ops_cond; + + bool m_remove_set_pending; + uint64_t m_remove_set; + Context *m_remove_set_ctx; + + void start_op(); + void finish_op(); + void wait_for_ops(); + + void trim_objects(uint64_t minimum_set); + void remove_set(uint64_t object_set); + + void handle_commit_position_safe(int r, const ObjectSetPosition &position); + + void handle_set_removed(int r, uint64_t object_set); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNAL_TRIMMER_H diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index 4613517f9aa19..82b2ef926c7a5 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -9,6 +9,7 @@ #include "journal/JournalMetadata.h" #include "journal/JournalPlayer.h" #include "journal/JournalRecorder.h" +#include "journal/JournalTrimmer.h" #include "journal/PayloadImpl.h" #include "journal/ReplayHandler.h" #include "cls/journal/cls_journal_client.h" @@ -31,7 +32,8 @@ using namespace cls::journal; Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx, uint64_t journal_id, const std::string &client_id) - : m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL) + : m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL), + m_trimmer(NULL) { m_header_ioctx.dup(header_ioctx); m_data_ioctx.dup(data_ioctx); @@ -40,8 +42,12 @@ Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx, m_header_oid = JOURNAL_HEADER_PREFIX + stringify(journal_id); m_object_oid_prefix = JOURNAL_OBJECT_PREFIX + stringify(journal_id) + "."; - m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id); + // TODO configurable commit interval + m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id, + 5); m_metadata->get(); + + m_trimmer = new JournalTrimmer(m_data_ioctx, m_object_oid_prefix, m_metadata); } Journaler::~Journaler() { @@ -49,6 +55,7 @@ Journaler::~Journaler() { m_metadata->put(); m_metadata = NULL; } + delete m_trimmer; assert(m_player == NULL); assert(m_recorder == NULL); } @@ -114,6 +121,11 @@ void Journaler::stop_replay() { m_player = NULL; } +void Journaler::update_commit_position(const Payload &payload) { + PayloadImplPtr payload_impl = payload.get_payload_impl(); + m_trimmer->update_commit_position(payload_impl->get_object_set_position()); +} + void Journaler::start_append() { assert(m_recorder == NULL); diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index ff054dbaa2b5a..7de2bbcf74186 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -20,6 +20,7 @@ namespace journal { class JournalMetadata; class JournalPlayer; class JournalRecorder; +class JournalTrimmer; class ReplayHandler; class Journaler { @@ -40,6 +41,8 @@ public: bool try_pop_front(Payload *payload); void stop_replay(); + void update_commit_position(const Payload &payload); + void start_append(); Future append(const std::string &tag, const bufferlist &bl); void stop_append(); @@ -56,6 +59,7 @@ private: JournalMetadata *m_metadata; JournalPlayer *m_player; JournalRecorder *m_recorder; + JournalTrimmer *m_trimmer; void create_player(ReplayHandler *replay_handler); }; diff --git a/src/journal/Makefile.am b/src/journal/Makefile.am index 8c8bc97b69520..af85525080ddc 100644 --- a/src/journal/Makefile.am +++ b/src/journal/Makefile.am @@ -9,6 +9,7 @@ libjournal_la_SOURCES = \ journal/JournalMetadata.cc \ journal/JournalPlayer.cc \ journal/JournalRecorder.cc \ + journal/JournalTrimmer.cc \ journal/ObjectPlayer.cc \ journal/ObjectRecorder.cc \ journal/Payload.cc \ @@ -24,6 +25,7 @@ noinst_HEADERS += \ journal/JournalMetadata.h \ journal/JournalPlayer.h \ journal/JournalRecorder.h \ + journal/JournalTrimmer.h \ journal/ObjectPlayer.h \ journal/ObjectRecorder.h \ journal/Payload.h \ -- 2.39.5