}
};
+class actingset_changed final : public std::exception {
+public:
+ actingset_changed(bool sp) : still_primary(sp) {}
+ const char* what() const noexcept final {
+ return "acting set changed";
+ }
+ bool is_primary() const {
+ return still_primary;
+ }
+private:
+ const bool still_primary;
+};
+
template<typename Func, typename... Args>
inline seastar::future<> handle_system_shutdown(Func&& func, Args&&... args)
{
seastar::future<> stop() final {
return seastar::now();
}
+ void on_actingset_changed(peering_info_t pi) final {}
private:
ll_read_errorator::future<ceph::bufferlist> _read(const hobject_t& hoid,
uint64_t off,
IRef opref = this;
return crimson::common::handle_system_shutdown(
[this, opref=std::move(opref)]() mutable {
- return with_blocking_future(handle.enter(cp().await_map))
- .then([this]() {
- return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
- }).then([this](epoch_t epoch) {
- return with_blocking_future(handle.enter(cp().get_pg));
- }).then([this] {
- return with_blocking_future(osd.wait_for_pg(m->get_spg()));
- }).then([this, opref=std::move(opref)](Ref<PG> pgref) {
- return seastar::do_with(
- std::move(pgref), std::move(opref), [this](auto& pgref, auto& opref) {
- PG &pg = *pgref;
+ return seastar::repeat([this, opref]() mutable {
+ return with_blocking_future(handle.enter(cp().await_map))
+ .then([this]() {
+ return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
+ }).then([this](epoch_t epoch) {
+ return with_blocking_future(handle.enter(cp().get_pg));
+ }).then([this] {
+ return with_blocking_future(osd.wait_for_pg(m->get_spg()));
+ }).then([this, opref](Ref<PG> pgref) {
+ PG &pg = *pgref;
+ return with_blocking_future(
+ handle.enter(pp(pg).await_map)
+ ).then([this, &pg]() mutable {
return with_blocking_future(
- handle.enter(pp(pg).await_map)
- ).then([this, &pg]() mutable {
- return with_blocking_future(
- pg.osdmap_gate.wait_for_map(m->get_map_epoch()));
- }).then([this, &pg](auto map) mutable {
- return with_blocking_future(
- handle.enter(pp(pg).wait_for_active));
- }).then([this, &pg]() mutable {
- return with_blocking_future(pg.wait_for_active_blocker.wait());
- }).then([this, &pgref]() mutable {
- if (m->finish_decode()) {
- m->clear_payload();
- }
- if (is_pg_op()) {
- return process_pg_op(pgref);
- } else {
- return process_op(pgref);
- }
- });
+ pg.osdmap_gate.wait_for_map(m->get_map_epoch()));
+ }).then([this, &pg](auto map) mutable {
+ return with_blocking_future(
+ handle.enter(pp(pg).wait_for_active));
+ }).then([this, &pg]() mutable {
+ return with_blocking_future(pg.wait_for_active_blocker.wait());
+ }).then([this, pgref=std::move(pgref)]() mutable {
+ if (m->finish_decode()) {
+ m->clear_payload();
+ }
+ if (is_pg_op()) {
+ return process_pg_op(pgref);
+ } else {
+ return process_op(pgref);
+ }
});
+ }).then([] {
+ return seastar::stop_iteration::yes;
+ }).handle_exception_type([](crimson::common::actingset_changed& e) {
+ if (e.is_primary()) {
+ crimson::get_logger(ceph_subsys_osd).debug(
+ "operation restart, acting set changed");
+ return seastar::stop_iteration::no;
+ } else {
+ crimson::get_logger(ceph_subsys_osd).debug(
+ "operation abort, up primary changed");
+ return seastar::stop_iteration::yes;
+ }
+ });
});
});
}
Ref<PG> &pg)
{
return pg->do_pg_ops(m)
- .then([this](Ref<MOSDOpReply> reply) {
+ .then([this, pg=std::move(pg)](Ref<MOSDOpReply> reply) {
return conn->send(reply);
});
}
PG& pg = *pgref;
return with_blocking_future(
handle.enter(pp(pg).recover_missing)
- ).then([this, &pg, pgref=std::move(pgref)] {
+ ).then([this, &pg, pgref] {
eversion_t ver;
const hobject_t& soid = m->get_hobj();
if (pg.is_unreadable_object(soid, &ver)) {
return conn->send(reply);
});
});
- }).safe_then([] {
+ }).safe_then([pgref=std::move(pgref)] {
return seastar::now();
}, PG::load_obc_ertr::all_same_way([](auto &code) {
logger().error("ClientRequest saw error code {}", code);
get_osdmap_epoch(),
PeeringState::AllReplicasRecovered{});
}
+ backend->on_activate_complete();
}
void PG::prepare_write(pg_info_t &info,
});
}
+void PG::on_change(ceph::os::Transaction &t) {
+ recovery_backend->on_peering_interval_change(t);
+ backend->on_actingset_changed({ is_primary() });
+}
+
}
void on_role_change() final {
// Not needed yet
}
- void on_change(ceph::os::Transaction &t) final {
- recovery_backend->on_peering_interval_change(t);
- }
+ void on_change(ceph::os::Transaction &t) final;
void on_activate(interval_set<snapid_t> to_trim) final;
void on_activate_complete() final;
void on_new_interval() final {
epoch_t map_epoch,
std::vector<pg_log_entry_t>&& log_entries)
{
+ if (__builtin_expect((bool)peering, false)) {
+ throw crimson::common::actingset_changed(peering->is_primary);
+ }
logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
if (obc->obs.exists) {
#if 0
return store->fiemap(c, oid, off, len);
}
+void PGBackend::on_activate_complete() {
+ peering.reset();
+}
+
virtual void got_rep_op_reply(const MOSDRepOpReply&) {}
virtual seastar::future<> stop() = 0;
+ struct peering_info_t {
+ bool is_primary;
+ };
+ virtual void on_actingset_changed(peering_info_t pi) = 0;
+ virtual void on_activate_complete();
protected:
const shard_id_t shard;
CollectionRef coll;
crimson::os::FuturizedStore* store;
bool stopping = false;
+ std::optional<peering_info_t> peering;
public:
struct loaded_object_md_t {
ObjectState os;
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
}
+ if (__builtin_expect((bool)peering, false)) {
+ throw crimson::common::actingset_changed(peering->is_primary);
+ }
const ceph_tid_t tid = next_txn_id++;
auto req_id = osd_op_p.req->get_reqid();
auto pending_txn =
- pending_trans.emplace(tid, pending_on_t{pg_shards.size()}).first;
+ pending_trans.emplace(tid, pg_shards.size()).first;
bufferlist encoded_txn;
encode(txn, encoded_txn);
// TODO: set more stuff. e.g., pg_states
return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
}
- }).then([&peers=pending_txn->second] {
- if (--peers.pending == 0) {
- peers.all_committed.set_value();
+ }).then([this, peers=pending_txn->second.weak_from_this()] {
+ if (!peers) {
+ // for now, only actingset_changed can cause peers
+ // to be nullptr
+ assert(peering);
+ throw crimson::common::actingset_changed(peering->is_primary);
}
- return peers.all_committed.get_future();
+ if (--peers->pending == 0) {
+ peers->all_committed.set_value();
+ }
+ return peers->all_committed.get_future();
}).then([pending_txn, this] {
pending_txn->second.all_committed = {};
auto acked_peers = std::move(pending_txn->second.acked_peers);
});
}
+void ReplicatedBackend::on_actingset_changed(peering_info_t pi)
+{
+ peering.emplace(pi);
+ crimson::common::actingset_changed e_actingset_changed{peering->is_primary};
+ for (auto& [tid, pending_txn] : pending_trans) {
+ pending_txn.all_committed.set_exception(e_actingset_changed);
+ }
+}
+
void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply)
{
auto found = pending_trans.find(reply.get_tid());
pending_on.all_committed.set_exception(
crimson::common::system_shutdown_exception());
}
+ pending_trans.clear();
return seastar::now();
}
#include <boost/intrusive_ptr.hpp>
#include <seastar/core/future.hh>
+#include <seastar/core/weak_ptr.hh>
#include "include/buffer_fwd.h"
#include "osd/osd_types.h"
crimson::osd::ShardServices& shard_services);
void got_rep_op_reply(const MOSDRepOpReply& reply) final;
seastar::future<> stop() final;
+ void on_actingset_changed(peering_info_t pi) final;
private:
ll_read_errorator::future<ceph::bufferlist> _read(const hobject_t& hoid,
uint64_t off,
const pg_shard_t whoami;
crimson::osd::ShardServices& shard_services;
ceph_tid_t next_txn_id = 0;
- struct pending_on_t {
+ class pending_on_t : public seastar::weakly_referencable<pending_on_t> {
+ public:
pending_on_t(size_t pending)
: pending{static_cast<unsigned>(pending)}
{}