// single global instance
Timer g_timer;
-Messenger *messenger_to_kick = 0;
+Context *messenger_kicker = 0;
+
ostream& operator<<(ostream& out, timepair_t& t)
{
scheduled.erase(t);
}
- if (messenger_to_kick) {
+ if (messenger_kicker) {
dout(DBL) << "kicking messenger" << endl;
- messenger_to_kick->trigger_timer(this);
+ messenger_kicker->finish(0);
+ } else {
+ dout(DBL) << "no messenger ot kick!" << endl;
}
+
}
else {
*/
-void Timer::set_messenger(Messenger *m)
+void Timer::set_messenger_kicker(Context *c)
{
- dout(10) << "messenger to kick is " << m << endl;
- messenger_to_kick = m;
+ dout(10) << "messenger kicker is " << c << endl;
+ messenger_kicker = c;
}
-void Timer::unset_messenger()
+
+void Timer::unset_messenger_kicker()
{
dout(10) << "unset messenger" << endl;
- messenger_to_kick = 0;
+ if (messenger_kicker) {
+ delete messenger_kicker;
+ messenger_kicker = 0;
+ }
cancel_timer();
}
void Timer::cancel_timer()
{
// clear my callback pointers
- messenger_to_kick = 0;
-
if (thread_id) {
dout(10) << "setting thread_stop flag" << endl;
lock.Lock();
class Messenger;
+
class Timer {
private:
map< timepair_t, set<Context*> > scheduled; // time -> (context ...)
cancel_timer();
}
- void set_messenger(Messenger *m);
- void unset_messenger();
+ void set_messenger_kicker(Context *c);
+ void unset_messenger_kicker();
// schedule events
void add_event_after(float seconds,
free(argv);
delete[] nargv;
+ cout << "fakesyn done" << endl;
return 0;
}
#define BUFFER_MODE_NOFREE 0
#define BUFFER_MODE_FREE 2
-#define BUFFER_MODE_DEFAULT (BUFFER_MODE_COPY|BUFFER_MODE_FREE)
+#define BUFFER_MODE_DEFAULT 3//(BUFFER_MODE_COPY|BUFFER_MODE_FREE)
/*
* buffer - the underlying buffer container. with a reference count.
int _ref;
- int _get() { return ++_ref; }
- int _put() { return --_ref; }
+ int _get() {
+ cout << "buffer.get " << *this << " get " << _ref+1 << endl;
+ return ++_ref;
+ }
+ int _put() {
+ cout << "buffer.put " << *this << " put " << _ref-1 << endl;
+ return --_ref;
+ }
friend class bufferptr;
public:
// constructors
buffer() : _dataptr(0), _len(0), _alloc_len(0), _ref(0), _myptr(true) {
- //cout << "buffer() " << *this << endl;
+ cout << "buffer.cons " << *this << endl;
}
- buffer(int a) : _len(0), _alloc_len(a), _ref(0), _myptr(true) {
- //cout << "buffer(empty) " << *this << endl;
+ buffer(int a) : _dataptr(0), _len(0), _alloc_len(a), _ref(0), _myptr(true) {
+ cout << "buffer.cons " << *this << endl;
_dataptr = new char[a];
+ cout << "buffer.malloc " << (void*)_dataptr << endl;
}
~buffer() {
- //cout << "~buffer " << *this << endl;
- if (_dataptr && _myptr)
+ cout << "buffer.des " << *this << endl;
+ if (_dataptr && _myptr) {
+ cout << "buffer.free " << (void*)_dataptr << endl;
delete[] _dataptr;
+ }
}
-
+
buffer(const char *p, int l, int mode=BUFFER_MODE_DEFAULT) :
- _len(l),
- _alloc_len(l),
- _ref(0),
- _myptr(mode & BUFFER_MODE_FREE ? true:false) {
- //cout << "buffer cons mode = " << _myptr << endl;
+ _dataptr(0),
+ _len(l),
+ _alloc_len(l),
+ _ref(0),
+ _myptr(0) {
+ _myptr = mode & BUFFER_MODE_FREE ? true:false;
+ cout << "buffer.cons " << *this << " mode = " << mode << ", myptr=" << _myptr << endl;
if (mode & BUFFER_MODE_COPY) {
_dataptr = new char[l];
+ cout << "buffer.malloc " << (void*)_dataptr << endl;
memcpy(_dataptr, p, l);
//cout << "buffer(copy) " << *this << endl;
} else {
};
inline ostream& operator<<(ostream& out, buffer& b) {
- return out << "buffer(len=" << b._len << ", alloc=" << b._alloc_len << ", " << (void*)b._dataptr << ")";
+ return out << "buffer(this=" << &b << " len=" << b._len << ", alloc=" << b._alloc_len << ", data=" << (void*)b._dataptr << " ref=" << b._ref << ")";
}
Message *sendrecv(Message *m, msg_addr_t dest,
int port=0); // blocks for matching reply
+ /*
void trigger_timer(class Timer *t) {
messenger->trigger_timer(t);
- }
+ }*/
};
#endif
Mutex lock;
Cond cond;
+bool pending_timer = false;
+
bool awake = false;
bool shutdown = false;
pthread_t thread_id;
cout << "fakemessenger_wait waiting" << endl;
void *ptr;
pthread_join(thread_id, &ptr);
+
+
+ g_timer.unset_messenger_kicker();
+
+
}
-Timer *pending_timer = 0;
+
// lame main looper
// timer?
if (pending_timer) {
- Timer *t = pending_timer;
- pending_timer = 0;
-
dout(5) << "pending timer" << endl;
- t->execute_pending();
+ g_timer.execute_pending();
}
// messages
// class
+class C_FakeKicker : public Context {
+ void finish(int r) {
+ dout(18) << "timer kick" << endl;
+ pending_timer = true;
+ cond.Signal(); // why not
+ }
+};
+
FakeMessenger::FakeMessenger(long me) : Messenger(me)
{
whoami = me;
directory[ whoami ] = this;
- g_timer.set_messenger(this);
- pending_timer = 0;
+ g_timer.set_messenger_kicker(new C_FakeKicker());
cout << "fakemessenger " << whoami << " messenger is " << this << endl;
int FakeMessenger::shutdown()
{
- g_timer.unset_messenger();
-
//cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl;
directory.erase(whoami);
if (directory.empty())
::shutdown = true;
}
+/*
void FakeMessenger::trigger_timer(Timer *t)
{
// note timer to call
// wake up thread?
cond.Signal(); // why not
}
-
+*/
int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
{
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); };
// events
- virtual void trigger_timer(Timer *t);
+ //virtual void trigger_timer(Timer *t);
// -- incoming queue --
Mutex sender_lock;
Mutex out_queue_lock;
-Timer *pending_timer = 0;
+bool pending_timer;
// our lock for any common data; it's okay to have only the one global mutex
// timer events?
if (pending_timer) {
- Timer *t = pending_timer;
- pending_timer = 0;
-
dout(DBLVL) << "pending timer" << endl;
- t->execute_pending();
+ g_timer.execute_pending();
}
// done?
if (mpi_done &&
incoming.empty() &&
outgoing.empty() &&
- pending_timer == 0) break;
+ !pending_timer) break;
// incoming
* MPIMessenger class implementation
*/
+class C_MPIKicker : public Context {
+ void finish(int r) {
+ dout(DBLVL) << "timer kick" << endl;
+ mpimessenger_kick_loop();
+ }
+};
+
MPIMessenger::MPIMessenger(msg_addr_t myaddr) : Messenger(myaddr)
{
// my address
directory[myaddr] = this;
// register to execute timer events
- g_timer.set_messenger(this);
+ g_timer.set_messenger_kicker(new C_MPIKicker());
// logger
/*
directory.erase(myaddr);
// no more timer events
- g_timer.unset_messenger();
+ g_timer.unset_messenger_kicker();
// last one?
if (directory.empty()) {
-/*** events
- */
-
-void MPIMessenger::trigger_timer(Timer *t)
-{
- pending_timer = t;
-
- mpimessenger_kick_loop();
-}
/***
* public messaging interface
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0);
// events
- virtual void trigger_timer(Timer *t);
+ //virtual void trigger_timer(Timer *t);
};
/**
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) = 0;
// events
- virtual void trigger_timer(Timer *t) = 0;
+ //virtual void trigger_timer(Timer *t) = 0;
};
pthread_t listen_thread_id = 0;
Mutex sender_lock;
-Timer *pending_timer = 0;
+bool pending_timer = false;
// timer events?
if (pending_timer) {
- Timer *t = pending_timer;
- pending_timer = 0;
-
dout(DBL) << "pending timer" << endl;
- t->execute_pending();
+ g_timer.execute_pending();
}
// done?
if (tcp_done &&
incoming.empty() &&
- pending_timer == 0) break;
+ !pending_timer) break;
// incoming
dout(12) << "loop waiting for incoming messages" << endl;
* Tcpmessenger class implementation
*/
+class C_TCPKicker : public Context {
+ void finish(int r) {
+ dout(DBL) << "timer kick" << endl;
+ incoming_cond.Signal();
+ }
+};
+
TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr)
{
// my address
directory[myaddr] = this;
// register to execute timer events
- g_timer.set_messenger(this);
+ g_timer.set_messenger_kicker(new C_TCPKicker());
// logger
/*
directory.erase(myaddr);
// no more timer events
- g_timer.unset_messenger();
+ g_timer.unset_messenger_kicker();
// last one?
if (directory.empty()) {
-/*** events
- */
-
-void TCPMessenger::trigger_timer(Timer *t)
-{
- pending_timer = t;
-
- tcpmessenger_kick_loop();
-}
/***
* public messaging interface
m->set_dest(dest, port);
if (0) {
+ // der
tcp_send(m);
} else {
+ // good way
outgoing_lock.Lock();
outgoing.push_back(m);
outgoing_lock.Unlock();
// message interface
virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); }
-
- // events
- virtual void trigger_timer(Timer *t);
};
/**