virtual ~RefCountedObject() {}
RefCountedObject *get() {
- //generic_dout(0) << "RefCountedObject::get " << this << " " << nref.test() << " -> " << (nref.test() + 1) << dendl;
+ //generic_dout(0) << "RefCountedObject::get " << this << " " << nref.read() << " -> " << (nref.read() + 1) << dendl;
nref.inc();
return this;
}
void put() {
- //generic_dout(0) << "RefCountedObject::put " << this << " " << nref.test() << " -> " << (nref.test() - 1) << dendl;
+ //generic_dout(0) << "RefCountedObject::put " << this << " " << nref.read() << " -> " << (nref.read() - 1) << dendl;
if (nref.dec() == 0)
delete this;
}
int peer_type;
entity_addr_t peer_addr;
unsigned features;
- void *pipe;
+ RefCountedObject *pipe;
public:
Connection() : nref(1), lock("Connection::lock"), priv(NULL), peer_type(-1), features(0), pipe(NULL) {}
//generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl;
priv->put();
}
+ if (pipe)
+ pipe->put();
}
Connection *get() {
return NULL;
}
+ RefCountedObject *get_pipe() {
+ Mutex::Locker l(lock);
+ if (pipe)
+ return pipe->get();
+ return NULL;
+ }
+ void clear_pipe() {
+ Mutex::Locker l(lock);
+ if (pipe) {
+ pipe->put();
+ pipe = NULL;
+ }
+ }
+
int get_peer_type() { return peer_type; }
void set_peer_type(int t) { peer_type = t; }
<< " " << m
<< dendl;
- submit_message(m, (SimpleMessenger::Pipe *)con->pipe);
+ SimpleMessenger::Pipe *pipe = (SimpleMessenger::Pipe *)con->get_pipe();
+ if (pipe) {
+ submit_message(m, pipe);
+ pipe->put();
+ } // else we raced with reaper()
return 0;
}
p->join();
dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
assert(p->sd < 0);
- delete p;
+ if (p->connection_state)
+ p->connection_state->clear_pipe();
+ p->put();
dout(10) << "reaper deleted pipe " << p << dendl;
}
}
void sigint(int r);
// pipe
- class Pipe {
+ class Pipe : public RefCountedObject {
public:
SimpleMessenger *messenger;
ostream& _pipe_prefix();
connect_seq(0), peer_global_seq(0),
out_seq(0), in_seq(0), in_seq_acked(0),
reader_thread(this), writer_thread(this) {
- connection_state->pipe = this;
+ connection_state->pipe = get();
}
~Pipe() {
for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
}
assert(out_q.empty());
assert(sent.empty());
- connection_state->pipe = NULL;
connection_state->put();
}