msg/error.o\
common/Logger.o\
common/clock.o\
+ common/Timer.o\
config.o
-TEST_TARGETS = fakemds mpitest fakefuse
-TARGETS = import singleclient mpifuse
+TEST_TARGETS = fakemds mpitest
+TARGETS = import singleclient mpifuse fakefuse
SRCS=*.cc */*.cc
#include "Timer.h"
#include "include/config.h"
+#include "include/Context.h"
+
+#include "msg/Messenger.h"
#undef dout
#define dout(x) if (x <= g_conf.debug) cout << "Timer: "
+#include <signal.h>
+#include <sys/time.h>
+
+Messenger *messenger_to_kick = 0;
+Timer *static_timer = 0;
+
+void timer_signal_handler(int sig)
+{
+ // kick messenger.
+ if (messenger_to_kick && static_timer) {
+ messenger_to_kick->trigger_timer(static_timer);
+ }
+}
+
+
+void Timer::register_timer()
+{
+ dout(10) << "register_timer installing signal handler" << endl;
+
+ /* only one of these per process.
+ should be okay, i think.. in the cases where we have multiple people in the same
+ process (FakeMessenger), we're actually kicking the same event loop anyway.
+ */
+ // FIXME i shouldn't have to do this every time?
+ static_timer = this;
+ messenger_to_kick = messenger;
+
+ // install handler
+ struct sigaction ac;
+ memset(&ac, sizeof(ac), 0);
+ ac.sa_handler = timer_signal_handler;
+ ///FIXMEac.sa_mask = 0; // hmm.
+
+ sigaction(SIGALRM, &ac, NULL);
+
+ // set alarm
+ double now = g_clock.gettime();
+ double delay = next_event_time() - now;
+ double sec;
+ double usec = modf(delay, &sec);
+
+ dout(10) << "setting itimer to go off in " << sec << "s " << usec << "us" << endl;
+ struct itimerval it;
+ it.it_value.tv_sec = (int)sec;
+ it.it_value.tv_usec = (int)(usec * 1000000);
+ it.it_interval.tv_sec = 0; // don't repeat!
+ it.it_interval.tv_usec = 0;
+ setitimer(ITIMER_REAL, &it, 0);
+}
+
+void Timer::cancel_timer()
+{
+ // clear my callback pointers
+ messenger_to_kick = 0;
+ static_timer = 0;
+
+ // remove my handler. MESSILY FIXME
+ sigaction(SIGALRM, 0, 0);
+}
+
void Timer::execute_pending()
{
- double now = clock.gettime();
+ double now = g_clock.gettime();
dout(12) << "now = " << now << endl;
while (event_map.size()) {
Context *event = 0;
+ double next = next_event_time();
+ if (next > now) break;
+
{ // scope this so my iterator is destroyed quickly
- // check first event
- map<double,Context*>::iterator it = event_map.begin();
- if (it->first > now) {
- dout(12) << "next event at " << it->first << ", stopping" << endl;
- break; // no events!
- }
+ // grab first
+ set<Context*>::iterator it = event_map[next].begin();
- // claim and remove from map
- event = it->second;
- event_map.erase(it);
-
- dout(5) << "executing event " << event << " scheduled for " << it->first << endl;
+ event = *it;
+ assert(event);
}
+
+ // claim and remove from map
+ event_map[next].erase(event);
+ event_times.erase(event);
// exec
- assert(event);
+ dout(5) << "executing event " << event << " scheduled for " << next << endl;
+
event->finish(0);
delete event;
}
#ifndef __TIMER_H
#define __TIMER_H
+#include "include/Context.h"
+#include "Clock.h"
+
+#include <map>
+#include <set>
+using namespace std;
+
+
/*** Timer
* schedule callbacks
*/
-class Timer {
+class Messenger;
- // event map: time -> context
- map<double, Context*> event_map;
+class Timer {
+ private:
+ map<double, set<Context*> > event_map; // time -> (context ...)
+ map<Context*, double> event_times; // event -> time
+
+ // get time of the next event
+ double next_event_time() {
+ map< double, set<Context*> >::iterator it = event_map.begin();
+ return it->first;
+ }
+
+ Messenger *messenger;
+ void register_timer(); // make sure i get a callback
+ void cancel_timer(); // make sure i get a callback
public:
-
+ Timer(Messenger *msg) { messenger = msg; }
+ ~Timer() {
+ // cancel any wakeup crap
+ cancel_timer();
+
+ //
+ }
+
// schedule events
void add_event_after(double seconds,
Context *callback) {
- add_event_at(clock.gettime(), callback);
+ add_event_at(g_clock.gettime() + seconds, callback);
}
void add_event_at(double when,
Context *callback) {
+ // insert
+ event_map[when].insert(callback);
+ event_times[callback] = when;
+
+ // make sure i wake up (soon enough)
+ register_timer();
+ }
+
+ bool cancel_event(Context *callback) {
+ if (!event_times.count(callback))
+ return false; // wasn't scheduled.
+ double when = event_times[callback];
+ event_times.erase(callback);
+ event_map[when].erase(callback);
+ if (event_map[when].empty()) event_map.erase(when);
+ return true;
}
// execute pending events
// --- fakeclient (mds regression testing) ---
num_fakeclient: 1000,
- fakeclient_requests: 10,
+ fakeclient_requests: 100,
fakeclient_deterministic: false,
fakeclient_op_statfs: false,
fakeclient_op_readdir: 10,
fakeclient_op_mknod: 100,
fakeclient_op_link: false,
- fakeclient_op_unlink: 10,
+ fakeclient_op_unlink: 100,
fakeclient_op_rename: 100,
fakeclient_op_mkdir: 100,
#ifndef __CONTEXT_H
#define __CONTEXT_H
-#include <list>
-#include <assert.h>
#include "config.h"
+
+#include <assert.h>
+#include <list>
#include <iostream>
+using namespace std;
class MDS;
#define CDIR_WAIT_DNREAD (1<<20)
#define CDIR_WAIT_DNLOCK (1<<21)
#define CDIR_WAIT_DNUNPINNED (1<<22)
+#define CDIR_WAIT_DNPINNABLE (CDIR_WAIT_DNREAD|CDIR_WAIT_DNUNPINNED)
#define CDIR_WAIT_DNREQXLOCK (1<<23)
// wait
if (c) {
dout(10) << "path_pin can't pin " << *dn << ", waiting" << endl;
- dn->dir->add_waiter(CDIR_WAIT_DNREAD,
+ dn->dir->add_waiter(CDIR_WAIT_DNPINNABLE,
dn->name,
c);
} else {
#include "Cond.h"
#include "Mutex.h"
+#include <assert.h>
+
#include <map>
using namespace std;
int port=0, int fromport=0); // doesn't block
Message *sendrecv(Message *m, msg_addr_t dest,
int port=0); // blocks for matching reply
+
+ void trigger_timer(class Timer *t) {
+ assert(0);
+ }
};
#endif
#include "FakeMessenger.h"
#include "mds/MDS.h"
+#include "common/Timer.h"
#include "common/LogType.h"
#include "common/Logger.h"
}
-
+Timer *pending_timer = 0;
// lame main looper
dout(11) << "do_loop top" << endl;
+ // timer?
+ if (pending_timer) {
+ Timer *t = pending_timer;
+ pending_timer = 0;
+
+ dout(5) << "pending timer" << endl;
+ t->execute_pending();
+ }
+
+ // messages
map<int, FakeMessenger*>::iterator it = directory.begin();
while (it != directory.end()) {
Message *m = it->second->get_message();
whoami = me;
directory[ whoami ] = this;
+ pending_timer = 0;
+
cout << "fakemessenger " << whoami << " messenger is " << this << endl;
string name;
directory.erase(whoami);
}
+void FakeMessenger::trigger_timer(Timer *t)
+{
+ // note timer to call
+ pending_timer = t;
+
+ // wake up thread?
+ cond.Signal(); // why not
+}
+
int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
{
#include <list>
#include <map>
+class Timer;
+
class FakeMessenger : public Messenger {
protected:
int whoami;
// use CheesySerializer for now!
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); };
+
+ // events
+ virtual void trigger_timer(Timer *t);
};
int fakemessenger_do_loop();
#include "include/config.h"
#include "include/error.h"
+#include "common/Timer.h"
+
#include "MPIMessenger.h"
#include "Message.h"
+/*** events
+ */
+
+void MPIMessenger::trigger_timer(Timer *t)
+{
+ assert(0); //implement me
+}
+
/***
* public messaging interface
*/
(dest) : \
((NUMMDS+NUMOSD)+(((dest)-NUMMDS-NUMOSD) % ((world)-NUMMDS-NUMOSD))))
+class Timer;
class MPIMessenger : public Messenger {
protected:
// message interface
virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0);
+
+ // events
+ virtual void trigger_timer(Timer *t);
};
/**
#include "Dispatcher.h"
class MDS;
+class Timer;
class Messenger {
protected:
//virtual Message *recv() = 0
+ // events
+ virtual void trigger_timer(Timer *t) = 0;
// -- incoming queue --
FakeClient *client[NUMCLIENT];
for (int i=0; i<NUMCLIENT; i++) {
if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
- client[i] = new FakeClient(mdc, i, new MPIMessenger(MSG_ADDR_CLIENT(i)), g_conf.client_requests);
+ client[i] = new FakeClient(mdc, i, new MPIMessenger(MSG_ADDR_CLIENT(i)), g_conf.fakeclient_requests);
client[i]->init();
}