From: sage Date: Thu, 16 Jun 2005 20:26:49 +0000 (+0000) Subject: *** empty log message *** X-Git-Tag: v0.1~2054 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a7fdc272c050e2a4d5d0732be84e5482d267b6cc;p=ceph.git *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@327 29311d96-e01e-0410-9327-a35deaab8ce9 --- diff --git a/ceph/common/Timer.cc b/ceph/common/Timer.cc index c250de9cff1..53ca9185041 100644 --- a/ceph/common/Timer.cc +++ b/ceph/common/Timer.cc @@ -20,7 +20,8 @@ // single global instance Timer g_timer; -Messenger *messenger_to_kick = 0; +Context *messenger_kicker = 0; + ostream& operator<<(ostream& out, timepair_t& t) { @@ -64,10 +65,13 @@ void Timer::timer_thread() scheduled.erase(t); } - if (messenger_to_kick) { + if (messenger_kicker) { dout(DBL) << "kicking messenger" << endl; - messenger_to_kick->trigger_timer(this); + messenger_kicker->finish(0); + } else { + dout(DBL) << "no messenger ot kick!" << endl; } + } else { @@ -92,15 +96,19 @@ void Timer::timer_thread() */ -void Timer::set_messenger(Messenger *m) +void Timer::set_messenger_kicker(Context *c) { - dout(10) << "messenger to kick is " << m << endl; - messenger_to_kick = m; + dout(10) << "messenger kicker is " << c << endl; + messenger_kicker = c; } -void Timer::unset_messenger() + +void Timer::unset_messenger_kicker() { dout(10) << "unset messenger" << endl; - messenger_to_kick = 0; + if (messenger_kicker) { + delete messenger_kicker; + messenger_kicker = 0; + } cancel_timer(); } @@ -118,8 +126,6 @@ void Timer::register_timer() void Timer::cancel_timer() { // clear my callback pointers - messenger_to_kick = 0; - if (thread_id) { dout(10) << "setting thread_stop flag" << endl; lock.Lock(); diff --git a/ceph/common/Timer.h b/ceph/common/Timer.h index f0a6b316efe..2e3bc5a7273 100644 --- a/ceph/common/Timer.h +++ b/ceph/common/Timer.h @@ -19,6 +19,7 @@ using namespace std; class Messenger; + class Timer { private: map< timepair_t, set > scheduled; // time -> (context ...) @@ -96,8 +97,8 @@ class Timer { cancel_timer(); } - void set_messenger(Messenger *m); - void unset_messenger(); + void set_messenger_kicker(Context *c); + void unset_messenger_kicker(); // schedule events void add_event_after(float seconds, diff --git a/ceph/fakesyn.cc b/ceph/fakesyn.cc index 82bad147db8..60afa92c0a2 100644 --- a/ceph/fakesyn.cc +++ b/ceph/fakesyn.cc @@ -185,6 +185,7 @@ int main(int oargc, char **oargv) { free(argv); delete[] nargv; + cout << "fakesyn done" << endl; return 0; } diff --git a/ceph/include/buffer.h b/ceph/include/buffer.h index fdbac01e0ac..57541324365 100644 --- a/ceph/include/buffer.h +++ b/ceph/include/buffer.h @@ -14,7 +14,7 @@ using namespace std; #define BUFFER_MODE_NOFREE 0 #define BUFFER_MODE_FREE 2 -#define BUFFER_MODE_DEFAULT (BUFFER_MODE_COPY|BUFFER_MODE_FREE) +#define BUFFER_MODE_DEFAULT 3//(BUFFER_MODE_COPY|BUFFER_MODE_FREE) /* * buffer - the underlying buffer container. with a reference count. @@ -34,34 +34,46 @@ class buffer { int _ref; - int _get() { return ++_ref; } - int _put() { return --_ref; } + int _get() { + cout << "buffer.get " << *this << " get " << _ref+1 << endl; + return ++_ref; + } + int _put() { + cout << "buffer.put " << *this << " put " << _ref-1 << endl; + return --_ref; + } friend class bufferptr; public: // constructors buffer() : _dataptr(0), _len(0), _alloc_len(0), _ref(0), _myptr(true) { - //cout << "buffer() " << *this << endl; + cout << "buffer.cons " << *this << endl; } - buffer(int a) : _len(0), _alloc_len(a), _ref(0), _myptr(true) { - //cout << "buffer(empty) " << *this << endl; + buffer(int a) : _dataptr(0), _len(0), _alloc_len(a), _ref(0), _myptr(true) { + cout << "buffer.cons " << *this << endl; _dataptr = new char[a]; + cout << "buffer.malloc " << (void*)_dataptr << endl; } ~buffer() { - //cout << "~buffer " << *this << endl; - if (_dataptr && _myptr) + cout << "buffer.des " << *this << endl; + if (_dataptr && _myptr) { + cout << "buffer.free " << (void*)_dataptr << endl; delete[] _dataptr; + } } - + buffer(const char *p, int l, int mode=BUFFER_MODE_DEFAULT) : - _len(l), - _alloc_len(l), - _ref(0), - _myptr(mode & BUFFER_MODE_FREE ? true:false) { - //cout << "buffer cons mode = " << _myptr << endl; + _dataptr(0), + _len(l), + _alloc_len(l), + _ref(0), + _myptr(0) { + _myptr = mode & BUFFER_MODE_FREE ? true:false; + cout << "buffer.cons " << *this << " mode = " << mode << ", myptr=" << _myptr << endl; if (mode & BUFFER_MODE_COPY) { _dataptr = new char[l]; + cout << "buffer.malloc " << (void*)_dataptr << endl; memcpy(_dataptr, p, l); //cout << "buffer(copy) " << *this << endl; } else { @@ -94,7 +106,7 @@ class buffer { }; inline ostream& operator<<(ostream& out, buffer& b) { - return out << "buffer(len=" << b._len << ", alloc=" << b._alloc_len << ", " << (void*)b._dataptr << ")"; + return out << "buffer(this=" << &b << " len=" << b._len << ", alloc=" << b._alloc_len << ", data=" << (void*)b._dataptr << " ref=" << b._ref << ")"; } diff --git a/ceph/msg/CheesySerializer.h b/ceph/msg/CheesySerializer.h index 24c58fed259..ca528fa66fb 100644 --- a/ceph/msg/CheesySerializer.h +++ b/ceph/msg/CheesySerializer.h @@ -47,9 +47,10 @@ class CheesySerializer : public Messenger, Message *sendrecv(Message *m, msg_addr_t dest, int port=0); // blocks for matching reply + /* void trigger_timer(class Timer *t) { messenger->trigger_timer(t); - } + }*/ }; #endif diff --git a/ceph/msg/FakeMessenger.cc b/ceph/msg/FakeMessenger.cc index 754dbf28f7b..5f0ff0262dd 100644 --- a/ceph/msg/FakeMessenger.cc +++ b/ceph/msg/FakeMessenger.cc @@ -38,6 +38,8 @@ LogType fakemsg_logtype; Mutex lock; Cond cond; +bool pending_timer = false; + bool awake = false; bool shutdown = false; pthread_t thread_id; @@ -83,10 +85,15 @@ void fakemessenger_wait() cout << "fakemessenger_wait waiting" << endl; void *ptr; pthread_join(thread_id, &ptr); + + + g_timer.unset_messenger_kicker(); + + } -Timer *pending_timer = 0; + // lame main looper @@ -112,11 +119,8 @@ int fakemessenger_do_loop_2() // timer? if (pending_timer) { - Timer *t = pending_timer; - pending_timer = 0; - dout(5) << "pending timer" << endl; - t->execute_pending(); + g_timer.execute_pending(); } // messages @@ -168,13 +172,20 @@ int fakemessenger_do_loop_2() // class +class C_FakeKicker : public Context { + void finish(int r) { + dout(18) << "timer kick" << endl; + pending_timer = true; + cond.Signal(); // why not + } +}; + FakeMessenger::FakeMessenger(long me) : Messenger(me) { whoami = me; directory[ whoami ] = this; - g_timer.set_messenger(this); - pending_timer = 0; + g_timer.set_messenger_kicker(new C_FakeKicker()); cout << "fakemessenger " << whoami << " messenger is " << this << endl; @@ -205,14 +216,13 @@ FakeMessenger::~FakeMessenger() int FakeMessenger::shutdown() { - g_timer.unset_messenger(); - //cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl; directory.erase(whoami); if (directory.empty()) ::shutdown = true; } +/* void FakeMessenger::trigger_timer(Timer *t) { // note timer to call @@ -221,7 +231,7 @@ void FakeMessenger::trigger_timer(Timer *t) // wake up thread? cond.Signal(); // why not } - +*/ int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport) { diff --git a/ceph/msg/FakeMessenger.h b/ceph/msg/FakeMessenger.h index 0604be86ce3..0df62302f19 100644 --- a/ceph/msg/FakeMessenger.h +++ b/ceph/msg/FakeMessenger.h @@ -31,7 +31,7 @@ class FakeMessenger : public Messenger { virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); }; // events - virtual void trigger_timer(Timer *t); + //virtual void trigger_timer(Timer *t); // -- incoming queue -- diff --git a/ceph/msg/MPIMessenger.cc b/ceph/msg/MPIMessenger.cc index bb699785909..71a3efbdd6c 100644 --- a/ceph/msg/MPIMessenger.cc +++ b/ceph/msg/MPIMessenger.cc @@ -46,7 +46,7 @@ pthread_t thread_id = 0; // thread id of the event loop. init value == nobody Mutex sender_lock; Mutex out_queue_lock; -Timer *pending_timer = 0; +bool pending_timer; // our lock for any common data; it's okay to have only the one global mutex @@ -348,18 +348,15 @@ void* mpimessenger_loop(void*) // timer events? if (pending_timer) { - Timer *t = pending_timer; - pending_timer = 0; - dout(DBLVL) << "pending timer" << endl; - t->execute_pending(); + g_timer.execute_pending(); } // done? if (mpi_done && incoming.empty() && outgoing.empty() && - pending_timer == 0) break; + !pending_timer) break; // incoming @@ -485,6 +482,13 @@ void mpimessenger_wait() * MPIMessenger class implementation */ +class C_MPIKicker : public Context { + void finish(int r) { + dout(DBLVL) << "timer kick" << endl; + mpimessenger_kick_loop(); + } +}; + MPIMessenger::MPIMessenger(msg_addr_t myaddr) : Messenger(myaddr) { // my address @@ -494,7 +498,7 @@ MPIMessenger::MPIMessenger(msg_addr_t myaddr) : Messenger(myaddr) directory[myaddr] = this; // register to execute timer events - g_timer.set_messenger(this); + g_timer.set_messenger_kicker(new C_MPIKicker()); // logger /* @@ -524,7 +528,7 @@ int MPIMessenger::shutdown() directory.erase(myaddr); // no more timer events - g_timer.unset_messenger(); + g_timer.unset_messenger_kicker(); // last one? if (directory.empty()) { @@ -548,15 +552,6 @@ int MPIMessenger::shutdown() -/*** events - */ - -void MPIMessenger::trigger_timer(Timer *t) -{ - pending_timer = t; - - mpimessenger_kick_loop(); -} /*** * public messaging interface diff --git a/ceph/msg/MPIMessenger.h b/ceph/msg/MPIMessenger.h index c56e25d541c..415a797b883 100644 --- a/ceph/msg/MPIMessenger.h +++ b/ceph/msg/MPIMessenger.h @@ -29,7 +29,7 @@ class MPIMessenger : public Messenger { virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0); // events - virtual void trigger_timer(Timer *t); + //virtual void trigger_timer(Timer *t); }; /** diff --git a/ceph/msg/Messenger.h b/ceph/msg/Messenger.h index 0987071a40e..63d679b9e89 100644 --- a/ceph/msg/Messenger.h +++ b/ceph/msg/Messenger.h @@ -39,7 +39,7 @@ class Messenger { virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) = 0; // events - virtual void trigger_timer(Timer *t) = 0; + //virtual void trigger_timer(Timer *t) = 0; }; diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc index d9b74424a52..aac912cad2c 100644 --- a/ceph/msg/TCPMessenger.cc +++ b/ceph/msg/TCPMessenger.cc @@ -65,7 +65,7 @@ pthread_t out_thread_id = 0; // thread id of the event loop. init value == no pthread_t listen_thread_id = 0; Mutex sender_lock; -Timer *pending_timer = 0; +bool pending_timer = false; @@ -491,17 +491,14 @@ void* tcpmessenger_loop(void*) // timer events? if (pending_timer) { - Timer *t = pending_timer; - pending_timer = 0; - dout(DBL) << "pending timer" << endl; - t->execute_pending(); + g_timer.execute_pending(); } // done? if (tcp_done && incoming.empty() && - pending_timer == 0) break; + !pending_timer) break; // incoming dout(12) << "loop waiting for incoming messages" << endl; @@ -612,6 +609,13 @@ void tcpmessenger_wait() * Tcpmessenger class implementation */ +class C_TCPKicker : public Context { + void finish(int r) { + dout(DBL) << "timer kick" << endl; + incoming_cond.Signal(); + } +}; + TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr) { // my address @@ -621,7 +625,7 @@ TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr) directory[myaddr] = this; // register to execute timer events - g_timer.set_messenger(this); + g_timer.set_messenger_kicker(new C_TCPKicker()); // logger /* @@ -651,7 +655,7 @@ int TCPMessenger::shutdown() directory.erase(myaddr); // no more timer events - g_timer.unset_messenger(); + g_timer.unset_messenger_kicker(); // last one? if (directory.empty()) { @@ -691,15 +695,6 @@ int TCPMessenger::shutdown() -/*** events - */ - -void TCPMessenger::trigger_timer(Timer *t) -{ - pending_timer = t; - - tcpmessenger_kick_loop(); -} /*** * public messaging interface @@ -714,8 +709,10 @@ int TCPMessenger::send_message(Message *m, msg_addr_t dest, int port, int frompo m->set_dest(dest, port); if (0) { + // der tcp_send(m); } else { + // good way outgoing_lock.Lock(); outgoing.push_back(m); outgoing_lock.Unlock(); diff --git a/ceph/msg/TCPMessenger.h b/ceph/msg/TCPMessenger.h index a40deee3927..b88ddf927f5 100644 --- a/ceph/msg/TCPMessenger.h +++ b/ceph/msg/TCPMessenger.h @@ -28,9 +28,6 @@ class TCPMessenger : public Messenger { // message interface virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0); virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); } - - // events - virtual void trigger_timer(Timer *t); }; /**