// else return < 0 means error
int AsyncConnection::_try_send(bufferlist &send_bl, bool send)
{
- assert(write_lock.is_locked());
ldout(async_msgr->cct, 20) << __func__ << " send bl length is " << send_bl.length() << dendl;
if (send_bl.length()) {
if (outcoming_bl.length())
if (!send)
return 0;
- // standby?
- if (is_queued() && state == STATE_STANDBY && !policy.server) {
- assert(!outcoming_bl.length());
- state = STATE_CONNECTING;
- center->dispatch_event_external(read_handler);
- return 0;
- }
-
- if (state == STATE_STANDBY) {
- ldout(async_msgr->cct, 1) << __func__ << " connection is standby" << dendl;
- return 0;
- }
- if (state == STATE_CLOSED) {
- ldout(async_msgr->cct, 1) << __func__ << " connection is closed" << dendl;
- return -EINTR;
- }
-
if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
// else return < 0 means error
int AsyncConnection::read_until(uint64_t len, char *p)
{
- assert(len);
ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
<< state_offset << dendl;
int r = 0;
uint64_t left = len - state_offset;
if (recv_end > recv_start) {
- assert(state_offset == 0);
uint64_t to_read = MIN(recv_end - recv_start, left);
memcpy(p, recv_buf+recv_start, to_read);
recv_start += to_read;
state_offset += to_read;
}
- assert(recv_end == recv_start);
recv_end = recv_start = 0;
/* nothing left in the prefetch buffer */
if (len > recv_max_prefetch) {
int AsyncConnection::write_message(Message *m, bufferlist& bl)
{
- assert(write_lock.is_locked());
assert(can_write == CANWRITE);
if (!policy.lossy) {