cls_method_handle_t h_journal_create;
cls_method_handle_t h_journal_get_order;
cls_method_handle_t h_journal_get_splay_width;
+cls_method_handle_t h_journal_get_pool_id;
cls_method_handle_t h_journal_get_minimum_set;
cls_method_handle_t h_journal_set_minimum_set;
cls_method_handle_t h_journal_get_active_set;
static const std::string HEADER_KEY_ORDER = "order";
static const std::string HEADER_KEY_SPLAY_WIDTH = "splay_width";
+static const std::string HEADER_KEY_POOL_ID = "pool_id";
static const std::string HEADER_KEY_MINIMUM_SET = "minimum_set";
static const std::string HEADER_KEY_ACTIVE_SET = "active_set";
static const std::string HEADER_KEY_CLIENT_PREFIX = "client_";
int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
uint8_t order;
uint8_t splay_width;
+ int64_t pool_id;
try {
bufferlist::iterator iter = in->begin();
::decode(order, iter);
::decode(splay_width, iter);
+ ::decode(pool_id, iter);
} catch (const buffer::error &err) {
CLS_ERR("failed to decode input parameters: %s", err.what());
return -EINVAL;
return r;
}
+ r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id);
+ if (r < 0) {
+ return r;
+ }
+
uint64_t object_set = 0;
r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
if (r < 0) {
return 0;
}
+/**
+ * Input:
+ * none
+ *
+ * Output:
+ * pool_id (int64_t)
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ int64_t pool_id;
+ int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id);
+ if (r < 0) {
+ return r;
+ }
+
+ ::encode(pool_id, *out);
+ return 0;
+}
+
/**
* Input:
* none
cls_register_cxx_method(h_class, "get_splay_width",
CLS_METHOD_RD,
journal_get_splay_width, &h_journal_get_splay_width);
+ cls_register_cxx_method(h_class, "get_pool_id",
+ CLS_METHOD_RD,
+ journal_get_pool_id, &h_journal_get_pool_id);
cls_register_cxx_method(h_class, "get_minimum_set",
CLS_METHOD_RD,
journal_get_minimum_set,
struct C_ImmutableMetadata : public C_AioExec {
uint8_t *order;
uint8_t *splay_width;
+ int64_t *pool_id;
Context *on_finish;
bufferlist outbl;
C_ImmutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid,
uint8_t *_order, uint8_t *_splay_width,
- Context *_on_finish)
+ int64_t *_pool_id, Context *_on_finish)
: C_AioExec(_ioctx, _oid), order(_order), splay_width(_splay_width),
- on_finish(_on_finish) {
+ pool_id(_pool_id), on_finish(_on_finish) {
}
void send() {
bufferlist inbl;
op.exec("journal", "get_order", inbl);
op.exec("journal", "get_splay_width", inbl);
+ op.exec("journal", "get_pool_id", inbl);
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(this, rados_callback, NULL);
bufferlist::iterator iter = outbl.begin();
::decode(*order, iter);
::decode(*splay_width, iter);
+ ::decode(*pool_id, iter);
} catch (const buffer::error &err) {
r = -EBADMSG;
}
} // anonymous namespace
int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order,
- uint8_t splay) {
+ uint8_t splay, int64_t pool_id) {
bufferlist inbl;
::encode(order, inbl);
::encode(splay, inbl);
+ ::encode(pool_id, inbl);
bufferlist outbl;
int r = ioctx.exec(oid, "journal", "create", inbl, outbl);
void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
uint8_t *order, uint8_t *splay_width,
- Context *on_finish) {
+ int64_t *pool_id, Context *on_finish) {
C_ImmutableMetadata *metadata = new C_ImmutableMetadata(ioctx, oid, order,
- splay_width,
+ splay_width, pool_id,
on_finish);
metadata->send();
}
namespace client {
int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order,
- uint8_t splay);
+ uint8_t splay, int64_t pool_id);
void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
uint8_t *order, uint8_t *splay_width,
- Context *on_finish);
+ int64_t *pool_id, Context *on_finish);
void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
uint64_t *minimum_set, uint64_t *active_set,
std::set<cls::journal::Client> *clients,
double commit_interval)
: RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
m_client_id(client_id), m_commit_interval(commit_interval), m_order(0),
- m_splay_width(0), m_initialized(false), m_finisher(NULL), m_timer(NULL),
- m_timer_lock("JournalMetadata::m_timer_lock"),
+ m_splay_width(0), m_pool_id(-1), m_initialized(false), m_finisher(NULL),
+ m_timer(NULL), m_timer_lock("JournalMetadata::m_timer_lock"),
m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
m_watch_handle(0), m_minimum_set(0), m_active_set(0),
m_update_notifications(0), m_commit_position_ctx(NULL),
C_ImmutableMetadata *ctx = new C_ImmutableMetadata(this, on_init);
client::get_immutable_metadata(m_ioctx, m_oid, &m_order, &m_splay_width,
- ctx);
+ &m_pool_id, ctx);
}
void JournalMetadata::shutdown() {
<< "initialized=" << jm.m_initialized << ", "
<< "order=" << (int)jm.m_order << ", "
<< "splay_width=" << (int)jm.m_splay_width << ", "
+ << "pool_id=" << jm.m_pool_id << ", "
<< "minimum_set=" << jm.m_minimum_set << ", "
<< "active_set=" << jm.m_active_set << ", "
<< "client_id=" << jm.m_client_id << ", "
inline uint8_t get_splay_width() const {
return m_splay_width;
}
+ inline int64_t get_pool_id() const {
+ return m_pool_id;
+ }
inline Finisher &get_finisher() {
return *m_finisher;
uint8_t m_order;
uint8_t m_splay_width;
+ int64_t m_pool_id;
bool m_initialized;
Finisher *m_finisher;
using namespace cls::journal;
-Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
- const std::string &journal_id,
- const std::string &client_id)
+Journaler::Journaler(librados::IoCtx &header_ioctx,
+ const std::string &journal_id,
+ const std::string &client_id)
: m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL),
m_trimmer(NULL)
{
m_header_ioctx.dup(header_ioctx);
- m_data_ioctx.dup(data_ioctx);
m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct());
m_header_oid = JOURNAL_HEADER_PREFIX + journal_id;
m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id,
5);
m_metadata->get();
-
- m_trimmer = new JournalTrimmer(m_data_ioctx, m_object_oid_prefix, m_metadata);
}
Journaler::~Journaler() {
}
void Journaler::init(Context *on_init) {
- m_metadata->init(on_init);
+ m_metadata->init(new C_InitJournaler(this, on_init));
+}
+
+int Journaler::init_complete() {
+ int64_t pool_id = m_metadata->get_pool_id();
+
+ if (pool_id < 0 || pool_id == m_header_ioctx.get_id()) {
+ ldout(m_cct, 20) << "using image pool for journal data" << dendl;
+ m_data_ioctx.dup(m_header_ioctx);
+ } else {
+ ldout(m_cct, 20) << "using pool id=" << pool_id << " for journal data"
+ << dendl;
+ librados::Rados rados(m_header_ioctx);
+ int r = rados.ioctx_create2(pool_id, m_data_ioctx);
+ if (r < 0) {
+ if (r == -ENOENT) {
+ ldout(m_cct, 1) << "pool id=" << pool_id << " no longer exists"
+ << dendl;
+ }
+ return r;
+ }
+ }
+ m_trimmer = new JournalTrimmer(m_data_ioctx, m_object_oid_prefix,
+ m_metadata);
+ return 0;
}
-int Journaler::create(uint8_t order, uint8_t splay_width) {
+int Journaler::create(uint8_t order, uint8_t splay_width, int64_t pool_id) {
if (order > 64 || order < 12) {
lderr(m_cct) << "order must be in the range [12, 64]" << dendl;
return -EDOM;
}
ldout(m_cct, 5) << "creating new journal: " << m_header_oid << dendl;
- int r = client::create(m_header_ioctx, m_header_oid, order, splay_width);
+ int r = client::create(m_header_ioctx, m_header_oid, order, splay_width,
+ pool_id);
if (r < 0) {
lderr(m_cct) << "failed to create journal: " << cpp_strerror(r) << dendl;
return r;
#include "include/int_types.h"
#include "include/buffer.h"
+#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "journal/Future.h"
#include <string>
#include <map>
#include "include/assert.h"
-class Context;
class SafeTimer;
namespace journal {
class Journaler {
public:
- Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
- const std::string &journal_id, const std::string &client_id);
+ Journaler(librados::IoCtx &header_ioctx, const std::string &journal_id,
+ const std::string &client_id);
~Journaler();
- int create(uint8_t order, uint8_t splay_width);
+ int create(uint8_t order, uint8_t splay_width, int64_t pool_id);
int remove();
void init(Context *on_init);
void committed(const Future &future);
private:
+ struct C_InitJournaler : public Context {
+ Journaler *journaler;
+ Context *on_safe;
+ C_InitJournaler(Journaler *_journaler, Context *_on_safe)
+ : journaler(_journaler), on_safe(_on_safe) {
+ }
+ virtual void finish(int r) {
+ if (r == 0) {
+ r = journaler->init_complete();
+ }
+ on_safe->complete(r);
+ }
+ };
+
librados::IoCtx m_header_ioctx;
librados::IoCtx m_data_ioctx;
CephContext *m_cct;
JournalRecorder *m_recorder;
JournalTrimmer *m_trimmer;
+ int init_complete();
void create_player(ReplayHandler *replay_handler);
friend std::ostream &operator<<(std::ostream &os,
uint8_t order = 1;
uint8_t splay_width = 2;
- ASSERT_EQ(0, client::create(ioctx, oid, order, splay_width));
+ int64_t pool_id = ioctx.get_id();
+ ASSERT_EQ(0, client::create(ioctx, oid, order, splay_width, pool_id));
uint8_t read_order;
uint8_t read_splay_width;
+ int64_t read_pool_id;
C_SaferCond cond;
client::get_immutable_metadata(ioctx, oid, &read_order, &read_splay_width,
- &cond);
+ &read_pool_id, &cond);
ASSERT_EQ(0, cond.wait());
ASSERT_EQ(order, read_order);
ASSERT_EQ(splay_width, read_splay_width);
+ ASSERT_EQ(pool_id, read_pool_id);
}
TEST_F(TestClsJournal, MinimumSet) {
std::string oid = get_temp_image_name();
- ASSERT_EQ(0, client::create(ioctx, oid, 2, 4));
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 4, ioctx.get_id()));
librados::ObjectWriteOperation op1;
client::set_active_set(&op1, 300);
std::string oid = get_temp_image_name();
- ASSERT_EQ(0, client::create(ioctx, oid, 2, 4));
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 4, ioctx.get_id()));
librados::ObjectWriteOperation op1;
client::set_active_set(&op1, 300);
std::string oid = get_temp_image_name();
- ASSERT_EQ(0, client::create(ioctx, oid, 2, 4));
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 4, ioctx.get_id()));
librados::ObjectWriteOperation op1;
client::set_minimum_set(&op1, 123);
std::string oid = get_temp_image_name();
- ASSERT_EQ(0, client::create(ioctx, oid, 2, 4));
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 4, ioctx.get_id()));
uint64_t active_set = 234;
librados::ObjectWriteOperation op1;
std::string oid = get_temp_image_name();
- ASSERT_EQ(0, client::create(ioctx, oid, 2, 4));
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 4, ioctx.get_id()));
librados::ObjectWriteOperation op1;
client::set_active_set(&op1, 345);
std::string oid = get_temp_image_name();
- ASSERT_EQ(0, client::create(ioctx, oid, 2, 4));
- ASSERT_EQ(-EEXIST, client::create(ioctx, oid, 3, 5));
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 4, ioctx.get_id()));
+ ASSERT_EQ(-EEXIST, client::create(ioctx, oid, 3, 5, ioctx.get_id()));
}
TEST_F(TestClsJournal, ClientRegister) {
std::string oid = get_temp_image_name();
- ASSERT_EQ(0, client::create(ioctx, oid, 2, 2));
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
cls::journal::EntryPositions entry_positions;
std::string oid = get_temp_image_name();
- ASSERT_EQ(0, client::create(ioctx, oid, 2, 2));
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
cls::journal::EntryPositions entry_positions;
std::string oid = get_temp_image_name();
- ASSERT_EQ(0, client::create(ioctx, oid, 12, 5));
+ ASSERT_EQ(0, client::create(ioctx, oid, 12, 5, ioctx.get_id()));
std::set<Client> expected_clients;
librados::ObjectWriteOperation op1;
int RadosTestFixture::create(const std::string &oid, uint8_t order,
uint8_t splay_width) {
- return cls::journal::client::create(m_ioctx, oid, order, splay_width);
+ return cls::journal::client::create(m_ioctx, oid, order, splay_width, -1);
}
int RadosTestFixture::append(const std::string &oid, const bufferlist &bl) {
virtual void SetUp() {
RadosTestFixture::SetUp();
m_journal_id = get_temp_journal_id();
- m_journaler = new journal::Journaler(m_ioctx, m_ioctx, m_journal_id,
- CLIENT_ID);
+ m_journaler = new journal::Journaler(m_ioctx, m_journal_id, CLIENT_ID);
}
virtual void TearDown() {
}
int create_journal(uint8_t order, uint8_t splay_width) {
- return m_journaler->create(order, splay_width);
+ return m_journaler->create(order, splay_width, -1);
}
int init_journaler() {
}
int register_client(const std::string &client_id, const std::string &desc) {
- journal::Journaler journaler(m_ioctx, m_ioctx, m_journal_id, client_id);
+ journal::Journaler journaler(m_ioctx, m_journal_id, client_id);
return journaler.register_client(desc);
}