ECBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
- osd_reqid_t req_id,
+ const osd_op_params_t& osd_op_p,
epoch_t min_epoch, epoch_t max_epoch,
- eversion_t ver)
+ std::vector<pg_log_entry_t>&& log_entries)
{
// todo
return seastar::make_ready_future<crimson::osd::acked_peers_t>();
_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
- osd_reqid_t req_id,
+ const osd_op_params_t& osd_op_p,
epoch_t min_epoch, epoch_t max_epoch,
- eversion_t ver) final;
+ std::vector<pg_log_entry_t>&& log_entries) final;
CollectionRef coll;
crimson::os::FuturizedStore* store;
};
case CEPH_OSD_OP_CREATE:
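+ // the extra bool passed to do_write_op() marks ops that modify
+ // user-visible data; it feeds user_modify/user_at_version at submit time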
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.create(os, osd_op, txn);
- });
+ }, true);
case CEPH_OSD_OP_WRITE:
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.write(os, osd_op, txn);
- });
+ }, true);
case CEPH_OSD_OP_WRITEFULL:
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.writefull(os, osd_op, txn);
- });
+ }, true);
case CEPH_OSD_OP_SETALLOCHINT:
return osd_op_errorator::now();
case CEPH_OSD_OP_SETXATTR:
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.setxattr(os, osd_op, txn);
- });
+ }, true);
case CEPH_OSD_OP_DELETE:
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.remove(os, txn);
- });
+ }, true);
case CEPH_OSD_OP_CALL:
return this->do_op_call(osd_op);
case CEPH_OSD_OP_STAT:
#endif
return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
return backend.omap_set_vals(os, osd_op, txn);
- });
+ }, true);
// watch/notify
case CEPH_OSD_OP_WATCH:
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
return do_op_watch(osd_op, os, txn);
- });
+ }, false);
case CEPH_OSD_OP_NOTIFY:
return do_read_op([this, &osd_op] (auto&, const auto& os) {
return do_op_notify(osd_op, os);
PG& pg;
PGBackend& backend;
Ref<MOSDOp> msg;
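+ // set via do_write_op(): true once any op in this request has modified
+ // user-visible data, so user_at_version is bumped in submit_changes()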
+ bool user_modify = false;
ceph::os::Transaction txn;
size_t num_read = 0; ///< count read ops
}
template <class Func>
- auto do_write_op(Func&& f) {
+ auto do_write_op(Func&& f, bool user_modify) {
++num_write;
+ this->user_modify |= user_modify;
return std::forward<Func>(f)(backend, obc->obs, txn);
}
template <typename Func>
OpsExecuter::osd_op_errorator::future<> OpsExecuter::submit_changes(Func&& f) && {
+ assert(obc);
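+ // bundle the versioning state this write needs further down the stack
+ // (pg log, backend, replicas) into a single osd_op_params_t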
+ osd_op_params_t osd_op_params(std::move(msg));
+ eversion_t at_version = pg.next_version();
+
+ osd_op_params.at_version = at_version;
+ osd_op_params.pg_trim_to = pg.get_pg_trim_to();
+ osd_op_params.min_last_complete_ondisk = pg.get_min_last_complete_ondisk();
+ osd_op_params.last_complete = pg.get_info().last_complete;
+ if (user_modify) {
+   osd_op_params.user_modify = true;
+   osd_op_params.user_at_version = at_version.version;
+ }
+
if (__builtin_expect(op_effects.empty(), true)) {
- return std::forward<Func>(f)(std::move(txn), std::move(obc));
+ return std::forward<Func>(f)(std::move(txn), std::move(obc), std::move(osd_op_params));
}
- return std::forward<Func>(f)(std::move(txn), std::move(obc)).safe_then([this] {
+ return std::forward<Func>(f)(std::move(txn), std::move(obc), std::move(osd_op_params)).safe_then([this] {
// let's do the cleaning of `op_effects` in destructor
return crimson::do_for_each(op_effects, [] (auto& op_effect) {
return op_effect->execute();
--- /dev/null
+#pragma once
+
+#include "messages/MOSDOp.h"
+#include "osd/osd_types.h"
+#include "crimson/common/type_helpers.h"
+
+// The fields in this struct are parameters needed at multiple levels of
+// request processing. They are grouped into a single struct so they can be
+// passed around together rather than as a growing list of method parameters.
+struct osd_op_params_t {
+ Ref<MOSDOp> req;
+ eversion_t at_version;
+ eversion_t pg_trim_to;
+ eversion_t min_last_complete_ondisk;
+ eversion_t last_complete;
+ version_t user_at_version = 0;
+ bool user_modify = false;
+
+ osd_op_params_t(Ref<MOSDOp>&& req) : req(std::move(req)) {}
+ osd_op_params_t(Ref<MOSDOp>&& req, eversion_t at_version, eversion_t pg_trim_to,
+ eversion_t mlcod, eversion_t lc, version_t user_at_version) :
+ req(std::move(req)), at_version(at_version), pg_trim_to(pg_trim_to),
+ min_last_complete_ondisk(mlcod), last_complete(lc),
+ user_at_version(user_at_version) {}
+};
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/ops_executer.h"
+#include "crimson/osd/osd_operations/osdop_params.h"
#include "crimson/osd/osd_operations/peering_event.h"
namespace {
seastar::future<> PG::submit_transaction(ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
- const MOSDOp& req)
+ const osd_op_params_t& osd_op_p)
{
epoch_t map_epoch = get_osdmap_epoch();
- eversion_t at_version{map_epoch, projected_last_update.version + 1};
+
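+ // log the write locally: build the pg_log entry, refresh pg_trim_to and
+ // append the entry to the pg log and the transaction before replicating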
+ std::vector<pg_log_entry_t> log_entries;
+ log_entries.emplace_back(obc->obs.exists ?
+ pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
+ obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
+ osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
+ osd_op_p.req->get_reqid(), osd_op_p.req->get_mtime(), 0);
+ peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
+ txn, true, false);
+
return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
std::move(obc),
std::move(txn),
- req,
+ osd_op_p,
peering_state.get_last_peering_reset(),
map_epoch,
- at_version).then([this](auto acked) {
+ std::move(log_entries)).then(
+ [this, last_complete=peering_state.get_info().last_complete,
+ at_version=osd_op_p.at_version](auto acked) {
for (const auto& peer : acked) {
peering_state.update_peer_last_complete_ondisk(
peer.shard, peer.last_complete_ondisk);
}
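+ // every replica has acked: mark the write as completed locally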
+ peering_state.complete_write(at_version, last_complete);
return seastar::now();
});
}
: m->get_hobj();
auto ox =
std::make_unique<OpsExecuter>(obc, *this/* as const& */, m);
+
return crimson::do_for_each(
m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
logger().debug(
"do_osd_ops: {} - object {} all operations successful",
*m,
obc->obs.oi.soid);
- return std::move(*ox).submit_changes(
- [this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> {
- // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
- if (txn.empty()) {
+ return std::move(*ox).submit_changes([this, m]
+ (auto&& txn, auto&& obc, auto&& osd_op_p) -> osd_op_errorator::future<> {
+ // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
+ if (txn.empty()) {
logger().debug(
"do_osd_ops: {} - object {} txn is empty, bypassing mutate",
*m,
"do_osd_ops: {} - object {} submitting txn",
*m,
obc->obs.oi.soid);
- return submit_transaction(std::move(obc), std::move(txn), *m);
- }
+ return submit_transaction(std::move(obc), std::move(txn),
+ std::move(osd_op_p));
+ }
});
}).safe_then([m, obc, this, ox_deleter = std::move(ox)] {
auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
ceph::os::Transaction txn;
auto encoded_txn = req->get_data().cbegin();
decode(txn, encoded_txn);
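+ // the primary piggybacks its pg log entries and trim bounds on the rep
+ // op; decode and apply them to the replica's log before running the txn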
+ auto p = req->logbl.cbegin();
+ std::vector<pg_log_entry_t> log_entries;
+ decode(log_entries, p);
+ peering_state.append_log(std::move(log_entries), req->pg_trim_to,
+ req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false);
return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
.then([req, lcod=peering_state.get_info().last_complete, this] {
peering_state.update_last_complete_ondisk(lcod);
class MQuery;
class PGBackend;
class PGPeeringEvent;
+struct osd_op_params_t;
+
namespace recovery {
class Context;
}
return peering_state.get_osdmap_epoch();
}
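+ // PeeringState accessors used when filling osd_op_params_t for a write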
+ eversion_t get_pg_trim_to() const {
+ return peering_state.get_pg_trim_to();
+ }
+
+ eversion_t get_min_last_complete_ondisk() const {
+ return peering_state.get_min_last_complete_ondisk();
+ }
+
+ const pg_info_t& get_info() const {
+ return peering_state.get_info();
+ }
+
// DoutPrefixProvider
std::ostream& gen_prefix(std::ostream& out) const final {
return out << *this;
uint64_t limit);
seastar::future<> submit_transaction(ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
- const MOSDOp& req);
+ const osd_op_params_t& osd_op_p);
private:
OSDMapGate osdmap_gate;
public:
cached_map_t get_osdmap() { return osdmap; }
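+ // bump projected_last_update and hand out the version to assign to the
+ // next write on this PG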
+ eversion_t next_version() {
+ return eversion_t(projected_last_update.epoch,
+ ++projected_last_update.version);
+ }
private:
std::unique_ptr<PGBackend> backend;
std::set<pg_shard_t> pg_shards,
crimson::osd::ObjectContextRef &&obc,
ceph::os::Transaction&& txn,
- const MOSDOp& m,
+ const osd_op_params_t& osd_op_p,
epoch_t min_epoch,
epoch_t map_epoch,
- eversion_t ver)
+ std::vector<pg_log_entry_t>&& log_entries)
{
logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
if (obc->obs.exists) {
obc->obs.oi.prior_version = ctx->obs->oi.version;
#endif
- obc->obs.oi.last_reqid = m.get_reqid();
- obc->obs.oi.mtime = m.get_mtime();
+ auto& m = osd_op_p.req;
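+ // stamp object_info with the version assigned to this write and with
+ // the originating request's reqid and mtime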
+ obc->obs.oi.prior_version = obc->obs.oi.version;
+ obc->obs.oi.version = osd_op_p.at_version;
+ if (osd_op_p.user_at_version > obc->obs.oi.user_version)
+ obc->obs.oi.user_version = osd_op_p.user_at_version;
+ obc->obs.oi.last_reqid = m->get_reqid();
+ obc->obs.oi.mtime = m->get_mtime();
obc->obs.oi.local_mtime = ceph_clock_now();
// object_info_t
}
return _submit_transaction(
std::move(pg_shards), obc->obs.oi.soid, std::move(txn),
- m.get_reqid(), min_epoch, map_epoch, ver);
+ osd_op_p, min_epoch, map_epoch, std::move(log_entries));
}
static inline bool _read_verify_data(
#include "crimson/common/shared_lru.h"
#include "osd/osd_types.h"
#include "crimson/osd/object_context.h"
+#include "crimson/osd/osd_operations/osdop_params.h"
struct hobject_t;
class MOSDRepOpReply;
std::set<pg_shard_t> pg_shards,
crimson::osd::ObjectContextRef &&obc,
ceph::os::Transaction&& txn,
- const MOSDOp& m,
+ const osd_op_params_t& osd_op_p,
epoch_t min_epoch,
epoch_t map_epoch,
- eversion_t ver);
+ std::vector<pg_log_entry_t>&& log_entries);
seastar::future<std::vector<hobject_t>, hobject_t> list_objects(
const hobject_t& start,
uint64_t limit) const;
_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
- osd_reqid_t req_id,
+ const osd_op_params_t& osd_op_p,
epoch_t min_epoch, epoch_t max_epoch,
- eversion_t ver) = 0;
+ std::vector<pg_log_entry_t>&& log_entries) = 0;
};
#include "crimson/os/cyanstore/cyan_object.h"
#include "crimson/os/futurized_store.h"
#include "crimson/osd/shard_services.h"
+#include "crimson/osd/pg.h"
+#include "osd/PeeringState.h"
namespace {
seastar::logger& logger() {
ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
- osd_reqid_t req_id,
+ const osd_op_params_t& osd_op_p,
epoch_t min_epoch, epoch_t map_epoch,
- eversion_t ver)
+ std::vector<pg_log_entry_t>&& log_entries)
{
const ceph_tid_t tid = next_txn_id++;
+ auto req_id = osd_op_p.req->get_reqid();
auto pending_txn =
pending_trans.emplace(tid, pending_on_t{pg_shards.size()}).first;
bufferlist encoded_txn;
spg_t{pgid, pg_shard.shard}, hoid,
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
map_epoch, min_epoch,
- tid, ver);
+ tid, osd_op_p.at_version);
m->set_data(encoded_txn);
pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
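+ // ship the pg log entries plus the trim/rollback versions to the
+ // replica alongside the encoded transaction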
+ encode(log_entries, m->logbl);
+ m->pg_trim_to = osd_op_p.pg_trim_to;
+ m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk;
+ m->set_rollback_to(osd_op_p.at_version);
// TODO: set more stuff. e.g., pg_states
return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
}
_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
- osd_reqid_t req_id,
+ const osd_op_params_t& osd_op_p,
epoch_t min_epoch, epoch_t max_epoch,
- eversion_t ver) final;
+ std::vector<pg_log_entry_t>&& log_entries) final;
const pg_t pgid;
const pg_shard_t whoami;
crimson::osd::ShardServices& shard_services;
std::optional<eversion_t> trim_to,
std::optional<eversion_t> roll_forward_to);
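+ /**
+  * Refreshes pg_trim_to via update_trim_to(), then appends the given
+  * entries to the local log and the transaction via append_log().
+  */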
+ void append_log_with_trim_to_updated(
+ std::vector<pg_log_entry_t>&& log_entries,
+ eversion_t roll_forward_to,
+ ObjectStore::Transaction &t,
+ bool transaction_applied,
+ bool async) {
+ update_trim_to();
+ append_log(std::move(log_entries), pg_trim_to, roll_forward_to,
+ min_last_complete_ondisk, t, transaction_applied, async);
+ }
+
/**
* Updates local log to reflect new write from primary.
*/