}
void JournalMetadata::handle_commit_position_task() {
- assert(m_timer_lock.is_locked());
+ assert(m_timer_lock->is_locked());
assert(m_lock.is_locked());
- ldout(m_cct, 20) << __func__ << dendl;
+ ldout(m_cct, 20) << __func__ << ": "
+ << "client_id=" << m_client_id << ", "
+ << "commit_position=" << m_commit_position << dendl;
librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, m_commit_position);
process_state(object_num, r);
}
-void JournalPlayer::handle_watch(uint64_t object_num, int r) {
- ldout(m_cct, 10) << __func__ << ": "
- << utils::get_object_name(m_object_oid_prefix, object_num)
- << ": r=" << r << dendl;
- process_state(object_num, r);
+// Schedule a poll-based watch for newly appended journal entries.  Watches
+// the currently active object player and, when that object is not at the
+// first splay offset, the first splay offset's player as well -- new
+// records can only appear in those two objects.  No-op when a watch is
+// already pending.  Caller must hold m_lock.
+void JournalPlayer::schedule_watch() {
+  ldout(m_cct, 10) << __func__ << dendl;
+  assert(m_lock.is_locked());
+  if (m_watch_scheduled) {
+    // a watch is already outstanding -- don't double-schedule
+    return;
+  }
+
+  // poll first splay offset and active splay offset since
+  // new records should only appear in those two objects
+  C_Watch *ctx = new C_Watch(this);
+  ObjectPlayerPtr object_player = get_object_player();
+  object_player->watch(ctx, m_watch_interval);
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  if (object_player->get_object_number() % splay_width != 0) {
+    // NOTE(review): assumes C_Watch starts with pending_fetches == 1 and
+    // fires once the count drops back to zero -- confirm in C_Watch
+    ++ctx->pending_fetches;
+
+    object_player = m_object_players.begin()->second.begin()->second;
+    object_player->watch(ctx, m_watch_interval);
+  }
+  m_watch_scheduled = true;
+}
+
+// Completion handler for a previously scheduled watch.  Clears the
+// scheduled flag and re-drives state processing, with the watch result r,
+// for the object at the head of each splay-offset queue.
+void JournalPlayer::handle_watch(int r) {
+  ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+  // allow the next schedule_watch() call to arm a new watch
+  m_watch_scheduled = false;
+  // snapshot the front object number of every splay offset so each is
+  // processed exactly once (the set de-duplicates)
+  std::set<uint64_t> object_numbers;
+  for (auto &players : m_object_players) {
+    object_numbers.insert(
+      players.second.begin()->second->get_object_number());
+  }
+
+  for (auto object_num : object_numbers) {
+    process_state(object_num, r);
+  }
+}
+
+// Notify the replay handler, at most once, that journal entries are ready
+// to be popped.  The notification is queued through JournalMetadata so the
+// handler is not invoked while m_lock is held.  Caller must hold m_lock.
+void JournalPlayer::notify_entries_available() {
+  assert(m_lock.is_locked());
+  if (m_handler_notified) {
+    // handler already knows entries are available; don't notify again
+    return;
+  }
+  m_handler_notified = true;
+
+  ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
-  m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
++  m_journal_metadata->queue(new C_HandleEntriesAvailable(
+    m_replay_handler), 0);
+}
+
+// Notify the replay handler that replay has finished with result r.
+// Unlike notify_entries_available(), this unconditionally marks the
+// handler notified before queuing the completion.  Caller must hold
+// m_lock.
+void JournalPlayer::notify_complete(int r) {
+  assert(m_lock.is_locked());
+  m_handler_notified = true;
+
+  ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
-  m_journal_metadata->get_finisher().queue(new C_HandleComplete(
++  m_journal_metadata->queue(new C_HandleComplete(
+    m_replay_handler), r);
}
} // namespace journal
namespace {
+// Completion context that decodes a single cls::journal::Tag into the
+// caller-supplied tag_tid / tag_data outputs, then fires on_finish with
+// the decode result.
+struct C_DecodeTag : public Context {
+  CephContext *cct;
+  Mutex *lock;                 // guards *tag_tid / *tag_data during update
+  uint64_t *tag_tid;           // out: tid of the decoded tag
+  journal::TagData *tag_data;  // out: decoded tag payload
+  Context *on_finish;          // fired with the overall result
+
+  // NOTE(review): expected to be populated by the operation that completes
+  // this context before complete() fires -- confirm at the call site
+  cls::journal::Tag tag;
+
+  C_DecodeTag(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
+              journal::TagData *tag_data, Context *on_finish)
+    : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
+      on_finish(on_finish) {
+  }
+
+  // Chain on_finish with the processed result, then let Context::complete()
+  // run the (empty) finish() and delete this.
+  virtual void complete(int r) override {
+    on_finish->complete(process(r));
+    Context::complete(0);
+  }
+  virtual void finish(int r) override {
+  }
+
+  // Validate r, then decode the tag under *lock.
+  // Returns 0 on success or a negative errno on failure.
+  int process(int r) {
+    if (r < 0) {
+      lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl;
+      return r;
+    }
+
+    Mutex::Locker locker(*lock);
+    *tag_tid = tag.tid;
+
+    bufferlist::iterator data_it = tag.data.begin();
+    r = decode(&data_it, tag_data);
+    if (r < 0) {
+      lderr(cct) << "failed to decode allocated tag" << dendl;
+      return r;
+    }
+
+    ldout(cct, 20) << "allocated journal tag: "
+                   << "tid=" << tag.tid << ", "
+                   << "data=" << *tag_data << dendl;
+    return 0;
+  }
+
+  // Decode TagData from a bufferlist iterator; -EBADMSG on malformed data.
+  static int decode(bufferlist::iterator *it,
+                    journal::TagData *tag_data) {
+    try {
+      ::decode(*tag_data, *it);
+    } catch (const buffer::error &err) {
+      return -EBADMSG;
+    }
+    return 0;
+  }
+
+};
+
+// Completion context that decodes the most recent tag from a retrieved
+// tag list into the caller-supplied tag_tid / tag_data outputs, then
+// fires on_finish with the decode result.
+struct C_DecodeTags : public Context {
+  CephContext *cct;
+  Mutex *lock;                 // guards *tag_tid / *tag_data during update
+  uint64_t *tag_tid;           // out: tid of the newest retrieved tag
+  journal::TagData *tag_data;  // out: decoded payload of that tag
+  Context *on_finish;          // fired with the overall result
+
+  // NOTE(review): expected to be populated by the operation that completes
+  // this context before complete() fires -- confirm at the call site
+  ::journal::Journaler::Tags tags;
+
+  C_DecodeTags(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
+               journal::TagData *tag_data, Context *on_finish)
+    : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
+      on_finish(on_finish) {
+  }
+
+  // Chain on_finish with the processed result, then let Context::complete()
+  // run the (empty) finish() and delete this.
+  // Marked override for consistency with C_DecodeTag and so the compiler
+  // flags any future signature drift.
+  virtual void complete(int r) override {
+    on_finish->complete(process(r));
+    Context::complete(0);
+  }
+  virtual void finish(int r) override {
+  }
+
+  // Validate r and the tag list, then decode the newest tag under *lock.
+  // Returns 0 on success or a negative errno on failure.
+  int process(int r) {
+    if (r < 0) {
+      lderr(cct) << "failed to retrieve journal tags: " << cpp_strerror(r)
+                 << dendl;
+      return r;
+    }
+
+    if (tags.empty()) {
+      lderr(cct) << "no journal tags retrieved" << dendl;
+      return -ENOENT;
+    }
+
+    Mutex::Locker locker(*lock);
+    // tags are returned oldest-to-newest; only the newest is of interest
+    *tag_tid = tags.back().tid;
+
+    bufferlist::iterator data_it = tags.back().data.begin();
+    r = C_DecodeTag::decode(&data_it, tag_data);
+    if (r < 0) {
+      lderr(cct) << "failed to decode journal tag" << dendl;
+      return r;
+    }
+
+    ldout(cct, 20) << "most recent journal tag: "
+                   << "tid=" << *tag_tid << ", "
+                   << "data=" << *tag_data << dendl;
+    return 0;
+  }
+};
+
+ // TODO: once journaler is 100% async, remove separate threads and
+ // reuse ImageCtx's thread pool
+ // Single-threaded ThreadPool that starts itself on construction and
+ // stops itself on destruction (RAII), intended for use as a lazily
+ // created per-process singleton.
+ class ThreadPoolSingleton : public ThreadPool {
+ public:
+   explicit ThreadPoolSingleton(CephContext *cct)
+     : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
+     start();
+   }
+   virtual ~ThreadPoolSingleton() {
+     stop();
+   }
+ };
+
+ // Self-initializing SafeTimer bundled with its own mutex, intended for
+ // use as a lazily created per-process singleton.
+ // NOTE(review): the SafeTimer base is constructed before the `lock`
+ // member (bases initialize first); this is safe only if the SafeTimer
+ // constructor merely stores the mutex reference without using it --
+ // confirm against common/Timer.h.
+ class SafeTimerSingleton : public SafeTimer {
+ public:
+   Mutex lock;
+
+   explicit SafeTimerSingleton(CephContext *cct)
+     : SafeTimer(cct, lock, true),
+       lock("librbd::Journal::SafeTimerSingleton::lock") {
+     init();
+   }
+   virtual ~SafeTimerSingleton() {
+     // shutdown() is invoked with `lock` held -- presumably required by
+     // SafeTimer's locking contract; verify in common/Timer.h
+     Mutex::Locker locker(lock);
+     shutdown();
+   }
+ };
+
} // anonymous namespace
using util::create_async_context_callback;
assert(m_journaler == NULL);
transition_state(STATE_INITIALIZING, 0);
- m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id,
- IMAGE_CLIENT_ID, m_image_ctx.journal_commit_age);
+ m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
- m_image_ctx.md_ctx, m_image_ctx.id, "",
- m_image_ctx.journal_commit_age);
++ m_image_ctx.md_ctx, m_image_ctx.id,
++ IMAGE_CLIENT_ID, m_image_ctx.journal_commit_age);
m_journaler->init(create_async_context_callback(
m_image_ctx, create_context_callback<
Journal<I>, &Journal<I>::handle_initialized>(this)));
dout(5) << "starting replay" << dendl;
- replayer = new rbd::mirror::ImageReplayer(local, remote, client_id,
- local_pool_id, remote_pool_id,
- remote_image_id);
+ threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
+ local->cct()));
+ replayer = new rbd::mirror::ImageReplayer(threads, local, remote, client_id,
- remote_pool_id, remote_image_id);
++ local_pool_id, remote_pool_id,
++ remote_image_id);
r = replayer->start(&bootstap_params);
if (r < 0) {
false, features, &order, 0, 0));
m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name);
+ m_threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
+ m_local_ioctx.cct()));
+
m_replayer = new rbd::mirror::ImageReplayer(
+ m_threads,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)),
- m_client_id, remote_pool_id, m_remote_image_id);
+ m_client_id, m_local_ioctx.get_id(), remote_pool_id, m_remote_image_id);
bootstrap();
}
Commands commands;
};
- ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote,
+ ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
const std::string &client_id,
- int64_t local_pool_id,
++ int64_t local_pool_id,
int64_t remote_pool_id,
const std::string &remote_image_id) :
+ m_threads(threads),
m_local(local),
m_remote(remote),
m_client_id(client_id),
};
public:
- ImageReplayer(RadosRef local, RadosRef remote, const std::string &client_id,
- int64_t local_pool_id, int64_t remote_pool_id,
- const std::string &remote_image_id);
+ ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
- const std::string &client_id, int64_t remote_pool_id,
- const std::string &remote_image_id);
++ const std::string &client_id, int64_t local_pool_id,
++ int64_t remote_pool_id, const std::string &remote_image_id);
virtual ~ImageReplayer();
ImageReplayer(const ImageReplayer&) = delete;
ImageReplayer& operator=(const ImageReplayer&) = delete;
for (auto &kv : peer_configs) {
const peer_t &peer = kv.first;
if (m_replayers.find(peer) == m_replayers.end()) {
- unique_ptr<Replayer> replayer(new Replayer(m_local, peer));
+ dout(20) << "starting replayer for " << peer << dendl;
+ unique_ptr<Replayer> replayer(new Replayer(m_threads, m_local, peer));
// TODO: make async, and retry connecting within replayer
int r = replayer->init();
if (r < 0) {
auto &pool_replayers = m_images[pool_id];
for (const auto &image_id : kv.second) {
if (pool_replayers.find(image_id) == pool_replayers.end()) {
- unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_local,
+ unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_threads,
+ m_local,
m_remote,
m_client_id,
+ local_ioctx.get_id(),
pool_id,
image_id));
int r = image_replayer->start();