JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
const std::string &oid,
- const std::string &client_id)
- : m_cct(NULL), m_oid(oid), m_client_id(client_id), m_order(0),
- m_splay_width(0), m_initialized(false), m_timer(NULL),
+ const std::string &client_id,
+ double commit_interval)
+ : m_cct(NULL), m_oid(oid), m_client_id(client_id),
+ m_commit_interval(commit_interval), m_order(0), m_splay_width(0),
+ m_initialized(false), m_timer(NULL),
m_timer_lock("JournalMetadata::m_timer_lock"),
m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0),
- m_update_notifications(0) {
+ m_update_notifications(0), m_commit_position_pending(false),
+ m_commit_position_ctx(NULL) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
int JournalMetadata::register_client(const std::string &description) {
assert(!m_client_id.empty());
+ ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
int r = client::client_register(m_ioctx, m_oid, m_client_id, description);
if (r < 0) {
lderr(m_cct) << "failed to register journal client '" << m_client_id
int JournalMetadata::unregister_client() {
assert(!m_client_id.empty());
+ ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
int r = client::client_unregister(m_ioctx, m_oid, m_client_id);
if (r < 0) {
lderr(m_cct) << "failed to unregister journal client '" << m_client_id
void JournalMetadata::set_minimum_set(uint64_t object_set) {
Mutex::Locker locker(m_lock);
+
+ ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
+ << ", new=" << object_set << dendl;
if (m_minimum_set >= object_set) {
return;
}
void JournalMetadata::set_active_set(uint64_t object_set) {
Mutex::Locker locker(m_lock);
+
+ ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
+ << ", new=" << object_set << dendl;
if (m_active_set >= object_set) {
return;
}
void JournalMetadata::set_commit_position(
const ObjectSetPosition &commit_position, Context *on_safe) {
+ assert(on_safe != NULL);
+
Mutex::Locker locker(m_lock);
+ ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position
+ << ", new=" << commit_position << dendl;
+ if (commit_position <= m_client.commit_position ||
+ commit_position <= m_commit_position) {
+ on_safe->complete(-ESTALE);
+ return;
+ }
- librados::ObjectWriteOperation op;
- client::client_commit(&op, m_client_id, commit_position);
+ if (m_commit_position_ctx != NULL) {
+ m_commit_position_ctx->complete(-ESTALE);
+ }
- C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_safe);
- librados::AioCompletion *comp =
- librados::Rados::aio_create_completion(ctx, NULL,
- utils::rados_ctx_callback);
- int r = m_ioctx.aio_operate(m_oid, comp, &op);
- assert(r == 0);
- comp->release();
+ m_client.commit_position = commit_position;
+ m_commit_position = commit_position;
+ m_commit_position_ctx = on_safe;
+ schedule_commit_task();
}
void JournalMetadata::reserve_tid(const std::string &tag, uint64_t tid) {
}
}
+void JournalMetadata::schedule_commit_task() {
+ assert(m_lock.is_locked());
+
+ Mutex::Locker timer_locker(m_timer_lock);
+ if (!m_commit_position_pending) {
+ m_commit_position_pending = true;
+ m_timer->add_event_after(m_commit_interval, new C_CommitPositionTask(this));
+ }
+}
+
+void JournalMetadata::handle_commit_position_task() {
+ Mutex::Locker locker(m_lock);
+
+ librados::ObjectWriteOperation op;
+ client::client_commit(&op, m_client_id, m_commit_position);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
+ m_commit_position_ctx = NULL;
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ assert(r == 0);
+ comp->release();
+}
+
void JournalMetadata::schedule_watch_reset() {
Mutex::Locker locker(m_timer_lock);
m_timer->add_event_after(0.1, new C_WatchReset(this));
void JournalMetadata::notify_update() {
ldout(m_cct, 10) << "notifying journal header update" << dendl;
+ bufferlist bl;
+ m_ioctx.notify2(m_oid, bl, 5000, NULL);
+}
+
+void JournalMetadata::async_notify_update() {
+ ldout(m_cct, 10) << "async notifying journal header update" << dendl;
+
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(NULL, NULL, NULL);
};
JournalMetadata(librados::IoCtx &ioctx, const std::string &oid,
- const std::string &client_id);
+ const std::string &client_id, double commit_interval);
~JournalMetadata();
int init();
int register_client(const std::string &description);
int unregister_client();
+ inline const std::string &get_client_id() const {
+ return m_client_id;
+ }
inline uint8_t get_order() const {
return m_order;
}
*commit_position = m_client.commit_position;
}
+ void get_registered_clients(RegisteredClients *registered_clients) {
+ Mutex::Locker locker(m_lock);
+ *registered_clients = m_registered_clients;
+ }
+
inline uint64_t allocate_tid(const std::string &tag) {
Mutex::Locker locker(m_lock);
return m_allocated_tids[tag]++;
bool get_last_allocated_tid(const std::string &tag, uint64_t *tid) const;
void notify_update();
+ void async_notify_update();
private:
typedef std::map<std::string, uint64_t> AllocatedTids;
}
};
+ struct C_CommitPositionTask : public Context {
+ JournalMetadataPtr journal_metadata;
+
+ C_CommitPositionTask(JournalMetadata *_journal_metadata)
+ : journal_metadata(_journal_metadata) {}
+
+ virtual void finish(int r) {
+ journal_metadata->handle_commit_position_task();
+ };
+ };
+
struct C_NotifyUpdate : public Context {
JournalMetadataPtr journal_metadata;
Context *on_safe;
virtual void finish(int r) {
if (r == 0) {
- journal_metadata->notify_update();
+ journal_metadata->async_notify_update();
}
if (on_safe != NULL) {
on_safe->complete(r);
CephContext *m_cct;
std::string m_oid;
std::string m_client_id;
+ double m_commit_interval;
uint8_t m_order;
uint8_t m_splay_width;
size_t m_update_notifications;
Cond m_update_cond;
+ bool m_commit_position_pending;
+ ObjectSetPosition m_commit_position;
+ Context *m_commit_position_ctx;
+
void refresh(Context *on_finish);
void handle_refresh_complete(C_Refresh *refresh, int r);
+ void schedule_commit_task();
+ void handle_commit_position_task();
+
void schedule_watch_reset();
void handle_watch_reset();
void handle_watch_notify(uint64_t notify_id, uint64_t cookie);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalTrimmer.h"
+#include "journal/Utils.h"
+#include "common/Cond.h"
+#include "common/errno.h"
+#include <limits>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalTrimmer: "
+
+namespace journal {
+
+JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx,
+ const std::string &object_oid_prefix,
+ const JournalMetadataPtr &journal_metadata)
+ : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
+ m_journal_metadata(journal_metadata), m_lock("JournalTrimmer::m_lock"),
+ m_pending_ops(0), m_remove_set_pending(false), m_remove_set(0),
+ m_remove_set_ctx(NULL) {
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+}
+
+JournalTrimmer::~JournalTrimmer() {
+ wait_for_ops();
+}
+
+int JournalTrimmer::remove_objects() {
+ ldout(m_cct, 20) << __func__ << dendl;
+ wait_for_ops();
+
+ C_SaferCond ctx;
+ {
+ Mutex::Locker locker(m_lock);
+ JournalMetadata::RegisteredClients registered_clients;
+ m_journal_metadata->get_registered_clients(®istered_clients);
+
+ if (registered_clients.size() == 0) {
+ return -EINVAL;
+ } else if (registered_clients.size() > 1 || m_remove_set_pending) {
+ return -EBUSY;
+ }
+
+ m_remove_set = std::numeric_limits<uint64_t>::max();
+ m_remove_set_pending = true;
+ m_remove_set_ctx = &ctx;
+
+ remove_set(m_journal_metadata->get_minimum_set());
+ }
+ return ctx.wait();
+}
+
+void JournalTrimmer::update_commit_position(
+ const ObjectSetPosition &object_set_position) {
+ ldout(m_cct, 20) << __func__ << ": pos=" << object_set_position
+ << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ start_op();
+ }
+
+ Context *ctx = new C_CommitPositionSafe(this, object_set_position);
+ m_journal_metadata->set_commit_position(object_set_position, ctx);
+}
+
+void JournalTrimmer::start_op() {
+ assert(m_lock.is_locked());
+ ++m_pending_ops;
+}
+
+void JournalTrimmer::finish_op() {
+ assert(m_lock.is_locked());
+ assert(m_pending_ops > 0);
+ if (--m_pending_ops == 0) {
+ m_pending_ops_cond.Signal();
+ }
+}
+
+void JournalTrimmer::wait_for_ops() {
+ Mutex::Locker locker(m_lock);
+ while (m_pending_ops > 0) {
+ m_pending_ops_cond.Wait(m_lock);
+ }
+}
+
+void JournalTrimmer::trim_objects(uint64_t minimum_set) {
+ assert(m_lock.is_locked());
+
+ ldout(m_cct, 20) << __func__ << ": min_set=" << minimum_set << dendl;
+ if (minimum_set <= m_journal_metadata->get_minimum_set()) {
+ return;
+ }
+
+ if (m_remove_set_pending) {
+ m_remove_set = MAX(m_remove_set, minimum_set);
+ return;
+ }
+
+ m_remove_set = minimum_set;
+ m_remove_set_pending = true;
+ remove_set(m_journal_metadata->get_minimum_set());
+}
+
+void JournalTrimmer::remove_set(uint64_t object_set) {
+ assert(m_lock.is_locked());
+
+ start_op();
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ C_RemoveSet *ctx = new C_RemoveSet(this, object_set, splay_width);
+
+ ldout(m_cct, 20) << __func__ << ": removing object set " << object_set
+ << dendl;
+ for (uint64_t object_number = object_set * splay_width;
+ object_number < (object_set + 1) * splay_width;
+ ++object_number) {
+ std::string oid = utils::get_object_name(m_object_oid_prefix,
+ object_number);
+
+ ldout(m_cct, 20) << "removing journal object " << oid << dendl;
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_remove(oid, comp);
+ assert(r == 0);
+ comp->release();
+ }
+ ctx->complete(-ENOENT);
+}
+
+void JournalTrimmer::handle_commit_position_safe(
+ int r, const ObjectSetPosition &object_set_position) {
+ ldout(m_cct, 20) << __func__ << ": r=" << r << ", pos="
+ << object_set_position << dendl;
+
+ Mutex::Locker locker(m_lock);
+ if (r == 0) {
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint64_t object_set = object_set_position.object_number / splay_width;
+
+ JournalMetadata::RegisteredClients registered_clients;
+ m_journal_metadata->get_registered_clients(®istered_clients);
+
+ bool trim_permitted = true;
+ for (JournalMetadata::RegisteredClients::iterator it =
+ registered_clients.begin();
+ it != registered_clients.end(); ++it) {
+ const JournalMetadata::Client &client = *it;
+ uint64_t client_object_set = client.commit_position.object_number /
+ splay_width;
+ if (client.id != m_journal_metadata->get_client_id() &&
+ client_object_set < object_set) {
+ ldout(m_cct, 20) << "object set " << client_object_set << " still "
+ << "in-use by client " << client.id << dendl;
+ trim_permitted = false;
+ break;
+ }
+ }
+
+ if (trim_permitted) {
+ trim_objects(object_set_position.object_number / splay_width);
+ }
+ }
+ finish_op();
+}
+
+void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) {
+ ldout(m_cct, 20) << __func__ << ": r=" << r << ", set=" << object_set << ", "
+ << "trim=" << m_remove_set << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_remove_set_pending = false;
+
+ if (r == 0 || (r == -ENOENT && m_remove_set_ctx == NULL)) {
+ // advance the minimum set to the next set
+ m_journal_metadata->set_minimum_set(object_set + 1);
+ uint64_t minimum_set = m_journal_metadata->get_minimum_set();
+
+ if (m_remove_set > minimum_set) {
+ m_remove_set_pending = true;
+ remove_set(minimum_set);
+ }
+ } else if (r == -ENOENT) {
+ // no objects within the set existed
+ r = 0;
+ }
+
+ if (m_remove_set_ctx != NULL && !m_remove_set_pending) {
+ ldout(m_cct, 20) << "completing remove set context" << dendl;
+ m_remove_set_ctx->complete(r);
+ }
+ finish_op();
+}
+
+JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer,
+ uint64_t _object_set,
+ uint8_t _splay_width)
+ : journal_trimmer(_journal_trimmer), object_set(_object_set),
+ lock(utils::unique_lock_name("C_RemoveSet::lock", this)),
+ refs(_splay_width + 1), return_value(-ENOENT) {
+}
+
+void JournalTrimmer::C_RemoveSet::complete(int r) {
+ lock.Lock();
+ if (r < 0 && r != -ENOENT && return_value == -ENOENT) {
+ return_value = r;
+ } else if (r == 0 && return_value == -ENOENT) {
+ return_value = 0;
+ }
+
+ if (--refs == 0) {
+ finish(return_value);
+ lock.Unlock();
+ delete this;
+ } else {
+ lock.Unlock();
+ }
+}
+
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNAL_TRIMMER_H
+#define CEPH_JOURNAL_JOURNAL_TRIMMER_H
+
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+#include "include/Context.h"
+#include "common/Mutex.h"
+#include "journal/JournalMetadata.h"
+#include "cls/journal/cls_journal_types.h"
+
+namespace journal {
+
+class JournalTrimmer {
+public:
+ typedef cls::journal::ObjectSetPosition ObjectSetPosition;
+
+ JournalTrimmer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
+ const JournalMetadataPtr &journal_metadata);
+ ~JournalTrimmer();
+
+ int remove_objects();
+ void update_commit_position(const ObjectSetPosition &object_set_position);
+
+private:
+ struct C_CommitPositionSafe : public Context {
+ JournalTrimmer *journal_trimmer;
+ ObjectSetPosition object_set_position;
+
+ C_CommitPositionSafe(JournalTrimmer *_journal_trimmer,
+ const ObjectSetPosition &_object_set_position)
+ : journal_trimmer(_journal_trimmer),
+ object_set_position(_object_set_position) {}
+
+ virtual void complete(int r) {
+ finish(r);
+ }
+ virtual void finish(int r) {
+ journal_trimmer->handle_commit_position_safe(r, object_set_position);
+ }
+ };
+ struct C_RemoveSet : public Context {
+ JournalTrimmer *journal_trimmer;
+ uint64_t object_set;
+ Mutex lock;
+ uint32_t refs;
+ int return_value;
+
+ C_RemoveSet(JournalTrimmer *_journal_trimmer, uint64_t _object_set,
+ uint8_t _splay_width);
+ virtual void complete(int r);
+ virtual void finish(int r) {
+ journal_trimmer->handle_set_removed(r, object_set);
+ }
+ };
+
+ librados::IoCtx m_ioctx;
+ CephContext *m_cct;
+ std::string m_object_oid_prefix;
+
+ JournalMetadataPtr m_journal_metadata;
+
+ Mutex m_lock;
+
+ size_t m_pending_ops;
+ Cond m_pending_ops_cond;
+
+ bool m_remove_set_pending;
+ uint64_t m_remove_set;
+ Context *m_remove_set_ctx;
+
+ void start_op();
+ void finish_op();
+ void wait_for_ops();
+
+ void trim_objects(uint64_t minimum_set);
+ void remove_set(uint64_t object_set);
+
+ void handle_commit_position_safe(int r, const ObjectSetPosition &position);
+
+ void handle_set_removed(int r, uint64_t object_set);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_TRIMMER_H
#include "journal/JournalMetadata.h"
#include "journal/JournalPlayer.h"
#include "journal/JournalRecorder.h"
+#include "journal/JournalTrimmer.h"
#include "journal/PayloadImpl.h"
#include "journal/ReplayHandler.h"
#include "cls/journal/cls_journal_client.h"
Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
uint64_t journal_id, const std::string &client_id)
- : m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL)
+ : m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL),
+ m_trimmer(NULL)
{
m_header_ioctx.dup(header_ioctx);
m_data_ioctx.dup(data_ioctx);
m_header_oid = JOURNAL_HEADER_PREFIX + stringify(journal_id);
m_object_oid_prefix = JOURNAL_OBJECT_PREFIX + stringify(journal_id) + ".";
- m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id);
+ // TODO configurable commit interval
+ m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id,
+ 5);
m_metadata->get();
+
+ m_trimmer = new JournalTrimmer(m_data_ioctx, m_object_oid_prefix, m_metadata);
}
Journaler::~Journaler() {
m_metadata->put();
m_metadata = NULL;
}
+ delete m_trimmer;
assert(m_player == NULL);
assert(m_recorder == NULL);
}
m_player = NULL;
}
+void Journaler::update_commit_position(const Payload &payload) {
+ PayloadImplPtr payload_impl = payload.get_payload_impl();
+ m_trimmer->update_commit_position(payload_impl->get_object_set_position());
+}
+
void Journaler::start_append() {
assert(m_recorder == NULL);
class JournalMetadata;
class JournalPlayer;
class JournalRecorder;
+class JournalTrimmer;
class ReplayHandler;
class Journaler {
bool try_pop_front(Payload *payload);
void stop_replay();
+ void update_commit_position(const Payload &payload);
+
void start_append();
Future append(const std::string &tag, const bufferlist &bl);
void stop_append();
JournalMetadata *m_metadata;
JournalPlayer *m_player;
JournalRecorder *m_recorder;
+ JournalTrimmer *m_trimmer;
void create_player(ReplayHandler *replay_handler);
};
journal/JournalMetadata.cc \
journal/JournalPlayer.cc \
journal/JournalRecorder.cc \
+ journal/JournalTrimmer.cc \
journal/ObjectPlayer.cc \
journal/ObjectRecorder.cc \
journal/Payload.cc \
journal/JournalMetadata.h \
journal/JournalPlayer.h \
journal/JournalRecorder.h \
+ journal/JournalTrimmer.h \
journal/ObjectPlayer.h \
journal/ObjectRecorder.h \
journal/Payload.h \