PROPERTIES
INTERFACE_LINK_LIBRARIES "${opentelemetry_deps}"
INTERFACE_INCLUDE_DIRECTORIES "${opentelemetry_include_dir}")
+ include_directories(SYSTEM "${opentelemetry_include_dir}")
endfunction()
add_subdirectory(blkin/blkin-lib)
endif(WITH_BLKIN)
+if(WITH_JAEGER)
+ find_package(thrift 0.13.0 REQUIRED)
+ include(BuildOpentelemetry)
+ build_opentelemetry()
+ add_library(jaeger_base INTERFACE)
+ target_link_libraries(jaeger_base INTERFACE opentelemetry::libopentelemetry
+ thrift::libthrift)
+endif()
+
set(mds_files)
list(APPEND mds_files
mds/MDSMap.cc
add_dependencies(common-objs legacy-option-headers)
if(WITH_JAEGER)
- find_package(thrift 0.13.0 REQUIRED)
- include(BuildOpentelemetry)
- build_opentelemetry()
- add_library(jaeger_base INTERFACE)
- target_link_libraries(jaeger_base INTERFACE opentelemetry::libopentelemetry
- thrift::libthrift)
add_dependencies(common-objs jaeger_base)
target_link_libraries(common-objs jaeger_base)
endif()
}
jspan Tracer::add_span(opentelemetry::nostd::string_view span_name, const jspan& parent_span) {
- if (is_enabled() && parent_span->IsRecording()) {
+ if (is_enabled() && parent_span && parent_span->IsRecording()) {
opentelemetry::trace::StartSpanOptions span_opts;
span_opts.parent = parent_span->GetContext();
return tracer->StartSpan(span_name, span_opts);
return g_ceph_context->_conf->jaeger_tracing_enable;
}
-void encode(const jspan_context& span_ctx, bufferlist& bl, uint64_t f) {
- ENCODE_START(1, 1, bl);
- using namespace opentelemetry;
- using namespace trace;
- auto is_valid = span_ctx.IsValid();
- encode(is_valid, bl);
- if (is_valid) {
- encode_nohead(std::string_view(reinterpret_cast<const char*>(span_ctx.trace_id().Id().data()), TraceId::kSize), bl);
- encode_nohead(std::string_view(reinterpret_cast<const char*>(span_ctx.span_id().Id().data()), SpanId::kSize), bl);
- encode(span_ctx.trace_flags().flags(), bl);
- }
- ENCODE_FINISH(bl);
-}
-
-void decode(jspan_context& span_ctx, bufferlist::const_iterator& bl) {
- using namespace opentelemetry;
- using namespace trace;
- DECODE_START(1, bl);
- bool is_valid;
- decode(is_valid, bl);
- if (is_valid) {
- std::array<uint8_t, TraceId::kSize> trace_id;
- std::array<uint8_t, SpanId::kSize> span_id;
- uint8_t flags;
- decode(trace_id, bl);
- decode(span_id, bl);
- decode(flags, bl);
- span_ctx = SpanContext(
- TraceId(nostd::span<uint8_t, TraceId::kSize>(trace_id)),
- SpanId(nostd::span<uint8_t, SpanId::kSize>(span_id)),
- TraceFlags(flags),
- true);
- }
- DECODE_FINISH(bl);
-}
} // namespace tracing
#endif // HAVE_JAEGER
#pragma once
#include "acconfig.h"
-#include "include/buffer.h"
+#include "include/encoding.h"
#ifdef HAVE_JAEGER
#include "opentelemetry/trace/provider.h"
class Tracer {
private:
const static opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> noop_tracer;
- const static jspan noop_span;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> tracer;
public:
+ const static jspan noop_span;
+
Tracer() = default;
Tracer(opentelemetry::nostd::string_view service_name);
};
-void encode(const jspan_context& span, ceph::buffer::list& bl, uint64_t f = 0);
-void decode(jspan_context& span_ctx, ceph::buffer::list::const_iterator& bl);
+inline void encode(const jspan_context& span_ctx, bufferlist& bl, uint64_t f = 0) {
+ ENCODE_START(1, 1, bl);
+ using namespace opentelemetry;
+ using namespace trace;
+ auto is_valid = span_ctx.IsValid();
+ encode(is_valid, bl);
+ if (is_valid) {
+ encode_nohead(std::string_view(reinterpret_cast<const char*>(span_ctx.trace_id().Id().data()), TraceId::kSize), bl);
+ encode_nohead(std::string_view(reinterpret_cast<const char*>(span_ctx.span_id().Id().data()), SpanId::kSize), bl);
+ encode(span_ctx.trace_flags().flags(), bl);
+ }
+ ENCODE_FINISH(bl);
+}
+
+inline void decode(jspan_context& span_ctx, bufferlist::const_iterator& bl) {
+ using namespace opentelemetry;
+ using namespace trace;
+ DECODE_START(1, bl);
+ bool is_valid;
+ decode(is_valid, bl);
+ if (is_valid) {
+ std::array<uint8_t, TraceId::kSize> trace_id;
+ std::array<uint8_t, SpanId::kSize> span_id;
+ uint8_t flags;
+ decode(trace_id, bl);
+ decode(span_id, bl);
+ decode(flags, bl);
+ span_ctx = SpanContext(
+ TraceId(nostd::span<uint8_t, TraceId::kSize>(trace_id)),
+ SpanId(nostd::span<uint8_t, SpanId::kSize>(span_id)),
+ TraceFlags(flags),
+ true);
+ }
+ DECODE_FINISH(bl);
+}
} // namespace tracing
using jspan_attribute = Value;
-struct jspan_context {
- jspan_context() {}
- jspan_context(bool sampled_flag, bool is_remote) {}
+namespace opentelemetry {
+inline namespace v1 {
+namespace trace {
+class SpanContext {
+public:
+ SpanContext() = default;
+ SpanContext(bool sampled_flag, bool is_remote) {}
+ bool IsValid() const { return false;}
};
+} // namespace trace
+} // namespace v1
+} // namespace opentelemetry
+
+using jspan_context = opentelemetry::v1::trace::SpanContext;
struct span_stub {
jspan_context _ctx;
void AddEvent(std::string_view) {}
void AddEvent(std::string_view, std::initializer_list<std::pair<std::string_view, jspan_attribute>> fields) {}
template <typename T> void AddEvent(std::string_view name, const T& fields = {}) {}
- const jspan_context& GetContext() { return _ctx; }
+ jspan_context GetContext() const { return _ctx; }
void UpdateName(std::string_view) {}
bool IsRecording() { return false; }
};
// compound object operations
int operate(const std::string& oid, ObjectWriteOperation *op);
- int operate(const std::string& oid, ObjectWriteOperation *op, int flags);
+ int operate(const std::string& oid, ObjectWriteOperation *op, int flags, const jspan_context *trace_info = nullptr);
int operate(const std::string& oid, ObjectReadOperation *op, bufferlist *pbl);
int operate(const std::string& oid, ObjectReadOperation *op, bufferlist *pbl, int flags);
int aio_operate(const std::string& oid, AioCompletion *c, ObjectWriteOperation *op);
- int aio_operate(const std::string& oid, AioCompletion *c, ObjectWriteOperation *op, int flags);
+ int aio_operate(const std::string& oid, AioCompletion *c, ObjectWriteOperation *op, int flags, const jspan_context *trace_info = nullptr);
/**
* Schedule an async write operation with explicit snapshot parameters
*
struct blkin_trace_info;
+namespace opentelemetry {
+inline namespace v1 {
+namespace trace {
+
+class SpanContext;
+
+} // namespace trace
+} // inline namespace v1
+} // namespace opentelemetry
+
+using jspan_context = opentelemetry::v1::trace::SpanContext;
+
namespace libradosstriper {
class RadosStriper;
}
int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
- ceph::real_time *pmtime, int flags)
+ ceph::real_time *pmtime, int flags, const jspan_context* otel_trace)
{
ceph::real_time ut = (pmtime ? *pmtime :
ceph::real_clock::now());
oid, oloc,
*o, snapc, ut,
flags | extra_op_flags,
- oncommit, &ver);
+ oncommit, &ver, osd_reqid_t(), nullptr, otel_trace);
objecter->op_submit(objecter_op);
{
int librados::IoCtxImpl::aio_operate(const object_t& oid,
::ObjectOperation *o, AioCompletionImpl *c,
const SnapContext& snap_context, int flags,
- const blkin_trace_info *trace_info)
+ const blkin_trace_info *trace_info, const jspan_context *otel_trace)
{
FUNCTRACE(client->cct);
OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
trace.event("init root span");
Objecter::Op *op = objecter->prepare_mutate_op(
oid, oloc, *o, snap_context, ut, flags | extra_op_flags,
- oncomplete, &c->objver, osd_reqid_t(), &trace);
+ oncomplete, &c->objver, osd_reqid_t(), &trace, otel_trace);
objecter->op_submit(op, &c->tid);
trace.event("rados operate op submitted");
int getxattrs(const object_t& oid, std::map<std::string, bufferlist>& attrset);
int rmxattr(const object_t& oid, const char *name);
- int operate(const object_t& oid, ::ObjectOperation *o, ceph::real_time *pmtime, int flags=0);
+ int operate(const object_t& oid, ::ObjectOperation *o, ceph::real_time *pmtime, int flags=0, const jspan_context *otel_trace = nullptr);
int operate_read(const object_t& oid, ::ObjectOperation *o, bufferlist *pbl, int flags=0);
int aio_operate(const object_t& oid, ::ObjectOperation *o,
AioCompletionImpl *c, const SnapContext& snap_context,
- int flags, const blkin_trace_info *trace_info = nullptr);
+ int flags, const blkin_trace_info *trace_info = nullptr, const jspan_context *otel_trace = nullptr);
int aio_operate_read(const object_t& oid, ::ObjectOperation *o,
AioCompletionImpl *c, int flags, bufferlist *pbl, const blkin_trace_info *trace_info = nullptr);
template <typename ExecutionContext, typename CompletionToken>
auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
ObjectReadOperation *read_op, int flags,
- CompletionToken&& token)
+ CompletionToken&& token, const jspan_context* trace_ctx = nullptr)
{
using Op = detail::AsyncOp<bufferlist>;
using Signature = typename Op::Signature;
template <typename ExecutionContext, typename CompletionToken>
auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
ObjectWriteOperation *write_op, int flags,
- CompletionToken &&token)
+ CompletionToken &&token, const jspan_context* trace_ctx = nullptr)
{
using Op = detail::AsyncOp<void>;
using Signature = typename Op::Signature;
auto p = Op::create(ctx.get_executor(), init.completion_handler);
auto& op = p->user_data;
- int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags);
+ int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx);
if (ret < 0) {
auto ec = boost::system::error_code{-ret, librados::detail::err_category()};
ceph::async::post(std::move(p), ec);
return io_ctx_impl->operate(obj, &o->impl->o, (ceph::real_time *)o->impl->prt);
}
-int librados::IoCtx::operate(const std::string& oid, librados::ObjectWriteOperation *o, int flags)
+int librados::IoCtx::operate(const std::string& oid, librados::ObjectWriteOperation *o, int flags, const jspan_context* otel_trace)
{
object_t obj(oid);
if (unlikely(!o->impl))
return -EINVAL;
- return io_ctx_impl->operate(obj, &o->impl->o, (ceph::real_time *)o->impl->prt, translate_flags(flags));
+ return io_ctx_impl->operate(obj, &o->impl->o, (ceph::real_time *)o->impl->prt, translate_flags(flags), otel_trace);
}
int librados::IoCtx::operate(const std::string& oid, librados::ObjectReadOperation *o, bufferlist *pbl)
io_ctx_impl->snapc, 0);
}
int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c,
- ObjectWriteOperation *o, int flags)
+ ObjectWriteOperation *o, int flags, const jspan_context* otel_trace)
{
object_t obj(oid);
if (unlikely(!o->impl))
return -EINVAL;
return io_ctx_impl->aio_operate(obj, &o->impl->o, c->pc,
io_ctx_impl->snapc,
- translate_flags(flags));
+ translate_flags(flags), nullptr, otel_trace);
}
int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c,
template<typename V>
class MOSDOp final : public MOSDFastDispatchOp {
private:
- static constexpr int HEAD_VERSION = 8;
+ static constexpr int HEAD_VERSION = 9;
static constexpr int COMPAT_VERSION = 3;
private:
encode(retry_attempt, payload);
encode(features, payload);
} else {
- // latest v8 encoding with hobject_t hash separate from pgid, no
- // reassert version
+ // latest v9 opentelemetry trace
header.version = HEAD_VERSION;
encode(pgid, payload);
encode(flags, payload);
encode(reqid, payload);
encode_trace(payload, features);
+ encode_otel_trace(payload, features);
// -- above decoded up front; below decoded post-dispatch thread --
// Always keep here the newest version of decoding order/rule
if (header.version == HEAD_VERSION) {
+ decode(pgid, p);
+ uint32_t hash;
+ decode(hash, p);
+ hobj.set_hash(hash);
+ decode(osdmap_epoch, p);
+ decode(flags, p);
+ decode(reqid, p);
+ decode_trace(p);
+ decode_otel_trace(p);
+ } else if (header.version == 8) {
decode(pgid, p); // actual pgid
uint32_t hash;
decode(hash, p); // raw hash value
#endif
}
+void Message::encode_otel_trace(ceph::bufferlist &bl, uint64_t features) const
+{
+ tracing::encode(otel_trace, bl);
+}
+
+void Message::decode_otel_trace(ceph::bufferlist::const_iterator &p, bool create)
+{
+ tracing::decode(otel_trace, p);
+}
// This routine is not used for ordinary messages, but only when encapsulating a message
// for forwarding and routing. It's also used in a backward compatibility test, which only
#include "common/ref.h"
#include "common/debug.h"
#include "common/zipkin_trace.h"
+#include "common/tracer.h"
#include "include/ceph_assert.h" // Because intrusive_ptr clobbers our assert...
#include "include/buffer.h"
#include "include/types.h"
void encode_trace(ceph::buffer::list &bl, uint64_t features) const;
void decode_trace(ceph::buffer::list::const_iterator &p, bool create = false);
+ // otel tracing
+ jspan_context otel_trace{false, false};
+ void encode_otel_trace(ceph::buffer::list &bl, uint64_t features) const;
+ void decode_otel_trace(ceph::buffer::list::const_iterator &p, bool create = false);
+
class CompletionHook : public Context {
protected:
Message *m;
tracepoint(osd, ms_fast_dispatch, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
}
- op->osd_parent_span = tracing::osd::tracer.start_trace("op-request-created");
+
+ if (m->otel_trace.IsValid()) {
+ op->osd_parent_span = tracing::osd::tracer.add_span("op-request-created", m->otel_trace);
+ } else {
+ op->osd_parent_span = tracing::osd::tracer.start_trace("op-request-created");
+ }
if (m->trace)
op->osd_trace.init("osd op", &trace_endpoint, &m->trace);
m->set_reqid(op->reqid);
}
+ if (op->otel_trace && op->otel_trace->IsValid()) {
+ m->otel_trace = jspan_context(*op->otel_trace);
+ }
+
logger->inc(l_osdc_op_send);
ssize_t sum = 0;
for (unsigned i = 0; i < m->ops.size(); i++) {
#include "common/config_obs.h"
#include "common/shunique_lock.h"
#include "common/zipkin_trace.h"
+#include "common/tracer.h"
#include "common/Throttle.h"
#include "mon/MonClient.h"
osd_reqid_t reqid; // explicitly setting reqid
ZTracer::Trace trace;
+ const jspan_context* otel_trace = nullptr;
static bool has_completion(decltype(onfinish)& f) {
return std::visit([](auto&& arg) { return bool(arg);}, f);
Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
int f, Context* fin, version_t *ov, int *offset = nullptr,
- ZTracer::Trace *parent_trace = nullptr) :
+ ZTracer::Trace *parent_trace = nullptr, const jspan_context *otel_trace = nullptr) :
target(o, ol, f),
ops(std::move(_ops)),
out_bl(ops.size(), nullptr),
out_ec(ops.size(), nullptr),
onfinish(fin),
objver(ov),
- data_offset(offset) {
+ data_offset(offset),
+ otel_trace(otel_trace) {
if (target.base_oloc.key == o)
target.base_oloc.key.clear();
if (parent_trace && parent_trace->valid()) {
ceph::real_time mtime, int flags,
Context *oncommit, version_t *objver = NULL,
osd_reqid_t reqid = osd_reqid_t(),
- ZTracer::Trace *parent_trace = nullptr) {
+ ZTracer::Trace *parent_trace = nullptr,
+ const jspan_context *otel_trace = nullptr) {
Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, oncommit, objver,
- nullptr, parent_trace);
+ nullptr, nullptr, otel_trace);
o->priority = op.priority;
o->mtime = mtime;
o->snapc = snapc;
}
constexpr uint64_t id = 0; // unused
auto& ref = stripe_obj.get_ref();
- auto c = aio->get(ref.obj, Aio::librados_op(ref.pool.ioctx(), std::move(op), y), cost, id);
+ auto c = aio->get(ref.obj, Aio::librados_op(ref.pool.ioctx(), std::move(op), y, &trace), cost, id);
return process_completed(c, &written);
}
constexpr uint64_t id = 0; // unused
auto& ref = stripe_obj.get_ref();
- auto c = aio->get(ref.obj, Aio::librados_op(ref.pool.ioctx(), std::move(op), y), cost, id);
+ auto c = aio->get(ref.obj, Aio::librados_op(ref.pool.ioctx(), std::move(op), y, &trace), cost, id);
auto d = aio->drain();
c.splice(c.end(), d);
return process_completed(c, &written);
obj_op.meta.zones_trace = zones_trace;
obj_op.meta.modify_tail = true;
- r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y);
+ r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y, writer.get_trace());
if (r < 0) {
if (r == -ETIMEDOUT) {
// The head object write may eventually succeed, clear the set of objects for deletion. if it
obj_op.meta.zones_trace = zones_trace;
obj_op.meta.modify_tail = true;
- r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y);
+ r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y, writer.get_trace());
if (r < 0)
return r;
}
r = obj_op.write_meta(dpp, actual_size + cur_size,
accounted_size + *cur_accounted_size,
- attrs, y);
+ attrs, y, writer.get_trace());
if (r < 0) {
return r;
}
RawObjSet written; // set of written objects for deletion
const DoutPrefixProvider *dpp;
optional_yield y;
+ jspan_context& trace;
public:
RadosWriter(Aio *aio, RGWRados *store,
const RGWBucketInfo& bucket_info,
RGWObjectCtx& obj_ctx, const rgw_obj& _head_obj,
- const DoutPrefixProvider *dpp, optional_yield y)
+ const DoutPrefixProvider *dpp, optional_yield y, jspan_context& _trace)
: aio(aio), store(store), bucket_info(bucket_info),
- obj_ctx(obj_ctx), head_obj(_head_obj), dpp(dpp), y(y)
+ obj_ctx(obj_ctx), head_obj(_head_obj), dpp(dpp), y(y), trace(_trace)
{}
~RadosWriter();
// so they aren't deleted on destruction
void clear_written() { written.clear(); }
+ jspan_context& get_trace() { return trace; }
};
const rgw_placement_rule *ptail_placement_rule,
const rgw_user& owner, RGWObjectCtx& _obj_ctx,
const rgw_obj& _head_obj,
- const DoutPrefixProvider* dpp, optional_yield y)
+ const DoutPrefixProvider* dpp,
+ optional_yield y,
+ jspan_context& trace)
: HeadObjectProcessor(0),
store(store), bucket_info(bucket_info),
owner(owner),
obj_ctx(_obj_ctx), head_obj(_head_obj),
- writer(aio, store, bucket_info, obj_ctx, head_obj, dpp, y),
+ writer(aio, store, bucket_info, obj_ctx, head_obj, dpp, y, trace),
chunk(&writer, 0), stripe(&chunk, this, 0), dpp(dpp) {
if (ptail_placement_rule) {
tail_placement_rule = *ptail_placement_rule;
RGWObjectCtx& obj_ctx, const rgw_obj& _head_obj,
std::optional<uint64_t> olh_epoch,
const std::string& unique_tag,
- const DoutPrefixProvider *dpp, optional_yield y)
+ const DoutPrefixProvider *dpp, optional_yield y, jspan_context& trace)
: ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
- owner, obj_ctx, _head_obj, dpp, y),
+ owner, obj_ctx, _head_obj, dpp, y, trace),
olh_epoch(olh_epoch), unique_tag(unique_tag)
{}
const rgw_obj& _head_obj,
const std::string& upload_id, uint64_t part_num,
const std::string& part_num_str,
- const DoutPrefixProvider *dpp, optional_yield y)
+ const DoutPrefixProvider *dpp, optional_yield y, jspan_context& trace)
: ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
- owner, obj_ctx, _head_obj, dpp, y),
+ owner, obj_ctx, _head_obj, dpp, y, trace),
target_obj(head_obj), upload_id(upload_id),
part_num(part_num), part_num_str(part_num_str),
mp(head_obj.key.name, upload_id)
const rgw_obj& _head_obj,
const std::string& unique_tag, uint64_t position,
uint64_t *cur_accounted_size,
- const DoutPrefixProvider *dpp, optional_yield y)
+ const DoutPrefixProvider *dpp, optional_yield y, jspan_context& trace)
: ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
- owner, obj_ctx, _head_obj, dpp, y),
+ owner, obj_ctx, _head_obj, dpp, y, trace),
position(position), cur_size(0), cur_accounted_size(cur_accounted_size),
unique_tag(unique_tag), cur_manifest(nullptr)
{}
rgw_zone_id no_zone;
+ jspan_context no_trace{false, false};
+
r = copy_obj(obj_ctx,
user,
NULL, /* req_info *info */
NULL, /* void (*progress_cb)(off_t, void *) */
NULL, /* void *progress_data */
dpp,
- null_yield);
+ null_yield,
+ no_trace);
if (r == -ECANCELED || r == -ENOENT) {
/* Has already been overwritten, meaning another rgw process already
* copied it out */
obj_ctx.set_atomic(archive_obj);
obj_ctx.set_atomic(obj);
+ jspan_context no_trace{false, false};
+
int ret = copy_obj(obj_ctx,
user,
nullptr, /* req_info *info */
nullptr, /* void (*progress_cb)(off_t, void *) */
nullptr, /* void *progress_data */
dpp,
- null_yield);
+ null_yield,
+ no_trace);
if (ret == -ECANCELED || ret == -ENOENT) {
/* Has already been overwritten, meaning another rgw process already
* copied it out */
uint64_t size, uint64_t accounted_size,
map<string, bufferlist>& attrs,
bool assume_noent, bool modify_tail,
- void *_index_op, optional_yield y)
+ void *_index_op, optional_yield y, jspan_context& trace)
{
RGWRados::Bucket::UpdateIndex *index_op = static_cast<RGWRados::Bucket::UpdateIndex *>(_index_op);
RGWRados *store = target->get_store();
auto& ioctx = ref.pool.ioctx();
tracepoint(rgw_rados, operate_enter, req_id.c_str());
- r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
+ r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield, 0, &trace);
tracepoint(rgw_rados, operate_exit, req_id.c_str());
if (r < 0) { /* we can expect to get -ECANCELED if object was replaced under,
or -ENOENT if was removed, or -EEXIST if it did not exist
}
int RGWRados::Object::Write::write_meta(const DoutPrefixProvider *dpp, uint64_t size, uint64_t accounted_size,
- map<string, bufferlist>& attrs, optional_yield y)
+ map<string, bufferlist>& attrs, optional_yield y, jspan_context& trace)
{
RGWBucketInfo& bucket_info = target->get_bucket_info();
bool assume_noent = (meta.if_match == NULL && meta.if_nomatch == NULL);
int r;
if (assume_noent) {
- r = _do_write_meta(dpp, size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op, y);
+ r = _do_write_meta(dpp, size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op, y, trace);
if (r == -EEXIST) {
assume_noent = false;
}
}
if (!assume_noent) {
- r = _do_write_meta(dpp, size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op, y);
+ r = _do_write_meta(dpp, size, accounted_size, attrs, assume_noent, meta.modify_tail, (void *)&index_op, y, trace);
}
return r;
}
rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
+ jspan_context no_trace{false, false};
AtomicObjectProcessor processor(&aio, this, dest_bucket_info, nullptr,
user_id, obj_ctx, dest_obj, olh_epoch,
- tag, dpp, null_yield);
+ tag, dpp, null_yield, no_trace);
RGWRESTConn *conn;
auto& zone_conn_map = svc.zone->get_zone_conn_map();
auto& zonegroup_conn_map = svc.zone->get_zonegroup_conn_map();
void (*progress_cb)(off_t, void *),
void *progress_data,
const DoutPrefixProvider *dpp,
- optional_yield y)
+ optional_yield y,
+ jspan_context& trace)
{
int ret;
uint64_t obj_size;
write_op.meta.delete_at = delete_at;
write_op.meta.modify_tail = !copy_itself;
- ret = write_op.write_meta(dpp, obj_size, astate->accounted_size, attrs, y);
+ ret = write_op.write_meta(dpp, obj_size, astate->accounted_size, attrs, y, trace);
if (ret < 0) {
goto done_ret;
}
auto aio = rgw::make_throttle(cct->_conf->rgw_put_obj_min_window_size, y);
using namespace rgw::putobj;
+ jspan_context no_trace{false, false};
AtomicObjectProcessor processor(aio.get(), this, dest_bucket_info,
&dest_placement, dest_bucket_info.owner,
- obj_ctx, dest_obj, olh_epoch, tag, dpp, y);
+ obj_ctx, dest_obj, olh_epoch, tag, dpp, y, no_trace);
int ret = processor.prepare(y);
if (ret < 0)
return ret;
uint64_t size, uint64_t accounted_size,
std::map<std::string, bufferlist>& attrs,
bool modify_tail, bool assume_noent,
- void *index_op, optional_yield y);
+ void *index_op, optional_yield y, jspan_context& trace);
int write_meta(const DoutPrefixProvider *dpp, uint64_t size, uint64_t accounted_size,
- std::map<std::string, bufferlist>& attrs, optional_yield y);
+ std::map<std::string, bufferlist>& attrs, optional_yield y, jspan_context& trace);
int write_data(const char *data, uint64_t ofs, uint64_t len, bool exclusive);
const req_state* get_req_state() {
return nullptr; /* XXX dang Only used by LTTng, and it handles null anyway */
void (*progress_cb)(off_t, void *),
void *progress_data,
const DoutPrefixProvider *dpp,
- optional_yield y);
+ optional_yield y,
+ jspan_context& trace);
int copy_obj_data(RGWObjectCtx& obj_ctx,
RGWBucketInfo& dest_bucket_info,
this, std::move(aio), owner,
ptail_placement_rule,
unique_tag, position,
- cur_accounted_size);
+ cur_accounted_size, obj->get_trace());
}
std::unique_ptr<Writer> RadosStore::get_atomic_writer(const DoutPrefixProvider *dpp,
bucket_info, obj_ctx, obj->get_obj(),
this, std::move(aio), owner,
ptail_placement_rule,
- olh_epoch, unique_tag);
+ olh_epoch, unique_tag, obj->get_trace());
}
const std::string& RadosStore::get_compression_type(const rgw_placement_rule& rule)
attrs.erase(RGW_ATTR_ID_TAG);
attrs.erase(RGW_ATTR_TAIL_TAG);
- return obj_op.write_meta(dpp, 0, 0, attrs, y);
+ return obj_op.write_meta(dpp, 0, 0, attrs, y, head_obj->get_trace());
}
int RadosObject::get_max_chunk_size(const DoutPrefixProvider* dpp, rgw_placement_rule placement_rule, uint64_t* max_chunk_size, uint64_t* alignment)
progress_cb,
progress_data,
dpp,
- y);
+ y,
+ dest_object->get_trace());
}
int RadosObject::RadosReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end, RGWGetDataCB* cb, optional_yield y)
encode(upload_info, bl);
obj_op.meta.data = &bl;
- ret = obj_op.write_meta(dpp, bl.length(), 0, attrs, y);
+ ret = obj_op.write_meta(dpp, bl.length(), 0, attrs, y, get_trace());
} while (ret == -EEXIST);
return ret;
obj_op.meta.completeMultipart = true;
obj_op.meta.olh_epoch = olh_epoch;
- ret = obj_op.write_meta(dpp, ofs, accounted_size, attrs, y);
+ ret = obj_op.write_meta(dpp, ofs, accounted_size, attrs, y, get_trace());
if (ret < 0)
return ret;
return std::make_unique<RadosMultipartWriter>(dpp, y, get_upload_id(),
bucket_info, obj_ctx,
obj->get_obj(), store, std::move(aio), owner,
- ptail_placement_rule, part_num, part_num_str);
+ ptail_placement_rule, part_num, part_num_str, obj->get_trace());
}
MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name) :
const rgw_user& owner,
const rgw_placement_rule *ptail_placement_rule,
uint64_t olh_epoch,
- const std::string& unique_tag) :
+ const std::string& unique_tag,
+ jspan_context& trace) :
StoreWriter(dpp, y),
store(_store),
aio(std::move(_aio)),
processor(&*aio, store->getRados(), bucket_info,
ptail_placement_rule, owner, obj_ctx,
obj, olh_epoch, unique_tag,
- dpp, y)
+ dpp, y, trace)
{}
~RadosAtomicWriter() = default;
const rgw_placement_rule *ptail_placement_rule,
const std::string& unique_tag,
uint64_t position,
- uint64_t *cur_accounted_size) :
+ uint64_t *cur_accounted_size,
+ jspan_context& trace) :
StoreWriter(dpp, y),
store(_store),
aio(std::move(_aio)),
processor(&*aio, store->getRados(), bucket_info,
ptail_placement_rule, owner, obj_ctx,
obj, unique_tag, position,
- cur_accounted_size, dpp, y)
+ cur_accounted_size, dpp, y, trace)
{}
~RadosAppendWriter() = default;
RadosStore* _store, std::unique_ptr<Aio> _aio,
const rgw_user& owner,
const rgw_placement_rule *ptail_placement_rule,
- uint64_t part_num, const std::string& part_num_str) :
+ uint64_t part_num, const std::string& part_num_str, jspan_context& trace) :
StoreWriter(dpp, y),
store(_store),
aio(std::move(_aio)),
processor(&*aio, store->getRados(), bucket_info,
ptail_placement_rule, owner, obj_ctx,
obj, upload_id,
- part_num, part_num_str, dpp, y)
+ part_num, part_num_str, dpp, y, trace)
{}
~RadosMultipartWriter() = default;
int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
librados::ObjectReadOperation *op, bufferlist* pbl,
- optional_yield y, int flags)
+ optional_yield y, int flags, const jspan_context* trace_info)
{
// given a yield_context, call async_operate() to yield the coroutine instead
// of blocking
int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
librados::ObjectWriteOperation *op, optional_yield y,
- int flags)
+ int flags, const jspan_context* trace_info)
{
if (y) {
auto& context = y.get_io_context();
auto& yield = y.get_yield_context();
boost::system::error_code ec;
- librados::async_operate(context, ioctx, oid, op, flags, yield[ec]);
+ librados::async_operate(context, ioctx, oid, op, flags, yield[ec], trace_info);
return -ec.value();
}
if (is_asio_thread) {
ldpp_dout(dpp, 20) << "BACKTRACE: " << __func__ << ": " << ClibBackTrace(0) << dendl;
#endif
}
- return ioctx.operate(oid, op, flags);
+ return ioctx.operate(oid, op, flags, trace_info);
}
int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
/// perform the rados operation, using the yield context when given
int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
librados::ObjectReadOperation *op, bufferlist* pbl,
- optional_yield y, int flags = 0);
+ optional_yield y, int flags = 0, const jspan_context* trace_info = nullptr);
int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
librados::ObjectWriteOperation *op, optional_yield y,
- int flags = 0);
+ int flags = 0, const jspan_context* trace_info = nullptr);
int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
bufferlist& bl, uint64_t timeout_ms, bufferlist* pbl,
optional_yield y);
}
template <typename Op>
-Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op) {
- return [ctx = std::move(ctx), op = std::move(op)] (Aio* aio, AioResult& r) mutable {
+Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op, jspan_context* trace_ctx = nullptr) {
+ return [ctx = std::move(ctx), op = std::move(op), trace_ctx] (Aio* aio, AioResult& r) mutable {
constexpr bool read = std::is_same_v<std::decay_t<Op>, librados::ObjectReadOperation>;
// use placement new to construct the rados state inside of user_data
auto s = new (&r.user_data) state(aio, ctx, r);
if constexpr (read) {
+ (void)trace_ctx; // suppress unused trace_ctx warning. until we will support the read op trace
r.result = ctx.aio_operate(r.obj.oid, s->c, &op, &r.data);
} else {
- r.result = ctx.aio_operate(r.obj.oid, s->c, &op);
+ r.result = ctx.aio_operate(r.obj.oid, s->c, &op, trace_ctx);
}
if (r.result < 0) {
// cb() won't be called, so release everything here
template <typename Op>
Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op,
boost::asio::io_context& context,
- yield_context yield) {
- return [ctx = std::move(ctx), op = std::move(op), &context, yield] (Aio* aio, AioResult& r) mutable {
+ yield_context yield, jspan_context* trace_ctx = nullptr) {
+ return [ctx = std::move(ctx), op = std::move(op), &context, yield, trace_ctx] (Aio* aio, AioResult& r) mutable {
// arrange for the completion Handler to run on the yield_context's strand
// executor so it can safely call back into Aio without locking
using namespace boost::asio;
auto ex = get_associated_executor(init.completion_handler);
librados::async_operate(context, ctx, r.obj.oid, &op, 0,
- bind_executor(ex, Handler{aio, ctx, r}));
+ bind_executor(ex, Handler{aio, ctx, r}), trace_ctx);
};
}
template <typename Op>
-Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op, optional_yield y) {
+Aio::OpFunc aio_abstract(librados::IoCtx ctx, Op&& op, optional_yield y, jspan_context *trace_ctx = nullptr) {
static_assert(std::is_base_of_v<librados::ObjectOperation, std::decay_t<Op>>);
static_assert(!std::is_lvalue_reference_v<Op>);
static_assert(!std::is_const_v<Op>);
if (y) {
return aio_abstract(std::move(ctx), std::forward<Op>(op),
- y.get_io_context(), y.get_yield_context());
+ y.get_io_context(), y.get_yield_context(), trace_ctx);
}
- return aio_abstract(std::move(ctx), std::forward<Op>(op));
+ return aio_abstract(std::move(ctx), std::forward<Op>(op), null_yield, trace_ctx);
}
} // anonymous namespace
}
Aio::OpFunc Aio::librados_op(librados::IoCtx ctx,
librados::ObjectWriteOperation&& op,
- optional_yield y) {
- return aio_abstract(std::move(ctx), std::move(op), y);
+ optional_yield y, jspan_context *trace_ctx) {
+ return aio_abstract(std::move(ctx), std::move(op), y, trace_ctx);
}
Aio::OpFunc Aio::d3n_cache_op(const DoutPrefixProvider *dpp, optional_yield y,
optional_yield y);
static OpFunc librados_op(librados::IoCtx ctx,
librados::ObjectWriteOperation&& op,
- optional_yield y);
+ optional_yield y, jspan_context *trace_ctx = nullptr);
static OpFunc d3n_cache_op(const DoutPrefixProvider *dpp, optional_yield y,
off_t read_ofs, off_t read_len, std::string& location);
};
goto done;
}
+ s->trace = tracing::rgw::tracer.start_trace(op->name());
+
/* req is-a RGWOp, currently initialized separately */
ret = req->op_init();
if (ret < 0) {
rgw_placement_rule *pdest_placement = &s->dest_placement;
+ s->object->set_trace(s->trace->GetContext());
+
if (multipart) {
std::unique_ptr<rgw::sal::MultipartUpload> upload;
upload = s->bucket->get_multipart_upload(s->object->get_name(),
/** Get a unique copy of this object */
virtual std::unique_ptr<Object> clone() = 0;
+ virtual jspan_context& get_trace() = 0;
+ virtual void set_trace (jspan_context&& _trace_ctx) = 0;
+
/* dang - This is temporary, until the API is completed */
/** Get the key for this object */
virtual rgw_obj_key& get_key() = 0;
virtual std::map<uint32_t, std::unique_ptr<MultipartPart>>& get_parts() = 0;
/** Get the trace context of this upload */
- virtual const jspan_context& get_trace() = 0;
+ virtual jspan_context& get_trace() = 0;
/** Get the Object that represents this upload */
virtual std::unique_ptr<rgw::sal::Object> get_meta_obj() = 0;
return std::make_unique<FilterObject>(*this);
}
+ virtual jspan_context& get_trace() { return next->get_trace(); }
+ virtual void set_trace (jspan_context&& _trace_ctx) { next->set_trace(std::move(_trace_ctx)); }
+
virtual void print(std::ostream& out) const override { return next->print(out); }
/* Internal to Filters */
virtual std::map<uint32_t, std::unique_ptr<MultipartPart>>& get_parts() override { return parts; }
- virtual const jspan_context& get_trace() override { return next->get_trace(); }
+ virtual jspan_context& get_trace() override { return next->get_trace(); }
virtual std::unique_ptr<rgw::sal::Object> get_meta_obj() override;
RGWObjState state;
Bucket* bucket = nullptr;
bool delete_marker{false};
+ jspan_context trace_ctx{false, false};
public:
StoreObject() = default;
* work with lifecycle */
return -1;
}
+ jspan_context& get_trace() override { return trace_ctx; }
+ void set_trace (jspan_context&& _trace_ctx) override { trace_ctx = std::move(_trace_ctx); }
virtual int get_torrent_info(const DoutPrefixProvider* dpp,
optional_yield y, bufferlist& bl) override {
virtual std::map<uint32_t, std::unique_ptr<MultipartPart>>& get_parts() override { return parts; }
- virtual const jspan_context& get_trace() override { return trace_ctx; }
+ virtual jspan_context& get_trace() override { return trace_ctx; }
virtual void print(std::ostream& out) const override {
out << get_meta();
return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, op, pbl, y, flags);
}
-int RGWSI_RADOS::Obj::aio_operate(librados::AioCompletion *c, librados::ObjectWriteOperation *op)
+int RGWSI_RADOS::Obj::aio_operate(librados::AioCompletion *c, librados::ObjectWriteOperation *op, const jspan_context *trace_ctx)
{
- return ref.pool.ioctx().aio_operate(ref.obj.oid, c, op);
+ return ref.pool.ioctx().aio_operate(ref.obj.oid, c, op, 0, trace_ctx);
}
int RGWSI_RADOS::Obj::aio_operate(librados::AioCompletion *c, librados::ObjectReadOperation *op,
int flags = 0);
int operate(const DoutPrefixProvider *dpp, librados::ObjectReadOperation *op, bufferlist *pbl,
optional_yield y, int flags = 0);
- int aio_operate(librados::AioCompletion *c, librados::ObjectWriteOperation *op);
+ int aio_operate(librados::AioCompletion *c, librados::ObjectWriteOperation *op, const jspan_context *trace_ctx = nullptr);
int aio_operate(librados::AioCompletion *c, librados::ObjectReadOperation *op,
bufferlist *pbl);