max_ev = events_per_segment + 1;
}
- submit_mutex.lock();
+ std::unique_lock locker{submit_mutex};
// trim!
dout(10) << "trim "
<< dendl;
if (segments.empty()) {
- submit_mutex.unlock();
return;
}
new_expiring_segments++;
expiring_segments.insert(ls);
expiring_events += ls->num_events;
- submit_mutex.unlock();
+ locker.unlock();
uint64_t last_seq = ls->seq;
try_expire(ls, op_prio);
log_trim_counter.hit();
trim_end = ceph::coarse_mono_clock::now();
- submit_mutex.lock();
+ locker.lock();
p = segments.lower_bound(last_seq + 1);
}
}
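+ // the loop above drops and reacquires submit_mutex around try_expire(), so verify it is held again before trimming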
+ ceph_assert(locker.owns_lock());
+
try_to_commit_open_file_table(get_last_segment_seq());
- // discard expired segments and unlock submit_mutex
- _trim_expired_segments();
+ _trim_expired_segments(locker);
}
class C_MaybeExpiredSegment : public MDSInternalContext {
*/
int MDLog::trim_to(SegmentBoundary::seq_t seq)
{
- submit_mutex.lock();
+ std::unique_lock locker(submit_mutex);
dout(10) << __func__ << ": "
<< seq
// Caller should have flushed journaler before calling this
if (pending_events.count(ls->seq)) {
dout(5) << __func__ << ": " << *ls << " has pending events" << dendl;
- submit_mutex.unlock();
+ locker.unlock();
return -CEPHFS_EAGAIN;
}
ceph_assert(expiring_segments.count(ls) == 0);
expiring_segments.insert(ls);
expiring_events += ls->num_events;
- submit_mutex.unlock();
+ locker.unlock();
uint64_t next_seq = ls->seq + 1;
try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
- submit_mutex.lock();
+ locker.lock();
p = segments.lower_bound(next_seq);
}
}
- _trim_expired_segments();
+ _trim_expired_segments(locker);
return 0;
}
try_expire(ls, op_prio);
}
-void MDLog::_trim_expired_segments()
+void MDLog::_trim_expired_segments(auto& locker, MDSContext* ctx)
{
ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
+ ceph_assert(locker.owns_lock());
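+ // the caller's unique_lock is passed in so submit_mutex can be released below, before the journal head write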
uint64_t const oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq();
dout(10) << __func__ << ": maybe expiring " << *ls << dendl;
}
- submit_mutex.unlock();
+ locker.unlock();
- if (trimmed)
- journaler->write_head(0);
-}
-
-void MDLog::trim_expired_segments()
-{
- submit_mutex.lock();
- _trim_expired_segments();
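+ // complete ctx once the updated journal head is written; if nothing was trimmed there is no head update, so complete it immediately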
+ if (trimmed) {
+ write_head(ctx);
+ } else if (ctx) {
+ ctx->complete(0);
+ }
}
void MDLog::_expired(LogSegment *ls)
return unflushed == 0;
}
- void trim_expired_segments();
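+ // trim expired segments; if ctx is non-null it is completed once the new journal head has been written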
+ void trim_expired_segments(MDSContext* ctx=nullptr) {
+ std::unique_lock locker(submit_mutex);
+ _trim_expired_segments(locker, ctx);
+ }
int trim_all() {
return trim_to(0);
}
void try_expire(LogSegment *ls, int op_prio);
void _maybe_expired(LogSegment *ls, int op_prio);
void _expired(LogSegment *ls);
- void _trim_expired_segments();
+ void _trim_expired_segments(auto& locker, MDSContext* ctx=nullptr);
void write_head(MDSContext *onfinish);
void trim();
<< mdlog->get_journaler()->get_trimmed_pos() << dendl;
// Now everyone I'm interested in is expired
- mdlog->trim_expired_segments();
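+ // once the journal head write completes, handle_write_head() runs as an MDSContext (with mds_lock held)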
+ auto* ctx = new MDSInternalContextWrapper(mds, new LambdaContext([this](int r) {
+ handle_write_head(r);
+ }));
+ mdlog->trim_expired_segments(ctx);
- dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now "
+ dout(5) << __func__ << ": trim complete; waiting for the journal head to be written. Journal expire_pos/trim_pos is now "
<< std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
<< mdlog->get_journaler()->get_trimmed_pos() << dendl;
-
- write_journal_head();
- }
-
- void write_journal_head() {
- dout(20) << __func__ << dendl;
-
- Context *ctx = new LambdaContext([this](int r) {
- std::lock_guard locker(mds->mds_lock);
- handle_write_head(r);
- });
- // Flush the journal header so that readers will start from after
- // the flushed region
- mdlog->get_journaler()->write_head(ctx);
}
void handle_write_head(int r) {
}
void finish(int r) override {
- ceph_assert(!ceph_mutex_is_locked_by_me(mds->mds_lock));
+ /* We don't actually need mds_lock here, but MDLog::write_head completes us
+ * through an MDSContext, which is expected to run with mds_lock held.
+ */
+ ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
dout(20) << __func__ << ": r=" << r << dendl;
on_finish->complete(r);
}