assert(!pg->is_active());
assert(!pg->is_peering());
assert(!pg->is_primary());
- if (!pg->is_replica()) // stray, need to flush for pulls
- pg->start_flush(
- context< RecoveryMachine >().get_cur_transaction(),
- context< RecoveryMachine >().get_on_applied_context_list(),
- context< RecoveryMachine >().get_on_safe_context_list());
+ pg->start_flush(
+ context< RecoveryMachine >().get_cur_transaction(),
+ context< RecoveryMachine >().get_on_applied_context_list(),
+ context< RecoveryMachine >().get_on_safe_context_list());
}
boost::statechart::result PG::RecoveryState::Stray::react(const MLogRec& logevt)
RecoveryHandle *h ///< [in,out] handle to attach recovery op to
) = 0;
+ /**
+ * true if PGBackend can handle this message while inactive
+ *
+ * If it returns true, handle_message *must* also return true
+ */
+ virtual bool can_handle_while_inactive(OpRequestRef op) = 0;
+
/// gives PGBackend a crack at an incoming message
virtual bool handle_message(
OpRequestRef op ///< [in] message received
}
}
+bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
+{
+ dout(10) << __func__ << ": " << op << dendl;
+ switch (op->get_req()->get_type()) {
+ case MSG_OSD_PG_PULL:
+ return true;
+ case MSG_OSD_SUBOP: {
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
+ if (m->ops.size() >= 1) {
+ OSDOp *first = &m->ops[0];
+ switch (first->op.op) {
+ case CEPH_OSD_OP_PULL:
+ return true;
+ default:
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+ default:
+ return false;
+ }
+}
+
bool ReplicatedBackend::handle_message(
OpRequestRef op
)
void check_recovery_sources(const OSDMapRef osdmap);
+ /// @see PGBackend::delay_message_until_active
+ bool can_handle_while_inactive(OpRequestRef op);
+
/// @see PGBackend::handle_message
bool handle_message(
OpRequestRef op
return;
}
+ if (!is_active()) {
+ // Delay unless PGBackend says it's ok
+ if (pgbackend->can_handle_while_inactive(op)) {
+ bool handled = pgbackend->handle_message(op);
+ assert(handled);
+ return;
+ } else {
+ waiting_for_active.push_back(op);
+ return;
+ }
+ }
+
+ assert(is_active() && flushes_in_progress == 0);
if (pgbackend->handle_message(op))
return;
switch (op->get_req()->get_type()) {
case CEPH_MSG_OSD_OP:
- if (is_replay() || !is_active()) {
+ if (is_replay()) {
dout(20) << " replay, waiting for active on " << op << dendl;
waiting_for_active.push_back(op);
return;