From 6c73c0a6ac0534af4279612babca10d996d0b2fb Mon Sep 17 00:00:00 2001 From: Xuehan Xu Date: Mon, 10 Feb 2020 23:16:08 +0800 Subject: [PATCH] crimson: add pglog related logic to crimson's data write path Signed-off-by: Xuehan Xu --- src/crimson/osd/ec_backend.cc | 4 +- src/crimson/osd/ec_backend.h | 4 +- src/crimson/osd/ops_executer.cc | 14 +++---- src/crimson/osd/ops_executer.h | 19 +++++++-- src/crimson/osd/osd_operations/osdop_params.h | 25 ++++++++++++ src/crimson/osd/pg.cc | 40 ++++++++++++++----- src/crimson/osd/pg.h | 20 +++++++++- src/crimson/osd/pg_backend.cc | 15 ++++--- src/crimson/osd/pg_backend.h | 9 +++-- src/crimson/osd/replicated_backend.cc | 13 ++++-- src/crimson/osd/replicated_backend.h | 4 +- src/osd/PeeringState.h | 11 +++++ 12 files changed, 139 insertions(+), 39 deletions(-) create mode 100644 src/crimson/osd/osd_operations/osdop_params.h diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc index 911dc250e8a..c6516d50a8e 100644 --- a/src/crimson/osd/ec_backend.cc +++ b/src/crimson/osd/ec_backend.cc @@ -26,9 +26,9 @@ seastar::future ECBackend::_submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, - osd_reqid_t req_id, + const osd_op_params_t& osd_op_p, epoch_t min_epoch, epoch_t max_epoch, - eversion_t ver) + std::vector&& log_entries) { // todo return seastar::make_ready_future(); diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index c7548de0749..d9451d2296b 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -26,9 +26,9 @@ private: _submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, - osd_reqid_t req_id, + const osd_op_params_t& req, epoch_t min_epoch, epoch_t max_epoch, - eversion_t ver) final; + std::vector&& log_entries) final; CollectionRef coll; crimson::os::FuturizedStore* store; }; diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 
c3d797e5815..748f98027eb 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -669,25 +669,25 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op) case CEPH_OSD_OP_CREATE: return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) { return backend.create(os, osd_op, txn); - }); + }, true); case CEPH_OSD_OP_WRITE: return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) { return backend.write(os, osd_op, txn); - }); + }, true); case CEPH_OSD_OP_WRITEFULL: return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) { return backend.writefull(os, osd_op, txn); - }); + }, true); case CEPH_OSD_OP_SETALLOCHINT: return osd_op_errorator::now(); case CEPH_OSD_OP_SETXATTR: return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) { return backend.setxattr(os, osd_op, txn); - }); + }, true); case CEPH_OSD_OP_DELETE: return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) { return backend.remove(os, txn); - }); + }, true); case CEPH_OSD_OP_CALL: return this->do_op_call(osd_op); case CEPH_OSD_OP_STAT: @@ -722,13 +722,13 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op) #endif return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) { return backend.omap_set_vals(os, osd_op, txn); - }); + }, true); // watch/notify case CEPH_OSD_OP_WATCH: return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) { return do_op_watch(osd_op, os, txn); - }); + }, false); case CEPH_OSD_OP_NOTIFY: return do_read_op([this, &osd_op] (auto&, const auto& os) { return do_op_notify(osd_op, os); diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index a66df94a502..11ee868e0aa 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -83,6 +83,7 @@ private: PG& pg; PGBackend& backend; Ref msg; + bool user_modify = false; ceph::os::Transaction txn; size_t num_read = 0; ///< count read ops @@ -145,8 +146,9 @@ private: } template - auto do_write_op(Func&& f) { + auto 
do_write_op(Func&& f, bool um) { ++num_write; + user_modify = user_modify || um; return std::forward(f)(backend, obc->obs, txn); } @@ -220,10 +222,21 @@ auto OpsExecuter::with_effect_on_obc( template OpsExecuter::osd_op_errorator::future<> OpsExecuter::submit_changes(Func&& f) && { + assert(obc); + osd_op_params_t osd_op_params(std::move(msg)); + eversion_t at_version = pg.next_version(); + + osd_op_params.at_version = at_version; + osd_op_params.pg_trim_to = pg.get_pg_trim_to(); + osd_op_params.min_last_complete_ondisk = pg.get_min_last_complete_ondisk(); + osd_op_params.last_complete = pg.get_info().last_complete; + osd_op_params.user_modify = user_modify; + if (user_modify) osd_op_params.user_at_version = at_version.version; + if (__builtin_expect(op_effects.empty(), true)) { - return std::forward(f)(std::move(txn), std::move(obc)); + return std::forward(f)(std::move(txn), std::move(obc), std::move(osd_op_params)); } - return std::forward(f)(std::move(txn), std::move(obc)).safe_then([this] { + return std::forward(f)(std::move(txn), std::move(obc), std::move(osd_op_params)).safe_then([this] { // let's do the cleaning of `op_effects` in destructor return crimson::do_for_each(op_effects, [] (auto& op_effect) { return op_effect->execute(); diff --git a/src/crimson/osd/osd_operations/osdop_params.h b/src/crimson/osd/osd_operations/osdop_params.h new file mode 100644 index 00000000000..b50fb2b5f78 --- /dev/null +++ b/src/crimson/osd/osd_operations/osdop_params.h @@ -0,0 +1,25 @@ +#pragma once + +#include "messages/MOSDOp.h" +#include "osd/osd_types.h" +#include "crimson/common/type_helpers.h" + +// The fields in this struct are parameters that may be needed at multiple +// levels of processing. They are enclosed in this struct to +// avoid passing each of them as a method parameter.
+struct osd_op_params_t { + Ref req; + eversion_t at_version; + eversion_t pg_trim_to; + eversion_t min_last_complete_ondisk; + eversion_t last_complete; + version_t user_at_version = 0; /* zero-initialised: compared even when no op assigned it */ + bool user_modify = false; + + osd_op_params_t(Ref&& req) : req(req) {} + osd_op_params_t(Ref&& req, eversion_t at_version, eversion_t pg_trim_to, + eversion_t mlcod, eversion_t lc, version_t user_at_version) : + req(req), at_version(at_version), pg_trim_to(pg_trim_to), + min_last_complete_ondisk(mlcod), last_complete(lc), + user_at_version(user_at_version) {} +}; diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 991002b90b2..7a651ff22c7 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -36,6 +36,7 @@ #include "crimson/osd/pg_meta.h" #include "crimson/osd/pg_backend.h" #include "crimson/osd/ops_executer.h" +#include "crimson/osd/osd_operations/osdop_params.h" #include "crimson/osd/osd_operations/peering_event.h" namespace { @@ -487,21 +488,33 @@ blocking_future<> PG::WaitForActiveBlocker::wait() seastar::future<> PG::submit_transaction(ObjectContextRef&& obc, ceph::os::Transaction&& txn, - const MOSDOp& req) + const osd_op_params_t& osd_op_p) { epoch_t map_epoch = get_osdmap_epoch(); - eversion_t at_version{map_epoch, projected_last_update.version + 1}; + + std::vector log_entries; + log_entries.emplace_back(obc->obs.exists ? + pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE, + obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version, + osd_op_p.user_modify ?
osd_op_p.at_version.version : 0, + osd_op_p.req->get_reqid(), osd_op_p.req->get_mtime(), 0); + peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version, + txn, true, false); + return backend->mutate_object(peering_state.get_acting_recovery_backfill(), std::move(obc), std::move(txn), - req, + std::move(osd_op_p), peering_state.get_last_peering_reset(), map_epoch, - at_version).then([this](auto acked) { + std::move(log_entries)).then( + [this, last_complete=peering_state.get_info().last_complete, + at_version=osd_op_p.at_version](auto acked) { for (const auto& peer : acked) { peering_state.update_peer_last_complete_ondisk( peer.shard, peer.last_complete_ondisk); } + peering_state.complete_write(at_version, last_complete); return seastar::now(); }); } @@ -515,6 +528,7 @@ seastar::future> PG::do_osd_ops( : m->get_hobj(); auto ox = std::make_unique(obc, *this/* as const& */, m); + return crimson::do_for_each( m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) { logger().debug( @@ -528,10 +542,10 @@ seastar::future> PG::do_osd_ops( "do_osd_ops: {} - object {} all operations successful", *m, obc->obs.oi.soid); - return std::move(*ox).submit_changes( - [this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> { - // XXX: the entire lambda could be scheduled conditionally. ::if_then()? - if (txn.empty()) { + return std::move(*ox).submit_changes([this, m] + (auto&& txn, auto&& obc, auto&& osd_op_p) -> osd_op_errorator::future<> { + // XXX: the entire lambda could be scheduled conditionally. ::if_then()? 
+ if (txn.empty()) { logger().debug( "do_osd_ops: {} - object {} txn is empty, bypassing mutate", *m, @@ -542,8 +556,9 @@ seastar::future> PG::do_osd_ops( "do_osd_ops: {} - object {} submitting txn", *m, obc->obs.oi.soid); - return submit_transaction(std::move(obc), std::move(txn), *m); - } + return submit_transaction(std::move(obc), std::move(txn), + std::move(osd_op_p)); + } }); }).safe_then([m, obc, this, ox_deleter = std::move(ox)] { auto reply = make_message(m.get(), 0, get_osdmap_epoch(), @@ -787,6 +802,11 @@ seastar::future<> PG::handle_rep_op(Ref req) ceph::os::Transaction txn; auto encoded_txn = req->get_data().cbegin(); decode(txn, encoded_txn); + auto p = req->logbl.cbegin(); + std::vector log_entries; + decode(log_entries, p); + peering_state.append_log(std::move(log_entries), req->pg_trim_to, + req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false); return shard_services.get_store().do_transaction(coll_ref, std::move(txn)) .then([req, lcod=peering_state.get_info().last_complete, this] { peering_state.update_last_complete_ondisk(lcod); diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 1ced9b22e77..36cf22073ee 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -30,6 +30,8 @@ class OSDMap; class MQuery; class PGBackend; class PGPeeringEvent; +class osd_op_params_t; + namespace recovery { class Context; } @@ -94,6 +96,18 @@ public: return peering_state.get_osdmap_epoch(); } + eversion_t get_pg_trim_to() const { + return peering_state.get_pg_trim_to(); + } + + eversion_t get_min_last_complete_ondisk() const { + return peering_state.get_min_last_complete_ondisk(); + } + + const pg_info_t& get_info() const { + return peering_state.get_info(); + } + // DoutPrefixProvider std::ostream& gen_prefix(std::ostream& out) const final { return out << *this; @@ -474,7 +488,7 @@ private: uint64_t limit); seastar::future<> submit_transaction(ObjectContextRef&& obc, ceph::os::Transaction&& txn, - const MOSDOp& req); + const 
osd_op_params_t& oop); private: OSDMapGate osdmap_gate; @@ -484,6 +498,10 @@ private: public: cached_map_t get_osdmap() { return osdmap; } + eversion_t next_version() { + return eversion_t(get_osdmap_epoch(), /* stamp with the current map epoch, as the replaced at_version did */ + ++projected_last_update.version); + } private: std::unique_ptr backend; diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index 3394daeaccf..72f32f61881 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -119,10 +119,10 @@ PGBackend::mutate_object( std::set pg_shards, crimson::osd::ObjectContextRef &&obc, ceph::os::Transaction&& txn, - const MOSDOp& m, + const osd_op_params_t& osd_op_p, epoch_t min_epoch, epoch_t map_epoch, - eversion_t ver) + std::vector&& log_entries) { logger().trace("mutate_object: num_ops={}", txn.get_num_ops()); if (obc->obs.exists) { @@ -131,8 +131,13 @@ PGBackend::mutate_object( obc->obs.oi.prior_version = ctx->obs->oi.version; #endif - obc->obs.oi.last_reqid = m.get_reqid(); - obc->obs.oi.mtime = m.get_mtime(); + auto& m = osd_op_p.req; + obc->obs.oi.prior_version = obc->obs.oi.version; + obc->obs.oi.version = osd_op_p.at_version; + if (osd_op_p.user_at_version > obc->obs.oi.user_version) + obc->obs.oi.user_version = osd_op_p.user_at_version; + obc->obs.oi.last_reqid = m->get_reqid(); + obc->obs.oi.mtime = m->get_mtime(); obc->obs.oi.local_mtime = ceph_clock_now(); // object_info_t @@ -148,7 +153,7 @@ PGBackend::mutate_object( } return _submit_transaction( std::move(pg_shards), obc->obs.oi.soid, std::move(txn), - m.get_reqid(), min_epoch, map_epoch, ver); + osd_op_p, min_epoch, map_epoch, std::move(log_entries)); } static inline bool _read_verify_data( diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index d72997eff66..28b39c95c50 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -14,6 +14,7 @@ #include "crimson/common/shared_lru.h" #include "osd/osd_types.h" #include "crimson/osd/object_context.h"
+#include "crimson/osd/osd_operations/osdop_params.h" struct hobject_t; class MOSDRepOpReply; @@ -79,10 +80,10 @@ public: std::set pg_shards, crimson::osd::ObjectContextRef &&obc, ceph::os::Transaction&& txn, - const MOSDOp& m, + const osd_op_params_t& osd_op_p, epoch_t min_epoch, epoch_t map_epoch, - eversion_t ver); + std::vector&& log_entries); seastar::future, hobject_t> list_objects( const hobject_t& start, uint64_t limit) const; @@ -143,7 +144,7 @@ private: _submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, - osd_reqid_t req_id, + const osd_op_params_t& osd_op_p, epoch_t min_epoch, epoch_t max_epoch, - eversion_t ver) = 0; + std::vector&& log_entries) = 0; }; diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index 18c98faeadb..b1b311e61aa 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -6,6 +6,8 @@ #include "crimson/os/cyanstore/cyan_object.h" #include "crimson/os/futurized_store.h" #include "crimson/osd/shard_services.h" +#include "crimson/osd/pg.h" +#include "osd/PeeringState.h" namespace { seastar::logger& logger() { @@ -36,11 +38,12 @@ seastar::future ReplicatedBackend::_submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, - osd_reqid_t req_id, + const osd_op_params_t& osd_op_p, epoch_t min_epoch, epoch_t map_epoch, - eversion_t ver) + std::vector&& log_entries) { const ceph_tid_t tid = next_txn_id++; + auto req_id = osd_op_p.req->get_reqid(); auto pending_txn = pending_trans.emplace(tid, pending_on_t{pg_shards.size()}).first; bufferlist encoded_txn; @@ -56,9 +59,13 @@ ReplicatedBackend::_submit_transaction(std::set&& pg_shards, spg_t{pgid, pg_shard.shard}, hoid, CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, map_epoch, min_epoch, - tid, ver); + tid, osd_op_p.at_version); m->set_data(encoded_txn); pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); + encode(log_entries, 
m->logbl); + m->pg_trim_to = osd_op_p.pg_trim_to; + m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk; + m->set_rollback_to(osd_op_p.at_version); // TODO: set more stuff. e.g., pg_states return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch); } diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h index c61d2b88d0e..53a0038da8a 100644 --- a/src/crimson/osd/replicated_backend.h +++ b/src/crimson/osd/replicated_backend.h @@ -31,9 +31,9 @@ private: _submit_transaction(std::set&& pg_shards, const hobject_t& hoid, ceph::os::Transaction&& txn, - osd_reqid_t req_id, + const osd_op_params_t& osd_op_p, epoch_t min_epoch, epoch_t max_epoch, - eversion_t ver) final; + std::vector&& log_entries) final; const pg_t pgid; const pg_shard_t whoami; crimson::osd::ShardServices& shard_services; diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index 03faa5c7734..9b8f3883bfc 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -1765,6 +1765,17 @@ public: std::optional trim_to, std::optional roll_forward_to); + void append_log_with_trim_to_updated( + std::vector&& log_entries, + eversion_t roll_forward_to, + ObjectStore::Transaction &t, + bool transaction_applied, + bool async) { + update_trim_to(); + append_log(std::move(log_entries), pg_trim_to, roll_forward_to, + min_last_complete_ondisk, t, transaction_applied, async); + } + /** * Updates local log to reflect new write from primary. */ -- 2.39.5