}
};
+struct C_FlushCommitPosition : public Context {
+ Context *commit_position_ctx;
+ Context *on_finish;
+
+ C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish)
+ : commit_position_ctx(commit_position_ctx), on_finish(on_finish) {
+ }
+ virtual void finish(int r) override {
+ if (commit_position_ctx != nullptr) {
+ commit_position_ctx->complete(r);
+ }
+ on_finish->complete(r);
+ }
+};
+
} // anonymous namespace
JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
}
void JournalMetadata::flush_commit_position() {
-
ldout(m_cct, 20) << __func__ << dendl;
- {
- Mutex::Locker timer_locker(m_timer_lock);
- Mutex::Locker locker(m_lock);
- if (m_commit_position_task_ctx == NULL) {
- return;
- }
+ Mutex::Locker timer_locker(m_timer_lock);
+ Mutex::Locker locker(m_lock);
+ if (m_commit_position_ctx == nullptr) {
+ return;
+ }
+
+ cancel_commit_task();
+ handle_commit_position_task();
+}
- m_timer->cancel_event(m_commit_position_task_ctx);
- m_commit_position_task_ctx = NULL;
+void JournalMetadata::flush_commit_position(Context *on_safe) {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ Mutex::Locker timer_locker(m_timer_lock);
+ Mutex::Locker locker(m_lock);
+ if (m_commit_position_ctx == nullptr) {
+ // nothing to flush
+ m_finisher->queue(on_safe, 0);
+ return;
}
+
+ m_commit_position_ctx = new C_FlushCommitPosition(
+ m_commit_position_ctx, on_safe);
+ cancel_commit_task();
handle_commit_position_task();
}
}
}
-void JournalMetadata::schedule_commit_task() {
-
+void JournalMetadata::cancel_commit_task() {
ldout(m_cct, 20) << __func__ << dendl;
assert(m_timer_lock.is_locked());
assert(m_lock.is_locked());
+ assert(m_commit_position_ctx != nullptr);
+ assert(m_commit_position_task_ctx != nullptr);
+
+ m_timer->cancel_event(m_commit_position_task_ctx);
+ m_commit_position_task_ctx = NULL;
+}
+void JournalMetadata::schedule_commit_task() {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ assert(m_timer_lock.is_locked());
+ assert(m_lock.is_locked());
+ assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == NULL) {
m_commit_position_task_ctx = new C_CommitPositionTask(this);
m_timer->add_event_after(m_commit_interval, m_commit_position_task_ctx);
}
void JournalMetadata::handle_commit_position_task() {
-
+ assert(m_timer_lock.is_locked());
+ assert(m_lock.is_locked());
ldout(m_cct, 20) << __func__ << dendl;
- Mutex::Locker locker(m_lock);
-
librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, m_commit_position);
}
void flush_commit_position();
+ void flush_commit_position(Context *on_safe);
void set_commit_position(const ObjectSetPosition &commit_position,
Context *on_safe);
void get_commit_position(ObjectSetPosition *commit_position) const {
journal_metadata->m_async_op_tracker.finish_op();
}
virtual void finish(int r) {
+ Mutex::Locker locker(journal_metadata->m_lock);
journal_metadata->handle_commit_position_task();
};
};
void refresh(Context *on_finish);
void handle_refresh_complete(C_Refresh *refresh, int r);
+ void cancel_commit_task();
void schedule_commit_task();
void handle_commit_position_task();
return 0;
}
+void Journaler::flush_commit_position(Context *on_safe) {
+ m_metadata->flush_commit_position(on_safe);
+}
+
int Journaler::register_client(const bufferlist &data) {
return m_metadata->register_client(data);
}
void Journaler::stop_append(Context *on_safe) {
assert(m_recorder != NULL);
- flush(new C_DeleteRecorder(m_recorder, on_safe));
+ flush_append(new C_DeleteRecorder(m_recorder, on_safe));
m_recorder = NULL;
}
return m_recorder->append(tag_tid, payload_bl);
}
-void Journaler::flush(Context *on_safe) {
+void Journaler::flush_append(Context *on_safe) {
m_recorder->flush(on_safe);
}
int register_client(const bufferlist &data);
int unregister_client();
+ void flush_commit_position(Context *on_safe);
+
void allocate_tag(const bufferlist &data, cls::journal::Tag *tag,
Context *on_finish);
void allocate_tag(uint64_t tag_class, const bufferlist &data,
void start_append(int flush_interval, uint64_t flush_bytes, double flush_age);
Future append(uint64_t tag_tid, const bufferlist &bl);
- void flush(Context *on_safe);
+ void flush_append(Context *on_safe);
void stop_append(Context *on_safe);
void committed(const ReplayEntry &replay_entry);