for (list<QueueItem>::iterator i = removed.begin();
i != removed.end();
++i) {
- assert(!(i->is_code())); // We don't discard id 0, ever!
+ ceph_assert(!(i->is_code())); // We don't discard id 0, ever!
const Message::ref& m = i->get_message();
remove_arrival(m);
dispatch_throttle_release(m->get_dispatch_throttle_size());
void DispatchQueue::start()
{
- assert(!stop);
- assert(!dispatch_thread.is_started());
+ ceph_assert(!stop);
+ ceph_assert(!dispatch_thread.is_started());
dispatch_thread.create("ms_dispatch");
local_delivery_thread.create("ms_local");
}
return type != -1;
}
int get_code () const {
- assert(is_code());
+ ceph_assert(is_code());
return type;
}
const Message::ref& get_message() {
- assert(!is_code());
+ ceph_assert(!is_code());
return m;
}
Connection *get_connection() {
- assert(is_code());
+ ceph_assert(is_code());
return con.get();
}
};
}
void remove_arrival(const Message::ref& m) {
auto it = marrival_map.find(m);
- assert(it != marrival_map.end());
+ ceph_assert(it != marrival_map.end());
marrival.erase(it->second);
marrival_map.erase(it);
}
stop(false)
{}
~DispatchQueue() {
- assert(mqueue.empty());
- assert(marrival.empty());
- assert(local_messages.empty());
+ ceph_assert(mqueue.empty());
+ ceph_assert(marrival.empty());
+ ceph_assert(local_messages.empty());
}
};
{
// encode and copy out of *m
if (empty_payload()) {
- assert(middle.length() == 0);
+ ceph_assert(middle.length() == 0);
encode_payload(features);
if (byte_throttler) {
* @param p The cluster protocol to use. Defined externally.
*/
void set_default_send_priority(int p) {
- assert(!started);
+ ceph_assert(!started);
default_send_priority = p;
}
/**
} else {
blocked = true;
int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
- assert(r == 0);
+ ceph_assert(r == 0);
}
}
~sigpipe_stopper() {
if (blocked) {
struct timespec nowait{0};
int r = sigtimedwait(&pipe_mask, 0, &nowait);
- assert(r == EAGAIN || r == 0);
+ ceph_assert(r == EAGAIN || r == 0);
r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
- assert(r == 0);
+ ceph_assert(r == 0);
}
}
};
/**
* Deliver a single Message. Send it to each Dispatcher
* in sequence until one of them handles it.
- * If none of our Dispatchers can handle it, assert(0).
+ * If none of our Dispatchers can handle it, ceph_assert(0).
*
* @param m The Message to deliver.
*/
}
lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
<< m->get_source_inst() << dendl;
- assert(!cct->_conf->ms_die_on_unhandled_msg);
+ ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
}
void ms_deliver_dispatch(Message *m) {
return ms_deliver_dispatch(Message::ref(m, false)); /* consume ref */
void QueueStrategy::wait()
{
lock.Lock();
- assert(stop);
+ ceph_assert(stop);
for (auto& thread : threads) {
lock.Unlock();
void QueueStrategy::start()
{
- assert(!stop);
+ ceph_assert(!stop);
lock.Lock();
threads.reserve(n_threads);
for (int ix = 0; ix < n_threads; ++ix) {
AsyncConnection::~AsyncConnection()
{
- assert(out_q.empty());
- assert(sent.empty());
+ ceph_assert(out_q.empty());
+ ceph_assert(sent.empty());
delete authorizer;
if (recv_buf)
delete[] recv_buf;
if (state_buffer)
delete[] state_buffer;
- assert(!delay_state);
+ ceph_assert(!delay_state);
}
void AsyncConnection::maybe_start_delay_thread()
}
}
- assert(center->in_thread());
+ ceph_assert(center->in_thread());
ssize_t r = cs.send(outcoming_bl, more);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
<< ", discarding" << dendl;
message->put();
if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message)
- assert(0 == "old msgs despite reconnect_seq feature");
+ ceph_assert(0 == "old msgs despite reconnect_seq feature");
break;
}
if (message->get_seq() > cur_seq + 1) {
ldout(async_msgr->cct, 0) << __func__ << " missed message? skipped from seq "
<< cur_seq << " to " << message->get_seq() << dendl;
if (async_msgr->cct->_conf->ms_die_on_skipped_message)
- assert(0 == "skipped incoming seq");
+ ceph_assert(0 == "skipped incoming seq");
}
message->set_connection(this);
{
std::lock_guard<std::mutex> l(write_lock);
if (!outcoming_bl.length()) {
- assert(state_after_send);
+ ceph_assert(state_after_send);
state = state_after_send;
state_after_send = STATE_NONE;
}
case STATE_CONNECTING:
{
- assert(!policy.server);
+ ceph_assert(!policy.server);
// reset connect state variables
got_bad_auth = false;
bufferlist authorizer_reply;
if (connect_reply.authorizer_len) {
ldout(async_msgr->cct, 10) << __func__ << " reply.authorizer_len=" << connect_reply.authorizer_len << dendl;
- assert(connect_reply.authorizer_len < 4096);
+ ceph_assert(connect_reply.authorizer_len < 4096);
r = read_until(connect_reply.authorizer_len, state_buffer);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read connect reply authorizer failed" << dendl;
goto fail;
// state must be changed!
- assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH);
+ ceph_assert(state != STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH);
break;
}
discard_requeued_up_to(newly_acked_seq);
//while (newly_acked_seq > out_seq.read()) {
// Message *m = _get_next_outgoing(NULL);
- // assert(m);
+ // ceph_assert(m);
// ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq()
// << " " << *m << dendl;
- // assert(m->get_seq() <= newly_acked_seq);
+ // ceph_assert(m->get_seq() <= newly_acked_seq);
// m->put();
// out_seq.inc();
//}
state = STATE_OPEN;
once_ready = true;
connect_seq += 1;
- assert(connect_seq == connect_reply.connect_seq);
+ ceph_assert(connect_seq == connect_reply.connect_seq);
backoff = utime_t();
set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features);
ldout(async_msgr->cct, 10) << __func__ << " connect success " << connect_seq
}
if (delay_state)
- assert(delay_state->ready());
+ ceph_assert(delay_state->ready());
dispatch_queue->queue_connect(this);
async_msgr->ms_deliver_handle_fast_connect(this);
goto fail;
// state is changed by "handle_connect_msg"
- assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH);
+ ceph_assert(state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH);
break;
}
memset(&connect_msg, 0, sizeof(connect_msg));
if (delay_state)
- assert(delay_state->ready());
+ ceph_assert(delay_state->ready());
// make sure no pending tick timer
if (last_tick_id)
center->delete_time_event(last_tick_id);
state = STATE_CONNECTING_SEND_CONNECT_MSG;
}
if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
- assert(reply.connect_seq > connect_seq);
+ ceph_assert(reply.connect_seq > connect_seq);
ldout(async_msgr->cct, 5) << __func__ << " connect got RETRY_SESSION "
<< connect_seq << " -> "
<< reply.connect_seq << dendl;
if (need_challenge && !had_challenge && authorizer_challenge) {
ldout(async_msgr->cct,0) << __func__ << ": challenging authorizer"
<< dendl;
- assert(authorizer_reply.length());
+ ceph_assert(authorizer_reply.length());
tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER;
} else {
ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
lock.lock();
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down" << dendl;
- assert(state == STATE_CLOSED);
+ ceph_assert(state == STATE_CLOSED);
goto fail;
}
ldout(async_msgr->cct,10) << __func__ << " accept connection race, existing "
<< existing << ".cseq " << existing->connect_seq
<< " == " << connect.connect_seq << ", sending WAIT" << dendl;
- assert(peer_addrs.legacy_addr() > async_msgr->get_myaddrs().legacy_addr());
+ ceph_assert(peer_addrs.legacy_addr() > async_msgr->get_myaddrs().legacy_addr());
existing->lock.unlock();
return _reply_accept(CEPH_MSGR_TAG_WAIT, connect, reply, authorizer_reply);
}
}
- assert(connect.connect_seq > existing->connect_seq);
- assert(connect.global_seq >= existing->peer_global_seq);
+ ceph_assert(connect.connect_seq > existing->connect_seq);
+ ceph_assert(connect.global_seq >= existing->peer_global_seq);
if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other
existing->connect_seq == 0) {
ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
existing->_stop();
existing->dispatch_queue->queue_reset(existing.get());
} else {
- assert(can_write == WriteStatus::NOWRITE);
+ ceph_assert(can_write == WriteStatus::NOWRITE);
existing->write_lock.lock();
// reset the in_seq if this is a hard reset from peer,
if (existing->delay_state) {
existing->delay_state->flush();
- assert(!delay_state);
+ ceph_assert(!delay_state);
}
existing->reset_recv_state();
// Discard existing prefetch buffer in `recv_buf`
existing->recv_start = existing->recv_end = 0;
// there shouldn't exist any buffer
- assert(recv_start == recv_end);
+ ceph_assert(recv_start == recv_end);
existing->authorizer_challenge.reset();
std::lock_guard<std::mutex> l(existing->lock);
if (existing->state == STATE_CLOSED)
return ;
- assert(existing->state == STATE_NONE);
+ ceph_assert(existing->state == STATE_NONE);
existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler);
}
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl;
- assert(state == STATE_CLOSED || state == STATE_NONE);
+ ceph_assert(state == STATE_CLOSED || state == STATE_NONE);
goto fail_registered;
}
{
ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd()
<< " on " << addr << dendl;
- assert(socket.fd() >= 0);
+ ceph_assert(socket.fd() >= 0);
std::lock_guard<std::mutex> l(lock);
cs = std::move(socket);
ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
{
FUNCTRACE(async_msgr->cct);
- assert(center->in_thread());
+ ceph_assert(center->in_thread());
m->set_seq(++out_seq);
if (msgr->crcflags & MSG_CRC_HEADER)
{
ldout(async_msgr->cct, 10) << __func__ << dendl;
if (ack) {
- assert(tp);
+ ceph_assert(tp);
struct ceph_timespec ts;
tp->encode_timeval(&ts);
outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
Message *m = 0;
if (!out_q.empty()) {
map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin();
- assert(!it->second.empty());
+ ceph_assert(!it->second.empty());
list<pair<bufferlist, Message*> >::iterator p = it->second.begin();
m = p->second;
if (bl)
: msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
stop_dispatch(false) { }
~DelayedDelivery() override {
- assert(register_time_events.empty());
- assert(delay_queue.empty());
+ ceph_assert(register_time_events.empty());
+ ceph_assert(delay_queue.empty());
}
void set_center(EventCenter *c) { center = c; }
void do_request(uint64_t id) override;
AsyncMessenger::~AsyncMessenger()
{
delete reap_handler;
- assert(!did_bind); // either we didn't bind or we shut down the Processor
+ ceph_assert(!did_bind); // either we didn't bind or we shut down the Processor
local_connection->mark_down();
for (auto &&p : processors)
delete p;
// it, like port is used case. But if the first worker successfully to bind
// but the second worker failed, it's not expected and we need to assert
// here
- assert(i == 0);
+ ceph_assert(i == 0);
return r;
}
++i;
int AsyncMessenger::rebind(const set<int>& avoid_ports)
{
ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
- assert(did_bind);
+ ceph_assert(did_bind);
for (auto &&p : processors)
p->stop();
for (auto &&p : processors) {
int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
if (r) {
- assert(i == 0);
+ ceph_assert(i == 0);
return r;
}
++i;
return 0;
Mutex::Locker l(lock);
if (did_bind) {
- assert(my_addrs->legacy_addr() == bind_addr);
+ ceph_assert(my_addrs->legacy_addr() == bind_addr);
return 0;
}
if (started) {
ldout(cct,1) << __func__ << " start" << dendl;
// register at least one entity, first!
- assert(my_name.type() >= 0);
+ ceph_assert(my_name.type() >= 0);
- assert(!started);
+ ceph_assert(!started);
started = true;
stopped = false;
AsyncConnectionRef AsyncMessenger::create_connect(
const entity_addrvec_t& addrs, int type)
{
- assert(lock.is_locked());
- assert(addrs != *my_addrs);
+ ceph_assert(lock.is_locked());
+ ceph_assert(addrs != *my_addrs);
ldout(cct, 10) << __func__ << " " << addrs
<< ", creating connection and registering" << dendl;
AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
target.is_msgr2());
conn->connect(addrs, type, target);
- assert(!conns.count(addrs));
+ ceph_assert(!conns.count(addrs));
conns[addrs] = conn;
w->get_perf_counter()->inc(l_msgr_active_connections);
int AsyncMessenger::_send_to(Message *m, int type, const entity_addrvec_t& addrs)
{
FUNCTRACE(cct);
- assert(m);
+ ceph_assert(m);
if (m->get_type() == CEPH_MSG_OSD_OP)
OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
* @{
*/
void set_cluster_protocol(int p) override {
- assert(!started && !did_bind);
+ ceph_assert(!started && !did_bind);
cluster_protocol = p;
}
bool stopped;
AsyncConnectionRef _lookup_conn(const entity_addrvec_t& k) {
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
auto p = conns.find(k);
if (p == conns.end())
return NULL;
}
void _init_local_connection() {
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
local_connection->peer_addrs = *my_addrs;
local_connection->peer_type = my_name.type();
local_connection->set_features(CEPH_FEATURES_ALL);
int EventCenter::init(int n, unsigned i, const std::string &t)
{
// can't init multi times
- assert(nevent == 0);
+ ceph_assert(nevent == 0);
type = t;
idx = i;
global_centers = &cct->lookup_or_create_singleton_object<
EventCenter::AssociatedCenters>(
"AsyncMessenger::EventCenter::global_center::" + type, true);
- assert(global_centers);
+ ceph_assert(global_centers);
global_centers->centers[idx] = this;
if (driver->need_wakeup()) {
notify_handler = new C_handle_notify(this, cct);
int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
- assert(r == 0);
+ ceph_assert(r == 0);
}
}
}
int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
{
- assert(in_thread());
+ ceph_assert(in_thread());
int r = 0;
if (fd >= nevent) {
int new_size = nevent << 2;
// add_event shouldn't report error, otherwise it must be a innermost bug!
lderr(cct) << __func__ << " add event failed, ret=" << r << " fd=" << fd
<< " mask=" << mask << " original mask is " << event->mask << dendl;
- assert(0 == "BUG!");
+ ceph_assert(0 == "BUG!");
return r;
}
void EventCenter::delete_file_event(int fd, int mask)
{
- assert(in_thread() && fd >= 0);
+ ceph_assert(in_thread() && fd >= 0);
if (fd >= nevent) {
ldout(cct, 1) << __func__ << " delete event fd=" << fd << " is equal or greater than nevent=" << nevent
<< "mask=" << mask << dendl;
int r = driver->del_event(fd, event->mask, mask);
if (r < 0) {
// see create_file_event
- assert(0 == "BUG!");
+ ceph_assert(0 == "BUG!");
}
if (mask & EVENT_READABLE && event->read_cb) {
uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
{
- assert(in_thread());
+ ceph_assert(in_thread());
uint64_t id = time_event_next_id++;
ldout(cct, 30) << __func__ << " id=" << id << " trigger after " << microseconds << "us"<< dendl;
void EventCenter::delete_time_event(uint64_t id)
{
- assert(in_thread());
+ ceph_assert(in_thread());
ldout(cct, 30) << __func__ << " id=" << id << dendl;
if (id >= time_event_next_id || id == 0)
return ;
int process_time_events();
FileEvent *_get_file_event(int fd) {
- assert(fd < nevent);
+ ceph_assert(fd < nevent);
return &file_events[fd];
}
delete this;
}
void wait() {
- assert(!nonwait);
+ ceph_assert(!nonwait);
std::unique_lock<std::mutex> l(lock);
while (!done)
cond.wait(l);
public:
template <typename func>
void submit_to(int i, func &&f, bool nowait = false) {
- assert(i < MAX_EVENTCENTER && global_centers);
+ ceph_assert(i < MAX_EVENTCENTER && global_centers);
EventCenter *c = global_centers->centers[i];
- assert(c);
+ ceph_assert(c);
if (!nowait && c->in_thread()) {
f();
return ;
} else if ((kqfd != -1) && (test_kqfd() < 0)) {
// should this ever happen?
// It would be strange to change kqfd with thread change.
- // Might nee to change this into an assert() in the future.
+ // Might need to change this into a ceph_assert() in the future.
ldout(cct,0) << funcname << " Warning: Recreating old kqfd. "
<< "This should not happen!!!" << dendl;
kqfd = -1;
if (!sav_events) {
lderr(cct) << __func__ << " unable to realloc memory: "
<< cpp_strerror(errno) << dendl;
- assert(sav_events);
+ ceph_assert(sav_events);
return -ENOMEM;
}
memset(&sav_events[size], 0, sizeof(struct SaveEvent)*(newsize-sav_max));
};
int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
- assert(sock);
+ ceph_assert(sock);
sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int sd = ::accept(_fd, (sockaddr*)&ss, &slen);
return -errno;
}
- assert(NULL != out); //out should not be NULL in accept connection
+ ceph_assert(NULL != out); //out should not be NULL in accept connection
out->set_type(addr_type);
out->set_sockaddr((sockaddr*)&ss);
threads[i] = std::thread(func);
}
void join_worker(unsigned i) override {
- assert(threads.size() > i && threads[i].joinable());
+ ceph_assert(threads.size() > i && threads[i].joinable());
threads[i].join();
}
};
NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
{
- assert(cct->_conf->ms_async_op_threads > 0);
+ ceph_assert(cct->_conf->ms_async_op_threads > 0);
const uint64_t InitEventNumber = 5000;
num_workers = cct->_conf->ms_async_op_threads;
}
pool_spin.unlock();
- assert(current_best);
+ ceph_assert(current_best);
++current_best->references;
return current_best;
}
pool_spin.lock();
C_drain drain(num_workers);
for (unsigned i = 0; i < num_workers; ++i) {
- assert(cur != workers[i]->center.get_owner());
+ ceph_assert(cur != workers[i]->center.get_owner());
workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
}
pool_spin.unlock();
PerfCounters *get_perf_counter() { return perf_logger; }
void release_worker() {
int oldref = references.fetch_sub(1);
- assert(oldref > 0);
+ ceph_assert(oldref > 0);
}
void init_done() {
init_lock.lock();
int DPDKDevice::init_port_start()
{
- assert(_port_idx < rte_eth_dev_count());
+ ceph_assert(_port_idx < rte_eth_dev_count());
rte_eth_dev_info_get(_port_idx, &_dev_info);
if (_num_queues > 1) {
if (_dev_info.reta_size) {
// RETA size should be a power of 2
- assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0);
+ ceph_assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0);
// Set the RSS table to the correct size
_redir_table.resize(_dev_info.reta_size);
// all together. If this assumption breaks we need to rework the below logic
// by splitting the csum offload feature bit into separate bits for IPv4,
// TCP.
- assert(((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
+ ceph_assert(((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) ||
(!(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
!(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)));
// or not set all together. If this assumption breaks we need to rework the
// below logic by splitting the csum offload feature bit into separate bits
// for TCP.
- assert((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) ||
+ ceph_assert((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) ||
!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM));
if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) {
}
void DPDKQueuePair::configure_proxies(const std::map<unsigned, float>& cpu_weights) {
- assert(!cpu_weights.empty());
+ ceph_assert(!cpu_weights.empty());
if (cpu_weights.size() == 1 && cpu_weights.begin()->first == _qid) {
// special case queue sending to self only, to avoid requiring a hash value
return;
std::string mz_name = "rx_buffer_data" + std::to_string(_qid);
const struct rte_memzone *mz = rte_memzone_reserve_aligned(mz_name.c_str(),
mbuf_data_size*bufs_count, _pktmbuf_pool_rx->socket_id, mz_flags, mbuf_data_size);
- assert(mz);
+ ceph_assert(mz);
void* m = mz->addr;
for (int i = 0; i < bufs_count; i++) {
- assert(m);
+ ceph_assert(m);
_alloc_bufs.push_back(m);
m += mbuf_data_size;
}
(void **)_rx_free_bufs.data(),
_rx_free_bufs.size());
- // TODO: assert() in a fast path! Remove me ASAP!
- assert(_num_rx_free_segs >= _rx_free_bufs.size());
+ // TODO: ceph_assert() in a fast path! Remove me ASAP!
+ ceph_assert(_num_rx_free_segs >= _rx_free_bufs.size());
_num_rx_free_segs -= _rx_free_bufs.size();
_rx_free_bufs.clear();
- // TODO: assert() in a fast path! Remove me ASAP!
- assert((_rx_free_pkts.empty() && !_num_rx_free_segs) ||
+ // TODO: ceph_assert() in a fast path! Remove me ASAP!
+ ceph_assert((_rx_free_pkts.empty() && !_num_rx_free_segs) ||
(!_rx_free_pkts.empty() && _num_rx_free_segs));
}
}
head->l3_len = oi.ip_hdr_len;
if (oi.tso_seg_size) {
- assert(oi.needs_ip_csum);
+ ceph_assert(oi.needs_ip_csum);
head->ol_flags |= PKT_TX_TCP_SEG;
head->l4_len = oi.tcp_hdr_len;
head->tso_segsz = oi.tso_seg_size;
cur_seg_offset = 0;
// FIXME: assert in a fast-path - remove!!!
- assert(cur_seg);
+ ceph_assert(cur_seg);
}
}
}
rte_mbuf* m;
- // TODO: assert() in a fast path! Remove me ASAP!
- assert(frag.size);
+ // TODO: ceph_assert() in a fast path! Remove me ASAP!
+ ceph_assert(frag.size);
// Create a HEAD of mbufs' cluster and set the first bytes into it
len = do_one_buf(qp, head, base, left_to_set);
if (!pa)
return copy_one_data_buf(qp, m, va, buf_len);
- assert(buf_len);
+ ceph_assert(buf_len);
tx_buf* buf = qp.get_tx_buf();
if (!buf) {
return 0;
uint32_t _send(circular_buffer<Packet>& pb, Func &&packet_to_tx_buf_p) {
if (_tx_burst.size() == 0) {
for (auto&& p : pb) {
- // TODO: assert() in a fast path! Remove me ASAP!
- assert(p.len());
+ // TODO: ceph_assert() in a fast path! Remove me ASAP!
+ ceph_assert(p.len());
tx_buf* buf = packet_to_tx_buf_p(std::move(p));
if (!buf) {
return _redir_table[hash & (_redir_table.size() - 1)];
}
void set_local_queue(unsigned i, std::unique_ptr<DPDKQueuePair> qp) {
- assert(!_queues[i]);
+ ceph_assert(!_queues[i]);
_queues[i] = std::move(qp);
}
void unset_local_queue(unsigned i) {
- assert(_queues[i]);
+ ceph_assert(_queues[i]);
_queues[i].reset();
}
template <typename Func>
if (!qp._sw_reta)
return src_cpuid;
- assert(!qp._sw_reta);
+ ceph_assert(!qp._sw_reta);
auto hash = hashfn() >> _rss_table_bits;
auto& reta = *qp._sw_reta;
return reta[hash % reta.size()];
while (create_stage <= WAIT_DEVICE_STAGE)
cond.Wait(lock);
}
- assert(sdev);
+ ceph_assert(sdev);
if (i < sdev->hw_queues_count()) {
auto qp = sdev->init_local_queue(cct, ¢er, cct->_conf->ms_dpdk_hugepages, i);
std::map<unsigned, float> cpu_weights;
int DPDKWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
ServerSocket *sock)
{
- assert(sa.get_family() == AF_INET);
- assert(sock);
+ ceph_assert(sa.get_family() == AF_INET);
+ ceph_assert(sock);
ldout(cct, 10) << __func__ << " addr " << sa << dendl;
// vector<AvailableIPAddress> tuples;
int DPDKWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
- // assert(addr.get_family() == AF_INET);
+ // ceph_assert(addr.get_family() == AF_INET);
int r = tcpv4_connect(_impl->_inet.get_tcp(), addr, socket);
ldout(cct, 10) << __func__ << " addr " << addr << dendl;
return r;
}
// if dpdk::eal::init already called by NVMEDevice, we will select 1..n
// cores
- assert(rte_lcore_count() >= i + 1);
+ ceph_assert(rte_lcore_count() >= i + 1);
unsigned core_id;
RTE_LCORE_FOREACH_SLAVE(core_id) {
if (i-- == 0) {
} else {
_cur_off += f.size;
}
- assert(data.length());
+ ceph_assert(data.length());
return data.length();
}
virtual ssize_t send(bufferlist &bl, bool more) override {
return copy(old.get(), std::max<size_t>(old->_nr_frags + extra_frags, 2 * old->_nr_frags));
}
void* operator new(size_t size, size_t nr_frags = default_nr_frags) {
- assert(nr_frags == uint16_t(nr_frags));
+ ceph_assert(nr_frags == uint16_t(nr_frags));
return ::operator new(size + nr_frags * sizeof(fragment));
}
// Matching the operator new above
inline Packet::impl::impl(fragment frag, size_t nr_frags)
: _len(frag.size), _allocated_frags(nr_frags) {
- assert(_allocated_frags > _nr_frags);
+ ceph_assert(_allocated_frags > _nr_frags);
if (frag.size <= internal_data_size) {
headroom -= frag.size;
frags[0] = { data + headroom, frag.size };
}
inline void Packet::trim_front(size_t how_much) {
- assert(how_much <= _impl->_len);
+ ceph_assert(how_much <= _impl->_len);
_impl->_len -= how_much;
size_t i = 0;
while (how_much && how_much >= _impl->frags[i].size) {
}
inline void Packet::trim_back(size_t how_much) {
- assert(how_much <= _impl->_len);
+ ceph_assert(how_much <= _impl->_len);
_impl->_len -= how_much;
size_t i = _impl->_nr_frags - 1;
while (how_much && how_much >= _impl->frags[i].size) {
offset = 0;
}
n._impl->_offload_info = _impl->_offload_info;
- assert(!n._impl->_deleter);
+ ceph_assert(!n._impl->_deleter);
n._impl->_deleter = _impl->_deleter.share();
return n;
}
new (off) tcp_option::eol;
size += option_len::eol;
}
- assert(size == options_size);
+ ceph_assert(size == options_size);
return size;
}
template <typename InetTraits>
int tcp<InetTraits>::tcb::send(Packet p) {
// We can not send after the connection is closed
- assert(!_snd.closed);
+ ceph_assert(!_snd.closed);
if (in_state(CLOSED))
return -ECONNRESET;
return p;
}
- assert(!_packetq.empty());
+ ceph_assert(!_packetq.empty());
p = std::move(_packetq.front());
_packetq.pop_front();
}
Tub<UserspaceFDImpl> &impl = fds[fd];
- assert(!impl);
+ ceph_assert(!impl);
impl.construct();
ldout(cct, 20) << __func__ << " fd=" << fd << dendl;
return fd;
if (impl->activating_mask) {
if (waiting_fds[max_wait_idx] == fd) {
- assert(impl->waiting_idx == max_wait_idx);
+ ceph_assert(impl->waiting_idx == max_wait_idx);
--max_wait_idx;
}
waiting_fds[impl->waiting_idx] = -1;
int fd;
uint32_t i = 0;
int count = 0;
- assert(num_events);
+ ceph_assert(num_events);
// leave zero slot for waiting_fds
while (i < max_wait_idx) {
fd = waiting_fds[++i];
events[count] = fd;
Tub<UserspaceFDImpl> &impl = fds[fd];
- assert(impl);
+ ceph_assert(impl);
masks[count] = impl->listening_mask & impl->activating_mask;
- assert(masks[count]);
+ ceph_assert(masks[count]);
ldout(cct, 20) << __func__ << " fd=" << fd << " mask=" << masks[count] << dendl;
impl->activating_mask &= (~masks[count]);
impl->waiting_idx = 0;
impl->listening_mask &= (~mask);
if (!(impl->activating_mask & impl->listening_mask) && impl->waiting_idx) {
if (waiting_fds[max_wait_idx] == fd) {
- assert(impl->waiting_idx == max_wait_idx);
+ ceph_assert(impl->waiting_idx == max_wait_idx);
--max_wait_idx;
}
waiting_fds[impl->waiting_idx] = -1;
}
ethernet_address(std::initializer_list<uint8_t> eaddr) {
- assert(eaddr.size() == mac.size());
+ ceph_assert(eaddr.size() == mac.size());
std::copy(eaddr.begin(), eaddr.end(), mac.begin());
}
std::function<bool (forward_hash&, Packet& p, size_t)> forward)
{
auto i = _proto_map.emplace(std::piecewise_construct, std::make_tuple(uint16_t(proto_num)), std::forward_as_tuple(std::move(forward)));
- assert(i.second);
+ ceph_assert(i.second);
l3_rx_stream& l3_rx = i.first->second;
return l3_rx.packet_stream.listen(std::move(next));
}
return end_idx;
}
void push_back(uint8_t b) {
- assert(end_idx < sizeof(data));
+ ceph_assert(end_idx < sizeof(data));
data[end_idx++] = b;
}
void push_back(uint16_t b) {
next_fn _next;
private:
explicit subscription(stream<T...>* s): _stream(s) {
- assert(!_stream->_sub);
+ ceph_assert(!_stream->_sub);
_stream->_sub = this;
}
}
if (nullptr == active_port) {
lderr(cct) << __func__ << " port not found" << dendl;
- assert(active_port);
+ ceph_assert(active_port);
}
}
return -1;
}
} else {
- assert(cm_id->verbs == pd->context);
+ ceph_assert(cm_id->verbs == pd->context);
if (rdma_create_qp(cm_id, pd, &qpia)) {
lderr(cct) << __func__ << " failed to create queue pair with rdmacm library"
<< cpp_strerror(errno) << dendl;
int r = ibv_destroy_comp_channel(channel);
if (r < 0)
lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
- assert(r == 0);
+ ceph_assert(r == 0);
}
}
int r = ibv_destroy_cq(cq);
if (r < 0)
lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
- assert(r == 0);
+ ceph_assert(r == 0);
}
}
Infiniband::MemoryManager::Cluster::~Cluster()
{
int r = ibv_dereg_mr(chunk_base->mr);
- assert(r == 0);
+ ceph_assert(r == 0);
const auto chunk_end = chunk_base + num_chunk;
for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
chunk->~Chunk();
int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
{
- assert(!base);
+ ceph_assert(!base);
num_chunk = num;
uint32_t bytes = buffer_size * num;
base = (char*)manager.malloc(bytes);
end = base + bytes;
- assert(base);
+ ceph_assert(base);
chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
memset(static_cast<void*>(chunk_base), 0, sizeof(Chunk) * num);
free_chunks.reserve(num);
ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
- assert(m);
+ ceph_assert(m);
Chunk* chunk = chunk_base;
for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
new(chunk) Chunk(m, buffer_size, base+offset);
MemoryManager *manager;
CephContext *cct;
- assert(g_ctx);
+ ceph_assert(g_ctx);
manager = g_ctx->manager;
cct = manager->cct;
rx_buf_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
if (ptr == NULL) return;
void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
size_t real_size = *((size_t *)real_ptr);
- assert(real_size % HUGE_PAGE_SIZE == 0);
+ ceph_assert(real_size % HUGE_PAGE_SIZE == 0);
if (real_size != 0)
munmap(real_ptr, real_size);
else
void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
{
- assert(device);
- assert(pd);
+ ceph_assert(device);
+ ceph_assert(pd);
send = new Cluster(*this, size);
send->fill(tx_num);
initialized = true;
device = device_list->get_device(device_name.c_str());
- assert(device);
+ ceph_assert(device);
device->binding_port(cct, port_num);
ib_physical_port = device->active_port->get_port_num();
pd = new ProtectionDomain(cct, device);
- assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
+ ceph_assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
support_srq = cct->_conf->ms_async_rdma_support_srq;
if (support_srq)
ibv_recv_wr *badworkrequest;
if (support_srq) {
ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
- assert(ret == 0);
+ ceph_assert(ret == 0);
} else {
- assert(qp);
+ ceph_assert(qp);
ret = ibv_post_recv(qp, &rx_work_request[0], &badworkrequest);
- assert(ret == 0);
+ ceph_assert(ret == 0);
}
return i;
}
{
if (qp) {
ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
- assert(!ibv_destroy_qp(qp));
+ ceph_assert(!ibv_destroy_qp(qp));
}
}
~Device() {
if (active_port) {
delete active_port;
- assert(ibv_close_device(ctxt) == 0);
+ ceph_assert(ibv_close_device(ctxt) == 0);
}
}
const char* get_name() { return name;}
}
Device* get_device(const char* device_name) {
- assert(devices);
+ ceph_assert(devices);
for (int i = 0; i < num; ++i) {
if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {
return devices[i];
<< ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
if (!connected) {
r = activate();
- assert(!r);
+ ceph_assert(!r);
}
notify();
r = infiniband->send_msg(cct, tcp_fd, my_msg);
return ;
}
r = activate();
- assert(!r);
+ ceph_assert(!r);
r = infiniband->send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
for (size_t i = 0; i < cqe.size(); ++i) {
ibv_wc* response = &cqe[i];
- assert(response->status == IBV_WC_SUCCESS);
+ ceph_assert(response->status == IBV_WC_SUCCESS);
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
chunk->prepare_read(response->byte_len);
auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers, unsigned bytes,
std::list<bufferptr>::const_iterator &start,
std::list<bufferptr>::const_iterator &end) -> unsigned {
- assert(start != end);
+ ceph_assert(start != end);
auto chunk_idx = tx_buffers.size();
int ret = worker->get_reged_mem(this, tx_buffers, bytes);
if (ret == 0) {
}
++start;
}
- assert(bytes == 0);
+ ceph_assert(bytes == 0);
return total_copied;
};
goto sending;
need_reserve_bytes = 0;
}
- assert(copy_it == it);
+ ceph_assert(copy_it == it);
tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
total += it->length();
++copy_it;
sending:
if (total == 0)
return -EAGAIN;
- assert(total <= pending_bl.length());
+ ceph_assert(total <= pending_bl.length());
bufferlist swapped;
if (total < pending_bl.length()) {
worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
// write argument must be a 64bit integer
uint64_t i = 1;
- assert(sizeof(i) == write(notify_fd, &i, sizeof(i)));
+ ceph_assert(sizeof(i) == write(notify_fd, &i, sizeof(i)));
}
void RDMAConnectedSocketImpl::shutdown()
break;
default:
- assert(0 == "unhandled event");
+ ceph_assert(0 == "unhandled event");
break;
}
rdma_ack_cm_event(event);
{
ldout(cct, 15) << __func__ << dendl;
- assert(sock);
+ ceph_assert(sock);
struct pollfd pfd = {
.fd = cm_channel->fd,
.events = POLLIN,
};
int ret = poll(&pfd, 1, 0);
- assert(ret >= 0);
+ ceph_assert(ret >= 0);
if (!ret)
return -EAGAIN;
{
ldout(cct, 15) << __func__ << dendl;
- assert(sock);
+ ceph_assert(sock);
sockaddr_storage ss;
socklen_t slen = sizeof(ss);
return -errno;
}
- assert(NULL != out); //out should not be NULL in accept connection
+ ceph_assert(NULL != out); //out should not be NULL in accept connection
out->set_type(addr_type);
out->set_sockaddr((sockaddr*)&ss);
ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
polling_stop();
- assert(qp_conns.empty());
- assert(num_qp_conn == 0);
- assert(dead_queue_pairs.empty());
- assert(num_dead_queue_pair == 0);
+ ceph_assert(qp_conns.empty());
+ ceph_assert(num_qp_conn == 0);
+ ceph_assert(dead_queue_pairs.empty());
+ ceph_assert(num_dead_queue_pair == 0);
delete async_handler;
}
get_stack()->get_infiniband().get_memory_manager()->set_rx_stat_logger(perf_logger);
tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
- assert(tx_cc);
+ ceph_assert(tx_cc);
rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
- assert(rx_cc);
+ ceph_assert(rx_cc);
tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc);
- assert(tx_cq);
+ ceph_assert(tx_cq);
rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc);
- assert(rx_cq);
+ ceph_assert(rx_cq);
t = std::thread(&RDMADispatcher::polling, this);
ceph_pthread_setname(t.native_handle(), "rdma-polling");
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
if (response->status == IBV_WC_SUCCESS) {
- assert(wc[i].opcode == IBV_WC_RECV);
+ ceph_assert(wc[i].opcode == IBV_WC_RECV);
conn = get_conn_lockless(response->qp_num);
if (!conn) {
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
// Record a newly created queue pair together with its owning socket so
// later completion events (keyed by qp number) can be routed back to it.
// Takes the dispatcher lock; a qp number must not be registered twice.
void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
{
  Mutex::Locker l(lock);
  const auto qpn = qp->get_local_qp_number();  // hoisted: used three times below
  ceph_assert(!qp_conns.count(qpn));
  qp_conns[qpn] = std::make_pair(qp, csi);
  ++num_qp_conn;
}
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
- assert(center.in_thread());
+ ceph_assert(center.in_thread());
int r = get_stack()->get_infiniband().get_tx_buffers(c, bytes);
- assert(r >= 0);
+ ceph_assert(r >= 0);
size_t got = get_stack()->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r;
ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
stack->get_dispatcher().inflight += r;
// Join worker thread i. The index must refer to an existing, still
// joinable worker thread (asserted).
void RDMAStack::join_worker(unsigned i)
{
  ceph_assert(threads.size() > i && threads[i].joinable());
  threads[i].join();
}
RDMAStack *get_stack() { return stack; }
int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
// Drop a socket from the pending-send connection list. Must be called
// on the event center's own thread (enforced by the assert below).
void remove_pending_conn(RDMAConnectedSocketImpl *o) {
  ceph_assert(center.in_thread());
  pending_sent_conns.remove(o);
}
void handle_pending_message();
!bind_addr.is_blank_ip())
msgr->learned_addr(bind_addr);
else
- assert(msgr->get_need_addr()); // should still be true.
+ ceph_assert(msgr->get_need_addr()); // should still be true.
if (msgr->get_myaddr().get_port() == 0) {
msgr->set_myaddrs(entity_addrvec_t(listen_addr));
// Destructor: the pipe must already be fully drained — both the outgoing
// queue and the sent list have to be empty before teardown.
Pipe::~Pipe()
{
  ceph_assert(out_q.empty());
  ceph_assert(sent.empty());
  delete delay_thread;   // deleting a null pointer is a safe no-op
  delete[] recv_buf;
}
void Pipe::start_reader()
{
- assert(pipe_lock.is_locked());
- assert(!reader_running);
+ ceph_assert(pipe_lock.is_locked());
+ ceph_assert(!reader_running);
if (reader_needs_join) {
reader_thread.join();
reader_needs_join = false;
// Spawn the writer thread for this pipe. Caller must hold pipe_lock and
// a writer must not already be running (both asserted).
void Pipe::start_writer()
{
  ceph_assert(pipe_lock.is_locked());
  ceph_assert(!writer_running);
  writer_running = true;
  writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
}
int Pipe::accept()
{
ldout(msgr->cct,10) << "accept" << dendl;
- assert(pipe_lock.is_locked());
- assert(state == STATE_ACCEPTING);
+ ceph_assert(pipe_lock.is_locked());
+ ceph_assert(state == STATE_ACCEPTING);
pipe_lock.Unlock();
ldout(msgr->cct,0) << "accept: challenging authorizer "
<< authorizer_reply.length()
<< " bytes" << dendl;
- assert(authorizer_reply.length());
+ ceph_assert(authorizer_reply.length());
reply.tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER;
} else {
ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
<< " " << existing << ".cseq=" << existing->connect_seq
<< " == " << connect.connect_seq
<< dendl;
- assert(existing->state == STATE_CONNECTING ||
+ ceph_assert(existing->state == STATE_CONNECTING ||
existing->state == STATE_WAIT);
goto replace;
} else {
// our existing outgoing wins
ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
<< " == " << connect.connect_seq << ", sending WAIT" << dendl;
- assert(peer_addr > msgr->my_addr);
+ ceph_assert(peer_addr > msgr->my_addr);
if (!(existing->state == STATE_CONNECTING))
lderr(msgr->cct) << "accept race bad state, would send wait, existing="
<< existing->get_state_name()
<< " " << existing << ".cseq=" << existing->connect_seq
<< " == " << connect.connect_seq
<< dendl;
- assert(existing->state == STATE_CONNECTING);
+ ceph_assert(existing->state == STATE_CONNECTING);
// make sure our outgoing connection will follow through
existing->_send_keepalive();
reply.tag = CEPH_MSGR_TAG_WAIT;
}
}
- assert(connect.connect_seq > existing->connect_seq);
- assert(connect.global_seq >= existing->peer_global_seq);
+ ceph_assert(connect.connect_seq > existing->connect_seq);
+ ceph_assert(connect.global_seq >= existing->peer_global_seq);
if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other
existing->connect_seq == 0) {
ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq
ceph_abort();
retry_session:
- assert(existing->pipe_lock.is_locked());
- assert(pipe_lock.is_locked());
+ ceph_assert(existing->pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
reply.connect_seq = existing->connect_seq + 1;
existing->pipe_lock.Unlock();
goto reply;
reply:
- assert(pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
reply.authorizer_len = authorizer_reply.length();
pipe_lock.Unlock();
}
replace:
- assert(existing->pipe_lock.is_locked());
- assert(pipe_lock.is_locked());
+ ceph_assert(existing->pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
// if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
reply_tag = CEPH_MSGR_TAG_SEQ;
if (existing->policy.lossy) {
// disconnect from the Connection
- assert(existing->connection_state);
+ ceph_assert(existing->connection_state);
if (existing->connection_state->clear_pipe(existing))
msgr->dispatch_queue.queue_reset(existing->connection_state.get());
} else {
open:
// open
- assert(pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
- assert(state == STATE_ACCEPTING);
+ ceph_assert(state == STATE_ACCEPTING);
state = STATE_OPEN;
ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
if (msgr->dispatch_queue.stop)
goto shutting_down;
removed = msgr->accepting_pipes.erase(this);
- assert(removed == 1);
+ ceph_assert(removed == 1);
register_pipe();
msgr->lock.Unlock();
pipe_lock.Unlock();
shutting_down:
msgr->lock.Unlock();
shutting_down_msgr_unlocked:
- assert(pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
if (msgr->cct->_conf->ms_inject_internal_delays) {
ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
bool got_bad_auth = false;
ldout(msgr->cct,10) << "connect " << connect_seq << dendl;
- assert(pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
__u32 cseq = connect_seq;
__u32 gseq = msgr->get_global_seq();
continue;
}
if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
- assert(reply.connect_seq > connect_seq);
+ ceph_assert(reply.connect_seq > connect_seq);
ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq
<< " -> " << reply.connect_seq << dendl;
cseq = connect_seq = reply.connect_seq;
<< " vs out_seq " << out_seq << dendl;
while (newly_acked_seq > out_seq) {
Message *m = _get_next_outgoing();
- assert(m);
+ ceph_assert(m);
ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq()
<< " " << *m << dendl;
- assert(m->get_seq() <= newly_acked_seq);
+ ceph_assert(m->get_seq() <= newly_acked_seq);
m->put();
++out_seq;
}
policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
state = STATE_OPEN;
connect_seq = cseq + 1;
- assert(connect_seq == reply.connect_seq);
+ ceph_assert(connect_seq == reply.connect_seq);
backoff = utime_t();
connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy
// Register this pipe in the messenger's per-peer-address table. Caller
// must hold the messenger lock; no pipe may already be registered for
// this peer address (both asserted).
void Pipe::register_pipe()
{
  ldout(msgr->cct,10) << "register_pipe" << dendl;
  ceph_assert(msgr->lock.is_locked());
  Pipe *existing = msgr->_lookup_pipe(peer_addr);
  ceph_assert(existing == NULL);
  msgr->rank_pipe[peer_addr] = this;
}
void Pipe::unregister_pipe()
{
- assert(msgr->lock.is_locked());
+ ceph_assert(msgr->lock.is_locked());
ceph::unordered_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr);
if (p != msgr->rank_pipe.end() && p->second == this) {
ldout(msgr->cct,10) << "unregister_pipe" << dendl;
void Pipe::fault(bool onread)
{
const auto& conf = msgr->cct->_conf;
- assert(pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
cond.Signal();
if (onread && state == STATE_CONNECTING) {
// disconnect from Connection, and mark it failed. future messages
// will be dropped.
- assert(connection_state);
+ ceph_assert(connection_state);
stop();
bool cleared = connection_state->clear_pipe(this);
void Pipe::was_session_reset()
{
- assert(pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
ldout(msgr->cct,10) << "was_session_reset" << dendl;
in_q->discard_queue(conn_id);
void Pipe::stop()
{
ldout(msgr->cct,10) << "stop" << dendl;
- assert(pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
state = STATE_CLOSED;
state_closed = true;
cond.Signal();
void Pipe::stop_and_wait()
{
- assert(pipe_lock.is_locked_by_me());
+ ceph_assert(pipe_lock.is_locked_by_me());
if (state != STATE_CLOSED)
stop();
if (state == STATE_ACCEPTING) {
accept();
- assert(pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
}
// loop.
while (state != STATE_CLOSED &&
state != STATE_CONNECTING) {
- assert(pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
// sleep if (re)connecting
if (state == STATE_STANDBY) {
m->put();
if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
msgr->cct->_conf->ms_die_on_old_message)
- assert(0 == "old msgs despite reconnect_seq feature");
+ ceph_assert(0 == "old msgs despite reconnect_seq feature");
continue;
}
if (m->get_seq() > in_seq + 1) {
ldout(msgr->cct,0) << "reader missed message? skipped from seq "
<< in_seq << " to " << m->get_seq() << dendl;
if (msgr->cct->_conf->ms_die_on_skipped_message)
- assert(0 == "skipped incoming seq");
+ ceph_assert(0 == "skipped incoming seq");
}
m->set_connection(connection_state.get());
// connect?
if (state == STATE_CONNECTING) {
- assert(!policy.server);
+ ceph_assert(!policy.server);
connect();
continue;
}
ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length()
<< " b_off " << b_off << dendl;
}
- assert(donow > 0);
+ ceph_assert(donow > 0);
ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off
<< " leftinchunk " << left
<< " buffer len " << pb->length()
msglen += donow;
msg.msg_iovlen++;
- assert(left >= donow);
+ ceph_assert(left >= donow);
left -= donow;
b_off += donow;
bl_pos += donow;
b_off = 0;
}
}
- assert(left == 0);
+ ceph_assert(left == 0);
// send footer; if receiver doesn't support signatures, use the old footer format
return -1;
//lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
- assert(len > 0);
+ ceph_assert(len > 0);
while (len > 0) {
MSGR_SIGPIPE_STOPPER;
int did = ::send( sd, buf, len, MSG_NOSIGNAL );
void stop_and_wait();
// Queue m on the priority-keyed outgoing queue and wake the writer.
// Caller must hold pipe_lock (asserted).
void _send(Message *m) {
  ceph_assert(pipe_lock.is_locked());
  out_q[m->get_priority()].push_back(m);
  cond.Signal();
}
// Flag that a keepalive should be sent and wake the writer thread.
// Caller must hold pipe_lock (asserted).
void _send_keepalive() {
  ceph_assert(pipe_lock.is_locked());
  send_keepalive = true;
  cond.Signal();
}
Message *_get_next_outgoing() {
- assert(pipe_lock.is_locked());
+ ceph_assert(pipe_lock.is_locked());
Message *m = 0;
while (!m && !out_q.empty()) {
map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
// Forward the message to this connection's owning SimpleMessenger,
// which handles pipe lookup/creation and queuing.
int PipeConnection::send_message(Message *m)
{
  ceph_assert(msgr);
  return static_cast<SimpleMessenger*>(msgr)->send_message(m, this);
}
*/
// Destructor: verify orderly shutdown already happened before the
// messenger is destroyed.
SimpleMessenger::~SimpleMessenger()
{
  ceph_assert(!did_bind); // either we didn't bind or we shut down the Accepter
  ceph_assert(rank_pipe.empty()); // we don't have any running Pipes.
  ceph_assert(!reaper_started); // the reaper thread is stopped
}
void SimpleMessenger::ready()
{
bool ret = false;
auto addr = addrs.legacy_addr();
- assert(my_addr == my_addrs->front());
+ ceph_assert(my_addr == my_addrs->front());
if (my_addr.is_blank_ip()) {
ldout(cct,1) << __func__ << " " << addr << dendl;
entity_addr_t t = my_addr;
} else {
ldout(cct,1) << __func__ << " " << addr << " no-op" << dendl;
}
- assert(my_addr == my_addrs->front());
+ ceph_assert(my_addr == my_addrs->front());
return ret;
}
void SimpleMessenger::reaper()
{
ldout(cct,10) << "reaper" << dendl;
- assert(lock.is_locked());
+ ceph_assert(lock.is_locked());
while (!pipe_reap_queue.empty()) {
Pipe *p = pipe_reap_queue.front();
// or accept() may have switch the Connection to a different
// Pipe... but make sure!
bool cleared = p->connection_state->clear_pipe(p);
- assert(!cleared);
+ ceph_assert(!cleared);
}
p->pipe_lock.Unlock();
p->unregister_pipe();
- assert(pipes.count(p));
+ ceph_assert(pipes.count(p));
pipes.erase(p);
// drop msgr lock while joining thread; the delay through could be
if (con) {
Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
if (p) {
- assert(p->msgr == this);
+ ceph_assert(p->msgr == this);
r = p->is_connected();
p->put();
}
int SimpleMessenger::rebind(const set<int>& avoid_ports)
{
ldout(cct,1) << "rebind avoid " << avoid_ports << dendl;
- assert(did_bind);
+ ceph_assert(did_bind);
accepter.stop();
mark_down_all();
return accepter.rebind(avoid_ports);
return 0;
Mutex::Locker l(lock);
if (did_bind) {
- assert(*my_addrs == entity_addrvec_t(bind_addr));
+ ceph_assert(*my_addrs == entity_addrvec_t(bind_addr));
return 0;
}
if (started) {
ldout(cct,1) << "messenger.start" << dendl;
// register at least one entity, first!
- assert(my_name.type() >= 0);
+ ceph_assert(my_name.type() >= 0);
- assert(!started);
+ ceph_assert(!started);
started = true;
stopped = false;
PipeConnection *con,
Message *first)
{
- assert(lock.is_locked());
- assert(addr != my_addr);
+ ceph_assert(lock.is_locked());
+ ceph_assert(addr != my_addr);
ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
static_cast<PipeConnection*>(con)->get_pipe());
if (pipe) {
ldout(cct,20) << "send_keepalive con " << con << ", have pipe." << dendl;
- assert(pipe->msgr == this);
+ ceph_assert(pipe->msgr == this);
pipe->pipe_lock.Lock();
pipe->_send_keepalive();
pipe->pipe_lock.Unlock();
Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
if (p) {
ldout(cct,1) << "mark_down " << con << " -- " << p << dendl;
- assert(p->msgr == this);
+ ceph_assert(p->msgr == this);
p->unregister_pipe();
p->pipe_lock.Lock();
p->stop();
Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
if (p) {
ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl;
- assert(p->msgr == this);
+ ceph_assert(p->msgr == this);
p->pipe_lock.Lock();
p->policy.lossy = true;
p->pipe_lock.Unlock();
* @{
*/
// Set the cluster protocol. Only legal before the messenger has been
// started or bound (asserted) — the value must not change mid-flight.
void set_cluster_protocol(int p) override {
  ceph_assert(!started && !did_bind);
  cluster_protocol = p;
}
struct ceph_timespec ts;
if (ack) {
- assert(tp);
+ ceph_assert(tp);
tp->encode_timeval(&ts);
xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
}
const std::list<buffer::ptr>& header = xcmd->get_bl_ref().buffers();
- assert(header.size() == 1); /* accelio header must be without scatter gather */
+ ceph_assert(header.size() == 1); /* accelio header must be without scatter gather */
list<bufferptr>::const_iterator pb = header.begin();
- assert(pb->length() < XioMsgHdr::get_max_encoded_length());
+ ceph_assert(pb->length() < XioMsgHdr::get_max_encoded_length());
struct xio_msg * msg = xcmd->get_xio_msg();
msg->out.header.iov_base = (char*) pb->c_str();
msg->out.header.iov_len = pb->length();
<< " iov_len " << (int) tmsg->in.header.iov_len
<< " nents " << tmsg->in.pdata_iov.nents
<< " sn " << tmsg->sn << dendl;
- assert(session == this->session);
+ ceph_assert(session == this->session);
in_seq.set_count(msg_cnt.msg_cnt);
} else {
/* XXX major sequence error */
- assert(! tmsg->in.header.iov_len);
+ ceph_assert(! tmsg->in.header.iov_len);
}
in_seq.append(msg);
default:
lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl;
- assert(! "unsupported message tag");
+ ceph_assert(! "unsupported message tag");
}
xio_release_msg(msg);
xcon->conn = conn;
xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
- assert(xcon->portal);
+ ceph_assert(xcon->portal);
xcona.user_context = xcon;
(void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
if (!!e)
return NULL;
XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
- assert(!!xmsg);
+ ceph_assert(!!xmsg);
new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
return xmsg;
}
if (!!e)
return NULL;
XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
- assert(!!xcmd);
+ ceph_assert(!!xcmd);
new (xcmd) XioCommand(xcon, mp_mem);
return xcmd;
}
req = xmsg->get_xio_msg();
const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
- assert(header.size() == 1); /* XXX */
+ ceph_assert(header.size() == 1); /* XXX */
list<bufferptr>::const_iterator pb = header.begin();
req->out.header.iov_base = (char*) pb->c_str();
req->out.header.iov_len = pb->length();
/* merge with portal traffic */
xcon->portal->enqueue(xcon, xcmp);
- assert(r);
+ ceph_assert(r);
return r;
}
ceph_msg_footer _ceph_msg_footer;
XioMsgHdr hdr (_ceph_msg_header, _ceph_msg_footer, 0 /* features */);
const std::list<buffer::ptr>& hdr_buffers = hdr.get_bl().buffers();
- assert(hdr_buffers.size() == 1); /* accelio header is small without scatter gather */
+ ceph_assert(hdr_buffers.size() == 1); /* accelio header is small without scatter gather */
return hdr_buffers.begin()->length();
}
}
// fall back to malloc on errors
mp->addr = malloc(size);
- assert(mp->addr);
+ ceph_assert(mp->addr);
mp->length = 0;
if (unlikely(XioPool::trace_mempool))
xp_stats.inc_overflow();
/* a portal is an xio_context and event loop */
ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */);
- assert(ctx && "Whoops, failed to create portal/ctx");
+ ceph_assert(ctx && "Whoops, failed to create portal/ctx");
}
int bind(struct xio_session_ops *ops, const string &base_uri,
for (int i = 0; i < n; i++) {
if (!portals[i]) {
portals[i] = new XioPortal(msgr, nconns);
- assert(portals[i] != nullptr);
+ ceph_assert(portals[i] != nullptr);
}
}
}