existing->lock.Unlock();
return 0;
}
- existing->write_lock.Unlock();
existing->lock.Unlock();
open:
if (reply.authorizer_len)
reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
- uint64_t s = in_seq.read();
- if (reply.tag == CEPH_MSGR_TAG_SEQ)
+ if (reply.tag == CEPH_MSGR_TAG_SEQ) {
+ uint64_t s = in_seq.read();
reply_bl.append((char*)&s, sizeof(s));
+ }
lock.Unlock();
// Because "replacing" will prevent other connections preempt this addr,
prepare_send_message(f, m, bl);
Mutex::Locker l(write_lock);
- m->set_req(out_seq.inc());
+ m->set_seq(out_seq.inc());
// "features" changes will change the payload encoding
if (can_write == NOWRITE || get_features() != f) {
// ensure the correctness of message encoding
bl.clear();
ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer, can_write=" << can_write << " previous "
- << f << " != " << get_features() << dedl;
- } else {
- inject_msg_header_crc(m, bl);
+ << f << " != " << get_features() << dendl;
}
if (!is_queued() && can_write == CANWRITE) {
if (write_message(m, bl) < 0) {
list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
while (!sent.empty()) {
- pair<bufferlist, Message*> p = sent.back();
+ Message* m = sent.back();
sent.pop_back();
- ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
- << " (" << p.second->get_seq() << ")" << dendl;
- rq.push_front(p);
+ ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend "
+ << " (" << m->get_seq() << ")" << dendl;
+ rq.push_front(make_pair(bufferlist(), m));
}
}
ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
assert(write_lock.is_locked());
- for (list<pair<bufferlist, Message*> >::iterator p = sent.begin(); p != sent.end(); ++p) {
- ldout(async_msgr->cct, 20) << __func__ << " discard " << p->second << dendl;
- p->second->put();
+ for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
+ ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl;
+ (*p)->put();
}
sent.clear();
for (map<int, list<pair<bufferlist, Message*> > >::iterator p = out_q.begin(); p != out_q.end(); ++p)
{
if (state == STATE_CLOSED) {
ldout(async_msgr->cct, 10) << __func__ << " state is already " << get_state_name(state) << dendl;
- center->dispatch_event_external(reset_handler);
return ;
}
void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
{
- assert(write_lock.is_locked());
ldout(async_msgr->cct, 20) << __func__ << " m" << " " << *m << dendl;
// associate message with Connection (for benefit of encode_payload)
}
}
- // send tag
- char tag = CEPH_MSGR_TAG_MSG;
- bl.append(&tag, sizeof(tag));
-
- ceph_msg_header_old oldheader;
- if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
- bl.append((char*)&header, sizeof(header));
- } else {
- memcpy(&oldheader, &header, sizeof(header));
- oldheader.src.name = header.src;
- oldheader.src.addr = get_peer_addr();
- oldheader.orig_src = oldheader.src;
- oldheader.reserved = header.reserved;
- // delay crc calculate to "inject_msg_header_crc"
- oldheader.crc = 0;
- bl.append((char*)&oldheader, sizeof(oldheader));
- }
-
- bl.claim_append(m->get_payload());
+ bl.append(m->get_payload());
bl.append(m->get_middle());
bl.append(m->get_data());
{
assert(write_lock.is_locked());
assert(can_write == CANWRITE);
+
if (!policy.lossy) {
// put on sent list
- sent.push_back(make_pair(bl, m));
+ sent.push_back(m);
m->get();
}
+ bufferlist complete_bl;
+ // send tag
+ char tag = CEPH_MSGR_TAG_MSG;
+ complete_bl.append(&tag, sizeof(tag));
+
+ m->calc_header_crc();
+ ceph_msg_header& header = m->get_header();
+ if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
+ complete_bl.append((char*)&header, sizeof(header));
+ } else {
+ ceph_msg_header_old oldheader;
+ memcpy(&oldheader, &header, sizeof(header));
+ oldheader.src.name = header.src;
+ oldheader.src.addr = get_peer_addr();
+ oldheader.orig_src = oldheader.src;
+ oldheader.reserved = header.reserved;
+ // delay crc calculate to "inject_msg_header_crc"
+ oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
+ sizeof(oldheader) - sizeof(oldheader.crc));
+ complete_bl.append((char*)&oldheader, sizeof(oldheader));
+ }
+
+ complete_bl.claim_append(bl);
+
ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
<< " " << m << dendl;
- int rc = _try_send(bl);
+ int rc = _try_send(complete_bl);
if (rc < 0) {
ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
<< cpp_strerror(errno) << dendl;
ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;
// trim sent list
Mutex::Locker l(write_lock);
- while (!sent.empty() && sent.front().second->get_seq() <= seq) {
- pair<bufferlist, Message*> p = sent.front();
+ while (!sent.empty() && sent.front()->get_seq() <= seq) {
+ Message* m = sent.front();
sent.pop_front();
ldout(async_msgr->cct, 10) << __func__ << " got ack seq "
- << seq << " >= " << p.second->get_seq() << " on "
- << p.second << " " << *(p.second) << dendl;
- p.second->put();
+ << seq << " >= " << m->get_seq() << " on "
+ << m << " " << *m << dendl;
+ m->put();
}
}
if (!m)
break;
- // send_message may not encode message
- if (!data.length()) {
+ // send_message or requeue messages may not encode message
+ if (!data.length())
prepare_send_message(get_features(), m, data);
- inject_msg_header_crc(m, bl);
- }
r = write_message(m, data);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
+ write_lock.Unlock();
goto fail;
} else if (r > 0) {
break;
bl.append(CEPH_MSGR_TAG_ACK);
bl.append((char*)&s, sizeof(s));
ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
- ack_left.sub(ack_left);
+ ack_left.sub(left);
r = _try_send(bl);
} else if (is_queued()) {
r = _try_send(bl);
}
+ write_lock.Unlock();
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
goto fail;
}
- write_lock.Unlock();
} else {
write_lock.Unlock();
-
- if (async_msgr->cct->_conf->ms_inject_internal_delays) {
- if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
- ldout(msgr->cct, 10) << __func__ << " sleep for "
- << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
- utime_t t;
- t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
- t.sleep();
- }
- }
-
lock.Lock();
write_lock.Lock();
if (state == STATE_STANDBY && !policy.server && is_queued()) {
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
write_lock.Unlock();
- goto fail;
+ fault();
+ lock.Unlock();
+ return ;
}
}
write_lock.Unlock();
}
return ;
+
fail:
+ lock.Lock();
fault();
lock.Unlock();
}