void FileJournal::queue_completions_thru(uint64_t seq)
{
+ utime_t now = ceph_clock_now(g_ceph_context);
while (!completions.empty() &&
- completions.front().first <= seq) {
- dout(10) << "queue_completions_thru seq " << seq << " queueing seq " << completions.front().first
- << " " << completions.front().second << dendl;
- finisher->queue(completions.front().second);
+ completions.front().seq <= seq) {
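+ // latency is measured from completion_item::start (stamped in submit_entry)
+ // to the time queue_completions_thru was entered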
+ utime_t lat = now;
+ lat -= completions.front().start;
+ dout(10) << "queue_completions_thru seq " << seq
+ << " queueing seq " << completions.front().seq
+ << " " << completions.front().finish
+ << " lat " << lat << dendl;
+ if (logger) {
+ logger->finc(l_os_j_lat, lat);
+ }
+ if (completions.front().finish)
+ finisher->queue(completions.front().finish);
completions.pop_front();
}
}
<< " len " << e.length()
<< " (" << oncommit << ")" << dendl;
- if (oncommit)
- completions.push_back(pair<uint64_t,Context*>(seq, oncommit));
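+ // record a completion_item for every entry, even when oncommit is NULL, so
+ // journal latency is tracked per entry; NULL finish contexts are skipped
+ // later in queue_completions_thru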
+ completions.push_back(completion_item(seq, oncommit, ceph_clock_now(g_ceph_context)));
if (full_state == FULL_NOTFULL) {
// queue and kick writer thread
// in journal
deque<pair<uint64_t, off64_t> > journalq; // track seq offsets, so we can trim later.
- deque<pair<uint64_t,Context*> > completions; // queued, writing, waiting for commit.
+
+ struct completion_item {
+ uint64_t seq;
+ Context *finish;
+ utime_t start; // time the entry was submitted, used for journal latency
+ completion_item(uint64_t o, Context *c, utime_t s)
+ : seq(o), finish(c), start(s) {}
+ };
+ deque<completion_item> completions; // queued, writing, waiting for commit.
uint64_t writing_seq, journaled_seq;
bool plug_journal_completions;
// initialize logger
PerfCountersBuilder plb(g_ceph_context, "filestore", l_os_first, l_os_last);
- plb.add_u64_counter(l_os_in_ops, "in_ops");
- plb.add_u64_counter(l_os_in_bytes, "in_bytes");
- plb.add_u64_counter(l_os_readable_ops, "readable_ops");
- plb.add_u64_counter(l_os_readable_bytes, "readable_bytes");
- plb.add_u64_counter(l_os_commit_ops, "commit_ops");
- plb.add_u64_counter(l_os_commit_bytes, "commit_bytes");
-
plb.add_u64(l_os_jq_max_ops, "journal_queue_max_ops");
plb.add_u64(l_os_jq_ops, "journal_queue_ops");
plb.add_u64_counter(l_os_j_ops, "journal_ops");
plb.add_u64(l_os_jq_max_bytes, "journal_queue_max_bytes");
plb.add_u64(l_os_jq_bytes, "journal_queue_bytes");
plb.add_u64_counter(l_os_j_bytes, "journal_bytes");
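+ // average time from journal submit to queueing of the completion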
+ plb.add_fl_avg(l_os_j_lat, "journal_latency");
plb.add_u64(l_os_oq_max_ops, "op_queue_max_ops");
plb.add_u64(l_os_oq_ops, "op_queue_ops");
plb.add_u64_counter(l_os_ops, "ops");
plb.add_u64(l_os_oq_max_bytes, "op_queue_max_bytes");
plb.add_u64(l_os_oq_bytes, "op_queue_bytes");
plb.add_u64_counter(l_os_bytes, "bytes");
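+ // average time from Op creation (o->start) to _finish_op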
+ plb.add_fl_avg(l_os_apply_lat, "apply_latency");
plb.add_u64(l_os_committing, "committing");
logger = plb.create_perf_counters();
}
Op *o = new Op;
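+ // stamp the op's creation time so _finish_op can report apply latency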
+ o->start = ceph_clock_now(g_ceph_context);
o->tls.swap(tls);
o->onreadable = onreadable;
o->onreadable_sync = onreadable_sync;
// called with tp lock held
_op_queue_release_throttle(o);
- logger->inc(l_os_readable_ops);
- logger->inc(l_os_readable_bytes, o->bytes);
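+ // apply latency: elapsed time since the op was built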
+ utime_t lat = ceph_clock_now(g_ceph_context);
+ lat -= o->start;
+ logger->finc(l_os_apply_lat, lat);
if (next_finish == o->op) {
dout(10) << "_finish_op " << o->op << " next_finish " << next_finish
dout(5) << "queue_transactions new osr " << osr << "/" << osr->parent << dendl;
}
- logger->inc(l_os_in_ops);
- //logger->inc(l_os_in_bytes, 1);
-
if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
Op *o = build_op(tls, onreadable, onreadable_sync);
op_queue_reserve_throttle(o);
op_submit_finish(op);
op_apply_finish(op);
- logger->inc(l_os_readable_ops);
- //fixme logger->inc(l_os_readable_bytes, 1);
-
return r;
}