: Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1),
- dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
- open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
+ dispatch_queue(q), can_write(WriteStatus::NOWRITE),
+ open_write(false), keepalive(false), recv_buf(NULL),
recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
recv_start(0), recv_end(0),
last_active(ceph::coarse_mono_clock::now()),
ssize_t r = 0;
int prev_state = state;
bool already_dispatch_writer = false;
- Mutex::Locker l(lock);
+ std::lock_guard<std::mutex> l(lock);
last_active = ceph::coarse_mono_clock::now();
do {
ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl;
ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
t = (ceph_timespec*)state_buffer;
utime_t kp_t = utime_t(*t);
- write_lock.Lock();
+ write_lock.lock();
_send_keepalive_or_ack(true, &kp_t);
- write_lock.Unlock();
+ write_lock.unlock();
ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
set_last_keepalive(ceph_clock_now(NULL));
state = STATE_OPEN;
}
delay_state->queue(delay_period, release, message);
} else if (async_msgr->ms_can_fast_dispatch(message)) {
- lock.Unlock();
+ lock.unlock();
dispatch_queue->fast_dispatch(message);
- lock.Lock();
+ lock.lock();
} else {
dispatch_queue->enqueue(message, message->get_priority(), conn_id);
}
switch(state) {
case STATE_WAIT_SEND:
{
- Mutex::Locker l(write_lock);
+ std::lock_guard<std::mutex> l(write_lock);
if (!outcoming_bl.length()) {
assert(state_after_send);
state = state_after_send;
}
ldout(async_msgr->cct, 20) << __func__ << " connect peer addr for me is " << peer_addr_for_me << dendl;
- lock.Unlock();
+ lock.unlock();
async_msgr->learned_addr(peer_addr_for_me);
if (async_msgr->cct->_conf->ms_inject_internal_delays) {
if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
}
}
- lock.Lock();
+ lock.lock();
if (state != STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while learned_addr, mark_down or "
<< " replacing must be happened just now" << dendl;
// messages may be queued between the last _try_send and connection ready
// the write event may have already fired, so we need to force the scheduler again
- write_lock.Lock();
+ write_lock.lock();
can_write = WriteStatus::CANWRITE;
if (is_queued())
center->dispatch_event_external(write_handler);
- write_lock.Unlock();
+ write_lock.unlock();
maybe_start_delay_thread();
break;
}
center->delete_time_event(last_tick_id);
last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
- write_lock.Lock();
+ write_lock.lock();
can_write = WriteStatus::CANWRITE;
if (is_queued())
center->dispatch_event_external(write_handler);
- write_lock.Unlock();
+ write_lock.unlock();
maybe_start_delay_thread();
break;
}
return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply);
}
- lock.Unlock();
+ lock.unlock();
bool authorizer_valid;
if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl,
authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) {
- lock.Lock();
+ lock.lock();
ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
session_security.reset();
return _reply_accept(CEPH_MSGR_TAG_BADAUTHORIZER, connect, reply, authorizer_reply);
inject_delay();
- lock.Lock();
+ lock.lock();
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl;
assert(state == STATE_CLOSED);
if (existing) {
// it is not possible for the existing connection to acquire this
// connection's lock
- existing->lock.Lock(true); // skip lockdep check (we are locking a second AsyncConnection here)
+ existing->lock.lock(); // we are locking a second AsyncConnection here
if (existing->replacing || existing->state == STATE_CLOSED) {
ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
<< " existing_state=" << get_state_name(existing->state) << dendl;
reply.global_seq = existing->peer_global_seq;
r = _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
- existing->lock.Unlock();
+ existing->lock.unlock();
if (r < 0)
goto fail;
return 0;
<< ".gseq " << existing->peer_global_seq << " > "
<< connect.global_seq << ", RETRY_GLOBAL" << dendl;
reply.global_seq = existing->peer_global_seq; // so we can send it below..
- existing->lock.Unlock();
+ existing->lock.unlock();
return _reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply);
} else {
ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
<< existing->connect_seq << " > " << connect.connect_seq
<< ", RETRY_SESSION" << dendl;
reply.connect_seq = existing->connect_seq + 1;
- existing->lock.Unlock();
+ existing->lock.unlock();
return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
}
<< ".cseq " << existing->connect_seq << " == "
<< connect.connect_seq << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
reply.connect_seq = existing->connect_seq + 1;
- existing->lock.Unlock();
+ existing->lock.unlock();
return _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
}
<< existing << ".cseq " << existing->connect_seq
<< " == " << connect.connect_seq << ", sending WAIT" << dendl;
assert(peer_addr > async_msgr->get_myaddr());
- existing->lock.Unlock();
+ existing->lock.unlock();
return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply);
}
}
ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
<< connect.connect_seq << ", " << existing << ".cseq = "
<< existing->connect_seq << "), sending RESETSESSION" << dendl;
- existing->lock.Unlock();
+ existing->lock.unlock();
return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
}
existing->dispatch_queue->queue_reset(existing.get());
} else {
assert(can_write == WriteStatus::NOWRITE);
- existing->write_lock.Lock(true);
+ existing->write_lock.lock();
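+ // deliberately taking a second AsyncConnection's write_lock here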
// reset the in_seq if this is a hard reset from peer,
// otherwise we respect our original connection's value
// there should not be any buffered data
assert(recv_start == recv_end);
- existing->write_lock.Unlock();
+ existing->write_lock.unlock();
// the new sd isn't registered for any events, while the original
// events have been deleted.
// the previous existing->sd is still open; events will continue to
[existing, new_fd, new_worker, new_center, connect, reply, authorizer_reply]() mutable {
// we need to delete time event in original thread
{
- Mutex::Locker l(existing->lock);
+ std::lock_guard<std::mutex> l(existing->lock);
if (existing->state == STATE_NONE) {
existing->shutdown_socket();
existing->sd = new_fd;
// Then if we mark down `existing`, it will execute in another thread and clean up the connection.
// The previous event would result in a segmentation fault
auto transfer_existing = [existing, new_fd, connect, reply, authorizer_reply]() mutable {
- Mutex::Locker l(existing->lock);
+ std::lock_guard<std::mutex> l(existing->lock);
if (existing->state == STATE_CLOSED)
return ;
assert(new_fd == existing->sd);
existing->center->get_id(), std::move(transfer_existing), true);
}, true);
- existing->lock.Unlock();
+ existing->lock.unlock();
return 0;
}
- existing->lock.Unlock();
+ existing->lock.unlock();
open:
connect_seq = connect.connect_seq + 1;
reply_bl.append((char*)&s, sizeof(s));
}
- lock.Unlock();
+ lock.unlock();
// Because "replacing" will prevent other connections preempt this addr,
// it's safe that here we don't acquire Connection's lock
r = async_msgr->accept_conn(this);
inject_delay();
- lock.Lock();
+ lock.lock();
replacing = false;
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl;
assert(sd < 0);
- Mutex::Locker l(lock);
+ std::lock_guard<std::mutex> l(lock);
sd = incoming;
state = STATE_ACCEPTING;
// reschedule the connection in order to avoid a lock dependency
if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
- Mutex::Locker l(write_lock);
+ std::lock_guard<std::mutex> l(write_lock);
if (can_write != WriteStatus::CLOSED) {
dispatch_queue->local_delivery(m, m->get_priority());
} else {
if (can_fast_prepare)
prepare_send_message(f, m, bl);
- Mutex::Locker l(write_lock);
+ std::lock_guard<std::mutex> l(write_lock);
// "features" changes will change the payload encoding
if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) {
// ensure the correctness of message encoding
void AsyncConnection::requeue_sent()
{
- assert(write_lock.is_locked());
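+ // caller must hold write_lock (std::mutex has no is_locked() to assert)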
if (sent.empty())
return;
void AsyncConnection::discard_requeued_up_to(uint64_t seq)
{
ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl;
- Mutex::Locker l(write_lock);
+ std::lock_guard<std::mutex> l(write_lock);
if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
return;
list<pair<bufferlist, Message*> >& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
void AsyncConnection::discard_out_queue()
{
ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
- assert(write_lock.is_locked());
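+ // caller must hold write_lock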
for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl;
return ;
}
- write_lock.Lock();
+ write_lock.lock();
shutdown_socket();
can_write = WriteStatus::NOWRITE;
open_write = false;
state >= STATE_ACCEPTING && state <= STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 0) << __func__ << " with nothing to send and in the half "
<< " accept state just closed" << dendl;
- write_lock.Unlock();
+ write_lock.unlock();
_stop();
dispatch_queue->queue_reset(this);
return ;
if (policy.standby && !is_queued() && state != STATE_WAIT) {
ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
- write_lock.Unlock();
+ write_lock.unlock();
return;
}
- write_lock.Unlock();
+ write_lock.unlock();
if (!(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY) &&
state != STATE_WAIT) { // STATE_WAIT is coming from STATE_CONNECTING_*
// policy may be empty when the state is in accept
void AsyncConnection::was_session_reset()
{
ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
- assert(lock.is_locked());
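+ // caller must hold AsyncConnection::lock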
- Mutex::Locker l(write_lock);
+ std::lock_guard<std::mutex> l(write_lock);
if (delay_state)
delay_state->discard();
dispatch_queue->discard_queue(conn_id);
void AsyncConnection::_stop()
{
- assert(lock.is_locked());
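+ // caller must hold AsyncConnection::lock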
if (state == STATE_CLOSED)
return ;
delay_state->flush();
ldout(async_msgr->cct, 1) << __func__ << dendl;
- Mutex::Locker l(write_lock);
+ std::lock_guard<std::mutex> l(write_lock);
reset_recv_state();
dispatch_queue->discard_queue(conn_id);
{
ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl;
// trim sent list
- Mutex::Locker l(write_lock);
+ std::lock_guard<std::mutex> l(write_lock);
while (!sent.empty() && sent.front()->get_seq() <= seq) {
Message* m = sent.front();
sent.pop_front();
{
Message *m = nullptr;
{
- Mutex::Locker l(delay_lock);
+ std::lock_guard<std::mutex> l(delay_lock);
register_time_events.erase(id);
if (stop_dispatch)
return ;
stop_dispatch = true;
center->submit_to(
center->get_id(), [this] () mutable {
- Mutex::Locker l(delay_lock);
+ std::lock_guard<std::mutex> l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
if (msgr->ms_can_fast_dispatch(m)) {
void AsyncConnection::send_keepalive()
{
ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
- Mutex::Locker l(write_lock);
+ std::lock_guard<std::mutex> l(write_lock);
if (can_write != WriteStatus::CLOSED) {
keepalive = true;
center->dispatch_event_external(write_handler);
void AsyncConnection::mark_down()
{
ldout(async_msgr->cct, 1) << __func__ << " started." << dendl;
- Mutex::Locker l(lock);
+ std::lock_guard<std::mutex> l(lock);
_stop();
}
void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
{
- assert(write_lock.is_locked());
-
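+ // caller must hold write_lock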
if (ack) {
assert(tp);
struct ceph_timespec ts;
ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
ssize_t r = 0;
- write_lock.Lock();
+ write_lock.lock();
if (can_write == WriteStatus::CANWRITE) {
if (keepalive) {
_send_keepalive_or_ack();
r = write_message(m, data, _has_next_outgoing());
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
- write_lock.Unlock();
+ write_lock.unlock();
goto fail;
} else if (r > 0) {
break;
r = _try_send();
}
- write_lock.Unlock();
+ write_lock.unlock();
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
goto fail;
}
} else {
- write_lock.Unlock();
- lock.Lock();
- write_lock.Lock();
+ write_lock.unlock();
+ lock.lock();
+ write_lock.lock();
if (state == STATE_STANDBY && !policy.server && is_queued()) {
ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
_connect();
r = _try_send();
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
- write_lock.Unlock();
+ write_lock.unlock();
fault();
- lock.Unlock();
+ lock.unlock();
return ;
}
}
- write_lock.Unlock();
- lock.Unlock();
+ write_lock.unlock();
+ lock.unlock();
}
return ;
fail:
- lock.Lock();
+ lock.lock();
fault();
- lock.Unlock();
+ lock.unlock();
}
void AsyncConnection::wakeup_from(uint64_t id)
{
- lock.Lock();
+ lock.lock();
register_time_events.erase(id);
- lock.Unlock();
+ lock.unlock();
process();
}
ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
<< " last_active" << last_active << dendl;
assert(last_tick_id == id);
- Mutex::Locker l(lock);
+ std::lock_guard<std::mutex> l(lock);
last_tick_id = 0;
auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
if (inactive_timeout_us < (uint64_t)idle_period) {
#include <signal.h>
#include <climits>
#include <list>
+#include <mutex>
#include <map>
using namespace std;
#include "auth/AuthSessionHandler.h"
#include "common/ceph_time.h"
-#include "common/Mutex.h"
#include "common/perf_counters.h"
#include "include/buffer.h"
#include "msg/Connection.h"
void restore_sigpipe();
ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
ssize_t try_send(bufferlist &bl, bool more=false) {
- Mutex::Locker l(write_lock);
+ std::lock_guard<std::mutex> l(write_lock);
outcoming_bl.claim_append(bl);
return _try_send(more);
}
return 0;
}
bool is_queued() {
- assert(write_lock.is_locked());
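+ // caller must hold write_lock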
return !out_q.empty() || outcoming_bl.length();
}
void shutdown_socket() {
}
}
Message *_get_next_outgoing(bufferlist *bl) {
- assert(write_lock.is_locked());
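+ // caller must hold write_lock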
Message *m = 0;
while (!m && !out_q.empty()) {
map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin();
return m;
}
bool _has_next_outgoing() {
- assert(write_lock.is_locked());
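+ // caller must hold write_lock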
return !out_q.empty();
}
void reset_recv_state();
class DelayedDelivery : public EventCallback {
std::set<uint64_t> register_time_events; // these need to be deleted on stop
std::deque<std::pair<utime_t, Message*> > delay_queue;
- Mutex delay_lock;
+ std::mutex delay_lock;
AsyncMessenger *msgr;
EventCenter *center;
DispatchQueue *dispatch_queue;
public:
explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
DispatchQueue *q, uint64_t cid)
- : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"),
- msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
+ : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
stop_dispatch(false) { }
~DelayedDelivery() {
assert(register_time_events.empty());
void set_center(EventCenter *c) { center = c; }
void do_request(int id) override;
void queue(double delay_period, utime_t release, Message *m) {
- Mutex::Locker l(delay_lock);
+ std::lock_guard<std::mutex> l(delay_lock);
delay_queue.push_back(std::make_pair(release, m));
register_time_events.insert(center->create_time_event(delay_period*1000000, this));
}
void discard() {
stop_dispatch = true;
center->submit_to(center->get_id(), [this] () mutable {
- Mutex::Locker l(delay_lock);
+ std::lock_guard<std::mutex> l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
void send_keepalive() override;
void mark_down() override;
void mark_disposable() override {
- Mutex::Locker l(lock);
+ std::lock_guard<std::mutex> l(lock);
policy.lossy = true;
}
DispatchQueue *dispatch_queue;
- Mutex write_lock;
+ std::mutex write_lock;
enum class WriteStatus {
NOWRITE,
REPLACING,
bufferlist outcoming_bl;
bool keepalive;
- Mutex lock;
+ std::mutex lock;
utime_t backoff; // backoff time
EventCallbackRef read_handler;
EventCallbackRef write_handler;
void tick(uint64_t id);
void local_deliver();
void stop(bool queue_reset) {
- lock.Lock();
+ lock.lock();
bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
_stop();
- lock.Unlock();
+ lock.unlock();
if (need_queue_reset)
dispatch_queue->queue_reset(this);
}