m_cache_state->empty = true;
}
- // TODO: Will init sync point, this will be covered in later PR.
- // init_flush_new_sync_point(later);
- ++m_current_sync_gen;
- m_current_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen,
- this->m_image_ctx.cct);
+ /* Start the sync point following the last one seen in the
+ * log. Flush the last sync point created during the loading of the
+ * existing log entries. */
+ init_flush_new_sync_point(later);
+ ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl;
m_initialized = true;
// Start the thread
schedule_append(to_append);
}
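+
+/* Schedule a single log operation for append by wrapping it in a one-element list */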
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op)
+{
+ GenericLogOperations to_append { op };
+
+ schedule_append(to_append);
+}
+
const unsigned long int ops_flushed_together = 4;
/*
* Performs the pmem buffer flush on all scheduled ops, then schedules
return alloc_succeeds;
}
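+
+/* Construct a C_FlushRequest covering the entire image extent, with an empty bufferlist */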
+template <typename I>
+C_FlushRequest<ReplicatedWriteLog<I>>* ReplicatedWriteLog<I>::make_flush_req(Context *on_finish) {
+ utime_t flush_begins = ceph_clock_now();
+ bufferlist bl;
+ auto *flush_req =
+ new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}),
+ std::move(bl), 0, m_lock, m_perfcounter, on_finish);
+
+ return flush_req;
+}
+
+/* Update/persist the last flushed sync point in the log */
+template <typename I>
+void ReplicatedWriteLog<I>::persist_last_flushed_sync_gen()
+{
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ uint64_t flushed_sync_gen;
+
+ std::lock_guard append_locker(m_log_append_lock);
+ {
+ std::lock_guard locker(m_lock);
+ flushed_sync_gen = m_flushed_sync_gen;
+ }
+
+ if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
+ ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from "
+ << D_RO(pool_root)->flushed_sync_gen << " to "
+ << flushed_sync_gen << dendl;
+ TX_BEGIN(m_log_pool) {
+ D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
+ } TX_ONCOMMIT {
+ } TX_ONABORT {
+ lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl;
+ ceph_assert(false);
+ } TX_FINALLY {
+ } TX_END;
+ }
+}
+
+/* Returns true if the specified SyncPointLogEntry is considered flushed, in
+ * which case the log will be updated to reflect this. */
+template <typename I>
+bool ReplicatedWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLogEntry> log_entry)
+{
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ ceph_assert(log_entry);
+
+ if ((log_entry->writes_flushed == log_entry->writes) &&
+ log_entry->completed && log_entry->prior_sync_point_flushed &&
+ log_entry->next_sync_point_entry) {
+ ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point="
+ << *log_entry << dendl;
+ log_entry->next_sync_point_entry->prior_sync_point_flushed = true;
+ /* Don't move the flushed sync gen num backwards. */
+ if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) {
+ m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number;
+ }
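+    /* The next sync point may now be flushable as well. Do that check, and
+     * the pmem update of the flushed sync gen if the next one isn't ready,
+     * from the work queue rather than here under m_lock. */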
+ m_async_op_tracker.start_op();
+ m_work_queue.queue(new LambdaContext(
+ [this, log_entry](int r) {
+ bool handled_by_next;
+ {
+ std::lock_guard locker(m_lock);
+ handled_by_next = handle_flushed_sync_point(log_entry->next_sync_point_entry);
+ }
+ if (!handled_by_next) {
+ persist_last_flushed_sync_gen();
+ }
+ m_async_op_tracker.finish_op();
+ }));
+ return true;
+ }
+ return false;
+}
+
+/* Make a new sync point and flush the previous one during initialization,
+ * when there may or may not be a previous sync point. */
+template <typename I>
+void ReplicatedWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ ceph_assert(!m_initialized); /* Don't use this after init */
+
+ if (!m_current_sync_point) {
+ /* First sync point since start */
+ new_sync_point(later);
+ } else {
+ flush_new_sync_point(nullptr, later);
+ }
+}
+
+/**
+ * Begin a new sync point
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::new_sync_point(DeferredContexts &later) {
+ CephContext *cct = m_image_ctx.cct;
+ std::shared_ptr<SyncPoint> old_sync_point = m_current_sync_point;
+ std::shared_ptr<SyncPoint> new_sync_point;
+ ldout(cct, 20) << dendl;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ /* The first time this is called, if this is a newly created log,
+ * this makes the first sync gen number we'll use 1. On the first
+ * call for a re-opened log m_current_sync_gen will be the highest
+ * gen number from all the sync point entries found in the re-opened
+ * log, and this advances to the next sync gen number. */
+ ++m_current_sync_gen;
+
+ new_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen, cct);
+ m_current_sync_point = new_sync_point;
+
+ /* If this log has been re-opened, old_sync_point will initially be
+ * nullptr, but m_current_sync_gen may not be zero. */
+ if (old_sync_point) {
+ new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num);
+ /* This sync point will acquire no more sub-ops. Activation needs
+     * to acquire m_lock, so defer to later */
+ later.add(new LambdaContext(
+ [this, old_sync_point](int r) {
+ old_sync_point->prior_persisted_gather_activate();
+ }));
+ }
+
+ new_sync_point->prior_persisted_gather_set_finisher();
+
+ if (old_sync_point) {
+ ldout(cct,6) << "new sync point = [" << *m_current_sync_point
+ << "], prior = [" << *old_sync_point << "]" << dendl;
+ } else {
+ ldout(cct,6) << "first sync point = [" << *m_current_sync_point
+ << "]" << dendl;
+ }
+}
+
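+/* Begin a new sync point and arrange for the previous one to be appended and
+ * persisted by flush_req once all log entries prior to it have persisted. */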
+template <typename I>
+void ReplicatedWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req, DeferredContexts &later) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
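+  /* No application flush request is driving this; create an internal flush
+   * request whose completion only drops the async tracking taken here. */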
+ if (!flush_req) {
+ m_async_null_flush_finish++;
+ m_async_op_tracker.start_op();
+ Context *flush_ctx = new LambdaContext([this](int r) {
+ m_async_null_flush_finish--;
+ m_async_op_tracker.finish_op();
+ });
+ flush_req = make_flush_req(flush_ctx);
+ flush_req->internal = true;
+ }
+
+ /* Add a new sync point. */
+ new_sync_point(later);
+ std::shared_ptr<SyncPoint> to_append = m_current_sync_point->earlier_sync_point;
+ ceph_assert(to_append);
+
+ /* This flush request will append/persist the (now) previous sync point */
+ flush_req->to_append = to_append;
+
+ /* When the m_sync_point_persist Gather completes this sync point can be
+ * appended. The only sub for this Gather is the finisher Context for
+ * m_prior_log_entries_persisted, which records the result of the Gather in
+ * the sync point, and completes. TODO: Do we still need both of these
+   * Gathers? */
+ Context * ctx = new LambdaContext([this, flush_req](int r) {
+ ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req
+ << " sync point =" << flush_req->to_append
+ << ". Ready to persist." << dendl;
+ alloc_and_dispatch_io_req(flush_req);
+ });
+ to_append->persist_gather_set_finisher(ctx);
+
+ /* The m_sync_point_persist Gather has all the subs it will ever have, and
+ * now has its finisher. If the sub is already complete, activation will
+ * complete the Gather. The finisher will acquire m_lock, so we'll activate
+   * this when we release m_lock. */
+ later.add(new LambdaContext([this, to_append](int r) {
+ to_append->persist_gather_activate();
+ }));
+
+ /* The flush request completes when the sync point persists */
+ to_append->add_in_on_persisted_ctxs(flush_req);
+}
+
} // namespace cache
} // namespace librbd
using This = ReplicatedWriteLog<ImageCtxT>;
using C_WriteRequestT = rwl::C_WriteRequest<This>;
using C_BlockIORequestT = rwl::C_BlockIORequest<This>;
+ using C_FlushRequestT = rwl::C_FlushRequest<This>;
CephContext * get_context();
void release_guarded_request(BlockGuardCell *cell);
void release_write_lanes(C_BlockIORequestT *req);
template <typename V>
void flush_pmem_buffer(V& ops);
void schedule_append(rwl::GenericLogOperationsVector &ops);
+ void schedule_append(rwl::GenericLogOperationSharedPtr op);
void schedule_flush_and_append(rwl::GenericLogOperationsVector &ops);
+ void flush_new_sync_point(C_FlushRequestT *flush_req, rwl::DeferredContexts &later);
std::shared_ptr<rwl::SyncPoint> get_current_sync_point() {
return m_current_sync_point;
}
std::atomic<int> m_async_flush_ops = {0};
std::atomic<int> m_async_append_ops = {0};
std::atomic<int> m_async_complete_ops = {0};
+ std::atomic<int> m_async_null_flush_finish = {0};
/* Acquire locks in order declared here */
void update_image_cache_state(Context *on_finish);
void wake_up();
+ void persist_last_flushed_sync_gen();
+ bool handle_flushed_sync_point(std::shared_ptr<rwl::SyncPointLogEntry> log_entry);
+
+ void init_flush_new_sync_point(rwl::DeferredContexts &later);
+ void new_sync_point(rwl::DeferredContexts &later);
+ rwl::C_FlushRequest<ReplicatedWriteLog<ImageCtxT>>* make_flush_req(Context *on_finish);
+
void dispatch_deferred_writes(void);
void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
void append_scheduled_ops(void);
dispatch_time(dispatched),
perfcounter(perfcounter),
sync_point(sync_point) {
- on_ops_appending = sync_point->prior_log_entries_persisted->new_sub();
+ on_ops_appending = sync_point->prior_persisted_gather_new_sub();
on_ops_persist = nullptr;
extent_ops_persist =
new C_Gather(m_cct,
}
template <typename T>
-void C_WriteRequest<T>::setup_log_operations() {
+void C_WriteRequest<T>::setup_log_operations(DeferredContexts &on_exit) {
{
std::lock_guard locker(m_lock);
- // TODO: Add sync point if necessary
std::shared_ptr<SyncPoint> current_sync_point = rwl.get_current_sync_point();
+ if ((!rwl.get_persist_on_flush() && current_sync_point->log_entry->writes_completed) ||
+ (current_sync_point->log_entry->writes > MAX_WRITES_PER_SYNC_POINT) ||
+ (current_sync_point->log_entry->bytes > MAX_BYTES_PER_SYNC_POINT)) {
+ /* Create new sync point and persist the previous one. This sequenced
+ * write will bear a sync gen number shared with no already completed
+ * writes. A group of sequenced writes may be safely flushed concurrently
+ * if they all arrived before any of them completed. We'll insert one on
+ * an aio_flush() from the application. Here we're inserting one to cap
+ * the number of bytes and writes per sync point. When the application is
+ * not issuing flushes, we insert sync points to record some observed
+ * write concurrency information that enables us to safely issue >1 flush
+ * write (for writes observed here to have been in flight simultaneously)
+ * at a time in persist-on-write mode.
+ */
+ rwl.flush_new_sync_point(nullptr, on_exit);
+ current_sync_point = rwl.get_current_sync_point();
+ }
uint64_t current_sync_gen = rwl.get_current_sync_gen();
op_set =
make_unique<WriteLogOperationSet>(this->m_dispatched_time,
void C_WriteRequest<T>::dispatch()
{
CephContext *cct = rwl.get_context();
+ DeferredContexts on_exit;
utime_t now = ceph_clock_now();
this->m_dispatched_time = now;
ldout(cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
- setup_log_operations();
+ setup_log_operations(on_exit);
bool append_deferred = false;
if (!op_set->persist_on_flush &&
void dispatch() override;
- virtual void setup_log_operations();
+ virtual void setup_log_operations(DeferredContexts &on_exit);
bool append_write_request(std::shared_ptr<SyncPoint> sync_point);
class C_FlushRequest : public C_BlockIORequest<T> {
public:
using C_BlockIORequest<T>::rwl;
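+  /* True for flush requests created internally, not by the application */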
+ bool internal = false;
std::shared_ptr<SyncPoint> to_append;
C_FlushRequest(T &rwl, const utime_t arrived,
SyncPoint::SyncPoint(uint64_t sync_gen_num, CephContext *cct)
: log_entry(std::make_shared<SyncPointLogEntry>(sync_gen_num)), m_cct(cct) {
- prior_log_entries_persisted = new C_Gather(cct, nullptr);
- sync_point_persist = new C_Gather(cct, nullptr);
+ m_prior_log_entries_persisted = new C_Gather(cct, nullptr);
+ m_sync_point_persist = new C_Gather(cct, nullptr);
on_sync_point_appending.reserve(MAX_WRITES_PER_SYNC_POINT + 2);
on_sync_point_persisted.reserve(MAX_WRITES_PER_SYNC_POINT + 2);
ldout(m_cct, 20) << "sync point " << sync_gen_num << dendl;
os << "log_entry=[" << *p.log_entry << "], "
<< "earlier_sync_point=" << p.earlier_sync_point << ", "
<< "later_sync_point=" << p.later_sync_point << ", "
- << "final_op_sequence_num=" << p.final_op_sequence_num << ", "
- << "prior_log_entries_persisted=" << p.prior_log_entries_persisted << ", "
- << "prior_log_entries_persisted_complete=" << p.prior_log_entries_persisted_complete << ", "
- << "append_scheduled=" << p.append_scheduled << ", "
+ << "m_final_op_sequence_num=" << p.m_final_op_sequence_num << ", "
+ << "m_prior_log_entries_persisted=" << p.m_prior_log_entries_persisted << ", "
+ << "m_prior_log_entries_persisted_complete=" << p.m_prior_log_entries_persisted_complete << ", "
+ << "m_append_scheduled=" << p.m_append_scheduled << ", "
<< "appending=" << p.appending << ", "
<< "on_sync_point_appending=" << p.on_sync_point_appending.size() << ", "
<< "on_sync_point_persisted=" << p.on_sync_point_persisted.size() << "";
return os;
}
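+
+/* Set the finisher for m_sync_point_persist; it runs when the prior log
+ * entries have persisted and will append this sync point. Every earlier sync
+ * point must already have its own append scheduled. */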
+void SyncPoint::persist_gather_set_finisher(Context *ctx) {
+ m_append_scheduled = true;
+ /* All prior sync points that are still in this list must already be scheduled for append */
+ std::shared_ptr<SyncPoint> previous = earlier_sync_point;
+ while (previous) {
+ ceph_assert(previous->m_append_scheduled);
+ previous = previous->earlier_sync_point;
+ }
+
+ m_sync_point_persist->set_finisher(ctx);
+}
+
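+/* Thin accessors for the Gathers, which are now private to SyncPoint */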
+void SyncPoint::persist_gather_activate() {
+ m_sync_point_persist->activate();
+}
+
+Context* SyncPoint::persist_gather_new_sub() {
+ return m_sync_point_persist->new_sub();
+}
+
+void SyncPoint::prior_persisted_gather_activate() {
+ m_prior_log_entries_persisted->activate();
+}
+
+Context* SyncPoint::prior_persisted_gather_new_sub() {
+ return m_prior_log_entries_persisted->new_sub();
+}
+
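+/* When all prior log entries have persisted, record the result in this sync
+ * point and complete the m_sync_point_persist sub taken here. */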
+void SyncPoint::prior_persisted_gather_set_finisher() {
+ Context *sync_point_persist_ready = persist_gather_new_sub();
+ std::shared_ptr<SyncPoint> sp = shared_from_this();
+ m_prior_log_entries_persisted->
+ set_finisher(new LambdaContext([this, sp, sync_point_persist_ready](int r) {
+ ldout(m_cct, 20) << "Prior log entries persisted for sync point =["
+ << sp << "]" << dendl;
+ sp->m_prior_log_entries_persisted_result = r;
+ sp->m_prior_log_entries_persisted_complete = true;
+ sync_point_persist_ready->complete(r);
+ }));
+}
+
+void SyncPoint::add_in_on_persisted_ctxs(Context* ctx) {
+ on_sync_point_persisted.push_back(ctx);
+}
+
+void SyncPoint::add_in_on_appending_ctxs(Context* ctx) {
+ on_sync_point_appending.push_back(ctx);
+}
+
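+/* Link this sync point after an earlier one, and defer this one's append
+ * until the earlier sync point is itself appending. */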
+void SyncPoint::setup_earlier_sync_point(std::shared_ptr<SyncPoint> sync_point,
+ uint64_t last_op_sequence_num) {
+ earlier_sync_point = sync_point;
+ log_entry->prior_sync_point_flushed = false;
+ earlier_sync_point->log_entry->next_sync_point_entry = log_entry;
+ earlier_sync_point->later_sync_point = shared_from_this();
+ earlier_sync_point->m_final_op_sequence_num = last_op_sequence_num;
+ if (!earlier_sync_point->appending) {
+ /* Append of new sync point deferred until old sync point is appending */
+ earlier_sync_point->add_in_on_appending_ctxs(prior_persisted_gather_new_sub());
+ }
+}
+
} // namespace rwl
} // namespace cache
} // namespace librbd
namespace cache {
namespace rwl {
-class SyncPoint {
+class SyncPoint: public std::enable_shared_from_this<SyncPoint> {
public:
std::shared_ptr<SyncPointLogEntry> log_entry;
/* Use lock for earlier/later links */
std::shared_ptr<SyncPoint> earlier_sync_point; /* NULL if earlier has completed */
std::shared_ptr<SyncPoint> later_sync_point;
- uint64_t final_op_sequence_num = 0;
- /* A sync point can't appear in the log until all the writes bearing
- * it and all the prior sync points have been appended and
- * persisted.
- *
- * Writes bearing this sync gen number and the prior sync point will be
- * sub-ops of this Gather. This sync point will not be appended until all
- * these complete to the point where their persist order is guaranteed. */
- C_Gather *prior_log_entries_persisted;
- int prior_log_entries_persisted_result = 0;
- int prior_log_entries_persisted_complete = false;
- /* The finisher for this will append the sync point to the log. The finisher
- * for m_prior_log_entries_persisted will be a sub-op of this. */
- C_Gather *sync_point_persist;
- bool append_scheduled = false;
bool appending = false;
/* Signal these when this sync point is appending to the log, and its order
   * of appearance is guaranteed. One of these is a sub-operation of the
~SyncPoint();
SyncPoint(const SyncPoint&) = delete;
SyncPoint &operator=(const SyncPoint&) = delete;
-
+ void persist_gather_activate();
+ Context* persist_gather_new_sub();
+ void persist_gather_set_finisher(Context *ctx);
+ void prior_persisted_gather_activate();
+ Context* prior_persisted_gather_new_sub();
+ void prior_persisted_gather_set_finisher();
+ void add_in_on_persisted_ctxs(Context* cxt);
+ void add_in_on_appending_ctxs(Context* cxt);
+ void setup_earlier_sync_point(std::shared_ptr<SyncPoint> sync_point,
+ uint64_t last_op_sequence_num);
private:
CephContext *m_cct;
+ bool m_append_scheduled = false;
+ uint64_t m_final_op_sequence_num = 0;
+ /* A sync point can't appear in the log until all the writes bearing
+ * it and all the prior sync points have been appended and
+ * persisted.
+ *
+ * Writes bearing this sync gen number and the prior sync point will be
+ * sub-ops of this Gather. This sync point will not be appended until all
+ * these complete to the point where their persist order is guaranteed. */
+ C_Gather *m_prior_log_entries_persisted;
+ /* The finisher for this will append the sync point to the log. The finisher
+ * for m_prior_log_entries_persisted will be a sub-op of this. */
+ C_Gather *m_sync_point_persist;
+ int m_prior_log_entries_persisted_result = 0;
+  bool m_prior_log_entries_persisted_complete = false;
friend std::ostream &operator<<(std::ostream &os,
const SyncPoint &p);
};
return os;
};
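+
+/* An extent covering the entire possible image address space */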
+io::Extent whole_volume_extent() {
+ return io::Extent({0, std::numeric_limits<uint64_t>::max()});
+}
+
} // namespace rwl
} // namespace cache
} // namespace librbd
/* Limit work between sync points */
const uint64_t MAX_WRITES_PER_SYNC_POINT = 256;
+const uint64_t MAX_BYTES_PER_SYNC_POINT = (1024 * 1024 * 8);
const uint32_t MIN_WRITE_ALLOC_SIZE = 512;
const uint32_t LOG_STATS_INTERVAL_SECONDS = 5;
}
};
+io::Extent whole_volume_extent();
+
} // namespace rwl
} // namespace cache
} // namespace librbd