};
+class C_MDL_WriteHead : public MDSIOContextBase {
+public:
+ explicit C_MDL_WriteHead(MDLog* m)
+ : MDSIOContextBase(true)
+ , mdlog(m)
+ {}
+ void print(ostream& out) const override {
+ out << "mdlog_write_head";
+ }
+protected:
+ void finish(int r) override {
+ mdlog->finish_head_waiters();
+ }
+ MDSRank *get_mds() override {return mdlog->mds;}
+
+ MDLog *mdlog;
+};
+
+void MDLog::finish_head_waiters()
+{
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
+
+ auto&& last_committed = journaler->get_last_committed();
+ auto& expire_pos = last_committed.expire_pos;
+
+ dout(20) << __func__ << " expire_pos=" << std::hex << expire_pos << dendl;
+
+ {
+ auto last = waiting_for_expire.upper_bound(expire_pos);
+ for (auto it = waiting_for_expire.begin(); it != last; it++) {
+ finish_contexts(g_ceph_context, it->second);
+ }
+ waiting_for_expire.erase(waiting_for_expire.begin(), last);
+ }
+}
+
void MDLog::write_head(MDSContext *c)
{
- Context *fin = NULL;
- if (c != NULL) {
- fin = new C_IO_Wrapper(mds, c);
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
+
+ auto&& last_written = journaler->get_last_written();
+ auto expire_pos = journaler->get_expire_pos();
+ dout(10) << __func__ << " last_written=" << last_written << " current expire_pos=" << std::hex << expire_pos << dendl;
+
+ if (last_written.expire_pos < expire_pos) {
+ if (c != NULL) {
+ dout(25) << __func__ << " queueing waiter " << c << dendl;
+ waiting_for_expire[expire_pos].push_back(c);
+ }
+
+ auto* fin = new C_MDL_WriteHead(this);
+ journaler->write_head(fin);
+ } else {
+ if (c) {
+ c->complete(0);
+ }
}
- journaler->write_head(fin);
}
uint64_t MDLog::get_read_pos() const
void MDLog::create(MDSContext *c)
{
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
+
dout(5) << "create empty log" << dendl;
C_GatherBuilder gather(g_ceph_context);
ceph_assert(locker.owns_lock());
// trim expired segments?
- bool trimmed = false;
uint64_t end = 0;
for (auto it = segments.begin(); it != segments.end(); ++it) {
auto& [seq, ls] = *it;
} else {
logger->set(l_mdl_expos, jexpire_pos);
}
- trimmed = true;
}
if (!expired_segments.count(ls)) {
locker.unlock();
- if (trimmed) {
- write_head(ctx);
- } else {
- if (ctx) {
- ctx->complete(0);
- }
- }
+ write_head(ctx);
}
void MDLog::_expired(LogSegment *ls)