void JournalMetadata::flush_commit_position() {
ldout(m_cct, 20) << __func__ << dendl;
- Mutex::Locker timer_locker(*m_timer_lock);
- Mutex::Locker locker(m_lock);
- if (m_commit_position_ctx == nullptr) {
- return;
- }
-
- cancel_commit_task();
- handle_commit_position_task();
+ C_SaferCond ctx;
+ flush_commit_position(&ctx);
+ ctx.wait();
}
void JournalMetadata::flush_commit_position(Context *on_safe) {
Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
- if (m_commit_position_ctx == nullptr) {
+ if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
// nothing to flush
if (on_safe != nullptr) {
m_work_queue->queue(on_safe, 0);
}
if (on_safe != nullptr) {
- m_commit_position_ctx = new C_FlushCommitPosition(
- m_commit_position_ctx, on_safe);
+ m_flush_commit_position_ctxs.push_back(on_safe);
}
+ if (m_commit_position_ctx == nullptr) {
+ return;
+ }
+
cancel_commit_task();
handle_commit_position_task();
}
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
assert(m_commit_position_task_ctx != nullptr);
-
m_timer->cancel_event(m_commit_position_task_ctx);
m_commit_position_task_ctx = NULL;
}
assert(m_timer_lock->is_locked());
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
- if (m_commit_position_task_ctx == NULL) {
+ if (m_commit_position_task_ctx == nullptr) {
m_commit_position_task_ctx = new C_CommitPositionTask(this);
m_timer->add_event_after(m_settings.commit_interval,
m_commit_position_task_ctx);
<< "client_id=" << m_client_id << ", "
<< "commit_position=" << m_commit_position << dendl;
- librados::ObjectWriteOperation op;
- client::client_commit(&op, m_client_id, m_commit_position);
+ m_commit_position_task_ctx = nullptr;
+ Context* commit_position_ctx = nullptr;
+ std::swap(commit_position_ctx, m_commit_position_ctx);
- Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
- m_commit_position_ctx = NULL;
+ m_async_op_tracker.start_op();
+ ++m_flush_commits_in_progress;
- ctx = schedule_laggy_clients_disconnect(ctx);
+ Context* ctx = new FunctionContext([this, commit_position_ctx](int r) {
+ Contexts flush_commit_position_ctxs;
+ m_lock.Lock();
+ assert(m_flush_commits_in_progress > 0);
+ --m_flush_commits_in_progress;
+ if (m_flush_commits_in_progress == 0) {
+ std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
+ }
+ m_lock.Unlock();
- librados::AioCompletion *comp =
- librados::Rados::aio_create_completion(ctx, NULL,
- utils::rados_ctx_callback);
+ commit_position_ctx->complete(0);
+ for (auto ctx : flush_commit_position_ctxs) {
+ ctx->complete(0);
+ }
+ m_async_op_tracker.finish_op();
+ });
+ ctx = new C_NotifyUpdate(this, ctx);
+ ctx = new FunctionContext([this, ctx](int r) {
+ // manually kick of a refresh in case the notification is missed
+ // and ignore the next notification that we are about to send
+ m_lock.Lock();
+ ++m_ignore_watch_notifies;
+ m_lock.Unlock();
+
+ refresh(ctx);
+ });
+ ctx = new FunctionContext([this, ctx](int r) {
+ schedule_laggy_clients_disconnect(ctx);
+ });
+
+ librados::ObjectWriteOperation op;
+ client::client_commit(&op, m_client_id, m_commit_position);
+
+ auto comp = librados::Rados::aio_create_completion(ctx, nullptr,
+ utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
-
- m_commit_position_task_ctx = NULL;
}
void JournalMetadata::schedule_watch_reset() {
bufferlist bl;
m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_ignore_watch_notifies > 0) {
+ --m_ignore_watch_notifies;
+ return;
+ }
+ }
+
refresh(NULL);
}
ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
}
-Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
- assert(m_lock.is_locked());
-
+void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;
-
if (m_settings.max_concurrent_object_sets <= 0) {
- return on_finish;
+ on_finish->complete(0);
+ return;
}
Context *ctx = on_finish;
+ {
+ Mutex::Locker locker(m_lock);
+ for (auto &c : m_registered_clients) {
+ if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
+ c.id == m_client_id ||
+ m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
+ continue;
+ }
+ const std::string &client_id = c.id;
+ uint64_t object_set = 0;
+ if (!c.commit_position.object_positions.empty()) {
+ auto &position = *(c.commit_position.object_positions.begin());
+ object_set = position.object_number / m_splay_width;
+ }
- for (auto &c : m_registered_clients) {
- if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
- c.id == m_client_id ||
- m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
- continue;
- }
- const std::string &client_id = c.id;
- uint64_t object_set = 0;
- if (!c.commit_position.object_positions.empty()) {
- auto &position = *(c.commit_position.object_positions.begin());
- object_set = position.object_number / m_splay_width;
- }
-
- if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
- ldout(m_cct, 1) << __func__ << ": " << client_id
- << ": scheduling disconnect" << dendl;
+ if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
+ ldout(m_cct, 1) << __func__ << ": " << client_id
+ << ": scheduling disconnect" << dendl;
- ctx = new FunctionContext([this, client_id, ctx](int r1) {
- ldout(m_cct, 10) << __func__ << ": " << client_id
- << ": flagging disconnected" << dendl;
+ ctx = new FunctionContext([this, client_id, ctx](int r1) {
+ ldout(m_cct, 10) << __func__ << ": " << client_id
+ << ": flagging disconnected" << dendl;
- librados::ObjectWriteOperation op;
- client::client_update_state(&op, client_id,
- cls::journal::CLIENT_STATE_DISCONNECTED);
+ librados::ObjectWriteOperation op;
+ client::client_update_state(
+ &op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED);
- librados::AioCompletion *comp =
- librados::Rados::aio_create_completion(ctx, nullptr,
- utils::rados_ctx_callback);
- int r = m_ioctx.aio_operate(m_oid, comp, &op);
- assert(r == 0);
- comp->release();
- });
+ auto comp = librados::Rados::aio_create_completion(
+ ctx, nullptr, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ assert(r == 0);
+ comp->release();
+ });
+ }
}
}
if (ctx == on_finish) {
ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
}
-
- return ctx;
+ ctx->complete(0);
}
std::ostream &operator<<(std::ostream &os,