set_write_state(_write_state);
}
-seastar::future<> ProtocolV2::fault()
+void ProtocolV2::fault(bool backoff)
{
- logger().warn("{} fault during {}",
- conn, get_state_name(state));
- // TODO: <fault logic here: e.g. backoff, policies, etc.>
- // TODO: <conditions to call execute_standby()>
- // TODO: <conditions to call execute_connecting()>
- close();
- return seastar::now();
+ if (conn.policy.lossy) {
+ dispatch_reset();
+ close();
+ } else if (conn.policy.server || (conn.policy.standby && !is_queued())) {
+ execute_standby();
+ } else if (backoff) {
+ execute_wait(false);
+ } else {
+ execute_connecting();
+ }
}
void ProtocolV2::dispatch_reset()
}
}
}).handle_exception([this] (std::exception_ptr eptr) {
- // TODO: handle fault in CONNECTING state
- return fault();
+ logger().debug("{} execute_connecting(): got exception {} at state {}",
+ conn, eptr, get_state_name(state));
+ if (state != state_t::CONNECTING) {
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING);
+ logger().debug("{} execute_connecting() protocol aborted", conn);
+ return;
+ }
+
+ if (conn.policy.server || (conn.policy.standby && !is_queued())) {
+ execute_standby();
+ } else {
+ execute_wait(false);
+ }
});
});
}
}
}
}).handle_exception([this] (std::exception_ptr eptr) {
- // TODO: handle fault in ACCEPTING state
- return fault();
+ logger().warn("{} execute_accepting(): got exception {} at state {}",
+ conn, eptr, get_state_name(state));
+ close();
});
});
}
}
});
}).handle_exception([this] (std::exception_ptr eptr) {
- // TODO: handle fault in READY state
- return fault();
+ logger().debug("{} execute_ready(): got exception {} at state {}",
+ conn, eptr, get_state_name(state));
+ if (state != state_t::READY) {
+ assert(state == state_t::REPLACING ||
+ state == state_t::CLOSING);
+ logger().debug("{} execute_ready() protocol aborted", conn);
+ return;
+ }
+ fault(false);
});
});
}