rados.watch_flush();
}
-int JournalMetadata::init() {
+void JournalMetadata::init(Context *on_init) {
assert(!m_initialized);
m_initialized = true;
- int r = client::get_immutable_metadata(m_ioctx, m_oid, &m_order,
- &m_splay_width);
- if (r < 0) {
- lderr(m_cct) << __func__ << ": failed to retrieve journal metadata: "
- << cpp_strerror(r) << dendl;
- return r;
- }
-
m_timer = new SafeTimer(m_cct, m_timer_lock, false);
m_timer->init();
- r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
+ int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
if (r < 0) {
lderr(m_cct) << __func__ << ": failed to watch journal"
<< cpp_strerror(r) << dendl;
- return r;
+ on_init->complete(r);
+ return;
}
- C_SaferCond cond;
- refresh(&cond);
- r = cond.wait();
- if (r < 0) {
- return r;
- }
- return 0;
+ C_ImmutableMetadata *ctx = new C_ImmutableMetadata(this, on_init);
+ client::get_immutable_metadata(m_ioctx, m_oid, &m_order, &m_splay_width,
+ ctx);
}
int JournalMetadata::register_client(const std::string &description) {
- assert(!m_client_id.empty());
-
ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
int r = client::client_register(m_ioctx, m_oid, m_client_id, description);
if (r < 0) {
return true;
}
+void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) {
+ if (r < 0) {
+ lderr(m_cct) << "failed to initialize immutable metadata: "
+ << cpp_strerror(r) << dendl;
+ on_init->complete(r);
+ return;
+ }
+
+ ldout(m_cct, 10) << "initialized immutable metadata" << dendl;
+ refresh(on_init);
+}
+
void JournalMetadata::refresh(Context *on_complete) {
- ldout(m_cct, 10) << "refreshing journal metadata" << dendl;
+ ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;
C_Refresh *refresh = new C_Refresh(this, on_complete);
client::get_mutable_metadata(m_ioctx, m_oid, &refresh->minimum_set,
&refresh->active_set,
}
void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
- ldout(m_cct, 10) << "refreshed journal metadata: r=" << r << dendl;
+ ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;
if (r == 0) {
Mutex::Locker locker(m_lock);
const std::string &client_id, double commit_interval);
~JournalMetadata();
- int init();
+ void init(Context *on_init);
void add_listener(Listener *listener);
void remove_listener(Listener *listener);
}
};
+ struct C_ImmutableMetadata : public Context {
+ JournalMetadataPtr journal_metadata;
+ Context *on_finish;
+
+ C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish)
+ : journal_metadata(_journal_metadata), on_finish(_on_finish) {
+ }
+
+ virtual void finish(int r) {
+ journal_metadata->handle_immutable_metadata(r, on_finish);
+ }
+
+ };
struct C_Refresh : public Context {
JournalMetadataPtr journal_metadata;
uint64_t minimum_set;
ObjectSetPosition m_commit_position;
Context *m_commit_position_ctx;
+ void handle_immutable_metadata(int r, Context *on_init);
+
void refresh(Context *on_finish);
void handle_refresh_complete(C_Refresh *refresh, int r);
using namespace cls::journal;
Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
- uint64_t journal_id, const std::string &client_id)
+ const std::string &journal_id,
+ const std::string &client_id)
: m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL),
m_trimmer(NULL)
{
m_data_ioctx.dup(data_ioctx);
m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct());
- m_header_oid = JOURNAL_HEADER_PREFIX + stringify(journal_id);
- m_object_oid_prefix = JOURNAL_OBJECT_PREFIX + stringify(journal_id) + ".";
+ m_header_oid = JOURNAL_HEADER_PREFIX + journal_id;
+ m_object_oid_prefix = JOURNAL_OBJECT_PREFIX + journal_id + ".";
// TODO configurable commit interval
m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id,
assert(m_recorder == NULL);
}
-int Journaler::init() {
- return m_metadata->init();
+void Journaler::init(Context *on_init) {
+ m_metadata->init(on_init);
}
int Journaler::create(uint8_t order, uint8_t splay_width) {
m_metadata, 0, 0, 0);
}
-void Journaler::stop_append() {
+int Journaler::stop_append() {
assert(m_recorder != NULL);
- m_recorder->flush();
+
+ C_SaferCond cond;
+ flush(&cond);
+ int r = cond.wait();
+ if (r < 0) {
+ return r;
+ }
+
delete m_recorder;
m_recorder = NULL;
+ return 0;
}
Future Journaler::append(const std::string &tag, const bufferlist &payload_bl) {
return m_recorder->append(tag, payload_bl);
}
+void Journaler::flush(Context *on_safe) {
+ // TODO pass ctx
+ m_recorder->flush();
+}
+
void Journaler::create_player(ReplayHandler *replay_handler) {
assert(m_player == NULL);
m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata,
#include <map>
#include "include/assert.h"
+class Context;
class SafeTimer;
namespace journal {
class Journaler {
public:
Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
- uint64_t journal_id, const std::string &client_id);
+ const std::string &journal_id, const std::string &client_id);
~Journaler();
- int init();
-
int create(uint8_t order, uint8_t splay_width);
+ void init(Context *on_init);
+
int register_client(const std::string &description);
int unregister_client();
void start_append();
Future append(const std::string &tag, const bufferlist &bl);
- void stop_append();
+ void flush(Context *on_safe);
+ int stop_append();
private:
librados::IoCtx m_header_ioctx;