]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Merge pull request #7906 from dillaman/wip-14869
authorJosh Durgin <jdurgin@redhat.com>
Wed, 9 Mar 2016 22:18:13 +0000 (14:18 -0800)
committerJosh Durgin <jdurgin@redhat.com>
Wed, 9 Mar 2016 22:18:13 +0000 (14:18 -0800)
journal: re-use common threads between journalers

Conflicts:
src/journal/JournalPlayer.cc
src/librbd/Journal.cc
src/test/rbd_mirror/image_replay.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/Mirror.cc

(merged interface changes to ImageReplayer, and reduced scope for
change to JournalPlayer due to pr #7884 (wip-14663)).

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
14 files changed:
1  2 
src/journal/JournalMetadata.cc
src/journal/JournalPlayer.cc
src/journal/Journaler.cc
src/journal/Journaler.h
src/librbd/Journal.cc
src/librbd/Journal.h
src/test/journal/test_JournalPlayer.cc
src/test/librbd/test_mock_Journal.cc
src/test/rbd_mirror/image_replay.cc
src/test/rbd_mirror/test_ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/Replayer.cc

index 4d81122c1c43fc0b82a3278679453558915cd89c,bf7ac2808864dd1e13309a794ac31a229693f1dc..b93f0379065d83ac3359796c58143f53a25eebd7
@@@ -589,11 -569,9 +569,11 @@@ void JournalMetadata::schedule_commit_t
  }
  
  void JournalMetadata::handle_commit_position_task() {
-   assert(m_timer_lock.is_locked());
+   assert(m_timer_lock->is_locked());
    assert(m_lock.is_locked());
 -  ldout(m_cct, 20) << __func__ << dendl;
 +  ldout(m_cct, 20) << __func__ << ": "
 +                   << "client_id=" << m_client_id << ", "
 +                   << "commit_position=" << m_commit_position << dendl;
  
    librados::ObjectWriteOperation op;
    client::client_commit(&op, m_client_id, m_commit_position);
index b05fead55a8c9ed01269a4086ecebb0f5c783d79,3559e240fb65adba0a0d4f6a6836dea9a18cf573..2f1b31657f89cee70655e149bf3cbfcc4904bb17
@@@ -515,64 -463,12 +514,64 @@@ void JournalPlayer::handle_fetched(uint
    process_state(object_num, r);
  }
  
 -void JournalPlayer::handle_watch(uint64_t object_num, int r) {
 -  ldout(m_cct, 10) << __func__ << ": "
 -                   << utils::get_object_name(m_object_oid_prefix, object_num)
 -                   << ": r=" << r << dendl;
 -  process_state(object_num, r);
 +void JournalPlayer::schedule_watch() {
 +  ldout(m_cct, 10) << __func__ << dendl;
 +  assert(m_lock.is_locked());
 +  if (m_watch_scheduled) {
 +    return;
 +  }
 +
 +  // poll first splay offset and active splay offset since
 +  // new records should only appear in those two objects
 +  C_Watch *ctx = new C_Watch(this);
 +  ObjectPlayerPtr object_player = get_object_player();
 +  object_player->watch(ctx, m_watch_interval);
 +
 +  uint8_t splay_width = m_journal_metadata->get_splay_width();
 +  if (object_player->get_object_number() % splay_width != 0) {
 +    ++ctx->pending_fetches;
 +
 +    object_player = m_object_players.begin()->second.begin()->second;
 +    object_player->watch(ctx, m_watch_interval);
 +  }
 +  m_watch_scheduled = true;
 +}
 +
 +void JournalPlayer::handle_watch(int r) {
 +  ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
 +
 +  Mutex::Locker locker(m_lock);
 +  m_watch_scheduled = false;
 +  std::set<uint64_t> object_numbers;
 +  for (auto &players : m_object_players) {
 +    object_numbers.insert(
 +      players.second.begin()->second->get_object_number());
 +  }
 +
 +  for (auto object_num : object_numbers) {
 +    process_state(object_num, r);
 +  }
 +}
 +
 +void JournalPlayer::notify_entries_available() {
 +  assert(m_lock.is_locked());
 +  if (m_handler_notified) {
 +    return;
 +  }
 +  m_handler_notified = true;
 +
 +  ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
-   m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
++  m_journal_metadata->queue(new C_HandleEntriesAvailable(
 +    m_replay_handler), 0);
 +}
 +
 +void JournalPlayer::notify_complete(int r) {
 +  assert(m_lock.is_locked());
 +  m_handler_notified = true;
 +
 +  ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
-   m_journal_metadata->get_finisher().queue(new C_HandleComplete(
++  m_journal_metadata->queue(new C_HandleComplete(
 +    m_replay_handler), r);
  }
  
  } // namespace journal
Simple merge
Simple merge
index 871abcf0a2444e6f9f6e33e960debda1479cad64,21fc12262b375d30e018aa71ebf1cf0fbf4beee5..4a2a9df6983a51fe1b3c24220031a6a04c09b63f
@@@ -22,113 -24,34 +24,141 @@@ namespace librbd 
  
  namespace {
  
 +struct C_DecodeTag : public Context {
 +  CephContext *cct;
 +  Mutex *lock;
 +  uint64_t *tag_tid;
 +  journal::TagData *tag_data;
 +  Context *on_finish;
 +
 +  cls::journal::Tag tag;
 +
 +  C_DecodeTag(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
 +              journal::TagData *tag_data, Context *on_finish)
 +    : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
 +      on_finish(on_finish) {
 +  }
 +
 +  virtual void complete(int r) override {
 +    on_finish->complete(process(r));
 +    Context::complete(0);
 +  }
 +  virtual void finish(int r) override {
 +  }
 +
 +  int process(int r) {
 +    if (r < 0) {
 +      lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl;
 +      return r;
 +    }
 +
 +    Mutex::Locker locker(*lock);
 +    *tag_tid = tag.tid;
 +
 +    bufferlist::iterator data_it = tag.data.begin();
 +    r = decode(&data_it, tag_data);
 +    if (r < 0) {
 +      lderr(cct) << "failed to decode allocated tag" << dendl;
 +      return r;
 +    }
 +
 +    ldout(cct, 20) << "allocated journal tag: "
 +                   << "tid=" << tag.tid << ", "
 +                   << "data=" << *tag_data << dendl;
 +    return 0;
 +  }
 +
 +  static int decode(bufferlist::iterator *it,
 +                    journal::TagData *tag_data) {
 +    try {
 +      ::decode(*tag_data, *it);
 +    } catch (const buffer::error &err) {
 +      return -EBADMSG;
 +    }
 +    return 0;
 +  }
 +
 +};
 +
 +struct C_DecodeTags : public Context {
 +  CephContext *cct;
 +  Mutex *lock;
 +  uint64_t *tag_tid;
 +  journal::TagData *tag_data;
 +  Context *on_finish;
 +
 +  ::journal::Journaler::Tags tags;
 +
 +  C_DecodeTags(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
 +               journal::TagData *tag_data, Context *on_finish)
 +    : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
 +      on_finish(on_finish) {
 +  }
 +
 +  virtual void complete(int r) {
 +    on_finish->complete(process(r));
 +    Context::complete(0);
 +  }
 +  virtual void finish(int r) override {
 +  }
 +
 +  int process(int r) {
 +    if (r < 0) {
 +      lderr(cct) << "failed to retrieve journal tags: " << cpp_strerror(r)
 +                 << dendl;
 +      return r;
 +    }
 +
 +    if (tags.empty()) {
 +      lderr(cct) << "no journal tags retrieved" << dendl;
 +      return -ENOENT;
 +    }
 +
 +    Mutex::Locker locker(*lock);
 +    *tag_tid = tags.back().tid;
 +
 +    bufferlist::iterator data_it = tags.back().data.begin();
 +    r = C_DecodeTag::decode(&data_it, tag_data);
 +    if (r < 0) {
 +      lderr(cct) << "failed to decode journal tag" << dendl;
 +      return r;
 +    }
 +
 +    ldout(cct, 20) << "most recent journal tag: "
 +                   << "tid=" << *tag_tid << ", "
 +                   << "data=" << *tag_data << dendl;
 +    return 0;
 +  }
 +};
 +
+ // TODO: once journaler is 100% async, remove separate threads and
+ // reuse ImageCtx's thread pool
+ class ThreadPoolSingleton : public ThreadPool {
+ public:
+   explicit ThreadPoolSingleton(CephContext *cct)
+     : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
+     start();
+   }
+   virtual ~ThreadPoolSingleton() {
+     stop();
+   }
+ };
+ class SafeTimerSingleton : public SafeTimer {
+ public:
+   Mutex lock;
+   explicit SafeTimerSingleton(CephContext *cct)
+       : SafeTimer(cct, lock, true),
+         lock("librbd::Journal::SafeTimerSingleton::lock") {
+     init();
+   }
+   virtual ~SafeTimerSingleton() {
+     Mutex::Locker locker(lock);
+     shutdown();
+   }
+ };
  } // anonymous namespace
  
  using util::create_async_context_callback;
@@@ -704,8 -613,9 +753,9 @@@ void Journal<I>::create_journaler() 
    assert(m_journaler == NULL);
  
    transition_state(STATE_INITIALIZING, 0);
-   m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id,
-                               IMAGE_CLIENT_ID, m_image_ctx.journal_commit_age);
+   m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
 -                              m_image_ctx.md_ctx, m_image_ctx.id, "",
 -                              m_image_ctx.journal_commit_age);
++                            m_image_ctx.md_ctx, m_image_ctx.id,
++                            IMAGE_CLIENT_ID, m_image_ctx.journal_commit_age);
    m_journaler->init(create_async_context_callback(
      m_image_ctx, create_context_callback<
        Journal<I>, &Journal<I>::handle_initialized>(this)));
Simple merge
Simple merge
index e23f057996a98c9b89f73c1d90f70074b0fa531b,1b945f563fcdad8558b76f34f73d523b4a402cb5..66fcd610856cecff8c46fbfdf20d6145ceef1c4f
@@@ -179,9 -172,10 +181,11 @@@ int main(int argc, const char **argv
  
    dout(5) << "starting replay" << dendl;
  
-   replayer = new rbd::mirror::ImageReplayer(local, remote, client_id,
-                                             local_pool_id, remote_pool_id,
-                                             remote_image_id);
+   threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
+     local->cct()));
+   replayer = new rbd::mirror::ImageReplayer(threads, local, remote, client_id,
 -                                          remote_pool_id, remote_image_id);
++                                          local_pool_id, remote_pool_id,
++                                          remote_image_id);
  
    r = replayer->start(&bootstap_params);
    if (r < 0) {
index 56172992e0b14402ef8ab9d501014fa41efdb651,11ce286fb94df0ef7f5c73d56e8b55f132f5ed7d..3ab535c55e392100396c4aa179a2270486bcf6d8
@@@ -98,10 -99,14 +99,14 @@@ public
                                false, features, &order, 0, 0));
      m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name);
  
+     m_threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
+       m_local_ioctx.cct()));
      m_replayer = new rbd::mirror::ImageReplayer(
+       m_threads,
        rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
        rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)),
 -      m_client_id, remote_pool_id, m_remote_image_id);
 +      m_client_id, m_local_ioctx.get_id(), remote_pool_id, m_remote_image_id);
  
      bootstrap();
    }
index d3cd665f09161c79df11d7b696d8cb9700b661ba,0554fe35811e40678212f1be561ed0c8ba542ddf..0afea699fd958c251f941b20168aad7901f16940
@@@ -158,11 -161,11 +161,12 @@@ private
    Commands commands;
  };
  
- ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote,
+ ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
                             const std::string &client_id,
-                              int64_t local_pool_id,
++                           int64_t local_pool_id,
                             int64_t remote_pool_id,
                             const std::string &remote_image_id) :
+   m_threads(threads),
    m_local(local),
    m_remote(remote),
    m_client_id(client_id),
index 4e94de8ef2e784dbeb67fd2b4ff3c1b421e8dd0d,24f7f424cb5cc4d9b8d3c1686bf093cd0dade03b..7e9a7e3b2168ae8b2c98dd5dea15907010154ef2
@@@ -64,9 -67,9 +67,9 @@@ public
    };
  
  public:
-   ImageReplayer(RadosRef local, RadosRef remote, const std::string &client_id,
-               int64_t local_pool_id, int64_t remote_pool_id,
-                 const std::string &remote_image_id);
+   ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
 -                const std::string &client_id, int64_t remote_pool_id,
 -                const std::string &remote_image_id);
++              const std::string &client_id, int64_t local_pool_id,
++              int64_t remote_pool_id, const std::string &remote_image_id);
    virtual ~ImageReplayer();
    ImageReplayer(const ImageReplayer&) = delete;
    ImageReplayer& operator=(const ImageReplayer&) = delete;
index 5887899802c462046f58d7c8a45cb9a209d5608b,e3999eea2d4cff142bbe8c2e0fa18d7a267922e4..20bd41f8b7d92f164e0a49cf7259b88811f59fd0
@@@ -78,8 -78,8 +81,8 @@@ void Mirror::update_replayers(const map
    for (auto &kv : peer_configs) {
      const peer_t &peer = kv.first;
      if (m_replayers.find(peer) == m_replayers.end()) {
-       unique_ptr<Replayer> replayer(new Replayer(m_local, peer));
 +      dout(20) << "starting replayer for " << peer << dendl;
+       unique_ptr<Replayer> replayer(new Replayer(m_threads, m_local, peer));
        // TODO: make async, and retry connecting within replayer
        int r = replayer->init();
        if (r < 0) {
index e507c0cd2f6a03215e86bcff44632eb6124cdd9a,bd379f782519908ee1b64eabcee54aa2c0c7691b..ba043b37ec6d31bf025d9aa4df27d6968474da55
@@@ -148,10 -124,10 +150,11 @@@ void Replayer::set_sources(const map<in
      auto &pool_replayers = m_images[pool_id];
      for (const auto &image_id : kv.second) {
        if (pool_replayers.find(image_id) == pool_replayers.end()) {
-       unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_local,
+       unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_threads,
+                                                                    m_local,
                                                                   m_remote,
                                                                   m_client_id,
 +                                                                   local_ioctx.get_id(),
                                                                   pool_id,
                                                                   image_id));
        int r = image_replayer->start();