pg_meta.cc
replicated_backend.cc
shard_services.cc
+ ops_executer.cc
osd_operation.cc
osd_operations/client_request.cc
osd_operations/compound_peering_request.cc
#include "common/debug.h"
#include "crimson/osd/exceptions.h"
+#include "crimson/osd/ops_executer.h"
#include "crimson/osd/pg_backend.h"
#include "objclass/objclass.h"
// we're blocking here which presumes execution in Seastar's thread.
try {
- hctx.backend->stat(*hctx.os, op).get();
+ reinterpret_cast<ceph::osd::OpsExecuter*>(hctx)->do_osd_op(op).get();
} catch (ceph::osd::error& e) {
return -e.code().value();
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ops_executer.h"
+
+#include <boost/range/adaptor/filtered.hpp>
+#include <boost/range/adaptor/map.hpp>
+#include <boost/range/adaptor/transformed.hpp>
+#include <boost/range/algorithm/copy.hpp>
+#include <boost/range/algorithm/max_element.hpp>
+#include <boost/range/numeric.hpp>
+
+#include <fmt/format.h>
+#include <fmt/ostream.h>
+
+#include <seastar/core/thread.hh>
+
+#include "crimson/osd/exceptions.h"
+#include "osd/ClassHandler.h"
+
+namespace {
+ seastar::logger& logger() { // file-local shortcut for the OSD subsystem logger
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace ceph::osd {
+
+/// Executes a CEPH_OSD_OP_CALL op.
+///
+/// Decodes the class name, the method name and the input payload from
+/// osd_op.indata, resolves the method through ClassHandler and runs it
+/// via seastar::async(), as class methods may block on nested ops.
+///
+/// On success the method's output is appended to osd_op.outdata and its
+/// length is stored in osd_op.op.extent.length.
+///
+/// Throws object_not_found when the object does not exist,
+/// invalid_argument when the payload cannot be decoded, and
+/// operation_not_supported / permission_denied / input_output_error when
+/// the class or the method cannot be resolved. A negative return value of
+/// the method itself is converted with make_error() inside the async body.
+seastar::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
+{
+  if (!os->exists) {
+    throw ceph::osd::object_not_found();
+  }
+
+  std::string cname, mname;
+  ceph::bufferlist indata;
+  try {
+    // the payload is laid out as: class name, method name, input data
+    auto bp = std::begin(osd_op.indata);
+    bp.copy(osd_op.op.cls.class_len, cname);
+    bp.copy(osd_op.op.cls.method_len, mname);
+    bp.copy(osd_op.op.cls.indata_len, indata);
+  } catch (buffer::error&) {
+    logger().warn("call unable to decode class + method + indata");
+    throw ceph::osd::invalid_argument{};
+  }
+
+  // NOTE: opening a class can actually result in dlopen(), and thus
+  // blocking the entire reactor. Thanks to ClassHandler's cache
+  // this is supposed to be extremely infrequent.
+  ClassHandler::ClassData* cls = nullptr;
+  const int r = ClassHandler::get_instance().open_class(cname, &cls);
+  if (r) {
+    logger().warn("class {} open got {}", cname, cpp_strerror(r));
+    if (r == -ENOENT) {
+      throw ceph::osd::operation_not_supported{};
+    } else if (r == -EPERM) {
+      // propagate permission errors
+      throw ceph::osd::permission_denied{};
+    }
+    throw ceph::osd::input_output_error{};
+  }
+
+  ClassHandler::ClassMethod* method = cls->get_method(mname);
+  if (!method) {
+    logger().warn("call method {}.{} does not exist", cname, mname);
+    throw ceph::osd::operation_not_supported{};
+  }
+
+  // only consumed by the currently disabled (#if 0) RD/WR accounting
+  [[maybe_unused]] const auto flags = method->get_flags();
+#if 0
+  if (flags & CLS_METHOD_WR) {
+    ctx->user_modify = true;
+  }
+#endif
+
+  logger().debug("calling method {}.{}", cname, mname);
+#if 0
+  int prev_rd = ctx->num_read;
+  int prev_wr = ctx->num_write;
+#endif
+
+
+  return seastar::async([this, &osd_op, method, indata=std::move(indata)]() mutable {
+    ceph::bufferlist outdata;
+    // The executer itself is handed over as the opaque method context;
+    // the objclass glue casts it back to OpsExecuter* to issue nested ops.
+    const auto ret = method->exec(static_cast<cls_method_context_t>(this),
+                                  indata, outdata);
+    if (ret < 0) {
+      throw ceph::osd::make_error(ret);
+    }
+#if 0
+    if (ctx->num_read > prev_rd && !(flags & CLS_METHOD_RD)) {
+      derr << "method " << cname << "." << mname << " tried to read object but is not marked RD" << dendl;
+      result = -EIO;
+      break;
+    }
+    if (ctx->num_write > prev_wr && !(flags & CLS_METHOD_WR)) {
+      derr << "method " << cname << "." << mname << " tried to update object but is not marked WR" << dendl;
+      result = -EIO;
+      break;
+    }
+#endif
+
+    logger().debug("method called response length={}", outdata.length());
+    osd_op.op.extent.length = outdata.length();
+    osd_op.outdata.claim_append(outdata);
+  });
+
+}
+/// Produces the reply payload for CEPH_OSD_OP_PGNLS.
+///
+/// Decodes the starting handle from indata, passes it together with
+/// `limit` to backend.list_objects() and returns an encoded
+/// pg_nls_response_t. Entries are filtered by namespace: objects in the
+/// hit-set namespace are always skipped; otherwise an object is kept when
+/// nspace is librados::all_nspaces or equals the object's namespace.
+///
+/// Throws ceph::osd::invalid_argument when the handle cannot be decoded
+/// or lies outside of this PG. The callers only translate
+/// ceph::osd::error into an error reply, so a generic std exception
+/// would escape their handlers.
+seastar::future<ceph::bufferlist>
+OpsExecuter::do_pgnls(ceph::bufferlist& indata,
+                      const std::string& nspace,
+                      uint64_t limit)
+{
+  hobject_t lower_bound;
+  try {
+    ceph::decode(lower_bound, indata);
+  } catch (const buffer::error&) {
+    logger().warn("unable to decode PGNLS handle");
+    throw ceph::osd::invalid_argument{};
+  }
+  const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
+  const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
+  if (!(lower_bound.is_min() ||
+        lower_bound.is_max() ||
+        (lower_bound >= pg_start && lower_bound < pg_end))) {
+    // this should only happen with a buggy client.
+    logger().warn("PGNLS handle outside of PG bounds");
+    throw ceph::osd::invalid_argument{};
+  }
+  // nspace is copied into the continuation: the reference passed in is
+  // not guaranteed to outlive the asynchronous listing.
+  return backend.list_objects(lower_bound, limit).then(
+    [pg_end, nspace](auto objects, auto next) {
+      auto in_my_namespace = [&nspace](const hobject_t& o) {
+        using ceph::common::local_conf;
+        if (o.get_namespace() == local_conf()->osd_hit_set_namespace) {
+          return false;
+        } else if (nspace == librados::all_nspaces) {
+          return true;
+        } else {
+          return o.get_namespace() == nspace;
+        }
+      };
+      pg_nls_response_t response;
+      boost::copy(objects |
+                  boost::adaptors::filtered(in_my_namespace) |
+                  boost::adaptors::transformed([](const hobject_t& o) {
+                    return librados::ListObjectImpl{o.get_namespace(),
+                                                    o.oid.name,
+                                                    o.get_key()}; }),
+                  std::back_inserter(response.entries));
+      // when the backend reports the end of the listing, hand out the end
+      // of this PG as the next handle
+      response.handle = next.is_max() ? pg_end : next;
+      bufferlist bl;
+      encode(response, bl);
+      return seastar::make_ready_future<bufferlist>(std::move(bl));
+  });
+}
+
+// Executes a single OSD op on behalf of this executer. Results are stored
+// in osd_op itself (rval / outdata); mutating ops are given the executer's
+// transaction. Ops without a case below end in std::runtime_error.
+// TODO: split the method according to os' constness needs
+seastar::future<>
+OpsExecuter::do_osd_op(OSDOp& osd_op)
+{
+  // TODO: dispatch via call table?
+  // TODO: we might want to find a way to unify both input and output
+  // of each op.
+  const ceph_osd_op& op = osd_op.op;
+  logger().debug("handling op {}", ceph_osd_op_name(op.op));
+
+  switch (op.op) {
+  case CEPH_OSD_OP_SYNC_READ:
+  case CEPH_OSD_OP_READ:
+    return backend.read(os->oi,
+                        op.extent.offset,
+                        op.extent.length,
+                        op.extent.truncate_size,
+                        op.extent.truncate_seq,
+                        op.flags).then([&osd_op](bufferlist data) {
+      // the number of bytes read doubles as the op's return value
+      osd_op.rval = data.length();
+      osd_op.outdata = std::move(data);
+      return seastar::now();
+    });
+
+  case CEPH_OSD_OP_WRITE:
+    return backend.write(*os, osd_op, txn);
+
+  case CEPH_OSD_OP_WRITEFULL:
+    // XXX: os = backend.write(std::move(os), ...) instead?
+    return backend.writefull(*os, osd_op, txn);
+
+  case CEPH_OSD_OP_SETALLOCHINT:
+    // nothing is executed for this op
+    return seastar::now();
+
+  case CEPH_OSD_OP_PGNLS:
+    return do_pgnls(osd_op.indata, os->oi.soid.get_namespace(), op.pgls.count)
+      .then([&osd_op](bufferlist encoded) {
+        osd_op.outdata = std::move(encoded);
+        return seastar::now();
+      });
+
+  case CEPH_OSD_OP_DELETE:
+    return backend.remove(*os, txn);
+
+  case CEPH_OSD_OP_CALL:
+    return this->do_op_call(osd_op);
+
+  case CEPH_OSD_OP_STAT:
+    return backend.stat(*os, osd_op);
+
+  default: {
+    const auto op_name = ceph_osd_op_name(op.op);
+    logger().warn("unknown op {}", op_name);
+    throw std::runtime_error(
+      fmt::format("op '{}' not supported", op_name));
+  }
+  }
+}
+
+} // namespace ceph::osd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <boost/smart_ptr/local_shared_ptr.hpp>
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "common/dout.h"
+#include "crimson/net/Fwd.h"
+#include "os/Transaction.h"
+#include "osd/osd_types.h"
+#include "osd/osd_internal_types.h"
+
+#include "crimson/common/type_helpers.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/shard_services.h"
+#include "crimson/osd/osdmap_gate.h"
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_backend.h"
+
+namespace ceph::osd {
+/// Executes the ops of a request against one object of a PG.
+///
+/// The executer is constructed with the object's cached state and the
+/// owning PG. do_osd_op() is called once per op; mutating ops record their
+/// changes in the internal transaction. Finally submit_changes() hands the
+/// transaction and the object state over to the caller-provided functor.
+///
+/// The executer is also what object-class methods receive as their opaque
+/// cls_method_context_t (see do_op_call()).
+class OpsExecuter {
+  PGBackend::cached_os_t os;   // state of the object the ops work on
+  PG& pg;
+  PGBackend& backend;          // pg.get_backend(), taken at construction
+  ceph::os::Transaction txn;   // passed to the mutating backend calls
+
+  /// Builds the encoded reply of a PGNLS op.
+  seastar::future<ceph::bufferlist> do_pgnls(
+    ceph::bufferlist& indata,
+    const std::string& nspace,
+    uint64_t limit);
+  /// Runs an object-class method (CEPH_OSD_OP_CALL).
+  seastar::future<> do_op_call(OSDOp& osd_op);
+
+public:
+  OpsExecuter(PGBackend::cached_os_t os, PG& pg)
+    : os(std::move(os)), pg(pg), backend(pg.get_backend()) {
+  }
+
+  /// Executes a single op; results are stored in osd_op.
+  seastar::future<> do_osd_op(OSDOp& osd_op);
+
+  /// Consumes the executer (callable on rvalues only): moves the
+  /// transaction and the object state into f and returns f's future.
+  template <typename Func> seastar::future<> submit_changes(Func&& f) && {
+    return std::forward<Func>(f)(std::move(txn), std::move(os));
+  }
+};
+
+} // namespace ceph::osd
#include "crimson/osd/exceptions.h"
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/pg_backend.h"
+#include "crimson/osd/ops_executer.h"
#include "crimson/osd/osd_operations/peering_event.h"
namespace {
}
}
-// TODO: split the method accordingly to os' constness needs
-seastar::future<>
-PG::do_osd_op(ObjectState& os, OSDOp& osd_op, ceph::os::Transaction& txn)
-{
- // TODO: dispatch via call table?
- // TODO: we might want to find a way to unify both input and output
- // of each op.
- logger().debug("handling op {}", ceph_osd_op_name(osd_op.op.op));
- switch (const ceph_osd_op& op = osd_op.op; op.op) {
- case CEPH_OSD_OP_SYNC_READ:
- [[fallthrough]];
- case CEPH_OSD_OP_READ:
- return backend->read(os.oi,
- op.extent.offset,
- op.extent.length,
- op.extent.truncate_size,
- op.extent.truncate_seq,
- op.flags).then([&osd_op](bufferlist bl) {
- osd_op.rval = bl.length();
- osd_op.outdata = std::move(bl);
- return seastar::now();
- });
- case CEPH_OSD_OP_WRITE:
- return backend->write(os, osd_op, txn);
- case CEPH_OSD_OP_WRITEFULL:
- // XXX: os = backend->write(std::move(os), ...) instead?
- return backend->writefull(os, osd_op, txn);
- case CEPH_OSD_OP_SETALLOCHINT:
- return seastar::now();
- case CEPH_OSD_OP_PGNLS:
- return do_pgnls(osd_op.indata, os.oi.soid.get_namespace(), op.pgls.count)
- .then([&osd_op](bufferlist bl) {
- osd_op.outdata = std::move(bl);
- return seastar::now();
- });
- case CEPH_OSD_OP_DELETE:
- return backend->remove(os, txn);
- case CEPH_OSD_OP_CALL:
- return backend->call(os, osd_op, txn);
- case CEPH_OSD_OP_STAT:
- return backend->stat(os, osd_op);
- default:
- logger().warn("unknown op {}", ceph_osd_op_name(op.op));
- throw std::runtime_error(
- fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
- }
-}
-
-seastar::future<bufferlist> PG::do_pgnls(bufferlist& indata,
- const std::string& nspace,
- uint64_t limit)
-{
- hobject_t lower_bound;
- try {
- ceph::decode(lower_bound, indata);
- } catch (const buffer::error& e) {
- throw std::invalid_argument("unable to decode PGNLS handle");
- }
- const auto pg_start = pgid.pgid.get_hobj_start();
- const auto pg_end = pgid.pgid.get_hobj_end(peering_state.get_pool().info.get_pg_num());
- if (!(lower_bound.is_min() ||
- lower_bound.is_max() ||
- (lower_bound >= pg_start && lower_bound < pg_end))) {
- // this should only happen with a buggy client.
- throw std::invalid_argument("outside of PG bounds");
- }
- return backend->list_objects(lower_bound, limit).then(
- [lower_bound, pg_end, nspace](auto objects, auto next) {
- auto in_my_namespace = [&nspace](const hobject_t& o) {
- if (o.get_namespace() == local_conf()->osd_hit_set_namespace) {
- return false;
- } else if (nspace == librados::all_nspaces) {
- return true;
- } else {
- return o.get_namespace() == nspace;
- }
- };
- pg_nls_response_t response;
- boost::copy(objects |
- boost::adaptors::filtered(in_my_namespace) |
- boost::adaptors::transformed([](const hobject_t& o) {
- return librados::ListObjectImpl{o.get_namespace(),
- o.oid.name,
- o.get_key()}; }),
- std::back_inserter(response.entries));
- response.handle = next.is_max() ? pg_end : next;
- bufferlist bl;
- encode(response, bl);
- return seastar::make_ready_future<bufferlist>(std::move(bl));
- });
-}
-
seastar::future<> PG::submit_transaction(boost::local_shared_ptr<ObjectState>&& os,
ceph::os::Transaction&& txn,
const MOSDOp& req)
seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
{
- return seastar::do_with(std::move(m), ceph::os::Transaction{},
- [this](auto& m, auto& txn) {
- const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
- : m->get_hobj();
- return backend->get_object_state(oid).then([m,&txn,this](auto os) {
- // TODO: issue requests in parallel if they don't write,
- // with writes being basically a synchronization barrier
- return seastar::do_for_each(std::begin(m->ops), std::end(m->ops),
- [m,&txn,this,pos=os.get()](OSDOp& osd_op) {
- return do_osd_op(*pos, osd_op, txn);
- }).then([&txn,m,this,os=std::move(os)]() mutable {
- // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
- if (txn.empty()) {
- return seastar::now();
- } else {
- return submit_transaction(std::move(os), std::move(txn), *m);
- }
+ const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
+ : m->get_hobj();
+ return backend->get_object_state(oid).then([this, m](auto os) mutable {
+ return seastar::do_with(OpsExecuter{std::move(os), *this/* as const& */},
+ [this, m=std::move(m)] (auto& ox) {
+ return seastar::do_for_each(m->ops, [this, &ox](OSDOp& osd_op) {
+ logger().debug("will be handling op {}", ceph_osd_op_name(osd_op.op.op));
+ return ox.do_osd_op(osd_op);
+ }).then([this, m, &ox] {
+ logger().debug("all operations have been executed successfully");
+ return std::move(ox).submit_changes([this, m] (auto&& txn, auto&& os) {
+ // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
+ if (txn.empty()) {
+ logger().debug("txn is empty, bypassing mutate");
+ return seastar::now();
+ } else {
+ return submit_transaction(std::move(os), std::move(txn), *m);
+ }
+ });
});
- }).then([m,this] {
- auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
- 0, false);
- reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- }).handle_exception_type([=](const ceph::osd::error& e) {
- logger().debug("got ceph::osd::error while handling object {}: {} ({})",
- oid, e.code(), e.what());
-
- backend->evict_object_state(oid);
- auto reply = make_message<MOSDOpReply>(
- m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
- reply->set_enoent_reply_versions(peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
});
+ }).then([m,this] {
+ auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
+ 0, false);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ }).handle_exception_type([=,&oid](const ceph::osd::error& e) {
+ logger().debug("got ceph::osd::error while handling object {}: {} ({})",
+ oid, e.code(), e.what());
+
+ backend->evict_object_state(oid);
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
+ reply->set_enoent_reply_versions(peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
});
}
return pgid;
}
+ PGBackend& get_backend() { // exposes the PG's backend by reference; assumes `backend` is set -- TODO confirm
+ return *backend;
+ }
+
// EpochSource
epoch_t get_osdmap_epoch() const final {
return peering_state.get_osdmap_epoch();
return peering_state.get_need_up_thru();
}
+ const auto& get_pool() const { // forwards to peering_state.get_pool()
+ return peering_state.get_pool();
+ }
+
/// initialize created PG
void init(
ceph::os::CollectionRef coll_ref,
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <fmt/ostream.h>
-#include <seastar/core/thread.hh>
#include <seastar/core/print.hh>
#include "messages/MOSDOp.h"
#include "ec_backend.h"
#include "exceptions.h"
-#include "osd/ClassHandler.h"
-
namespace {
seastar::logger& logger() {
return ceph::get_logger(ceph_subsys_osd);
objects, next.hobj);
});
}
-
-seastar::future<> PGBackend::call(
- ObjectState& os,
- OSDOp& osd_op,
- ceph::os::Transaction& txn)
-{
- if (!os.exists) {
- throw ::object_not_found();
- }
-
- std::string cname, mname;
- ceph::bufferlist indata;
- try {
- auto bp = std::begin(osd_op.indata);
- bp.copy(osd_op.op.cls.class_len, cname);
- bp.copy(osd_op.op.cls.method_len, mname);
- bp.copy(osd_op.op.cls.indata_len, indata);
- } catch (buffer::error& e) {
- logger().warn("call unable to decode class + method + indata");
- throw ::invalid_argument{};
- }
-
- // NOTE: opening a class can actually result in dlopen(), and thus
- // blocking the entire reactor. Thankfully to ClassHandler's cache
- // this is supposed to be extremely infrequent.
- ClassHandler::ClassData* cls;
- int r = ClassHandler::get_instance().open_class(cname, &cls);
- if (r) {
- logger().warn("class {} open got {}", cname, cpp_strerror(r));
- if (r == -ENOENT) {
- throw ceph::osd::operation_not_supported{};
- } else if (r == -EPERM) {
- // propagate permission errors
- throw ceph::osd::permission_denied{};
- }
- throw ceph::osd::input_output_error{};
- }
-
- ClassHandler::ClassMethod* method = cls->get_method(mname);
- if (!method) {
- logger().warn("call method {}.{} does not exist", cname, mname);
- throw ceph::osd::operation_not_supported{};
- }
-
- const auto flags = method->get_flags();
-#if 0
- if (flags & CLS_METHOD_WR) {
- ctx->user_modify = true;
- }
-#endif
-
- logger().debug("calling method {}.{}", cname, mname);
-#if 0
- int prev_rd = ctx->num_read;
- int prev_wr = ctx->num_write;
-#endif
-
-
- return seastar::async([this,&os,&osd_op, method, indata=std::move(indata)]() mutable {
- ceph::bufferlist outdata;
- const auto ret = method->exec(cls_method_context_t{this, &os}, indata, outdata);
- if (ret < 0) {
- throw ceph::osd::make_error(ret);
- }
-#if 0
- if (ctx->num_read > prev_rd && !(flags & CLS_METHOD_RD)) {
- derr << "method " << cname << "." << mname << " tried to read object but is not marked RD" << dendl;
- result = -EIO;
- break;
- }
- if (ctx->num_write > prev_wr && !(flags & CLS_METHOD_WR)) {
- derr << "method " << cname << "." << mname << " tried to update object but is not marked WR" << dendl;
- result = -EIO;
- break;
- }
-#endif
-
- logger().debug("method called response length={}", outdata.length());
- osd_op.op.extent.length = outdata.length();
- osd_op.outdata.claim_append(outdata);
- });
-
-}
seastar::future<std::vector<hobject_t>, hobject_t> list_objects(
const hobject_t& start,
uint64_t limit);
- seastar::future<> call(
- ObjectState& os,
- OSDOp& osd_op,
- ceph::os::Transaction& txn);
virtual void got_rep_op_reply(const MOSDRepOpReply&) {}
*
* A context for the method of the object class.
*/
-#ifdef WITH_SEASTAR
-typedef struct {
- class PGBackend *backend = nullptr;
- class ObjectState *os = nullptr;
-} cls_method_context_t;
-#else
typedef void* cls_method_context_t;
-#endif // WITH_SEASTAR
/*class utils*/
extern int cls_log(int level, const char *format, ...)