From 738440afaa2c2af19bd22cdfc7b8ecd4204f94b4 Mon Sep 17 00:00:00 2001 From: topher Date: Sun, 1 May 2005 23:53:16 +0000 Subject: [PATCH] Multithreaded version of MPI Messenger and associated tests. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@188 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/msg/MTMessenger.cc | 183 ++++++++++++++++++++++++++++++++++++++++ ceph/msg/MTMessenger.h | 36 ++++++++ ceph/test/mttest.cc | 140 ++++++++++++++++++++++++++++++ 3 files changed, 359 insertions(+) create mode 100644 ceph/msg/MTMessenger.cc create mode 100644 ceph/msg/MTMessenger.h create mode 100644 ceph/test/mttest.cc diff --git a/ceph/msg/MTMessenger.cc b/ceph/msg/MTMessenger.cc new file mode 100644 index 0000000000000..f362553185692 --- /dev/null +++ b/ceph/msg/MTMessenger.cc @@ -0,0 +1,183 @@ +#include +#include "mpi.h" + +#include "include/config.h" +#include "include/error.h" +#include "Messenger.h" +#include "MTMessenger.h" + +// This module uses MPI to implement a blocking sendrecv function that +// feels more like a procedure call and less like event processesing. +// +// Threads are not independently addressable in MPI, only processes +// are. However, MPI does include a user defined tag in the message +// envelope, and a reader may selectively read only messages with a +// matching tag. The modules assign an integer to each thread to use +// as the tag. +// + +// our lock for any common data; it's okay to have only the one global mutex +// because our common data isn't a whole lot. +static pthread_mutex_t mutex; + +// the key used to fetch the tag for the current thread. +pthread_key_t tag_key; + +// the number of distinct threads we've seen so far; used to generate +// a unique tag for each thread. +static int nthreads; + +// the MPI identity of this process +static int mpi_rank; + + +// get the tag for this thread +static int get_tag() +{ + int tag = (int)pthread_getspecific(tag_key); + + if (tag == 0) { + // first time this thread has performed MPI messaging + + if (pthread_mutex_lock(&mutex) < 0) + SYSERROR(); + + tag = ++nthreads; + + if (pthread_mutex_unlock(&mutex) < 0) + SYSERROR(); + + if (pthread_setspecific(tag_key, (void*)tag) < 0) + SYSERROR(); + } + + return tag; +} + + +// marshall a message and send it over MPI +static void send(Message *m, int rank, int tag) +{ + // marshall the message + crope r; + m->encode(r); + int size = r.length(); + + char *buf = (char*)r.c_str(); + ASSERT(MPI_Send(buf, + size, + MPI_CHAR, + rank, + tag, + MPI_COMM_WORLD) == MPI_SUCCESS); +} + +// read a message from MPI and unmarshall it +static Message *receive(int tag) +{ + MPI_Status status; + + // get message size + ASSERT(MPI_Probe(MPI_ANY_SOURCE, + tag, + MPI_COMM_WORLD, + &status) == MPI_SUCCESS); + + // get message; there may be multiple messages on the queue, we + // need to be sure to read the one which corresponds to size + // obtained above. + char *buf = new char[status.count]; + ASSERT(MPI_Recv(buf, + status.count, + MPI_CHAR, + status.MPI_SOURCE, + status.MPI_TAG, + MPI_COMM_WORLD, + &status) == MPI_SUCCESS); + + // unmarshall message + crope r(buf, status.count); + delete[] buf; + Message *m = decode_message(r); + + return m; +} + +MTMessenger::MTMessenger(int& argc, char**& argv) +{ + // setup MPI; MPI errors will probably invoke the default MPI error + // handler, which aborts the program with a friendly message rather + // than returning from a function; just in case, we abort the + // program if we get an MPI error. + + int provided; + ASSERT(MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided) + == MPI_SUCCESS); + + ASSERT(MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank) == MPI_SUCCESS); + + if (pthread_mutex_init(&mutex, NULL) < 0) + SYSERROR(); + + if (pthread_key_create(&tag_key, NULL) < 0) + SYSERROR(); + + nthreads = 0; +} + +MTMessenger::~MTMessenger() +{ + // ignore shutdown errors + + pthread_key_delete(tag_key); + + pthread_mutex_destroy(&mutex); + + MPI_Finalize(); +} + +// send a request and wait for the response +Message *MTMessenger::sendrecv(Message *m, msg_addr_t dest) +{ + int dest_tag = 0; // servers listen for any tag + int my_tag = get_tag(); + + // set our envelope (not to be confused with the MPI envelope) + m->set_source(mpi_rank, my_tag); + m->set_dest(dest, dest_tag); + + send(m, dest, dest_tag); + + return receive(my_tag); +} + +// receive a request from anyone +Message *MTMessenger::recvreq() +{ + return receive(MPI_ANY_TAG); +} + +// forward request, masquerading as original source +void MTMessenger::fwdreq(Message *req, int dest) +{ + int dest_tag = 0; // servers listen for any tag + + // set our envelope (not to be confused with the MPI envelope) + req->set_dest(dest, dest_tag); + + send(req, dest, dest_tag); +} + +// send a response to the originator of the request +void MTMessenger::sendresp(Message *req, Message *resp) +{ + int req_rank = req->get_source(); + int req_tag = req->get_source_port(); + int my_tag = get_tag(); + + // set our envelope (not to be confused with the MPI envelope) + resp->set_source(mpi_rank, my_tag); + resp->set_dest(req_rank, req_tag); + + send(resp, req_rank, req_tag); +} diff --git a/ceph/msg/MTMessenger.h b/ceph/msg/MTMessenger.h new file mode 100644 index 0000000000000..0196322a922b7 --- /dev/null +++ b/ceph/msg/MTMessenger.h @@ -0,0 +1,36 @@ +#ifndef __MTMESSENGER_H +#define __MTMESSENGER_H + +#include "Message.h" +#include "SerialMessenger.h" + +// Marshall and unmarshall OBFS messages, send and receive them over +// MPI. + +class MTMessenger +{ +public: + // sets up the queues and internal thread; the MPI initialization + // will scan argc/argv for MPI specific flags and remove them from + // argc/argv. + MTMessenger(int &argc, char **&argv); + + // tears it all down + ~MTMessenger(); + + // send a request to a server and wait (block) for the response; + virtual Message *sendrecv(Message *m, msg_addr_t dest); + + // wait (block) for a request from anyone + Message *recvreq(); + + // forward request, masquerading as original source + void fwdreq(Message *req, int dest); + + // send the response to the originator of the request + virtual void sendresp(Message *req, Message *resp); + + +}; // class MTMessenger + +#endif // __MTMESSENGER_H diff --git a/ceph/test/mttest.cc b/ceph/test/mttest.cc new file mode 100644 index 0000000000000..f4b53c0f1194f --- /dev/null +++ b/ceph/test/mttest.cc @@ -0,0 +1,140 @@ +// Check that MTMessenger properly dispatches replies to the correct +// thread. Processes with mutliple threads of clients send a +// "request" to a server, which then sends back a "reply". The client +// checks that it received the correct reply for its request. The +// request and reply are both an MClientRequest, which we used because +// it allows us to pass an arbitrary string in the sarg field. In the +// request, the sarg field contains a string "rN:tN:mN" which uniquely +// identifies a request by rank (process), thread and message. The +// server sends the reply with the sarg field set to "rN:tN:mN reply", +// and the client can the verify it receive the correct reply for its +// request. + +#include +#include "mpi.h" + +#include "messages/MClientRequest.h" +#include "msg/MTMessenger.h" +#include "include/error.h" + +#define SARG_SIZE 64 +#define SERVER_RANK 0 +#define NTHREADS 11 // number of threads per rank +#define NMESSAGES 31 // number of messages per thread + +static void server_loop(MTMessenger &msgr, int world_size) +{ + // we expect this many messages from clients, then we quit + // (world_size-1 since server is one of the processes). + int totmsg = NTHREADS * NMESSAGES * (world_size - 1); + int nmsg = 0; + + char buf[SARG_SIZE]; + + while(nmsg < totmsg) { + MClientRequest *req = (MClientRequest*)msgr.recvreq(); + ASSERT(req->get_type() == MSG_CLIENT_REQUEST); + + //cout << "Server acknowledging " << req->get_sarg() << endl; + + sprintf(buf, "%s reply", req->get_sarg().c_str()); + MClientRequest resp(0, 0); + resp.set_sarg(buf); + msgr.sendresp(req, &resp); + + delete req; + nmsg++; + } + + cout << "Server successful" << endl; +} + +// arguments for client thread start function (see pthread_create) +struct client_arg +{ + MTMessenger *msgr; + int rank; + int thread; +}; + +static void *client_session(void *_carg) +{ + client_arg *carg = (client_arg *)_carg; + + char buf[SARG_SIZE]; + + // repeat some number (arbitrary really) of rounds + for (int i = 0; i < NMESSAGES; i++) { + + // send the message, receive the reply and check reply is as + // expected + + MClientRequest request(0, 0); + sprintf(buf, "r%d:t%d:m%d", carg->rank, carg->thread, i); + request.set_sarg(buf); + + //cout << "Client sending " << request.get_sarg() << endl; + + MClientRequest *resp = + (MClientRequest*)carg->msgr->sendrecv(&request, SERVER_RANK); + + ASSERT(resp->get_type() == MSG_CLIENT_REQUEST); + sprintf(buf, "r%d:t%d:m%d reply", carg->rank, carg->thread, i); + ASSERT(strcmp(buf, resp->get_sarg().c_str()) == 0); + + //cout << "Client verified " << resp->get_sarg() << endl; + + delete resp; + } + + cout << "Client (" << carg->rank << "," << carg->thread + << ") successful" << endl; + + delete carg; + return NULL; +} + +static void launch_clients(MTMessenger &msgr, int rank) +{ + pthread_t tid[NTHREADS]; + + // launch some number (arbitrary really) of threads + for (int i = 0; i < NTHREADS; i++) { + + client_arg *carg = (client_arg*)malloc(sizeof(client_arg)); + ASSERT(carg); + carg->msgr = &msgr; + carg->rank = rank; + carg->thread = i; + + if (pthread_create(&tid[i], NULL, client_session, carg) < 0) + SYSERROR(); + } + + // we must wait for all the threads to exit before returning, + // otherwise we shutdown MPI before while the threads are + // chatting. + for (int i = 0; i < NTHREADS; i++) { + void *retval; + + if (pthread_join(tid[i], &retval) < 0) + SYSERROR(); + } +} + +int main(int argc, char **argv) +{ + MTMessenger msgr(argc, argv); + + int rank; + ASSERT(MPI_Comm_rank(MPI_COMM_WORLD, &rank) == MPI_SUCCESS); + int world_size; + ASSERT(MPI_Comm_size(MPI_COMM_WORLD, &world_size) == MPI_SUCCESS); + + if (rank == SERVER_RANK) + server_loop(msgr, world_size); + else + launch_clients(msgr, rank); + + return 0; +} -- 2.39.5