}
if (next.finish)
finisher->queue(next.finish);
- if (next.tracked_op)
+ if (next.tracked_op) {
next.tracked_op->mark_event("journaled_completion_queued");
+ next.tracked_op->journal_trace.event("queued completion");
+ next.tracked_op->journal_trace.keyval("completed through", seq);
+ }
items.erase(it++);
}
batch_unpop_completions(items);
footerptr.copy_in(post_offset + magic2_offset, sizeof(uint64_t), (char *)&magic2);
bl.claim_append(ebl);
- if (next_write.tracked_op)
+ if (next_write.tracked_op) {
next_write.tracked_op->mark_event("write_thread_in_journal_buffer");
+ next_write.tracked_op->journal_trace.event("prepare_single_write");
+ }
journalq.push_back(pair<uint64_t,off64_t>(seq, queue_pos));
writing_seq = seq;
logger->inc(l_filestore_journal_bytes, e.length());
}
+ if (osd_op) {
+ osd_op->mark_event("commit_queued_for_journal_write");
+ osd_op->journal_trace.init("journal", &trace_endpoint, &osd_op->store_trace);
+ osd_op->journal_trace.event("submit_entry");
+ osd_op->journal_trace.keyval("seq", seq);
+ }
{
Mutex::Locker l1(writeq_lock);
#ifdef HAVE_LIBAIO
if (writeq.empty())
writeq_cond.Signal();
writeq.push_back(write_item(seq, e, orig_len, osd_op));
+ if (osd_op)
+ osd_op->journal_trace.keyval("queue depth", writeq.size());
}
}
#include "common/Thread.h"
#include "common/Throttle.h"
#include "JournalThrottle.h"
-
+#include "common/zipkin_trace.h"
#ifdef HAVE_LIBAIO
# include <libaio.h>
Context *finish;
utime_t start;
TrackedOpRef tracked_op;
- completion_item(uint64_t o, Context *c, utime_t s,
- TrackedOpRef opref)
+ completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref)
: seq(o), finish(c), start(s), tracked_op(opref) {}
completion_item() : seq(0), finish(0), start(0) {}
};
bufferlist bl;
uint32_t orig_len;
TrackedOpRef tracked_op;
+ ZTracer::Trace trace;
write_item(uint64_t s, bufferlist& b, int ol, TrackedOpRef opref) :
seq(s), orig_len(ol), tracked_op(opref) {
bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
return ROUND_UP_TO(sizeof(header), block_size);
}
+ ZTracer::Endpoint trace_endpoint;
+
public:
FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, Cond *sync_cond,
const char *f, bool dio=false, bool ai=true, bool faio=false) :
write_stop(true),
aio_stop(true),
write_thread(this),
- write_finish_thread(this) {
+ write_finish_thread(this),
+ trace_endpoint("0.0.0.0", 0, "FileJournal") {
if (aio && !directio) {
lderr(cct) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl;
op_wq(this, cct->_conf->filestore_op_thread_timeout,
cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
logger(NULL),
+ trace_endpoint("0.0.0.0", 0, "FileStore"),
read_error_lock("FileStore::read_error_lock"),
m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
// sequencer, the op order will be preserved.
osr->queue(o);
+ o->trace.event("queued");
logger->inc(l_filestore_ops);
logger->inc(l_filestore_bytes, o->bytes);
osr->apply_lock.Lock();
Op *o = osr->peek_queue();
+ o->trace.event("op_apply_start");
apply_manager.op_apply_start(o->op);
dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
+ o->trace.event("_do_transactions start");
int r = _do_transactions(o->tls, o->op, &handle);
+ o->trace.event("op_apply_finish");
apply_manager.op_apply_finish(o->op);
dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r
<< ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " lat " << lat << dendl;
osr->apply_lock.Unlock(); // locked in _do_op
+ o->trace.event("_finish_op");
// called with tp lock held
op_queue_release_throttle(o);
(*i).set_osr(osr);
}
+ ZTracer::Trace trace;
+ if (osd_op && osd_op->pg_trace) {
+ osd_op->store_trace.init("filestore op", &trace_endpoint, &osd_op->pg_trace);
+ trace = osd_op->store_trace;
+ }
+
if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
uint64_t op_num = submit_manager.op_submit_start();
o->op = op_num;
+ trace.keyval("opnum", op_num);
if (m_filestore_do_dump)
dump_transactions(o->tls, o->op, osr);
if (m_filestore_journal_parallel) {
dout(5) << "queue_transactions (parallel) " << o->op << " " << o->tls << dendl;
+ trace.keyval("journal mode", "parallel");
+ trace.event("journal started");
_op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
// queue inside submit_manager op submission lock
queue_op(osr, o);
+ trace.event("op queued");
} else if (m_filestore_journal_writeahead) {
dout(5) << "queue_transactions (writeahead) " << o->op << " " << o->tls << dendl;
osr->queue_journal(o->op);
+ trace.keyval("journal mode", "writeahead");
+ trace.event("journal started");
_op_journal_transactions(tbl, orig_len, o->op,
new C_JournaledAhead(this, osr, o, ondisk),
osd_op);
dump_transactions(o->tls, o->op, osr);
queue_op(osr, o);
+ trace.keyval("opnum", op_num);
+ trace.keyval("journal mode", "none");
+ trace.event("op queued");
if (ondisk)
apply_manager.add_waiter(op_num, ondisk);
if (m_filestore_do_dump)
dump_transactions(tls, op, osr);
+ trace.event("op_apply_start");
+ trace.keyval("opnum", op);
+ trace.keyval("journal mode", "trailing");
apply_manager.op_apply_start(op);
+ trace.event("do_transactions");
int r = do_transactions(tls, op);
if (r >= 0) {
+ trace.event("journal started");
_op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
} else {
delete ondisk;
apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
submit_manager.op_submit_finish(op);
+ trace.event("op_apply_finish");
apply_manager.op_apply_finish(op);
utime_t end = ceph_clock_now();
{
dout(5) << "_journaled_ahead " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
+ o->trace.event("writeahead journal finished");
+
// this should queue in order because the journal does it's completions in order.
queue_op(osr, o);