AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p)
: Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1),
- write_lock("AsyncConnection::write_lock"), can_write(NOWRITE),
+ write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
// message may in queue between last _try_send and connection ready
// write event may already notify and we need to force scheduler again
write_lock.Lock();
- can_write = CANWRITE;
+ can_write = WriteStatus::CANWRITE;
if (is_queued())
center->dispatch_event_external(write_handler);
write_lock.Unlock();
state = STATE_OPEN;
memset(&connect_msg, 0, sizeof(connect_msg));
write_lock.Lock();
- can_write = CANWRITE;
+ can_write = WriteStatus::CANWRITE;
if (is_queued())
center->dispatch_event_external(write_handler);
write_lock.Unlock();
ldout(async_msgr->cct, 1) << __func__ << " replacing on lossy channel, failing existing" << dendl;
existing->_stop();
} else {
- assert(can_write == NOWRITE);
+ assert(can_write == WriteStatus::NOWRITE);
existing->write_lock.Lock(true);
// queue a reset on the new connection, which we're dumping for the old
center->dispatch_event_external(reset_handler);
existing->requeue_sent();
swap(existing->sd, sd);
- existing->can_write = NOWRITE;
+ existing->can_write = WriteStatus::NOWRITE;
existing->open_write = false;
existing->replacing = true;
existing->state_offset = 0;
if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
Mutex::Locker l(write_lock);
- if (can_write != CLOSED) {
+ if (can_write != WriteStatus::CLOSED) {
local_messages.push_back(m);
center->dispatch_event_external(local_deliver_handler);
} else {
Mutex::Locker l(write_lock);
// "features" changes will change the payload encoding
- if (can_fast_prepare && (can_write == NOWRITE || get_features() != f)) {
+ if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) {
// ensure the correctness of message encoding
bl.clear();
m->get_payload().clear();
- ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer, can_write=" << can_write << " previous "
+ ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer previous "
<< f << " != " << get_features() << dendl;
}
- if (!is_queued() && can_write == CANWRITE && async_msgr->cct->_conf->ms_async_send_inline) {
+ if (!is_queued() && can_write == WriteStatus::CANWRITE && async_msgr->cct->_conf->ms_async_send_inline) {
if (!can_fast_prepare)
prepare_send_message(get_features(), m, bl);
logger->inc(l_msgr_send_messages_inline);
// we want to handle fault within internal thread
center->dispatch_event_external(write_handler);
}
- } else if (can_write == CLOSED) {
+ } else if (can_write == WriteStatus::CLOSED) {
ldout(async_msgr->cct, 10) << __func__ << " connection closed."
<< " Drop message " << m << dendl;
m->put();
::close(sd);
sd = -1;
}
- can_write = NOWRITE;
+ can_write = WriteStatus::NOWRITE;
open_write = false;
// requeue sent items
// it's safe to directly set 0, double locked
ack_left.set(0);
once_ready = false;
- can_write = NOWRITE;
+ can_write = WriteStatus::NOWRITE;
}
void AsyncConnection::_stop()
state = STATE_CLOSED;
open_write = false;
- can_write = CLOSED;
+ can_write = WriteStatus::CLOSED;
state_offset = 0;
if (sd >= 0) {
shutdown_socket();
ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
{
- assert(can_write == CANWRITE);
+ assert(can_write == WriteStatus::CANWRITE);
m->set_seq(out_seq.inc());
if (!policy.lossy) {
{
ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
Mutex::Locker l(write_lock);
- if (can_write != CLOSED) {
+ if (can_write != WriteStatus::CLOSED) {
keepalive = true;
center->dispatch_event_external(write_handler);
}
ssize_t r = 0;
write_lock.Lock();
- if (can_write == CANWRITE) {
+ if (can_write == WriteStatus::CANWRITE) {
if (keepalive) {
_send_keepalive_or_ack();
keepalive = false;