]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 16 Jun 2005 20:26:49 +0000 (20:26 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 16 Jun 2005 20:26:49 +0000 (20:26 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@327 29311d96-e01e-0410-9327-a35deaab8ce9

12 files changed:
ceph/common/Timer.cc
ceph/common/Timer.h
ceph/fakesyn.cc
ceph/include/buffer.h
ceph/msg/CheesySerializer.h
ceph/msg/FakeMessenger.cc
ceph/msg/FakeMessenger.h
ceph/msg/MPIMessenger.cc
ceph/msg/MPIMessenger.h
ceph/msg/Messenger.h
ceph/msg/TCPMessenger.cc
ceph/msg/TCPMessenger.h

index c250de9cff15d51bdc0d0299dad6d68089d03c87..53ca91850419294b4f46adaf1dc29149ef8c647e 100644 (file)
@@ -20,7 +20,8 @@
 // single global instance
 Timer      g_timer;
 
-Messenger *messenger_to_kick = 0;
+Context *messenger_kicker = 0;
+
 
 ostream& operator<<(ostream& out, timepair_t& t)
 {
@@ -64,10 +65,13 @@ void Timer::timer_thread()
                scheduled.erase(t);
          }
 
-         if (messenger_to_kick) {
+         if (messenger_kicker) {
                dout(DBL) << "kicking messenger" << endl;
-               messenger_to_kick->trigger_timer(this);
+               messenger_kicker->finish(0);
+         } else {
+               dout(DBL) << "no messenger ot kick!" << endl;
          }
+
        }
 
        else {
@@ -92,15 +96,19 @@ void Timer::timer_thread()
  */
 
 
-void Timer::set_messenger(Messenger *m) 
+void Timer::set_messenger_kicker(Context *c)
 {
-  dout(10) << "messenger to kick is " << m << endl;
-  messenger_to_kick = m;
+  dout(10) << "messenger kicker is " << c << endl;
+  messenger_kicker = c;
 }
-void Timer::unset_messenger() 
+
+void Timer::unset_messenger_kicker() 
 {
   dout(10) << "unset messenger" << endl;
-  messenger_to_kick = 0;
+  if (messenger_kicker) {
+       delete messenger_kicker;
+       messenger_kicker = 0;
+  }
   cancel_timer();
 }
 
@@ -118,8 +126,6 @@ void Timer::register_timer()
 void Timer::cancel_timer()
 {
   // clear my callback pointers
-  messenger_to_kick = 0;
-
   if (thread_id) {
        dout(10) << "setting thread_stop flag" << endl;
        lock.Lock();
index f0a6b316efe8dfc236c849610a933be7d222d6a3..2e3bc5a7273c992eb1705dd601a00ef6ebdb287a 100644 (file)
@@ -19,6 +19,7 @@ using namespace std;
 
 class Messenger;
 
+
 class Timer {
  private:
   map< timepair_t, set<Context*> >  scheduled;    // time -> (context ...)
@@ -96,8 +97,8 @@ class Timer {
        cancel_timer();
   }
 
-  void set_messenger(Messenger *m);
-  void unset_messenger();
+  void set_messenger_kicker(Context *c);
+  void unset_messenger_kicker();
 
   // schedule events
   void add_event_after(float seconds,
index 82bad147db80dce3ab3600aac595118df4717977..60afa92c0a28ff8be2bf8624d92dc1e653871f2b 100644 (file)
@@ -185,6 +185,7 @@ int main(int oargc, char **oargv) {
 
   free(argv);
   delete[] nargv;
+  cout << "fakesyn done" << endl;
   return 0;
 }
 
index fdbac01e0acb5712299f5f9fe989735a46118835..575413243659b3832469cc5bbba013341968241e 100644 (file)
@@ -14,7 +14,7 @@ using namespace std;
 #define BUFFER_MODE_NOFREE 0
 #define BUFFER_MODE_FREE   2
 
-#define BUFFER_MODE_DEFAULT (BUFFER_MODE_COPY|BUFFER_MODE_FREE)
+#define BUFFER_MODE_DEFAULT 3//(BUFFER_MODE_COPY|BUFFER_MODE_FREE)
 
 /*
  * buffer  - the underlying buffer container.  with a reference count.
@@ -34,34 +34,46 @@ class buffer {
 
   
   int _ref;
-  int _get() { return ++_ref; }
-  int _put() { return --_ref; }
+  int _get() { 
+       cout << "buffer.get " << *this << " get " << _ref+1 << endl;
+       return ++_ref; 
+  }
+  int _put() { 
+       cout << "buffer.put " << *this << " put " << _ref-1 << endl;
+       return --_ref; 
+  }
   
   friend class bufferptr;
 
  public:
   // constructors
   buffer() : _dataptr(0), _len(0), _alloc_len(0), _ref(0), _myptr(true) { 
-       //cout << "buffer() " << *this << endl;
+       cout << "buffer.cons " << *this << endl;
   }
-  buffer(int a) : _len(0), _alloc_len(a), _ref(0), _myptr(true) {
-       //cout << "buffer(empty) " << *this << endl;
+  buffer(int a) : _dataptr(0), _len(0), _alloc_len(a), _ref(0), _myptr(true) {
+       cout << "buffer.cons " << *this << endl;
        _dataptr = new char[a];
+       cout << "buffer.malloc " << (void*)_dataptr << endl;
   }
   ~buffer() {
-       //cout << "~buffer " << *this << endl;
-       if (_dataptr && _myptr) 
+       cout << "buffer.des " << *this << endl;
+       if (_dataptr && _myptr) {
+         cout << "buffer.free " << (void*)_dataptr << endl;
          delete[] _dataptr;
+       }
   }
-
+  
   buffer(const char *p, int l, int mode=BUFFER_MODE_DEFAULT) : 
-       _len(l), 
-       _alloc_len(l), 
-       _ref(0),
-       _myptr(mode & BUFFER_MODE_FREE ? true:false) {
-       //cout << "buffer cons mode = " << _myptr << endl;
+       _dataptr(0), 
+        _len(l), 
+        _alloc_len(l), 
+        _ref(0),
+        _myptr(0) {
+       _myptr = mode & BUFFER_MODE_FREE ? true:false;
+       cout << "buffer.cons " << *this << " mode = " << mode << ", myptr=" << _myptr << endl;
        if (mode & BUFFER_MODE_COPY) {
          _dataptr = new char[l];
+         cout << "buffer.malloc " << (void*)_dataptr << endl;
          memcpy(_dataptr, p, l);
          //cout << "buffer(copy) " << *this << endl;
        } else {
@@ -94,7 +106,7 @@ class buffer {
 };
 
 inline ostream& operator<<(ostream& out, buffer& b) {
-  return out << "buffer(len=" << b._len << ", alloc=" << b._alloc_len << ", " << (void*)b._dataptr << ")";
+  return out << "buffer(this=" << &b << " len=" << b._len << ", alloc=" << b._alloc_len << ", data=" << (void*)b._dataptr << " ref=" << b._ref << ")";
 }
 
 
index 24c58fed25910ca0349e9a0cffdca4761c54d63f..ca528fa66fb2106d7ae3053e5a902358b5ae2aa2 100644 (file)
@@ -47,9 +47,10 @@ class CheesySerializer : public Messenger,
   Message *sendrecv(Message *m, msg_addr_t dest, 
                                        int port=0);                     // blocks for matching reply
 
+  /*
   void trigger_timer(class Timer *t) {
        messenger->trigger_timer(t);
-  }
+       }*/
 };
 
 #endif
index 754dbf28f7bcd20bb71ff89da20e226dd90f9ec0..5f0ff0262ddbf29f7a82fbb4fcd7c66a617a1e16 100644 (file)
@@ -38,6 +38,8 @@ LogType fakemsg_logtype;
 Mutex lock;
 Cond  cond;
 
+bool pending_timer = false;
+
 bool      awake = false;
 bool      shutdown = false;
 pthread_t thread_id;
@@ -83,10 +85,15 @@ void fakemessenger_wait()
   cout << "fakemessenger_wait waiting" << endl;
   void *ptr;
   pthread_join(thread_id, &ptr);
+
+
+  g_timer.unset_messenger_kicker();
+
+
 }
 
 
-Timer *pending_timer = 0;
+
 
 // lame main looper
 
@@ -112,11 +119,8 @@ int fakemessenger_do_loop_2()
 
        // timer?
        if (pending_timer) {
-         Timer *t = pending_timer;
-         pending_timer = 0;
-
          dout(5) << "pending timer" << endl;
-         t->execute_pending();
+         g_timer.execute_pending();
        }
 
        // messages
@@ -168,13 +172,20 @@ int fakemessenger_do_loop_2()
 
 // class
 
+class C_FakeKicker : public Context {
+  void finish(int r) {
+       dout(18) << "timer kick" << endl;
+       pending_timer = true;
+       cond.Signal();  // why not
+  }
+};
+
 FakeMessenger::FakeMessenger(long me)  : Messenger(me)
 {
   whoami = me;
   directory[ whoami ] = this;
 
-  g_timer.set_messenger(this);
-  pending_timer = 0;
+  g_timer.set_messenger_kicker(new C_FakeKicker());
 
   cout << "fakemessenger " << whoami << " messenger is " << this << endl;
 
@@ -205,14 +216,13 @@ FakeMessenger::~FakeMessenger()
 
 int FakeMessenger::shutdown()
 {
-  g_timer.unset_messenger();
-
   //cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl;
   directory.erase(whoami);
   if (directory.empty()) 
        ::shutdown = true;
 }
 
+/*
 void FakeMessenger::trigger_timer(Timer *t) 
 {
   // note timer to call
@@ -221,7 +231,7 @@ void FakeMessenger::trigger_timer(Timer *t)
   // wake up thread?
   cond.Signal();  // why not
 }
-
+*/
 
 int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
 {
index 0604be86ce38c73029d3cc8bbdaf4309fc87f5ce..0df62302f19a4dfc5a98c2a49d1bfacea7b1c6fe 100644 (file)
@@ -31,7 +31,7 @@ class FakeMessenger : public Messenger {
   virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); };
 
   // events
-  virtual void trigger_timer(Timer *t);
+  //virtual void trigger_timer(Timer *t);
 
 
   // -- incoming queue --
index bb6997859098b5bd60a650c8b88bd18c99a2bafa..71a3efbdd6c246c5dfb71d2e322887d73989358d 100644 (file)
@@ -46,7 +46,7 @@ pthread_t thread_id = 0;   // thread id of the event loop.  init value == nobody
 Mutex sender_lock;
 Mutex out_queue_lock;
 
-Timer *pending_timer = 0;
+bool pending_timer;
 
 
 // our lock for any common data; it's okay to have only the one global mutex
@@ -348,18 +348,15 @@ void* mpimessenger_loop(void*)
 
        // timer events?
        if (pending_timer) {
-         Timer *t = pending_timer;
-         pending_timer = 0;
-         
          dout(DBLVL) << "pending timer" << endl;
-         t->execute_pending();
+         g_timer.execute_pending();
        }
 
        // done?
        if (mpi_done &&
                incoming.empty() &&
                outgoing.empty() &&
-               pending_timer == 0) break;
+               !pending_timer) break;
 
 
        // incoming
@@ -485,6 +482,13 @@ void mpimessenger_wait()
  * MPIMessenger class implementation
  */
 
+class C_MPIKicker : public Context {
+  void finish(int r) {
+       dout(DBLVL) << "timer kick" << endl;
+       mpimessenger_kick_loop();
+  }
+};
+
 MPIMessenger::MPIMessenger(msg_addr_t myaddr) : Messenger(myaddr)
 {
   // my address
@@ -494,7 +498,7 @@ MPIMessenger::MPIMessenger(msg_addr_t myaddr) : Messenger(myaddr)
   directory[myaddr] = this;
 
   // register to execute timer events
-  g_timer.set_messenger(this);
+  g_timer.set_messenger_kicker(new C_MPIKicker());
 
   // logger
   /*
@@ -524,7 +528,7 @@ int MPIMessenger::shutdown()
   directory.erase(myaddr);
 
   // no more timer events
-  g_timer.unset_messenger();
+  g_timer.unset_messenger_kicker();
 
   // last one?
   if (directory.empty()) {
@@ -548,15 +552,6 @@ int MPIMessenger::shutdown()
 
 
 
-/*** events
- */
-
-void MPIMessenger::trigger_timer(Timer *t)
-{
-  pending_timer = t;
-
-  mpimessenger_kick_loop();
-}
 
 /***
  * public messaging interface
index c56e25d541c629b7b11c1a33b5cdecbfa0d4d0e8..415a797b88352d46ccc1e485dbaa26bfbe9bc832 100644 (file)
@@ -29,7 +29,7 @@ class MPIMessenger : public Messenger {
   virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0);
 
   // events
-  virtual void trigger_timer(Timer *t);
+  //virtual void trigger_timer(Timer *t);
 };
 
 /**
index 0987071a40e36176f5edae1037de7c9fcd0ef3a5..63d679b9e89857820f9db9f286bc1b95837def6e 100644 (file)
@@ -39,7 +39,7 @@ class Messenger {
   virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) = 0;
 
   // events
-  virtual void trigger_timer(Timer *t) = 0;
+  //virtual void trigger_timer(Timer *t) = 0;
 
 };
 
index d9b74424a5246e41fe0fb867f1aa66a12198a7b2..aac912cad2c34975d92719d5aca081faf788c895 100644 (file)
@@ -65,7 +65,7 @@ pthread_t out_thread_id = 0;   // thread id of the event loop.  init value == no
 pthread_t listen_thread_id = 0;
 Mutex sender_lock;
 
-Timer *pending_timer = 0;
+bool pending_timer = false;
 
 
 
@@ -491,17 +491,14 @@ void* tcpmessenger_loop(void*)
 
        // timer events?
        if (pending_timer) {
-         Timer *t = pending_timer;
-         pending_timer = 0;
-         
          dout(DBL) << "pending timer" << endl;
-         t->execute_pending();
+         g_timer.execute_pending();
        }
 
        // done?
        if (tcp_done &&
                incoming.empty() &&
-               pending_timer == 0) break;
+               !pending_timer) break;
        
        // incoming
        dout(12) << "loop waiting for incoming messages" << endl;
@@ -612,6 +609,13 @@ void tcpmessenger_wait()
  * Tcpmessenger class implementation
  */
 
+class C_TCPKicker : public Context {
+  void finish(int r) {
+       dout(DBL) << "timer kick" << endl;
+       incoming_cond.Signal();
+  }
+};
+
 TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr)
 {
   // my address
@@ -621,7 +625,7 @@ TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr)
   directory[myaddr] = this;
 
   // register to execute timer events
-  g_timer.set_messenger(this);
+  g_timer.set_messenger_kicker(new C_TCPKicker());
 
   // logger
   /*
@@ -651,7 +655,7 @@ int TCPMessenger::shutdown()
   directory.erase(myaddr);
 
   // no more timer events
-  g_timer.unset_messenger();
+  g_timer.unset_messenger_kicker();
 
   // last one?
   if (directory.empty()) {
@@ -691,15 +695,6 @@ int TCPMessenger::shutdown()
 
 
 
-/*** events
- */
-
-void TCPMessenger::trigger_timer(Timer *t)
-{
-  pending_timer = t;
-
-  tcpmessenger_kick_loop();
-}
 
 /***
  * public messaging interface
@@ -714,8 +709,10 @@ int TCPMessenger::send_message(Message *m, msg_addr_t dest, int port, int frompo
   m->set_dest(dest, port);
 
   if (0) {
+       // der
        tcp_send(m);
   } else {
+       // good way
        outgoing_lock.Lock();
        outgoing.push_back(m);
        outgoing_lock.Unlock();
index a40deee3927c7460963b8f21726eaf2468596eab..b88ddf927f5f1a3828a0d5b94cde7fa3e8400018 100644 (file)
@@ -28,9 +28,6 @@ class TCPMessenger : public Messenger {
   // message interface
   virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
   virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); }
-
-  // events
-  virtual void trigger_timer(Timer *t);
 };
 
 /**