xlist<C_ReadFinish*>::item set_item;
bool trust_enoent;
ceph_tid_t tid;
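+  // stored by value so the span outlives the caller and can be finished
+  // from the read completion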
+ ZTracer::Trace trace;
public:
bufferlist bl;
C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
- uint64_t l) :
+ uint64_t l, const ZTracer::Trace &trace) :
oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
set_item(this), trust_enoent(true),
- tid(t) {
+ tid(t), trace(trace) {
ob->reads.push_back(&set_item);
}
void finish(int r) override {
oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
+ trace.event("finish");
// object destructor clears the list
if (set_item.is_on_list())
OSDRead *rd;
ObjectSet *oset;
Context *onfinish;
- ZTracer::Trace r_trace;
+ ZTracer::Trace trace;
public:
C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
- ZTracer::Trace *trace)
- : oc(_oc), rd(r), oset(os), onfinish(c) {
- if (trace) {
- r_trace = *trace;
- }
- }
+ const ZTracer::Trace &trace)
+ : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
+ }
void finish(int r) override {
- if (r < 0) {
- if (onfinish)
- onfinish->complete(r);
+ if (r >= 0) {
+ r = oc->_readx(rd, oset, onfinish, false, &trace);
+ }
+
+ if (r == 0) {
+      // read is still in progress
return;
}
- int ret = oc->_readx(rd, oset, onfinish, false, &r_trace);
- if (ret != 0 && onfinish) {
- onfinish->complete(ret);
+
+ trace.event("finish");
+ if (onfinish) {
+ onfinish->complete(r);
}
}
};
ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
// split off right
- ObjectCacher::BufferHead *right = new BufferHead(this, &left->b_trace);
+ ObjectCacher::BufferHead *right = new BufferHead(this);
  // inherit the dontneed flag; a later access will clear it automatically.
right->set_dontneed(left->get_dontneed());
map<loff_t, BufferHead*>& hits,
map<loff_t, BufferHead*>& missing,
map<loff_t, BufferHead*>& rx,
- map<loff_t, BufferHead*>& errors,
- ZTracer::Trace *trace)
+ map<loff_t, BufferHead*>& errors)
{
assert(oc->lock.is_locked());
- ldout(oc->cct, 10) << "map_read " << ex.oid
- << " " << ex.offset << "~" << ex.length
- << dendl;
-
+ ldout(oc->cct, 10) << "map_read " << ex.oid << " "
+ << ex.offset << "~" << ex.length << dendl;
+
loff_t cur = ex.offset;
loff_t left = ex.length;
// at end?
if (p == data.end()) {
// rest is a miss.
- BufferHead *n = new BufferHead(this, trace);
+ BufferHead *n = new BufferHead(this);
n->set_start(cur);
n->set_length(left);
oc->bh_add(this, n);
assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
break; // no more.
}
-
+
if (p->first <= cur) {
// have it (or part of it)
BufferHead *e = p->second;
} else {
ceph_abort();
}
-
+
loff_t lenfromcur = MIN(e->end() - cur, left);
cur += lenfromcur;
left -= lenfromcur;
++p;
continue; // more?
-
+
} else if (p->first > cur) {
// gap.. miss
loff_t next = p->first;
- BufferHead *n = new BufferHead(this, trace);
+ BufferHead *n = new BufferHead(this);
loff_t len = MIN(next - cur, left);
n->set_start(cur);
n->set_length(len);
* other dirty data to left and/or right.
*/
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
- ceph_tid_t tid,
- ZTracer::Trace *trace)
+ ceph_tid_t tid)
{
assert(oc->lock.is_locked());
BufferHead *final = 0;
  // at end?
if (p == data.end()) {
if (final == NULL) {
- final = new BufferHead(this, trace);
+ final = new BufferHead(this);
replace_journal_tid(final, tid);
final->set_start( cur );
final->set_length( max );
final->set_length(final->length() + glen);
oc->bh_stat_add(final);
} else {
- final = new BufferHead(this, trace);
+ final = new BufferHead(this);
replace_journal_tid(final, tid);
final->set_start( cur );
final->set_length( glen );
max_size(max_bytes), max_objects(max_objects),
max_dirty_age(ceph::make_timespan(max_dirty_age)),
block_writes_upfront(block_writes_upfront),
+ trace_endpoint("ObjectCacher"),
flush_set_callback(flush_callback),
flush_set_callback_arg(flush_callback_arg),
  last_read_tid(0), flusher_stop(false), flusher_thread(this), finisher(cct),
delete ob;
}
-void ObjectCacher::bh_read(BufferHead *bh, int op_flags)
+void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
+ const ZTracer::Trace &parent_trace)
{
assert(lock.is_locked());
ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
<< reads_outstanding << dendl;
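+  // open a child span for this buffer-head read only if the caller is
+  // tracing; a default-constructed Trace is invalid and its events are no-ops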
+ ZTracer::Trace trace;
+ if (parent_trace.valid()) {
+ trace.init("", &trace_endpoint, &parent_trace);
+ trace.copy_name("bh_read " + bh->ob->get_oid().name);
+ trace.event("start");
+ }
+
mark_rx(bh);
bh->last_read_tid = ++last_read_tid;
// finisher
C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
- bh->start(), bh->length());
+ bh->start(), bh->length(), trace);
// go
writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
bh->ob->get_oloc(), bh->start(), bh->length(),
bh->ob->get_snap(), &onfinish->bl,
bh->ob->truncate_size, bh->ob->truncate_seq,
- op_flags, onfinish, &bh->b_trace);
+ op_flags, trace, onfinish);
++reads_outstanding;
}
int64_t poolid;
sobject_t oid;
vector<pair<loff_t, uint64_t> > ranges;
+ ZTracer::Trace trace;
public:
- ceph_tid_t tid;
+ ceph_tid_t tid = 0;
C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
- uint64_t l) :
- oc(c), poolid(_poolid), oid(o), tid(0) {
+ uint64_t l, const ZTracer::Trace &trace) :
+ oc(c), poolid(_poolid), oid(o), trace(trace) {
ranges.push_back(make_pair(s, l));
}
C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
}
void finish(int r) override {
oc->bh_write_commit(poolid, oid, ranges, tid, r);
+ trace.event("finish");
}
};
void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
perfcounter->inc(l_objectcacher_data_flushed, total_len);
}
-void ObjectCacher::bh_write(BufferHead *bh)
+void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
{
assert(lock.is_locked());
ldout(cct, 7) << "bh_write " << *bh << dendl;
bh->ob->get();
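+  // as in bh_read, open a per-writeback child span when the caller traces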
+ ZTracer::Trace trace;
+ if (parent_trace.valid()) {
+ trace.init("", &trace_endpoint, &parent_trace);
+ trace.copy_name("bh_write " + bh->ob->get_oid().name);
+ trace.event("start");
+ }
+
// finishers
C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
bh->ob->get_soid(), bh->start(),
- bh->length());
+ bh->length(), trace);
// go
ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
bh->ob->get_oloc(),
bh->snapc, bh->bl, bh->last_write,
bh->ob->truncate_size,
bh->ob->truncate_seq,
- bh->journal_tid, oncommit,
- &bh->b_trace);
+ bh->journal_tid, trace, oncommit);
ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
// set bh last_write_tid
finish_contexts(cct, ls, r);
}
-void ObjectCacher::flush(loff_t amount)
+void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
{
+ assert(trace != nullptr);
assert(lock.is_locked());
ceph::real_time cutoff = ceph::real_clock::now();
bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
} else {
left -= bh->length();
- bh_write(bh);
+ bh_write(bh, *trace);
}
- }
+ }
}
* returns 0 if doing async read
*/
int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
- ZTracer::Trace *trace)
+ ZTracer::Trace *parent_trace)
{
- return _readx(rd, oset, onfinish, true, trace);
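+  // wrap the read in its own "read" span so everything readx triggers
+  // (cache misses, retries) hangs off the caller's trace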
+ ZTracer::Trace trace;
+ if (parent_trace != nullptr) {
+ trace.init("read", &trace_endpoint, parent_trace);
+ trace.event("start");
+ }
+
+  int r = _readx(rd, oset, onfinish, true, &trace);
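+  // a negative return means the read failed synchronously, so the span
+  // can be closed right away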
+ if (r < 0) {
+ trace.event("finish");
+ }
+ return r;
}
int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
bool external_call, ZTracer::Trace *trace)
{
+ assert(trace != nullptr);
assert(lock.is_locked());
bool success = true;
int error = 0;
if (scattered_write)
blist.push_back(bh);
else
- bh_write(bh);
+ bh_write(bh, *trace);
}
}
}
ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
<< " on " << *o << dendl;
o->waitfor_commit[o->last_write_tid].push_back(
- new C_RetryRead(this,rd, oset, onfinish, trace));
+      new C_RetryRead(this, rd, oset, onfinish, *trace));
// FIXME: perfcounter!
return 0;
}
// map extent into bufferheads
map<loff_t, BufferHead*> hits, missing, rx, errors;
- o->map_read(*ex_it, hits, missing, rx, errors, trace);
+ o->map_read(*ex_it, hits, missing, rx, errors);
if (external_call) {
// retry reading error buffers
missing.insert(errors.begin(), errors.end());
<< (MAX(rx_bytes, max_size) - max_size)
<< " read bytes" << dendl;
waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
- trace));
+ *trace));
}
bh_remove(o, bh_it->second);
delete bh_it->second;
} else {
bh_it->second->set_nocache(nocache);
- bh_read(bh_it->second, rd->fadvise_flags);
+ bh_read(bh_it->second, rd->fadvise_flags, *trace);
if ((success && onfinish) || last != missing.end())
last = bh_it;
}
ldout(cct, 10) << "readx missed, waiting on " << *last->second
<< " off " << last->first << dendl;
last->second->waitfor_read[last->first].push_back(
- new C_RetryRead(this, rd, oset, onfinish, trace) );
+ new C_RetryRead(this, rd, oset, onfinish, *trace) );
}
ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
<< " off " << bh_it->first << dendl;
bh_it->second->waitfor_read[bh_it->first].push_back(
- new C_RetryRead(this, rd, oset, onfinish, trace) );
+ new C_RetryRead(this, rd, oset, onfinish, *trace) );
}
bytes_not_in_cache += bh_it->second->length();
success = false;
}
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
- ZTracer::Trace *trace)
+ ZTracer::Trace *parent_trace)
{
assert(lock.is_locked());
ceph::real_time now = ceph::real_clock::now();
bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
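+  // writes get their own "write" child span, symmetric with readx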
+ ZTracer::Trace trace;
+ if (parent_trace != nullptr) {
+ trace.init("write", &trace_endpoint, parent_trace);
+ trace.event("start");
+ }
+
for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
ex_it != wr->extents.end();
++ex_it) {
ex_it->truncate_size, oset->truncate_seq);
// map it all into a single bufferhead.
- BufferHead *bh = o->map_write(*ex_it, wr->journal_tid, trace);
+ BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
bool missing = bh->is_missing();
bh->snapc = wr->snapc;
-
+
bytes_written += ex_it->length;
if (bh->is_tx()) {
bytes_written_in_flush += ex_it->length;
}
}
- int r = _wait_for_write(wr, bytes_written, oset, onfreespace);
+ int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
delete wr;
//verify_stats();
class ObjectCacher::C_WaitForWrite : public Context {
public:
- C_WaitForWrite(ObjectCacher *oc, uint64_t len, Context *onfinish) :
- m_oc(oc), m_len(len), m_onfinish(onfinish) {}
+ C_WaitForWrite(ObjectCacher *oc, uint64_t len,
+ const ZTracer::Trace &trace, Context *onfinish) :
+ m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
void finish(int r) override;
private:
ObjectCacher *m_oc;
uint64_t m_len;
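+  // copy of the caller's span; the wait may complete after the caller
+  // has returned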
+ ZTracer::Trace m_trace;
Context *m_onfinish;
};
void ObjectCacher::C_WaitForWrite::finish(int r)
{
Mutex::Locker l(m_oc->lock);
- m_oc->maybe_wait_for_writeback(m_len);
+ m_oc->maybe_wait_for_writeback(m_len, &m_trace);
m_onfinish->complete(r);
}
-void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
+void ObjectCacher::maybe_wait_for_writeback(uint64_t len,
+ ZTracer::Trace *trace)
{
assert(lock.is_locked());
ceph::mono_time start = ceph::mono_clock::now();
while (get_stat_dirty() + get_stat_tx() > 0 &&
(uint64_t) (get_stat_dirty() + get_stat_tx()) >=
max_dirty + get_stat_dirty_waiting()) {
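+    // emit the start event only once, on the first pass through the loop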
+ if (blocked == 0) {
+ trace->event("start wait for writeback");
+ }
ldout(cct, 10) << __func__ << " waiting for dirty|tx "
<< (get_stat_dirty() + get_stat_tx()) << " >= max "
<< max_dirty << " + dirty_waiting "
++blocked;
ldout(cct, 10) << __func__ << " woke up" << dendl;
}
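+  // if we blocked at all, record when the stall ended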
+ if (blocked > 0) {
+ trace->event("finish wait for writeback");
+ }
if (blocked && perfcounter) {
perfcounter->inc(l_objectcacher_write_ops_blocked);
perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
- Context *onfreespace)
+ ZTracer::Trace *trace, Context *onfreespace)
{
assert(lock.is_locked());
+ assert(trace != nullptr);
int ret = 0;
if (max_dirty > 0) {
if (block_writes_upfront) {
- maybe_wait_for_writeback(len);
+ maybe_wait_for_writeback(len, trace);
if (onfreespace)
onfreespace->complete(0);
} else {
assert(onfreespace);
- finisher.queue(new C_WaitForWrite(this, len, onfreespace));
+ finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
}
} else {
// write-thru! flush what we just wrote.
Context *fin = block_writes_upfront ?
new C_Cond(&cond, &done, &ret) : onfreespace;
assert(fin);
- bool flushed = flush_set(oset, wr->extents, fin);
+ bool flushed = flush_set(oset, wr->extents, trace, fin);
assert(!flushed); // we just dirtied it, and didn't drop our lock!
ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
<< " bytes" << dendl;
<< max_dirty << " max)"
<< dendl;
loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
+
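+  // the flusher has no caller-supplied parent span; create a root span,
+  // but only when osdc_blkin_trace_all is enabled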
+ ZTracer::Trace trace;
+ if (cct->_conf->osdc_blkin_trace_all) {
+ trace.init("flusher", &trace_endpoint);
+ trace.event("start");
+ }
+
if (actual > 0 && (uint64_t) actual > target_dirty) {
// flush some dirty pages
ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
<< get_stat_dirty_waiting() << " dirty_waiting > target "
<< target_dirty << ", flushing some dirty bhs" << dendl;
- flush(actual - target_dirty);
+ flush(&trace, actual - target_dirty);
} else {
// check tail of lru for old dirty items
ceph::real_time cutoff = ceph::real_clock::now();
if (scattered_write) {
bh_write_adjacencies(bh, cutoff, NULL, &max);
} else {
- bh_write(bh);
+ bh_write(bh, trace);
--max;
}
}
if (!max) {
// back off the lock to avoid starving other threads
+ trace.event("backoff");
lock.Unlock();
lock.Lock();
continue;
}
}
+
+ trace.event("finish");
if (flusher_stop)
break;
// true if clean, already flushed.
// false if we wrote something.
// be sloppy about the ranges and flush any buffer it touches
-bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
+bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
+ ZTracer::Trace *trace)
{
+ assert(trace != nullptr);
assert(lock.is_locked());
list<BufferHead*> blist;
bool clean = true;
if (scattered_write)
blist.push_back(bh);
else
- bh_write(bh);
+ bh_write(bh, *trace);
clean = false;
}
if (scattered_write && !blist.empty())
// Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
// order. But items in oset->objects are not sorted. So the iterator can
// point to any buffer head in the ObjectSet
- BufferHead key(*oset->objects.begin(), nullptr);
+ BufferHead key(*oset->objects.begin());
it = dirty_or_tx_bh.lower_bound(&key);
p = q = it;
}
blist.push_back(bh);
} else {
- bh_write(bh);
+ bh_write(bh, {});
}
}
}
}
blist.push_front(bh);
} else {
- bh_write(bh);
+ bh_write(bh, {});
}
}
if (!backwards)
// flush. non-blocking, takes callback.
// returns true if already flushed
bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
- Context *onfinish)
+ ZTracer::Trace *trace, Context *onfinish)
{
assert(lock.is_locked());
+ assert(trace != nullptr);
assert(onfinish != NULL);
if (oset->objects.empty()) {
ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
<< " " << ob << dendl;
- if (!flush(ob, ex.offset, ex.length)) {
+ if (!flush(ob, ex.offset, ex.length, trace)) {
// we'll need to gather...
ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
<< ob->last_write_tid << " on " << *ob << dendl;
}
blist.push_back(bh);
} else {
- bh_write(bh);
+ bh_write(bh, {});
}
}
int error; // holds return value for failed reads
map<loff_t, list<Context*> > waitfor_read;
- ZTracer::Trace b_trace;
// cons
- explicit BufferHead(Object *o, ZTracer::Trace *trace) :
+ explicit BufferHead(Object *o) :
state(STATE_MISSING),
ref(0),
dontneed(false),
journal_tid(0),
error(0) {
ex.start = ex.length = 0;
- if (trace) {
- b_trace = *trace;
- }
}
// extent
map<loff_t, BufferHead*>& hits,
map<loff_t, BufferHead*>& missing,
map<loff_t, BufferHead*>& rx,
- map<loff_t, BufferHead*>& errors,
- ZTracer::Trace *trace);
- BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid,
- ZTracer::Trace *trace);
-
+ map<loff_t, BufferHead*>& errors);
+ BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid);
+
void replace_journal_tid(BufferHead *bh, ceph_tid_t tid);
void truncate(loff_t s);
void discard(loff_t off, loff_t len);
ceph::timespan max_dirty_age;
bool block_writes_upfront;
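+  // blkin endpoint (named "ObjectCacher" in the constructor) shared by
+  // all spans this cacher creates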
+ ZTracer::Endpoint trace_endpoint;
+
flush_set_callback_t flush_set_callback;
void *flush_set_callback_arg;
void bh_remove(Object *ob, BufferHead *bh);
// io
- void bh_read(BufferHead *bh, int op_flags);
- void bh_write(BufferHead *bh);
+ void bh_read(BufferHead *bh, int op_flags,
+ const ZTracer::Trace &parent_trace);
+ void bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace);
void bh_write_scattered(list<BufferHead*>& blist);
void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
int64_t *amount, int *max_count);
void trim();
- void flush(loff_t amount=0);
+ void flush(ZTracer::Trace *trace, loff_t amount=0);
/**
* flush a range of buffers
* @param len extent length, or 0 for entire object
* @return true if object was already clean/flushed.
*/
- bool flush(Object *o, loff_t off, loff_t len);
+ bool flush(Object *o, loff_t off, loff_t len,
+ ZTracer::Trace *trace);
loff_t release(Object *o);
void purge(Object *o);
* the return value is total bytes read
*/
int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
- ZTracer::Trace *trace = nullptr);
+ ZTracer::Trace *parent_trace = nullptr);
int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
- ZTracer::Trace *trace = nullptr);
+ ZTracer::Trace *parent_trace = nullptr);
bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
snapid_t snapid);
private:
// write blocking
int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
- Context *onfreespace);
- void maybe_wait_for_writeback(uint64_t len);
+ ZTracer::Trace *trace, Context *onfreespace);
+ void maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace);
bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish);
public:
bool flush_set(ObjectSet *oset, Context *onfinish=0);
bool flush_set(ObjectSet *oset, vector<ObjectExtent>& ex,
- Context *onfinish = 0);
+ ZTracer::Trace *trace, Context *onfinish = 0);
bool flush_all(Context *onfinish = 0);
void purge_set(ObjectSet *oset);
vector<ObjectExtent> extents;
Striper::file_to_extents(cct, oset->ino, layout, offset, len,
oset->truncate_size, extents);
- return flush_set(oset, extents, onfinish);
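+    // no parent span is available at this entry point; pass a default
+    // trace to satisfy flush_set's non-null requirement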
+ ZTracer::Trace trace;
+ return flush_set(oset, extents, &trace, onfinish);
}
};