while (!dispatch_queue.stop) {
while (!dispatch_queue.queued_pipes.empty() && !dispatch_queue.stop) {
//get highest-priority pipe
- map<int, xlist<Pipe *> >::reverse_iterator high_iter =
+ map<int, xlist<Pipe *>* >::reverse_iterator high_iter =
dispatch_queue.queued_pipes.rbegin();
int priority = high_iter->first;
- xlist<Pipe *>& pipe_list = high_iter->second;
+ xlist<Pipe *> *pipe_list = high_iter->second;
- Pipe *pipe = pipe_list.front();
+ Pipe *pipe = pipe_list->front();
//move pipe to back of line -- or just take off if no more messages
pipe->pipe_lock.Lock();
list<Message *>& m_queue = pipe->in_q[priority];
m_queue.pop_front();
if (m_queue.empty()) {
- pipe_list.pop_front(); // pipe is done
- if (pipe_list.empty())
+ pipe_list->pop_front(); // pipe is done
+ if (pipe_list->empty()) {
+ delete pipe_list;
dispatch_queue.queued_pipes.erase(priority);
+ }
} else {
- pipe_list.push_back(pipe->queue_items[priority]); // move to end of list
+ pipe_list->push_back(pipe->queue_items[priority]); // move to end of list
}
ldout(cct,20) << "dispatch_entry pipe " << pipe << " dequeued " << m << dendl;
dispatch_queue.lock.Unlock(); //done with the pipe queue for a while
queue_items[priority] = new xlist<Pipe *>::item(this);
if (msgr->dispatch_queue.queued_pipes.empty())
msgr->dispatch_queue.cond.Signal();
- msgr->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
+
+ map<int, xlist<Pipe*>*>::iterator p = msgr->dispatch_queue.queued_pipes.find(priority);
+ xlist<Pipe*> *pipe_list;
+ if (p != msgr->dispatch_queue.queued_pipes.end())
+ pipe_list = p->second;
+ else {
+ pipe_list = new xlist<Pipe*>;
+ msgr->dispatch_queue.queued_pipes[priority] = pipe_list;
+ }
+ pipe_list->push_back(queue_items[priority]);
}
queue.push_back(m);
xlist<Pipe *>* list_on;
if ((list_on = i->second->get_list())) { //if in round-robin
i->second->remove_myself(); //take off
- if (list_on->empty()) //if round-robin queue is empty
+ if (list_on->empty()) { //if round-robin queue is empty
+ delete list_on;
q.queued_pipes.erase(i->first); //remove from map
+ }
}
}
friend class Writer;
public:
+ Pipe(const Pipe& other);
+ const Pipe& operator=(const Pipe& other);
+
Pipe(SimpleMessenger *r, int st) :
msgr(r),
sd(-1),
Cond cond;
bool stop;
- map<int, xlist<Pipe *> > queued_pipes;
+ map<int, xlist<Pipe *>* > queued_pipes;
map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
atomic_t qlen;
local_pipe(NULL)
{}
~DispatchQueue() {
- for (map< int, xlist<Pipe *> >::iterator i = queued_pipes.begin();
+ for (map< int, xlist<Pipe *>* >::iterator i = queued_pipes.begin();
i != queued_pipes.end();
++i) {
- i->second.clear();
+ i->second->clear();
+ delete i->second;
}
}
} dispatch_queue;