}
if (pipe_list.empty())
queued_pipes.erase(priority);
- --qlen;
endpoint_lock.Unlock(); //done with the pipe queue for a while
+ qlen_lock.lock();
+ --qlen;
+ qlen_lock.unlock();
//get message from pipe
Message *m = m_queue.front();
i != queue_items.end();
++i)
i->second->remove_myself();
+ rank->local_endpoint->endpoint_lock.Unlock();
endpoint = true;
pipe_lock.Lock();
}
for (map<int,list<Message*> >::iterator p = in_q.begin(); p != in_q.end(); p++) {
if (endpoint) {
int size = in_q.size();
+ rank->local_endpoint->qlen_lock.lock();
rank->local_endpoint->qlen -= size;
+ rank->local_endpoint->qlen_lock.unlock();
}
for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
delete *r;
}
- if (endpoint)
- rank->local_endpoint->endpoint_lock.Unlock();
in_q.clear();
}
using namespace __gnu_cxx;
#include "common/Mutex.h"
+#include "common/Spinlock.h"
#include "common/Cond.h"
#include "common/Thread.h"
pipe_lock.Unlock();
//increment queue length counter
- rank->local_endpoint->endpoint_lock.Lock();
+ rank->local_endpoint->qlen_lock.lock();
++rank->local_endpoint->qlen;
+ rank->local_endpoint->qlen_lock.unlock();
rank->local_endpoint->cond.Signal();
- rank->local_endpoint->endpoint_lock.Unlock();
dout(0) << "finished queuing received message " << m << "in msgr " << rank << dendl;
}
map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
bool stop;
int qlen;
+ Spinlock qlen_lock;
int my_rank;
class DispatchThread : public Thread {
endpoint_lock("SimpleMessenger::Endpoint::endpoint_lock"),
stop(false),
qlen(0),
+ qlen_lock("SimpleMessenger::Endpoint::qlen_lock"),
my_rank(rn),
dispatch_thread(this) {
local_pipe = new Pipe(r, Pipe::STATE_OPEN);