From 937f8f1f586dbbd3fe69bf937abb1685494d1ab8 Mon Sep 17 00:00:00 2001 From: sage Date: Sun, 5 Jun 2005 18:11:27 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@270 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/msg/MPIMessenger.cc | 42 ++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/ceph/msg/MPIMessenger.cc b/ceph/msg/MPIMessenger.cc index 4ee3a34d154c7..1b250be211b59 100644 --- a/ceph/msg/MPIMessenger.cc +++ b/ceph/msg/MPIMessenger.cc @@ -27,6 +27,7 @@ using namespace __gnu_cxx; hash_map directory; list outgoing, incoming; list unfinished_sends; +map unfinished_send_message; /* this process */ int mpi_world; @@ -111,37 +112,40 @@ MPI_Request *mpi_prep_send_req() { return req; } -void mpi_reap_sends() { +void mpi_reap_sends(bool wait=false) { sender_lock.Lock(); + list::iterator it = unfinished_sends.begin(); while (it != unfinished_sends.end()) { MPI_Status status; int flag; - MPI_Test(*it, &flag, &status); - if (!flag) break; // not finished yet + + if (wait) { + MPI_Wait(*it, &status); + } else { + MPI_Test(*it, &flag, &status); + if (!flag) break; // not finished yet + } + dout(DBLVL) << "send " << *it << " completed" << endl; + + if (unfinished_send_message.count(*it)) { + dout(DBLVL) << "send message " << unfinished_send_message[*it] << " completed" << endl; + delete unfinished_send_message[*it]; + unfinished_send_message.erase(*it); + } + delete *it; it++; unfinished_sends.pop_front(); } - dout(DBLVL) << "reap has " << unfinished_sends.size() << " Isends outstanding" << endl; + dout(DBLVL) << "reap has " << unfinished_sends.size() << " Isends outstanding, " << unfinished_send_message.size() << " messages" << endl; sender_lock.Unlock(); } void mpi_finish_sends() { - sender_lock.Lock(); // not necessary? - list::iterator it = unfinished_sends.begin(); - while (it != unfinished_sends.end()) { - MPI_Status status; - int flag; - MPI_Wait(*it, &status); - dout(DBLVL) << "send " << *it << " completed" << endl; - delete *it; - it++; - unfinished_sends.pop_front(); - } - sender_lock.Unlock(); + mpi_reap_sends(true); } @@ -260,7 +264,11 @@ int mpi_send(Message *m, int tag) i++; } - dout(DBLVL) << "mpi_send done" << endl; + // attach message to last send, so we can free it later + MPI_Request *req = unfinished_sends.back(); + unfinished_send_message[req] = m; + + dout(DBLVL) << "mpi_send done, attached message to Isend " << req << endl; #ifndef FUNNEL_MPI sender_lock.Unlock(); -- 2.39.5