]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 29 Apr 2006 16:00:18 +0000 (16:00 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 29 Apr 2006 16:00:18 +0000 (16:00 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@751 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/Makefile
ceph/common/Timer.cc
ceph/msg/TCPMessenger.cc
ceph/osdc/Filer.h
ceph/osdc/ObjectCacher.cc [new file with mode: 0644]
ceph/osdc/ObjectCacher.h

index b79a9af1250774a0ccb73f472cbd33d0b9cf8c83..17f036519e0ca58742935e2146d2b1ed07503adf 100644 (file)
@@ -53,6 +53,7 @@ COMMON_OBJS= \
        msg/Dispatcher.o\
        msg/HostMonitor.o\
        osd/Filer.o\
+       osd/ObjectCacher.o\
        osd/Objecter.o\
        osd/OSDMap.o\
        mds/MDCluster.o\
index 99cda93fcb196cbbf4c3b0a84af0529c791129e9..317ae5adbd04496f936c38f0f91ee3325c126c96 100644 (file)
@@ -20,8 +20,6 @@
 #include "config.h"
 #include "include/Context.h"
 
-//#include "msg/Messenger.h"
-
 #undef dout
 #define dout(x)  if (x <= g_conf.debug) cout << "Timer: "
 
 // single global instance
 Timer      g_timer;
 
-//Context *messenger_kicker = 0;
-//Messenger *messenger = 0;
-
-
 
 /**** thread solution *****/
 
@@ -65,18 +59,14 @@ void Timer::timer_entry()
                utime_t t = it->first;
                dout(DBL) << "queueing event(s) scheduled at " << t << endl;
 
-               /*if (messenger) {
-                 for (multiset<Context*>::iterator cit = it->second.begin();
-                          cit != it->second.end();
-                          cit++) {
-                       pending.push_back(*cit);
-                       event_times.erase(*cit);
-                       num_event--;
-                 }
+               for (multiset<Context*>::iterator cit = it->second.begin();
+                        cit != it->second.end();
+                        cit++) {
+                 pending.push_back(*cit);
+                 event_times.erase(*cit);
+                 num_event--;
                }
-               */
 
-               //pending[t] = it->second;
                map< utime_t, multiset<Context*> >::iterator previt = it;
                it++;
                scheduled.erase(previt);
@@ -86,17 +76,14 @@ void Timer::timer_entry()
                sleeping = false;
                lock.Unlock();
                { // make sure we're not holding any locks while we do callbacks (or talk to the messenger)
-                 if (1) {
-                       // make the callbacks myself.
-                       for (list<Context*>::iterator cit = pending.begin();
-                                cit != pending.end();
-                                cit++) 
-                         (*cit)->finish(0);
-                       pending.clear();
-                 } else {
-                       // give them to the messenger
-                       //messenger->queue_callbacks(pending);
+                 // make the callbacks myself.
+                 for (list<Context*>::iterator cit = pending.begin();
+                          cit != pending.end();
+                          cit++) {
+                       dout(DBL) << "doing callback " << *cit << endl;
+                       (*cit)->finish(0);
                  }
+                 pending.clear();
                  assert(pending.empty());
                }
                lock.Lock();
@@ -133,19 +120,6 @@ void Timer::timer_entry()
  * Timer bits
  */
 
-/*
-void Timer::set_messenger(Messenger *m)
-{
-  dout(10) << "set messenger " << m << endl;
-  messenger = m;
-}
-void Timer::unset_messenger()
-{
-  dout(10) << "unset messenger" << endl;
-  messenger = 0;
-}
-*/
-
 void Timer::register_timer()
 {
   if (timer_thread.is_started()) {
index f91f7072427b0320a2375468d08ff0b904bee3f4..67df8b86227298d51d9d5f633a42036d35d9660a 100644 (file)
@@ -1268,7 +1268,7 @@ TCPMessenger::TCPMessenger(msg_addr_t myaddr) :
 
   // register to execute timer events
   //g_timer.set_messenger_kicker(new C_TCPKicker());
-  g_timer.set_messenger(this);
+  //  g_timer.set_messenger(this);
 }
 
 
@@ -1365,7 +1365,7 @@ int TCPMessenger::shutdown()
        //pthread_t whoami = pthread_self();
        
        // no more timer events
-       g_timer.unset_messenger();
+       //g_timer.unset_messenger();
        
        // close incoming sockets
        //void *r;
index d94fdbac17dbfdfd4e4e8c51a3588fdd0bf326b8..7bc7698259afed4dbd8b42130ad2626be0b6f77a 100644 (file)
@@ -40,6 +40,7 @@ using namespace __gnu_cxx;
 
 #include "OSDMap.h"
 #include "Objecter.h"
+#include "ObjectCacher.h"
 
 class Context;
 class Messenger;
@@ -71,11 +72,11 @@ class Filer {
        file_to_extents(inode, len, offset, rd->extents);
 
        // cacheless async?
-       if (oc == 0) 
+       if (1 || oc == 0) 
          return objecter->readx(rd, onfinish);
 
-       // write me
-
+       // use cache
+       oc->readx(rd, inode.ino, onfinish);
        return 0;
   }
 
@@ -90,11 +91,11 @@ class Filer {
        file_to_extents(inode, len, offset, wr->extents);
 
        // cacheles async?
-       if (oc == 0) 
+       if (1 || oc == 0) 
          return objecter->writex(wr, onack, oncommit);
 
-       // write me
-
+       // use cache
+       oc->writex(wr, inode.ino, onack, oncommit);
        return 0;
   }
 
@@ -121,23 +122,28 @@ class Filer {
   
   int atomic_sync_read(inode_t& inode,
                                           size_t len, off_t offset,
-                                          bufferlist& bl) {
-       assert(oc);
-
-       // write me
+                                          bufferlist *bl) {
+       Objecter::OSDRead *rd = new Objecter::OSDRead(bl);
+       file_to_extents(inode, len, offset, rd->extents);
 
-       return 0;
+       assert(oc);
+       int r = oc->atomic_sync_readx(rd, inode.ino, 
+                                                                 0);  // block.
+       return r;
   }
 
   int atomic_sync_write(inode_t& inode,
                                                size_t len, off_t offset,
                                                bufferlist& bl,
                                                Context *oncommit) {
-       assert(oc);
-
-       // write me
+       Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
+       file_to_extents(inode, len, offset, wr->extents);
 
-       return 0;
+       assert(oc);
+       int r = oc->atomic_sync_writex(wr, inode.ino, 
+                                                                  0,  // block
+                                                                  oncommit);
+       return r;
   }
 
 
diff --git a/ceph/osdc/ObjectCacher.cc b/ceph/osdc/ObjectCacher.cc
new file mode 100644 (file)
index 0000000..74546a9
--- /dev/null
@@ -0,0 +1,31 @@
+
+#include "ObjectCacher.h"
+
+
+
+int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
+{
+  assert(0);
+  return 0;
+}
+
+int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit)
+{
+  assert(0);
+  return 0;
+}
+  
+// blocking.  atomic+sync.
+int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
+{
+  assert(0);
+  return 0;
+}
+
+int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit)
+{
+  assert(0);
+  return 0;
+}
index 6d6918f6f8de58cd04fe77f8d66826b7d32f672e..99339d85cb66da405add5adc1c6ae20947ebc858 100644 (file)
+#ifndef __OBJECTCACHER_H_
+#define __OBJECTCACHER_H_
 
+#include "include/types.h"
+#include "include/lru.h"
 
+#include "common/Cond.h"
 
-class ObjectCacher {
-  Objecter *objecter;
+#include "Objecter.h"
 
+class Objecter;
+class Objecter::OSDRead;
+class Objecter::OSDWrite;
 
+class ObjectCacher {
+ private:
+  Objecter *objecter;
+  
+  // ******* Object *********
   class Object {
+  public:
        
-       class BufferHead {
+       // ******* Object::BufferHead *********
+       class BufferHead : public LRUObject {
        public:
-         const int CLEAN = 1;
-         const int DIRTY = 2;
-         const int RX = 3;
-         const int TX = 4;
+         // states
+         static const int STATE_MISSING = 0;
+         static const int STATE_CLEAN = 1;
+         static const int STATE_DIRTY = 2;
+         static const int STATE_RX = 3;
+         static const int STATE_TX = 4;
+         
+         // my fields
          int state;
+         int ref;
 
-       };
+         version_t version;      // version of object (if non-zero)
+         bufferlist  bl;
 
-       map<size_t, BufferHead*> bh_map;
+         map<size_t, list<Context*> > waitfor_read;
 
-       class Lock {
-       public:
-         const int NONE = 0;
-         const int WRLOCK = 1;
-         //const int RDLOCK = 2;
-         
-         int state;
+         size_t length() { return bl.length(); }
 
-         Lock() : state(NONE) {}
+         // states
+         void set_state(int s) {
+               if (s == STATE_RX || s == STATE_TX) get();
+               if (state == STATE_RX || state == STATE_TX) put();
+               state = s;
+         }
+         int get_state() { return state; }
+         
+         bool is_missing() { return state == STATE_MISSING; }
+         bool is_dirty() { return state == STATE_DIRTY; }
+         bool is_clean() { return state == STATE_CLEAN; }
+         bool is_tx() { return state == STATE_TX; }
+         bool is_rx() { return state == STATE_RX; }
+
+         // reference counting
+         int get() {
+               assert(ref >= 0);
+               if (ref == 0) lru_pin();
+               return ++ref;
+         }
+         int put() {
+               assert(ref > 0);
+               if (ref == 1) lru_unpin();
+               --ref;
+               return ref;
+         }
+
+         BufferHead() : 
+               state(STATE_MISSING),
+               ref(0),
+               version(0) {}
        };
 
+       // ObjectCacher::Object fields
+       object_t  oid;
+       inodeno_t ino;
+
+       map<size_t, BufferHead*>     bh_map;
+
+       Object(object_t o, inodeno_t i) : oid(o), ino(i) {}
   };
 
+  // ObjectCacher fields
+  hash_map<object_t, Object*> objects;
+
+  set<Object::BufferHead*> dirty_bh;
+  LRU   lru_dirty, lru_rest;
+
+
+  // bh stats
+  Cond  stat_cond;
+  int   stat_waiter;
+
+  off_t stat_clean;
+  off_t stat_dirty;
+  off_t stat_rx;
+  off_t stat_tx;
+  off_t stat_partial;
+  off_t stat_missing;
+
+  void bh_stat_add(Object::BufferHead *bh) {
+       switch (bh->get_state()) {
+       case Object::BufferHead::STATE_MISSING: stat_missing += bh->length(); break;
+       case Object::BufferHead::STATE_CLEAN: stat_clean += bh->length(); break;
+       case Object::BufferHead::STATE_DIRTY: stat_dirty += bh->length(); break;
+       case Object::BufferHead::STATE_TX: stat_tx += bh->length(); break;
+       case Object::BufferHead::STATE_RX: stat_rx += bh->length(); break;
+       }
+       if (stat_waiter) stat_cond.Signal();
+  }
+  void bh_stat_sub(Object::BufferHead *bh) {
+       switch (bh->get_state()) {
+       case Object::BufferHead::STATE_MISSING: stat_missing -= bh->length(); break;
+       case Object::BufferHead::STATE_CLEAN: stat_clean -= bh->length(); break;
+       case Object::BufferHead::STATE_DIRTY: stat_dirty -= bh->length(); break;
+       case Object::BufferHead::STATE_TX: stat_tx -= bh->length(); break;
+       case Object::BufferHead::STATE_RX: stat_rx -= bh->length(); break;
+       }
+  }
+  off_t get_stat_tx() { return stat_tx; }
+  off_t get_stat_rx() { return stat_rx; }
+  off_t get_stat_dirty() { return stat_dirty; }
+  off_t get_stat_clean() { return stat_clean; }
+  off_t get_stat_partial() { return stat_partial; }
+
+  // bh states
+  void bh_set_state(Object::BufferHead *bh, int s) {
+       // move between lru lists?
+       if (s == Object::BufferHead::STATE_DIRTY && bh->get_state() != Object::BufferHead::STATE_DIRTY) {
+         lru_rest.lru_remove(bh);
+         lru_dirty.lru_insert_top(bh);
+         dirty_bh.insert(bh);
+       }
+       if (s != Object::BufferHead::STATE_DIRTY && bh->get_state() == Object::BufferHead::STATE_DIRTY) {
+         lru_dirty.lru_remove(bh);
+         lru_rest.lru_insert_mid(bh);
+         dirty_bh.erase(bh);
+       }
+
+       // set state
+       bh_stat_sub(bh);
+       bh->set_state(s);
+       bh_stat_add(bh);
+  }      
+
+  void copy_bh_state(Object::BufferHead *bh1, Object::BufferHead *bh2) { 
+       bh_set_state(bh2, bh1->get_state());
+  }
   
-
-  int map_read(OSDRead *rd);
-  int map_write(OSDWrite *wr);
+  void mark_missing(Object::BufferHead *bh) { bh_set_state(bh, Object::BufferHead::STATE_MISSING); };
+  void mark_clean(Object::BufferHead *bh) { bh_set_state(bh, Object::BufferHead::STATE_CLEAN); };
+  void mark_rx(Object::BufferHead *bh) { bh_set_state(bh, Object::BufferHead::STATE_RX); };
+  void mark_tx(Object::BufferHead *bh) { bh_set_state(bh, Object::BufferHead::STATE_TX); };
+  void mark_dirty(Object::BufferHead *bh) { 
+       bh_set_state(bh, Object::BufferHead::STATE_DIRTY); 
+       //bh->set_dirty_stamp(g_clock.now());
+  };
 
 
-  void flush(set<object_t>& objects);  
-  void flush_all();
 
-  void commit(set<object_t>& objects);
-  void commit_all();
 
+  int map_read(Objecter::OSDRead *rd);
+  int map_write(Objecter::OSDWrite *wr);
 
+ public:
+  // blocking.  async.
+  int readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish);
+  int writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit);
   
+  // blocking.  atomic+sync.
+  int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish);
+  int atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit);
+
+  void flush_set(inodeno_t ino, Context *onfinish=0);
+  void flush_all(Context *onfinish=0);
+
+  void commit_set(inodeno_t ino, Context *oncommit=0);
+  void commit_all(Context *oncommit=0);
 };
 
 
-/*
-// sync write (correct)
-Filer->atomic_sync_write();
-   map
-   ObjectCache->atomic_sync_writex(...);  // blocks until sync write happens, or i get write locks
-
-// async write
-Filer->write();
-   map
-   ObjectCache->writex(...);      // non-blocking.  update cache.
- or
-   map
-   Objecter->writex(...);         // non-blocking.  no cache.  (MDS)
-*/
+#endif