From: Josh Durgin Date: Wed, 9 Mar 2016 22:18:13 +0000 (-0800) Subject: Merge pull request #7906 from dillaman/wip-14869 X-Git-Tag: v10.1.0~185 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3d404f2e1c14a5ddb634ced2b7a2fcfefefc0d08;p=ceph.git Merge pull request #7906 from dillaman/wip-14869 journal: re-use common threads between journalers Conflicts: src/journal/JournalPlayer.cc src/librbd/Journal.cc src/test/rbd_mirror/image_replay.cc src/tools/rbd_mirror/ImageReplayer.h src/tools/rbd_mirror/Mirror.cc (merged interface changes to ImageReplayer, and reduced scope for change to JournalPlayer due to pr #7884 (wip-14663)). Reviewed-by: Josh Durgin --- 3d404f2e1c14a5ddb634ced2b7a2fcfefefc0d08 diff --cc src/journal/JournalMetadata.cc index 4d81122c1c43,bf7ac2808864..b93f0379065d --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@@ -589,11 -569,9 +569,11 @@@ void JournalMetadata::schedule_commit_t } void JournalMetadata::handle_commit_position_task() { - assert(m_timer_lock.is_locked()); + assert(m_timer_lock->is_locked()); assert(m_lock.is_locked()); - ldout(m_cct, 20) << __func__ << dendl; + ldout(m_cct, 20) << __func__ << ": " + << "client_id=" << m_client_id << ", " + << "commit_position=" << m_commit_position << dendl; librados::ObjectWriteOperation op; client::client_commit(&op, m_client_id, m_commit_position); diff --cc src/journal/JournalPlayer.cc index b05fead55a8c,3559e240fb65..2f1b31657f89 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@@ -515,64 -463,12 +514,64 @@@ void JournalPlayer::handle_fetched(uint process_state(object_num, r); } -void JournalPlayer::handle_watch(uint64_t object_num, int r) { - ldout(m_cct, 10) << __func__ << ": " - << utils::get_object_name(m_object_oid_prefix, object_num) - << ": r=" << r << dendl; - process_state(object_num, r); +void JournalPlayer::schedule_watch() { + ldout(m_cct, 10) << __func__ << dendl; + assert(m_lock.is_locked()); + if (m_watch_scheduled) { + return; + } + + // poll first splay offset and active splay offset since + // new records should only appear in those two objects + C_Watch *ctx = new C_Watch(this); + ObjectPlayerPtr object_player = get_object_player(); + object_player->watch(ctx, m_watch_interval); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + if (object_player->get_object_number() % splay_width != 0) { + ++ctx->pending_fetches; + + object_player = m_object_players.begin()->second.begin()->second; + object_player->watch(ctx, m_watch_interval); + } + m_watch_scheduled = true; +} + +void JournalPlayer::handle_watch(int r) { + ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; + + Mutex::Locker locker(m_lock); + m_watch_scheduled = false; + std::set object_numbers; + for (auto &players : m_object_players) { + object_numbers.insert( + players.second.begin()->second->get_object_number()); + } + + for (auto object_num : object_numbers) { + process_state(object_num, r); + } +} + +void JournalPlayer::notify_entries_available() { + assert(m_lock.is_locked()); + if (m_handler_notified) { + return; + } + m_handler_notified = true; + + ldout(m_cct, 10) << __func__ << ": entries available" << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable( ++ m_journal_metadata->queue(new C_HandleEntriesAvailable( + m_replay_handler), 0); +} + +void JournalPlayer::notify_complete(int r) { + assert(m_lock.is_locked()); + m_handler_notified = true; + + ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleComplete( ++ m_journal_metadata->queue(new C_HandleComplete( + m_replay_handler), r); } } // namespace journal diff --cc src/librbd/Journal.cc index 871abcf0a244,21fc12262b37..4a2a9df6983a --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@@ -22,113 -24,34 +24,141 @@@ namespace librbd namespace { +struct C_DecodeTag : public Context { + CephContext *cct; + Mutex *lock; + uint64_t *tag_tid; + journal::TagData *tag_data; + Context *on_finish; + + cls::journal::Tag tag; + + C_DecodeTag(CephContext *cct, Mutex *lock, uint64_t *tag_tid, + journal::TagData *tag_data, Context *on_finish) + : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data), + on_finish(on_finish) { + } + + virtual void complete(int r) override { + on_finish->complete(process(r)); + Context::complete(0); + } + virtual void finish(int r) override { + } + + int process(int r) { + if (r < 0) { + lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl; + return r; + } + + Mutex::Locker locker(*lock); + *tag_tid = tag.tid; + + bufferlist::iterator data_it = tag.data.begin(); + r = decode(&data_it, tag_data); + if (r < 0) { + lderr(cct) << "failed to decode allocated tag" << dendl; + return r; + } + + ldout(cct, 20) << "allocated journal tag: " + << "tid=" << tag.tid << ", " + << "data=" << *tag_data << dendl; + return 0; + } + + static int decode(bufferlist::iterator *it, + journal::TagData *tag_data) { + try { + ::decode(*tag_data, *it); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; + } + +}; + +struct C_DecodeTags : public Context { + CephContext *cct; + Mutex *lock; + uint64_t *tag_tid; + journal::TagData *tag_data; + Context *on_finish; + + ::journal::Journaler::Tags tags; + + C_DecodeTags(CephContext *cct, Mutex *lock, uint64_t *tag_tid, + journal::TagData *tag_data, Context *on_finish) + : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data), + on_finish(on_finish) { + } + + virtual void complete(int r) { + on_finish->complete(process(r)); + Context::complete(0); + } + virtual void finish(int r) override { + } + + int process(int r) { + if (r < 0) { + lderr(cct) << "failed to retrieve journal tags: " << cpp_strerror(r) + << dendl; + return r; + } + + if (tags.empty()) { + lderr(cct) << "no journal tags retrieved" << dendl; + return -ENOENT; + } + + Mutex::Locker locker(*lock); + *tag_tid = tags.back().tid; + + bufferlist::iterator data_it = tags.back().data.begin(); + r = C_DecodeTag::decode(&data_it, tag_data); + if (r < 0) { + lderr(cct) << "failed to decode journal tag" << dendl; + return r; + } + + ldout(cct, 20) << "most recent journal tag: " + << "tid=" << *tag_tid << ", " + << "data=" << *tag_data << dendl; + return 0; + } +}; + + // TODO: once journaler is 100% async, remove separate threads and + // reuse ImageCtx's thread pool + class ThreadPoolSingleton : public ThreadPool { + public: + explicit ThreadPoolSingleton(CephContext *cct) + : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) { + start(); + } + virtual ~ThreadPoolSingleton() { + stop(); + } + }; + + class SafeTimerSingleton : public SafeTimer { + public: + Mutex lock; + + explicit SafeTimerSingleton(CephContext *cct) + : SafeTimer(cct, lock, true), + lock("librbd::Journal::SafeTimerSingleton::lock") { + init(); + } + virtual ~SafeTimerSingleton() { + Mutex::Locker locker(lock); + shutdown(); + } + }; + } // anonymous namespace using util::create_async_context_callback; @@@ -704,8 -613,9 +753,9 @@@ void Journal::create_journaler() assert(m_journaler == NULL); transition_state(STATE_INITIALIZING, 0); - m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, - IMAGE_CLIENT_ID, m_image_ctx.journal_commit_age); + m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock, - m_image_ctx.md_ctx, m_image_ctx.id, "", - m_image_ctx.journal_commit_age); ++ m_image_ctx.md_ctx, m_image_ctx.id, ++ IMAGE_CLIENT_ID, m_image_ctx.journal_commit_age); m_journaler->init(create_async_context_callback( m_image_ctx, create_context_callback< Journal, &Journal::handle_initialized>(this))); diff --cc src/test/rbd_mirror/image_replay.cc index e23f057996a9,1b945f563fcd..66fcd610856c --- a/src/test/rbd_mirror/image_replay.cc +++ b/src/test/rbd_mirror/image_replay.cc @@@ -179,9 -172,10 +181,11 @@@ int main(int argc, const char **argv dout(5) << "starting replay" << dendl; - replayer = new rbd::mirror::ImageReplayer(local, remote, client_id, - local_pool_id, remote_pool_id, - remote_image_id); + threads = new rbd::mirror::Threads(reinterpret_cast( + local->cct())); + replayer = new rbd::mirror::ImageReplayer(threads, local, remote, client_id, - remote_pool_id, remote_image_id); ++ local_pool_id, remote_pool_id, ++ remote_image_id); r = replayer->start(&bootstap_params); if (r < 0) { diff --cc src/test/rbd_mirror/test_ImageReplayer.cc index 56172992e0b1,11ce286fb94d..3ab535c55e39 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@@ -98,10 -99,14 +99,14 @@@ public false, features, &order, 0, 0)); m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name); + m_threads = new rbd::mirror::Threads(reinterpret_cast( + m_local_ioctx.cct())); + m_replayer = new rbd::mirror::ImageReplayer( + m_threads, rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)), rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)), - m_client_id, remote_pool_id, m_remote_image_id); + m_client_id, m_local_ioctx.get_id(), remote_pool_id, m_remote_image_id); bootstrap(); } diff --cc src/tools/rbd_mirror/ImageReplayer.cc index d3cd665f0916,0554fe35811e..0afea699fd95 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@@ -158,11 -161,11 +161,12 @@@ private Commands commands; }; - ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote, + ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, const std::string &client_id, - int64_t local_pool_id, ++ int64_t local_pool_id, int64_t remote_pool_id, const std::string &remote_image_id) : + m_threads(threads), m_local(local), m_remote(remote), m_client_id(client_id), diff --cc src/tools/rbd_mirror/ImageReplayer.h index 4e94de8ef2e7,24f7f424cb5c..7e9a7e3b2168 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@@ -64,9 -67,9 +67,9 @@@ public }; public: - ImageReplayer(RadosRef local, RadosRef remote, const std::string &client_id, - int64_t local_pool_id, int64_t remote_pool_id, - const std::string &remote_image_id); + ImageReplayer(Threads *threads, RadosRef local, RadosRef remote, - const std::string &client_id, int64_t remote_pool_id, - const std::string &remote_image_id); ++ const std::string &client_id, int64_t local_pool_id, ++ int64_t remote_pool_id, const std::string &remote_image_id); virtual ~ImageReplayer(); ImageReplayer(const ImageReplayer&) = delete; ImageReplayer& operator=(const ImageReplayer&) = delete; diff --cc src/tools/rbd_mirror/Mirror.cc index 5887899802c4,e3999eea2d4c..20bd41f8b7d9 --- a/src/tools/rbd_mirror/Mirror.cc +++ b/src/tools/rbd_mirror/Mirror.cc @@@ -78,8 -78,8 +81,8 @@@ void Mirror::update_replayers(const map for (auto &kv : peer_configs) { const peer_t &peer = kv.first; if (m_replayers.find(peer) == m_replayers.end()) { + dout(20) << "starting replayer for " << peer << dendl; - unique_ptr replayer(new Replayer(m_local, peer)); + unique_ptr replayer(new Replayer(m_threads, m_local, peer)); // TODO: make async, and retry connecting within replayer int r = replayer->init(); if (r < 0) { diff --cc src/tools/rbd_mirror/Replayer.cc index e507c0cd2f6a,bd379f782519..ba043b37ec6d --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@@ -148,10 -124,10 +150,11 @@@ void Replayer::set_sources(const map image_replayer(new ImageReplayer(m_local, + unique_ptr image_replayer(new ImageReplayer(m_threads, + m_local, m_remote, m_client_id, + local_ioctx.get_id(), pool_id, image_id)); int r = image_replayer->start();