#include "crimson/os/futurized_store.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/object_context_loader.h"
+#include "crimson/osd/pg.h"
#include "replicated_backend.h"
#include "replicated_recovery_backend.h"
#include "ec_backend.h"
PGBackend::create(pg_t pgid,
const pg_shard_t pg_shard,
const pg_pool_t& pool,
+ crimson::osd::PG& pg,
crimson::os::CollectionRef coll,
crimson::osd::ShardServices& shard_services,
const ec_profile_t& ec_profile,
{
switch (pool.type) {
case pg_pool_t::TYPE_REPLICATED:
- return std::make_unique<ReplicatedBackend>(pgid, pg_shard,
+ return std::make_unique<ReplicatedBackend>(pgid, pg_shard, pg,
coll, shard_services,
dpp);
case pg_pool_t::TYPE_ERASURE:
#include "crimson/common/exception.h"
#include "crimson/common/log.h"
#include "crimson/os/futurized_store.h"
+#include "crimson/osd/pg.h"
#include "crimson/osd/shard_services.h"
#include "osd/PeeringState.h"
ReplicatedBackend::ReplicatedBackend(pg_t pgid,
pg_shard_t whoami,
+ crimson::osd::PG& pg,
ReplicatedBackend::CollectionRef coll,
crimson::osd::ShardServices& shard_services,
DoutPrefixProvider &dpp)
: PGBackend{whoami.shard, coll, shard_services, dpp},
pgid{pgid},
- whoami{whoami}
+ whoami{whoami},
+ pg(pg)
{}
ReplicatedBackend::ll_read_ierrorator::future<ceph::bufferlist>
std::vector<pg_log_entry_t>&& log_entries)
{
LOG_PREFIX(ReplicatedBackend::_submit_transaction);
+ DEBUGDPP("object {}, {}", dpp, hoid);
const ceph_tid_t tid = shard_services.get_tid();
auto pending_txn =
bufferlist encoded_txn;
encode(txn, encoded_txn);
- DEBUGDPP("object {}", dpp, hoid);
- auto all_completed = interruptor::make_interruptible(
- shard_services.get_store().do_transaction(coll, std::move(txn))
- ).then_interruptible([FNAME, this,
- peers=pending_txn->second.weak_from_this()] {
- if (!peers) {
- // for now, only actingset_changed can cause peers
- // to be nullptr
- ERRORDPP("peers is null, this should be impossible", dpp);
- assert(0 == "impossible");
- }
- if (--peers->pending == 0) {
- peers->all_committed.set_value();
- peers->all_committed = {};
- return seastar::now();
- }
- return peers->all_committed.get_shared_future();
- }).then_interruptible([pending_txn, this] {
- auto acked_peers = std::move(pending_txn->second.acked_peers);
- pending_trans.erase(pending_txn);
- return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers));
- });
-
auto sends = std::make_unique<std::vector<seastar::future<>>>();
for (auto pg_shard : pg_shards) {
if (pg_shard != whoami) {
m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk;
m->set_rollback_to(osd_op_p.at_version);
// TODO: set more stuff. e.g., pg_states
- sends->emplace_back(shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch));
+ sends->emplace_back(
+ shard_services.send_to_osd(
+ pg_shard.osd, std::move(m), map_epoch));
}
}
+
+ pg.log_operation(
+ std::move(log_entries),
+ osd_op_p.pg_trim_to,
+ osd_op_p.at_version,
+ osd_op_p.min_last_complete_ondisk,
+ true,
+ txn,
+ false);
+
+ auto all_completed = interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(coll, std::move(txn))
+ ).then_interruptible([FNAME, this,
+ peers=pending_txn->second.weak_from_this()] {
+ if (!peers) {
+ // for now, only actingset_changed can cause peers
+ // to be nullptr
+ ERRORDPP("peers is null, this should be impossible", dpp);
+ assert(0 == "impossible");
+ }
+ if (--peers->pending == 0) {
+ peers->all_committed.set_value();
+ peers->all_committed = {};
+ return seastar::now();
+ }
+ return peers->all_committed.get_shared_future();
+ }).then_interruptible([pending_txn, this] {
+ auto acked_peers = std::move(pending_txn->second.acked_peers);
+ pending_trans.erase(pending_txn);
+ return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers));
+ });
+
auto sends_complete = seastar::when_all_succeed(
sends->begin(), sends->end()
).finally([sends=std::move(sends)] {});