]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: Use ceph_assert for asserts.
authorAdam C. Emerson <aemerson@redhat.com>
Thu, 23 Aug 2018 15:26:01 +0000 (11:26 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 27 Aug 2018 14:27:22 +0000 (10:27 -0400)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
48 files changed:
src/msg/DispatchQueue.cc
src/msg/DispatchQueue.h
src/msg/Message.cc
src/msg/Messenger.h
src/msg/QueueStrategy.cc
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/async/Event.cc
src/msg/async/Event.h
src/msg/async/EventKqueue.cc
src/msg/async/PosixStack.cc
src/msg/async/PosixStack.h
src/msg/async/Stack.cc
src/msg/async/Stack.h
src/msg/async/dpdk/DPDK.cc
src/msg/async/dpdk/DPDK.h
src/msg/async/dpdk/DPDKStack.cc
src/msg/async/dpdk/DPDKStack.h
src/msg/async/dpdk/Packet.h
src/msg/async/dpdk/TCP.cc
src/msg/async/dpdk/TCP.h
src/msg/async/dpdk/UserspaceEvent.cc
src/msg/async/dpdk/UserspaceEvent.h
src/msg/async/dpdk/ethernet.h
src/msg/async/dpdk/net.cc
src/msg/async/dpdk/net.h
src/msg/async/dpdk/stream.h
src/msg/async/rdma/Infiniband.cc
src/msg/async/rdma/Infiniband.h
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h
src/msg/simple/Accepter.cc
src/msg/simple/Pipe.cc
src/msg/simple/Pipe.h
src/msg/simple/PipeConnection.cc
src/msg/simple/SimpleMessenger.cc
src/msg/simple/SimpleMessenger.h
src/msg/xio/XioConnection.cc
src/msg/xio/XioMessenger.cc
src/msg/xio/XioMsg.cc
src/msg/xio/XioPool.h
src/msg/xio/XioPortal.h

index 53e112bed4bfc052d8fbc17438417cf831fb4e1e..5a943e309a5699cf977d2ddeb829110c68d891cb 100644 (file)
@@ -216,7 +216,7 @@ void DispatchQueue::discard_queue(uint64_t id) {
   for (list<QueueItem>::iterator i = removed.begin();
        i != removed.end();
        ++i) {
-    assert(!(i->is_code())); // We don't discard id 0, ever!
+    ceph_assert(!(i->is_code())); // We don't discard id 0, ever!
     const Message::ref& m = i->get_message();
     remove_arrival(m);
     dispatch_throttle_release(m->get_dispatch_throttle_size());
@@ -225,8 +225,8 @@ void DispatchQueue::discard_queue(uint64_t id) {
 
 void DispatchQueue::start()
 {
-  assert(!stop);
-  assert(!dispatch_thread.is_started());
+  ceph_assert(!stop);
+  ceph_assert(!dispatch_thread.is_started());
   dispatch_thread.create("ms_dispatch");
   local_delivery_thread.create("ms_local");
 }
index 2c16056c21651ab6b014d9418b0eb2441f5e0274..30e6b654f66d36737c04d925eb91f05468595b31 100644 (file)
@@ -50,15 +50,15 @@ class DispatchQueue {
       return type != -1;
     }
     int get_code () const {
-      assert(is_code());
+      ceph_assert(is_code());
       return type;
     }
     const Message::ref& get_message() {
-      assert(!is_code());
+      ceph_assert(!is_code());
       return m;
     }
     Connection *get_connection() {
-      assert(is_code());
+      ceph_assert(is_code());
       return con.get();
     }
   };
@@ -82,7 +82,7 @@ class DispatchQueue {
   }
   void remove_arrival(const Message::ref& m) {
     auto it = marrival_map.find(m);
-    assert(it != marrival_map.end());
+    ceph_assert(it != marrival_map.end());
     marrival.erase(it->second);
     marrival_map.erase(it);
   }
@@ -234,9 +234,9 @@ class DispatchQueue {
       stop(false)
     {}
   ~DispatchQueue() {
-    assert(mqueue.empty());
-    assert(marrival.empty());
-    assert(local_messages.empty());
+    ceph_assert(mqueue.empty());
+    ceph_assert(marrival.empty());
+    ceph_assert(local_messages.empty());
   }
 };
 
index 27caab5e3ccfecc9cfd57cf2e42b2661442f760f..ac9a175b483ec6cc4939a833d71bd5507e41e241 100644 (file)
@@ -206,7 +206,7 @@ void Message::encode(uint64_t features, int crcflags)
 {
   // encode and copy out of *m
   if (empty_payload()) {
-    assert(middle.length() == 0);
+    ceph_assert(middle.length() == 0);
     encode_payload(features);
 
     if (byte_throttler) {
index 187223b03ed896e8145eae190646d27482d32c57..f0e0f8af4e0923507308e240a5b80e0dd2a69c48 100644 (file)
@@ -288,7 +288,7 @@ public:
    * @param p The cluster protocol to use. Defined externally.
    */
   void set_default_send_priority(int p) {
-    assert(!started);
+    ceph_assert(!started);
     default_send_priority = p;
   }
   /**
@@ -566,16 +566,16 @@ public:
       } else {
        blocked = true;
        int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
-       assert(r == 0);
+       ceph_assert(r == 0);
       }
     }
     ~sigpipe_stopper() {
       if (blocked) {
        struct timespec nowait{0};
        int r = sigtimedwait(&pipe_mask, 0, &nowait);
-       assert(r == EAGAIN || r == 0);
+       ceph_assert(r == EAGAIN || r == 0);
        r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
-       assert(r == 0);
+       ceph_assert(r == 0);
       }
     }
   };
@@ -632,7 +632,7 @@ public:
   /**
    *  Deliver a single Message. Send it to each Dispatcher
    *  in sequence until one of them handles it.
-   *  If none of our Dispatchers can handle it, assert(0).
+   *  If none of our Dispatchers can handle it, ceph_assert(0).
    *
    *  @param m The Message to deliver.
    */
@@ -644,7 +644,7 @@ public:
     }
     lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
                         << m->get_source_inst() << dendl;
-    assert(!cct->_conf->ms_die_on_unhandled_msg);
+    ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
   }
   void ms_deliver_dispatch(Message *m) {
     return ms_deliver_dispatch(Message::ref(m, false)); /* consume ref */
index 0bbee4f3835cdee4857c27e6f08f8accb726c5c9..9356e5c597c7bc87da8546895d5630ae50be6c11 100644 (file)
@@ -84,7 +84,7 @@ void QueueStrategy::shutdown()
 void QueueStrategy::wait()
 {
   lock.Lock();
-  assert(stop);
+  ceph_assert(stop);
   for (auto& thread : threads) {
     lock.Unlock();
 
@@ -98,7 +98,7 @@ void QueueStrategy::wait()
 
 void QueueStrategy::start()
 {
-  assert(!stop);
+  ceph_assert(!stop);
   lock.Lock();
   threads.reserve(n_threads);
   for (int ix = 0; ix < n_threads; ++ix) {
index b021ff0004e16c23159ce1709f05f04c34fc37b7..674a6714871c412987fd0b8b6964be940941892f 100644 (file)
@@ -149,14 +149,14 @@ AsyncConnection::AsyncConnection(
 
 AsyncConnection::~AsyncConnection()
 {
-  assert(out_q.empty());
-  assert(sent.empty());
+  ceph_assert(out_q.empty());
+  ceph_assert(sent.empty());
   delete authorizer;
   if (recv_buf)
     delete[] recv_buf;
   if (state_buffer)
     delete[] state_buffer;
-  assert(!delay_state);
+  ceph_assert(!delay_state);
 }
 
 void AsyncConnection::maybe_start_delay_thread()
@@ -211,7 +211,7 @@ ssize_t AsyncConnection::_try_send(bool more)
     }
   }
 
-  assert(center->in_thread());
+  ceph_assert(center->in_thread());
   ssize_t r = cs.send(outcoming_bl, more);
   if (r < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
@@ -726,14 +726,14 @@ void AsyncConnection::process()
                     << ", discarding" << dendl;
             message->put();
             if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message)
-              assert(0 == "old msgs despite reconnect_seq feature");
+              ceph_assert(0 == "old msgs despite reconnect_seq feature");
             break;
           }
           if (message->get_seq() > cur_seq + 1) {
             ldout(async_msgr->cct, 0) << __func__ << " missed message?  skipped from seq "
                                       << cur_seq << " to " << message->get_seq() << dendl;
             if (async_msgr->cct->_conf->ms_die_on_skipped_message)
-              assert(0 == "skipped incoming seq");
+              ceph_assert(0 == "skipped incoming seq");
           }
 
           message->set_connection(this);
@@ -850,7 +850,7 @@ ssize_t AsyncConnection::_process_connection()
       {
         std::lock_guard<std::mutex> l(write_lock);
         if (!outcoming_bl.length()) {
-          assert(state_after_send);
+          ceph_assert(state_after_send);
           state = state_after_send;
           state_after_send = STATE_NONE;
         }
@@ -859,7 +859,7 @@ ssize_t AsyncConnection::_process_connection()
 
     case STATE_CONNECTING:
       {
-        assert(!policy.server);
+        ceph_assert(!policy.server);
 
         // reset connect state variables
         got_bad_auth = false;
@@ -1088,7 +1088,7 @@ ssize_t AsyncConnection::_process_connection()
         bufferlist authorizer_reply;
         if (connect_reply.authorizer_len) {
           ldout(async_msgr->cct, 10) << __func__ << " reply.authorizer_len=" << connect_reply.authorizer_len << dendl;
-          assert(connect_reply.authorizer_len < 4096);
+          ceph_assert(connect_reply.authorizer_len < 4096);
           r = read_until(connect_reply.authorizer_len, state_buffer);
           if (r < 0) {
             ldout(async_msgr->cct, 1) << __func__ << " read connect reply authorizer failed" << dendl;
@@ -1117,7 +1117,7 @@ ssize_t AsyncConnection::_process_connection()
           goto fail;
 
         // state must be changed!
-        assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH);
+        ceph_assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH);
         break;
       }
 
@@ -1139,10 +1139,10 @@ ssize_t AsyncConnection::_process_connection()
         discard_requeued_up_to(newly_acked_seq);
         //while (newly_acked_seq > out_seq.read()) {
         //  Message *m = _get_next_outgoing(NULL);
-        //  assert(m);
+        //  ceph_assert(m);
         //  ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq()
         //                      << " " << *m << dendl;
-        //  assert(m->get_seq() <= newly_acked_seq);
+        //  ceph_assert(m->get_seq() <= newly_acked_seq);
         //  m->put();
         //  out_seq.inc();
         //}
@@ -1172,7 +1172,7 @@ ssize_t AsyncConnection::_process_connection()
         state = STATE_OPEN;
         once_ready = true;
         connect_seq += 1;
-        assert(connect_seq == connect_reply.connect_seq);
+        ceph_assert(connect_seq == connect_reply.connect_seq);
         backoff = utime_t();
         set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features);
         ldout(async_msgr->cct, 10) << __func__ << " connect success " << connect_seq
@@ -1193,7 +1193,7 @@ ssize_t AsyncConnection::_process_connection()
         }
 
         if (delay_state)
-          assert(delay_state->ready());
+          ceph_assert(delay_state->ready());
         dispatch_queue->queue_connect(this);
         async_msgr->ms_deliver_handle_fast_connect(this);
 
@@ -1332,7 +1332,7 @@ ssize_t AsyncConnection::_process_connection()
           goto fail;
 
         // state is changed by "handle_connect_msg"
-        assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH);
+        ceph_assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH);
         break;
       }
 
@@ -1361,7 +1361,7 @@ ssize_t AsyncConnection::_process_connection()
         memset(&connect_msg, 0, sizeof(connect_msg));
 
         if (delay_state)
-          assert(delay_state->ready());
+          ceph_assert(delay_state->ready());
         // make sure no pending tick timer
         if (last_tick_id)
           center->delete_time_event(last_tick_id);
@@ -1436,7 +1436,7 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co
     state = STATE_CONNECTING_SEND_CONNECT_MSG;
   }
   if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
-    assert(reply.connect_seq > connect_seq);
+    ceph_assert(reply.connect_seq > connect_seq);
     ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_SESSION "
                               << connect_seq << " -> "
                               << reply.connect_seq << dendl;
@@ -1537,7 +1537,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     if (need_challenge && !had_challenge && authorizer_challenge) {
       ldout(async_msgr->cct,0) << __func__ << ": challenging authorizer"
                               << dendl;
-      assert(authorizer_reply.length());
+      ceph_assert(authorizer_reply.length());
       tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER;
     } else {
       ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
@@ -1558,7 +1558,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   lock.lock();
   if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
     ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl;
-    assert(state == STATE_CLOSED);
+    ceph_assert(state == STATE_CLOSED);
     goto fail;
   }
 
@@ -1663,14 +1663,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
         ldout(async_msgr->cct,10) << __func__ << " accept connection race, existing "
                             << existing << ".cseq " << existing->connect_seq
                             << " == " << connect.connect_seq << ", sending WAIT" << dendl;
-        assert(peer_addrs.legacy_addr() > async_msgr->get_myaddrs().legacy_addr());
+        ceph_assert(peer_addrs.legacy_addr() > async_msgr->get_myaddrs().legacy_addr());
         existing->lock.unlock();
         return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply);
       }
     }
 
-    assert(connect.connect_seq > existing->connect_seq);
-    assert(connect.global_seq >= existing->peer_global_seq);
+    ceph_assert(connect.connect_seq > existing->connect_seq);
+    ceph_assert(connect.global_seq >= existing->peer_global_seq);
     if (policy.resetcheck &&   // RESETSESSION only used by servers; peers do not reset each other
         existing->connect_seq == 0) {
       ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
@@ -1707,7 +1707,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     existing->_stop();
     existing->dispatch_queue->queue_reset(existing.get());
   } else {
-    assert(can_write == WriteStatus::NOWRITE);
+    ceph_assert(can_write == WriteStatus::NOWRITE);
     existing->write_lock.lock();
 
     // reset the in_seq if this is a hard reset from peer,
@@ -1720,7 +1720,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
 
     if (existing->delay_state) {
       existing->delay_state->flush();
-      assert(!delay_state);
+      ceph_assert(!delay_state);
     }
     existing->reset_recv_state();
 
@@ -1741,7 +1741,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     // Discard existing prefetch buffer in `recv_buf`
     existing->recv_start = existing->recv_end = 0;
     // there shouldn't exist any buffer
-    assert(recv_start == recv_end);
+    ceph_assert(recv_start == recv_end);
 
     existing->authorizer_challenge.reset();
 
@@ -1783,7 +1783,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
         std::lock_guard<std::mutex> l(existing->lock);
         if (existing->state == STATE_CLOSED)
           return ;
-        assert(existing->state == STATE_NONE);
+        ceph_assert(existing->state == STATE_NONE);
   
         existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
         existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler);
@@ -1870,7 +1870,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   }
   if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
     ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl;
-    assert(state == STATE_CLOSED || state == STATE_NONE);
+    ceph_assert(state == STATE_CLOSED || state == STATE_NONE);
     goto fail_registered;
   }
 
@@ -1916,7 +1916,7 @@ void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
 {
   ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd()
                             << " on " << addr << dendl;
-  assert(socket.fd() >= 0);
+  ceph_assert(socket.fd() >= 0);
 
   std::lock_guard<std::mutex> l(lock);
   cs = std::move(socket);
@@ -2224,7 +2224,7 @@ void AsyncConnection::prepare_send_message(uint64_t features, Message *m, buffer
 ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
 {
   FUNCTRACE(async_msgr->cct);
-  assert(center->in_thread());
+  ceph_assert(center->in_thread());
   m->set_seq(++out_seq);
 
   if (msgr->crcflags & MSG_CRC_HEADER)
@@ -2428,7 +2428,7 @@ void AsyncConnection::_append_keepalive_or_ack(bool ack, utime_t *tp)
 {
   ldout(async_msgr->cct, 10) << __func__ << dendl;
   if (ack) {
-    assert(tp);
+    ceph_assert(tp);
     struct ceph_timespec ts;
     tp->encode_timeval(&ts);
     outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
index 4ec8e0eb7c10e228d788087a7c72865c182dec46..a7396f5895b451b1ba7143e67d3cdd5b0a523484 100644 (file)
@@ -114,7 +114,7 @@ class AsyncConnection : public Connection {
     Message *m = 0;
     if (!out_q.empty()) {
       map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin();
-      assert(!it->second.empty());
+      ceph_assert(!it->second.empty());
       list<pair<bufferlist, Message*> >::iterator p = it->second.begin();
       m = p->second;
       if (bl)
@@ -152,8 +152,8 @@ class AsyncConnection : public Connection {
       : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
         stop_dispatch(false) { }
     ~DelayedDelivery() override {
-      assert(register_time_events.empty());
-      assert(delay_queue.empty());
+      ceph_assert(register_time_events.empty());
+      ceph_assert(delay_queue.empty());
     }
     void set_center(EventCenter *c) { center = c; }
     void do_request(uint64_t id) override;
index d4fc61624ce3425f9cb80cd1decff69574e3e52b..bb761866d44b195dabe17633bfb03d296b8ea9ff 100644 (file)
@@ -293,7 +293,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
 AsyncMessenger::~AsyncMessenger()
 {
   delete reap_handler;
-  assert(!did_bind); // either we didn't bind or we shut down the Processor
+  ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor
   local_connection->mark_down();
   for (auto &&p : processors)
     delete p;
@@ -392,7 +392,7 @@ int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
       // it, like port is used case. But if the first worker successfully to bind
       // but the second worker failed, it's not expected and we need to assert
       // here
-      assert(i == 0);
+      ceph_assert(i == 0);
       return r;
     }
     ++i;
@@ -404,7 +404,7 @@ int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
 int AsyncMessenger::rebind(const set<int>& avoid_ports)
 {
   ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
-  assert(did_bind);
+  ceph_assert(did_bind);
 
   for (auto &&p : processors)
     p->stop();
@@ -428,7 +428,7 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
   for (auto &&p : processors) {
     int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
     if (r) {
-      assert(i == 0);
+      ceph_assert(i == 0);
       return r;
     }
     ++i;
@@ -446,7 +446,7 @@ int AsyncMessenger::client_bind(const entity_addr_t &bind_addr)
     return 0;
   Mutex::Locker l(lock);
   if (did_bind) {
-    assert(my_addrs->legacy_addr() == bind_addr);
+    ceph_assert(my_addrs->legacy_addr() == bind_addr);
     return 0;
   }
   if (started) {
@@ -490,9 +490,9 @@ int AsyncMessenger::start()
   ldout(cct,1) << __func__ << " start" << dendl;
 
   // register at least one entity, first!
-  assert(my_name.type() >= 0);
+  ceph_assert(my_name.type() >= 0);
 
-  assert(!started);
+  ceph_assert(!started);
   started = true;
   stopped = false;
 
@@ -551,8 +551,8 @@ void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_ad
 AsyncConnectionRef AsyncMessenger::create_connect(
   const entity_addrvec_t& addrs, int type)
 {
-  assert(lock.is_locked());
-  assert(addrs != *my_addrs);
+  ceph_assert(lock.is_locked());
+  ceph_assert(addrs != *my_addrs);
 
   ldout(cct, 10) << __func__ << " " << addrs
       << ", creating connection and registering" << dendl;
@@ -574,7 +574,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(
   AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
                                                target.is_msgr2());
   conn->connect(addrs, type, target);
-  assert(!conns.count(addrs));
+  ceph_assert(!conns.count(addrs));
   conns[addrs] = conn;
   w->get_perf_counter()->inc(l_msgr_active_connections);
 
@@ -608,7 +608,7 @@ ConnectionRef AsyncMessenger::get_loopback_connection()
 int AsyncMessenger::_send_to(Message *m, int type, const entity_addrvec_t& addrs)
 {
   FUNCTRACE(cct);
-  assert(m);
+  ceph_assert(m);
 
   if (m->get_type() == CEPH_MSG_OSD_OP)
     OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
index e97c161ea867f38ca845a18471efcbde5a5bf5ff..d349bab221d0e4b2b9b82180c3df91a13e6b69f3 100644 (file)
@@ -111,7 +111,7 @@ public:
    * @{
    */
   void set_cluster_protocol(int p) override {
-    assert(!started && !did_bind);
+    ceph_assert(!started && !did_bind);
     cluster_protocol = p;
   }
 
@@ -307,7 +307,7 @@ private:
   bool stopped;
 
   AsyncConnectionRef _lookup_conn(const entity_addrvec_t& k) {
-    assert(lock.is_locked());
+    ceph_assert(lock.is_locked());
     auto p = conns.find(k);
     if (p == conns.end())
       return NULL;
@@ -324,7 +324,7 @@ private:
   }
 
   void _init_local_connection() {
-    assert(lock.is_locked());
+    ceph_assert(lock.is_locked());
     local_connection->peer_addrs = *my_addrs;
     local_connection->peer_type = my_name.type();
     local_connection->set_features(CEPH_FEATURES_ALL);
index 9e16efd867cee32e3f7a239c613f3317a5780783..aee9a390493c1ec15801e30c198ff1e46525ba12 100644 (file)
@@ -102,7 +102,7 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout)
 int EventCenter::init(int n, unsigned i, const std::string &t)
 {
   // can't init multi times
-  assert(nevent == 0);
+  ceph_assert(nevent == 0);
 
   type = t;
   idx = i;
@@ -193,19 +193,19 @@ void EventCenter::set_owner()
     global_centers = &cct->lookup_or_create_singleton_object<
       EventCenter::AssociatedCenters>(
        "AsyncMessenger::EventCenter::global_center::" + type, true);
-    assert(global_centers);
+    ceph_assert(global_centers);
     global_centers->centers[idx] = this;
     if (driver->need_wakeup()) {
       notify_handler = new C_handle_notify(this, cct);
       int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
-      assert(r == 0);
+      ceph_assert(r == 0);
     }
   }
 }
 
 int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
 {
-  assert(in_thread());
+  ceph_assert(in_thread());
   int r = 0;
   if (fd >= nevent) {
     int new_size = nevent << 2;
@@ -234,7 +234,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
     // add_event shouldn't report error, otherwise it must be a innermost bug!
     lderr(cct) << __func__ << " add event failed, ret=" << r << " fd=" << fd
                << " mask=" << mask << " original mask is " << event->mask << dendl;
-    assert(0 == "BUG!");
+    ceph_assert(0 == "BUG!");
     return r;
   }
 
@@ -252,7 +252,7 @@ int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
 
 void EventCenter::delete_file_event(int fd, int mask)
 {
-  assert(in_thread() && fd >= 0);
+  ceph_assert(in_thread() && fd >= 0);
   if (fd >= nevent) {
     ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent
                   << "mask=" << mask << dendl;
@@ -267,7 +267,7 @@ void EventCenter::delete_file_event(int fd, int mask)
   int r = driver->del_event(fd, event->mask, mask);
   if (r < 0) {
     // see create_file_event
-    assert(0 == "BUG!");
+    ceph_assert(0 == "BUG!");
   }
 
   if (mask & EVENT_READABLE && event->read_cb) {
@@ -284,7 +284,7 @@ void EventCenter::delete_file_event(int fd, int mask)
 
 uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
 {
-  assert(in_thread());
+  ceph_assert(in_thread());
   uint64_t id = time_event_next_id++;
 
   ldout(cct, 30) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl;
@@ -301,7 +301,7 @@ uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef
 
 void EventCenter::delete_time_event(uint64_t id)
 {
-  assert(in_thread());
+  ceph_assert(in_thread());
   ldout(cct, 30) << __func__ << " id=" << id << dendl;
   if (id >= time_event_next_id || id == 0)
     return ;
index e2f2ca98432bd7f0492dbd283e66761e840c4eed..abd27845c88ce0ad098450bd485d4b4d3f177a45 100644 (file)
@@ -177,7 +177,7 @@ class EventCenter {
 
   int process_time_events();
   FileEvent *_get_file_event(int fd) {
-    assert(fd < nevent);
+    ceph_assert(fd < nevent);
     return &file_events[fd];
   }
 
@@ -234,7 +234,7 @@ class EventCenter {
         delete this;
     }
     void wait() {
-      assert(!nonwait);
+      ceph_assert(!nonwait);
       std::unique_lock<std::mutex> l(lock);
       while (!done)
         cond.wait(l);
@@ -244,9 +244,9 @@ class EventCenter {
  public:
   template <typename func>
   void submit_to(int i, func &&f, bool nowait = false) {
-    assert(i < MAX_EVENTCENTER && global_centers);
+    ceph_assert(i < MAX_EVENTCENTER && global_centers);
     EventCenter *c = global_centers->centers[i];
-    assert(c);
+    ceph_assert(c);
     if (!nowait && c->in_thread()) {
       f();
       return ;
index 026da80295088f12efaf0db5cfb06d8755b48414..d6ba4a3db362e99f82718eaa390c6470ff20c070 100644 (file)
@@ -73,7 +73,7 @@ int KqueueDriver::test_thread_change(const char* funcname) {
   } else if ((kqfd != -1) && (test_kqfd() < 0)) {
     // should this ever happen?
     // It would be strange to change kqfd with thread change.
-    // Might nee to change this into an assert() in the future.
+    // Might nee to change this into an ceph_assert() in the future.
     ldout(cct,0) << funcname << " Warning: Recreating old kqfd. "
                  << "This should not happen!!!"  << dendl;
     kqfd = -1;
@@ -203,7 +203,7 @@ int KqueueDriver::resize_events(int newsize)
     if (!sav_events) {
       lderr(cct) << __func__ << " unable to realloc memory: "
                              << cpp_strerror(errno) << dendl;
-      assert(sav_events);
+      ceph_assert(sav_events);
       return -ENOMEM;
     }
     memset(&sav_events[size], 0, sizeof(struct SaveEvent)*(newsize-sav_max));
index 410bcc4b52be70e57c15eea562123271c674bfb5..5dd8bb0bcae83ebf8003cc62c0161453cdecf0fd 100644 (file)
@@ -182,7 +182,7 @@ class PosixServerSocketImpl : public ServerSocketImpl {
 };
 
 int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
-  assert(sock);
+  ceph_assert(sock);
   sockaddr_storage ss;
   socklen_t slen = sizeof(ss);
   int sd = ::accept(_fd, (sockaddr*)&ss, &slen);
@@ -203,7 +203,7 @@ int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &op
     return -errno;
   }
 
-  assert(NULL != out); //out should not be NULL in accept connection
+  ceph_assert(NULL != out); //out should not be NULL in accept connection
 
   out->set_type(addr_type);
   out->set_sockaddr((sockaddr*)&ss);
index 0fb00a8537be187febb89b28d53e3252211c474e..e70fa650b0e35c8d20f8b1a93a4ae9dfc51e719e 100644 (file)
@@ -52,7 +52,7 @@ class PosixNetworkStack : public NetworkStack {
     threads[i] = std::thread(func);
   }
   void join_worker(unsigned i) override {
-    assert(threads.size() > i && threads[i].joinable());
+    ceph_assert(threads.size() > i && threads[i].joinable());
     threads[i].join();
   }
 };
index 9eeb9bd91071cc4c65677d41dd31b652f34eca8f..ef6ead8a419d8500d737c144c45531d72f3394b1 100644 (file)
@@ -103,7 +103,7 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned
 
 NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
 {
-  assert(cct->_conf->ms_async_op_threads > 0);
+  ceph_assert(cct->_conf->ms_async_op_threads > 0);
 
   const uint64_t InitEventNumber = 5000;
   num_workers = cct->_conf->ms_async_op_threads;
@@ -164,7 +164,7 @@ Worker* NetworkStack::get_worker()
   }
 
   pool_spin.unlock();
-  assert(current_best);
+  ceph_assert(current_best);
   ++current_best->references;
   return current_best;
 }
@@ -208,7 +208,7 @@ void NetworkStack::drain()
   pool_spin.lock();
   C_drain drain(num_workers);
   for (unsigned i = 0; i < num_workers; ++i) {
-    assert(cur != workers[i]->center.get_owner());
+    ceph_assert(cur != workers[i]->center.get_owner());
     workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
   }
   pool_spin.unlock();
index 3835b8483c62984e4553be5b51c144d17649e31f..32f9a5b2ee02f57a34de7dfa00c16168d0f8b8ba 100644 (file)
@@ -260,7 +260,7 @@ class Worker {
   PerfCounters *get_perf_counter() { return perf_logger; }
   void release_worker() {
     int oldref = references.fetch_sub(1);
-    assert(oldref > 0);
+    ceph_assert(oldref > 0);
   }
   void init_done() {
     init_lock.lock();
index f556cda890f811d706c782b40fa857c9d72a2c7c..5fc2cf938d1450bb47e76d031cd238869f3582a8 100644 (file)
@@ -151,7 +151,7 @@ static constexpr uint8_t packet_read_size        = 32;
 
 int DPDKDevice::init_port_start()
 {
-  assert(_port_idx < rte_eth_dev_count());
+  ceph_assert(_port_idx < rte_eth_dev_count());
 
   rte_eth_dev_info_get(_port_idx, &_dev_info);
 
@@ -263,7 +263,7 @@ int DPDKDevice::init_port_start()
   if (_num_queues > 1) {
     if (_dev_info.reta_size) {
       // RETA size should be a power of 2
-      assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0);
+      ceph_assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0);
 
       // Set the RSS table to the correct size
       _redir_table.resize(_dev_info.reta_size);
@@ -301,7 +301,7 @@ int DPDKDevice::init_port_start()
   // all together. If this assumption breaks we need to rework the below logic
   // by splitting the csum offload feature bit into separate bits for IPv4,
   // TCP.
-  assert(((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
+  ceph_assert(((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
           (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) ||
          (!(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
           !(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)));
@@ -329,7 +329,7 @@ int DPDKDevice::init_port_start()
   // or not set all together. If this assumption breaks we need to rework the
   // below logic by splitting the csum offload feature bit into separate bits
   // for TCP.
-  assert((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) ||
+  ceph_assert((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) ||
           !(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM));
 
   if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) {
@@ -443,7 +443,7 @@ int DPDKDevice::init_port_fini()
 }
 
 void DPDKQueuePair::configure_proxies(const std::map<unsigned, float>& cpu_weights) {
-  assert(!cpu_weights.empty());
+  ceph_assert(!cpu_weights.empty());
   if (cpu_weights.size() == 1 && cpu_weights.begin()->first == _qid) {
     // special case queue sending to self only, to avoid requiring a hash value
     return;
@@ -518,10 +518,10 @@ bool DPDKQueuePair::init_rx_mbuf_pool()
     std::string mz_name = "rx_buffer_data" + std::to_string(_qid);
     const struct rte_memzone *mz = rte_memzone_reserve_aligned(mz_name.c_str(),
           mbuf_data_size*bufs_count, _pktmbuf_pool_rx->socket_id, mz_flags, mbuf_data_size);
-    assert(mz);
+    ceph_assert(mz);
     void* m = mz->addr;
     for (int i = 0; i < bufs_count; i++) {
-      assert(m);
+      ceph_assert(m);
       _alloc_bufs.push_back(m);
       m += mbuf_data_size;
     }
@@ -781,14 +781,14 @@ bool DPDKQueuePair::rx_gc(bool force)
                            (void **)_rx_free_bufs.data(),
                            _rx_free_bufs.size());
 
-      // TODO: assert() in a fast path! Remove me ASAP!
-      assert(_num_rx_free_segs >= _rx_free_bufs.size());
+      // TODO: ceph_assert() in a fast path! Remove me ASAP!
+      ceph_assert(_num_rx_free_segs >= _rx_free_bufs.size());
 
       _num_rx_free_segs -= _rx_free_bufs.size();
       _rx_free_bufs.clear();
 
-      // TODO: assert() in a fast path! Remove me ASAP!
-      assert((_rx_free_pkts.empty() && !_num_rx_free_segs) ||
+      // TODO: ceph_assert() in a fast path! Remove me ASAP!
+      ceph_assert((_rx_free_pkts.empty() && !_num_rx_free_segs) ||
              (!_rx_free_pkts.empty() && _num_rx_free_segs));
     }
   }
@@ -1020,7 +1020,7 @@ void DPDKQueuePair::tx_buf::set_cluster_offload_info(const Packet& p, const DPDK
       head->l3_len = oi.ip_hdr_len;
 
       if (oi.tso_seg_size) {
-        assert(oi.needs_ip_csum);
+        ceph_assert(oi.needs_ip_csum);
         head->ol_flags |= PKT_TX_TCP_SEG;
         head->l4_len = oi.tcp_hdr_len;
         head->tso_segsz = oi.tso_seg_size;
@@ -1139,7 +1139,7 @@ void DPDKQueuePair::tx_buf::copy_packet_to_cluster(const Packet& p, rte_mbuf* he
       cur_seg_offset = 0;
 
       // FIXME: assert in a fast-path - remove!!!
-      assert(cur_seg);
+      ceph_assert(cur_seg);
     }
   }
 }
index 04127f4aaae6db463fc1fc963fc4eaa7b60fc8d6..1feb661ef3246c5979070cd43cf95ba7a3a7c896 100644 (file)
@@ -187,8 +187,8 @@ class DPDKQueuePair {
 
       rte_mbuf* m;
 
-      // TODO: assert() in a fast path! Remove me ASAP!
-      assert(frag.size);
+      // TODO: ceph_assert() in a fast path! Remove me ASAP!
+      ceph_assert(frag.size);
 
       // Create a HEAD of mbufs' cluster and set the first bytes into it
       len = do_one_buf(qp, head, base, left_to_set);
@@ -291,7 +291,7 @@ class DPDKQueuePair {
       if (!pa)
         return copy_one_data_buf(qp, m, va, buf_len);
 
-      assert(buf_len);
+      ceph_assert(buf_len);
       tx_buf* buf = qp.get_tx_buf();
       if (!buf) {
         return 0;
@@ -550,8 +550,8 @@ class DPDKQueuePair {
   uint32_t _send(circular_buffer<Packet>& pb, Func &&packet_to_tx_buf_p) {
     if (_tx_burst.size() == 0) {
       for (auto&& p : pb) {
-        // TODO: assert() in a fast path! Remove me ASAP!
-        assert(p.len());
+        // TODO: ceph_assert() in a fast path! Remove me ASAP!
+        ceph_assert(p.len());
 
         tx_buf* buf = packet_to_tx_buf_p(std::move(p));
         if (!buf) {
@@ -856,11 +856,11 @@ class DPDKDevice {
     return _redir_table[hash & (_redir_table.size() - 1)];
   }
   void set_local_queue(unsigned i, std::unique_ptr<DPDKQueuePair> qp) {
-    assert(!_queues[i]);
+    ceph_assert(!_queues[i]);
     _queues[i] = std::move(qp);
   }
   void unset_local_queue(unsigned i) {
-    assert(_queues[i]);
+    ceph_assert(_queues[i]);
     _queues[i].reset();
   }
   template <typename Func>
@@ -869,7 +869,7 @@ class DPDKDevice {
     if (!qp._sw_reta)
       return src_cpuid;
 
-    assert(!qp._sw_reta);
+    ceph_assert(!qp._sw_reta);
     auto hash = hashfn() >> _rss_table_bits;
     auto& reta = *qp._sw_reta;
     return reta[hash % reta.size()];
index bb2f807f4bb5b73250037e9db06b1273c7b14e60..fabfb024129fc0a2c479f181f823795a4ba84f8f 100644 (file)
@@ -94,7 +94,7 @@ void DPDKWorker::initialize()
     while (create_stage <= WAIT_DEVICE_STAGE)
       cond.Wait(lock);
   }
-  assert(sdev);
+  ceph_assert(sdev);
   if (i < sdev->hw_queues_count()) {
     auto qp = sdev->init_local_queue(cct, &center, cct->_conf->ms_dpdk_hugepages, i);
     std::map<unsigned, float> cpu_weights;
@@ -200,8 +200,8 @@ DPDKWorker::Impl::Impl(CephContext *cct, unsigned i, EventCenter *c, std::shared
 int DPDKWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
                        ServerSocket *sock)
 {
-  assert(sa.get_family() == AF_INET);
-  assert(sock);
+  ceph_assert(sa.get_family() == AF_INET);
+  ceph_assert(sock);
 
   ldout(cct, 10) << __func__ << " addr " << sa << dendl;
   // vector<AvailableIPAddress> tuples;
@@ -231,7 +231,7 @@ int DPDKWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
 
 int DPDKWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
 {
-  // assert(addr.get_family() == AF_INET);
+  // ceph_assert(addr.get_family() == AF_INET);
   int r =  tcpv4_connect(_impl->_inet.get_tcp(), addr, socket);
   ldout(cct, 10) << __func__ << " addr " << addr << dendl;
   return r;
@@ -250,7 +250,7 @@ void DPDKStack::spawn_worker(unsigned i, std::function<void ()> &&func)
   }
   // if dpdk::eal::init already called by NVMEDevice, we will select 1..n
   // cores
-  assert(rte_lcore_count() >= i + 1);
+  ceph_assert(rte_lcore_count() >= i + 1);
   unsigned core_id;
   RTE_LCORE_FOREACH_SLAVE(core_id) {
     if (i-- == 0) {
index 843975d0566700a48a38d365385df0e9256dd3e7..3ccf2a22e591b3ae4268e923b6f584efadcd217e 100644 (file)
@@ -123,7 +123,7 @@ class NativeConnectedSocketImpl : public ConnectedSocketImpl {
     } else {
       _cur_off += f.size;
     }
-    assert(data.length());
+    ceph_assert(data.length());
     return data.length();
   }
   virtual ssize_t send(bufferlist &bl, bool more) override {
index b22492db86574b36e6afbfbdc475d5384611c024..db9cd2a7649d638fb4ea7a0abda2d95f1a8a8d09 100644 (file)
@@ -139,7 +139,7 @@ class Packet {
       return copy(old.get(), std::max<size_t>(old->_nr_frags + extra_frags, 2 * old->_nr_frags));
     }
     void* operator new(size_t size, size_t nr_frags = default_nr_frags) {
-      assert(nr_frags == uint16_t(nr_frags));
+      ceph_assert(nr_frags == uint16_t(nr_frags));
       return ::operator new(size + nr_frags * sizeof(fragment));
     }
     // Matching the operator new above
@@ -295,7 +295,7 @@ inline Packet::impl::impl(size_t nr_frags)
 
 inline Packet::impl::impl(fragment frag, size_t nr_frags)
     : _len(frag.size), _allocated_frags(nr_frags) {
-    assert(_allocated_frags > _nr_frags);
+    ceph_assert(_allocated_frags > _nr_frags);
   if (frag.size <= internal_data_size) {
     headroom -= frag.size;
     frags[0] = { data + headroom, frag.size };
@@ -458,7 +458,7 @@ inline Header* Packet::get_header(size_t offset) {
 }
 
 inline void Packet::trim_front(size_t how_much) {
-  assert(how_much <= _impl->_len);
+  ceph_assert(how_much <= _impl->_len);
   _impl->_len -= how_much;
   size_t i = 0;
   while (how_much && how_much >= _impl->frags[i].size) {
@@ -479,7 +479,7 @@ inline void Packet::trim_front(size_t how_much) {
 }
 
 inline void Packet::trim_back(size_t how_much) {
-  assert(how_much <= _impl->_len);
+  ceph_assert(how_much <= _impl->_len);
   _impl->_len -= how_much;
   size_t i = _impl->_nr_frags - 1;
   while (how_much && how_much >= _impl->frags[i].size) {
@@ -542,7 +542,7 @@ inline Packet Packet::share(size_t offset, size_t len) {
     offset = 0;
   }
   n._impl->_offload_info = _impl->_offload_info;
-  assert(!n._impl->_deleter);
+  ceph_assert(!n._impl->_deleter);
   n._impl->_deleter = _impl->_deleter.share();
   return n;
 }
index d215a4c105b5a1c99be0e8dca1a1ba0512f7241a..dec159b6cc1fe77d87dd29abea6547ef518107d4 100644 (file)
@@ -112,7 +112,7 @@ uint8_t tcp_option::fill(tcp_hdr* th, uint8_t options_size)
     new (off) tcp_option::eol;
     size += option_len::eol;
   }
-  assert(size == options_size);
+  ceph_assert(size == options_size);
 
   return size;
 }
index c1d64bc52572256b2e99a1bd396b75823a834fa1..9445dbce536de441541b7bd2321ee323931c627d 100644 (file)
@@ -1235,7 +1235,7 @@ Tub<Packet> tcp<InetTraits>::tcb::read() {
 template <typename InetTraits>
 int tcp<InetTraits>::tcb::send(Packet p) {
   // We can not send after the connection is closed
-  assert(!_snd.closed);
+  ceph_assert(!_snd.closed);
 
   if (in_state(CLOSED))
     return -ECONNRESET;
@@ -1460,7 +1460,7 @@ Tub<typename InetTraits::l4packet> tcp<InetTraits>::tcb::get_packet() {
     return p;
   }
 
-  assert(!_packetq.empty());
+  ceph_assert(!_packetq.empty());
 
   p = std::move(_packetq.front());
   _packetq.pop_front();
index 25b082d613bb73a4446fae08bf0fb90573877b46..fce6588757cf47f19e75a10a98f6d645fdf41eb9 100644 (file)
@@ -34,7 +34,7 @@ int UserspaceEventManager::get_eventfd()
   }
 
   Tub<UserspaceFDImpl> &impl = fds[fd];
-  assert(!impl);
+  ceph_assert(!impl);
   impl.construct();
   ldout(cct, 20) << __func__ << " fd=" << fd << dendl;
   return fd;
@@ -88,7 +88,7 @@ void UserspaceEventManager::close(int fd)
 
   if (impl->activating_mask) {
     if (waiting_fds[max_wait_idx] == fd) {
-      assert(impl->waiting_idx == max_wait_idx);
+      ceph_assert(impl->waiting_idx == max_wait_idx);
       --max_wait_idx;
     }
     waiting_fds[impl->waiting_idx] = -1;
@@ -101,7 +101,7 @@ int UserspaceEventManager::poll(int *events, int *masks, int num_events, struct
   int fd;
   uint32_t i = 0;
   int count = 0;
-  assert(num_events);
+  ceph_assert(num_events);
   // leave zero slot for waiting_fds
   while (i < max_wait_idx) {
     fd = waiting_fds[++i];
@@ -110,9 +110,9 @@ int UserspaceEventManager::poll(int *events, int *masks, int num_events, struct
 
     events[count] = fd;
     Tub<UserspaceFDImpl> &impl = fds[fd];
-    assert(impl);
+    ceph_assert(impl);
     masks[count] = impl->listening_mask & impl->activating_mask;
-    assert(masks[count]);
+    ceph_assert(masks[count]);
     ldout(cct, 20) << __func__ << " fd=" << fd << " mask=" << masks[count] << dendl;
     impl->activating_mask &= (~masks[count]);
     impl->waiting_idx = 0;
index 75f3abf78abae484d3e9397603bda5489c14f9e5..01ef0dc6643e4eaab26729d990be1fe2eff80875 100644 (file)
@@ -81,7 +81,7 @@ class UserspaceEventManager {
     impl->listening_mask &= (~mask);
     if (!(impl->activating_mask & impl->listening_mask) && impl->waiting_idx) {
       if (waiting_fds[max_wait_idx] == fd) {
-        assert(impl->waiting_idx == max_wait_idx);
+        ceph_assert(impl->waiting_idx == max_wait_idx);
         --max_wait_idx;
       }
       waiting_fds[impl->waiting_idx] = -1;
index 5f3dec9ef108b9e4bbf8bf34b24491fb40521804..858df89a47371478b1b76f2ce47dbbce3d5db282 100644 (file)
@@ -37,7 +37,7 @@ struct ethernet_address {
   }
 
   ethernet_address(std::initializer_list<uint8_t> eaddr) {
-    assert(eaddr.size() == mac.size());
+    ceph_assert(eaddr.size() == mac.size());
     std::copy(eaddr.begin(), eaddr.end(), mac.begin());
   }
 
index 8b5c970e0687e36b8d7967de98469508a765b5a3..29e341195d2cb59523aa979710b5278a690b47aa 100644 (file)
@@ -81,7 +81,7 @@ subscription<Packet, ethernet_address> interface::register_l3(
     std::function<bool (forward_hash&, Packet& p, size_t)> forward)
 {
   auto i = _proto_map.emplace(std::piecewise_construct, std::make_tuple(uint16_t(proto_num)), std::forward_as_tuple(std::move(forward)));
-  assert(i.second);
+  ceph_assert(i.second);
   l3_rx_stream& l3_rx = i.first->second;
   return l3_rx.packet_stream.listen(std::move(next));
 }
index 2104e5d31e4ff767b73bf66ea5879d9cd5a2f721..63f0422b72cfc922fa180413338f7a2816556aaa 100644 (file)
@@ -55,7 +55,7 @@ class forward_hash {
     return end_idx;
   }
   void push_back(uint8_t b) {
-    assert(end_idx < sizeof(data));
+    ceph_assert(end_idx < sizeof(data));
     data[end_idx++] = b;
   }
   void push_back(uint16_t b) {
index a120bd17fc80a82ffc4e71a6d1d391d6a4ec085f..1898e8f862803a704c35b62fb5932622fcc62853 100644 (file)
@@ -117,7 +117,7 @@ class subscription {
   next_fn _next;
  private:
   explicit subscription(stream<T...>* s): _stream(s) {
-    assert(!_stream->_sub);
+    ceph_assert(!_stream->_sub);
     _stream->_sub = this;
   }
 
index 3458bd2b53815bcbfe0beb503ce20df4d8bac5f4..e64f50a4862eab4d27c9ffe95c46f0853166be91 100644 (file)
@@ -148,7 +148,7 @@ void Device::binding_port(CephContext *cct, int port_num) {
   }
   if (nullptr == active_port) {
     lderr(cct) << __func__ << "  port not found" << dendl;
-    assert(active_port);
+    ceph_assert(active_port);
   }
 }
 
@@ -213,7 +213,7 @@ int Infiniband::QueuePair::init()
       return -1;
     }
   } else {
-    assert(cm_id->verbs == pd->context);
+    ceph_assert(cm_id->verbs == pd->context);
     if (rdma_create_qp(cm_id, pd, &qpia)) {
       lderr(cct) << __func__ << " failed to create queue pair with rdmacm library"
                  << cpp_strerror(errno) << dendl;
@@ -382,7 +382,7 @@ Infiniband::CompletionChannel::~CompletionChannel()
     int r = ibv_destroy_comp_channel(channel);
     if (r < 0)
       lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
-    assert(r == 0);
+    ceph_assert(r == 0);
   }
 }
 
@@ -439,7 +439,7 @@ Infiniband::CompletionQueue::~CompletionQueue()
     int r = ibv_destroy_cq(cq);
     if (r < 0)
       lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
-    assert(r == 0);
+    ceph_assert(r == 0);
   }
 }
 
@@ -587,7 +587,7 @@ Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
 Infiniband::MemoryManager::Cluster::~Cluster()
 {
   int r = ibv_dereg_mr(chunk_base->mr);
-  assert(r == 0);
+  ceph_assert(r == 0);
   const auto chunk_end = chunk_base + num_chunk;
   for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
     chunk->~Chunk();
@@ -599,18 +599,18 @@ Infiniband::MemoryManager::Cluster::~Cluster()
 
 int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
 {
-  assert(!base);
+  ceph_assert(!base);
   num_chunk = num;
   uint32_t bytes = buffer_size * num;
 
   base = (char*)manager.malloc(bytes);
   end = base + bytes;
-  assert(base);
+  ceph_assert(base);
   chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
   memset(static_cast<void*>(chunk_base), 0, sizeof(Chunk) * num);
   free_chunks.reserve(num);
   ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
-  assert(m);
+  ceph_assert(m);
   Chunk* chunk = chunk_base;
   for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
     new(chunk) Chunk(m, buffer_size, base+offset);
@@ -717,7 +717,7 @@ char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes)
   MemoryManager *manager;
   CephContext *cct;
 
-  assert(g_ctx);
+  ceph_assert(g_ctx);
   manager     = g_ctx->manager;
   cct         = manager->cct;
   rx_buf_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
@@ -812,7 +812,7 @@ void Infiniband::MemoryManager::huge_pages_free(void *ptr)
   if (ptr == NULL) return;
   void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
   size_t real_size = *((size_t *)real_ptr);
-  assert(real_size % HUGE_PAGE_SIZE == 0);
+  ceph_assert(real_size % HUGE_PAGE_SIZE == 0);
   if (real_size != 0)
     munmap(real_ptr, real_size);
   else
@@ -838,8 +838,8 @@ void Infiniband::MemoryManager::free(void *ptr)
 
 void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
 {
-  assert(device);
-  assert(pd);
+  ceph_assert(device);
+  ceph_assert(pd);
 
   send = new Cluster(*this, size);
   send->fill(tx_num);
@@ -907,11 +907,11 @@ void Infiniband::init()
   initialized = true;
 
   device = device_list->get_device(device_name.c_str());
-  assert(device);
+  ceph_assert(device);
   device->binding_port(cct, port_num);
   ib_physical_port = device->active_port->get_port_num();
   pd = new ProtectionDomain(cct, device);
-  assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
+  ceph_assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
 
   support_srq = cct->_conf->ms_async_rdma_support_srq;
   if (support_srq)
@@ -1050,11 +1050,11 @@ int Infiniband::post_chunks_to_rq(int num, ibv_qp *qp)
   ibv_recv_wr *badworkrequest;
   if (support_srq) {
     ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
-    assert(ret == 0);
+    ceph_assert(ret == 0);
   } else {
-    assert(qp);
+    ceph_assert(qp);
     ret = ibv_post_recv(qp, &rx_work_request[0], &badworkrequest);
-    assert(ret == 0);
+    ceph_assert(ret == 0);
   }
   return i;
 }
@@ -1172,7 +1172,7 @@ Infiniband::QueuePair::~QueuePair()
 {
   if (qp) {
     ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
-    assert(!ibv_destroy_qp(qp));
+    ceph_assert(!ibv_destroy_qp(qp));
   }
 }
 
index 8b4684bcf4ef744b33a9684d37880afeb8254eb4..319130ff8617e6135b4b30ad72580be147840a2a 100644 (file)
@@ -81,7 +81,7 @@ class Device {
   ~Device() {
     if (active_port) {
       delete active_port;
-      assert(ibv_close_device(ctxt) == 0);
+      ceph_assert(ibv_close_device(ctxt) == 0);
     }
   }
   const char* get_name() { return name;}
@@ -122,7 +122,7 @@ class DeviceList {
   }
 
   Device* get_device(const char* device_name) {
-    assert(devices);
+    ceph_assert(devices);
     for (int i = 0; i < num; ++i) {
       if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {
         return devices[i];
index 10ff2416c894e58a28aa96640af74562931a0f61..ebf328706c4005bcb55dcbd6d747d0b112defc30 100644 (file)
@@ -222,7 +222,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
                    <<  ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
     if (!connected) {
       r = activate();
-      assert(!r);
+      ceph_assert(!r);
     }
     notify();
     r = infiniband->send_msg(cct, tcp_fd, my_msg);
@@ -238,7 +238,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
         return ;
       }
       r = activate();
-      assert(!r);
+      ceph_assert(!r);
       r = infiniband->send_msg(cct, tcp_fd, my_msg);
       if (r < 0) {
         ldout(cct, 1) << __func__ << " server ack failed." << dendl;
@@ -294,7 +294,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
   ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
   for (size_t i = 0; i < cqe.size(); ++i) {
     ibv_wc* response = &cqe[i];
-    assert(response->status == IBV_WC_SUCCESS);
+    ceph_assert(response->status == IBV_WC_SUCCESS);
     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
     ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
     chunk->prepare_read(response->byte_len);
@@ -450,7 +450,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
   auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers, unsigned bytes,
                                  std::list<bufferptr>::const_iterator &start,
                                  std::list<bufferptr>::const_iterator &end) -> unsigned {
-    assert(start != end);
+    ceph_assert(start != end);
     auto chunk_idx = tx_buffers.size();
     int ret = worker->get_reged_mem(this, tx_buffers, bytes);
     if (ret == 0) {
@@ -477,7 +477,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
       }
       ++start;
     }
-    assert(bytes == 0);
+    ceph_assert(bytes == 0);
     return total_copied;
   };
 
@@ -495,7 +495,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
           goto sending;
         need_reserve_bytes = 0;
       }
-      assert(copy_it == it);
+      ceph_assert(copy_it == it);
       tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
       total += it->length();
       ++copy_it;
@@ -510,7 +510,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
  sending:
   if (total == 0)
     return -EAGAIN;
-  assert(total <= pending_bl.length());
+  ceph_assert(total <= pending_bl.length());
   bufferlist swapped;
   if (total < pending_bl.length()) {
     worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
@@ -622,7 +622,7 @@ void RDMAConnectedSocketImpl::notify()
   // write argument must be a 64bit integer
   uint64_t i = 1;
 
-  assert(sizeof(i) == write(notify_fd, &i, sizeof(i)));
+  ceph_assert(sizeof(i) == write(notify_fd, &i, sizeof(i)));
 }
 
 void RDMAConnectedSocketImpl::shutdown()
index d687ca8c769b845733a44281981624e379eb75e1..d5762d6eb5ccd6bdb27cebee948ced64fbf34024 100644 (file)
@@ -142,7 +142,7 @@ void RDMAIWARPConnectedSocketImpl::handle_cm_connection() {
       break;
 
     default:
-      assert(0 == "unhandled event");
+      ceph_assert(0 == "unhandled event");
       break;
   }
   rdma_ack_cm_event(event);
index cf7c5144ea7d08314787ec26879e584ea56d9f0d..f0f82f53f93b408a3fa386752bed33388348a476 100644 (file)
@@ -49,13 +49,13 @@ int RDMAIWARPServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions
 {
   ldout(cct, 15) << __func__ << dendl;
 
-  assert(sock);
+  ceph_assert(sock);
   struct pollfd pfd = {
     .fd = cm_channel->fd,
     .events = POLLIN,
   };
   int ret = poll(&pfd, 1, 0);
-  assert(ret >= 0);
+  ceph_assert(ret >= 0);
   if (!ret)
     return -EAGAIN;
 
index d87fdee5823938b811f375de7a9a146537948273..b3bf46b92f31290a9e4bf10ebd327b72bb9e9b94 100644 (file)
@@ -80,7 +80,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt
 {
   ldout(cct, 15) << __func__ << dendl;
 
-  assert(sock);
+  ceph_assert(sock);
 
   sockaddr_storage ss;
   socklen_t slen = sizeof(ss);
@@ -102,7 +102,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt
     return -errno;
   }
 
-  assert(NULL != out); //out should not be NULL in accept connection
+  ceph_assert(NULL != out); //out should not be NULL in accept connection
 
   out->set_type(addr_type);
   out->set_sockaddr((sockaddr*)&ss);
index 7b5400a13d6eddcdbfdf4e21625c99ac16630fcb..9c05f23be9d4698592df824a9d362e9224430ed2 100644 (file)
@@ -34,10 +34,10 @@ RDMADispatcher::~RDMADispatcher()
   ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
   polling_stop();
 
-  assert(qp_conns.empty());
-  assert(num_qp_conn == 0);
-  assert(dead_queue_pairs.empty());
-  assert(num_dead_queue_pair == 0);
+  ceph_assert(qp_conns.empty());
+  ceph_assert(num_qp_conn == 0);
+  ceph_assert(dead_queue_pairs.empty());
+  ceph_assert(num_dead_queue_pair == 0);
 
   delete async_handler;
 }
@@ -87,13 +87,13 @@ void RDMADispatcher::polling_start()
   get_stack()->get_infiniband().get_memory_manager()->set_rx_stat_logger(perf_logger);
 
   tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
-  assert(tx_cc);
+  ceph_assert(tx_cc);
   rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
-  assert(rx_cc);
+  ceph_assert(rx_cc);
   tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc);
-  assert(tx_cq);
+  ceph_assert(tx_cq);
   rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc);
-  assert(rx_cq);
+  ceph_assert(rx_cq);
 
   t = std::thread(&RDMADispatcher::polling, this);
   ceph_pthread_setname(t.native_handle(), "rdma-polling");
@@ -205,7 +205,7 @@ void RDMADispatcher::polling()
         Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
 
         if (response->status == IBV_WC_SUCCESS) {
-          assert(wc[i].opcode == IBV_WC_RECV);
+          ceph_assert(wc[i].opcode == IBV_WC_RECV);
           conn = get_conn_lockless(response->qp_num);
           if (!conn) {
             ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
@@ -322,7 +322,7 @@ void RDMADispatcher::notify_pending_workers() {
 void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
 {
   Mutex::Locker l(lock);
-  assert(!qp_conns.count(qp->get_local_qp_number()));
+  ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
   qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
   ++num_qp_conn;
 }
@@ -528,9 +528,9 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
 
 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
-  assert(center.in_thread());
+  ceph_assert(center.in_thread());
   int r = get_stack()->get_infiniband().get_tx_buffers(c, bytes);
-  assert(r >= 0);
+  ceph_assert(r >= 0);
   size_t got = get_stack()->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r;
   ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered  bytes, inflight " << dispatcher->inflight << dendl;
   stack->get_dispatcher().inflight += r;
@@ -599,6 +599,6 @@ void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
 
 void RDMAStack::join_worker(unsigned i)
 {
-  assert(threads.size() > i && threads[i].joinable());
+  ceph_assert(threads.size() > i && threads[i].joinable());
   threads[i].join();
 }
index e38284d4552a728e70fe9887b648c36d29c605cc..f363d2fefed041efaafa351262c025969f06e876 100644 (file)
@@ -153,7 +153,7 @@ class RDMAWorker : public Worker {
   RDMAStack *get_stack() { return stack; }
   int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
   void remove_pending_conn(RDMAConnectedSocketImpl *o) {
-    assert(center.in_thread());
+    ceph_assert(center.in_thread());
     pending_sent_conns.remove(o);
   }
   void handle_pending_message();
index eadcffbb6cfa553b46b6ee51bac85835cfea2412..72cbc421dc4b3f6a1934e2dbbf8bdb1712015284 100644 (file)
@@ -237,7 +237,7 @@ int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
       !bind_addr.is_blank_ip())
     msgr->learned_addr(bind_addr);
   else
-    assert(msgr->get_need_addr());  // should still be true.
+    ceph_assert(msgr->get_need_addr());  // should still be true.
 
   if (msgr->get_myaddr().get_port() == 0) {
     msgr->set_myaddrs(entity_addrvec_t(listen_addr));
index be128bdaadf66b0ba34b749b4b4a34cfe4fef10d..d8f595eade51c385de444e02f58e9f4bc1c3596b 100644 (file)
@@ -172,8 +172,8 @@ Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con)
 
 Pipe::~Pipe()
 {
-  assert(out_q.empty());
-  assert(sent.empty());
+  ceph_assert(out_q.empty());
+  ceph_assert(sent.empty());
   delete delay_thread;
   delete[] recv_buf;
 }
@@ -194,8 +194,8 @@ void Pipe::handle_ack(uint64_t seq)
 
 void Pipe::start_reader()
 {
-  assert(pipe_lock.is_locked());
-  assert(!reader_running);
+  ceph_assert(pipe_lock.is_locked());
+  ceph_assert(!reader_running);
   if (reader_needs_join) {
     reader_thread.join();
     reader_needs_join = false;
@@ -218,8 +218,8 @@ void Pipe::maybe_start_delay_thread()
 
 void Pipe::start_writer()
 {
-  assert(pipe_lock.is_locked());
-  assert(!writer_running);
+  ceph_assert(pipe_lock.is_locked());
+  ceph_assert(!writer_running);
   writer_running = true;
   writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
 }
@@ -316,8 +316,8 @@ void Pipe::DelayedDelivery::stop_fast_dispatching() {
 int Pipe::accept()
 {
   ldout(msgr->cct,10) << "accept" << dendl;
-  assert(pipe_lock.is_locked());
-  assert(state == STATE_ACCEPTING);
+  ceph_assert(pipe_lock.is_locked());
+  ceph_assert(state == STATE_ACCEPTING);
 
   pipe_lock.Unlock();
 
@@ -534,7 +534,7 @@ int Pipe::accept()
        ldout(msgr->cct,0) << "accept: challenging authorizer "
                           << authorizer_reply.length()
                           << " bytes" << dendl;
-       assert(authorizer_reply.length());
+       ceph_assert(authorizer_reply.length());
        reply.tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER;
       } else {
        ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
@@ -651,21 +651,21 @@ int Pipe::accept()
                             << " " << existing << ".cseq=" << existing->connect_seq
                             << " == " << connect.connect_seq
                             << dendl;
-         assert(existing->state == STATE_CONNECTING ||
+         ceph_assert(existing->state == STATE_CONNECTING ||
                 existing->state == STATE_WAIT);
          goto replace;
        } else {
          // our existing outgoing wins
          ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
                   << " == " << connect.connect_seq << ", sending WAIT" << dendl;
-         assert(peer_addr > msgr->my_addr);
+         ceph_assert(peer_addr > msgr->my_addr);
          if (!(existing->state == STATE_CONNECTING))
            lderr(msgr->cct) << "accept race bad state, would send wait, existing="
                             << existing->get_state_name()
                             << " " << existing << ".cseq=" << existing->connect_seq
                             << " == " << connect.connect_seq
                             << dendl;
-         assert(existing->state == STATE_CONNECTING);
+         ceph_assert(existing->state == STATE_CONNECTING);
          // make sure our outgoing connection will follow through
          existing->_send_keepalive();
          reply.tag = CEPH_MSGR_TAG_WAIT;
@@ -675,8 +675,8 @@ int Pipe::accept()
        }
       }
 
-      assert(connect.connect_seq > existing->connect_seq);
-      assert(connect.global_seq >= existing->peer_global_seq);
+      ceph_assert(connect.connect_seq > existing->connect_seq);
+      ceph_assert(connect.global_seq >= existing->peer_global_seq);
       if (policy.resetcheck &&   // RESETSESSION only used by servers; peers do not reset each other
          existing->connect_seq == 0) {
        ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq 
@@ -708,8 +708,8 @@ int Pipe::accept()
     ceph_abort();
 
   retry_session:
-    assert(existing->pipe_lock.is_locked());
-    assert(pipe_lock.is_locked());
+    ceph_assert(existing->pipe_lock.is_locked());
+    ceph_assert(pipe_lock.is_locked());
     reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
     reply.connect_seq = existing->connect_seq + 1;
     existing->pipe_lock.Unlock();
@@ -717,7 +717,7 @@ int Pipe::accept()
     goto reply;    
 
   reply:
-    assert(pipe_lock.is_locked());
+    ceph_assert(pipe_lock.is_locked());
     reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
     reply.authorizer_len = authorizer_reply.length();
     pipe_lock.Unlock();
@@ -732,8 +732,8 @@ int Pipe::accept()
   }
   
  replace:
-  assert(existing->pipe_lock.is_locked());
-  assert(pipe_lock.is_locked());
+  ceph_assert(existing->pipe_lock.is_locked());
+  ceph_assert(pipe_lock.is_locked());
   // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
   if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
     reply_tag = CEPH_MSGR_TAG_SEQ;
@@ -746,7 +746,7 @@ int Pipe::accept()
 
   if (existing->policy.lossy) {
     // disconnect from the Connection
-    assert(existing->connection_state);
+    ceph_assert(existing->connection_state);
     if (existing->connection_state->clear_pipe(existing))
       msgr->dispatch_queue.queue_reset(existing->connection_state.get());
   } else {
@@ -792,10 +792,10 @@ int Pipe::accept()
 
  open:
   // open
-  assert(pipe_lock.is_locked());
+  ceph_assert(pipe_lock.is_locked());
   connect_seq = connect.connect_seq + 1;
   peer_global_seq = connect.global_seq;
-  assert(state == STATE_ACCEPTING);
+  ceph_assert(state == STATE_ACCEPTING);
   state = STATE_OPEN;
   ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
 
@@ -826,7 +826,7 @@ int Pipe::accept()
   if (msgr->dispatch_queue.stop)
     goto shutting_down;
   removed = msgr->accepting_pipes.erase(this);
-  assert(removed == 1);
+  ceph_assert(removed == 1);
   register_pipe();
   msgr->lock.Unlock();
   pipe_lock.Unlock();
@@ -898,7 +898,7 @@ int Pipe::accept()
  shutting_down:
   msgr->lock.Unlock();
  shutting_down_msgr_unlocked:
-  assert(pipe_lock.is_locked());
+  ceph_assert(pipe_lock.is_locked());
 
   if (msgr->cct->_conf->ms_inject_internal_delays) {
     ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
@@ -995,7 +995,7 @@ int Pipe::connect()
   bool got_bad_auth = false;
 
   ldout(msgr->cct,10) << "connect " << connect_seq << dendl;
-  assert(pipe_lock.is_locked());
+  ceph_assert(pipe_lock.is_locked());
 
   __u32 cseq = connect_seq;
   __u32 gseq = msgr->get_global_seq();
@@ -1281,7 +1281,7 @@ int Pipe::connect()
       continue;
     }
     if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
-      assert(reply.connect_seq > connect_seq);
+      ceph_assert(reply.connect_seq > connect_seq);
       ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq
               << " -> " << reply.connect_seq << dendl;
       cseq = connect_seq = reply.connect_seq;
@@ -1315,10 +1315,10 @@ int Pipe::connect()
                           << " vs out_seq " << out_seq << dendl;
        while (newly_acked_seq > out_seq) {
          Message *m = _get_next_outgoing();
-         assert(m);
+         ceph_assert(m);
          ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq()
                             << " " << *m << dendl;
-         assert(m->get_seq() <= newly_acked_seq);
+         ceph_assert(m->get_seq() <= newly_acked_seq);
          m->put();
          ++out_seq;
        }
@@ -1333,7 +1333,7 @@ int Pipe::connect()
       policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
       state = STATE_OPEN;
       connect_seq = cseq + 1;
-      assert(connect_seq == reply.connect_seq);
+      ceph_assert(connect_seq == reply.connect_seq);
       backoff = utime_t();
       connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
       ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy
@@ -1395,15 +1395,15 @@ int Pipe::connect()
 void Pipe::register_pipe()
 {
   ldout(msgr->cct,10) << "register_pipe" << dendl;
-  assert(msgr->lock.is_locked());
+  ceph_assert(msgr->lock.is_locked());
   Pipe *existing = msgr->_lookup_pipe(peer_addr);
-  assert(existing == NULL);
+  ceph_assert(existing == NULL);
   msgr->rank_pipe[peer_addr] = this;
 }
 
 void Pipe::unregister_pipe()
 {
-  assert(msgr->lock.is_locked());
+  ceph_assert(msgr->lock.is_locked());
   ceph::unordered_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr);
   if (p != msgr->rank_pipe.end() && p->second == this) {
     ldout(msgr->cct,10) << "unregister_pipe" << dendl;
@@ -1490,7 +1490,7 @@ void Pipe::discard_out_queue()
 void Pipe::fault(bool onread)
 {
   const auto& conf = msgr->cct->_conf;
-  assert(pipe_lock.is_locked());
+  ceph_assert(pipe_lock.is_locked());
   cond.Signal();
 
   if (onread && state == STATE_CONNECTING) {
@@ -1516,7 +1516,7 @@ void Pipe::fault(bool onread)
 
     // disconnect from Connection, and mark it failed.  future messages
     // will be dropped.
-    assert(connection_state);
+    ceph_assert(connection_state);
     stop();
     bool cleared = connection_state->clear_pipe(this);
 
@@ -1595,7 +1595,7 @@ void Pipe::randomize_out_seq()
 
 void Pipe::was_session_reset()
 {
-  assert(pipe_lock.is_locked());
+  ceph_assert(pipe_lock.is_locked());
 
   ldout(msgr->cct,10) << "was_session_reset" << dendl;
   in_q->discard_queue(conn_id);
@@ -1614,7 +1614,7 @@ void Pipe::was_session_reset()
 void Pipe::stop()
 {
   ldout(msgr->cct,10) << "stop" << dendl;
-  assert(pipe_lock.is_locked());
+  ceph_assert(pipe_lock.is_locked());
   state = STATE_CLOSED;
   state_closed = true;
   cond.Signal();
@@ -1623,7 +1623,7 @@ void Pipe::stop()
 
 void Pipe::stop_and_wait()
 {
-  assert(pipe_lock.is_locked_by_me());
+  ceph_assert(pipe_lock.is_locked_by_me());
   if (state != STATE_CLOSED)
     stop();
 
@@ -1655,13 +1655,13 @@ void Pipe::reader()
 
   if (state == STATE_ACCEPTING) {
     accept();
-    assert(pipe_lock.is_locked());
+    ceph_assert(pipe_lock.is_locked());
   }
 
   // loop.
   while (state != STATE_CLOSED &&
         state != STATE_CONNECTING) {
-    assert(pipe_lock.is_locked());
+    ceph_assert(pipe_lock.is_locked());
 
     // sleep if (re)connecting
     if (state == STATE_STANDBY) {
@@ -1773,14 +1773,14 @@ void Pipe::reader()
        m->put();
        if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
            msgr->cct->_conf->ms_die_on_old_message)
-         assert(0 == "old msgs despite reconnect_seq feature");
+         ceph_assert(0 == "old msgs despite reconnect_seq feature");
        continue;
       }
       if (m->get_seq() > in_seq + 1) {
        ldout(msgr->cct,0) << "reader missed message?  skipped from seq "
                           << in_seq << " to " << m->get_seq() << dendl;
        if (msgr->cct->_conf->ms_die_on_skipped_message)
-         assert(0 == "skipped incoming seq");
+         ceph_assert(0 == "skipped incoming seq");
       }
 
       m->set_connection(connection_state.get());
@@ -1864,7 +1864,7 @@ void Pipe::writer()
 
     // connect?
     if (state == STATE_CONNECTING) {
-      assert(!policy.server);
+      ceph_assert(!policy.server);
       connect();
       continue;
     }
@@ -2417,7 +2417,7 @@ int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& fo
       ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length()
                          << " b_off " << b_off << dendl;
     }
-    assert(donow > 0);
+    ceph_assert(donow > 0);
     ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off
             << " leftinchunk " << left
             << " buffer len " << pb->length()
@@ -2439,7 +2439,7 @@ int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& fo
     msglen += donow;
     msg.msg_iovlen++;
     
-    assert(left >= donow);
+    ceph_assert(left >= donow);
     left -= donow;
     b_off += donow;
     bl_pos += donow;
@@ -2450,7 +2450,7 @@ int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& fo
       b_off = 0;
     }
   }
-  assert(left == 0);
+  ceph_assert(left == 0);
 
   // send footer; if receiver doesn't support signatures, use the old footer format
 
@@ -2661,7 +2661,7 @@ int Pipe::tcp_write(const char *buf, unsigned len)
     return -1;
 
   //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
-  assert(len > 0);
+  ceph_assert(len > 0);
   while (len > 0) {
     MSGR_SIGPIPE_STOPPER;
     int did = ::send( sd, buf, len, MSG_NOSIGNAL );
index ce9925bd0d9b6a978cdc3e48f51594f4cd013ed4..81245198460ca71740746cb669ad428aa6d86276 100644 (file)
@@ -228,17 +228,17 @@ static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
     void stop_and_wait();
 
     void _send(Message *m) {
-      assert(pipe_lock.is_locked());
+      ceph_assert(pipe_lock.is_locked());
       out_q[m->get_priority()].push_back(m);
       cond.Signal();
     }
     void _send_keepalive() {
-      assert(pipe_lock.is_locked());
+      ceph_assert(pipe_lock.is_locked());
       send_keepalive = true;
       cond.Signal();
     }
     Message *_get_next_outgoing() {
-      assert(pipe_lock.is_locked());
+      ceph_assert(pipe_lock.is_locked());
       Message *m = 0;
       while (!m && !out_q.empty()) {
         map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
index 96e27a4fd301fb791c85f451a6dff2b49d25b1d5..faa1ea9e043eaf81781ce42494773358826a7f8b 100644 (file)
@@ -74,7 +74,7 @@ bool PipeConnection::is_connected()
 
 int PipeConnection::send_message(Message *m)
 {
-  assert(msgr);
+  ceph_assert(msgr);
   return static_cast<SimpleMessenger*>(msgr)->send_message(m, this);
 }
 
index 5bd397c4d300a5a9f1e28e44c027292624b6c6aa..26e05a8f60a0f60997d4b197f637cbf132652521 100644 (file)
@@ -63,9 +63,9 @@ SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
  */
 SimpleMessenger::~SimpleMessenger()
 {
-  assert(!did_bind); // either we didn't bind or we shut down the Accepter
-  assert(rank_pipe.empty()); // we don't have any running Pipes.
-  assert(!reaper_started); // the reaper thread is stopped
+  ceph_assert(!did_bind); // either we didn't bind or we shut down the Accepter
+  ceph_assert(rank_pipe.empty()); // we don't have any running Pipes.
+  ceph_assert(!reaper_started); // the reaper thread is stopped
 }
 
 void SimpleMessenger::ready()
@@ -151,7 +151,7 @@ bool SimpleMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
 {
   bool ret = false;
   auto addr = addrs.legacy_addr();
-  assert(my_addr == my_addrs->front());
+  ceph_assert(my_addr == my_addrs->front());
   if (my_addr.is_blank_ip()) {
     ldout(cct,1) << __func__ << " " << addr << dendl;
     entity_addr_t t = my_addr;
@@ -164,7 +164,7 @@ bool SimpleMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
   } else {
     ldout(cct,1) << __func__ << " " << addr << " no-op" << dendl;
   }
-  assert(my_addr == my_addrs->front());
+  ceph_assert(my_addr == my_addrs->front());
   return ret;
 }
 
@@ -243,7 +243,7 @@ void SimpleMessenger::reaper_entry()
 void SimpleMessenger::reaper()
 {
   ldout(cct,10) << "reaper" << dendl;
-  assert(lock.is_locked());
+  ceph_assert(lock.is_locked());
 
   while (!pipe_reap_queue.empty()) {
     Pipe *p = pipe_reap_queue.front();
@@ -257,11 +257,11 @@ void SimpleMessenger::reaper()
       // or accept() may have switch the Connection to a different
       // Pipe... but make sure!
       bool cleared = p->connection_state->clear_pipe(p);
-      assert(!cleared);
+      ceph_assert(!cleared);
     }
     p->pipe_lock.Unlock();
     p->unregister_pipe();
-    assert(pipes.count(p));
+    ceph_assert(pipes.count(p));
     pipes.erase(p);
 
     // drop msgr lock while joining thread; the delay through could be
@@ -295,7 +295,7 @@ bool SimpleMessenger::is_connected(Connection *con)
   if (con) {
     Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
     if (p) {
-      assert(p->msgr == this);
+      ceph_assert(p->msgr == this);
       r = p->is_connected();
       p->put();
     }
@@ -325,7 +325,7 @@ int SimpleMessenger::bind(const entity_addr_t &bind_addr)
 int SimpleMessenger::rebind(const set<int>& avoid_ports)
 {
   ldout(cct,1) << "rebind avoid " << avoid_ports << dendl;
-  assert(did_bind);
+  ceph_assert(did_bind);
   accepter.stop();
   mark_down_all();
   return accepter.rebind(avoid_ports);
@@ -338,7 +338,7 @@ int SimpleMessenger::client_bind(const entity_addr_t &bind_addr)
     return 0;
   Mutex::Locker l(lock);
   if (did_bind) {
-    assert(*my_addrs == entity_addrvec_t(bind_addr));
+    ceph_assert(*my_addrs == entity_addrvec_t(bind_addr));
     return 0;
   }
   if (started) {
@@ -358,9 +358,9 @@ int SimpleMessenger::start()
   ldout(cct,1) << "messenger.start" << dendl;
 
   // register at least one entity, first!
-  assert(my_name.type() >= 0);
+  ceph_assert(my_name.type() >= 0);
 
-  assert(!started);
+  ceph_assert(!started);
   started = true;
   stopped = false;
 
@@ -398,8 +398,8 @@ Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
                                    PipeConnection *con,
                                    Message *first)
 {
-  assert(lock.is_locked());
-  assert(addr != my_addr);
+  ceph_assert(lock.is_locked());
+  ceph_assert(addr != my_addr);
   
   ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
   
@@ -560,7 +560,7 @@ int SimpleMessenger::send_keepalive(Connection *con)
     static_cast<PipeConnection*>(con)->get_pipe());
   if (pipe) {
     ldout(cct,20) << "send_keepalive con " << con << ", have pipe." << dendl;
-    assert(pipe->msgr == this);
+    ceph_assert(pipe->msgr == this);
     pipe->pipe_lock.Lock();
     pipe->_send_keepalive();
     pipe->pipe_lock.Unlock();
@@ -709,7 +709,7 @@ void SimpleMessenger::mark_down(Connection *con)
   Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
   if (p) {
     ldout(cct,1) << "mark_down " << con << " -- " << p << dendl;
-    assert(p->msgr == this);
+    ceph_assert(p->msgr == this);
     p->unregister_pipe();
     p->pipe_lock.Lock();
     p->stop();
@@ -732,7 +732,7 @@ void SimpleMessenger::mark_disposable(Connection *con)
   Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
   if (p) {
     ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl;
-    assert(p->msgr == this);
+    ceph_assert(p->msgr == this);
     p->pipe_lock.Lock();
     p->policy.lossy = true;
     p->pipe_lock.Unlock();
index 3382e713121ee2a2c51eae4c4b83804b8622eb14..0292f6c7684eeada42660b433235d0d9277fc2ef 100644 (file)
@@ -111,7 +111,7 @@ public:
    * @{
    */
   void set_cluster_protocol(int p) override {
-    assert(!started && !did_bind);
+    ceph_assert(!started && !did_bind);
     cluster_protocol = p;
   }
 
index 25ac21544917a9434718bba1096719d6dfb84605..abe270a41dbc080e161d60070daec6310e80e601 100644 (file)
@@ -185,7 +185,7 @@ void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp)
 
   struct ceph_timespec ts;
   if (ack) {
-    assert(tp);
+    ceph_assert(tp);
     tp->encode_timeval(&ts);
     xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
     xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
@@ -199,9 +199,9 @@ void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp)
   }
 
   const std::list<buffer::ptr>& header = xcmd->get_bl_ref().buffers();
-  assert(header.size() == 1);  /* accelio header must be without scatter gather */
+  ceph_assert(header.size() == 1);  /* accelio header must be without scatter gather */
   list<bufferptr>::const_iterator pb = header.begin();
-  assert(pb->length() < XioMsgHdr::get_max_encoded_length());
+  ceph_assert(pb->length() < XioMsgHdr::get_max_encoded_length());
   struct xio_msg * msg = xcmd->get_xio_msg();
   msg->out.header.iov_base = (char*) pb->c_str();
   msg->out.header.iov_len = pb->length();
@@ -286,11 +286,11 @@ int XioConnection::handle_data_msg(struct xio_session *session,
       << " iov_len " << (int) tmsg->in.header.iov_len
       << " nents " << tmsg->in.pdata_iov.nents
       << " sn " << tmsg->sn << dendl;
-    assert(session == this->session);
+    ceph_assert(session == this->session);
     in_seq.set_count(msg_cnt.msg_cnt);
   } else {
     /* XXX major sequence error */
-    assert(! tmsg->in.header.iov_len);
+    ceph_assert(! tmsg->in.header.iov_len);
   }
 
   in_seq.append(msg);
@@ -540,7 +540,7 @@ int XioConnection::on_msg(struct xio_session *session,
 
   default:
     lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl;
-    assert(! "unsupported message tag");
+    ceph_assert(! "unsupported message tag");
   }
 
   xio_release_msg(msg);
index d62713b9beb679a5838643d3df3086d59ad98e49..722ac6dbd237c09aeda7ed358b875600ebfb910f 100644 (file)
@@ -511,7 +511,7 @@ int XioMessenger::session_event(struct xio_session *session,
 
     xcon->conn = conn;
     xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
-    assert(xcon->portal);
+    ceph_assert(xcon->portal);
 
     xcona.user_context = xcon;
     (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
@@ -769,7 +769,7 @@ static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
   if (!!e)
     return NULL;
   XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
-  assert(!!xmsg);
+  ceph_assert(!!xmsg);
   new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
   return xmsg;
 }
@@ -781,7 +781,7 @@ XioCommand* pool_alloc_xio_command(XioConnection *xcon)
   if (!!e)
     return NULL;
   XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
-  assert(!!xcmd);
+  ceph_assert(!!xcmd);
   new (xcmd) XioCommand(xcon, mp_mem);
   return xcmd;
 }
@@ -912,7 +912,7 @@ int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
   req = xmsg->get_xio_msg();
 
   const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
-  assert(header.size() == 1); /* XXX */
+  ceph_assert(header.size() == 1); /* XXX */
   list<bufferptr>::const_iterator pb = header.begin();
   req->out.header.iov_base = (char*) pb->c_str();
   req->out.header.iov_len = pb->length();
index 8c2d3d8ec0619f8b957ff6352f7b6cc07268bbdf..4b6a5d687bcab9f61bb3898b814ece2f6d650df8 100644 (file)
@@ -32,7 +32,7 @@ int XioDispatchHook::release_msgs()
   /* merge with portal traffic */
   xcon->portal->enqueue(xcon, xcmp);
 
-  assert(r);
+  ceph_assert(r);
   return r;
 }
 
@@ -41,7 +41,7 @@ int XioDispatchHook::release_msgs()
   ceph_msg_footer _ceph_msg_footer;
   XioMsgHdr hdr (_ceph_msg_header, _ceph_msg_footer, 0 /* features */);
   const std::list<buffer::ptr>& hdr_buffers = hdr.get_bl().buffers();
-  assert(hdr_buffers.size() == 1); /* accelio header is small without scatter gather */
+  ceph_assert(hdr_buffers.size() == 1); /* accelio header is small without scatter gather */
   return hdr_buffers.begin()->length();
 }
 
index 6084ce8568242f03dfd24899224f0efced35489e..07fa73114abaf9c837edbbcca05b40a3f9c72ce2 100644 (file)
@@ -183,7 +183,7 @@ static inline int xpool_alloc(struct xio_mempool *pool, uint64_t size,
   }
   // fall back to malloc on errors
   mp->addr = malloc(size);
-  assert(mp->addr);
+  ceph_assert(mp->addr);
   mp->length = 0;
   if (unlikely(XioPool::trace_mempool))
     xp_stats.inc_overflow();
index c22d891b3ed486b5eb6d040366f40b7ad22765b0..ad9d4df0b4d0648763a12f9cb42dfe65165904c2 100644 (file)
@@ -145,7 +145,7 @@ public:
 
     /* a portal is an xio_context and event loop */
     ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */);
-    assert(ctx && "Whoops, failed to create portal/ctx");
+    ceph_assert(ctx && "Whoops, failed to create portal/ctx");
   }
 
   int bind(struct xio_session_ops *ops, const string &base_uri,
@@ -356,7 +356,7 @@ public:
     for (int i = 0; i < n; i++) {
       if (!portals[i]) {
         portals[i] = new XioPortal(msgr, nconns);
-        assert(portals[i] != nullptr);
+        ceph_assert(portals[i] != nullptr);
       }
     }
   }