seastar::future<> stop() final {
return seastar::now();
}
- void on_actingset_changed(peering_info_t pi) final {}
+ void on_actingset_changed(bool same_primary) final {}
private:
ll_read_ierrorator::future<ceph::bufferlist>
_read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override;
PeeringState::AllReplicasRecovered{});
}
publish_stats_to_osd();
- backend->on_activate_complete();
}
void PG::prepare_write(pg_info_t &info,
logger().debug("{} {}:", *this, __func__);
obc_loader.notify_on_change(is_primary());
recovery_backend->on_peering_interval_change(t);
- backend->on_actingset_changed({ is_primary() });
+ backend->on_actingset_changed(is_primary());
wait_for_active_blocker.unblock();
if (is_primary()) {
logger().debug("{} {}: requeueing", *this, __func__);
<PGBackend::loaded_object_md_t::ref>
PGBackend::load_metadata(const hobject_t& oid)
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
-
return interruptor::make_interruptible(store->get_attrs(
coll,
ghobject_t{oid, ghobject_t::NO_GEN, shard})).safe_then_interruptible(
PGBackend::interruptible_future<std::tuple<std::vector<hobject_t>, hobject_t>>
PGBackend::list_objects(const hobject_t& start, uint64_t limit) const
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
-
auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard};
return interruptor::make_interruptible(store->list_objects(coll,
gstart,
const hobject_t& soid,
std::string_view key) const
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
-
return store->get_attr(coll, ghobject_t{soid}, key);
}
const hobject_t& soid,
std::string&& key) const
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
return seastar::do_with(key, [this, &soid](auto &key) {
return store->get_attr(coll, ghobject_t{soid}, key);
});
OSDOp& osd_op,
object_stat_sum_t& delta_stats) const
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
return store->get_attrs(coll, ghobject_t{os.oi.soid}).safe_then(
[&delta_stats, &osd_op](auto&& attrs) {
std::vector<std::pair<std::string, bufferlist>> user_xattrs;
const OSDOp& osd_op,
ceph::os::Transaction& txn)
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
if (!os.exists || os.oi.is_whiteout()) {
logger().debug("{}: {} DNE", __func__, os.oi.soid);
return crimson::ct_error::enoent::make();
OSDOp& osd_op,
object_stat_sum_t& delta_stats) const
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
if (!os.exists || os.oi.is_whiteout()) {
logger().debug("{}: object does not exist: {}", os.oi.soid);
return crimson::ct_error::enoent::make();
OSDOp& osd_op,
object_stat_sum_t& delta_stats) const
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
-
if (!os.exists || os.oi.is_whiteout()) {
logger().debug("{}: object does not exist: {}", os.oi.soid);
return crimson::ct_error::enoent::make();
OSDOp& osd_op,
object_stat_sum_t& delta_stats) const
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
if (!os.exists || os.oi.is_whiteout()) {
logger().debug("{}: object does not exist: {}", __func__, os.oi.soid);
return crimson::ct_error::enoent::make();
osd_op_params_t& osd_op_params,
object_stat_sum_t& delta_stats)
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
if (!os.exists || os.oi.is_whiteout()) {
logger().debug("{}: object does not exist: {}", os.oi.soid);
return crimson::ct_error::enoent::make();
read_errorator::pass_further{});
}
-void PGBackend::on_activate_complete() {
- peering.reset();
-}
-
virtual void got_rep_op_reply(const MOSDRepOpReply&) {}
virtual seastar::future<> stop() = 0;
- struct peering_info_t {
- bool is_primary;
- };
- virtual void on_actingset_changed(peering_info_t pi) = 0;
- virtual void on_activate_complete();
+ virtual void on_actingset_changed(bool same_primary) = 0;
protected:
const shard_id_t shard;
CollectionRef coll;
crimson::osd::ShardServices &shard_services;
DoutPrefixProvider &dpp; ///< provides log prefix context
crimson::os::FuturizedStore* store;
- bool stopping = false;
- std::optional<peering_info_t> peering;
virtual seastar::future<> request_committed(
const osd_reqid_t& reqid,
const eversion_t& at_version) = 0;
const uint64_t len,
const uint32_t flags)
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
return store->read(coll, ghobject_t{hoid}, off, len, flags);
}
std::vector<pg_log_entry_t>&& log_entries)
{
LOG_PREFIX(ReplicatedBackend::_submit_transaction);
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
- if (__builtin_expect((bool)peering, false)) {
- throw crimson::common::actingset_changed(peering->is_primary);
- }
const ceph_tid_t tid = shard_services.get_tid();
auto pending_txn =
DEBUGDPP("object {}", dpp, hoid);
auto all_completed = interruptor::make_interruptible(
- shard_services.get_store().do_transaction(coll, std::move(txn)))
- .then_interruptible([this, peers=pending_txn->second.weak_from_this()] {
+ shard_services.get_store().do_transaction(coll, std::move(txn))
+ ).then_interruptible([FNAME, this,
+ peers=pending_txn->second.weak_from_this()] {
if (!peers) {
// for now, only actingset_changed can cause peers
// to be nullptr
- assert(peering);
- throw crimson::common::actingset_changed(peering->is_primary);
+ ERRORDPP("peers is null, this should be impossible", dpp);
+ assert(0 == "impossible");
}
if (--peers->pending == 0) {
peers->all_committed.set_value();
return {std::move(sends_complete), std::move(all_completed)};
}
-void ReplicatedBackend::on_actingset_changed(peering_info_t pi)
+void ReplicatedBackend::on_actingset_changed(bool same_primary)
{
- peering.emplace(pi);
- crimson::common::actingset_changed e_actingset_changed{peering->is_primary};
+ crimson::common::actingset_changed e_actingset_changed{same_primary};
for (auto& [tid, pending_txn] : pending_trans) {
pending_txn.all_committed.set_exception(e_actingset_changed);
}
{
LOG_PREFIX(ReplicatedBackend::stop);
INFODPP("cid {}", coll->get_cid());
- stopping = true;
for (auto& [tid, pending_on] : pending_trans) {
pending_on.all_committed.set_exception(
crimson::common::system_shutdown_exception());
DoutPrefixProvider &dpp);
void got_rep_op_reply(const MOSDRepOpReply& reply) final;
seastar::future<> stop() final;
- void on_actingset_changed(peering_info_t pi) final;
+ void on_actingset_changed(bool same_primary) final;
private:
ll_read_ierrorator::future<ceph::bufferlist>
_read(const hobject_t& hoid, uint64_t off,