void read(const object_t& oid, uint64_t object_no,
const object_locator_t& oloc, uint64_t off, uint64_t len,
snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
- __u32 trunc_seq, int op_flags, Context *onfinish) override {
+ __u32 trunc_seq, int op_flags, Context *onfinish,
+ ZTracer::Trace *trace) override {
m_objecter->read_trunc(oid, oloc, off, len, snapid, pbl, 0,
trunc_size, trunc_seq,
new C_OnFinisher(new C_Lock(m_lock, onfinish),
const SnapContext& snapc, const bufferlist &bl,
ceph::real_time mtime, uint64_t trunc_size,
__u32 trunc_seq, ceph_tid_t journal_tid,
- Context *oncommit) override {
+ Context *oncommit, ZTracer::Trace *trace) override {
return m_objecter->write_trunc(oid, oloc, off, len, snapc, bl, mtime, 0,
trunc_size, trunc_seq,
new C_OnFinisher(new C_Lock(m_lock,
void ImageCtx::aio_read_from_cache(object_t o, uint64_t object_no,
bufferlist *bl, size_t len,
uint64_t off, Context *onfinish,
- int fadvise_flags) {
+ int fadvise_flags, ZTracer::Trace *trace) {
snap_lock.get_read();
ObjectCacher::OSDRead *rd = object_cacher->prepare_read(snap_id, bl, fadvise_flags);
snap_lock.put_read();
extent.buffer_extents.push_back(make_pair(0, len));
rd->extents.push_back(extent);
cache_lock.Lock();
- int r = object_cacher->readx(rd, object_set, onfinish);
+ int r = object_cacher->readx(rd, object_set, onfinish, trace);
cache_lock.Unlock();
if (r != 0)
onfinish->complete(r);
void ImageCtx::write_to_cache(object_t o, const bufferlist& bl, size_t len,
uint64_t off, Context *onfinish,
- int fadvise_flags, uint64_t journal_tid) {
+ int fadvise_flags, uint64_t journal_tid,
+ ZTracer::Trace *trace) {
snap_lock.get_read();
ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(
snapc, bl, ceph::real_time::min(), fadvise_flags, journal_tid);
wr->extents.push_back(extent);
{
Mutex::Locker l(cache_lock);
- object_cacher->writex(wr, object_set, onfinish);
+ object_cacher->writex(wr, object_set, onfinish, trace);
}
}
class ThreadPool;
class SafeTimer;
+namespace ZTracer { struct Trace; }
+
namespace librbd {
class AsyncOperation;
uint64_t *overlap) const;
void aio_read_from_cache(object_t o, uint64_t object_no, bufferlist *bl,
size_t len, uint64_t off, Context *onfinish,
- int fadvise_flags);
+ int fadvise_flags, ZTracer::Trace *trace);
void write_to_cache(object_t o, const bufferlist& bl, size_t len,
uint64_t off, Context *onfinish, int fadvise_flags,
- uint64_t journal_tid);
+ uint64_t journal_tid, ZTracer::Trace *trace);
void user_flushed();
void flush_cache(Context *onfinish);
void shut_down_cache(Context *on_finish);
const object_locator_t& oloc,
uint64_t off, uint64_t len, snapid_t snapid,
bufferlist *pbl, uint64_t trunc_size,
- __u32 trunc_seq, int op_flags, Context *onfinish)
+ __u32 trunc_seq, int op_flags, Context *onfinish,
+ ZTracer::Trace *trace)
{
// on completion, take the mutex and then call onfinish.
Context *req = new C_ReadRequest(m_ictx->cct, onfinish, &m_lock);
librados::AioCompletion *rados_completion =
util::create_rados_callback(req);
- int r = m_ictx->data_ctx.aio_operate(oid.name, rados_completion, &op,
- flags, NULL);
+ int r = m_ictx->data_ctx.aio_operate(
+ oid.name, rados_completion, &op, flags, nullptr,
+ (trace ? trace->get_info() : nullptr));
rados_completion->release();
assert(r >= 0);
}
const bufferlist &bl,
ceph::real_time mtime, uint64_t trunc_size,
__u32 trunc_seq, ceph_tid_t journal_tid,
- Context *oncommit)
+ Context *oncommit,
+ ZTracer::Trace *trace)
{
uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
void read(const object_t& oid, uint64_t object_no,
const object_locator_t& oloc, uint64_t off, uint64_t len,
snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
- __u32 trunc_seq, int op_flags, Context *onfinish) override;
+ __u32 trunc_seq, int op_flags, Context *onfinish,
+ ZTracer::Trace *trace) override;
// Determine whether a read to this extent could be affected by a
// write-triggered copy-on-write
const SnapContext& snapc, const bufferlist &bl,
ceph::real_time mtime, uint64_t trunc_size,
__u32 trunc_seq, ceph_tid_t journal_tid,
- Context *oncommit) override;
+ Context *oncommit, ZTracer::Trace *trace) override;
using WritebackHandler::write;
void overwrite_extent(const object_t& oid, uint64_t off,
ictx->readahead.inc_pending();
ictx->aio_read_from_cache(q->oid, q->objectno, NULL,
q->length, q->offset,
- req_comp, 0);
+ req_comp, 0, nullptr);
}
}
ictx->perfcounter->inc(l_librbd_readahead);
req);
image_ctx.aio_read_from_cache(extent.oid, extent.objectno,
&req->data(), extent.length,
- extent.offset, cache_comp, m_op_flags);
+ extent.offset, cache_comp, m_op_flags,
+ nullptr);
} else {
req->send();
}
C_AioRequest *req_comp = new C_AioRequest(aio_comp);
image_ctx.write_to_cache(object_extent.oid, bl, object_extent.length,
object_extent.offset, req_comp, m_op_flags,
- journal_tid);
+ journal_tid, nullptr);
}
}
C_AioRequest *req_comp = new C_AioRequest(aio_comp);
image_ctx.write_to_cache(object_extent.oid, bl, object_extent.length,
object_extent.offset, req_comp, m_op_flags,
- journal_tid);
+ journal_tid, nullptr);
}
}
OSDRead *rd;
ObjectSet *oset;
Context *onfinish;
+ ZTracer::Trace r_trace;
public:
- C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c)
- : oc(_oc), rd(r), oset(os), onfinish(c) {}
+ C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
+ ZTracer::Trace *trace)
+ : oc(_oc), rd(r), oset(os), onfinish(c) {
+ if (trace) {
+ r_trace = *trace;
+ }
+ }
void finish(int r) override {
if (r < 0) {
if (onfinish)
onfinish->complete(r);
return;
}
- int ret = oc->_readx(rd, oset, onfinish, false);
+ int ret = oc->_readx(rd, oset, onfinish, false, &r_trace);
if (ret != 0 && onfinish) {
onfinish->complete(ret);
}
ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
// split off right
- ObjectCacher::BufferHead *right = new BufferHead(this);
+ ObjectCacher::BufferHead *right = new BufferHead(this, &left->b_trace);
//inherit and if later access, this auto clean.
right->set_dontneed(left->get_dontneed());
map<loff_t, BufferHead*>& hits,
map<loff_t, BufferHead*>& missing,
map<loff_t, BufferHead*>& rx,
- map<loff_t, BufferHead*>& errors)
+ map<loff_t, BufferHead*>& errors,
+ ZTracer::Trace *trace)
{
assert(oc->lock.is_locked());
ldout(oc->cct, 10) << "map_read " << ex.oid
// at end?
if (p == data.end()) {
// rest is a miss.
- BufferHead *n = new BufferHead(this);
+ BufferHead *n = new BufferHead(this, trace);
n->set_start(cur);
n->set_length(left);
oc->bh_add(this, n);
} else if (p->first > cur) {
// gap.. miss
loff_t next = p->first;
- BufferHead *n = new BufferHead(this);
+ BufferHead *n = new BufferHead(this, trace);
loff_t len = MIN(next - cur, left);
n->set_start(cur);
n->set_length(len);
* other dirty data to left and/or right.
*/
ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
- ceph_tid_t tid)
+ ceph_tid_t tid,
+ ZTracer::Trace *trace)
{
assert(oc->lock.is_locked());
BufferHead *final = 0;
// at end ?
if (p == data.end()) {
if (final == NULL) {
- final = new BufferHead(this);
+ final = new BufferHead(this, trace);
replace_journal_tid(final, tid);
final->set_start( cur );
final->set_length( max );
final->set_length(final->length() + glen);
oc->bh_stat_add(final);
} else {
- final = new BufferHead(this);
+ final = new BufferHead(this, trace);
replace_journal_tid(final, tid);
final->set_start( cur );
final->set_length( glen );
bh->ob->get_oloc(), bh->start(), bh->length(),
bh->ob->get_snap(), &onfinish->bl,
bh->ob->truncate_size, bh->ob->truncate_seq,
- op_flags, onfinish);
+ op_flags, onfinish, &bh->b_trace);
++reads_outstanding;
}
bh->snapc, bh->bl, bh->last_write,
bh->ob->truncate_size,
bh->ob->truncate_seq,
- bh->journal_tid, oncommit);
+ bh->journal_tid, oncommit,
+ &bh->b_trace);
ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
// set bh last_write_tid
* must delete it)
* returns 0 if doing async read
*/
-int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
+int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+ ZTracer::Trace *trace)
{
- return _readx(rd, oset, onfinish, true);
+ return _readx(rd, oset, onfinish, true, trace);
}
int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
- bool external_call)
+ bool external_call, ZTracer::Trace *trace)
{
assert(lock.is_locked());
bool success = true;
ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
<< " on " << *o << dendl;
o->waitfor_commit[o->last_write_tid].push_back(
- new C_RetryRead(this,rd, oset, onfinish));
+ new C_RetryRead(this,rd, oset, onfinish, trace));
// FIXME: perfcounter!
return 0;
}
// map extent into bufferheads
map<loff_t, BufferHead*> hits, missing, rx, errors;
- o->map_read(*ex_it, hits, missing, rx, errors);
+ o->map_read(*ex_it, hits, missing, rx, errors, trace);
if (external_call) {
// retry reading error buffers
missing.insert(errors.begin(), errors.end());
<< waitfor_read.size() << " blocked reads, "
<< (MAX(rx_bytes, max_size) - max_size)
<< " read bytes" << dendl;
- waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish));
+ waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
+ trace));
}
bh_remove(o, bh_it->second);
ldout(cct, 10) << "readx missed, waiting on " << *last->second
<< " off " << last->first << dendl;
last->second->waitfor_read[last->first].push_back(
- new C_RetryRead(this, rd, oset, onfinish) );
+ new C_RetryRead(this, rd, oset, onfinish, trace) );
}
ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
<< " off " << bh_it->first << dendl;
bh_it->second->waitfor_read[bh_it->first].push_back(
- new C_RetryRead(this, rd, oset, onfinish) );
+ new C_RetryRead(this, rd, oset, onfinish, trace) );
}
bytes_not_in_cache += bh_it->second->length();
success = false;
waitfor_read.splice(waitfor_read.end(), ls);
}
-int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace)
+int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
+ ZTracer::Trace *trace)
{
assert(lock.is_locked());
ceph::real_time now = ceph::real_clock::now();
ex_it->truncate_size, oset->truncate_seq);
// map it all into a single bufferhead.
- BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
+ BufferHead *bh = o->map_write(*ex_it, wr->journal_tid, trace);
bool missing = bh->is_missing();
bh->snapc = wr->snapc;
// Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
// order. But items in oset->objects are not sorted. So the iterator can
// point to any buffer head in the ObjectSet
- BufferHead key(*oset->objects.begin());
+ BufferHead key(*oset->objects.begin(), nullptr);
it = dirty_or_tx_bh.lower_bound(&key);
p = q = it;
#include "common/Cond.h"
#include "common/Finisher.h"
#include "common/Thread.h"
+#include "common/zipkin_trace.h"
#include "Objecter.h"
#include "Striper.h"
int error; // holds return value for failed reads
map<loff_t, list<Context*> > waitfor_read;
+ ZTracer::Trace b_trace;
// cons
- explicit BufferHead(Object *o) :
+ explicit BufferHead(Object *o, ZTracer::Trace *trace) :
state(STATE_MISSING),
ref(0),
dontneed(false),
journal_tid(0),
error(0) {
ex.start = ex.length = 0;
+ if (trace) {
+ b_trace = *trace;
+ }
}
// extent
map<loff_t, BufferHead*>& hits,
map<loff_t, BufferHead*>& missing,
map<loff_t, BufferHead*>& rx,
- map<loff_t, BufferHead*>& errors);
- BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid);
+ map<loff_t, BufferHead*>& errors,
+ ZTracer::Trace *trace);
+ BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid,
+ ZTracer::Trace *trace);
void replace_journal_tid(BufferHead *bh, ceph_tid_t tid);
void truncate(loff_t s);
Cond read_cond;
int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
- bool external_call);
+ bool external_call, ZTracer::Trace *trace);
void retry_waiting_reads();
public:
* @note total read size must be <= INT_MAX, since
* the return value is total bytes read
*/
- int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish);
- int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace);
+ int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+ ZTracer::Trace *trace = nullptr);
+ int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
+ ZTracer::Trace *trace = nullptr);
bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
snapid_t snapid);
virtual void read(const object_t& oid, uint64_t object_no,
const object_locator_t& oloc, uint64_t off, uint64_t len,
snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
- __u32 trunc_seq, int op_flags, Context *onfinish) = 0;
+ __u32 trunc_seq, int op_flags, Context *onfinish,
+ ZTracer::Trace *trace = nullptr) = 0;
/**
* check if a given extent read result may change due to a write
*
const SnapContext& snapc,
const bufferlist &bl, ceph::real_time mtime,
uint64_t trunc_size, __u32 trunc_seq,
- ceph_tid_t journal_tid, Context *oncommit) = 0;
+ ceph_tid_t journal_tid, Context *oncommit,
+ ZTracer::Trace *trace = nullptr) = 0;
virtual void overwrite_extent(const object_t& oid, uint64_t off, uint64_t len,
ceph_tid_t original_journal_tid,
uint64_t offset, uint64_t length,
uint64_t journal_tid, int r) {
EXPECT_CALL(mock_image_ctx, write_to_cache(object, _, length, offset, _, _,
- journal_tid))
+ journal_tid, _))
.WillOnce(WithArg<4>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue)));
}
#include "test/librbd/mock/io/MockImageRequestWQ.h"
#include "common/RWLock.h"
#include "common/WorkQueue.h"
+#include "common/zipkin_trace.h"
#include "librbd/ImageCtx.h"
#include "gmock/gmock.h"
#include <string>
MOCK_CONST_METHOD0(get_journal_policy, journal::Policy*());
MOCK_CONST_METHOD1(set_journal_policy, void(journal::Policy*));
- MOCK_METHOD7(aio_read_from_cache, void(object_t, uint64_t, bufferlist *,
- size_t, uint64_t, Context *, int));
- MOCK_METHOD7(write_to_cache, void(object_t, const bufferlist&, size_t,
- uint64_t, Context *, int, uint64_t));
+ MOCK_METHOD8(aio_read_from_cache, void(object_t, uint64_t, bufferlist *,
+ size_t, uint64_t, Context *, int, ZTracer::Trace *));
+ MOCK_METHOD8(write_to_cache, void(object_t, const bufferlist&, size_t,
+ uint64_t, Context *, int, uint64_t, ZTracer::Trace *));
ImageCtx *image_ctx;
CephContext *cct;
const object_locator_t& oloc,
uint64_t off, uint64_t len, snapid_t snapid,
bufferlist *pbl, uint64_t trunc_size,
- __u32 trunc_seq, int op_flags, Context *onfinish)
+ __u32 trunc_seq, int op_flags, Context *onfinish,
+ ZTracer::Trace *trace)
{
C_Delay *wrapper = new C_Delay(m_cct, onfinish, m_lock, off, pbl,
m_delay_ns);
const SnapContext& snapc,
const bufferlist &bl, ceph::real_time mtime,
uint64_t trunc_size, __u32 trunc_seq,
- ceph_tid_t journal_tid, Context *oncommit)
+ ceph_tid_t journal_tid, Context *oncommit,
+ ZTracer::Trace *trace)
{
C_Delay *wrapper = new C_Delay(m_cct, oncommit, m_lock, off, NULL,
m_delay_ns);
void read(const object_t& oid, uint64_t object_no,
const object_locator_t& oloc, uint64_t off, uint64_t len,
snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
- __u32 trunc_seq, int op_flags, Context *onfinish) override;
+ __u32 trunc_seq, int op_flags, Context *onfinish,
+ ZTracer::Trace *trace) override;
ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len,
const SnapContext& snapc, const bufferlist &bl,
ceph::real_time mtime, uint64_t trunc_size,
__u32 trunc_seq, ceph_tid_t journal_tid,
- Context *oncommit) override;
+ Context *oncommit, ZTracer::Trace *trace) override;
using WritebackHandler::write;
const object_locator_t& oloc,
uint64_t off, uint64_t len, snapid_t snapid,
bufferlist *pbl, uint64_t trunc_size,
- __u32 trunc_seq, int op_flags, Context *onfinish)
+ __u32 trunc_seq, int op_flags, Context *onfinish,
+ ZTracer::Trace *trace)
{
assert(snapid == CEPH_NOSNAP);
C_DelayRead *wrapper = new C_DelayRead(this, m_cct, onfinish, m_lock, oid,
const SnapContext& snapc,
const bufferlist &bl, ceph::real_time mtime,
uint64_t trunc_size, __u32 trunc_seq,
- ceph_tid_t journal_tid, Context *oncommit)
+ ceph_tid_t journal_tid, Context *oncommit,
+ ZTracer::Trace *trace)
{
assert(snapc.seq == 0);
C_DelayWrite *wrapper = new C_DelayWrite(this, m_cct, oncommit, m_lock, oid,
void read(const object_t& oid, uint64_t object_no,
const object_locator_t& oloc, uint64_t off, uint64_t len,
snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
- __u32 trunc_seq, int op_flags, Context *onfinish) override;
+ __u32 trunc_seq, int op_flags, Context *onfinish,
+ ZTracer::Trace *trace) override;
ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len,
const SnapContext& snapc, const bufferlist &bl,
ceph::real_time mtime, uint64_t trunc_size,
__u32 trunc_seq, ceph_tid_t journal_tid,
- Context *oncommit) override;
+ Context *oncommit, ZTracer::Trace *trace) override;
using WritebackHandler::write;