From f7e2afc9c5efe3b4b090204a827366756dfefe9a Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Thu, 23 Aug 2018 11:26:01 -0400 Subject: [PATCH] msg: Use ceph_assert for asserts. Signed-off-by: Adam C. Emerson --- src/msg/DispatchQueue.cc | 6 +- src/msg/DispatchQueue.h | 14 +-- src/msg/Message.cc | 2 +- src/msg/Messenger.h | 12 +-- src/msg/QueueStrategy.cc | 4 +- src/msg/async/AsyncConnection.cc | 60 ++++++------ src/msg/async/AsyncConnection.h | 6 +- src/msg/async/AsyncMessenger.cc | 22 ++--- src/msg/async/AsyncMessenger.h | 6 +- src/msg/async/Event.cc | 18 ++-- src/msg/async/Event.h | 8 +- src/msg/async/EventKqueue.cc | 4 +- src/msg/async/PosixStack.cc | 4 +- src/msg/async/PosixStack.h | 2 +- src/msg/async/Stack.cc | 6 +- src/msg/async/Stack.h | 2 +- src/msg/async/dpdk/DPDK.cc | 26 +++--- src/msg/async/dpdk/DPDK.h | 16 ++-- src/msg/async/dpdk/DPDKStack.cc | 10 +- src/msg/async/dpdk/DPDKStack.h | 2 +- src/msg/async/dpdk/Packet.h | 10 +- src/msg/async/dpdk/TCP.cc | 2 +- src/msg/async/dpdk/TCP.h | 4 +- src/msg/async/dpdk/UserspaceEvent.cc | 10 +- src/msg/async/dpdk/UserspaceEvent.h | 2 +- src/msg/async/dpdk/ethernet.h | 2 +- src/msg/async/dpdk/net.cc | 2 +- src/msg/async/dpdk/net.h | 2 +- src/msg/async/dpdk/stream.h | 2 +- src/msg/async/rdma/Infiniband.cc | 36 ++++---- src/msg/async/rdma/Infiniband.h | 4 +- src/msg/async/rdma/RDMAConnectedSocketImpl.cc | 16 ++-- .../rdma/RDMAIWARPConnectedSocketImpl.cc | 2 +- .../async/rdma/RDMAIWARPServerSocketImpl.cc | 4 +- src/msg/async/rdma/RDMAServerSocketImpl.cc | 4 +- src/msg/async/rdma/RDMAStack.cc | 26 +++--- src/msg/async/rdma/RDMAStack.h | 2 +- src/msg/simple/Accepter.cc | 2 +- src/msg/simple/Pipe.cc | 92 +++++++++---------- src/msg/simple/Pipe.h | 6 +- src/msg/simple/PipeConnection.cc | 2 +- src/msg/simple/SimpleMessenger.cc | 36 ++++---- src/msg/simple/SimpleMessenger.h | 2 +- src/msg/xio/XioConnection.cc | 12 +-- src/msg/xio/XioMessenger.cc | 8 +- src/msg/xio/XioMsg.cc | 4 +- src/msg/xio/XioPool.h | 2 +- src/msg/xio/XioPortal.h | 4 +- 48 files changed, 265 insertions(+), 265 deletions(-) diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index 53e112bed4b..5a943e309a5 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -216,7 +216,7 @@ void DispatchQueue::discard_queue(uint64_t id) { for (list::iterator i = removed.begin(); i != removed.end(); ++i) { - assert(!(i->is_code())); // We don't discard id 0, ever! + ceph_assert(!(i->is_code())); // We don't discard id 0, ever! const Message::ref& m = i->get_message(); remove_arrival(m); dispatch_throttle_release(m->get_dispatch_throttle_size()); @@ -225,8 +225,8 @@ void DispatchQueue::discard_queue(uint64_t id) { void DispatchQueue::start() { - assert(!stop); - assert(!dispatch_thread.is_started()); + ceph_assert(!stop); + ceph_assert(!dispatch_thread.is_started()); dispatch_thread.create("ms_dispatch"); local_delivery_thread.create("ms_local"); } diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index 2c16056c216..30e6b654f66 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -50,15 +50,15 @@ class DispatchQueue { return type != -1; } int get_code () const { - assert(is_code()); + ceph_assert(is_code()); return type; } const Message::ref& get_message() { - assert(!is_code()); + ceph_assert(!is_code()); return m; } Connection *get_connection() { - assert(is_code()); + ceph_assert(is_code()); return con.get(); } }; @@ -82,7 +82,7 @@ class DispatchQueue { } void remove_arrival(const Message::ref& m) { auto it = marrival_map.find(m); - assert(it != marrival_map.end()); + ceph_assert(it != marrival_map.end()); marrival.erase(it->second); marrival_map.erase(it); } @@ -234,9 +234,9 @@ class DispatchQueue { stop(false) {} ~DispatchQueue() { - assert(mqueue.empty()); - assert(marrival.empty()); - assert(local_messages.empty()); + ceph_assert(mqueue.empty()); + ceph_assert(marrival.empty()); + ceph_assert(local_messages.empty()); } }; diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 27caab5e3cc..ac9a175b483 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -206,7 +206,7 @@ void Message::encode(uint64_t features, int crcflags) { // encode and copy out of *m if (empty_payload()) { - assert(middle.length() == 0); + ceph_assert(middle.length() == 0); encode_payload(features); if (byte_throttler) { diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 187223b03ed..f0e0f8af4e0 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -288,7 +288,7 @@ public: * @param p The cluster protocol to use. Defined externally. */ void set_default_send_priority(int p) { - assert(!started); + ceph_assert(!started); default_send_priority = p; } /** @@ -566,16 +566,16 @@ public: } else { blocked = true; int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask); - assert(r == 0); + ceph_assert(r == 0); } } ~sigpipe_stopper() { if (blocked) { struct timespec nowait{0}; int r = sigtimedwait(&pipe_mask, 0, &nowait); - assert(r == EAGAIN || r == 0); + ceph_assert(r == EAGAIN || r == 0); r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0); - assert(r == 0); + ceph_assert(r == 0); } } }; @@ -632,7 +632,7 @@ public: /** * Deliver a single Message. Send it to each Dispatcher * in sequence until one of them handles it. - * If none of our Dispatchers can handle it, assert(0). + * If none of our Dispatchers can handle it, ceph_assert(0). * * @param m The Message to deliver. */ @@ -644,7 +644,7 @@ public: } lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from " << m->get_source_inst() << dendl; - assert(!cct->_conf->ms_die_on_unhandled_msg); + ceph_assert(!cct->_conf->ms_die_on_unhandled_msg); } void ms_deliver_dispatch(Message *m) { return ms_deliver_dispatch(Message::ref(m, false)); /* consume ref */ diff --git a/src/msg/QueueStrategy.cc b/src/msg/QueueStrategy.cc index 0bbee4f3835..9356e5c597c 100644 --- a/src/msg/QueueStrategy.cc +++ b/src/msg/QueueStrategy.cc @@ -84,7 +84,7 @@ void QueueStrategy::shutdown() void QueueStrategy::wait() { lock.Lock(); - assert(stop); + ceph_assert(stop); for (auto& thread : threads) { lock.Unlock(); @@ -98,7 +98,7 @@ void QueueStrategy::wait() void QueueStrategy::start() { - assert(!stop); + ceph_assert(!stop); lock.Lock(); threads.reserve(n_threads); for (int ix = 0; ix < n_threads; ++ix) { diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index b021ff0004e..674a6714871 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -149,14 +149,14 @@ AsyncConnection::AsyncConnection( AsyncConnection::~AsyncConnection() { - assert(out_q.empty()); - assert(sent.empty()); + ceph_assert(out_q.empty()); + ceph_assert(sent.empty()); delete authorizer; if (recv_buf) delete[] recv_buf; if (state_buffer) delete[] state_buffer; - assert(!delay_state); + ceph_assert(!delay_state); } void AsyncConnection::maybe_start_delay_thread() @@ -211,7 +211,7 @@ ssize_t AsyncConnection::_try_send(bool more) } } - assert(center->in_thread()); + ceph_assert(center->in_thread()); ssize_t r = cs.send(outcoming_bl, more); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl; @@ -726,14 +726,14 @@ void AsyncConnection::process() << ", discarding" << dendl; message->put(); if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message) - assert(0 == "old msgs despite reconnect_seq feature"); + ceph_assert(0 == "old msgs despite reconnect_seq feature"); break; } if (message->get_seq() > cur_seq + 1) { ldout(async_msgr->cct, 0) << __func__ << " missed message? skipped from seq " << cur_seq << " to " << message->get_seq() << dendl; if (async_msgr->cct->_conf->ms_die_on_skipped_message) - assert(0 == "skipped incoming seq"); + ceph_assert(0 == "skipped incoming seq"); } message->set_connection(this); @@ -850,7 +850,7 @@ ssize_t AsyncConnection::_process_connection() { std::lock_guard l(write_lock); if (!outcoming_bl.length()) { - assert(state_after_send); + ceph_assert(state_after_send); state = state_after_send; state_after_send = STATE_NONE; } @@ -859,7 +859,7 @@ ssize_t AsyncConnection::_process_connection() case STATE_CONNECTING: { - assert(!policy.server); + ceph_assert(!policy.server); // reset connect state variables got_bad_auth = false; @@ -1088,7 +1088,7 @@ ssize_t AsyncConnection::_process_connection() bufferlist authorizer_reply; if (connect_reply.authorizer_len) { ldout(async_msgr->cct, 10) << __func__ << " reply.authorizer_len=" << connect_reply.authorizer_len << dendl; - assert(connect_reply.authorizer_len < 4096); + ceph_assert(connect_reply.authorizer_len < 4096); r = read_until(connect_reply.authorizer_len, state_buffer); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " read connect reply authorizer failed" << dendl; @@ -1117,7 +1117,7 @@ ssize_t AsyncConnection::_process_connection() goto fail; // state must be changed! - assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH); + ceph_assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH); break; } @@ -1139,10 +1139,10 @@ ssize_t AsyncConnection::_process_connection() discard_requeued_up_to(newly_acked_seq); //while (newly_acked_seq > out_seq.read()) { // Message *m = _get_next_outgoing(NULL); - // assert(m); + // ceph_assert(m); // ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq() // << " " << *m << dendl; - // assert(m->get_seq() <= newly_acked_seq); + // ceph_assert(m->get_seq() <= newly_acked_seq); // m->put(); // out_seq.inc(); //} @@ -1172,7 +1172,7 @@ ssize_t AsyncConnection::_process_connection() state = STATE_OPEN; once_ready = true; connect_seq += 1; - assert(connect_seq == connect_reply.connect_seq); + ceph_assert(connect_seq == connect_reply.connect_seq); backoff = utime_t(); set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features); ldout(async_msgr->cct, 10) << __func__ << " connect success " << connect_seq @@ -1193,7 +1193,7 @@ ssize_t AsyncConnection::_process_connection() } if (delay_state) - assert(delay_state->ready()); + ceph_assert(delay_state->ready()); dispatch_queue->queue_connect(this); async_msgr->ms_deliver_handle_fast_connect(this); @@ -1332,7 +1332,7 @@ ssize_t AsyncConnection::_process_connection() goto fail; // state is changed by "handle_connect_msg" - assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH); + ceph_assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH); break; } @@ -1361,7 +1361,7 @@ ssize_t AsyncConnection::_process_connection() memset(&connect_msg, 0, sizeof(connect_msg)); if (delay_state) - assert(delay_state->ready()); + ceph_assert(delay_state->ready()); // make sure no pending tick timer if (last_tick_id) center->delete_time_event(last_tick_id); @@ -1436,7 +1436,7 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co state = STATE_CONNECTING_SEND_CONNECT_MSG; } if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { - assert(reply.connect_seq > connect_seq); + ceph_assert(reply.connect_seq > connect_seq); ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_SESSION " << connect_seq << " -> " << reply.connect_seq << dendl; @@ -1537,7 +1537,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis if (need_challenge && !had_challenge && authorizer_challenge) { ldout(async_msgr->cct,0) << __func__ << ": challenging authorizer" << dendl; - assert(authorizer_reply.length()); + ceph_assert(authorizer_reply.length()); tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER; } else { ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl; @@ -1558,7 +1558,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis lock.lock(); if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl; - assert(state == STATE_CLOSED); + ceph_assert(state == STATE_CLOSED); goto fail; } @@ -1663,14 +1663,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis ldout(async_msgr->cct,10) << __func__ << " accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", sending WAIT" << dendl; - assert(peer_addrs.legacy_addr() > async_msgr->get_myaddrs().legacy_addr()); + ceph_assert(peer_addrs.legacy_addr() > async_msgr->get_myaddrs().legacy_addr()); existing->lock.unlock(); return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply); } } - assert(connect.connect_seq > existing->connect_seq); - assert(connect.global_seq >= existing->peer_global_seq); + ceph_assert(connect.connect_seq > existing->connect_seq); + ceph_assert(connect.global_seq >= existing->peer_global_seq); if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other existing->connect_seq == 0) { ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " @@ -1707,7 +1707,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->_stop(); existing->dispatch_queue->queue_reset(existing.get()); } else { - assert(can_write == WriteStatus::NOWRITE); + ceph_assert(can_write == WriteStatus::NOWRITE); existing->write_lock.lock(); // reset the in_seq if this is a hard reset from peer, @@ -1720,7 +1720,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis if (existing->delay_state) { existing->delay_state->flush(); - assert(!delay_state); + ceph_assert(!delay_state); } existing->reset_recv_state(); @@ -1741,7 +1741,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // Discard existing prefetch buffer in `recv_buf` existing->recv_start = existing->recv_end = 0; // there shouldn't exist any buffer - assert(recv_start == recv_end); + ceph_assert(recv_start == recv_end); existing->authorizer_challenge.reset(); @@ -1783,7 +1783,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis std::lock_guard l(existing->lock); if (existing->state == STATE_CLOSED) return ; - assert(existing->state == STATE_NONE); + ceph_assert(existing->state == STATE_NONE); existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG; existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler); @@ -1870,7 +1870,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis } if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl; - assert(state == STATE_CLOSED || state == STATE_NONE); + ceph_assert(state == STATE_CLOSED || state == STATE_NONE); goto fail_registered; } @@ -1916,7 +1916,7 @@ void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr) { ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << " on " << addr << dendl; - assert(socket.fd() >= 0); + ceph_assert(socket.fd() >= 0); std::lock_guard l(lock); cs = std::move(socket); @@ -2224,7 +2224,7 @@ void AsyncConnection::prepare_send_message(uint64_t features, Message *m, buffer ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) { FUNCTRACE(async_msgr->cct); - assert(center->in_thread()); + ceph_assert(center->in_thread()); m->set_seq(++out_seq); if (msgr->crcflags & MSG_CRC_HEADER) @@ -2428,7 +2428,7 @@ void AsyncConnection::_append_keepalive_or_ack(bool ack, utime_t *tp) { ldout(async_msgr->cct, 10) << __func__ << dendl; if (ack) { - assert(tp); + ceph_assert(tp); struct ceph_timespec ts; tp->encode_timeval(&ts); outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 4ec8e0eb7c1..a7396f5895b 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -114,7 +114,7 @@ class AsyncConnection : public Connection { Message *m = 0; if (!out_q.empty()) { map > >::reverse_iterator it = out_q.rbegin(); - assert(!it->second.empty()); + ceph_assert(!it->second.empty()); list >::iterator p = it->second.begin(); m = p->second; if (bl) @@ -152,8 +152,8 @@ class AsyncConnection : public Connection { : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid), stop_dispatch(false) { } ~DelayedDelivery() override { - assert(register_time_events.empty()); - assert(delay_queue.empty()); + ceph_assert(register_time_events.empty()); + ceph_assert(delay_queue.empty()); } void set_center(EventCenter *c) { center = c; } void do_request(uint64_t id) override; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index d4fc61624ce..bb761866d44 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -293,7 +293,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, AsyncMessenger::~AsyncMessenger() { delete reap_handler; - assert(!did_bind); // either we didn't bind or we shut down the Processor + ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor local_connection->mark_down(); for (auto &&p : processors) delete p; @@ -392,7 +392,7 @@ int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs) // it, like port is used case. But if the first worker successfully to bind // but the second worker failed, it's not expected and we need to assert // here - assert(i == 0); + ceph_assert(i == 0); return r; } ++i; @@ -404,7 +404,7 @@ int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs) int AsyncMessenger::rebind(const set& avoid_ports) { ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl; - assert(did_bind); + ceph_assert(did_bind); for (auto &&p : processors) p->stop(); @@ -428,7 +428,7 @@ int AsyncMessenger::rebind(const set& avoid_ports) for (auto &&p : processors) { int r = p->bind(bind_addrs, avoid_ports, &bound_addrs); if (r) { - assert(i == 0); + ceph_assert(i == 0); return r; } ++i; @@ -446,7 +446,7 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr) return 0; Mutex::Locker l(lock); if (did_bind) { - assert(my_addrs->legacy_addr() == bind_addr); + ceph_assert(my_addrs->legacy_addr() == bind_addr); return 0; } if (started) { @@ -490,9 +490,9 @@ int AsyncMessenger::start() ldout(cct,1) << __func__ << " start" << dendl; // register at least one entity, first! - assert(my_name.type() >= 0); + ceph_assert(my_name.type() >= 0); - assert(!started); + ceph_assert(!started); started = true; stopped = false; @@ -551,8 +551,8 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_ad AsyncConnectionRef AsyncMessenger::create_connect( const entity_addrvec_t& addrs, int type) { - assert(lock.is_locked()); - assert(addrs != *my_addrs); + ceph_assert(lock.is_locked()); + ceph_assert(addrs != *my_addrs); ldout(cct, 10) << __func__ << " " << addrs << ", creating connection and registering" << dendl; @@ -574,7 +574,7 @@ AsyncConnectionRef AsyncMessenger::create_connect( AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w, target.is_msgr2()); conn->connect(addrs, type, target); - assert(!conns.count(addrs)); + ceph_assert(!conns.count(addrs)); conns[addrs] = conn; w->get_perf_counter()->inc(l_msgr_active_connections); @@ -608,7 +608,7 @@ ConnectionRef AsyncMessenger::get_loopback_connection() int AsyncMessenger::_send_to(Message *m, int type, const entity_addrvec_t& addrs) { FUNCTRACE(cct); - assert(m); + ceph_assert(m); if (m->get_type() == CEPH_MSG_OSD_OP) OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP"); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index e97c161ea86..d349bab221d 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -111,7 +111,7 @@ public: * @{ */ void set_cluster_protocol(int p) override { - assert(!started && !did_bind); + ceph_assert(!started && !did_bind); cluster_protocol = p; } @@ -307,7 +307,7 @@ private: bool stopped; AsyncConnectionRef _lookup_conn(const entity_addrvec_t& k) { - assert(lock.is_locked()); + ceph_assert(lock.is_locked()); auto p = conns.find(k); if (p == conns.end()) return NULL; @@ -324,7 +324,7 @@ private: } void _init_local_connection() { - assert(lock.is_locked()); + ceph_assert(lock.is_locked()); local_connection->peer_addrs = *my_addrs; local_connection->peer_type = my_name.type(); local_connection->set_features(CEPH_FEATURES_ALL); diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 9e16efd867c..aee9a390493 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -102,7 +102,7 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout) int EventCenter::init(int n, unsigned i, const std::string &t) { // can't init multi times - assert(nevent == 0); + ceph_assert(nevent == 0); type = t; idx = i; @@ -193,19 +193,19 @@ void EventCenter::set_owner() global_centers = &cct->lookup_or_create_singleton_object< EventCenter::AssociatedCenters>( "AsyncMessenger::EventCenter::global_center::" + type, true); - assert(global_centers); + ceph_assert(global_centers); global_centers->centers[idx] = this; if (driver->need_wakeup()) { notify_handler = new C_handle_notify(this, cct); int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler); - assert(r == 0); + ceph_assert(r == 0); } } } int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) { - assert(in_thread()); + ceph_assert(in_thread()); int r = 0; if (fd >= nevent) { int new_size = nevent << 2; @@ -234,7 +234,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) // add_event shouldn't report error, otherwise it must be a innermost bug! lderr(cct) << __func__ << " add event failed, ret=" << r << " fd=" << fd << " mask=" << mask << " original mask is " << event->mask << dendl; - assert(0 == "BUG!"); + ceph_assert(0 == "BUG!"); return r; } @@ -252,7 +252,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) void EventCenter::delete_file_event(int fd, int mask) { - assert(in_thread() && fd >= 0); + ceph_assert(in_thread() && fd >= 0); if (fd >= nevent) { ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent << "mask=" << mask << dendl; @@ -267,7 +267,7 @@ void EventCenter::delete_file_event(int fd, int mask) int r = driver->del_event(fd, event->mask, mask); if (r < 0) { // see create_file_event - assert(0 == "BUG!"); + ceph_assert(0 == "BUG!"); } if (mask & EVENT_READABLE && event->read_cb) { @@ -284,7 +284,7 @@ void EventCenter::delete_file_event(int fd, int mask) uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt) { - assert(in_thread()); + ceph_assert(in_thread()); uint64_t id = time_event_next_id++; ldout(cct, 30) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl; @@ -301,7 +301,7 @@ uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef void EventCenter::delete_time_event(uint64_t id) { - assert(in_thread()); + ceph_assert(in_thread()); ldout(cct, 30) << __func__ << " id=" << id << dendl; if (id >= time_event_next_id || id == 0) return ; diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index e2f2ca98432..abd27845c88 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -177,7 +177,7 @@ class EventCenter { int process_time_events(); FileEvent *_get_file_event(int fd) { - assert(fd < nevent); + ceph_assert(fd < nevent); return &file_events[fd]; } @@ -234,7 +234,7 @@ class EventCenter { delete this; } void wait() { - assert(!nonwait); + ceph_assert(!nonwait); std::unique_lock l(lock); while (!done) cond.wait(l); @@ -244,9 +244,9 @@ class EventCenter { public: template void submit_to(int i, func &&f, bool nowait = false) { - assert(i < MAX_EVENTCENTER && global_centers); + ceph_assert(i < MAX_EVENTCENTER && global_centers); EventCenter *c = global_centers->centers[i]; - assert(c); + ceph_assert(c); if (!nowait && c->in_thread()) { f(); return ; diff --git a/src/msg/async/EventKqueue.cc b/src/msg/async/EventKqueue.cc index 026da802950..d6ba4a3db36 100644 --- a/src/msg/async/EventKqueue.cc +++ b/src/msg/async/EventKqueue.cc @@ -73,7 +73,7 @@ int KqueueDriver::test_thread_change(const char* funcname) { } else if ((kqfd != -1) && (test_kqfd() < 0)) { // should this ever happen? // It would be strange to change kqfd with thread change. - // Might nee to change this into an assert() in the future. + // Might nee to change this into an ceph_assert() in the future. ldout(cct,0) << funcname << " Warning: Recreating old kqfd. " << "This should not happen!!!" << dendl; kqfd = -1; @@ -203,7 +203,7 @@ int KqueueDriver::resize_events(int newsize) if (!sav_events) { lderr(cct) << __func__ << " unable to realloc memory: " << cpp_strerror(errno) << dendl; - assert(sav_events); + ceph_assert(sav_events); return -ENOMEM; } memset(&sav_events[size], 0, sizeof(struct SaveEvent)*(newsize-sav_max)); diff --git a/src/msg/async/PosixStack.cc b/src/msg/async/PosixStack.cc index 410bcc4b52b..5dd8bb0bcae 100644 --- a/src/msg/async/PosixStack.cc +++ b/src/msg/async/PosixStack.cc @@ -182,7 +182,7 @@ class PosixServerSocketImpl : public ServerSocketImpl { }; int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { - assert(sock); + ceph_assert(sock); sockaddr_storage ss; socklen_t slen = sizeof(ss); int sd = ::accept(_fd, (sockaddr*)&ss, &slen); @@ -203,7 +203,7 @@ int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &op return -errno; } - assert(NULL != out); //out should not be NULL in accept connection + ceph_assert(NULL != out); //out should not be NULL in accept connection out->set_type(addr_type); out->set_sockaddr((sockaddr*)&ss); diff --git a/src/msg/async/PosixStack.h b/src/msg/async/PosixStack.h index 0fb00a8537b..e70fa650b0e 100644 --- a/src/msg/async/PosixStack.h +++ b/src/msg/async/PosixStack.h @@ -52,7 +52,7 @@ class PosixNetworkStack : public NetworkStack { threads[i] = std::thread(func); } void join_worker(unsigned i) override { - assert(threads.size() > i && threads[i].joinable()); + ceph_assert(threads.size() > i && threads[i].joinable()); threads[i].join(); } }; diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 9eeb9bd9107..ef6ead8a419 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -103,7 +103,7 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c) { - assert(cct->_conf->ms_async_op_threads > 0); + ceph_assert(cct->_conf->ms_async_op_threads > 0); const uint64_t InitEventNumber = 5000; num_workers = cct->_conf->ms_async_op_threads; @@ -164,7 +164,7 @@ Worker* NetworkStack::get_worker() } pool_spin.unlock(); - assert(current_best); + ceph_assert(current_best); ++current_best->references; return current_best; } @@ -208,7 +208,7 @@ void NetworkStack::drain() pool_spin.lock(); C_drain drain(num_workers); for (unsigned i = 0; i < num_workers; ++i) { - assert(cur != workers[i]->center.get_owner()); + ceph_assert(cur != workers[i]->center.get_owner()); workers[i]->center.dispatch_event_external(EventCallbackRef(&drain)); } pool_spin.unlock(); diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h index 3835b8483c6..32f9a5b2ee0 100644 --- a/src/msg/async/Stack.h +++ b/src/msg/async/Stack.h @@ -260,7 +260,7 @@ class Worker { PerfCounters *get_perf_counter() { return perf_logger; } void release_worker() { int oldref = references.fetch_sub(1); - assert(oldref > 0); + ceph_assert(oldref > 0); } void init_done() { init_lock.lock(); diff --git a/src/msg/async/dpdk/DPDK.cc b/src/msg/async/dpdk/DPDK.cc index f556cda890f..5fc2cf938d1 100644 --- a/src/msg/async/dpdk/DPDK.cc +++ b/src/msg/async/dpdk/DPDK.cc @@ -151,7 +151,7 @@ static constexpr uint8_t packet_read_size = 32; int DPDKDevice::init_port_start() { - assert(_port_idx < rte_eth_dev_count()); + ceph_assert(_port_idx < rte_eth_dev_count()); rte_eth_dev_info_get(_port_idx, &_dev_info); @@ -263,7 +263,7 @@ int DPDKDevice::init_port_start() if (_num_queues > 1) { if (_dev_info.reta_size) { // RETA size should be a power of 2 - assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0); + ceph_assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0); // Set the RSS table to the correct size _redir_table.resize(_dev_info.reta_size); @@ -301,7 +301,7 @@ int DPDKDevice::init_port_start() // all together. If this assumption breaks we need to rework the below logic // by splitting the csum offload feature bit into separate bits for IPv4, // TCP. - assert(((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) && + ceph_assert(((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) && (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) || (!(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) && !(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM))); @@ -329,7 +329,7 @@ int DPDKDevice::init_port_start() // or not set all together. If this assumption breaks we need to rework the // below logic by splitting the csum offload feature bit into separate bits // for TCP. - assert((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) || + ceph_assert((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) || !(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)); if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) { @@ -443,7 +443,7 @@ int DPDKDevice::init_port_fini() } void DPDKQueuePair::configure_proxies(const std::map& cpu_weights) { - assert(!cpu_weights.empty()); + ceph_assert(!cpu_weights.empty()); if (cpu_weights.size() == 1 && cpu_weights.begin()->first == _qid) { // special case queue sending to self only, to avoid requiring a hash value return; @@ -518,10 +518,10 @@ bool DPDKQueuePair::init_rx_mbuf_pool() std::string mz_name = "rx_buffer_data" + std::to_string(_qid); const struct rte_memzone *mz = rte_memzone_reserve_aligned(mz_name.c_str(), mbuf_data_size*bufs_count, _pktmbuf_pool_rx->socket_id, mz_flags, mbuf_data_size); - assert(mz); + ceph_assert(mz); void* m = mz->addr; for (int i = 0; i < bufs_count; i++) { - assert(m); + ceph_assert(m); _alloc_bufs.push_back(m); m += mbuf_data_size; } @@ -781,14 +781,14 @@ bool DPDKQueuePair::rx_gc(bool force) (void **)_rx_free_bufs.data(), _rx_free_bufs.size()); - // TODO: assert() in a fast path! Remove me ASAP! - assert(_num_rx_free_segs >= _rx_free_bufs.size()); + // TODO: ceph_assert() in a fast path! Remove me ASAP! + ceph_assert(_num_rx_free_segs >= _rx_free_bufs.size()); _num_rx_free_segs -= _rx_free_bufs.size(); _rx_free_bufs.clear(); - // TODO: assert() in a fast path! Remove me ASAP! - assert((_rx_free_pkts.empty() && !_num_rx_free_segs) || + // TODO: ceph_assert() in a fast path! Remove me ASAP! + ceph_assert((_rx_free_pkts.empty() && !_num_rx_free_segs) || (!_rx_free_pkts.empty() && _num_rx_free_segs)); } } @@ -1020,7 +1020,7 @@ void DPDKQueuePair::tx_buf::set_cluster_offload_info(const Packet& p, const DPDK head->l3_len = oi.ip_hdr_len; if (oi.tso_seg_size) { - assert(oi.needs_ip_csum); + ceph_assert(oi.needs_ip_csum); head->ol_flags |= PKT_TX_TCP_SEG; head->l4_len = oi.tcp_hdr_len; head->tso_segsz = oi.tso_seg_size; @@ -1139,7 +1139,7 @@ void DPDKQueuePair::tx_buf::copy_packet_to_cluster(const Packet& p, rte_mbuf* he cur_seg_offset = 0; // FIXME: assert in a fast-path - remove!!! - assert(cur_seg); + ceph_assert(cur_seg); } } } diff --git a/src/msg/async/dpdk/DPDK.h b/src/msg/async/dpdk/DPDK.h index 04127f4aaae..1feb661ef32 100644 --- a/src/msg/async/dpdk/DPDK.h +++ b/src/msg/async/dpdk/DPDK.h @@ -187,8 +187,8 @@ class DPDKQueuePair { rte_mbuf* m; - // TODO: assert() in a fast path! Remove me ASAP! - assert(frag.size); + // TODO: ceph_assert() in a fast path! Remove me ASAP! + ceph_assert(frag.size); // Create a HEAD of mbufs' cluster and set the first bytes into it len = do_one_buf(qp, head, base, left_to_set); @@ -291,7 +291,7 @@ class DPDKQueuePair { if (!pa) return copy_one_data_buf(qp, m, va, buf_len); - assert(buf_len); + ceph_assert(buf_len); tx_buf* buf = qp.get_tx_buf(); if (!buf) { return 0; @@ -550,8 +550,8 @@ class DPDKQueuePair { uint32_t _send(circular_buffer& pb, Func &&packet_to_tx_buf_p) { if (_tx_burst.size() == 0) { for (auto&& p : pb) { - // TODO: assert() in a fast path! Remove me ASAP! - assert(p.len()); + // TODO: ceph_assert() in a fast path! Remove me ASAP! + ceph_assert(p.len()); tx_buf* buf = packet_to_tx_buf_p(std::move(p)); if (!buf) { @@ -856,11 +856,11 @@ class DPDKDevice { return _redir_table[hash & (_redir_table.size() - 1)]; } void set_local_queue(unsigned i, std::unique_ptr qp) { - assert(!_queues[i]); + ceph_assert(!_queues[i]); _queues[i] = std::move(qp); } void unset_local_queue(unsigned i) { - assert(_queues[i]); + ceph_assert(_queues[i]); _queues[i].reset(); } template @@ -869,7 +869,7 @@ class DPDKDevice { if (!qp._sw_reta) return src_cpuid; - assert(!qp._sw_reta); + ceph_assert(!qp._sw_reta); auto hash = hashfn() >> _rss_table_bits; auto& reta = *qp._sw_reta; return reta[hash % reta.size()]; diff --git a/src/msg/async/dpdk/DPDKStack.cc b/src/msg/async/dpdk/DPDKStack.cc index bb2f807f4bb..fabfb024129 100644 --- a/src/msg/async/dpdk/DPDKStack.cc +++ b/src/msg/async/dpdk/DPDKStack.cc @@ -94,7 +94,7 @@ void DPDKWorker::initialize() while (create_stage <= WAIT_DEVICE_STAGE) cond.Wait(lock); } - assert(sdev); + ceph_assert(sdev); if (i < sdev->hw_queues_count()) { auto qp = sdev->init_local_queue(cct, ¢er, cct->_conf->ms_dpdk_hugepages, i); std::map cpu_weights; @@ -200,8 +200,8 @@ DPDKWorker::Impl::Impl(CephContext *cct, unsigned i, EventCenter *c, std::shared int DPDKWorker::listen(entity_addr_t &sa, const SocketOptions &opt, ServerSocket *sock) { - assert(sa.get_family() == AF_INET); - assert(sock); + ceph_assert(sa.get_family() == AF_INET); + ceph_assert(sock); ldout(cct, 10) << __func__ << " addr " << sa << dendl; // vector tuples; @@ -231,7 +231,7 @@ int DPDKWorker::listen(entity_addr_t &sa, const SocketOptions &opt, int DPDKWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { - // assert(addr.get_family() == AF_INET); + // ceph_assert(addr.get_family() == AF_INET); int r = tcpv4_connect(_impl->_inet.get_tcp(), addr, socket); ldout(cct, 10) << __func__ << " addr " << addr << dendl; return r; @@ -250,7 +250,7 @@ void DPDKStack::spawn_worker(unsigned i, std::function &&func) } // if dpdk::eal::init already called by NVMEDevice, we will select 1..n // cores - assert(rte_lcore_count() >= i + 1); + ceph_assert(rte_lcore_count() >= i + 1); unsigned core_id; RTE_LCORE_FOREACH_SLAVE(core_id) { if (i-- == 0) { diff --git a/src/msg/async/dpdk/DPDKStack.h b/src/msg/async/dpdk/DPDKStack.h index 843975d0566..3ccf2a22e59 100644 --- a/src/msg/async/dpdk/DPDKStack.h +++ b/src/msg/async/dpdk/DPDKStack.h @@ -123,7 +123,7 @@ class NativeConnectedSocketImpl : public ConnectedSocketImpl { } else { _cur_off += f.size; } - assert(data.length()); + ceph_assert(data.length()); return data.length(); } virtual ssize_t send(bufferlist &bl, bool more) override { diff --git a/src/msg/async/dpdk/Packet.h b/src/msg/async/dpdk/Packet.h index b22492db865..db9cd2a7649 100644 --- a/src/msg/async/dpdk/Packet.h +++ b/src/msg/async/dpdk/Packet.h @@ -139,7 +139,7 @@ class Packet { return copy(old.get(), std::max(old->_nr_frags + extra_frags, 2 * old->_nr_frags)); } void* operator new(size_t size, size_t nr_frags = default_nr_frags) { - assert(nr_frags == uint16_t(nr_frags)); + ceph_assert(nr_frags == uint16_t(nr_frags)); return ::operator new(size + nr_frags * sizeof(fragment)); } // Matching the operator new above @@ -295,7 +295,7 @@ inline Packet::impl::impl(size_t nr_frags) inline Packet::impl::impl(fragment frag, size_t nr_frags) : _len(frag.size), _allocated_frags(nr_frags) { - assert(_allocated_frags > _nr_frags); + ceph_assert(_allocated_frags > _nr_frags); if (frag.size <= internal_data_size) { headroom -= frag.size; frags[0] = { data + headroom, frag.size }; @@ -458,7 +458,7 @@ inline Header* Packet::get_header(size_t offset) { } inline void Packet::trim_front(size_t how_much) { - assert(how_much <= _impl->_len); + ceph_assert(how_much <= _impl->_len); _impl->_len -= how_much; size_t i = 0; while (how_much && how_much >= _impl->frags[i].size) { @@ -479,7 +479,7 @@ inline void Packet::trim_front(size_t how_much) { } inline void Packet::trim_back(size_t how_much) { - assert(how_much <= _impl->_len); + ceph_assert(how_much <= _impl->_len); _impl->_len -= how_much; size_t i = _impl->_nr_frags - 1; while (how_much && how_much >= _impl->frags[i].size) { @@ -542,7 +542,7 @@ inline Packet Packet::share(size_t offset, size_t len) { offset = 0; } n._impl->_offload_info = _impl->_offload_info; - assert(!n._impl->_deleter); + ceph_assert(!n._impl->_deleter); n._impl->_deleter = _impl->_deleter.share(); return n; } diff --git a/src/msg/async/dpdk/TCP.cc b/src/msg/async/dpdk/TCP.cc index d215a4c105b..dec159b6cc1 100644 --- a/src/msg/async/dpdk/TCP.cc +++ b/src/msg/async/dpdk/TCP.cc @@ -112,7 +112,7 @@ uint8_t tcp_option::fill(tcp_hdr* th, uint8_t options_size) new (off) tcp_option::eol; size += option_len::eol; } - assert(size == options_size); + ceph_assert(size == options_size); return size; } diff --git a/src/msg/async/dpdk/TCP.h b/src/msg/async/dpdk/TCP.h index c1d64bc5257..9445dbce536 100644 --- a/src/msg/async/dpdk/TCP.h +++ b/src/msg/async/dpdk/TCP.h @@ -1235,7 +1235,7 @@ Tub tcp::tcb::read() { template int tcp::tcb::send(Packet p) { // We can not send after the connection is closed - assert(!_snd.closed); + ceph_assert(!_snd.closed); if (in_state(CLOSED)) return -ECONNRESET; @@ -1460,7 +1460,7 @@ Tub tcp::tcb::get_packet() { return p; } - assert(!_packetq.empty()); + ceph_assert(!_packetq.empty()); p = std::move(_packetq.front()); _packetq.pop_front(); diff --git a/src/msg/async/dpdk/UserspaceEvent.cc b/src/msg/async/dpdk/UserspaceEvent.cc index 25b082d613b..fce6588757c 100644 --- a/src/msg/async/dpdk/UserspaceEvent.cc +++ b/src/msg/async/dpdk/UserspaceEvent.cc @@ -34,7 +34,7 @@ int UserspaceEventManager::get_eventfd() } Tub &impl = fds[fd]; - assert(!impl); + ceph_assert(!impl); impl.construct(); ldout(cct, 20) << __func__ << " fd=" << fd << dendl; return fd; @@ -88,7 +88,7 @@ void UserspaceEventManager::close(int fd) if (impl->activating_mask) { if (waiting_fds[max_wait_idx] == fd) { - assert(impl->waiting_idx == max_wait_idx); + ceph_assert(impl->waiting_idx == max_wait_idx); --max_wait_idx; } waiting_fds[impl->waiting_idx] = -1; @@ -101,7 +101,7 @@ int UserspaceEventManager::poll(int *events, int *masks, int num_events, struct int fd; uint32_t i = 0; int count = 0; - assert(num_events); + ceph_assert(num_events); // leave zero slot for waiting_fds while (i < max_wait_idx) { fd = waiting_fds[++i]; @@ -110,9 +110,9 @@ int UserspaceEventManager::poll(int *events, int *masks, int num_events, struct events[count] = fd; Tub &impl = fds[fd]; - assert(impl); + ceph_assert(impl); masks[count] = impl->listening_mask & impl->activating_mask; - assert(masks[count]); + ceph_assert(masks[count]); ldout(cct, 20) << __func__ << " fd=" << fd << " mask=" << masks[count] << dendl; impl->activating_mask &= (~masks[count]); impl->waiting_idx = 0; diff --git a/src/msg/async/dpdk/UserspaceEvent.h b/src/msg/async/dpdk/UserspaceEvent.h index 75f3abf78ab..01ef0dc6643 100644 --- a/src/msg/async/dpdk/UserspaceEvent.h +++ b/src/msg/async/dpdk/UserspaceEvent.h @@ -81,7 +81,7 @@ class UserspaceEventManager { impl->listening_mask &= (~mask); if (!(impl->activating_mask & impl->listening_mask) && impl->waiting_idx) { if (waiting_fds[max_wait_idx] == fd) { - assert(impl->waiting_idx == max_wait_idx); + ceph_assert(impl->waiting_idx == max_wait_idx); --max_wait_idx; } waiting_fds[impl->waiting_idx] = -1; diff --git a/src/msg/async/dpdk/ethernet.h b/src/msg/async/dpdk/ethernet.h index 5f3dec9ef10..858df89a473 100644 --- a/src/msg/async/dpdk/ethernet.h +++ b/src/msg/async/dpdk/ethernet.h @@ -37,7 +37,7 @@ struct ethernet_address { } ethernet_address(std::initializer_list eaddr) { - assert(eaddr.size() == mac.size()); + ceph_assert(eaddr.size() == mac.size()); std::copy(eaddr.begin(), eaddr.end(), mac.begin()); } diff --git a/src/msg/async/dpdk/net.cc b/src/msg/async/dpdk/net.cc index 8b5c970e068..29e341195d2 100644 --- a/src/msg/async/dpdk/net.cc +++ b/src/msg/async/dpdk/net.cc @@ -81,7 +81,7 @@ subscription interface::register_l3( std::function forward) { auto i = _proto_map.emplace(std::piecewise_construct, std::make_tuple(uint16_t(proto_num)), std::forward_as_tuple(std::move(forward))); - assert(i.second); + ceph_assert(i.second); l3_rx_stream& l3_rx = i.first->second; return l3_rx.packet_stream.listen(std::move(next)); } diff --git a/src/msg/async/dpdk/net.h b/src/msg/async/dpdk/net.h index 2104e5d31e4..63f0422b72c 100644 --- a/src/msg/async/dpdk/net.h +++ b/src/msg/async/dpdk/net.h @@ -55,7 +55,7 @@ class forward_hash { return end_idx; } void push_back(uint8_t b) { - assert(end_idx < sizeof(data)); + ceph_assert(end_idx < sizeof(data)); data[end_idx++] = b; } void push_back(uint16_t b) { diff --git a/src/msg/async/dpdk/stream.h b/src/msg/async/dpdk/stream.h index a120bd17fc8..1898e8f8628 100644 --- a/src/msg/async/dpdk/stream.h +++ b/src/msg/async/dpdk/stream.h @@ -117,7 +117,7 @@ class subscription { next_fn _next; private: explicit subscription(stream* s): _stream(s) { - assert(!_stream->_sub); + ceph_assert(!_stream->_sub); _stream->_sub = this; } diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index 3458bd2b538..e64f50a4862 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -148,7 +148,7 @@ void Device::binding_port(CephContext *cct, int port_num) { } if (nullptr == active_port) { lderr(cct) << __func__ << " port not found" << dendl; - assert(active_port); + ceph_assert(active_port); } } @@ -213,7 +213,7 @@ int Infiniband::QueuePair::init() return -1; } } else { - assert(cm_id->verbs == pd->context); + ceph_assert(cm_id->verbs == pd->context); if (rdma_create_qp(cm_id, pd, &qpia)) { lderr(cct) << __func__ << " failed to create queue pair with rdmacm library" << cpp_strerror(errno) << dendl; @@ -382,7 +382,7 @@ Infiniband::CompletionChannel::~CompletionChannel() int r = ibv_destroy_comp_channel(channel); if (r < 0) lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl; - assert(r == 0); + ceph_assert(r == 0); } } @@ -439,7 +439,7 @@ Infiniband::CompletionQueue::~CompletionQueue() int r = ibv_destroy_cq(cq); if (r < 0) lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl; - assert(r == 0); + ceph_assert(r == 0); } } @@ -587,7 +587,7 @@ Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s) Infiniband::MemoryManager::Cluster::~Cluster() { int r = ibv_dereg_mr(chunk_base->mr); - assert(r == 0); + ceph_assert(r == 0); const auto chunk_end = chunk_base + num_chunk; for (auto chunk = chunk_base; chunk != chunk_end; chunk++) { chunk->~Chunk(); @@ -599,18 +599,18 @@ Infiniband::MemoryManager::Cluster::~Cluster() int Infiniband::MemoryManager::Cluster::fill(uint32_t num) { - assert(!base); + ceph_assert(!base); num_chunk = num; uint32_t bytes = buffer_size * num; base = (char*)manager.malloc(bytes); end = base + bytes; - assert(base); + ceph_assert(base); chunk_base = static_cast(::malloc(sizeof(Chunk) * num)); memset(static_cast(chunk_base), 0, sizeof(Chunk) * num); free_chunks.reserve(num); ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); - assert(m); + ceph_assert(m); Chunk* chunk = chunk_base; for (uint32_t offset = 0; offset < bytes; offset += buffer_size){ new(chunk) Chunk(m, buffer_size, base+offset); @@ -717,7 +717,7 @@ char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes) MemoryManager *manager; CephContext *cct; - assert(g_ctx); + ceph_assert(g_ctx); manager = g_ctx->manager; cct = manager->cct; rx_buf_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size; @@ -812,7 +812,7 @@ void Infiniband::MemoryManager::huge_pages_free(void *ptr) if (ptr == NULL) return; void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE; size_t real_size = *((size_t *)real_ptr); - assert(real_size % HUGE_PAGE_SIZE == 0); + ceph_assert(real_size % HUGE_PAGE_SIZE == 0); if (real_size != 0) munmap(real_ptr, real_size); else @@ -838,8 +838,8 @@ void Infiniband::MemoryManager::free(void *ptr) void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num) { - assert(device); - assert(pd); + ceph_assert(device); + ceph_assert(pd); send = new Cluster(*this, size); send->fill(tx_num); @@ -907,11 +907,11 @@ void Infiniband::init() initialized = true; device = device_list->get_device(device_name.c_str()); - assert(device); + ceph_assert(device); device->binding_port(cct, port_num); ib_physical_port = device->active_port->get_port_num(); pd = new ProtectionDomain(cct, device); - assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0); + ceph_assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0); support_srq = cct->_conf->ms_async_rdma_support_srq; if (support_srq) @@ -1050,11 +1050,11 @@ int Infiniband::post_chunks_to_rq(int num, ibv_qp *qp) ibv_recv_wr *badworkrequest; if (support_srq) { ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest); - assert(ret == 0); + ceph_assert(ret == 0); } else { - assert(qp); + ceph_assert(qp); ret = ibv_post_recv(qp, &rx_work_request[0], &badworkrequest); - assert(ret == 0); + ceph_assert(ret == 0); } return i; } @@ -1172,7 +1172,7 @@ Infiniband::QueuePair::~QueuePair() { if (qp) { ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl; - assert(!ibv_destroy_qp(qp)); + ceph_assert(!ibv_destroy_qp(qp)); } } diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index 8b4684bcf4e..319130ff861 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -81,7 +81,7 @@ class Device { ~Device() { if (active_port) { delete active_port; - assert(ibv_close_device(ctxt) == 0); + ceph_assert(ibv_close_device(ctxt) == 0); } } const char* get_name() { return name;} @@ -122,7 +122,7 @@ class DeviceList { } Device* get_device(const char* device_name) { - assert(devices); + ceph_assert(devices); for (int i = 0; i < num; ++i) { if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) { return devices[i]; diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index 10ff2416c89..ebf328706c4 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -222,7 +222,7 @@ void RDMAConnectedSocketImpl::handle_connection() { << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl; if (!connected) { r = activate(); - assert(!r); + ceph_assert(!r); } notify(); r = infiniband->send_msg(cct, tcp_fd, my_msg); @@ -238,7 +238,7 @@ void RDMAConnectedSocketImpl::handle_connection() { return ; } r = activate(); - assert(!r); + ceph_assert(!r); r = infiniband->send_msg(cct, tcp_fd, my_msg); if (r < 0) { ldout(cct, 1) << __func__ << " server ack failed." << dendl; @@ -294,7 +294,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl; for (size_t i = 0; i < cqe.size(); ++i) { ibv_wc* response = &cqe[i]; - assert(response->status == IBV_WC_SUCCESS); + ceph_assert(response->status == IBV_WC_SUCCESS); Chunk* chunk = reinterpret_cast(response->wr_id); ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl; chunk->prepare_read(response->byte_len); @@ -450,7 +450,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) auto fill_tx_via_copy = [this](std::vector &tx_buffers, unsigned bytes, std::list::const_iterator &start, std::list::const_iterator &end) -> unsigned { - assert(start != end); + ceph_assert(start != end); auto chunk_idx = tx_buffers.size(); int ret = worker->get_reged_mem(this, tx_buffers, bytes); if (ret == 0) { @@ -477,7 +477,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) } ++start; } - assert(bytes == 0); + ceph_assert(bytes == 0); return total_copied; }; @@ -495,7 +495,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) goto sending; need_reserve_bytes = 0; } - assert(copy_it == it); + ceph_assert(copy_it == it); tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str())); total += it->length(); ++copy_it; @@ -510,7 +510,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more) sending: if (total == 0) return -EAGAIN; - assert(total <= pending_bl.length()); + ceph_assert(total <= pending_bl.length()); bufferlist swapped; if (total < pending_bl.length()) { worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem); @@ -622,7 +622,7 @@ void RDMAConnectedSocketImpl::notify() // write argument must be a 64bit integer uint64_t i = 1; - assert(sizeof(i) == write(notify_fd, &i, sizeof(i))); + ceph_assert(sizeof(i) == write(notify_fd, &i, sizeof(i))); } void RDMAConnectedSocketImpl::shutdown() diff --git a/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc index d687ca8c769..d5762d6eb5c 100644 --- a/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc @@ -142,7 +142,7 @@ void RDMAIWARPConnectedSocketImpl::handle_cm_connection() { break; default: - assert(0 == "unhandled event"); + ceph_assert(0 == "unhandled event"); break; } rdma_ack_cm_event(event); diff --git a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc index cf7c5144ea7..f0f82f53f93 100644 --- a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc @@ -49,13 +49,13 @@ int RDMAIWARPServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions { ldout(cct, 15) << __func__ << dendl; - assert(sock); + ceph_assert(sock); struct pollfd pfd = { .fd = cm_channel->fd, .events = POLLIN, }; int ret = poll(&pfd, 1, 0); - assert(ret >= 0); + ceph_assert(ret >= 0); if (!ret) return -EAGAIN; diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc index d87fdee5823..b3bf46b92f3 100644 --- a/src/msg/async/rdma/RDMAServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -80,7 +80,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt { ldout(cct, 15) << __func__ << dendl; - assert(sock); + ceph_assert(sock); sockaddr_storage ss; socklen_t slen = sizeof(ss); @@ -102,7 +102,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt return -errno; } - assert(NULL != out); //out should not be NULL in accept connection + ceph_assert(NULL != out); //out should not be NULL in accept connection out->set_type(addr_type); out->set_sockaddr((sockaddr*)&ss); diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index 7b5400a13d6..9c05f23be9d 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -34,10 +34,10 @@ RDMADispatcher::~RDMADispatcher() ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl; polling_stop(); - assert(qp_conns.empty()); - assert(num_qp_conn == 0); - assert(dead_queue_pairs.empty()); - assert(num_dead_queue_pair == 0); + ceph_assert(qp_conns.empty()); + ceph_assert(num_qp_conn == 0); + ceph_assert(dead_queue_pairs.empty()); + ceph_assert(num_dead_queue_pair == 0); delete async_handler; } @@ -87,13 +87,13 @@ void RDMADispatcher::polling_start() get_stack()->get_infiniband().get_memory_manager()->set_rx_stat_logger(perf_logger); tx_cc = get_stack()->get_infiniband().create_comp_channel(cct); - assert(tx_cc); + ceph_assert(tx_cc); rx_cc = get_stack()->get_infiniband().create_comp_channel(cct); - assert(rx_cc); + ceph_assert(rx_cc); tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc); - assert(tx_cq); + ceph_assert(tx_cq); rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc); - assert(rx_cq); + ceph_assert(rx_cq); t = std::thread(&RDMADispatcher::polling, this); ceph_pthread_setname(t.native_handle(), "rdma-polling"); @@ -205,7 +205,7 @@ void RDMADispatcher::polling() Chunk* chunk = reinterpret_cast(response->wr_id); if (response->status == IBV_WC_SUCCESS) { - assert(wc[i].opcode == IBV_WC_RECV); + ceph_assert(wc[i].opcode == IBV_WC_RECV); conn = get_conn_lockless(response->qp_num); if (!conn) { ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl; @@ -322,7 +322,7 @@ void RDMADispatcher::notify_pending_workers() { void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) { Mutex::Locker l(lock); - assert(!qp_conns.count(qp->get_local_qp_number())); + ceph_assert(!qp_conns.count(qp->get_local_qp_number())); qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi); ++num_qp_conn; } @@ -528,9 +528,9 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes) { - assert(center.in_thread()); + ceph_assert(center.in_thread()); int r = get_stack()->get_infiniband().get_tx_buffers(c, bytes); - assert(r >= 0); + ceph_assert(r >= 0); size_t got = get_stack()->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r; ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl; stack->get_dispatcher().inflight += r; @@ -599,6 +599,6 @@ void RDMAStack::spawn_worker(unsigned i, std::function &&func) void RDMAStack::join_worker(unsigned i) { - assert(threads.size() > i && threads[i].joinable()); + ceph_assert(threads.size() > i && threads[i].joinable()); threads[i].join(); } diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index e38284d4552..f363d2fefed 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -153,7 +153,7 @@ class RDMAWorker : public Worker { RDMAStack *get_stack() { return stack; } int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes); void remove_pending_conn(RDMAConnectedSocketImpl *o) { - assert(center.in_thread()); + ceph_assert(center.in_thread()); pending_sent_conns.remove(o); } void handle_pending_message(); diff --git a/src/msg/simple/Accepter.cc b/src/msg/simple/Accepter.cc index eadcffbb6cf..72cbc421dc4 100644 --- a/src/msg/simple/Accepter.cc +++ b/src/msg/simple/Accepter.cc @@ -237,7 +237,7 @@ int Accepter::bind(const entity_addr_t &bind_addr, const set& avoid_ports) !bind_addr.is_blank_ip()) msgr->learned_addr(bind_addr); else - assert(msgr->get_need_addr()); // should still be true. + ceph_assert(msgr->get_need_addr()); // should still be true. if (msgr->get_myaddr().get_port() == 0) { msgr->set_myaddrs(entity_addrvec_t(listen_addr)); diff --git a/src/msg/simple/Pipe.cc b/src/msg/simple/Pipe.cc index be128bdaadf..d8f595eade5 100644 --- a/src/msg/simple/Pipe.cc +++ b/src/msg/simple/Pipe.cc @@ -172,8 +172,8 @@ Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con) Pipe::~Pipe() { - assert(out_q.empty()); - assert(sent.empty()); + ceph_assert(out_q.empty()); + ceph_assert(sent.empty()); delete delay_thread; delete[] recv_buf; } @@ -194,8 +194,8 @@ void Pipe::handle_ack(uint64_t seq) void Pipe::start_reader() { - assert(pipe_lock.is_locked()); - assert(!reader_running); + ceph_assert(pipe_lock.is_locked()); + ceph_assert(!reader_running); if (reader_needs_join) { reader_thread.join(); reader_needs_join = false; @@ -218,8 +218,8 @@ void Pipe::maybe_start_delay_thread() void Pipe::start_writer() { - assert(pipe_lock.is_locked()); - assert(!writer_running); + ceph_assert(pipe_lock.is_locked()); + ceph_assert(!writer_running); writer_running = true; writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes); } @@ -316,8 +316,8 @@ void Pipe::DelayedDelivery::stop_fast_dispatching() { int Pipe::accept() { ldout(msgr->cct,10) << "accept" << dendl; - assert(pipe_lock.is_locked()); - assert(state == STATE_ACCEPTING); + ceph_assert(pipe_lock.is_locked()); + ceph_assert(state == STATE_ACCEPTING); pipe_lock.Unlock(); @@ -534,7 +534,7 @@ int Pipe::accept() ldout(msgr->cct,0) << "accept: challenging authorizer " << authorizer_reply.length() << " bytes" << dendl; - assert(authorizer_reply.length()); + ceph_assert(authorizer_reply.length()); reply.tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER; } else { ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl; @@ -651,21 +651,21 @@ int Pipe::accept() << " " << existing << ".cseq=" << existing->connect_seq << " == " << connect.connect_seq << dendl; - assert(existing->state == STATE_CONNECTING || + ceph_assert(existing->state == STATE_CONNECTING || existing->state == STATE_WAIT); goto replace; } else { // our existing outgoing wins ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", sending WAIT" << dendl; - assert(peer_addr > msgr->my_addr); + ceph_assert(peer_addr > msgr->my_addr); if (!(existing->state == STATE_CONNECTING)) lderr(msgr->cct) << "accept race bad state, would send wait, existing=" << existing->get_state_name() << " " << existing << ".cseq=" << existing->connect_seq << " == " << connect.connect_seq << dendl; - assert(existing->state == STATE_CONNECTING); + ceph_assert(existing->state == STATE_CONNECTING); // make sure our outgoing connection will follow through existing->_send_keepalive(); reply.tag = CEPH_MSGR_TAG_WAIT; @@ -675,8 +675,8 @@ int Pipe::accept() } } - assert(connect.connect_seq > existing->connect_seq); - assert(connect.global_seq >= existing->peer_global_seq); + ceph_assert(connect.connect_seq > existing->connect_seq); + ceph_assert(connect.global_seq >= existing->peer_global_seq); if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other existing->connect_seq == 0) { ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq @@ -708,8 +708,8 @@ int Pipe::accept() ceph_abort(); retry_session: - assert(existing->pipe_lock.is_locked()); - assert(pipe_lock.is_locked()); + ceph_assert(existing->pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; reply.connect_seq = existing->connect_seq + 1; existing->pipe_lock.Unlock(); @@ -717,7 +717,7 @@ int Pipe::accept() goto reply; reply: - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; reply.authorizer_len = authorizer_reply.length(); pipe_lock.Unlock(); @@ -732,8 +732,8 @@ int Pipe::accept() } replace: - assert(existing->pipe_lock.is_locked()); - assert(pipe_lock.is_locked()); + ceph_assert(existing->pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) { reply_tag = CEPH_MSGR_TAG_SEQ; @@ -746,7 +746,7 @@ int Pipe::accept() if (existing->policy.lossy) { // disconnect from the Connection - assert(existing->connection_state); + ceph_assert(existing->connection_state); if (existing->connection_state->clear_pipe(existing)) msgr->dispatch_queue.queue_reset(existing->connection_state.get()); } else { @@ -792,10 +792,10 @@ int Pipe::accept() open: // open - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); connect_seq = connect.connect_seq + 1; peer_global_seq = connect.global_seq; - assert(state == STATE_ACCEPTING); + ceph_assert(state == STATE_ACCEPTING); state = STATE_OPEN; ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; @@ -826,7 +826,7 @@ int Pipe::accept() if (msgr->dispatch_queue.stop) goto shutting_down; removed = msgr->accepting_pipes.erase(this); - assert(removed == 1); + ceph_assert(removed == 1); register_pipe(); msgr->lock.Unlock(); pipe_lock.Unlock(); @@ -898,7 +898,7 @@ int Pipe::accept() shutting_down: msgr->lock.Unlock(); shutting_down_msgr_unlocked: - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); if (msgr->cct->_conf->ms_inject_internal_delays) { ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; @@ -995,7 +995,7 @@ int Pipe::connect() bool got_bad_auth = false; ldout(msgr->cct,10) << "connect " << connect_seq << dendl; - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); __u32 cseq = connect_seq; __u32 gseq = msgr->get_global_seq(); @@ -1281,7 +1281,7 @@ int Pipe::connect() continue; } if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { - assert(reply.connect_seq > connect_seq); + ceph_assert(reply.connect_seq > connect_seq); ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq << " -> " << reply.connect_seq << dendl; cseq = connect_seq = reply.connect_seq; @@ -1315,10 +1315,10 @@ int Pipe::connect() << " vs out_seq " << out_seq << dendl; while (newly_acked_seq > out_seq) { Message *m = _get_next_outgoing(); - assert(m); + ceph_assert(m); ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq() << " " << *m << dendl; - assert(m->get_seq() <= newly_acked_seq); + ceph_assert(m->get_seq() <= newly_acked_seq); m->put(); ++out_seq; } @@ -1333,7 +1333,7 @@ int Pipe::connect() policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY; state = STATE_OPEN; connect_seq = cseq + 1; - assert(connect_seq == reply.connect_seq); + ceph_assert(connect_seq == reply.connect_seq); backoff = utime_t(); connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features); ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy @@ -1395,15 +1395,15 @@ int Pipe::connect() void Pipe::register_pipe() { ldout(msgr->cct,10) << "register_pipe" << dendl; - assert(msgr->lock.is_locked()); + ceph_assert(msgr->lock.is_locked()); Pipe *existing = msgr->_lookup_pipe(peer_addr); - assert(existing == NULL); + ceph_assert(existing == NULL); msgr->rank_pipe[peer_addr] = this; } void Pipe::unregister_pipe() { - assert(msgr->lock.is_locked()); + ceph_assert(msgr->lock.is_locked()); ceph::unordered_map::iterator p = msgr->rank_pipe.find(peer_addr); if (p != msgr->rank_pipe.end() && p->second == this) { ldout(msgr->cct,10) << "unregister_pipe" << dendl; @@ -1490,7 +1490,7 @@ void Pipe::discard_out_queue() void Pipe::fault(bool onread) { const auto& conf = msgr->cct->_conf; - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); cond.Signal(); if (onread && state == STATE_CONNECTING) { @@ -1516,7 +1516,7 @@ void Pipe::fault(bool onread) // disconnect from Connection, and mark it failed. future messages // will be dropped. - assert(connection_state); + ceph_assert(connection_state); stop(); bool cleared = connection_state->clear_pipe(this); @@ -1595,7 +1595,7 @@ void Pipe::randomize_out_seq() void Pipe::was_session_reset() { - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); ldout(msgr->cct,10) << "was_session_reset" << dendl; in_q->discard_queue(conn_id); @@ -1614,7 +1614,7 @@ void Pipe::was_session_reset() void Pipe::stop() { ldout(msgr->cct,10) << "stop" << dendl; - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); state = STATE_CLOSED; state_closed = true; cond.Signal(); @@ -1623,7 +1623,7 @@ void Pipe::stop() void Pipe::stop_and_wait() { - assert(pipe_lock.is_locked_by_me()); + ceph_assert(pipe_lock.is_locked_by_me()); if (state != STATE_CLOSED) stop(); @@ -1655,13 +1655,13 @@ void Pipe::reader() if (state == STATE_ACCEPTING) { accept(); - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); } // loop. while (state != STATE_CLOSED && state != STATE_CONNECTING) { - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); // sleep if (re)connecting if (state == STATE_STANDBY) { @@ -1773,14 +1773,14 @@ void Pipe::reader() m->put(); if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) && msgr->cct->_conf->ms_die_on_old_message) - assert(0 == "old msgs despite reconnect_seq feature"); + ceph_assert(0 == "old msgs despite reconnect_seq feature"); continue; } if (m->get_seq() > in_seq + 1) { ldout(msgr->cct,0) << "reader missed message? skipped from seq " << in_seq << " to " << m->get_seq() << dendl; if (msgr->cct->_conf->ms_die_on_skipped_message) - assert(0 == "skipped incoming seq"); + ceph_assert(0 == "skipped incoming seq"); } m->set_connection(connection_state.get()); @@ -1864,7 +1864,7 @@ void Pipe::writer() // connect? if (state == STATE_CONNECTING) { - assert(!policy.server); + ceph_assert(!policy.server); connect(); continue; } @@ -2417,7 +2417,7 @@ int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& fo ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length() << " b_off " << b_off << dendl; } - assert(donow > 0); + ceph_assert(donow > 0); ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off << " leftinchunk " << left << " buffer len " << pb->length() @@ -2439,7 +2439,7 @@ int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& fo msglen += donow; msg.msg_iovlen++; - assert(left >= donow); + ceph_assert(left >= donow); left -= donow; b_off += donow; bl_pos += donow; @@ -2450,7 +2450,7 @@ int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& fo b_off = 0; } } - assert(left == 0); + ceph_assert(left == 0); // send footer; if receiver doesn't support signatures, use the old footer format @@ -2661,7 +2661,7 @@ int Pipe::tcp_write(const char *buf, unsigned len) return -1; //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl; - assert(len > 0); + ceph_assert(len > 0); while (len > 0) { MSGR_SIGPIPE_STOPPER; int did = ::send( sd, buf, len, MSG_NOSIGNAL ); diff --git a/src/msg/simple/Pipe.h b/src/msg/simple/Pipe.h index ce9925bd0d9..81245198460 100644 --- a/src/msg/simple/Pipe.h +++ b/src/msg/simple/Pipe.h @@ -228,17 +228,17 @@ static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX); void stop_and_wait(); void _send(Message *m) { - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); out_q[m->get_priority()].push_back(m); cond.Signal(); } void _send_keepalive() { - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); send_keepalive = true; cond.Signal(); } Message *_get_next_outgoing() { - assert(pipe_lock.is_locked()); + ceph_assert(pipe_lock.is_locked()); Message *m = 0; while (!m && !out_q.empty()) { map >::reverse_iterator p = out_q.rbegin(); diff --git a/src/msg/simple/PipeConnection.cc b/src/msg/simple/PipeConnection.cc index 96e27a4fd30..faa1ea9e043 100644 --- a/src/msg/simple/PipeConnection.cc +++ b/src/msg/simple/PipeConnection.cc @@ -74,7 +74,7 @@ bool PipeConnection::is_connected() int PipeConnection::send_message(Message *m) { - assert(msgr); + ceph_assert(msgr); return static_cast(msgr)->send_message(m, this); } diff --git a/src/msg/simple/SimpleMessenger.cc b/src/msg/simple/SimpleMessenger.cc index 5bd397c4d30..26e05a8f60a 100644 --- a/src/msg/simple/SimpleMessenger.cc +++ b/src/msg/simple/SimpleMessenger.cc @@ -63,9 +63,9 @@ SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name, */ SimpleMessenger::~SimpleMessenger() { - assert(!did_bind); // either we didn't bind or we shut down the Accepter - assert(rank_pipe.empty()); // we don't have any running Pipes. - assert(!reaper_started); // the reaper thread is stopped + ceph_assert(!did_bind); // either we didn't bind or we shut down the Accepter + ceph_assert(rank_pipe.empty()); // we don't have any running Pipes. + ceph_assert(!reaper_started); // the reaper thread is stopped } void SimpleMessenger::ready() @@ -151,7 +151,7 @@ bool SimpleMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) { bool ret = false; auto addr = addrs.legacy_addr(); - assert(my_addr == my_addrs->front()); + ceph_assert(my_addr == my_addrs->front()); if (my_addr.is_blank_ip()) { ldout(cct,1) << __func__ << " " << addr << dendl; entity_addr_t t = my_addr; @@ -164,7 +164,7 @@ bool SimpleMessenger::set_addr_unknowns(const entity_addrvec_t &addrs) } else { ldout(cct,1) << __func__ << " " << addr << " no-op" << dendl; } - assert(my_addr == my_addrs->front()); + ceph_assert(my_addr == my_addrs->front()); return ret; } @@ -243,7 +243,7 @@ void SimpleMessenger::reaper_entry() void SimpleMessenger::reaper() { ldout(cct,10) << "reaper" << dendl; - assert(lock.is_locked()); + ceph_assert(lock.is_locked()); while (!pipe_reap_queue.empty()) { Pipe *p = pipe_reap_queue.front(); @@ -257,11 +257,11 @@ void SimpleMessenger::reaper() // or accept() may have switch the Connection to a different // Pipe... but make sure! bool cleared = p->connection_state->clear_pipe(p); - assert(!cleared); + ceph_assert(!cleared); } p->pipe_lock.Unlock(); p->unregister_pipe(); - assert(pipes.count(p)); + ceph_assert(pipes.count(p)); pipes.erase(p); // drop msgr lock while joining thread; the delay through could be @@ -295,7 +295,7 @@ bool SimpleMessenger::is_connected(Connection *con) if (con) { Pipe *p = static_cast(static_cast(con)->get_pipe()); if (p) { - assert(p->msgr == this); + ceph_assert(p->msgr == this); r = p->is_connected(); p->put(); } @@ -325,7 +325,7 @@ int SimpleMessenger::bind(const entity_addr_t &bind_addr) int SimpleMessenger::rebind(const set& avoid_ports) { ldout(cct,1) << "rebind avoid " << avoid_ports << dendl; - assert(did_bind); + ceph_assert(did_bind); accepter.stop(); mark_down_all(); return accepter.rebind(avoid_ports); @@ -338,7 +338,7 @@ int SimpleMessenger::client_bind(const entity_addr_t &bind_addr) return 0; Mutex::Locker l(lock); if (did_bind) { - assert(*my_addrs == entity_addrvec_t(bind_addr)); + ceph_assert(*my_addrs == entity_addrvec_t(bind_addr)); return 0; } if (started) { @@ -358,9 +358,9 @@ int SimpleMessenger::start() ldout(cct,1) << "messenger.start" << dendl; // register at least one entity, first! - assert(my_name.type() >= 0); + ceph_assert(my_name.type() >= 0); - assert(!started); + ceph_assert(!started); started = true; stopped = false; @@ -398,8 +398,8 @@ Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, PipeConnection *con, Message *first) { - assert(lock.is_locked()); - assert(addr != my_addr); + ceph_assert(lock.is_locked()); + ceph_assert(addr != my_addr); ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; @@ -560,7 +560,7 @@ int SimpleMessenger::send_keepalive(Connection *con) static_cast(con)->get_pipe()); if (pipe) { ldout(cct,20) << "send_keepalive con " << con << ", have pipe." << dendl; - assert(pipe->msgr == this); + ceph_assert(pipe->msgr == this); pipe->pipe_lock.Lock(); pipe->_send_keepalive(); pipe->pipe_lock.Unlock(); @@ -709,7 +709,7 @@ void SimpleMessenger::mark_down(Connection *con) Pipe *p = static_cast(static_cast(con)->get_pipe()); if (p) { ldout(cct,1) << "mark_down " << con << " -- " << p << dendl; - assert(p->msgr == this); + ceph_assert(p->msgr == this); p->unregister_pipe(); p->pipe_lock.Lock(); p->stop(); @@ -732,7 +732,7 @@ void SimpleMessenger::mark_disposable(Connection *con) Pipe *p = static_cast(static_cast(con)->get_pipe()); if (p) { ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl; - assert(p->msgr == this); + ceph_assert(p->msgr == this); p->pipe_lock.Lock(); p->policy.lossy = true; p->pipe_lock.Unlock(); diff --git a/src/msg/simple/SimpleMessenger.h b/src/msg/simple/SimpleMessenger.h index 3382e713121..0292f6c7684 100644 --- a/src/msg/simple/SimpleMessenger.h +++ b/src/msg/simple/SimpleMessenger.h @@ -111,7 +111,7 @@ public: * @{ */ void set_cluster_protocol(int p) override { - assert(!started && !did_bind); + ceph_assert(!started && !did_bind); cluster_protocol = p; } diff --git a/src/msg/xio/XioConnection.cc b/src/msg/xio/XioConnection.cc index 25ac2154491..abe270a41db 100644 --- a/src/msg/xio/XioConnection.cc +++ b/src/msg/xio/XioConnection.cc @@ -185,7 +185,7 @@ void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp) struct ceph_timespec ts; if (ack) { - assert(tp); + ceph_assert(tp); tp->encode_timeval(&ts); xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); xcmd->get_bl_ref().append((char*)&ts, sizeof(ts)); @@ -199,9 +199,9 @@ void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp) } const std::list& header = xcmd->get_bl_ref().buffers(); - assert(header.size() == 1); /* accelio header must be without scatter gather */ + ceph_assert(header.size() == 1); /* accelio header must be without scatter gather */ list::const_iterator pb = header.begin(); - assert(pb->length() < XioMsgHdr::get_max_encoded_length()); + ceph_assert(pb->length() < XioMsgHdr::get_max_encoded_length()); struct xio_msg * msg = xcmd->get_xio_msg(); msg->out.header.iov_base = (char*) pb->c_str(); msg->out.header.iov_len = pb->length(); @@ -286,11 +286,11 @@ int XioConnection::handle_data_msg(struct xio_session *session, << " iov_len " << (int) tmsg->in.header.iov_len << " nents " << tmsg->in.pdata_iov.nents << " sn " << tmsg->sn << dendl; - assert(session == this->session); + ceph_assert(session == this->session); in_seq.set_count(msg_cnt.msg_cnt); } else { /* XXX major sequence error */ - assert(! tmsg->in.header.iov_len); + ceph_assert(! tmsg->in.header.iov_len); } in_seq.append(msg); @@ -540,7 +540,7 @@ int XioConnection::on_msg(struct xio_session *session, default: lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl; - assert(! "unsupported message tag"); + ceph_assert(! "unsupported message tag"); } xio_release_msg(msg); diff --git a/src/msg/xio/XioMessenger.cc b/src/msg/xio/XioMessenger.cc index d62713b9beb..722ac6dbd23 100644 --- a/src/msg/xio/XioMessenger.cc +++ b/src/msg/xio/XioMessenger.cc @@ -511,7 +511,7 @@ int XioMessenger::session_event(struct xio_session *session, xcon->conn = conn; xcon->portal = static_cast(xctxa.user_context); - assert(xcon->portal); + ceph_assert(xcon->portal); xcona.user_context = xcon; (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX); @@ -769,7 +769,7 @@ static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon, if (!!e) return NULL; XioMsg *xmsg = reinterpret_cast(mp_mem.addr); - assert(!!xmsg); + ceph_assert(!!xmsg); new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL); return xmsg; } @@ -781,7 +781,7 @@ XioCommand* pool_alloc_xio_command(XioConnection *xcon) if (!!e) return NULL; XioCommand *xcmd = reinterpret_cast(mp_mem.addr); - assert(!!xcmd); + ceph_assert(!!xcmd); new (xcmd) XioCommand(xcon, mp_mem); return xcmd; } @@ -912,7 +912,7 @@ int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon) req = xmsg->get_xio_msg(); const std::list& header = xmsg->hdr.get_bl().buffers(); - assert(header.size() == 1); /* XXX */ + ceph_assert(header.size() == 1); /* XXX */ list::const_iterator pb = header.begin(); req->out.header.iov_base = (char*) pb->c_str(); req->out.header.iov_len = pb->length(); diff --git a/src/msg/xio/XioMsg.cc b/src/msg/xio/XioMsg.cc index 8c2d3d8ec06..4b6a5d687bc 100644 --- a/src/msg/xio/XioMsg.cc +++ b/src/msg/xio/XioMsg.cc @@ -32,7 +32,7 @@ int XioDispatchHook::release_msgs() /* merge with portal traffic */ xcon->portal->enqueue(xcon, xcmp); - assert(r); + ceph_assert(r); return r; } @@ -41,7 +41,7 @@ int XioDispatchHook::release_msgs() ceph_msg_footer _ceph_msg_footer; XioMsgHdr hdr (_ceph_msg_header, _ceph_msg_footer, 0 /* features */); const std::list& hdr_buffers = hdr.get_bl().buffers(); - assert(hdr_buffers.size() == 1); /* accelio header is small without scatter gather */ + ceph_assert(hdr_buffers.size() == 1); /* accelio header is small without scatter gather */ return hdr_buffers.begin()->length(); } diff --git a/src/msg/xio/XioPool.h b/src/msg/xio/XioPool.h index 6084ce85682..07fa73114ab 100644 --- a/src/msg/xio/XioPool.h +++ b/src/msg/xio/XioPool.h @@ -183,7 +183,7 @@ static inline int xpool_alloc(struct xio_mempool *pool, uint64_t size, } // fall back to malloc on errors mp->addr = malloc(size); - assert(mp->addr); + ceph_assert(mp->addr); mp->length = 0; if (unlikely(XioPool::trace_mempool)) xp_stats.inc_overflow(); diff --git a/src/msg/xio/XioPortal.h b/src/msg/xio/XioPortal.h index c22d891b3ed..ad9d4df0b4d 100644 --- a/src/msg/xio/XioPortal.h +++ b/src/msg/xio/XioPortal.h @@ -145,7 +145,7 @@ public: /* a portal is an xio_context and event loop */ ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */); - assert(ctx && "Whoops, failed to create portal/ctx"); + ceph_assert(ctx && "Whoops, failed to create portal/ctx"); } int bind(struct xio_session_ops *ops, const string &base_uri, @@ -356,7 +356,7 @@ public: for (int i = 0; i < n; i++) { if (!portals[i]) { portals[i] = new XioPortal(msgr, nconns); - assert(portals[i] != nullptr); + ceph_assert(portals[i] != nullptr); } } } -- 2.39.5