pipe_lock.Unlock();
xlist<Pipe *>* list_on;
q.lock.Lock();//to remove from round-robin
+ pipe_lock.Lock();
for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
i != queue_items.end();
++i) {
}
}
q.lock.Unlock();
- pipe_lock.Lock();
// clear queue_items
while (!queue_items.empty()) {
in_seq = m->get_seq();
cond.Signal(); // wake up writer, to ack this
- pipe_lock.Unlock();
dout(10) << "reader got message "
<< m->get_seq() << " " << m << " " << *m
<< dendl;
queue_received(m);
-
- pipe_lock.Lock();
}
else if (tag == CEPH_MSGR_TAG_CLOSE) {
//push it to the back of the line!
//Also, call with pipe_lock held or bad things happen
void enqueue_me(int priority) {
+ assert(pipe_lock.is_locked());
if (!queue_items.count(priority))
queue_items[priority] = new xlist<Pipe *>::item(this);
pipe_lock.Unlock();
messenger->dispatch_queue.lock.Lock();
+ pipe_lock.Lock();
if (messenger->dispatch_queue.queued_pipes.empty())
messenger->dispatch_queue.cond.Signal();
messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
messenger->dispatch_queue.lock.Unlock();
- pipe_lock.Lock();
}
//we have two queue_received's to allow local signal delivery
// via Message * (that doesn't actually point to a Message)
- //Don't call while holding pipe_lock!
void queue_received(Message *m, int priority) {
+ assert(pipe_lock.is_locked());
+
list<Message *>& queue = in_q[priority];
bool was_empty;
- pipe_lock.Lock();
+
if (halt_delivery) {
if (m>(void *)5) // don't want to put local-delivery signals
// this magic number should be larger than
// the size of the D_CONNECT et al enum
m->put();
- goto unlock_return;
+ return;
}
was_empty = queue.empty();
queue.push_back(m);
- if (was_empty) //this pipe isn't on the endpoint queue
- enqueue_me(priority);
//increment queue length counters
in_qlen++;
++messenger->dispatch_queue.qlen;
messenger->dispatch_queue.qlen_lock.unlock();
-unlock_return:
- pipe_lock.Unlock();
+ if (was_empty) //this pipe isn't on the endpoint queue
+ enqueue_me(priority);
}
void queue_received(Message *m) {
Pipe *local_pipe;
void local_delivery(Message *m, int priority) {
+ local_pipe->pipe_lock.Lock();
if ((unsigned long)m > 10)
m->set_connection(local_pipe->connection_state->get());
local_pipe->queue_received(m, priority);
+ local_pipe->pipe_lock.Unlock();
}
int get_queue_len() {
return l;
}
- void local_delivery(Message *m) {
- if ((unsigned long)m > 10)
- m->set_connection(local_pipe->connection_state->get());
- local_pipe->queue_received(m);
- }
-
void queue_connect(Connection *con) {
lock.Lock();
connect_q.push_back(con);