public:
atomic_t(int i=0) : lock("atomic_t::lock", false, false /* no lockdep */), nref(i) {}
atomic_t(const atomic_t& other);
- void inc() {
+ int inc() {
lock.Lock();
- ++nref;
+ int r = ++nref;
lock.Unlock();
+ return r;
}
int dec() {
lock.Lock();
virtual void dispatch(Message *m) = 0;
// how i deal with transmission failures.
- virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) { delete m; }
+ virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) { }
/*
* on any connection reset.
#include "include/buffer.h"
#include "msg_types.h"
+#include "common/debug.h"
friend class Messenger;
public:
- Message() {
+ atomic_t nref;
+
+ Message() : nref(0) {
memset(&header, 0, sizeof(header));
memset(&footer, 0, sizeof(footer));
};
- Message(int t) {
+ Message(int t) : nref(0) {
memset(&header, 0, sizeof(header));
header.type = t;
header.priority = 0; // undef
header.data_off = 0;
memset(&footer, 0, sizeof(footer));
}
- virtual ~Message() { }
+ virtual ~Message() {
+ assert(nref.test() == 0);
+ }
+
+ void get() {
+ //int r =
+ nref.inc();
+ //*_dout << dbeginl << "message(" << this << ").get " << (r-1) << " -> " << r << std::endl;
+ //_dout_end_line();
+ }
+ void put() {
+ int r = nref.dec();
+ //*_dout << dbeginl << "message(" << this << ").put " << (r+1) << " -> " << r << std::endl;
+ //_dout_end_line();
+ if (r == 0)
+ delete this;
+ }
ceph_msg_header &get_header() { return header; }
void set_header(const ceph_msg_header &e) { header = e; }
{
const entity_name_t dest = m->get_dest();
+ assert(m->nref.test() == 0);
+
// lookup
entity_addr_t dest_proc_addr = dest_addr;
dest_proc_addr.erank = 0;
failed_q.pop_front();
lock.Unlock();
get_dispatcher()->ms_handle_failure(m, i);
+ m->put();
} else {
dout(1) << m->get_dest()
<< " <== " << m->get_source_inst()
unsigned srcrank = m->get_source_inst().addr.erank;
if (srcrank >= rank.max_local || rank.local[srcrank] == 0) {
dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl;
- delete m;
} else if (rank.local[srcrank]->is_stopped()) {
dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
- delete m;
} else {
dout(10) << "fail on " << *m << dendl;
rank.local[srcrank]->queue_failure(m, m->get_dest_inst());
}
}
+ m->put();
}
}
sent.pop_front();
dout(10) << "reader got ack seq "
<< seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
- delete m;
+ m->put();
}
}
continue;
if (erank < rank.max_local && rank.local[erank]) {
// find entity
entity = rank.local[erank];
+ entity->get();
// first message?
if (entity->need_addr) {
}
rank.lock.Unlock();
- if (entity)
+ if (entity) {
entity->queue_message(m); // queue
+ entity->put();
+ }
lock.Lock();
}
if (m) {
m->set_seq(++out_seq);
sent.push_back(m); // move to sent list
+ m->get();
lock.Unlock();
dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
<< errno << ": " << strerror(errno) << dendl;
fault();
}
+ m->put();
}
continue;
}
lock.Unlock();
}
void _send(Message *m) {
+ m->get();
q[m->get_priority()].push_back(m);
last_dest_name = m->get_dest();
cond.Signal();
void queue_message(Message *m) {
// set recv stamp
m->set_recv_stamp(g_clock.now());
-
+
+ assert(m->nref.test() == 0);
+
lock.Lock();
qlen++;
dispatch_queue[m->get_priority()].push_back(m);
}
void queue_failure(Message *m, entity_inst_t i) {
lock.Lock();
+ m->get();
failed_q.push_back(pair<Message*,entity_inst_t>(m,i));
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_FAILED);
cond.Signal();
{
entity_name_t dest = inst.name;
- if (g_conf.ms_die_on_failure) {
- dout(0) << "ms_handle_failure " << inst << " on " << *m << dendl;
- exit(0);
- }
-
- if (is_stopping()) {
- delete m;
- return;
- }
-
- dout(1) << "ms_handle_failure " << inst
- << ", dropping " << *m << dendl;
- delete m;
+ dout(1) << "ms_handle_failure " << inst << " on " << *m << dendl;
+ if (g_conf.ms_die_on_failure)
+ assert(0);
}
}
void ms_handle_failure(Message *m, const entity_inst_t& inst) {
-
-
}
} dispatcher;
# mds
for mds in `seq 0 $((CEPH_NUM_MDS-1))`
do
- $CEPH_BIN/crun $norestart $CEPH_BIN/cmds $ARGS $CMDS_ARGS &
+ $CEPH_BIN/crun $norestart $valgrind $CEPH_BIN/cmds $ARGS $CMDS_ARGS &
#valgrind --tool=massif $CEPH_BIN/cmds $ARGS --mds_log_max_segments 2 --mds_thrash_fragments 0 --mds_thrash_exports 0 > m #--debug_ms 20
#$CEPH_BIN/cmds -d $ARGS --mds_thrash_fragments 0 --mds_thrash_exports 0 #--debug_ms 20