Client client(m_client_id, bufferlist());
RegisteredClients::iterator it = refresh->registered_clients.find(client);
if (it != refresh->registered_clients.end()) {
+ if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+ ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
+ << dendl;
+ }
m_minimum_set = MAX(m_minimum_set, refresh->minimum_set);
m_active_set = MAX(m_active_set, refresh->active_set);
m_registered_clients = refresh->registered_clients;
librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, m_commit_position);
- C_NotifyUpdate *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
+ Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
m_commit_position_ctx = NULL;
+ ctx = schedule_laggy_clients_disconnect(ctx);
+
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
if (r == -ENOENT) {
ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
} else {
- lderr(m_cct) << __func__ << ": failed to watch journal"
+ lderr(m_cct) << __func__ << ": failed to watch journal: "
<< cpp_strerror(r) << dendl;
}
schedule_watch_reset();
ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
}
+Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
+ assert(m_lock.is_locked());
+
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ if (m_settings.max_concurrent_object_sets <= 0) {
+ return on_finish;
+ }
+
+ Context *ctx = on_finish;
+
+ for (auto &c : m_registered_clients) {
+ if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
+ c.id == m_client_id ||
+ m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
+ continue;
+ }
+ const std::string &client_id = c.id;
+ uint64_t object_set = 0;
+ if (!c.commit_position.object_positions.empty()) {
+ auto &position = *(c.commit_position.object_positions.begin());
+ object_set = position.object_number / m_splay_width;
+ }
+
+ if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
+ ldout(m_cct, 1) << __func__ << ": " << client_id
+ << ": scheduling disconnect" << dendl;
+
+ ctx = new FunctionContext([this, client_id, ctx](int r1) {
+ ldout(m_cct, 10) << __func__ << ": " << client_id
+ << ": flagging disconnected" << dendl;
+
+ librados::ObjectWriteOperation op;
+ client::client_update_state(&op, client_id,
+ cls::journal::CLIENT_STATE_DISCONNECTED);
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, nullptr,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ assert(r == 0);
+ comp->release();
+ });
+ }
+ }
+
+ if (ctx == on_finish) {
+ ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
+ }
+
+ return ctx;
+}
+
std::ostream &operator<<(std::ostream &os,
const JournalMetadata::RegisteredClients &clients) {
os << "[";
void handle_watch_error(int err);
void handle_notified(int r);
+ Context *schedule_laggy_clients_disconnect(Context *on_finish);
+
friend std::ostream &operator<<(std::ostream &os,
const JournalMetadata &journal_metadata);
};
uint64_t minimum_commit_set = active_set;
std::string minimum_client_id;
- // TODO: add support for trimming past "laggy" clients
for (auto &client : registered_clients) {
+ if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+ continue;
+ }
+
if (client.commit_position.object_positions.empty()) {
// client hasn't recorded any commits
minimum_commit_set = minimum_set;
double commit_interval = 5; ///< commit position throttle (in secs)
uint64_t max_fetch_bytes = 0; ///< 0 implies no limit
uint64_t max_payload_bytes = 0; ///< 0 implies object size limit
+ int max_concurrent_object_sets = 0; ///< 0 implies no limit
+ std::set<std::string> whitelisted_laggy_clients;
+ ///< clients that mustn't be disconnected
};
} // namespace journal
journal::JournalMetadataPtr RadosTestFixture::create_metadata(
const std::string &oid, const std::string &client_id,
- double commit_interval, uint64_t max_fetch_bytes) {
+ double commit_interval, uint64_t max_fetch_bytes,
+ int max_concurrent_object_sets) {
journal::Settings settings;
settings.commit_interval = commit_interval;
settings.max_fetch_bytes = max_fetch_bytes;
+ settings.max_concurrent_object_sets = max_concurrent_object_sets;
journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id, settings));
journal::JournalMetadataPtr create_metadata(const std::string &oid,
const std::string &client_id = "client",
double commit_internal = 0.1,
- uint64_t max_fetch_bytes = 0);
+ uint64_t max_fetch_bytes = 0,
+ int max_concurrent_object_sets = 0);
int append(const std::string &oid, const bufferlist &bl);
int client_register(const std::string &oid, const std::string &id = "client",
journal::JournalMetadataPtr create_metadata(const std::string &oid,
const std::string &client_id,
- double commit_internal = 0.1) {
+ double commit_interval = 0.1,
+ uint64_t max_fetch_bytes = 0,
+ int max_concurrent_object_sets = 0) {
journal::JournalMetadataPtr metadata = RadosTestFixture::create_metadata(
- oid, client_id, commit_internal);
+ oid, client_id, commit_interval, max_fetch_bytes,
+ max_concurrent_object_sets);
m_metadata_list.push_back(metadata);
metadata->add_listener(&m_listener);
return metadata;
ASSERT_EQ(123U, metadata1->get_active_set());
}
+TEST_F(TestJournalMetadata, DisconnectLaggyClient) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid, "client1", ""));
+ ASSERT_EQ(0, client_register(oid, "client2", "laggy"));
+
+ int max_concurrent_object_sets = 100;
+ journal::JournalMetadataPtr metadata =
+ create_metadata(oid, "client1", 0.1, 0, max_concurrent_object_sets);
+ ASSERT_EQ(0, init_metadata(metadata));
+ ASSERT_TRUE(wait_for_update(metadata));
+
+ ASSERT_EQ(0U, metadata->get_active_set());
+
+ journal::JournalMetadata::RegisteredClients clients;
+
+#define ASSERT_CLIENT_STATES(s1, s2) \
+ ASSERT_EQ(2U, clients.size()); \
+ for (auto &c : clients) { \
+ if (c.id == "client1") { \
+ ASSERT_EQ(c.state, s1); \
+ } else if (c.id == "client2") { \
+ ASSERT_EQ(c.state, s2); \
+ } else { \
+ ASSERT_TRUE(false); \
+ } \
+ }
+
+ metadata->get_registered_clients(&clients);
+ ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
+ cls::journal::CLIENT_STATE_CONNECTED);
+
+ // client2 is connected when active set <= max_concurrent_object_sets
+ ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets));
+ ASSERT_TRUE(wait_for_update(metadata));
+ uint64_t commit_tid = metadata->allocate_commit_tid(0, 0, 0);
+ C_SaferCond cond1;
+ metadata->committed(commit_tid, [&cond1]() { return &cond1; });
+ ASSERT_EQ(0, cond1.wait());
+ metadata->flush_commit_position();
+ ASSERT_TRUE(wait_for_update(metadata));
+ ASSERT_EQ(100U, metadata->get_active_set());
+ clients.clear();
+ metadata->get_registered_clients(&clients);
+ ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
+ cls::journal::CLIENT_STATE_CONNECTED);
+
+ // client2 is disconnected when active set > max_concurrent_object_sets
+ ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets + 1));
+ ASSERT_TRUE(wait_for_update(metadata));
+ commit_tid = metadata->allocate_commit_tid(0, 0, 1);
+ C_SaferCond cond2;
+ metadata->committed(commit_tid, [&cond2]() { return &cond2; });
+ ASSERT_EQ(0, cond2.wait());
+ metadata->flush_commit_position();
+ ASSERT_TRUE(wait_for_update(metadata));
+ ASSERT_EQ(101U, metadata->get_active_set());
+ clients.clear();
+ metadata->get_registered_clients(&clients);
+ ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
+ cls::journal::CLIENT_STATE_DISCONNECTED);
+}
+
TEST_F(TestJournalMetadata, AssertActiveTag) {
std::string oid = get_temp_oid();