From 019a81b38d65365aea2f71fd45e885b0e8a20387 Mon Sep 17 00:00:00 2001 From: sage Date: Sat, 7 May 2005 01:50:44 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@214 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/Makefile | 5 ++- ceph/common/Timer.cc | 90 +++++++++++++++++++++++++++++++------ ceph/common/Timer.h | 53 +++++++++++++++++++--- ceph/config.cc | 4 +- ceph/include/Context.h | 6 ++- ceph/mds/CDir.h | 1 + ceph/mds/MDCache.cc | 2 +- ceph/msg/CheesySerializer.h | 6 +++ ceph/msg/FakeMessenger.cc | 24 +++++++++- ceph/msg/FakeMessenger.h | 5 +++ ceph/msg/MPIMessenger.cc | 10 +++++ ceph/msg/MPIMessenger.h | 4 ++ ceph/msg/Messenger.h | 3 ++ ceph/test/mpitest.cc | 2 +- 14 files changed, 188 insertions(+), 27 deletions(-) diff --git a/ceph/Makefile b/ceph/Makefile index 6dc62b663b58e..39ce89c49c827 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -30,10 +30,11 @@ COMMON_OBJS= \ msg/error.o\ common/Logger.o\ common/clock.o\ + common/Timer.o\ config.o -TEST_TARGETS = fakemds mpitest fakefuse -TARGETS = import singleclient mpifuse +TEST_TARGETS = fakemds mpitest +TARGETS = import singleclient mpifuse fakefuse SRCS=*.cc */*.cc diff --git a/ceph/common/Timer.cc b/ceph/common/Timer.cc index c691e9dd0c675..b3dcd38306e66 100644 --- a/ceph/common/Timer.cc +++ b/ceph/common/Timer.cc @@ -3,37 +3,101 @@ #include "Timer.h" #include "include/config.h" +#include "include/Context.h" + +#include "msg/Messenger.h" #undef dout #define dout(x) if (x <= g_conf.debug) cout << "Timer: " +#include +#include + +Messenger *messenger_to_kick = 0; +Timer *static_timer = 0; + +void timer_signal_handler(int sig) +{ + // kick messenger. + if (messenger_to_kick && static_timer) { + messenger_to_kick->trigger_timer(static_timer); + } +} + + +void Timer::register_timer() +{ + dout(10) << "register_timer installing signal handler" << endl; + + /* only one of these per process. + should be okay, i think.. in the cases where we have multiple people in the same + process (FakeMessenger), we're actually kicking the same event loop anyway. + */ + // FIXME i shouldn't have to do this every time? + static_timer = this; + messenger_to_kick = messenger; + + // install handler + struct sigaction ac; + memset(&ac, sizeof(ac), 0); + ac.sa_handler = timer_signal_handler; + ///FIXMEac.sa_mask = 0; // hmm. + + sigaction(SIGALRM, &ac, NULL); + + // set alarm + double now = g_clock.gettime(); + double delay = next_event_time() - now; + double sec; + double usec = modf(delay, &sec); + + dout(10) << "setting itimer to go off in " << sec << "s " << usec << "us" << endl; + struct itimerval it; + it.it_value.tv_sec = (int)sec; + it.it_value.tv_usec = (int)(usec * 1000000); + it.it_interval.tv_sec = 0; // don't repeat! + it.it_interval.tv_usec = 0; + setitimer(ITIMER_REAL, &it, 0); +} + +void Timer::cancel_timer() +{ + // clear my callback pointers + messenger_to_kick = 0; + static_timer = 0; + + // remove my handler. MESSILY FIXME + sigaction(SIGALRM, 0, 0); +} + void Timer::execute_pending() { - double now = clock.gettime(); + double now = g_clock.gettime(); dout(12) << "now = " << now << endl; while (event_map.size()) { Context *event = 0; + double next = next_event_time(); + if (next > now) break; + { // scope this so my iterator is destroyed quickly - // check first event - map::iterator it = event_map.begin(); - if (it->first > now) { - dout(12) << "next event at " << it->first << ", stopping" << endl; - break; // no events! - } + // grab first + set::iterator it = event_map[next].begin(); - // claim and remove from map - event = it->second; - event_map.erase(it); - - dout(5) << "executing event " << event << " scheduled for " << it->first << endl; + event = *it; + assert(event); } + + // claim and remove from map + event_map[next].erase(event); + event_times.erase(event); // exec - assert(event); + dout(5) << "executing event " << event << " scheduled for " << next << endl; + event->finish(0); delete event; } diff --git a/ceph/common/Timer.h b/ceph/common/Timer.h index 3915bfebba9fb..5fd647ca3ef8d 100644 --- a/ceph/common/Timer.h +++ b/ceph/common/Timer.h @@ -1,26 +1,69 @@ #ifndef __TIMER_H #define __TIMER_H +#include "include/Context.h" +#include "Clock.h" + +#include +#include +using namespace std; + + /*** Timer * schedule callbacks */ -class Timer { +class Messenger; - // event map: time -> context - map event_map; +class Timer { + private: + map > event_map; // time -> (context ...) + map event_times; // event -> time + + // get time of the next event + double next_event_time() { + map< double, set >::iterator it = event_map.begin(); + return it->first; + } + + Messenger *messenger; + void register_timer(); // make sure i get a callback + void cancel_timer(); // make sure i get a callback public: - + Timer(Messenger *msg) { messenger = msg; } + ~Timer() { + // cancel any wakeup crap + cancel_timer(); + + // + } + // schedule events void add_event_after(double seconds, Context *callback) { - add_event_at(clock.gettime(), callback); + add_event_at(g_clock.gettime() + seconds, callback); } void add_event_at(double when, Context *callback) { + // insert + event_map[when].insert(callback); + event_times[callback] = when; + + // make sure i wake up (soon enough) + register_timer(); + } + + bool cancel_event(Context *callback) { + if (!event_times.count(callback)) + return false; // wasn't scheduled. + double when = event_times[callback]; + event_times.erase(callback); + event_map[when].erase(callback); + if (event_map[when].empty()) event_map.erase(when); + return true; } // execute pending events diff --git a/ceph/config.cc b/ceph/config.cc index 6df77120d45ac..064a2d3364abd 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -50,7 +50,7 @@ md_config_t g_conf = { // --- fakeclient (mds regression testing) --- num_fakeclient: 1000, - fakeclient_requests: 10, + fakeclient_requests: 100, fakeclient_deterministic: false, fakeclient_op_statfs: false, @@ -64,7 +64,7 @@ md_config_t g_conf = { fakeclient_op_readdir: 10, fakeclient_op_mknod: 100, fakeclient_op_link: false, - fakeclient_op_unlink: 10, + fakeclient_op_unlink: 100, fakeclient_op_rename: 100, fakeclient_op_mkdir: 100, diff --git a/ceph/include/Context.h b/ceph/include/Context.h index e669e7676c526..ca59d1601d51f 100644 --- a/ceph/include/Context.h +++ b/ceph/include/Context.h @@ -2,10 +2,12 @@ #ifndef __CONTEXT_H #define __CONTEXT_H -#include -#include #include "config.h" + +#include +#include #include +using namespace std; class MDS; diff --git a/ceph/mds/CDir.h b/ceph/mds/CDir.h index 1cad2a338abe8..f54c3df3bbc10 100644 --- a/ceph/mds/CDir.h +++ b/ceph/mds/CDir.h @@ -157,6 +157,7 @@ static char* cdir_pin_names[CDIR_NUM_PINS] = { #define CDIR_WAIT_DNREAD (1<<20) #define CDIR_WAIT_DNLOCK (1<<21) #define CDIR_WAIT_DNUNPINNED (1<<22) +#define CDIR_WAIT_DNPINNABLE (CDIR_WAIT_DNREAD|CDIR_WAIT_DNUNPINNED) #define CDIR_WAIT_DNREQXLOCK (1<<23) diff --git a/ceph/mds/MDCache.cc b/ceph/mds/MDCache.cc index 14581914ca4f0..1732b9cb8de0f 100644 --- a/ceph/mds/MDCache.cc +++ b/ceph/mds/MDCache.cc @@ -1098,7 +1098,7 @@ bool MDCache::path_pin(vector& trace, // wait if (c) { dout(10) << "path_pin can't pin " << *dn << ", waiting" << endl; - dn->dir->add_waiter(CDIR_WAIT_DNREAD, + dn->dir->add_waiter(CDIR_WAIT_DNPINNABLE, dn->name, c); } else { diff --git a/ceph/msg/CheesySerializer.h b/ceph/msg/CheesySerializer.h index 883710f06ca40..78ed9377cb153 100644 --- a/ceph/msg/CheesySerializer.h +++ b/ceph/msg/CheesySerializer.h @@ -9,6 +9,8 @@ #include "Cond.h" #include "Mutex.h" +#include + #include using namespace std; @@ -38,6 +40,10 @@ class CheesySerializer : public Messenger, int port=0, int fromport=0); // doesn't block Message *sendrecv(Message *m, msg_addr_t dest, int port=0); // blocks for matching reply + + void trigger_timer(class Timer *t) { + assert(0); + } }; #endif diff --git a/ceph/msg/FakeMessenger.cc b/ceph/msg/FakeMessenger.cc index 6c418caa42d62..84a9982b897bb 100644 --- a/ceph/msg/FakeMessenger.cc +++ b/ceph/msg/FakeMessenger.cc @@ -5,6 +5,7 @@ #include "FakeMessenger.h" #include "mds/MDS.h" +#include "common/Timer.h" #include "common/LogType.h" #include "common/Logger.h" @@ -78,7 +79,7 @@ void fakemessenger_stopthread() { } - +Timer *pending_timer = 0; // lame main looper @@ -100,6 +101,16 @@ int fakemessenger_do_loop_2() dout(11) << "do_loop top" << endl; + // timer? + if (pending_timer) { + Timer *t = pending_timer; + pending_timer = 0; + + dout(5) << "pending timer" << endl; + t->execute_pending(); + } + + // messages map::iterator it = directory.begin(); while (it != directory.end()) { Message *m = it->second->get_message(); @@ -155,6 +166,8 @@ FakeMessenger::FakeMessenger(long me) whoami = me; directory[ whoami ] = this; + pending_timer = 0; + cout << "fakemessenger " << whoami << " messenger is " << this << endl; string name; @@ -183,6 +196,15 @@ int FakeMessenger::shutdown() directory.erase(whoami); } +void FakeMessenger::trigger_timer(Timer *t) +{ + // note timer to call + pending_timer = t; + + // wake up thread? + cond.Signal(); // why not +} + int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport) { diff --git a/ceph/msg/FakeMessenger.h b/ceph/msg/FakeMessenger.h index 4241eb6be1b44..0b0e7f92b6a51 100644 --- a/ceph/msg/FakeMessenger.h +++ b/ceph/msg/FakeMessenger.h @@ -8,6 +8,8 @@ #include #include +class Timer; + class FakeMessenger : public Messenger { protected: int whoami; @@ -25,6 +27,9 @@ class FakeMessenger : public Messenger { // use CheesySerializer for now! virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); }; + + // events + virtual void trigger_timer(Timer *t); }; int fakemessenger_do_loop(); diff --git a/ceph/msg/MPIMessenger.cc b/ceph/msg/MPIMessenger.cc index f0fe2f560287d..a4ea542cb4ad9 100644 --- a/ceph/msg/MPIMessenger.cc +++ b/ceph/msg/MPIMessenger.cc @@ -2,6 +2,8 @@ #include "include/config.h" #include "include/error.h" +#include "common/Timer.h" + #include "MPIMessenger.h" #include "Message.h" @@ -317,6 +319,14 @@ int MPIMessenger::shutdown() +/*** events + */ + +void MPIMessenger::trigger_timer(Timer *t) +{ + assert(0); //implement me +} + /*** * public messaging interface */ diff --git a/ceph/msg/MPIMessenger.h b/ceph/msg/MPIMessenger.h index b20bc4de139f5..c56e25d541c62 100644 --- a/ceph/msg/MPIMessenger.h +++ b/ceph/msg/MPIMessenger.h @@ -10,6 +10,7 @@ (dest) : \ ((NUMMDS+NUMOSD)+(((dest)-NUMMDS-NUMOSD) % ((world)-NUMMDS-NUMOSD)))) +class Timer; class MPIMessenger : public Messenger { protected: @@ -26,6 +27,9 @@ class MPIMessenger : public Messenger { // message interface virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0); virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0); + + // events + virtual void trigger_timer(Timer *t); }; /** diff --git a/ceph/msg/Messenger.h b/ceph/msg/Messenger.h index 6ebefdfda4ce5..2d7ce1b2cc387 100644 --- a/ceph/msg/Messenger.h +++ b/ceph/msg/Messenger.h @@ -9,6 +9,7 @@ using namespace std; #include "Dispatcher.h" class MDS; +class Timer; class Messenger { protected: @@ -37,6 +38,8 @@ class Messenger { //virtual Message *recv() = 0 + // events + virtual void trigger_timer(Timer *t) = 0; // -- incoming queue -- diff --git a/ceph/test/mpitest.cc b/ceph/test/mpitest.cc index d88396ee8586a..8cb808ff6da65 100644 --- a/ceph/test/mpitest.cc +++ b/ceph/test/mpitest.cc @@ -58,7 +58,7 @@ int main(int argc, char **argv) { FakeClient *client[NUMCLIENT]; for (int i=0; iinit(); } -- 2.39.5