From 73c3fa469226683e05d8720ac358350469e31a37 Mon Sep 17 00:00:00 2001 From: sage Date: Thu, 7 Jul 2005 03:26:56 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@409 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/client/SyntheticClient.cc | 13 ++++++++++++- ceph/client/SyntheticClient.h | 1 + ceph/common/Logger.cc | 5 +++++ ceph/common/Mutex.h | 2 +- ceph/msg/TCPMessenger.cc | 22 +++++++++++++++------- ceph/tcpsyn.cc | 2 ++ 6 files changed, 36 insertions(+), 9 deletions(-) diff --git a/ceph/client/SyntheticClient.cc b/ceph/client/SyntheticClient.cc index 8b07f2db5cbb2..f940645ee0f28 100644 --- a/ceph/client/SyntheticClient.cc +++ b/ceph/client/SyntheticClient.cc @@ -66,6 +66,7 @@ int SyntheticClient::run() } } break; + case SYNCLIENT_MODE_RANDOMWALK: { int iarg1 = iargs.front(); @@ -74,6 +75,7 @@ int SyntheticClient::run() random_walk(iarg1); } break; + case SYNCLIENT_MODE_MAKEDIRS: { string sarg1 = get_sarg(); @@ -84,6 +86,7 @@ int SyntheticClient::run() make_dirs(sarg1.c_str(), iarg1, iarg2, iarg3); } break; + case SYNCLIENT_MODE_FULLWALK: { string sarg1 = get_sarg(); @@ -91,6 +94,14 @@ int SyntheticClient::run() full_walk(sarg1); } break; + case SYNCLIENT_MODE_REPEATWALK: + { + string sarg1 = get_sarg(); + dout(2) << "repeatwalk " << sarg1 << endl; + while (full_walk(sarg1) == 0) ; + } + break; + case SYNCLIENT_MODE_WRITEFILE: { string sarg1 = get_sarg(); @@ -179,7 +190,7 @@ void SyntheticClient::up() int SyntheticClient::full_walk(string& basedir) { - if (run_until.first && g_clock.gettimepair() > run_until) return 0; + if (run_until.first && g_clock.gettimepair() > run_until) return -1; // read dir map contents; diff --git a/ceph/client/SyntheticClient.h b/ceph/client/SyntheticClient.h index e067c75b43635..7745c59d22658 100644 --- a/ceph/client/SyntheticClient.h +++ b/ceph/client/SyntheticClient.h @@ -12,6 +12,7 @@ #define SYNCLIENT_MODE_WRITEFILE 4 #define SYNCLIENT_MODE_READFILE 5 #define SYNCLIENT_MODE_UNTIL 6 +#define SYNCLIENT_MODE_REPEATWALK 7 class SyntheticClient { Client *client; diff --git a/ceph/common/Logger.cc b/ceph/common/Logger.cc index 086b04b3e45a3..6fe4987c87881 100644 --- a/ceph/common/Logger.cc +++ b/ceph/common/Logger.cc @@ -9,6 +9,10 @@ #include "include/config.h" +#include +#include + + // per-process lock. lame, but this way I protect LogType too! Mutex logger_lock; @@ -17,6 +21,7 @@ Logger::Logger(string fn, LogType *type) filename = "log/"; if (g_conf.log_name) { filename += g_conf.log_name; + ::mkdir( filename.c_str(), 0755 ); // make sure dir exists filename += "/"; } filename += fn; diff --git a/ceph/common/Mutex.h b/ceph/common/Mutex.h index 58733e96cc661..46303d365551a 100755 --- a/ceph/common/Mutex.h +++ b/ceph/common/Mutex.h @@ -33,7 +33,7 @@ class Mutex virtual ~Mutex() { - pthread_mutex_unlock(&M); + //pthread_mutex_unlock(&M); pthread_mutex_destroy(&M); } diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc index 1a4e8c446f4b9..51f1ec3730565 100644 --- a/ceph/msg/TCPMessenger.cc +++ b/ceph/msg/TCPMessenger.cc @@ -39,6 +39,7 @@ int tcp_port = 9876; */ hash_map directory; // local +Mutex directory_lock; list incoming; Mutex incoming_lock; Cond incoming_cond; @@ -558,9 +559,11 @@ void* tcp_dispatchthread(void*) in.pop_front(); int dest = m->get_dest(); + directory_lock.Lock(); if (directory.count(dest)) { Messenger *who = directory[ dest ]; - + directory_lock.Unlock(); + dout(4) << "---- '" << m->get_type_name() << "' from " << MSG_ADDR_NICE(m->get_source()) << ':' << m->get_source_port() << " to " << MSG_ADDR_NICE(m->get_dest()) << ':' << m->get_dest_port() << " ---- " @@ -569,6 +572,7 @@ void* tcp_dispatchthread(void*) who->dispatch(m); } else { + directory_lock.Unlock(); dout (1) << "---- i don't know who " << dest << " is." << endl; assert(0); } @@ -666,7 +670,9 @@ TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr) this->myaddr = myaddr; // register myself in the messenger directory + directory_lock.Lock(); directory[myaddr] = this; + directory_lock.Unlock(); // register to execute timer events g_timer.set_messenger_kicker(new C_TCPKicker()); @@ -696,16 +702,18 @@ TCPMessenger::~TCPMessenger() int TCPMessenger::shutdown() { // remove me from the directory + directory_lock.Lock(); directory.erase(myaddr); - - // no more timer events - g_timer.unset_messenger_kicker(); + bool lastone = directory.empty(); + directory_lock.Unlock(); // last one? - if (directory.empty()) { + if (lastone) { dout(2) << "shutdown last tcpmessenger on rank " << mpi_rank << " shut down" << endl; pthread_t whoami = pthread_self(); + // no more timer events + g_timer.unset_messenger_kicker(); // close incoming sockets @@ -738,7 +746,7 @@ int TCPMessenger::shutdown() } */ } else { - dout(10) << "shutdown still " << directory.size() << " other messengers on rank " << mpi_rank << endl; + dout(10) << "shutdown still" /*<< directory.size()*/ << " other messengers on rank " << mpi_rank << endl; } } @@ -757,7 +765,7 @@ int TCPMessenger::send_message(Message *m, msg_addr_t dest, int port, int frompo m->set_source(myaddr, fromport); m->set_dest(dest, port); - if (0) { + if (1) { // der tcp_send(m); } else { diff --git a/ceph/tcpsyn.cc b/ceph/tcpsyn.cc index 66a8bda563e2f..7a25b3659393d 100644 --- a/ceph/tcpsyn.cc +++ b/ceph/tcpsyn.cc @@ -76,6 +76,8 @@ int main(int oargc, char **oargv) { syn_iargs.push_back( atoi(argv[++i]) ); } else if (strcmp(argv[i],"fullwalk") == 0) { syn_modes.push_back( SYNCLIENT_MODE_FULLWALK ); + } else if (strcmp(argv[i],"repeatwalk") == 0) { + syn_modes.push_back( SYNCLIENT_MODE_REPEATWALK ); //syn_sargs.push_back( atoi(argv[++i]) ); } else if (strcmp(argv[i],"randomwalk") == 0) { syn_modes.push_back( SYNCLIENT_MODE_RANDOMWALK ); -- 2.39.5