in_seq(),
cstate(this)
{
- pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
set_peer_type(peer.name.type());
set_peer_addr(peer.addr);
{
/* If con is not in READY state, we need to queue the request */
if (cstate.session_state.read() != XioConnection::UP) {
- pthread_spin_lock(&sp);
+ std::lock_guard<ceph::util::spinlock> lg(sp);
if (cstate.session_state.read() != XioConnection::UP) {
if (ack) {
outgoing.ack = true;
} else {
outgoing.keepalive = true;
}
- pthread_spin_unlock(&sp);
return;
}
- pthread_spin_unlock(&sp);
}
send_keepalive_or_ack_internal(ack, tp);
int XioConnection::flush_out_queues(uint32_t flags) {
XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
if (! (flags & CState::OP_FLAG_LOCKED))
- pthread_spin_lock(&sp);
+ sp.lock();
if (outgoing.keepalive) {
outgoing.keepalive = false;
msgr->_send_message_impl(m, this);
}
if (! (flags & CState::OP_FLAG_LOCKED))
- pthread_spin_unlock(&sp);
+ sp.unlock();
return 0;
}
XioSubmit::Queue deferred_q;
if (! (flags & CState::OP_FLAG_LOCKED))
- pthread_spin_lock(&sp);
+ sp.lock();
/* the two send queues contain different objects:
* - anything on the mqueue is a Message
outgoing.keepalive = outgoing.ack = false;
if (! (flags & CState::OP_FLAG_LOCKED))
- pthread_spin_unlock(&sp);
+ sp.unlock();
// mqueue
while (!disc_q.empty()) {
int XioConnection::adjust_clru(uint32_t flags)
{
if (flags & CState::OP_FLAG_LOCKED)
- pthread_spin_unlock(&sp);
+ sp.unlock();
XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
msgr->conns_sp.lock();
- pthread_spin_lock(&sp);
+ sp.lock();
if (cstate.flags & CState::FLAG_MAPPED) {
XioConnection::ConnList::iterator citer =
msgr->conns_sp.unlock();
if (! (flags & CState::OP_FLAG_LOCKED))
- pthread_spin_unlock(&sp);
+ sp.unlock();
return 0;
}
/// Mark the connection down: discard all queued outgoing work.
/// @param flags  OP_FLAG_LOCKED if the caller already holds sp.
/// @return 0 (always; matches the sibling state-transition methods).
int XioConnection::_mark_down(uint32_t flags)
{
  // Take the connection spinlock unless the caller already holds it.
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.lock();
  // per interface comment, we only stage a remote reset if the
  // current policy required it
  // NOTE(review): pass OP_FLAG_LOCKED down so the callee does not
  // try to re-acquire sp (it is not recursive).
  discard_out_queues(flags|CState::OP_FLAG_LOCKED);
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();
  return 0;
}
/// Mark the connection disposable (lossy): on failure, queued messages
/// may be dropped rather than replayed.
/// @param flags  OP_FLAG_LOCKED if the caller already holds sp.
/// @return 0 always.
int XioConnection::_mark_disposable(uint32_t flags)
{
  // Take the connection spinlock unless the caller already holds it.
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.lock();
  cstate.policy.lossy = true;
  if (! (flags & CState::OP_FLAG_LOCKED))
    sp.unlock();
  return 0;
}
/// Transition the session startup state to READY, flushing any output
/// queued while the connection was coming up.
/// @param flags  OP_FLAG_LOCKED if the caller already holds xcon->sp.
/// @return 0 always.
int XioConnection::CState::state_up_ready(uint32_t flags)
{
  // Take the owning connection's spinlock unless the caller holds it.
  if (! (flags & CState::OP_FLAG_LOCKED))
    xcon->sp.lock();
  // Flush under the lock so nothing can be enqueued between the flush
  // and the READY transition below.
  xcon->flush_out_queues(flags|CState::OP_FLAG_LOCKED);
  startup_state = session_startup_states::READY;
  if (! (flags & CState::OP_FLAG_LOCKED))
    xcon->sp.unlock();
  return (0);
}
/// Transition the session state to FLOW_CONTROLLED (sender back-pressure).
/// @param flags  OP_FLAG_LOCKED if the caller already holds xcon->sp.
/// @return 0 always.
int XioConnection::CState::state_flow_controlled(uint32_t flags)
{
  // Take the owning connection's spinlock unless the caller holds it.
  if (! (flags & OP_FLAG_LOCKED))
    xcon->sp.lock();
  session_state = session_states::FLOW_CONTROLLED;
  if (! (flags & OP_FLAG_LOCKED))
    xcon->sp.unlock();
  return (0);
}
int XioConnection::CState::state_fail(Message* m, uint32_t flags)
{
if (! (flags & OP_FLAG_LOCKED))
- pthread_spin_lock(&xcon->sp);
+ xcon->sp.lock();
// advance to state FAIL, drop queued, msgs, adjust LRU
session_state = session_states::DISCONNECTED;
xcon->disconnect();
if (! (flags & OP_FLAG_LOCKED))
- pthread_spin_unlock(&xcon->sp);
+ xcon->sp.unlock();
// notify ULP
XioMessenger* msgr = static_cast<XioMessenger*>(xcon->get_messenger());