From: Jason Dillaman Date: Tue, 9 Jun 2015 20:38:43 +0000 (-0400) Subject: journal: new generic journal implementation X-Git-Tag: v10.0.1~102^2~35 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=debe17224f70b26abac2b542aad059c83c721739;p=ceph.git journal: new generic journal implementation The initial use-case for this journal is for librbd and its mirroring feature. This journal is different from the current MDS journal in that it organizes journal entries across journal objects w/o striping individual entries. It also allows multiple clients to read and write from a single journal concurrently. Signed-off-by: Jason Dillaman --- diff --git a/src/Makefile.am b/src/Makefile.am index 3bb6c19daf7..9e31b041bca 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -31,6 +31,7 @@ include messages/Makefile.am include include/Makefile.am include librados/Makefile.am include libradosstriper/Makefile.am +include journal/Makefile.am include librbd/Makefile.am include rgw/Makefile.am include cls/Makefile.am diff --git a/src/journal/Entry.cc b/src/journal/Entry.cc new file mode 100644 index 00000000000..bd26ebe1ee7 --- /dev/null +++ b/src/journal/Entry.cc @@ -0,0 +1,156 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/Entry.h" +#include "include/encoding.h" +#include "include/stringify.h" +#include "common/Formatter.h" +#include + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "Entry: " + +namespace journal { + +namespace { + +const uint32_t HEADER_FIXED_SIZE = 17; /// preamble, version, tid + +} // anonymous namespace + +void Entry::encode(bufferlist &bl) const { + bufferlist data_bl; + ::encode(preamble, data_bl); + ::encode(static_cast(1), data_bl); + ::encode(m_tid, data_bl); + assert(HEADER_FIXED_SIZE == data_bl.length()); + + ::encode(m_tag, data_bl); + ::encode(m_data, data_bl); + + uint32_t crc = data_bl.crc32c(0); + 
bl.claim_append(data_bl); + ::encode(crc, bl); +} + +void Entry::decode(bufferlist::iterator &iter) { + uint32_t start_offset = iter.get_off(); + uint64_t bl_preamble; + ::decode(bl_preamble, iter); + if (bl_preamble != preamble) { + throw buffer::malformed_input("incorrect preamble: " + + stringify(bl_preamble)); + } + + uint8_t version; + ::decode(version, iter); + if (version != 1) { + throw buffer::malformed_input("unknown version: " + stringify(version)); + } + + ::decode(m_tid, iter); + ::decode(m_tag, iter); + ::decode(m_data, iter); + uint32_t end_offset = iter.get_off(); + + uint32_t crc; + ::decode(crc, iter); + + bufferlist data_bl; + data_bl.substr_of(iter.get_bl(), start_offset, end_offset - start_offset); + uint32_t actual_crc = data_bl.crc32c(0); + if (crc != actual_crc) { + throw buffer::malformed_input("crc mismatch: " + stringify(crc) + + " != " + stringify(actual_crc)); + } +} + +void Entry::dump(Formatter *f) const { + f->dump_string("tag", m_tag); + f->dump_unsigned("tid", m_tid); + + std::stringstream data; + m_data.hexdump(data); + f->dump_string("data", data.str()); +} + +bool Entry::is_readable(bufferlist::iterator iter, uint32_t *bytes_needed) { + uint32_t start_off = iter.get_off(); + if (iter.get_remaining() < HEADER_FIXED_SIZE) { + *bytes_needed = HEADER_FIXED_SIZE - iter.get_remaining(); + return false; + } + uint64_t bl_preamble; + ::decode(bl_preamble, iter); + if (bl_preamble != preamble) { + *bytes_needed = 0; + return false; + } + iter.advance(HEADER_FIXED_SIZE - sizeof(bl_preamble)); + + if (iter.get_remaining() < sizeof(uint32_t)) { + *bytes_needed = sizeof(uint32_t) - iter.get_remaining(); + return false; + } + uint32_t tag_size; + ::decode(tag_size, iter); + + if (iter.get_remaining() < tag_size) { + *bytes_needed = tag_size - iter.get_remaining(); + return false; + } + iter.advance(tag_size); + + if (iter.get_remaining() < sizeof(uint32_t)) { + *bytes_needed = sizeof(uint32_t) - iter.get_remaining(); + return false; + } + 
uint32_t data_size; + ::decode(data_size, iter); + + if (iter.get_remaining() < data_size) { + *bytes_needed = data_size - iter.get_remaining(); + return false; + } + iter.advance(data_size); + uint32_t end_off = iter.get_off(); + + if (iter.get_remaining() < sizeof(uint32_t)) { + *bytes_needed = sizeof(uint32_t) - iter.get_remaining(); + return false; + } + + bufferlist crc_bl; + crc_bl.substr_of(iter.get_bl(), start_off, end_off - start_off); + + *bytes_needed = 0; + uint32_t crc; + ::decode(crc, iter); + if (crc != crc_bl.crc32c(0)) { + return false; + } + return true; +} + +void Entry::generate_test_instances(std::list &o) { + o.push_back(new Entry("tag1", 123, bufferlist())); + + bufferlist bl; + bl.append("data"); + o.push_back(new Entry("tag2", 123, bl)); +} + +bool Entry::operator==(const Entry& rhs) const { + return (m_tag == rhs.m_tag && m_tid == rhs.m_tid && + const_cast(m_data).contents_equal( + const_cast(rhs.m_data))); +} + +std::ostream &operator<<(std::ostream &os, const Entry &entry) { + os << "Entry[tag=" << entry.get_tag() << ", tid=" << entry.get_tid() << ", " + << "data size=" << entry.get_data().length() << "]"; + return os; +} + +} // namespace journal diff --git a/src/journal/Entry.h b/src/journal/Entry.h new file mode 100644 index 00000000000..9e85df4c7dd --- /dev/null +++ b/src/journal/Entry.h @@ -0,0 +1,62 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_ENTRY_H +#define CEPH_JOURNAL_ENTRY_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/encoding.h" +#include +#include + +namespace ceph { +class Formatter; +} + +namespace journal { + +class Entry { +public: + Entry() : m_tid() {} + Entry(const std::string &tag, uint64_t tid, const bufferlist &data) + : m_tag(tag), m_tid(tid), m_data(data) + { + } + + inline const std::string &get_tag() const { + return m_tag; + } + inline uint64_t get_tid() const { + return m_tid; + } + inline 
const bufferlist &get_data() const { + return m_data; + } + + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &iter); + void dump(ceph::Formatter *f) const; + + bool operator==(const Entry& rhs) const; + + static bool is_readable(bufferlist::iterator iter, uint32_t *bytes_needed); + static void generate_test_instances(std::list &o); + +private: + static const uint64_t preamble = 0x3141592653589793; + + std::string m_tag; + uint64_t m_tid; + bufferlist m_data; +}; + +std::ostream &operator<<(std::ostream &os, const Entry &entry); + +} // namespace journal + +using journal::operator<<; + +WRITE_CLASS_ENCODER(journal::Entry) + +#endif // CEPH_JOURNAL_ENTRY_H diff --git a/src/journal/Future.cc b/src/journal/Future.cc new file mode 100644 index 00000000000..5b051065127 --- /dev/null +++ b/src/journal/Future.cc @@ -0,0 +1,38 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/Future.h" +#include "journal/FutureImpl.h" + +namespace journal { + +void Future::flush(Context *on_safe) { + m_future_impl->flush(on_safe); +} + +void Future::wait(Context *on_safe) { + m_future_impl->wait(on_safe); +} + +bool Future::is_complete() const { + return m_future_impl->is_complete(); +} + +int Future::get_return_value() const { + return m_future_impl->get_return_value(); +} + +void intrusive_ptr_add_ref(FutureImpl *p) { + p->get(); +} + +void intrusive_ptr_release(FutureImpl *p) { + p->put(); +} + +std::ostream &operator<<(std::ostream &os, const Future &future) { + return os << *future.m_future_impl.get(); +} + +} // namespace journal + diff --git a/src/journal/Future.h b/src/journal/Future.h new file mode 100644 index 00000000000..3ded9f6d901 --- /dev/null +++ b/src/journal/Future.h @@ -0,0 +1,48 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_FUTURE_H +#define CEPH_JOURNAL_FUTURE_H + +#include "include/int_types.h" 
+#include +#include +#include +#include "include/assert.h" + +class Context; + +namespace journal { + +class FutureImpl; + +class Future { +public: + typedef boost::intrusive_ptr FutureImplPtr; + + Future(const FutureImplPtr &future_impl) : m_future_impl(future_impl) {} + + void flush(Context *on_safe); + void wait(Context *on_safe); + + bool is_complete() const; + int get_return_value() const; + +private: + friend std::ostream& operator<<(std::ostream&, const Future&); + + FutureImplPtr m_future_impl; +}; + +void intrusive_ptr_add_ref(FutureImpl *p); +void intrusive_ptr_release(FutureImpl *p); + +std::ostream &operator<<(std::ostream &os, const Future &future); + +} // namespace journal + +using journal::intrusive_ptr_add_ref; +using journal::intrusive_ptr_release; +using journal::operator<<; + +#endif // CEPH_JOURNAL_FUTURE_H diff --git a/src/journal/FutureImpl.cc b/src/journal/FutureImpl.cc new file mode 100644 index 00000000000..3ca8688dfab --- /dev/null +++ b/src/journal/FutureImpl.cc @@ -0,0 +1,143 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/FutureImpl.h" +#include "journal/Utils.h" + +namespace journal { + +FutureImpl::FutureImpl(const std::string &tag, uint64_t tid) + : RefCountedObject(NULL, 0), m_tag(tag), m_tid(tid), + m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false), + m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE), + m_consistent_ack(this) { +} + +void FutureImpl::init(const FutureImplPtr &prev_future) { + // chain ourself to the prior future (if any) to that we known when the + // journal is consistent + if (prev_future) { + m_prev_future = prev_future; + m_prev_future->wait(&m_consistent_ack); + } else { + m_consistent_ack.complete(0); + } +} + +void FutureImpl::flush(Context *on_safe) { + bool complete; + FlushHandlerPtr flush_handler; + { + Mutex::Locker locker(m_lock); + complete = (m_safe && m_consistent); + if 
(!complete) { + if (on_safe != NULL) { + m_contexts.push_back(on_safe); + } + + if (m_flush_state == FLUSH_STATE_NONE) { + m_flush_state = FLUSH_STATE_REQUESTED; + flush_handler = m_flush_handler; + + // walk the chain backwards up to futures + if (m_prev_future) { + m_prev_future->flush(); + } + } + } + } + + if (complete && on_safe != NULL) { + on_safe->complete(m_return_value); + } else if (flush_handler) { + // attached to journal object -- instruct it to flush all entries through + // this one. possible to become detached while lock is released, so flush + // will be re-requested by the object if it doesn't own the future + flush_handler->flush(this); + } +} + +void FutureImpl::wait(Context *on_safe) { + assert(on_safe != NULL); + { + Mutex::Locker locker(m_lock); + if (!m_safe || !m_consistent) { + m_contexts.push_back(on_safe); + return; + } + } + on_safe->complete(m_return_value); +} + +bool FutureImpl::is_complete() const { + Mutex::Locker locker(m_lock); + return m_safe && m_consistent; +} + +int FutureImpl::get_return_value() const { + Mutex::Locker locker(m_lock); + assert(m_safe && m_consistent); + return m_return_value; +} + +bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) { + Mutex::Locker locker(m_lock); + assert(!m_flush_handler); + m_flush_handler = flush_handler; + return m_flush_state != FLUSH_STATE_NONE; +} + +void FutureImpl::safe(int r) { + Mutex::Locker locker(m_lock); + assert(!m_safe); + m_safe = true; + if (m_return_value == 0) { + m_return_value = r; + } + + m_flush_handler.reset(); + if (m_consistent) { + finish(); + } +} + +void FutureImpl::consistent(int r) { + Mutex::Locker locker(m_lock); + assert(!m_consistent); + m_consistent = true; + m_prev_future.reset(); + if (m_return_value == 0) { + m_return_value = r; + } + + if (m_safe) { + finish(); + } +} + +void FutureImpl::finish() { + assert(m_lock.is_locked()); + assert(m_safe && m_consistent); + + Contexts contexts; + contexts.swap(m_contexts); + for 
(Contexts::iterator it = contexts.begin(); + it != contexts.end(); ++it) { + (*it)->complete(m_return_value); + } +} + +std::ostream &operator<<(std::ostream &os, const FutureImpl &future) { + os << "Future[tag=" << future.m_tag << ", tid=" << future.m_tid << "]"; + return os; +} + +void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p) { + p->get(); +} + +void intrusive_ptr_release(FutureImpl::FlushHandler *p) { + p->put(); +} + +} // namespace journal diff --git a/src/journal/FutureImpl.h b/src/journal/FutureImpl.h new file mode 100644 index 00000000000..eaf5febfcc0 --- /dev/null +++ b/src/journal/FutureImpl.h @@ -0,0 +1,121 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_FUTURE_IMPL_H +#define CEPH_JOURNAL_FUTURE_IMPL_H + +#include "include/int_types.h" +#include "common/Mutex.h" +#include "common/RefCountedObj.h" +#include "journal/Future.h" +#include +#include +#include +#include "include/assert.h" + +class Context; + +namespace journal { + +class FutureImpl; +typedef boost::intrusive_ptr FutureImplPtr; + +class FutureImpl : public RefCountedObject, boost::noncopyable { +public: + struct FlushHandler { + virtual ~FlushHandler() {} + virtual void flush(const FutureImplPtr &future) = 0; + virtual void get() = 0; + virtual void put() = 0; + }; + typedef boost::intrusive_ptr FlushHandlerPtr; + + FutureImpl(const std::string &tag, uint64_t tid); + + void init(const FutureImplPtr &prev_future); + + inline const std::string &get_tag() const { + return m_tag; + } + inline uint64_t get_tid() const { + return m_tid; + } + + void flush(Context *on_safe = NULL); + void wait(Context *on_safe); + + bool is_complete() const; + int get_return_value() const; + + inline bool is_flush_in_progress() const { + Mutex::Locker locker(m_lock); + return (m_flush_state == FLUSH_STATE_IN_PROGRESS); + } + inline void set_flush_in_progress() { + Mutex::Locker locker(m_lock); + m_flush_state = 
FLUSH_STATE_IN_PROGRESS; + } + + bool attach(const FlushHandlerPtr &flush_handler); + inline void detach() { + Mutex::Locker locker(m_lock); + assert(m_flush_handler); + m_flush_handler.reset(); + } + inline FlushHandlerPtr get_flush_handler() const { + Mutex::Locker locker(m_lock); + return m_flush_handler; + } + + void safe(int r); + +private: + friend std::ostream &operator<<(std::ostream &, const FutureImpl &); + + typedef std::list Contexts; + + enum FlushState { + FLUSH_STATE_NONE, + FLUSH_STATE_REQUESTED, + FLUSH_STATE_IN_PROGRESS + }; + + struct C_ConsistentAck : public Context { + FutureImplPtr future; + C_ConsistentAck(FutureImpl *_future) : future(_future) {} + virtual void complete(int r) { + future->consistent(r); + future.reset(); + } + virtual void finish(int r) {} + }; + + std::string m_tag; + uint64_t m_tid; + + mutable Mutex m_lock; + FutureImplPtr m_prev_future; + bool m_safe; + bool m_consistent; + int m_return_value; + + FlushHandlerPtr m_flush_handler; + FlushState m_flush_state; + + C_ConsistentAck m_consistent_ack; + Contexts m_contexts; + + void consistent(int r); + void finish(); +}; + +void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p); +void intrusive_ptr_release(FutureImpl::FlushHandler *p); + +std::ostream &operator<<(std::ostream &os, const FutureImpl &future); + +} // namespace journal + +using journal::operator<<; + +#endif // CEPH_JOURNAL_FUTURE_IMPL_H diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc new file mode 100644 index 00000000000..35cf21788f3 --- /dev/null +++ b/src/journal/JournalMetadata.cc @@ -0,0 +1,300 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalMetadata.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "cls/journal/cls_journal_client.h" + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "JournalMetadata: " + +namespace journal { + +using 
namespace cls::journal; + +namespace { + +struct C_NotifyUpdate : public Context { + JournalMetadataPtr journal_metadata; + + C_NotifyUpdate(JournalMetadata *_journal_metadata) + : journal_metadata(_journal_metadata) {} + + virtual void finish(int r) { + if (r == 0) { + journal_metadata->notify_update(); + } + } +}; + +void rados_ctx_callback(rados_completion_t c, void *arg) { + Context *comp = reinterpret_cast(arg); + comp->complete(rados_aio_get_return_value(c)); +} + +} // anonymous namespace + +JournalMetadata::JournalMetadata(librados::IoCtx &ioctx, + const std::string &oid, + const std::string &client_id) + : m_cct(NULL), m_oid(oid), m_client_id(client_id), m_order(0), + m_splay_width(0), m_initialized(false), m_timer(NULL), + m_timer_lock("JournalMetadata::m_timer_lock"), + m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0), + m_update_notifications(0) { + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast(m_ioctx.cct()); +} + +JournalMetadata::~JournalMetadata() { + if (m_timer != NULL) { + Mutex::Locker locker(m_timer_lock); + m_timer->shutdown(); + delete m_timer; + m_timer = NULL; + } + + m_ioctx.unwatch2(m_watch_handle); + librados::Rados rados(m_ioctx); + rados.watch_flush(); +} + +int JournalMetadata::init() { + assert(!m_initialized); + m_initialized = true; + + int r = client::get_immutable_metadata(m_ioctx, m_oid, &m_order, + &m_splay_width); + if (r < 0) { + lderr(m_cct) << __func__ << ": failed to retrieve journal metadata: " + << cpp_strerror(r) << dendl; + return r; + } + + m_timer = new SafeTimer(m_cct, m_timer_lock, false); + m_timer->init(); + + r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx); + if (r < 0) { + lderr(m_cct) << __func__ << ": failed to watch journal" + << cpp_strerror(r) << dendl; + return r; + } + + C_SaferCond cond; + refresh(&cond); + r = cond.wait(); + if (r < 0) { + return r; + } + return 0; +} + +int JournalMetadata::register_client(const std::string &description) { + 
assert(!m_client_id.empty()); + + int r = client::client_register(m_ioctx, m_oid, m_client_id, description); + if (r < 0) { + lderr(m_cct) << "failed to register journal client '" << m_client_id + << "': " << cpp_strerror(r) << dendl; + return r; + } + + notify_update(); + return 0; +} + +int JournalMetadata::unregister_client() { + assert(!m_client_id.empty()); + + int r = client::client_unregister(m_ioctx, m_oid, m_client_id); + if (r < 0) { + lderr(m_cct) << "failed to unregister journal client '" << m_client_id + << "': " << cpp_strerror(r) << dendl; + return r; + } + + notify_update(); + return 0; +} + +void JournalMetadata::add_listener(Listener *listener) { + Mutex::Locker locker(m_lock); + while (m_update_notifications > 0) { + m_update_cond.Wait(m_lock); + } + m_listeners.push_back(listener); +} + +void JournalMetadata::remove_listener(Listener *listener) { + Mutex::Locker locker(m_lock); + while (m_update_notifications > 0) { + m_update_cond.Wait(m_lock); + } + m_listeners.remove(listener); +} + +void JournalMetadata::set_minimum_set(uint64_t object_set) { + Mutex::Locker locker(m_lock); + if (m_minimum_set >= object_set) { + return; + } + + librados::ObjectWriteOperation op; + client::set_minimum_set(&op, object_set); + + C_NotifyUpdate *ctx = new C_NotifyUpdate(this); + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, NULL, rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + assert(r == 0); + comp->release(); + + m_minimum_set = object_set; +} + +void JournalMetadata::set_active_set(uint64_t object_set) { + Mutex::Locker locker(m_lock); + if (m_active_set >= object_set) { + return; + } + + librados::ObjectWriteOperation op; + client::set_active_set(&op, object_set); + + C_NotifyUpdate *ctx = new C_NotifyUpdate(this); + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, NULL, rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + assert(r == 0); + 
comp->release(); + + m_active_set = object_set; +} + +void JournalMetadata::set_commit_position( + const ObjectSetPosition &commit_position) { + Mutex::Locker locker(m_lock); + + librados::ObjectWriteOperation op; + client::client_commit(&op, m_client_id, commit_position); + + C_NotifyUpdate *ctx = new C_NotifyUpdate(this); + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, NULL, rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + assert(r == 0); + comp->release(); +} + +void JournalMetadata::reserve_tid(const std::string &tag, uint64_t tid) { + Mutex::Locker locker(m_lock); + uint64_t &allocated_tid = m_allocated_tids[tag]; + if (allocated_tid <= tid) { + allocated_tid = tid + 1; + } +} + +bool JournalMetadata::get_last_allocated_tid(const std::string &tag, + uint64_t *tid) const { + Mutex::Locker locker(m_lock); + + AllocatedTids::const_iterator it = m_allocated_tids.find(tag); + if (it == m_allocated_tids.end()) { + return false; + } + + assert(it->second > 0); + *tid = it->second - 1; + return true; +} + +void JournalMetadata::refresh(Context *on_complete) { + ldout(m_cct, 10) << "refreshing journal metadata" << dendl; + C_Refresh *refresh = new C_Refresh(this, on_complete); + client::get_mutable_metadata(m_ioctx, m_oid, &refresh->minimum_set, + &refresh->active_set, + &refresh->registered_clients, refresh); +} + +void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { + ldout(m_cct, 10) << "refreshed journal metadata: r=" << r << dendl; + if (r == 0) { + Mutex::Locker locker(m_lock); + + Client client(m_client_id, ""); + RegisteredClients::iterator it = refresh->registered_clients.find(client); + if (it != refresh->registered_clients.end()) { + m_minimum_set = refresh->minimum_set; + m_active_set = refresh->active_set; + m_registered_clients = refresh->registered_clients; + m_client = *it; + + ++m_update_notifications; + m_lock.Unlock(); + for (Listeners::iterator it = m_listeners.begin(); + 
it != m_listeners.end(); ++it) { + (*it)->handle_update(this); + } + m_lock.Lock(); + if (--m_update_notifications == 0) { + m_update_cond.Signal(); + } + } else { + lderr(m_cct) << "failed to locate client: " << m_client_id << dendl; + r = -ENOENT; + } + } + + if (refresh->on_finish != NULL) { + refresh->on_finish->complete(r); + } +} + +void JournalMetadata::schedule_watch_reset() { + Mutex::Locker locker(m_timer_lock); + m_timer->add_event_after(0.1, new C_WatchReset(this)); +} + +void JournalMetadata::handle_watch_reset() { + int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx); + if (r < 0) { + lderr(m_cct) << __func__ << ": failed to watch journal" + << cpp_strerror(r) << dendl; + schedule_watch_reset(); + } else { + ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl; + refresh(NULL); + } +} + +void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) { + ldout(m_cct, 10) << "journal header updated" << dendl; + + bufferlist bl; + m_ioctx.notify_ack(m_oid, notify_id, cookie, bl); + + refresh(NULL); +} + +void JournalMetadata::handle_watch_error(int err) { + lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl; + schedule_watch_reset(); +} + +void JournalMetadata::notify_update() { + ldout(m_cct, 10) << "notifying journal header update" << dendl; + + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(NULL, NULL, NULL); + + bufferlist bl; + int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL); + assert(r == 0); + + comp->release(); +} + +} // namespace journal diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h new file mode 100644 index 00000000000..2bab6870bbc --- /dev/null +++ b/src/journal/JournalMetadata.h @@ -0,0 +1,179 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_JOURNAL_METADATA_H +#define CEPH_JOURNAL_JOURNAL_METADATA_H + +#include "include/int_types.h" +#include 
"include/Context.h" +#include "include/rados/librados.hpp" +#include "common/Mutex.h" +#include "common/RefCountedObj.h" +#include "cls/journal/cls_journal_types.h" +#include +#include +#include +#include +#include +#include "include/assert.h" + +class SafeTimer; + +namespace journal { + +class JournalMetadata; +typedef boost::intrusive_ptr JournalMetadataPtr; + +class JournalMetadata : public RefCountedObject, boost::noncopyable { +public: + typedef cls::journal::EntryPosition EntryPosition; + typedef cls::journal::EntryPositions EntryPositions; + typedef cls::journal::ObjectSetPosition ObjectSetPosition; + typedef cls::journal::Client Client; + + typedef std::set RegisteredClients; + + struct Listener { + virtual ~Listener() {}; + virtual void handle_update(JournalMetadata *) = 0; + }; + + JournalMetadata(librados::IoCtx &ioctx, const std::string &oid, + const std::string &client_id); + ~JournalMetadata(); + + int init(); + + void add_listener(Listener *listener); + void remove_listener(Listener *listener); + + int register_client(const std::string &description); + int unregister_client(); + + inline uint8_t get_order() const { + return m_order; + } + inline uint8_t get_splay_width() const { + return m_splay_width; + } + + inline SafeTimer &get_timer() { + return *m_timer; + } + inline Mutex &get_timer_lock() { + return m_timer_lock; + } + + void set_minimum_set(uint64_t object_set); + inline uint64_t get_minimum_set() const { + Mutex::Locker locker(m_lock); + return m_minimum_set; + } + + void set_active_set(uint64_t object_set); + inline uint64_t get_active_set() const { + Mutex::Locker locker(m_lock); + return m_active_set; + } + + void set_commit_position(const ObjectSetPosition &commit_position); + void get_commit_position(ObjectSetPosition *commit_position) const { + Mutex::Locker locker(m_lock); + *commit_position = m_client.commit_position; + } + + inline uint64_t allocate_tid(const std::string &tag) { + Mutex::Locker locker(m_lock); + return 
m_allocated_tids[tag]++; + } + void reserve_tid(const std::string &tag, uint64_t tid); + bool get_last_allocated_tid(const std::string &tag, uint64_t *tid) const; + + void notify_update(); + +private: + typedef std::map AllocatedTids; + typedef std::list Listeners; + + struct C_WatchCtx : public librados::WatchCtx2 { + JournalMetadata *journal_metadata; + + C_WatchCtx(JournalMetadata *_journal_metadata) + : journal_metadata(_journal_metadata) {} + + virtual void handle_notify(uint64_t notify_id, uint64_t cookie, + uint64_t notifier_id, bufferlist& bl) { + journal_metadata->handle_watch_notify(notify_id, cookie); + } + virtual void handle_error(uint64_t cookie, int err) { + journal_metadata->handle_watch_error(err); + } + }; + struct C_WatchReset : public Context { + JournalMetadataPtr journal_metadata; + + C_WatchReset(JournalMetadata *_journal_metadata) + : journal_metadata(_journal_metadata) {} + + virtual void finish(int r) { + journal_metadata->handle_watch_reset(); + } + }; + + struct C_Refresh : public Context { + JournalMetadataPtr journal_metadata; + uint64_t minimum_set; + uint64_t active_set; + RegisteredClients registered_clients; + Context *on_finish; + + C_Refresh(JournalMetadata *_journal_metadata, Context *_on_finish) + : journal_metadata(_journal_metadata), minimum_set(0), active_set(0), + on_finish(_on_finish) {} + + virtual void finish(int r) { + journal_metadata->handle_refresh_complete(this, r); + } + }; + + librados::IoCtx m_ioctx; + CephContext *m_cct; + std::string m_oid; + std::string m_client_id; + + uint8_t m_order; + uint8_t m_splay_width; + bool m_initialized; + + SafeTimer *m_timer; + Mutex m_timer_lock; + + mutable Mutex m_lock; + + Listeners m_listeners; + + C_WatchCtx m_watch_ctx; + uint64_t m_watch_handle; + + uint64_t m_minimum_set; + uint64_t m_active_set; + RegisteredClients m_registered_clients; + Client m_client; + + AllocatedTids m_allocated_tids; + + size_t m_update_notifications; + Cond m_update_cond; + + void 
refresh(Context *on_finish); + void handle_refresh_complete(C_Refresh *refresh, int r); + + void schedule_watch_reset(); + void handle_watch_reset(); + void handle_watch_notify(uint64_t notify_id, uint64_t cookie); + void handle_watch_error(int err); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNAL_METADATA_H diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc new file mode 100644 index 00000000000..ed70519ab63 --- /dev/null +++ b/src/journal/JournalPlayer.cc @@ -0,0 +1,358 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalPlayer.h" +#include "journal/ReplayHandler.h" +#include "journal/Utils.h" + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "JournalPlayer: " + +namespace journal { + +JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, + const std::string &object_oid_prefix, + const JournalMetadataPtr& journal_metadata, + ReplayHandler *replay_handler) + : RefCountedObject(NULL, 0), m_cct(NULL), + m_object_oid_prefix(object_oid_prefix), + m_journal_metadata(journal_metadata), m_replay_handler(replay_handler), + m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), + m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false), + m_watch_interval(0), m_commit_object(0) { + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast(m_ioctx.cct()); + + ObjectSetPosition commit_position; + m_journal_metadata->get_commit_position(&commit_position); + if (!commit_position.entry_positions.empty()) { + uint8_t splay_width = m_journal_metadata->get_splay_width(); + m_splay_offset = commit_position.object_number % splay_width; + m_commit_object = commit_position.object_number; + m_commit_tag = commit_position.entry_positions.front().tag; + + for (size_t i=0; iget_splay_width(); + ldout(m_cct, 10) << __func__ << ": prefetching " << (2 * splay_width) << " " + << "objects" << dendl; + + // 
prefetch starting from the last known commit set + C_PrefetchBatch *ctx = new C_PrefetchBatch(this); + uint64_t start_object = (m_commit_object / splay_width) * splay_width; + for (uint64_t object_number = start_object; + object_number < start_object + (2 * splay_width); ++object_number) { + ctx->add_fetch(); + fetch(object_number, ctx); + } + m_lock.Unlock(); + + ctx->complete(0); +} + +void JournalPlayer::prefetch_and_watch(double interval) { + { + Mutex::Locker locker(m_lock); + m_watch_enabled = true; + m_watch_interval = interval; + } + prefetch(); +} + +void JournalPlayer::unwatch() { + Mutex::Locker locker(m_lock); + if (m_watch_scheduled) { + ObjectPlayerPtr object_player = get_object_player(); + assert(object_player); + + object_player->unwatch(); + m_watch_scheduled = false; + } +} + +bool JournalPlayer::try_pop_front(Entry *entry, + ObjectSetPosition *object_set_position) { + Mutex::Locker locker(m_lock); + if (m_state != STATE_PLAYBACK) { + return false; + } + + ObjectPlayerPtr object_player = get_object_player(); + assert(object_player); + + if (object_player->empty()) { + if (m_watch_enabled && !m_watch_scheduled) { + object_player->watch(&m_process_state, m_watch_interval); + m_watch_scheduled = true; + } + return false; + } + + object_player->front(entry); + object_player->pop_front(); + + uint64_t last_tid; + if (m_journal_metadata->get_last_allocated_tid(entry->get_tag(), &last_tid) && + entry->get_tid() != last_tid + 1) { + lderr(m_cct) << "missing prior journal entry: " << *entry << dendl; + + m_state = STATE_ERROR; + m_lock.Unlock(); + m_replay_handler->handle_error(-EINVAL); + m_lock.Lock(); + return false; + } + + // skip to next splay offset if we cannot apply the next entry in-sequence + if (!object_player->empty()) { + Entry peek_entry; + object_player->front(&peek_entry); + if (peek_entry.get_tag() == entry->get_tag() || + (m_journal_metadata->get_last_allocated_tid(peek_entry.get_tag(), + &last_tid) && + last_tid +1 != 
peek_entry.get_tid())) { + advance_splay_object(); + } + } else { + advance_splay_object(); + + ObjectPlayerPtr next_set_object_player = get_next_set_object_player(); + if (!next_set_object_player->empty()) { + remove_object_player(object_player, &m_process_state); + } + } + + // TODO populate the object_set_position w/ current object number and + // unique entry tag mappings + m_journal_metadata->reserve_tid(entry->get_tag(), entry->get_tid()); + return true; +} + +void JournalPlayer::process_state(int r) { + ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; + if (r == 0) { + Mutex::Locker locker(m_lock); + switch (m_state) { + case STATE_PREFETCH: + ldout(m_cct, 10) << "PREFETCH" << dendl; + r = process_prefetch(); + break; + case STATE_PLAYBACK: + ldout(m_cct, 10) << "PLAYBACK" << dendl; + r = process_playback(); + break; + case STATE_ERROR: + ldout(m_cct, 10) << "ERROR" << dendl; + break; + default: + lderr(m_cct) << "UNEXPECTED STATE (" << m_state << ")" << dendl; + assert(false); + break; + } + } + + if (r < 0) { + { + Mutex::Locker locker(m_lock); + m_state = STATE_ERROR; + } + m_replay_handler->handle_error(r); + } +} + +int JournalPlayer::process_prefetch() { + ldout(m_cct, 10) << __func__ << dendl; + assert(m_lock.is_locked()); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + assert(m_object_players.count(splay_offset) == 1); + + ObjectPlayers &object_players = m_object_players[splay_offset]; + assert(object_players.size() == 2); + + ObjectPlayerPtr object_player = object_players.begin()->second; + assert(!object_player->is_fetch_in_progress()); + + ldout(m_cct, 15) << "seeking known commit position in " + << object_player->get_oid() << dendl; + Entry entry; + while (!object_player->empty()) { + object_player->front(&entry); + if (entry.get_tid() > m_commit_tids[entry.get_tag()]) { + ldout(m_cct, 10) << "located next uncommitted entry: " << entry + << 
dendl; + break; + } + + ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl; + object_player->pop_front(); + } + + // if this object contains the commit position, our read should start with + // the next consistent journal entry in the sequence + if (object_player->get_object_number() == m_commit_object) { + if (object_player->empty()) { + advance_splay_object(); + } else { + Entry entry; + object_player->front(&entry); + if (entry.get_tag() == m_commit_tag) { + advance_splay_object(); + } + } + } + + ObjectPlayerPtr next_set_object_player = get_next_set_object_player(); + if (object_player->empty() && !next_set_object_player->empty()) { + ldout(m_cct, 15) << object_player->get_oid() << " empty" << dendl; + remove_object_player(object_player, &m_process_state); + } + } + + m_state = STATE_PLAYBACK; + ObjectPlayerPtr object_player = get_object_player(); + if (!object_player->empty()) { + ldout(m_cct, 10) << __func__ << ": entries available" << dendl; + m_lock.Unlock(); + m_replay_handler->handle_entries_available(); + m_lock.Lock(); + } else if (m_watch_enabled) { + object_player->watch(&m_process_state, m_watch_interval); + m_watch_scheduled = true; + } + return 0; +} + +int JournalPlayer::process_playback() { + ldout(m_cct, 10) << __func__ << dendl; + assert(m_lock.is_locked()); + + m_watch_scheduled = false; + + ObjectPlayerPtr object_player = get_object_player(); + if (!object_player->empty()) { + ldout(m_cct, 10) << __func__ << ": entries available" << dendl; + m_lock.Unlock(); + m_replay_handler->handle_entries_available(); + m_lock.Lock(); + } + return 0; +} + +const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const { + assert(m_lock.is_locked()); + + assert(m_object_players.count(m_splay_offset) == 1); + SplayedObjectPlayers::const_iterator it = m_object_players.find( + m_splay_offset); + assert(it != m_object_players.end()); + + const ObjectPlayers &object_players = it->second; + assert(object_players.size() == 2); + 
return object_players; +} + +ObjectPlayerPtr JournalPlayer::get_object_player() const { + assert(m_lock.is_locked()); + + const ObjectPlayers &object_players = get_object_players(); + return object_players.begin()->second; +} + +ObjectPlayerPtr JournalPlayer::get_next_set_object_player() const { + assert(m_lock.is_locked()); + + const ObjectPlayers &object_players = get_object_players(); + return object_players.rbegin()->second; +} + +void JournalPlayer::advance_splay_object() { + assert(m_lock.is_locked()); + ++m_splay_offset; + m_splay_offset %= m_journal_metadata->get_splay_width(); + ldout(m_cct, 20) << __func__ << ": new offset " + << static_cast(m_splay_offset) << dendl; +} + +void JournalPlayer::remove_object_player(const ObjectPlayerPtr &object_player, + Context *on_fetch) { + assert(m_lock.is_locked()); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + ObjectPlayers &object_players = m_object_players[ + object_player->get_object_number() % splay_width]; + assert(!object_players.empty()); + assert(object_players.begin()->second == object_player); + object_players.erase(object_players.begin()); + + fetch(object_player->get_object_number() + (2 * splay_width), on_fetch); +} + +void JournalPlayer::fetch(uint64_t object_num, Context *ctx) { + assert(m_lock.is_locked()); + + std::string oid = utils::get_object_name(m_object_oid_prefix, object_num); + + ldout(m_cct, 10) << __func__ << ": " << oid << dendl; + C_Fetch *fetch_ctx = new C_Fetch(this, object_num, ctx); + ObjectPlayerPtr object_player(new ObjectPlayer( + m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(), + m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order())); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + m_object_players[object_num % splay_width][object_num] = object_player; + object_player->fetch(fetch_ctx); +} + +int JournalPlayer::handle_fetched(int r, uint64_t object_num) { + std::string oid = 
utils::get_object_name(m_object_oid_prefix, object_num); + + ldout(m_cct, 10) << __func__ << ": fetched " + << utils::get_object_name(m_object_oid_prefix, object_num) + << ": r=" << r << dendl; + if (r < 0 && r != -ENOENT) { + return r; + } + return 0; +} + +JournalPlayer::C_PrefetchBatch::C_PrefetchBatch(JournalPlayer *p) + : player(p), lock("JournalPlayer::C_PrefetchBatch::lock"), refs(1), + return_value(0) { + player->get(); +} + +void JournalPlayer::C_PrefetchBatch::add_fetch() { + Mutex::Locker locker(lock); + ++refs; +} + +/* Drops one reference on the batch, recording the first negative result in + * return_value. The call that releases the final reference reports + * return_value to the player via process_state(), drops the player reference + * taken by the constructor and destroys the batch. */ +void JournalPlayer::C_PrefetchBatch::complete(int r) { + /* Decide whether this call released the last reference, and snapshot the + * result, while still holding the lock: re-reading refs after unlocking + * would let two concurrent completions both observe zero and both run the + * final step (double process_state / put / delete). */ + bool last_ref; + int result; + { + Mutex::Locker locker(lock); + if (r < 0 && return_value == 0) { + return_value = r; + } + last_ref = (--refs == 0); + result = return_value; + } + + if (last_ref) { + player->process_state(result); + player->put(); + delete this; + } +} + +} // namespace journal diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h new file mode 100644 index 00000000000..43d81f4c9e1 --- /dev/null +++ b/src/journal/JournalPlayer.h @@ -0,0 +1,127 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_JOURNAL_PLAYER_H +#define CEPH_JOURNAL_JOURNAL_PLAYER_H + +#include "include/int_types.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "common/Mutex.h" +#include "common/RefCountedObj.h" +#include "journal/JournalMetadata.h" +#include "journal/ObjectPlayer.h" +#include "cls/journal/cls_journal_types.h" +#include + +class SafeTimer; + +namespace journal { + +class ReplayHandler; +class JournalPlayer; +typedef boost::intrusive_ptr JournalPlayerPtr; + +class JournalPlayer : public RefCountedObject { +public: + typedef cls::journal::EntryPosition EntryPosition; + typedef cls::journal::EntryPositions EntryPositions; + typedef cls::journal::ObjectSetPosition ObjectSetPosition; + + JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, + const JournalMetadataPtr& journal_metadata, + ReplayHandler 
*replay_handler); + + void prefetch(); + void prefetch_and_watch(double interval); + void unwatch(); + + bool try_pop_front(Entry *entry, ObjectSetPosition *object_set_position); + +private: + typedef std::map AllocatedTids; + typedef std::map ObjectPlayers; + typedef std::map SplayedObjectPlayers; + + enum State { + STATE_INIT, + STATE_PREFETCH, + STATE_PLAYBACK, + STATE_ERROR + }; + + struct C_ProcessState : public Context { + JournalPlayer *player; + C_ProcessState(JournalPlayer *p) : player(p) {} + virtual void complete(int r) { + player->process_state(r); + } + virtual void finish(int r) {} + }; + struct C_PrefetchBatch : public Context { + JournalPlayer *player; + Mutex lock; + uint32_t refs; + int return_value; + + C_PrefetchBatch(JournalPlayer *p); + void add_fetch(); + virtual void complete(int r); + virtual void finish(int r) {} + }; + struct C_Fetch : public Context { + JournalPlayer *player; + uint64_t object_num; + Context *on_fetch; + C_Fetch(JournalPlayer *p, uint64_t o, Context *c) + : player(p), object_num(o), on_fetch(c) { + player->get(); + } + virtual void finish(int r) { + r = player->handle_fetched(r, object_num); + on_fetch->complete(r); + player->put(); + } + }; + + librados::IoCtx m_ioctx; + CephContext *m_cct; + std::string m_object_oid_prefix; + JournalMetadataPtr m_journal_metadata; + + ReplayHandler *m_replay_handler; + + C_ProcessState m_process_state; + + mutable Mutex m_lock; + State m_state; + uint8_t m_splay_offset; + + bool m_watch_enabled; + bool m_watch_scheduled; + double m_watch_interval; + + SplayedObjectPlayers m_object_players; + uint64_t m_commit_object; + std::string m_commit_tag; + AllocatedTids m_commit_tids; + + void advance_splay_object(); + + const ObjectPlayers &get_object_players() const; + ObjectPlayerPtr get_object_player() const; + ObjectPlayerPtr get_next_set_object_player() const; + void remove_object_player(const ObjectPlayerPtr &object_player, + Context *on_fetch); + + void process_state(int r); + int 
process_prefetch(); + int process_playback(); + + void fetch(uint64_t object_num, Context *ctx); + int handle_fetched(int r, uint64_t object_num); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNAL_PLAYER_H diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc new file mode 100644 index 00000000000..9050775c63d --- /dev/null +++ b/src/journal/JournalRecorder.cc @@ -0,0 +1,174 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/JournalRecorder.h" +#include "journal/Entry.h" +#include "journal/Utils.h" + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "JournalRecorder: " + +namespace journal { + +JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, + const std::string &object_oid_prefix, + const JournalMetadataPtr& journal_metadata, + uint32_t flush_interval, uint64_t flush_bytes, + double flush_age) + : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), + m_journal_metadata(journal_metadata), m_flush_interval(flush_interval), + m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this), + m_overflow_handler(this), m_lock("JournalerRecorder::m_lock"), + m_current_set(m_journal_metadata->get_active_set()) { + + Mutex::Locker locker(m_lock); + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast(m_ioctx.cct()); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + uint64_t object_number = splay_offset + (m_current_set * splay_width); + m_object_ptrs[splay_offset] = create_object_recorder(object_number); + } + + m_journal_metadata->add_listener(&m_listener); +} + +JournalRecorder::~JournalRecorder() { + m_journal_metadata->remove_listener(&m_listener); +} + +Future JournalRecorder::append(const std::string &tag, + const bufferlist &payload_bl) { + Mutex::Locker locker(m_lock); + + uint64_t tid = 
m_journal_metadata->allocate_tid(tag); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = tid % splay_width; + + ObjectRecorderPtr object_ptr = get_object(splay_offset); + FutureImplPtr future(new FutureImpl(tag, tid)); + future->init(m_prev_future); + m_prev_future = future; + + bufferlist entry_bl; + ::encode(Entry(future->get_tag(), future->get_tid(), payload_bl), entry_bl); + + AppendBuffers append_buffers; + append_buffers.push_back(std::make_pair(future, entry_bl)); + bool object_full = object_ptr->append(append_buffers); + + // TODO populate the object_set_position + + if (object_full) { + ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" + << dendl; + close_object_set(object_ptr->get_object_number() / splay_width); + } + + return Future(future); +} + +void JournalRecorder::flush() { + Mutex::Locker locker(m_lock); + for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); + it != m_object_ptrs.end(); ++it) { + it->second->flush(); + } +} + +ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) { + assert(m_lock.is_locked()); + + ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset]; + assert(object_recoder != NULL); + return object_recoder; +} + +void JournalRecorder::close_object_set(uint64_t object_set) { + assert(m_lock.is_locked()); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + if (object_set != m_current_set) { + return; + } + + uint64_t active_set = m_journal_metadata->get_active_set(); + if (active_set < m_current_set + 1) { + m_journal_metadata->set_active_set(m_current_set + 1); + } + m_current_set = m_journal_metadata->get_active_set(); + + ldout(m_cct, 10) << __func__ << ": advancing to object set " + << m_current_set << dendl; + + // object recorders will invoke overflow handler as they complete + // closing the object to ensure correct order of future appends + for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); + it != 
m_object_ptrs.end(); ++it) { + ObjectRecorderPtr object_recorder = it->second; + if (object_recorder != NULL && + object_recorder->get_object_number() / splay_width == m_current_set) { + if (object_recorder->close_object()) { + // no in-flight ops, immediately create new recorder + create_next_object_recorder(object_recorder); + } + } + } +} + +ObjectRecorderPtr JournalRecorder::create_object_recorder( + uint64_t object_number) { + ObjectRecorderPtr object_recorder(new ObjectRecorder( + m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), + object_number, m_journal_metadata->get_timer(), + m_journal_metadata->get_timer_lock(), &m_overflow_handler, + m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes, + m_flush_age)); + return object_recorder; +} + +void JournalRecorder::create_next_object_recorder( + ObjectRecorderPtr object_recorder) { + assert(m_lock.is_locked()); + + uint64_t object_number = object_recorder->get_object_number(); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_number % splay_width; + + ObjectRecorderPtr new_object_recorder = create_object_recorder( + (m_current_set * splay_width) + splay_offset); + + AppendBuffers append_buffers; + object_recorder->claim_append_buffers(&append_buffers); + new_object_recorder->append(append_buffers); + + m_object_ptrs[splay_offset] = new_object_recorder; +} + +void JournalRecorder::handle_update() { + Mutex::Locker locker(m_lock); + + uint64_t active_set = m_journal_metadata->get_active_set(); + if (active_set > m_current_set) { + close_object_set(m_current_set); + } +} + +void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { + ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl; + + Mutex::Locker locker(m_lock); + + uint64_t object_number = object_recorder->get_object_number(); + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint8_t splay_offset = object_number % splay_width; + 
ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; + assert(active_object_recorder->get_object_number() == object_number); + + close_object_set(object_number / splay_width); + create_next_object_recorder(active_object_recorder); +} + +} // namespace journal diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h new file mode 100644 index 00000000000..0604d555dfb --- /dev/null +++ b/src/journal/JournalRecorder.h @@ -0,0 +1,89 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H +#define CEPH_JOURNAL_JOURNAL_RECORDER_H + +#include "include/int_types.h" +#include "include/rados/librados.hpp" +#include "common/Mutex.h" +#include "journal/Future.h" +#include "journal/FutureImpl.h" +#include "journal/JournalMetadata.h" +#include "journal/ObjectRecorder.h" +#include +#include + +class SafeTimer; + +namespace journal { + +class JournalRecorder { +public: + JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix, + const JournalMetadataPtr &journal_metadata, + uint32_t flush_interval, uint64_t flush_bytes, + double flush_age); + ~JournalRecorder(); + + Future append(const std::string &tag, const bufferlist &bl); + void flush(); + + ObjectRecorderPtr get_object(uint8_t splay_offset); + +private: + typedef std::map ObjectRecorderPtrs; + + struct Listener : public JournalMetadata::Listener { + JournalRecorder *journal_recorder; + + Listener(JournalRecorder *_journal_recorder) + : journal_recorder(_journal_recorder) {} + + virtual void handle_update(JournalMetadata *) { + journal_recorder->handle_update(); + } + }; + + struct OverflowHandler : public ObjectRecorder::OverflowHandler { + JournalRecorder *journal_recorder; + + OverflowHandler(JournalRecorder *_journal_recorder) + : journal_recorder(_journal_recorder) {} + + virtual void overflow(ObjectRecorder *object_recorder) { + 
journal_recorder->handle_overflow(object_recorder); + } + }; + + librados::IoCtx m_ioctx; + CephContext *m_cct; + std::string m_object_oid_prefix; + + JournalMetadataPtr m_journal_metadata; + + uint32_t m_flush_interval; + uint64_t m_flush_bytes; + double m_flush_age; + + Listener m_listener; + OverflowHandler m_overflow_handler; + + Mutex m_lock; + + uint64_t m_current_set; + ObjectRecorderPtrs m_object_ptrs; + + FutureImplPtr m_prev_future; + + void close_object_set(uint64_t object_set); + ObjectRecorderPtr create_object_recorder(uint64_t object_number); + void create_next_object_recorder(ObjectRecorderPtr object_recorder); + + void handle_update(); + void handle_overflow(ObjectRecorder *object_recorder); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNAL_RECORDER_H diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc new file mode 100644 index 00000000000..4613517f9aa --- /dev/null +++ b/src/journal/Journaler.cc @@ -0,0 +1,144 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/Journaler.h" +#include "include/stringify.h" +#include "common/errno.h" +#include "journal/Entry.h" +#include "journal/FutureImpl.h" +#include "journal/JournalMetadata.h" +#include "journal/JournalPlayer.h" +#include "journal/JournalRecorder.h" +#include "journal/PayloadImpl.h" +#include "journal/ReplayHandler.h" +#include "cls/journal/cls_journal_client.h" +#include "cls/journal/cls_journal_types.h" + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "Journaler: " + +namespace journal { + +namespace { + +static const std::string JOURNAL_HEADER_PREFIX = "journal."; +static const std::string JOURNAL_OBJECT_PREFIX = "journal_data."; + +} // anonymous namespace + +using namespace cls::journal; + +Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx, + uint64_t journal_id, const std::string &client_id) + : 
m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL) +{ + m_header_ioctx.dup(header_ioctx); + m_data_ioctx.dup(data_ioctx); + m_cct = reinterpret_cast(m_header_ioctx.cct()); + + m_header_oid = JOURNAL_HEADER_PREFIX + stringify(journal_id); + m_object_oid_prefix = JOURNAL_OBJECT_PREFIX + stringify(journal_id) + "."; + + m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id); + m_metadata->get(); +} + +Journaler::~Journaler() { + if (m_metadata != NULL) { + m_metadata->put(); + m_metadata = NULL; + } + assert(m_player == NULL); + assert(m_recorder == NULL); +} + +int Journaler::init() { + return m_metadata->init(); +} + +int Journaler::create(uint8_t order, uint8_t splay_width) { + if (order > 64 || order < 12) { + lderr(m_cct) << "order must be in the range [12, 64]" << dendl; + return -EDOM; + } + if (splay_width == 0) { + return -EINVAL; + } + + ldout(m_cct, 5) << "creating new journal: " << m_header_oid << dendl; + int r = client::create(m_header_ioctx, m_header_oid, order, splay_width); + if (r < 0) { + lderr(m_cct) << "failed to create journal: " << cpp_strerror(r) << dendl; + return r; + } + return 0; +} + +int Journaler::register_client(const std::string &description) { + return m_metadata->register_client(description); +} + +int Journaler::unregister_client() { + return m_metadata->unregister_client(); +} + +void Journaler::start_replay(ReplayHandler *replay_handler) { + create_player(replay_handler); + m_player->prefetch(); +} + +void Journaler::start_live_replay(ReplayHandler *replay_handler, + double interval) { + create_player(replay_handler); + m_player->prefetch_and_watch(interval); +} + +bool Journaler::try_pop_front(Payload *payload) { + assert(m_player != NULL); + + Entry entry; + ObjectSetPosition object_set_position; + if (!m_player->try_pop_front(&entry, &object_set_position)) { + return false; + } + + *payload = Payload(new PayloadImpl(entry.get_data(), object_set_position)); + return true; +} + +void 
Journaler::stop_replay() { + assert(m_player != NULL); + m_player->unwatch(); + m_player->put(); + m_player = NULL; +} + +void Journaler::start_append() { + assert(m_recorder == NULL); + + // TODO verify active object set >= current replay object set + + // TODO configurable flush intervals + m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix, + m_metadata, 0, 0, 0); +} + +void Journaler::stop_append() { + assert(m_recorder != NULL); + m_recorder->flush(); + delete m_recorder; + m_recorder = NULL; +} + +/* Appends payload_bl to the journal under the given tag by forwarding to the + * active recorder and returns the recorder's Future for the new entry. + * Requires start_append() to have been called first: m_recorder is only + * created there and is reset to NULL by stop_append(). */ +Future Journaler::append(const std::string &tag, const bufferlist &payload_bl) { + /* Fail the same way the other entry points do on a missing precondition + * instead of dereferencing a NULL recorder. */ + assert(m_recorder != NULL); + return m_recorder->append(tag, payload_bl); +} + +void Journaler::create_player(ReplayHandler *replay_handler) { + assert(m_player == NULL); + m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata, + replay_handler); +} + +} // namespace journal diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h new file mode 100644 index 00000000000..ff054dbaa2b --- /dev/null +++ b/src/journal/Journaler.h @@ -0,0 +1,65 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_JOURNALER_H +#define CEPH_JOURNAL_JOURNALER_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/rados/librados.hpp" +#include "journal/Future.h" +#include "journal/Payload.h" +#include +#include +#include "include/assert.h" + +class SafeTimer; + +namespace journal { + +class JournalMetadata; +class JournalPlayer; +class JournalRecorder; +class ReplayHandler; + +class Journaler { +public: + Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx, + uint64_t journal_id, const std::string &client_id); + ~Journaler(); + + int init(); + + int create(uint8_t order, uint8_t splay_width); + + int register_client(const std::string &description); + int unregister_client(); + + void start_replay(ReplayHandler *replay_handler); + void start_live_replay(ReplayHandler 
*replay_handler, double interval); + bool try_pop_front(Payload *payload); + void stop_replay(); + + void start_append(); + Future append(const std::string &tag, const bufferlist &bl); + void stop_append(); + +private: + librados::IoCtx m_header_ioctx; + librados::IoCtx m_data_ioctx; + CephContext *m_cct; + std::string m_client_id; + + std::string m_header_oid; + std::string m_object_oid_prefix; + + JournalMetadata *m_metadata; + JournalPlayer *m_player; + JournalRecorder *m_recorder; + + void create_player(ReplayHandler *replay_handler); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_JOURNALER_H diff --git a/src/journal/Makefile.am b/src/journal/Makefile.am new file mode 100644 index 00000000000..8c8bc97b695 --- /dev/null +++ b/src/journal/Makefile.am @@ -0,0 +1,36 @@ +if ENABLE_CLIENT +if WITH_RADOS + +libjournal_la_SOURCES = \ + journal/Entry.cc \ + journal/Future.cc \ + journal/FutureImpl.cc \ + journal/Journaler.cc \ + journal/JournalMetadata.cc \ + journal/JournalPlayer.cc \ + journal/JournalRecorder.cc \ + journal/ObjectPlayer.cc \ + journal/ObjectRecorder.cc \ + journal/Payload.cc \ + journal/PayloadImpl.cc \ + journal/Utils.cc + +noinst_LTLIBRARIES += libjournal.la +noinst_HEADERS += \ + journal/Entry.h \ + journal/Future.h \ + journal/FutureImpl.h \ + journal/Journaler.h \ + journal/JournalMetadata.h \ + journal/JournalPlayer.h \ + journal/JournalRecorder.h \ + journal/ObjectPlayer.h \ + journal/ObjectRecorder.h \ + journal/Payload.h \ + journal/PayloadImpl.h \ + journal/ReplayHandler.h \ + journal/Utils.h +DENCODER_DEPS += libjournal.la + +endif # WITH_RADOS +endif # ENABLE_CLIENT diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc new file mode 100644 index 00000000000..704937de774 --- /dev/null +++ b/src/journal/ObjectPlayer.cc @@ -0,0 +1,244 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/ObjectPlayer.h" +#include "journal/Utils.h" +#include 
"common/Timer.h" +#include + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "ObjectPlayer: " + +namespace journal { + +namespace { + +void rados_ctx_callback(rados_completion_t c, void *arg) { + Context *ctx = reinterpret_cast(arg); + ctx->complete(rados_aio_get_return_value(c)); +} + +} // anonymous namespace + +ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx, + const std::string &object_oid_prefix, + uint64_t object_num, SafeTimer &timer, + Mutex &timer_lock, uint8_t order) + : RefCountedObject(NULL, 0), m_object_num(object_num), + m_oid(utils::get_object_name(object_oid_prefix, m_object_num)), + m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order), + m_watch_interval(0), m_watch_task(NULL), m_watch_fetch(this), + m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)), + m_fetch_in_progress(false), m_read_off(0), m_watch_ctx(NULL), + m_watch_ctx_in_progress(false) { + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast(m_ioctx.cct()); +} + +ObjectPlayer::~ObjectPlayer() { + { + Mutex::Locker locker(m_lock); + assert(m_watch_ctx == NULL); + } +} + +void ObjectPlayer::fetch(Context *on_finish) { + ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl; + + Mutex::Locker locker(m_lock); + m_fetch_in_progress = true; + + C_Fetch *context = new C_Fetch(this, on_finish); + librados::ObjectReadOperation op; + op.read(m_read_off, 2 << m_order, &context->read_bl, NULL); + + librados::AioCompletion *rados_completion = + librados::Rados::aio_create_completion(context, rados_ctx_callback, NULL); + int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL); + assert(r == 0); + rados_completion->release(); +} + +void ObjectPlayer::watch(Context *on_fetch, double interval) { + ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl; + { + Mutex::Locker locker(m_lock); + assert(m_watch_ctx == NULL); + m_watch_ctx = on_fetch; + } + { + Mutex::Locker locker(m_timer_lock); + m_watch_interval = 
interval; + } + schedule_watch(); +} + +void ObjectPlayer::unwatch() { + ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl; + { + Mutex::Locker locker(m_lock); + while (m_watch_ctx_in_progress) { + m_watch_ctx_cond.Wait(m_lock); + } + delete m_watch_ctx; + m_watch_ctx = NULL; + } + cancel_watch(); +} + +void ObjectPlayer::front(Entry *entry) const { + Mutex::Locker locker(m_lock); + assert(!m_entries.empty()); + *entry = m_entries.front(); +} + +void ObjectPlayer::pop_front() { + Mutex::Locker locker(m_lock); + assert(!m_entries.empty()); + m_entries.pop_front(); +} + +int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) { + ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len=" + << bl.length() << dendl; + + m_fetch_in_progress = false; + if (r < 0) { + return r; + } + if (bl.length() == 0) { + return -ENOENT; + } + + Mutex::Locker locker(m_lock); + m_read_bl.append(bl); + + bool invalid = false; + uint32_t invalid_start_off = 0; + + bufferlist::iterator iter(&m_read_bl, m_read_off); + while (!iter.end()) { + uint32_t bytes_needed; + if (!Entry::is_readable(iter, &bytes_needed)) { + if (bytes_needed != 0) { + invalid_start_off = iter.get_off(); + invalid = true; + lderr(m_cct) << ": partial record at offset " << iter.get_off() + << dendl; + break; + } + + if (!invalid) { + invalid_start_off = iter.get_off(); + invalid = true; + lderr(m_cct) << ": detected corrupt journal entry at offset " + << invalid_start_off << dendl; + } + ++iter; + continue; + } + + if (invalid) { + uint32_t invalid_end_off = iter.get_off(); + lderr(m_cct) << ": corruption range [" << invalid_start_off + << ", " << invalid_end_off << ")" << dendl; + m_invalid_ranges.insert(invalid_start_off, invalid_end_off); + invalid = false; + } + + Entry entry; + ::decode(entry, iter); + ldout(m_cct, 20) << ": " << entry << " decoded" << dendl; + + EntryKey entry_key(std::make_pair(entry.get_tag(), entry.get_tid())); + if (m_entry_keys.find(entry_key) 
== m_entry_keys.end()) { + m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry); + } else { + ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl; + *m_entry_keys[entry_key] = entry; + } + } + + m_read_off = m_read_bl.length(); + if (invalid) { + uint32_t invalid_end_off = m_read_bl.length(); + lderr(m_cct) << ": corruption range [" << invalid_start_off + << ", " << invalid_end_off << ")" << dendl; + m_invalid_ranges.insert(invalid_start_off, invalid_end_off); + } + + if (!m_invalid_ranges.empty()) { + r = -EINVAL; + } + return r; +} + +void ObjectPlayer::schedule_watch() { + ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl; + Mutex::Locker locker(m_timer_lock); + assert(m_watch_task == NULL); + m_watch_task = new C_WatchTask(this); + m_timer.add_event_after(m_watch_interval, m_watch_task); +} + +void ObjectPlayer::cancel_watch() { + ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl; + Mutex::Locker locker(m_timer_lock); + if (m_watch_task != NULL) { + m_timer.cancel_event(m_watch_task); + m_watch_task = NULL; + } +} + +void ObjectPlayer::handle_watch_task() { + ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl; + { + Mutex::Locker locker(m_timer_lock); + m_watch_task = NULL; + } + fetch(&m_watch_fetch); +} + +void ObjectPlayer::handle_watch_fetched(int r) { + ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r + << dendl; + if (r == -ENOENT) { + schedule_watch(); + return; + } + + Context *on_finish; + { + Mutex::Locker locker(m_lock); + m_watch_ctx_in_progress = true; + on_finish = m_watch_ctx; + m_watch_ctx = NULL; + } + + if (on_finish != NULL) { + on_finish->complete(r); + } + + { + Mutex::Locker locker(m_lock); + m_watch_ctx_in_progress = false; + m_watch_ctx_cond.Signal(); + } +} + +void ObjectPlayer::C_Fetch::finish(int r) { + r = object_player->handle_fetch_complete(r, read_bl); + on_finish->complete(r); + 
object_player->put(); +} + +void ObjectPlayer::C_WatchTask::finish(int r) { + object_player->handle_watch_task(); +} + +void ObjectPlayer::C_WatchFetch::finish(int r) { + object_player->handle_watch_fetched(r); +} + +} // namespace journal diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h new file mode 100644 index 00000000000..ff85575f304 --- /dev/null +++ b/src/journal/ObjectPlayer.h @@ -0,0 +1,141 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_OBJECT_PLAYER_H +#define CEPH_JOURNAL_OBJECT_PLAYER_H + +#include "include/Context.h" +#include "include/hash_namespace.h" +#include "include/interval_set.h" +#include "include/rados/librados.hpp" +#include "common/Cond.h" +#include "common/Mutex.h" +#include "common/RefCountedObj.h" +#include "journal/Entry.h" +#include +#include +#include +#include +#include +#include "include/assert.h" + +class SafeTimer; + +namespace journal { + +class ObjectPlayer; +typedef boost::intrusive_ptr ObjectPlayerPtr; + +class ObjectPlayer : public RefCountedObject { +public: + typedef std::list Entries; + typedef interval_set InvalidRanges; + + ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, + uint64_t object_num, SafeTimer &timer, Mutex &timer_lock, + uint8_t order); + ~ObjectPlayer(); + + inline const std::string &get_oid() const { + return m_oid; + } + inline uint64_t get_object_number() const { + return m_object_num; + } + + void fetch(Context *on_finish); + void watch(Context *on_fetch, double interval); + void unwatch(); + + inline bool is_fetch_in_progress() const { + Mutex::Locker locker(m_lock); + return m_fetch_in_progress; + } + + void front(Entry *entry) const; + void pop_front(); + inline bool empty() const { + Mutex::Locker locker(m_lock); + return m_entries.empty(); + } + + inline void get_entries(Entries *entries) { + Mutex::Locker locker(m_lock); + *entries = m_entries; + } + inline void 
get_invalid_ranges(InvalidRanges *invalid_ranges) { + Mutex::Locker locker(m_lock); + *invalid_ranges = m_invalid_ranges; + } + +private: + typedef std::pair EntryKey; + typedef boost::unordered_map EntryKeys; + + struct C_Fetch : public Context { + ObjectPlayer *object_player; + Context *on_finish; + bufferlist read_bl; + C_Fetch(ObjectPlayer *o, Context *ctx) + : object_player(o), on_finish(ctx) { + object_player->get(); + } + virtual void finish(int r); + }; + struct C_WatchTask : public Context { + ObjectPlayer *object_player; + C_WatchTask(ObjectPlayer *o) : object_player(o) { + object_player->get(); + } + virtual void finish(int r); + }; + struct C_WatchFetch : public Context { + ObjectPlayer *object_player; + C_WatchFetch(ObjectPlayer *o) : object_player(o) { + } + virtual void complete(int r) { + finish(r); + object_player->put(); + } + virtual void finish(int r); + }; + + librados::IoCtx m_ioctx; + uint64_t m_object_num; + std::string m_oid; + CephContext *m_cct; + + SafeTimer &m_timer; + Mutex &m_timer_lock; + + double m_fetch_interval; + uint8_t m_order; + + double m_watch_interval; + Context *m_watch_task; + C_WatchFetch m_watch_fetch; + + mutable Mutex m_lock; + bool m_fetch_in_progress; + bufferlist m_read_bl; + uint32_t m_read_off; + + Entries m_entries; + EntryKeys m_entry_keys; + InvalidRanges m_invalid_ranges; + + Context *m_watch_ctx; + Cond m_watch_ctx_cond; + bool m_watch_ctx_in_progress; + + int handle_fetch_complete(int r, const bufferlist &bl); + + void schedule_watch(); + void cancel_watch(); + void handle_watch_task(); + void handle_watch_fetched(int r); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_OBJECT_PLAYER_H diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc new file mode 100644 index 00000000000..2677b281feb --- /dev/null +++ b/src/journal/ObjectRecorder.cc @@ -0,0 +1,307 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include 
"journal/ObjectRecorder.h" +#include "journal/Utils.h" +#include "include/assert.h" +#include "common/Timer.h" +#include "cls/journal/cls_journal_client.h" + +#define dout_subsys ceph_subsys_journaler +#undef dout_prefix +#define dout_prefix *_dout << "ObjectRecorder: " + +using namespace cls::journal; + +namespace journal { + +namespace { + +void rados_ctx_callback(rados_completion_t c, void *arg) { + Context *ctx = reinterpret_cast(arg); + ctx->complete(rados_aio_get_return_value(c)); +} + +} // anonymous + +ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, + uint64_t object_number, + SafeTimer &timer, Mutex &timer_lock, + OverflowHandler *overflow_handler, uint8_t order, + uint32_t flush_interval, uint64_t flush_bytes, + double flush_age) + : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number), + m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), + m_overflow_handler(overflow_handler), m_order(order), + m_soft_max_size(1 << m_order), m_flush_interval(flush_interval), + m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this), + m_append_task(NULL), + m_lock(utils::unique_lock_name("ObjectRecorder::m_lock", this)), + m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false), + m_object_closed(false) { + m_ioctx.dup(ioctx); + m_cct = reinterpret_cast(m_ioctx.cct()); + assert(m_overflow_handler != NULL); +} + +ObjectRecorder::~ObjectRecorder() { + cancel_append_task(); + assert(m_append_buffers.empty()); + assert(m_in_flight_appends.empty()); +} + +/* Queues every buffer in append_buffers on this object, in order, via the + * single-buffer append() overload while holding m_lock. If that overload + * returned true for any buffer, the future of the last such buffer is + * flushed once the lock has been released. + * Returns true when m_size + m_pending_bytes has reached m_soft_max_size, + * which callers treat as "object full". */ +bool ObjectRecorder::append(const AppendBuffers &append_buffers) { + FutureImplPtr last_flushed_future; + { + Mutex::Locker locker(m_lock); + for (AppendBuffers::const_iterator iter = append_buffers.begin(); + iter != append_buffers.end(); ++iter) { + if (append(*iter)) { + last_flushed_future = iter->first; + } + } + } + + /* flush() is invoked without m_lock held, exactly as before. */ + if (last_flushed_future) { + flush(last_flushed_future); + } + + /* Re-take m_lock for the size check instead of reading the counters + * unlocked. NOTE(review): assumes m_size / m_pending_bytes are otherwise + * only modified under m_lock (the code that updates them is outside this + * chunk) -- confirm. */ + Mutex::Locker locker(m_lock); + return (m_size + m_pending_bytes >= m_soft_max_size); +} + 
+void ObjectRecorder::flush() { + ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; + + Mutex::Locker locker(m_lock); + if (flush_appends(true)) { + cancel_append_task(); + } +} + +void ObjectRecorder::flush(const FutureImplPtr &future) { + ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future + << dendl; + + Mutex::Locker locker(m_lock); + if (future->get_flush_handler().get() != &m_flush_handler) { + // if we don't own this future, re-issue the flush so that it hits the + // correct journal object owner + future->flush(); + return; + } else if (future->is_flush_in_progress()) { + return; + } + + assert(!m_object_closed); + AppendBuffers::iterator it; + for (it = m_append_buffers.begin(); it != m_append_buffers.end(); ++it) { + if (it->first == future) { + break; + } + } + assert(it != m_append_buffers.end()); + ++it; + + AppendBuffers flush_buffers; + flush_buffers.splice(flush_buffers.end(), m_append_buffers, + m_append_buffers.begin(), it); + send_appends(&flush_buffers); +} + +void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { + ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; + + Mutex::Locker locker(m_lock); + assert(m_in_flight_appends.empty()); + assert(m_object_closed || m_overflowed); + append_buffers->splice(append_buffers->end(), m_append_buffers, + m_append_buffers.begin(), m_append_buffers.end()); +} + +bool ObjectRecorder::close_object() { + ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; + + Mutex::Locker locker(m_lock); + m_object_closed = true; + if (flush_appends(true)) { + cancel_append_task(); + } + return m_in_flight_appends.empty(); +} + +void ObjectRecorder::handle_append_task() { + { + Mutex::Locker locker(m_lock); + flush_appends(true); + } + + { + Mutex::Locker locker(m_timer_lock); + m_append_task = NULL; + put(); + } +} + +void ObjectRecorder::cancel_append_task() { + Mutex::Locker locker(m_timer_lock); + if (m_append_task != NULL) { + m_timer.cancel_event(m_append_task); 
+ m_append_task = NULL; + put(); + } +} + +void ObjectRecorder::schedule_append_task() { + Mutex::Locker locker(m_timer_lock); + if (m_append_task == NULL && m_flush_age > 0) { + get(); + m_append_task = new C_AppendTask(this); + m_timer.add_event_after(m_flush_age, m_append_task); + } +} + +bool ObjectRecorder::append(const AppendBuffer &append_buffer) { + assert(m_lock.is_locked()); + + bool flush_requested = append_buffer.first->attach(&m_flush_handler); + m_append_buffers.push_back(append_buffer); + m_pending_bytes += append_buffer.second.length(); + + if (flush_appends(false)) { + cancel_append_task(); + } else { + schedule_append_task(); + } + return flush_requested; +} + +bool ObjectRecorder::flush_appends(bool force) { + assert(m_lock.is_locked()); + if (m_object_closed || m_overflowed) { + return true; + } + + if (m_append_buffers.empty() || + (!force && + m_size + m_pending_bytes < m_soft_max_size && + (m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) && + (m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) { + return false; + } + + m_pending_bytes = 0; + AppendBuffers append_buffers; + append_buffers.swap(m_append_buffers); + send_appends(&append_buffers); + return true; +} + +void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { + ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid + << ", r=" << r << dendl; + + Mutex::Locker locker(m_lock); + InFlightAppends::iterator iter = m_in_flight_appends.find(tid); + if (iter == m_in_flight_appends.end()) { + // must have seen an overflow on a previous append op + assert(m_overflowed); + return; + } else if (r == -EOVERFLOW) { + m_overflowed = true; + append_overflowed(tid); + return; + } + + assert(!m_overflowed || r != 0); + AppendBuffers &append_buffers = iter->second; + assert(!append_buffers.empty()); + + // Flag the associated futures as complete. 
+ for (AppendBuffers::iterator buf_it = append_buffers.begin(); + buf_it != append_buffers.end(); ++buf_it) { + ldout(m_cct, 20) << __func__ << ": " << *buf_it->first << " marked safe" + << dendl; + buf_it->first->safe(r); + } + m_in_flight_appends.erase(iter); + + if (m_in_flight_appends.empty() && m_object_closed) { + // all remaining unsent appends should be redirected to new object + notify_overflow(); + } +} + +void ObjectRecorder::append_overflowed(uint64_t tid) { + ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed" + << dendl; + + assert(m_lock.is_locked()); + assert(!m_in_flight_appends.empty()); + assert(m_in_flight_appends.begin()->first == tid); + + cancel_append_task(); + + InFlightAppends in_flight_appends; + in_flight_appends.swap(m_in_flight_appends); + + AppendBuffers restart_append_buffers; + for (InFlightAppends::iterator it = in_flight_appends.begin(); + it != in_flight_appends.end(); ++it) { + restart_append_buffers.insert(restart_append_buffers.end(), + it->second.begin(), it->second.end()); + } + + restart_append_buffers.splice(restart_append_buffers.end(), + m_append_buffers, + m_append_buffers.begin(), + m_append_buffers.end()); + restart_append_buffers.swap(m_append_buffers); + notify_overflow(); +} + +void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { + assert(m_lock.is_locked()); + assert(!append_buffers->empty()); + + uint64_t append_tid = m_append_tid++; + ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid=" + << append_tid << dendl; + C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid); + + librados::ObjectWriteOperation op; + client::guard_append(&op, m_soft_max_size); + + for (AppendBuffers::iterator it = append_buffers->begin(); + it != append_buffers->end(); ++it) { + ldout(m_cct, 20) << __func__ << ": flushing " << *it->first + << dendl; + it->first->set_flush_in_progress(); + op.append(it->second); + m_size += it->second.length(); + } + 
m_in_flight_appends[append_tid].swap(*append_buffers); + + librados::AioCompletion *rados_completion = + librados::Rados::aio_create_completion(append_flush, NULL, + rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, rados_completion, &op); + assert(r == 0); + rados_completion->release(); +} + +void ObjectRecorder::notify_overflow() { + assert(m_lock.is_locked()); + + for (AppendBuffers::const_iterator it = m_append_buffers.begin(); + it != m_append_buffers.end(); ++it) { + ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first + << dendl; + it->first->detach(); + } + + // TODO need to delay completion until after aio_notify completes + m_lock.Unlock(); + m_overflow_handler->overflow(this); + m_lock.Lock(); +} + +} // namespace journal diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h new file mode 100644 index 00000000000..d7d2ed10e36 --- /dev/null +++ b/src/journal/ObjectRecorder.h @@ -0,0 +1,149 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_OBJECT_RECORDER_H +#define CEPH_JOURNAL_OBJECT_RECORDER_H + +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "common/Mutex.h" +#include "common/RefCountedObj.h" +#include "journal/FutureImpl.h" +#include +#include +#include +#include "include/assert.h" + +class SafeTimer; + +namespace journal { + +class ObjectRecorder; +typedef boost::intrusive_ptr ObjectRecorderPtr; + +typedef std::pair AppendBuffer; +typedef std::list AppendBuffers; + +class ObjectRecorder : public RefCountedObject, boost::noncopyable { +public: + struct OverflowHandler { + virtual ~OverflowHandler() {} + virtual void overflow(ObjectRecorder *object_recorder) = 0; + }; + + ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, + uint64_t object_number, SafeTimer &timer, Mutex &timer_lock, + OverflowHandler *overflow_handler, uint8_t order, + uint32_t flush_interval, uint64_t flush_bytes, + double 
flush_age); + ~ObjectRecorder(); + + inline uint64_t get_object_number() const { + return m_object_number; + } + inline const std::string &get_oid() const { + return m_oid; + } + + bool append(const AppendBuffers &append_buffers); + void flush(); + void flush(const FutureImplPtr &future); + + void claim_append_buffers(AppendBuffers *append_buffers); + bool close_object(); + + inline CephContext *cct() const { + return m_cct; + } + + inline size_t get_pending_appends() const { + Mutex::Locker locker(m_lock); + return m_append_buffers.size(); + } + +private: + typedef std::map InFlightAppends; + + struct FlushHandler : public FutureImpl::FlushHandler { + ObjectRecorder *object_recorder; + FlushHandler(ObjectRecorder *o) : object_recorder(o) {} + virtual void get() { + object_recorder->get(); + } + virtual void put() { + object_recorder->put(); + } + virtual void flush(const FutureImplPtr &future) { + object_recorder->flush(future); + } + }; + struct C_AppendTask : public Context { + ObjectRecorder *object_recorder; + C_AppendTask(ObjectRecorder *o) : object_recorder(o) {} + virtual void complete(int r) { + object_recorder->handle_append_task(); + } + virtual void finish(int r) {} + }; + struct C_AppendFlush : public Context { + ObjectRecorder *object_recorder; + uint64_t tid; + C_AppendFlush(ObjectRecorder *o, uint64_t _tid) + : object_recorder(o), tid(_tid) { + object_recorder->get(); + } + virtual void finish(int r) { + object_recorder->handle_append_flushed(tid, r); + object_recorder->put(); + } + }; + + librados::IoCtx m_ioctx; + std::string m_oid; + uint64_t m_object_number; + CephContext *m_cct; + + SafeTimer &m_timer; + Mutex &m_timer_lock; + + OverflowHandler *m_overflow_handler; + + uint8_t m_order; + uint64_t m_soft_max_size; + + uint32_t m_flush_interval; + uint64_t m_flush_bytes; + double m_flush_age; + + FlushHandler m_flush_handler; + + C_AppendTask *m_append_task; + + mutable Mutex m_lock; + AppendBuffers m_append_buffers; + uint64_t m_append_tid; + 
uint32_t m_pending_bytes; + + InFlightAppends m_in_flight_appends; + uint64_t m_size; + bool m_overflowed; + bool m_object_closed; + + bufferlist m_prefetch_bl; + + void handle_append_task(); + void cancel_append_task(); + void schedule_append_task(); + + bool append(const AppendBuffer &append_buffer); + bool flush_appends(bool force); + void handle_append_flushed(uint64_t tid, int r); + void append_overflowed(uint64_t tid); + void send_appends(AppendBuffers *append_buffers); + + void notify_overflow(); +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_OBJECT_RECORDER_H diff --git a/src/journal/Payload.cc b/src/journal/Payload.cc new file mode 100644 index 00000000000..8015528b4b9 --- /dev/null +++ b/src/journal/Payload.cc @@ -0,0 +1,21 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/Payload.h" +#include "journal/PayloadImpl.h" + +namespace journal { + +const bufferlist &Payload::get_data() const { + return m_payload_impl->get_data(); +} + +void intrusive_ptr_add_ref(PayloadImpl *p) { + p->get(); +} + +void intrusive_ptr_release(PayloadImpl *p) { + p->put(); +} + +} // namespace journal diff --git a/src/journal/Payload.h b/src/journal/Payload.h new file mode 100644 index 00000000000..a878111eb5f --- /dev/null +++ b/src/journal/Payload.h @@ -0,0 +1,38 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_PAYLOAD_H +#define CEPH_JOURNAL_PAYLOAD_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include + +namespace journal { + +class PayloadImpl; + +class Payload { +public: + typedef boost::intrusive_ptr PayloadImplPtr; + + Payload(const PayloadImplPtr &payload) : m_payload_impl(payload) {} + + const bufferlist &get_data() const; + +private: + friend class Journaler; + + inline PayloadImplPtr get_payload_impl() const { + return m_payload_impl; + } + + PayloadImplPtr m_payload_impl; +}; + +void 
intrusive_ptr_add_ref(PayloadImpl *p); +void intrusive_ptr_release(PayloadImpl *p); + +} // namespace journal + +#endif // CEPH_JOURNAL_PAYLOAD_H diff --git a/src/journal/PayloadImpl.cc b/src/journal/PayloadImpl.cc new file mode 100644 index 00000000000..6bb134f91f9 --- /dev/null +++ b/src/journal/PayloadImpl.cc @@ -0,0 +1,22 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/PayloadImpl.h" + +namespace journal { + +PayloadImpl::PayloadImpl(const bufferlist &data, + const ObjectSetPosition &object_set_position) + : m_data(data), m_object_set_position(object_set_position) { +} + +const bufferlist &PayloadImpl::get_data() const { + return m_data; +} + +const PayloadImpl::ObjectSetPosition & +PayloadImpl::get_object_set_position() const { + return m_object_set_position; +} + +} // namespace journal diff --git a/src/journal/PayloadImpl.h b/src/journal/PayloadImpl.h new file mode 100644 index 00000000000..9c4d0b596e5 --- /dev/null +++ b/src/journal/PayloadImpl.h @@ -0,0 +1,37 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_PAYLOAD_IMPL_H +#define CEPH_JOURNAL_PAYLOAD_IMPL_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "common/RefCountedObj.h" +#include "cls/journal/cls_journal_types.h" +#include +#include +#include "include/assert.h" + +namespace journal { + +class PayloadImpl; +typedef boost::intrusive_ptr PayloadImplPtr; + +class PayloadImpl : public RefCountedObject, boost::noncopyable { +public: + typedef cls::journal::ObjectSetPosition ObjectSetPosition; + + PayloadImpl(const bufferlist &data, + const ObjectSetPosition &object_set_position); + + const bufferlist &get_data() const; + const ObjectSetPosition &get_object_set_position() const; + +private: + bufferlist m_data; + ObjectSetPosition m_object_set_position; +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_PAYLOAD_IMPL_H diff 
--git a/src/journal/ReplayHandler.h b/src/journal/ReplayHandler.h new file mode 100644 index 00000000000..47126847c8d --- /dev/null +++ b/src/journal/ReplayHandler.h @@ -0,0 +1,18 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_REPLAY_HANDLER_H +#define CEPH_JOURNAL_REPLAY_HANDLER_H + +namespace journal { + +struct ReplayHandler { + virtual ~ReplayHandler() {} + + virtual void handle_entries_available() = 0; + virtual void handle_error(int r) = 0; +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_REPLAY_HANDLER_H diff --git a/src/journal/Utils.cc b/src/journal/Utils.cc new file mode 100644 index 00000000000..eea61363c47 --- /dev/null +++ b/src/journal/Utils.cc @@ -0,0 +1,19 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/Utils.h" +#include "include/stringify.h" + +namespace journal { +namespace utils { + +std::string get_object_name(const std::string &prefix, uint64_t number) { + return prefix + stringify(number); +} + +std::string unique_lock_name(const std::string &name, void *address) { + return name + " (" + stringify(address) + ")"; +} + +} // namespace utils +} // namespace journal diff --git a/src/journal/Utils.h b/src/journal/Utils.h new file mode 100644 index 00000000000..1e449b187fd --- /dev/null +++ b/src/journal/Utils.h @@ -0,0 +1,20 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_UTILS_H +#define CEPH_JOURNAL_UTILS_H + +#include "include/int_types.h" +#include + +namespace journal { +namespace utils { + +std::string get_object_name(const std::string &prefix, uint64_t number); + +std::string unique_lock_name(const std::string &name, void *address); + +} // namespace utils +} // namespace journal + +#endif // CEPH_JOURNAL_UTILS_H