<< ").";
}
+// Notes:
+// 1. Don't dispatch any event when closed! It may cause AsyncConnection alive even if AsyncMessenger dead
+
const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
class C_time_wakeup : public EventCallback {
public:
C_handle_dispatch(AsyncMessenger *msgr, Message *m): msgr(msgr), m(m) {}
void do_request(int id) {
- //msgr->ms_fast_preprocess(m);
- //if (msgr->ms_can_fast_dispatch(m)) {
- // msgr->ms_fast_dispatch(m);
- //} else {
- msgr->ms_deliver_dispatch(m);
- //}
+ msgr->ms_deliver_dispatch(m);
}
};
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c)
: Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), peer_global_seq(0),
- out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), sd(-1),
- port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), recv_buf(NULL),
+ out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0),
+ sd(-1), port(-1), write_lock("AsyncConnection::write_lock"), can_write(0),
+ open_write(false), lock("AsyncConnection::lock"), keepalive(false), recv_buf(NULL),
recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
// else return < 0 means error
int AsyncConnection::_try_send(bufferlist send_bl, bool send)
{
- assert(lock.is_locked());
+ assert(write_lock.is_locked());
if (send_bl.length()) {
if (outcoming_bl.length())
outcoming_bl.claim_append(send_bl);
}
}
- uint64_t sent = 0;
+ uint64_t sent_bytes = 0;
list<bufferptr>::const_iterator pb = outcoming_bl.buffers().begin();
uint64_t left_pbrs = outcoming_bl.buffers().size();
while (left_pbrs) {
return r;
// "r" is the remaining length
- sent += msglen - r;
+ sent_bytes += msglen - r;
if (r > 0) {
ldout(async_msgr->cct, 5) << __func__ << " remaining " << r
<< " needed to be sent, creating event for writing"
}
// trim already sent for outcoming_bl
- if (sent) {
+ if (sent_bytes) {
bufferlist bl;
- if (sent < outcoming_bl.length())
- outcoming_bl.splice(sent, outcoming_bl.length()-sent, &bl);
+ if (sent_bytes < outcoming_bl.length())
+ outcoming_bl.splice(sent_bytes, outcoming_bl.length()-sent_bytes, &bl);
bl.swap(outcoming_bl);
}
- ldout(async_msgr->cct, 20) << __func__ << " sent bytes " << sent
+ ldout(async_msgr->cct, 20) << __func__ << " sent bytes " << sent_bytes
<< " remaining bytes " << outcoming_bl.length() << dendl;
if (!open_write && is_queued()) {
ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
t = (ceph_timespec*)state_buffer;
utime_t kp_t = utime_t(*t);
+ write_lock.Lock();
_send_keepalive_or_ack(true, &kp_t);
+ write_lock.Unlock();
ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
state = STATE_OPEN;
break;
switch(state) {
case STATE_WAIT_SEND:
{
+ Mutex::Locker l(write_lock);
if (!outcoming_bl.length()) {
assert(state_after_send);
state = state_after_send;
bufferlist bl;
bl.append(state_buffer, strlen(CEPH_BANNER));
- r = _try_send(bl);
+ r = try_send(bl);
if (r == 0) {
state = STATE_CONNECTING_WAIT_IDENTIFY_PEER;
ldout(async_msgr->cct, 10) << __func__ << " connect write banner done: "
}
::encode(async_msgr->get_myaddr(), myaddrbl);
- r = _try_send(myaddrbl);
+ r = try_send(myaddrbl);
if (r == 0) {
state = STATE_CONNECTING_SEND_CONNECT_MSG;
ldout(async_msgr->cct, 10) << __func__ << " connect sent my addr "
ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq="
<< connect_seq << " proto=" << connect_msg.protocol_version << dendl;
- r = _try_send(bl);
+ r = try_send(bl);
if (r == 0) {
state = STATE_CONNECTING_WAIT_CONNECT_REPLY;
ldout(async_msgr->cct,20) << __func__ << " connect wrote (self +) cseq, waiting for reply" << dendl;
case STATE_CONNECTING_WAIT_ACK_SEQ:
{
uint64_t newly_acked_seq = 0;
- bufferlist bl;
r = read_until(sizeof(newly_acked_seq), state_buffer);
if (r < 0) {
newly_acked_seq = *((uint64_t*)state_buffer);
ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
- << " vs out_seq " << out_seq << dendl;
- while (newly_acked_seq > out_seq) {
- Message *m = _get_next_outgoing();
- assert(m);
- ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq()
- << " " << *m << dendl;
- assert(m->get_seq() <= newly_acked_seq);
- m->put();
- ++out_seq;
- }
+ << " vs out_seq " << out_seq.read() << dendl;
+ discard_requeued_up_to(newly_acked_seq);
+ //while (newly_acked_seq > out_seq.read()) {
+ // Message *m = _get_next_outgoing(NULL);
+ // assert(m);
+ // ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq()
+ // << " " << *m << dendl;
+ // assert(m->get_seq() <= newly_acked_seq);
+ // m->put();
+ // out_seq.inc();
+ //}
+ bufferlist bl;
bl.append((char*)&in_seq, sizeof(in_seq));
- r = _try_send(bl);
+ r = try_send(bl);
if (r == 0) {
state = STATE_CONNECTING_READY;
ldout(async_msgr->cct, 10) << __func__ << " send in_seq done " << dendl;
// message may in queue between last _try_send and connection ready
// write event may already notify and we need to force scheduler again
+ write_lock.Lock();
+ can_write = 1;
if (is_queued())
center->dispatch_event_external(write_handler);
+ write_lock.Unlock();
break;
}
::encode(socket_addr, bl);
ldout(async_msgr->cct, 1) << __func__ << " sd=" << sd << " " << socket_addr << dendl;
- r = _try_send(bl);
+ r = try_send(bl);
if (r == 0) {
state = STATE_ACCEPTING_WAIT_BANNER_ADDR;
ldout(async_msgr->cct, 10) << __func__ << " write banner and addr done: "
ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
state = STATE_OPEN;
memset(&connect_msg, 0, sizeof(connect_msg));
+ write_lock.Lock();
+ can_write = 1;
+ write_lock.Unlock();
break;
}
existing->center->dispatch_event_external(existing->reset_handler);
existing->_stop();
} else {
+ assert(can_write == 0);
+ existing->write_lock.Lock(true);
// queue a reset on the new connection, which we're dumping for the old
center->dispatch_event_external(reset_handler);
existing->requeue_sent();
swap(existing->sd, sd);
+ swap(existing->can_write, can_write);
+ existing->can_write = 0;
existing->open_write = false;
existing->replacing = true;
existing->state_offset = 0;
// there shouldn't exist any buffer
assert(recv_start == recv_end);
+ existing->write_lock.Unlock();
if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
// handle error
existing->fault();
existing->lock.Unlock();
return 0;
}
+ existing->write_lock.Unlock();
existing->lock.Unlock();
open:
goto fail_registered;
}
- r = _try_send(reply_bl);
+ r = try_send(reply_bl);
if (r < 0)
goto fail_registered;
int AsyncConnection::send_message(Message *m)
{
ldout(async_msgr->cct, 10) << __func__ << dendl;
+
+ if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
+ ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
+ Mutex::Locker l(write_lock);
+ local_messages.push_back(m);
+ center->dispatch_event_external(local_deliver_handler);
+ return 0;
+ }
+
+ bufferlist bl;
+ Mutex::Locker l(write_lock);
+ m->set_seq(out_seq.inc());
m->get_header().src = async_msgr->get_myname();
if (!m->get_priority())
m->set_priority(async_msgr->get_default_send_priority());
+ if (can_write == 1)
+ prepare_send_message(m, bl);
- Mutex::Locker l(lock);
- if (!is_queued() && state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) {
+ if (!is_queued() && can_write == 1) {
ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl;
- int r = _send(m);
+ int r = write_message(m, bl);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
// we want to handle fault within internal thread
center->dispatch_event_external(write_handler);
}
- } else if (state == STATE_CLOSED) {
+ } else if (can_write == 2) {
ldout(async_msgr->cct, 10) << __func__ << " connection closed."
<< " Drop message " << m << dendl;
m->put();
- } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
- ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
- local_messages.push_back(m);
- center->dispatch_event_external(local_deliver_handler);
} else {
- out_q[m->get_priority()].push_back(m);
- if (state == STATE_STANDBY && !policy.server) {
- ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
- << " policy.server is false" << dendl;
- _connect();
- } else if (sd >= 0 && !open_write) {
+ out_q[m->get_priority()].push_back(make_pair(bl, m));
+ if (can_write == 0) {
+ ldout(async_msgr->cct, 10) << __func__ << " write is denied, reschedule m=" << m << dendl;
center->dispatch_event_external(write_handler);
}
}
void AsyncConnection::requeue_sent()
{
+ assert(write_lock.is_locked());
if (sent.empty())
return;
- list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+ list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
while (!sent.empty()) {
- Message *m = sent.back();
+ pair<bufferlist, Message*> p = sent.back();
sent.pop_back();
- ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend seq " << out_seq
- << " (" << m->get_seq() << ")" << dendl;
- rq.push_front(m);
- out_seq--;
+ ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
+ << " (" << p.second->get_seq() << ")" << dendl;
+ rq.push_front(p);
}
}
void AsyncConnection::discard_requeued_up_to(uint64_t seq)
{
ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl;
+ Mutex::Locker l(write_lock);
if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
return;
- list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+ list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
while (!rq.empty()) {
- Message *m = rq.front();
- if (m->get_seq() == 0 || m->get_seq() > seq)
+ pair<bufferlist, Message*> p = rq.front();
+ if (p.second->get_seq() == 0 || p.second->get_seq() > seq)
break;
- ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend seq " << out_seq
+ ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq " << p.second->get_seq()
<< " <= " << seq << ", discarding" << dendl;
- m->put();
+ p.second->put();
rq.pop_front();
- out_seq++;
}
if (rq.empty())
out_q.erase(CEPH_MSG_PRIO_HIGHEST);
{
ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
- for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
- ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl;
- (*p)->put();
+ Mutex::Locker l(write_lock);
+ for (list<pair<bufferlist, Message*> >::iterator p = sent.begin(); p != sent.end(); ++p) {
+ ldout(async_msgr->cct, 20) << __func__ << " discard " << p->second << dendl;
+ p->second->put();
}
sent.clear();
- for (map<int,list<Message*> >::iterator p = out_q.begin(); p != out_q.end(); ++p)
- for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); ++r) {
- ldout(async_msgr->cct, 20) << __func__ << " discard " << *r << dendl;
- (*r)->put();
+ for (map<int, list<pair<bufferlist, Message*> > >::iterator p = out_q.begin(); p != out_q.end(); ++p)
+ for (list<pair<bufferlist, Message*> >::iterator r = p->second.begin(); r != p->second.end(); ++r) {
+ ldout(async_msgr->cct, 20) << __func__ << " discard " << r->second << dendl;
+ r->second->put();
}
out_q.clear();
outcoming_bl.clear();
if (get_features() & CEPH_FEATURE_MSG_AUTH) {
// Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error
// here. We'll check it on the call. PLR
- int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq));
- out_seq &= SEQ_MASK;
- lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << out_seq << dendl;
+ uint64_t rand_seq;
+ int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq));
+ rand_seq &= SEQ_MASK;
+ lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
+ out_seq.set(rand_seq);
return seq_error;
} else {
// previously, seq #'s always started at 0.
- out_seq = 0;
+ out_seq.set(0);
return 0;
}
}
return ;
}
+ write_lock.Lock();
if (sd >= 0) {
shutdown_socket();
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
::close(sd);
sd = -1;
}
+ can_write = 0;
open_write = false;
// requeue sent items
<< " accept state just closed, state="
<< get_state_name(state) << dendl;
center->dispatch_event_external(reset_handler);
+
+ write_lock.Unlock();
_stop();
return ;
}
if (policy.standby && !is_queued()) {
ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
+ write_lock.Unlock();
return;
}
+ write_lock.Unlock();
if (!(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) {
// policy maybe empty when state is in accept
if (policy.server) {
center->dispatch_event_external(remote_reset_handler);
if (randomize_out_seq()) {
- ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
+ ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl;
}
in_seq = 0;
async_msgr->unregister_conn(this);
state = STATE_CLOSED;
+ Mutex::Locker l(write_lock);
open_write = false;
+ can_write = 2;
state_offset = 0;
if (sd >= 0) {
shutdown_socket();
center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
}
-int AsyncConnection::_send(Message *m)
+void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl)
{
- m->set_seq(++out_seq);
- if (!policy.lossy) {
- // put on sent list
- sent.push_back(m);
- m->get();
- }
+ assert(write_lock.is_locked());
+ ldout(async_msgr->cct, 20) << __func__ << " m" << " " << *m << dendl;
// associate message with Connection (for benefit of encode_payload)
m->set_connection(this);
-
uint64_t features = get_features();
if (m->empty_payload())
- ldout(async_msgr->cct, 20) << __func__ << " encoding " << m->get_seq() << " features " << features
- << " " << m << " " << *m << dendl;
+ ldout(async_msgr->cct, 20) << __func__ << " encoding features "
+ << features << " " << m << " " << *m << dendl;
else
- ldout(async_msgr->cct, 20) << __func__ << " half-reencoding " << m->get_seq() << " features "
- << features << " " << m << " " << *m << dendl;
+ ldout(async_msgr->cct, 20) << __func__ << " half-reencoding features "
+ << features << " " << m << " " << *m << dendl;
// encode and copy out of *m
- m->encode(features, async_msgr->crcflags);
+ m->encode(features, msgr->crcflags);
// prepare everything
ceph_msg_header& header = m->get_header();
ceph_msg_footer& footer = m->get_footer();
+ ldout(async_msgr->cct, 20) << __func__ << " sending message type=" << header.type
+ << " src " << entity_name_t(header.src)
+ << " front=" << header.front_len
+ << " data=" << header.data_len
+ << " off " << header.data_off << dendl;
+
+
// Now that we have all the crcs calculated, handle the
// digital signature for the message, if the AsyncConnection has session
// security set up. Some session security options do not
ldout(async_msgr->cct, 20) << __func__ << " no session security" << dendl;
} else {
if (session_security->sign_message(m)) {
- ldout(async_msgr->cct, 20) << __func__ << " failed to sign seq # "
- << header.seq << "): sig = " << footer.sig << dendl;
+ ldout(async_msgr->cct, 20) << __func__ << " failed to sign m="
+ << m << "): sig = " << footer.sig << dendl;
} else {
- ldout(async_msgr->cct, 20) << __func__ << " signed seq # " << header.seq
- << "): sig = " << footer.sig << dendl;
+ ldout(async_msgr->cct, 20) << __func__ << " signed m=" << m
+ << "): sig = " << footer.sig << dendl;
}
}
- bufferlist blist = m->get_payload();
- blist.append(m->get_middle());
- blist.append(m->get_data());
-
- ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
- << " " << m << dendl;
- int rc = write_message(header, footer, blist);
-
- if (rc < 0) {
- ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
- << cpp_strerror(errno) << dendl;
- } else if (rc == 0) {
- ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
- } else {
- ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl;
- }
- m->put();
-
- return rc;
-}
-
-int AsyncConnection::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer,
- bufferlist& blist)
-{
- bufferlist bl;
- int ret;
-
// send tag
char tag = CEPH_MSGR_TAG_MSG;
bl.append(&tag, sizeof(tag));
- // send envelope
ceph_msg_header_old oldheader;
if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
bl.append((char*)&header, sizeof(header));
bl.append((char*)&oldheader, sizeof(oldheader));
}
- bl.claim_append(blist);
+ bl.claim_append(m->get_payload());
+ bl.append(m->get_middle());
+ bl.append(m->get_data());
// send footer; if receiver doesn't support signatures, use the old footer format
ceph_msg_footer_old old_footer;
old_footer.flags = footer.flags;
bl.append((char*)&old_footer, sizeof(old_footer));
}
+}
- // send
- ret = _try_send(bl);
- if (ret < 0)
- return ret;
+int AsyncConnection::write_message(Message *m, bufferlist& bl)
+{
+ assert(write_lock.is_locked());
+ assert(can_write == 1);
+ if (!policy.lossy) {
+ // put on sent list
+ sent.push_back(make_pair(bl, m));
+ m->get();
+ }
- return ret;
+ ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
+ << " " << m << dendl;
+ int rc = _try_send(bl);
+ if (rc < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
+ << cpp_strerror(errno) << dendl;
+ } else if (rc == 0) {
+ ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
+ } else {
+ ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl;
+ }
+ m->put();
+
+ return rc;
}
void AsyncConnection::handle_ack(uint64_t seq)
{
ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;
// trim sent list
- while (!sent.empty() && sent.front()->get_seq() <= seq) {
- Message *m = sent.front();
+ Mutex::Locker l(write_lock);
+ while (!sent.empty() && sent.front().second->get_seq() <= seq) {
+ pair<bufferlist, Message*> p = sent.front();
sent.pop_front();
ldout(async_msgr->cct, 10) << __func__ << " got ack seq "
- << seq << " >= " << m->get_seq() << " on "
- << m << " " << *m << dendl;
- m->put();
+ << seq << " >= " << p.second->get_seq() << " on "
+ << p.second << " " << *(p.second) << dendl;
+ p.second->put();
}
}
void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
{
- assert(lock.is_locked());
+ assert(write_lock.is_locked());
bufferlist bl;
utime_t t = ceph_clock_now(async_msgr->cct);
Mutex::Locker l(lock);
bufferlist bl;
int r = 0;
+
+ write_lock.Lock();
if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) {
if (keepalive) {
_send_keepalive_or_ack();
}
while (1) {
- Message *m = _get_next_outgoing();
+ bufferlist data;
+ Message *m = _get_next_outgoing(&data);
if (!m)
break;
+ // send_message may not encode message
+ if (!data.length())
+ prepare_send_message(m, data);
+
ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl;
- r = _send(m);
+ r = write_message(m, data);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
goto fail;
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
goto fail;
}
- } else if (state != STATE_CONNECTING && state != STATE_CLOSED) {
+ } else if (state == STATE_STANDBY && !policy.server && is_queued()) {
+ ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
+ << " policy.server is false" << dendl;
+ _connect();
+ } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) {
r = _try_send(bl);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
goto fail;
}
}
+ write_lock.Unlock();
return ;
fail:
+ write_lock.Unlock();
fault();
}
void AsyncConnection::local_deliver()
{
ldout(async_msgr->cct, 10) << __func__ << dendl;
- Mutex::Locker l(lock);
+ Mutex::Locker l(write_lock);
while (!local_messages.empty()) {
Message *m = local_messages.back();
local_messages.pop_back();
m->set_recv_stamp(ceph_clock_now(async_msgr->cct));
ldout(async_msgr->cct, 10) << __func__ << " " << *m << " local deliver " << dendl;
async_msgr->ms_fast_preprocess(m);
- lock.Unlock();
+ write_lock.Unlock();
if (async_msgr->ms_can_fast_dispatch(m)) {
async_msgr->ms_fast_dispatch(m);
} else {
msgr->ms_deliver_dispatch(m);
}
- lock.Lock();
+ write_lock.Lock();
}
}