From 4056e360117b3aacfba2ae98cd4ecb60e356730c Mon Sep 17 00:00:00 2001 From: Mykola Golub Date: Wed, 13 Jul 2016 15:49:40 +0300 Subject: [PATCH] journal: allow to trim journal for "laggy" clients Signed-off-by: Mykola Golub (cherry picked from commit 0b8b1aaedc10f7f46e91bf6ad809414feb770c8d) --- src/journal/JournalMetadata.cc | 63 ++++++++++++++++++++- src/journal/JournalMetadata.h | 2 + src/journal/JournalTrimmer.cc | 5 +- src/journal/Settings.h | 3 + src/test/journal/RadosTestFixture.cc | 4 +- src/test/journal/RadosTestFixture.h | 3 +- src/test/journal/test_JournalMetadata.cc | 71 +++++++++++++++++++++++- 7 files changed, 144 insertions(+), 7 deletions(-) diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index de46bd7ff0af1..08a7e9307aa45 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -749,6 +749,10 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { Client client(m_client_id, bufferlist()); RegisteredClients::iterator it = refresh->registered_clients.find(client); if (it != refresh->registered_clients.end()) { + if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) { + ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id + << dendl; + } m_minimum_set = MAX(m_minimum_set, refresh->minimum_set); m_active_set = MAX(m_active_set, refresh->active_set); m_registered_clients = refresh->registered_clients; @@ -810,9 +814,11 @@ void JournalMetadata::handle_commit_position_task() { librados::ObjectWriteOperation op; client::client_commit(&op, m_client_id, m_commit_position); - C_NotifyUpdate *ctx = new C_NotifyUpdate(this, m_commit_position_ctx); + Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx); m_commit_position_ctx = NULL; + ctx = schedule_laggy_clients_disconnect(ctx); + librados::AioCompletion *comp = librados::Rados::aio_create_completion(ctx, NULL, utils::rados_ctx_callback); @@ -839,7 +845,7 @@ void JournalMetadata::handle_watch_reset() { if (r 
== -ENOENT) { ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl; } else { - lderr(m_cct) << __func__ << ": failed to watch journal" + lderr(m_cct) << __func__ << ": failed to watch journal: " << cpp_strerror(r) << dendl; } schedule_watch_reset(); @@ -1023,6 +1029,59 @@ void JournalMetadata::handle_notified(int r) { ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl; } +Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) { + assert(m_lock.is_locked()); + + ldout(m_cct, 20) << __func__ << dendl; + + if (m_settings.max_concurrent_object_sets <= 0) { + return on_finish; + } + + Context *ctx = on_finish; + + for (auto &c : m_registered_clients) { + if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED || + c.id == m_client_id || + m_settings.whitelisted_laggy_clients.count(c.id) > 0) { + continue; + } + const std::string &client_id = c.id; + uint64_t object_set = 0; + if (!c.commit_position.object_positions.empty()) { + auto &position = *(c.commit_position.object_positions.begin()); + object_set = position.object_number / m_splay_width; + } + + if (m_active_set > object_set + m_settings.max_concurrent_object_sets) { + ldout(m_cct, 1) << __func__ << ": " << client_id + << ": scheduling disconnect" << dendl; + + ctx = new FunctionContext([this, client_id, ctx](int r1) { + ldout(m_cct, 10) << __func__ << ": " << client_id + << ": flagging disconnected" << dendl; + + librados::ObjectWriteOperation op; + client::client_update_state(&op, client_id, + cls::journal::CLIENT_STATE_DISCONNECTED); + + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, nullptr, + utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + assert(r == 0); + comp->release(); + }); + } + } + + if (ctx == on_finish) { + ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl; + } + + return ctx; +} + std::ostream &operator<<(std::ostream &os, const 
JournalMetadata::RegisteredClients &clients) { os << "["; diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 01116d7d1e016..4055f9937047f 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -344,6 +344,8 @@ private: void handle_watch_error(int err); void handle_notified(int r); + Context *schedule_laggy_clients_disconnect(Context *on_finish); + friend std::ostream &operator<<(std::ostream &os, const JournalMetadata &journal_metadata); }; diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc index aef16c2fb0acb..18b8e38b37a18 100644 --- a/src/journal/JournalTrimmer.cc +++ b/src/journal/JournalTrimmer.cc @@ -142,8 +142,11 @@ void JournalTrimmer::handle_metadata_updated() { uint64_t minimum_commit_set = active_set; std::string minimum_client_id; - // TODO: add support for trimming past "laggy" clients for (auto &client : registered_clients) { + if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) { + continue; + } + if (client.commit_position.object_positions.empty()) { // client hasn't recorded any commits minimum_commit_set = minimum_set; diff --git a/src/journal/Settings.h b/src/journal/Settings.h index 603770cbaf65c..ca57125a85945 100644 --- a/src/journal/Settings.h +++ b/src/journal/Settings.h @@ -12,6 +12,9 @@ struct Settings { double commit_interval = 5; ///< commit position throttle (in secs) uint64_t max_fetch_bytes = 0; ///< 0 implies no limit uint64_t max_payload_bytes = 0; ///< 0 implies object size limit + int max_concurrent_object_sets = 0; ///< 0 implies no limit + std::set<std::string> whitelisted_laggy_clients; + ///< clients that mustn't be disconnected }; } // namespace journal diff --git a/src/test/journal/RadosTestFixture.cc b/src/test/journal/RadosTestFixture.cc index dba3ec6a244a1..a066559c92330 100644 --- a/src/test/journal/RadosTestFixture.cc +++ b/src/test/journal/RadosTestFixture.cc @@ -68,10 +68,12 @@ int RadosTestFixture::create(const std::string &oid, uint8_t
order, journal::JournalMetadataPtr RadosTestFixture::create_metadata( const std::string &oid, const std::string &client_id, - double commit_interval, uint64_t max_fetch_bytes) { + double commit_interval, uint64_t max_fetch_bytes, + int max_concurrent_object_sets) { journal::Settings settings; settings.commit_interval = commit_interval; settings.max_fetch_bytes = max_fetch_bytes; + settings.max_concurrent_object_sets = max_concurrent_object_sets; journal::JournalMetadataPtr metadata(new journal::JournalMetadata( m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id, settings)); diff --git a/src/test/journal/RadosTestFixture.h b/src/test/journal/RadosTestFixture.h index 4ea22e7cefcf9..64d3011a69f0e 100644 --- a/src/test/journal/RadosTestFixture.h +++ b/src/test/journal/RadosTestFixture.h @@ -26,7 +26,8 @@ public: journal::JournalMetadataPtr create_metadata(const std::string &oid, const std::string &client_id = "client", double commit_internal = 0.1, - uint64_t max_fetch_bytes = 0); + uint64_t max_fetch_bytes = 0, + int max_concurrent_object_sets = 0); int append(const std::string &oid, const bufferlist &bl); int client_register(const std::string &oid, const std::string &id = "client", diff --git a/src/test/journal/test_JournalMetadata.cc b/src/test/journal/test_JournalMetadata.cc index b8f559365f811..560ee4e569f70 100644 --- a/src/test/journal/test_JournalMetadata.cc +++ b/src/test/journal/test_JournalMetadata.cc @@ -21,9 +21,12 @@ public: journal::JournalMetadataPtr create_metadata(const std::string &oid, const std::string &client_id, - double commit_internal = 0.1) { + double commit_interval = 0.1, + uint64_t max_fetch_bytes = 0, + int max_concurrent_object_sets = 0) { journal::JournalMetadataPtr metadata = RadosTestFixture::create_metadata( - oid, client_id, commit_internal); + oid, client_id, commit_interval, max_fetch_bytes, + max_concurrent_object_sets); m_metadata_list.push_back(metadata); metadata->add_listener(&m_listener); return metadata; @@ -116,6 
+119,70 @@ TEST_F(TestJournalMetadata, UpdateActiveObject) { ASSERT_EQ(123U, metadata1->get_active_set()); } +TEST_F(TestJournalMetadata, DisconnectLaggyClient) { + std::string oid = get_temp_oid(); + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid, "client1", "")); + ASSERT_EQ(0, client_register(oid, "client2", "laggy")); + + int max_concurrent_object_sets = 100; + journal::JournalMetadataPtr metadata = + create_metadata(oid, "client1", 0.1, 0, max_concurrent_object_sets); + ASSERT_EQ(0, init_metadata(metadata)); + ASSERT_TRUE(wait_for_update(metadata)); + + ASSERT_EQ(0U, metadata->get_active_set()); + + journal::JournalMetadata::RegisteredClients clients; + +#define ASSERT_CLIENT_STATES(s1, s2) \ + ASSERT_EQ(2U, clients.size()); \ + for (auto &c : clients) { \ + if (c.id == "client1") { \ + ASSERT_EQ(c.state, s1); \ + } else if (c.id == "client2") { \ + ASSERT_EQ(c.state, s2); \ + } else { \ + ASSERT_TRUE(false); \ + } \ + } + + metadata->get_registered_clients(&clients); + ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED, + cls::journal::CLIENT_STATE_CONNECTED); + + // client2 is connected when active set <= max_concurrent_object_sets + ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets)); + ASSERT_TRUE(wait_for_update(metadata)); + uint64_t commit_tid = metadata->allocate_commit_tid(0, 0, 0); + C_SaferCond cond1; + metadata->committed(commit_tid, [&cond1]() { return &cond1; }); + ASSERT_EQ(0, cond1.wait()); + metadata->flush_commit_position(); + ASSERT_TRUE(wait_for_update(metadata)); + ASSERT_EQ(100U, metadata->get_active_set()); + clients.clear(); + metadata->get_registered_clients(&clients); + ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED, + cls::journal::CLIENT_STATE_CONNECTED); + + // client2 is disconnected when active set > max_concurrent_object_sets + ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets + 1)); + ASSERT_TRUE(wait_for_update(metadata)); + commit_tid = 
metadata->allocate_commit_tid(0, 0, 1); + C_SaferCond cond2; + metadata->committed(commit_tid, [&cond2]() { return &cond2; }); + ASSERT_EQ(0, cond2.wait()); + metadata->flush_commit_position(); + ASSERT_TRUE(wait_for_update(metadata)); + ASSERT_EQ(101U, metadata->get_active_set()); + clients.clear(); + metadata->get_registered_clients(&clients); + ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED, + cls::journal::CLIENT_STATE_DISCONNECTED); +} + TEST_F(TestJournalMetadata, AssertActiveTag) { std::string oid = get_temp_oid(); -- 2.39.5