}
}
break;
+
case SYNCLIENT_MODE_RANDOMWALK:
{
int iarg1 = iargs.front();
random_walk(iarg1);
}
break;
+
case SYNCLIENT_MODE_MAKEDIRS:
{
string sarg1 = get_sarg();
make_dirs(sarg1.c_str(), iarg1, iarg2, iarg3);
}
break;
+
case SYNCLIENT_MODE_FULLWALK:
{
string sarg1 = get_sarg();
full_walk(sarg1);
}
break;
+ case SYNCLIENT_MODE_REPEATWALK:
+ {
+ string sarg1 = get_sarg();
+ dout(2) << "repeatwalk " << sarg1 << endl;
+ while (full_walk(sarg1) == 0) ;
+ }
+ break;
+
case SYNCLIENT_MODE_WRITEFILE:
{
string sarg1 = get_sarg();
int SyntheticClient::full_walk(string& basedir)
{
- if (run_until.first && g_clock.gettimepair() > run_until) return 0;
+ if (run_until.first && g_clock.gettimepair() > run_until) return -1;
// read dir
map<string, inode_t*> contents;
#define SYNCLIENT_MODE_WRITEFILE 4
#define SYNCLIENT_MODE_READFILE 5
#define SYNCLIENT_MODE_UNTIL 6
+#define SYNCLIENT_MODE_REPEATWALK 7
class SyntheticClient {
Client *client;
#include "include/config.h"
+#include <sys/stat.h>
+#include <sys/types.h>
+
+
// per-process lock. lame, but this way I protect LogType too!
Mutex logger_lock;
filename = "log/";
if (g_conf.log_name) {
filename += g_conf.log_name;
+ ::mkdir( filename.c_str(), 0755 ); // make sure dir exists
filename += "/";
}
filename += fn;
virtual ~Mutex()
{
- pthread_mutex_unlock(&M);
+ //pthread_mutex_unlock(&M);
pthread_mutex_destroy(&M);
}
*/
hash_map<int, TCPMessenger*> directory; // local
+Mutex directory_lock;
list<Message*> incoming;
Mutex incoming_lock;
Cond incoming_cond;
in.pop_front();
int dest = m->get_dest();
+ directory_lock.Lock();
if (directory.count(dest)) {
Messenger *who = directory[ dest ];
-
+ directory_lock.Unlock();
+
dout(4) << "---- '" << m->get_type_name() <<
"' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() <<
" to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- "
who->dispatch(m);
} else {
+ directory_lock.Unlock();
dout (1) << "---- i don't know who " << dest << " is." << endl;
assert(0);
}
this->myaddr = myaddr;
// register myself in the messenger directory
+ directory_lock.Lock();
directory[myaddr] = this;
+ directory_lock.Unlock();
// register to execute timer events
g_timer.set_messenger_kicker(new C_TCPKicker());
int TCPMessenger::shutdown()
{
// remove me from the directory
+ directory_lock.Lock();
directory.erase(myaddr);
-
- // no more timer events
- g_timer.unset_messenger_kicker();
+ bool lastone = directory.empty();
+ directory_lock.Unlock();
// last one?
- if (directory.empty()) {
+ if (lastone) {
dout(2) << "shutdown last tcpmessenger on rank " << mpi_rank << " shut down" << endl;
pthread_t whoami = pthread_self();
+ // no more timer events
+ g_timer.unset_messenger_kicker();
// close incoming sockets
}
*/
} else {
- dout(10) << "shutdown still " << directory.size() << " other messengers on rank " << mpi_rank << endl;
+ dout(10) << "shutdown still" /*<< directory.size()*/ << " other messengers on rank " << mpi_rank << endl;
}
}
m->set_source(myaddr, fromport);
m->set_dest(dest, port);
- if (0) {
+ if (1) {
// der
tcp_send(m);
} else {
syn_iargs.push_back( atoi(argv[++i]) );
} else if (strcmp(argv[i],"fullwalk") == 0) {
syn_modes.push_back( SYNCLIENT_MODE_FULLWALK );
+ } else if (strcmp(argv[i],"repeatwalk") == 0) {
+ syn_modes.push_back( SYNCLIENT_MODE_REPEATWALK );
//syn_sargs.push_back( atoi(argv[++i]) );
} else if (strcmp(argv[i],"randomwalk") == 0) {
syn_modes.push_back( SYNCLIENT_MODE_RANDOMWALK );