last_complete_ondisk(recovery_state.last_complete_ondisk),
last_update_applied(recovery_state.last_update_applied),
last_rollback_info_trimmed_to_applied(recovery_state.last_rollback_info_trimmed_to_applied),
- flushes_in_progress(recovery_state.flushes_in_progress),
stray_set(recovery_state.stray_set),
peer_info(recovery_state.peer_info),
peer_bytes(recovery_state.peer_bytes),
osd->send_message_osd_cluster(get_primary().osd, m, get_osdmap_epoch());
// waiters
- if (flushes_in_progress == 0) {
+ if (recovery_state.needs_flush() == 0) {
requeue_ops(waiting_for_peered);
} else if (!waiting_for_peered.empty()) {
dout(10) << __func__ << " flushes in progress, moving "
return false;
}
-void PG::set_last_peering_reset()
-{
- dout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl;
- if (last_peering_reset != get_osdmap_epoch()) {
- last_peering_reset = get_osdmap_epoch();
- reset_interval_flush();
- }
-}
-
// Flush-completion token: holds a reference to a PG together with the osdmap
// epoch captured when the flush was started. All the work happens in the
// destructor, which reports the flush as finished unless the PG has reset
// since that epoch.
struct FlushState {
// PG to notify on destruction.
PGRef pg;
// Epoch supplied at construction; compared via pg_has_reset_since().
epoch_t epoch;
FlushState(PG *pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
~FlushState() {
// The completion call below is made with the PG lock held.
pg->lock();
// Skip the notification if the PG reports a reset since `epoch`.
// Pre-patch this called pg->on_flushed() directly; post-patch it goes
// through PeeringState::complete_flush().
- if (!pg->pg_has_reset_since(epoch))
- pg->on_flushed();
+ if (!pg->pg_has_reset_since(epoch)) {
+ pg->recovery_state.complete_flush();
+ }
pg->unlock();
}
};
typedef std::shared_ptr<FlushState> FlushStateRef;
// Attach a flush-completion trigger to transaction `t`.
// Renamed from PG::start_flush(); the flushes_in_progress++ that used to live
// here is removed from this function by the patch.
-void PG::start_flush(ObjectStore::Transaction *t)
+void PG::start_flush_on_transaction(ObjectStore::Transaction *t)
{
// flush in progress ops
// One shared FlushState, stamped with the current osdmap epoch, is handed to
// both callbacks below; its destructor runs once the last shared_ptr copy is
// released.
// NOTE(review): presumably ContainerContext just holds the ref until the
// context is completed/destroyed -- confirm against its definition.
FlushStateRef flush_trigger (std::make_shared<FlushState>(
this, get_osdmap_epoch()));
- flushes_in_progress++;
t->register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
t->register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
}
// Replaces PG::reset_interval_flush(). The function now only deals with the
// collection flush and reports the outcome to the caller; the logging and the
// clear/begin "block outgoing" calls are removed from it by this patch.
//
// Returns the result of ch->flush_commit(c): true when the IntervalFlush
// event was not needed (it is deleted here), false when the event context was
// left with flush_commit().
-void PG::reset_interval_flush()
+bool PG::try_flush_or_schedule_async()
{
- dout(10) << "Clearing blocked outgoing recovery messages" << dendl;
- recovery_state.clear_blocked_outgoing();
// Context that queues a PeeringState::IntervalFlush peering event for this
// PG, tagged with the current osdmap epoch.
Context *c = new QueuePeeringEvt<PeeringState::PeeringState::IntervalFlush>(
this, get_osdmap_epoch(), PeeringState::IntervalFlush());
// NOTE(review): presumably a false return means flush_commit() took
// ownership of `c` and will complete it later -- confirm against the
// ObjectStore collection API; `c` is not freed on this path.
if (!ch->flush_commit(c)) {
- dout(10) << "Beginning to block outgoing recovery messages" << dendl;
- recovery_state.begin_block_outgoing();
+ return false;
} else {
- dout(10) << "Not blocking outgoing recovery messages" << dendl;
// The context was not consumed on this path, so release it here.
delete c;
+ return true;
}
}
{
const OSDMapRef osdmap = get_osdmap();
- set_last_peering_reset();
+ recovery_state.set_last_peering_reset();
vector<int> oldacting, oldup;
int oldrole = get_role();
friend class NamedState;
friend class PeeringState;
+protected:
PeeringState recovery_state;
public:
using PeeringCtx = PeeringState::PeeringCtx;
eversion_t &last_complete_ondisk;
eversion_t &last_update_applied;
eversion_t &last_rollback_info_trimmed_to_applied;
- unsigned &flushes_in_progress;
set<pg_shard_t> &stray_set;
map<pg_shard_t, pg_info_t> &peer_info;
map<pg_shard_t, int64_t> &peer_bytes;
boost::optional<eversion_t> trim_to,
boost::optional<eversion_t> roll_forward_to);
- void reset_interval_flush();
+ bool try_flush_or_schedule_async() override;
void start_peering_interval(
const OSDMapRef lastmap,
const vector<int>& newup, int up_primary,
ObjectStore::Transaction *t);
void on_new_interval();
virtual void _on_new_interval() = 0;
- void start_flush(ObjectStore::Transaction *t);
- void set_last_peering_reset();
+ void start_flush_on_transaction(
+ ObjectStore::Transaction *t) override;
void update_history(const pg_history_t& history) {
recovery_state.update_history(history);
write_if_dirty(*rctx->transaction);
}
+/**
+ * Mark the current osdmap epoch as the most recent peering reset.
+ *
+ * Does nothing beyond logging when the reset has already been recorded for
+ * this epoch. Otherwise it records the epoch, drops any previously blocked
+ * outgoing recovery messages, and asks the listener to flush. If the listener
+ * could not flush synchronously (try_flush_or_schedule_async() returned
+ * false), outgoing recovery messages are blocked; the IntervalFlush reactions
+ * end the blocking.
+ */
+void PeeringState::set_last_peering_reset()
+{
+  psdout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl;
+  if (last_peering_reset != get_osdmap_epoch()) {
+    // Record the new reset epoch before flushing, as the PG implementation
+    // this replaces did. Without this assignment the guard above never
+    // becomes false and the reset epoch is never advanced.
+    last_peering_reset = get_osdmap_epoch();
+    psdout(10) << "Clearing blocked outgoing recovery messages" << dendl;
+    clear_blocked_outgoing();
+    if (!pl->try_flush_or_schedule_async()) {
+      psdout(10) << "Beginning to block outgoing recovery messages" << dendl;
+      begin_block_outgoing();
+    } else {
+      psdout(10) << "Not blocking outgoing recovery messages" << dendl;
+    }
+  }
+}
+
+/**
+ * Account for one finished flush started via start_flush().
+ *
+ * Decrements the outstanding-flush counter and notifies the listener through
+ * on_flushed() once the counter reaches zero.
+ */
+void PeeringState::complete_flush()
+{
+  // flushes_in_progress is unsigned: a completion without a matching
+  // start_flush() would wrap around instead of going negative. Keep the
+  // invariant check that guarded this decrement before it was moved here.
+  ceph_assert(flushes_in_progress > 0);
+  flushes_in_progress--;
+  if (flushes_in_progress == 0) {
+    pl->on_flushed();
+  }
+}
/*------------ Peering State Machine----------------*/
#undef dout_prefix
// Initial-state reaction to MNotifyRec: feed the notifying peer's info into
// the peering state, record a peering reset, then transition to Primary.
boost::statechart::result PeeringState::Initial::react(const MNotifyRec& notify)
{
PeeringState *ps = context< PeeringMachine >().state;
- PG *pg = context< PeeringMachine >().pg;
ps->proc_replica_info(
notify.from, notify.notify.info, notify.notify.epoch_sent);
// The reset is now recorded on the PeeringState rather than on the PG.
- pg->set_last_peering_reset();
+ ps->set_last_peering_reset();
return transit< Primary >();
}
PeeringState::Started::react(const IntervalFlush&)
{
psdout(10) << "Ending blocked outgoing recovery messages" << dendl;
- context< PeeringMachine >().pg->recovery_state.end_block_outgoing();
+ context< PeeringMachine >().state->end_block_outgoing();
return discard_event();
}
NamedState(context< PeeringMachine >().state_history, "Reset")
{
context< PeeringMachine >().log_enter(state_name);
- PG *pg = context< PeeringMachine >().pg;
+ PeeringState *ps = context< PeeringMachine >().state;
- pg->flushes_in_progress = 0;
- pg->set_last_peering_reset();
+ ps->flushes_in_progress = 0;
+ ps->set_last_peering_reset();
}
// Reset-state reaction to IntervalFlush: stop blocking outgoing recovery
// messages and discard the event.
boost::statechart::result
PeeringState::Reset::react(const IntervalFlush&)
{
psdout(10) << "Ending blocked outgoing recovery messages" << dendl;
// Post-patch the PeeringState is reached through the machine's `state`
// member instead of going through the PG's recovery_state.
- context< PeeringMachine >().pg->recovery_state.end_block_outgoing();
+ context< PeeringMachine >().state->end_block_outgoing();
return discard_event();
}
{
context< PeeringMachine >().log_enter(state_name);
+ PeeringState *ps = context< PeeringMachine >().state;
PG *pg = context< PeeringMachine >().pg;
ceph_assert(!pg->backfill_reserving);
ceph_assert(!pg->backfill_reserved);
ceph_assert(pg->is_primary());
psdout(10) << "In Active, about to call activate" << dendl;
- pg->start_flush(context< PeeringMachine >().get_cur_transaction());
+ ps->start_flush(context< PeeringMachine >().get_cur_transaction());
pg->activate(*context< PeeringMachine >().get_cur_transaction(),
pg->get_osdmap_epoch(),
*context< PeeringMachine >().get_query_map(),
boost::statechart::result PeeringState::Active::react(const AllReplicasActivated &evt)
{
+ PeeringState *ps = context< PeeringMachine >().state;
PG *pg = context< PeeringMachine >().pg;
pg_t pgid = context< PeeringMachine >().spgid.pgid;
pg->check_local();
// waiters
- if (pg->flushes_in_progress == 0) {
+ if (ps->flushes_in_progress == 0) {
pg->requeue_ops(pg->waiting_for_peered);
} else if (!pg->waiting_for_peered.empty()) {
psdout(10) << __func__ << " flushes in progress, moving "
{
context< PeeringMachine >().log_enter(state_name);
- PG *pg = context< PeeringMachine >().pg;
- pg->start_flush(context< PeeringMachine >().get_cur_transaction());
+ PeeringState *ps = context< PeeringMachine >().state;
+ ps->start_flush(context< PeeringMachine >().get_cur_transaction());
}
{
context< PeeringMachine >().log_enter(state_name);
+ PeeringState *ps = context< PeeringMachine >().state;
PG *pg = context< PeeringMachine >().pg;
ceph_assert(!pg->is_peered());
ceph_assert(!pg->is_peering());
ldout(pg->cct,10) << __func__ << " pool is deleted" << dendl;
post_event(DeleteStart());
} else {
- pg->start_flush(context< PeeringMachine >().get_cur_transaction());
+ ps->start_flush(context< PeeringMachine >().get_cur_transaction());
}
}
boost::statechart::result PeeringState::GetLog::react(const GotLog&)
{
+ PeeringState *ps = context< PeeringMachine >().state;
PG *pg = context< PeeringMachine >().pg;
psdout(10) << "leaving GetLog" << dendl;
if (msg) {
msg->info, msg->log, msg->missing,
auth_log_shard);
}
- pg->start_flush(context< PeeringMachine >().get_cur_transaction());
+ ps->start_flush(context< PeeringMachine >().get_cur_transaction());
return transit< GetMissing >();
}
virtual void send_cluster_message(int osd, Message *m, epoch_t epoch) = 0;
+ // Flush state
+ virtual bool try_flush_or_schedule_async() = 0;
+ virtual void start_flush_on_transaction(
+ ObjectStore::Transaction *t) = 0;
+ virtual void on_flushed() = 0;
+
virtual void check_recovery_sources(const OSDMapRef& newmap) = 0;
virtual void on_pool_change() = 0;
virtual void on_role_change() = 0;
virtual void on_change(ObjectStore::Transaction *t) = 0;
virtual void on_activate() = 0;
- virtual void on_flushed() = 0;
virtual void check_blacklisted_watchers() = 0;
virtual ~PeeringListener() {}
};
void purge_strays();
void update_history(const pg_history_t& new_history);
void check_recovery_sources(const OSDMapRef& map);
+ void set_last_peering_reset();
public:
PeeringState(
bool is_repair() const { return state_test(PG_STATE_REPAIR); }
bool is_empty() const { return info.last_update == eversion_t(0,0); }
+ // Flush control interface
+private:
// Begin tracking a flush tied to transaction `t`: bump the outstanding-flush
// counter, then let the listener attach its completion hooks to `t`.
// complete_flush() performs the matching decrement.
+ void start_flush(ObjectStore::Transaction *t) {
+ flushes_in_progress++;
+ pl->start_flush_on_transaction(t);
+ }
+public:
// True while at least one flush started via start_flush() has not yet been
// matched by complete_flush().
+ bool needs_flush() const {
+ return flushes_in_progress > 0;
+ }
+ void complete_flush();
};
}
}
- if (flushes_in_progress > 0) {
- dout(20) << flushes_in_progress
- << " flushes_in_progress pending "
- << "waiting for flush on " << op << dendl;
+ if (recovery_state.needs_flush()) {
+ dout(20) << "waiting for flush on " << op << dendl;
waiting_for_flush.push_back(op);
op->mark_delayed("waiting for flush");
return;
}
- ceph_assert(is_peered() && flushes_in_progress == 0);
+ ceph_assert(is_peered() && !recovery_state.needs_flush());
if (pgbackend->handle_message(op))
return;
void PrimaryLogPG::on_flushed()
{
- ceph_assert(flushes_in_progress > 0);
- flushes_in_progress--;
- if (flushes_in_progress == 0) {
- requeue_ops(waiting_for_flush);
- }
+ requeue_ops(waiting_for_flush);
if (!is_peered() || !is_primary()) {
pair<hobject_t, ObjectContextRef> i;
while (object_contexts.get_next(i.first, &i)) {