]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
merged trunk changes r1047:1098 into branches/sage/cephmds2
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Feb 2007 18:08:49 +0000 (18:08 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Feb 2007 18:08:49 +0000 (18:08 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1099 29311d96-e01e-0410-9327-a35deaab8ce9

33 files changed:
branches/sage/cephmds2/Makefile
branches/sage/cephmds2/cfuse.cc
branches/sage/cephmds2/client/Client.cc
branches/sage/cephmds2/client/Client.h
branches/sage/cephmds2/client/SyntheticClient.cc
branches/sage/cephmds2/client/SyntheticClient.h
branches/sage/cephmds2/client/fuse.cc
branches/sage/cephmds2/common/Clock.h
branches/sage/cephmds2/csyn.cc
branches/sage/cephmds2/ebofs/BlockDevice.cc
branches/sage/cephmds2/ebofs/BufferCache.cc
branches/sage/cephmds2/ebofs/BufferCache.h
branches/sage/cephmds2/ebofs/Ebofs.cc
branches/sage/cephmds2/ebofs/Ebofs.h
branches/sage/cephmds2/include/buffer.h
branches/sage/cephmds2/mds/MDSMap.h
branches/sage/cephmds2/messages/MClientBoot.h
branches/sage/cephmds2/messages/MMonElectionAck.h
branches/sage/cephmds2/messages/MMonElectionPropose.h [new file with mode: 0644]
branches/sage/cephmds2/messages/MMonElectionVictory.h [new file with mode: 0644]
branches/sage/cephmds2/mon/Elector.cc
branches/sage/cephmds2/mon/Elector.h
branches/sage/cephmds2/mon/Monitor.cc
branches/sage/cephmds2/mon/Monitor.h
branches/sage/cephmds2/mon/OSDMonitor.cc
branches/sage/cephmds2/msg/Message.cc
branches/sage/cephmds2/msg/Message.h
branches/sage/cephmds2/msg/SimpleMessenger.cc
branches/sage/cephmds2/msg/SimpleMessenger.h
branches/sage/cephmds2/msg/tcp.h
branches/sage/cephmds2/osd/Ager.cc
branches/sage/cephmds2/osd/FakeStore.cc
branches/sage/cephmds2/osd/ObjectStore.h

index b73d4bb4d1163db1198e73b8c0c204aedbc5c022..e740425d30b6f309c130ce4dbaa4a720f9272e82 100644 (file)
@@ -9,9 +9,25 @@
 # dev work, and seems to behave just fine...  change ${CC} back to
 # mpicxx if you get paranoid.
 
+#CC = g++
+#CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
+#LIBS = -lpthread  
+
+# Hook for extra -I options, etc.
+EXTRA_CFLAGS = 
+
+ifeq ($(target),darwin)
+# For Darwin
+CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE -DDARWIN -D__FreeBSD__=10 ${EXTRA_CFLAGS}
+LDINC = ar -rc
+else
+# For linux
+CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE 
+LDINC = ld -i -o
+endif
+
 CC = g++
-CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
-LIBS = -lpthread  
+LIBS = -lpthread
 
 #for normal mpich2 machines
 MPICC = mpicxx
@@ -121,7 +137,7 @@ gprof-helper.so: test/gprof-helper.c
 fakefuse: fakefuse.cc mon.o mds.o client.o osd.o osdc.o ebofs.o client/fuse.o msg/FakeMessenger.o common.o
        ${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
-fakesyn: fakesyn.o mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o
+fakesyn: fakesyn.cc mon.o mds.o client.o osd.o ebofs.o osdc.o msg/FakeMessenger.o common.o
        ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
 
 
@@ -155,7 +171,7 @@ osd_obfs.o: osd/OBFSStore.o osd/OSD.cc osd/PG.o osd/ObjectStore.o osd/FakeStore.
 
 # libceph
 libceph.o: client/ldceph.o client/Client.o msg/SimpleMessenger.o ${COMMON_OBJS} ${SYN_OBJS} ${OSDC_OBJS}
-       ld -i $^ -o $@
+       ${LDINC} $^ -o $@
 
 bench/mdtest/mdtest.o: bench/mdtest/mdtest.c
        mpicc -c $^ -o $@
@@ -171,30 +187,29 @@ mdtest.ceph: bench/mdtest/mdtest.o libceph.o
 %.so: %.cc
        ${CC} -shared -fPIC ${CFLAGS} $< -o $@
 
-
 clean:
        rm -f *.o */*.o ${TARGETS} ${TEST_TARGETS}
 
 common.o: ${COMMON_OBJS}
-       ld -i -o $@ $^
+       ${LDINC} $@ $^
 
 ebofs.o: ${EBOFS_OBJS}
-       ld -i -o $@ $^
+       ${LDINC} $@ $^
 
 client.o: ${CLIENT_OBJS} 
-       ld -i -o $@ $^
+       ${LDINC} $@ $^
 
 osd.o: ${OSD_OBJS}
-       ld -i -o $@ $^
+       ${LDINC} $@ $^
 
 osdc.o: ${OSDC_OBJS}
-       ld -i -o $@ $^
+       ${LDINC} $@ $^
 
 mds.o: ${MDS_OBJS}
-       ld -i -o $@ $^
+       ${LDINC} $@ $^
 
 mon.o: ${MON_OBJS}
-       ld -i -o $@ $^
+       ${LDINC} $@ $^
 
 %.o: %.cc
        ${CC} ${CFLAGS} -c $< -o $@
index 33fe40bc67322b5d9d567db700fb32185c6893e2..3c73c3d885a19f1297b8bbc3bf9a5ace9f9148e0 100644 (file)
@@ -25,7 +25,9 @@ using namespace std;
 
 #include "common/Timer.h"
        
+#ifndef DARWIN
 #include <envz.h>
+#endif // DARWIN
 
 #include <sys/types.h>
 #include <sys/stat.h>
index 688ffe6742be3e3b17ad7e8c659b74ef0a0e50fe..af607f1f8618847ff4c2841e47611af13e3aff2d 100644 (file)
@@ -21,6 +21,8 @@
 #include <sys/stat.h>
 #include <fcntl.h>
 
+#include <sys/statvfs.h>
+
 
 #include <iostream>
 using namespace std;
@@ -1485,9 +1487,12 @@ void Client::fill_statlite(inode_t& inode, struct statlite *st)
   st->st_nlink = inode.nlink;
   st->st_uid = inode.uid;
   st->st_gid = inode.gid;
+#ifndef DARWIN
+  // FIXME what's going on here with darwin?
   st->st_ctime = inode.ctime;
   st->st_atime = inode.atime;
   st->st_mtime = inode.mtime;
+#endif
   st->st_size = inode.size;
   st->st_blocks = inode.size ? ((inode.size - 1) / 4096 + 1):0;
   st->st_blksize = 4096;
@@ -1837,6 +1842,7 @@ struct dirent *Client::readdir(DIR *dirp)
   // fill the dirent
   d->dp.d_dirent.d_ino = d->p->second.ino;
 #ifndef __CYGWIN__
+#ifndef DARWIN
   if (d->p->second.is_symlink())
     d->dp.d_dirent.d_type = DT_LNK;
   else if (d->p->second.is_dir())
@@ -1848,6 +1854,7 @@ struct dirent *Client::readdir(DIR *dirp)
 
   d->dp.d_dirent.d_off = d->off;
   d->dp.d_dirent.d_reclen = 1; // all records are length 1 (wrt offset, seekdir, telldir, etc.)
+#endif // DARWIN
 #endif
 
   strncpy(d->dp.d_dirent.d_name, d->p->first.c_str(), 256);
@@ -1898,6 +1905,7 @@ struct dirent_plus *Client::readdirplus(DIR *dirp)
   // fill the dirent
   d->dp.d_dirent.d_ino = d->p->second.ino;
 #ifndef __CYGWIN__
+#ifndef DARWIN
   if (d->p->second.is_symlink())
     d->dp.d_dirent.d_type = DT_LNK;
   else if (d->p->second.is_dir())
@@ -1909,6 +1917,7 @@ struct dirent_plus *Client::readdirplus(DIR *dirp)
 
   d->dp.d_dirent.d_off = d->off;
   d->dp.d_dirent.d_reclen = 1; // all records are length 1 (wrt offset, seekdir, telldir, etc.)
+#endif // DARWIN
 #endif
 
   strncpy(d->dp.d_dirent.d_name, d->p->first.c_str(), 256);
@@ -2519,14 +2528,24 @@ int Client::chdir(const char *path)
   return 0;
 }
 
-int Client::statfs(const char *path, struct statfs *stbuf) 
+int Client::statfs(const char *path, struct statvfs *stbuf)
 {
-  assert(0);  // implement me
+  bzero (stbuf, sizeof (struct statvfs));
+  // FIXME
+  stbuf->f_bsize   = 1024;
+  stbuf->f_frsize  = 1024;
+  stbuf->f_blocks  = 1024 * 1024;
+  stbuf->f_bfree   = 1024 * 1024;
+  stbuf->f_bavail  = 1024 * 1024;
+  stbuf->f_files   = 1024 * 1024;
+  stbuf->f_ffree   = 1024 * 1024;
+  stbuf->f_favail  = 1024 * 1024;
+  stbuf->f_namemax = 1024;
+
   return 0;
 }
 
 
-
 int Client::lazyio_propogate(int fd, off_t offset, size_t count)
 {
   client_lock.Lock();
index 755b8f34397acc39e0f9d16c4bca33c2deec5c6a..de16639c89e642affe398c30bbba6fbb64265b4a 100644 (file)
@@ -523,7 +523,7 @@ protected:
   int unmount();
 
   // these shoud (more or less) mirror the actual system calls.
-  int statfs(const char *path, struct statfs *stbuf);
+  int statfs(const char *path, struct statvfs *stbuf);
 
   // crap
   int chdir(const char *s);
index 1f8966e127b9123d493305d1790b62191f8970e4..7773d9735d7b958a89364a5a9fa4905fd3fc9931 100644 (file)
@@ -12,6 +12,7 @@
  */
 
 #include <iostream>
+#include <sstream>
 using namespace std;
 
 
@@ -81,6 +82,9 @@ void parse_syn_options(vector<char*>& args)
         syn_iargs.push_back( atoi(args[++i]) );
         syn_iargs.push_back( atoi(args[++i]) );
         syn_iargs.push_back( atoi(args[++i]) );
+      } else if (strcmp(args[i],"makedirmess") == 0) {
+        syn_modes.push_back( SYNCLIENT_MODE_MAKEDIRMESS );
+        syn_iargs.push_back( atoi(args[++i]) );
       } else if (strcmp(args[i],"statdirs") == 0) {
         syn_modes.push_back( SYNCLIENT_MODE_STATDIRS );
         syn_iargs.push_back( atoi(args[++i]) );
@@ -282,6 +286,16 @@ int SyntheticClient::run()
       }
       break;
 
+    case SYNCLIENT_MODE_MAKEDIRMESS:
+      {
+        string sarg1 = get_sarg(0);
+        int iarg1 = iargs.front();  iargs.pop_front();
+        if (run_me()) {
+          dout(2) << "makedirmess " << sarg1 << " " << iarg1 << endl;
+          make_dir_mess(sarg1.c_str(), iarg1);
+        }
+      }
+      break;
     case SYNCLIENT_MODE_MAKEDIRS:
       {
         string sarg1 = get_sarg(0);
@@ -1236,3 +1250,42 @@ int SyntheticClient::random_walk(int num_req)
 }
 
 
+
+
+void SyntheticClient::make_dir_mess(const char *basedir, int n)
+{
+  vector<string> dirs;
+  
+  dirs.push_back(basedir);
+  dirs.push_back(basedir);
+  
+  client->mkdir(basedir, 0755);
+
+  // motivation:
+  //  P(dir) ~ subdirs_of(dir) + 2
+  // from 5-year metadata workload paper in fast'07
+
+  // create dirs
+  for (int i=0; i<n; i++) {
+    // pick a dir
+    int k = rand() % dirs.size();
+    string parent = dirs[k];
+    
+    // pick a name
+    std::stringstream ss;
+    ss << parent << "/" << i;
+    string dir;
+    ss >> dir;
+
+    // update dirs
+    dirs.push_back(parent);
+    dirs.push_back(dir);
+    dirs.push_back(dir);
+
+    // do it
+    client->mkdir(dir.c_str(), 0755);
+  }
+    
+  
+}
+
index 14720bdd412b28c97a5059a3f4b173eba18a8ce4..ebf96386be95c6c180de513880eadac03b7b48e4 100644 (file)
@@ -24,8 +24,9 @@
 
 #define SYNCLIENT_MODE_RANDOMWALK  1
 #define SYNCLIENT_MODE_FULLWALK    2
-#define SYNCLIENT_MODE_REPEATWALK  7
+#define SYNCLIENT_MODE_REPEATWALK  3
 
+#define SYNCLIENT_MODE_MAKEDIRMESS  7
 #define SYNCLIENT_MODE_MAKEDIRS     8      // dirs files depth
 #define SYNCLIENT_MODE_STATDIRS     9     // dirs files depth
 #define SYNCLIENT_MODE_READDIRS     10     // dirs files depth
@@ -193,6 +194,8 @@ class SyntheticClient {
 
   int play_trace(Trace& t, string& prefix);
 
+  void make_dir_mess(const char *basedir, int n);
+
 };
 
 #endif
index 560a515a95240d1ddeb63f69ad2dc8649c56f498..64497820f381e791bea04a96f190e8a4f692214c 100644 (file)
@@ -27,7 +27,7 @@
 #define _XOPEN_SOURCE 500
 #endif
 
-#define FUSE_USE_VERSION 22
+#define FUSE_USE_VERSION 25
 
 #include <fuse.h>
 #include <stdio.h>
@@ -36,7 +36,7 @@
 #include <fcntl.h>
 #include <dirent.h>
 #include <errno.h>
-#include <sys/statfs.h>
+#include <sys/statvfs.h>
 
 
 // ceph stuff
@@ -185,7 +185,8 @@ static int ceph_flush(const char *path, struct fuse_file_info *fi)
 }
 */
 
-static int ceph_statfs(const char *path, struct statfs *stbuf)
+
+static int ceph_statfs(const char *path, struct statvfs *stbuf)
 {
   return client->statfs(path, stbuf);
 }
@@ -244,8 +245,11 @@ int ceph_fuse_main(Client *c, int argc, char *argv[])
   
   // allow other (all!) users to see my file system
   // NOTE: echo user_allow_other >> /etc/fuse.conf
+  // NB: seems broken on Darwin
+#ifndef DARWIN
   newargv[newargc++] = "-o";
   newargv[newargc++] = "allow_other";
+#endif // DARWIN
   
   // use inos
   newargv[newargc++] = "-o";
index 877f6884537f22909cb1a7138951aa68e74c3bbc..92a2b2bddf6d0a5ae43594c764c5c587cfd05bc1 100644 (file)
@@ -55,8 +55,10 @@ class utime_t {
   int           nsec() const { return tv.tv_usec*1000; }
 
   // ref accessors/modifiers
-  time_t&         sec_ref()  { return tv.tv_sec; } 
-  long&           usec_ref() { return tv.tv_usec; }
+  time_t&         sec_ref()  { return tv.tv_sec; }
+  // FIXME: tv.tv_usec is a __darwin_suseconds_t on Darwin.
+  // is just casting it to long& OK? 
+  long&           usec_ref() { return (long&) tv.tv_usec; }
 
   // cast to double
   operator double() {
index 9b7a58f5de5e517925146b33025d9c8c46f194d5..b5e4892cb9a6e602c3f33f1a87952b8713cb5c71 100644 (file)
@@ -25,8 +25,10 @@ using namespace std;
 #include "msg/SimpleMessenger.h"
 
 #include "common/Timer.h"
-       
+
+#ifndef DARWIN
 #include <envz.h>
+#endif // DARWIN
 
 #include <sys/types.h>
 #include <sys/stat.h>
index 5188946574643c652e3a4fca60cf992040ced2e9..7044e4ca38f27521f7b96ae7d6e22c03a7b194f4 100644 (file)
 #include <sys/ioctl.h>
 
 #ifndef __CYGWIN__
+#ifndef DARWIN
 #include <linux/fs.h>
 #endif
+#endif
 
 
 /*******************************************
@@ -665,7 +667,13 @@ int BlockDevice::_write(int fd, unsigned bno, unsigned num, bufferlist& bl)
 
 int BlockDevice::open_fd()
 {
+#ifdef DARWIN
+  int fd = ::open(dev.c_str(), O_RDWR|O_SYNC, 0);
+  ::fcntl(fd, F_NOCACHE);
+  return fd;
+#else
   return ::open(dev.c_str(), O_RDWR|O_SYNC|O_DIRECT, 0);
+#endif
 }
 
 int BlockDevice::open(kicker *idle) 
index cee7f2c12ce79789d82e1da4fe9265d163c55264..fa48c08b18a09122a11265ecf4750e1e52d26145 100644 (file)
@@ -530,6 +530,23 @@ int ObjectCache::scan_versions(block_t start, block_t len,
 }
 */
 
+void ObjectCache::touch_bottom(block_t bstart, block_t blast)
+{
+  for (map<block_t, BufferHead*>::iterator p = data.lower_bound(bstart);
+       p != data.end();
+       ++p) {
+    BufferHead *bh = p->second;
+    
+    // don't trim unless it's entirely in our range
+    if (bh->start() < bstart) continue;
+    if (bh->end() > blast) break;     
+    
+    dout(12) << "moving " << *bh << " to bottom of lru" << endl;
+    bc->touch_bottom(bh);  // move to bottom of lru list
+  }
+}  
+
+
 void ObjectCache::truncate(block_t blocks, version_t super_epoch)
 {
   dout(7) << "truncate " << object_id 
index 922c5e531ee566837b94f0f158f4f6e063fa688c..846809735103a1136a9cda707da40f65f744a3a8 100644 (file)
@@ -75,12 +75,15 @@ class BufferHead : public LRUObject {
 
   utime_t    dirty_stamp;
 
+  bool       want_to_expire;  // wants to be at bottom of lru
+
  public:
   BufferHead(ObjectCache *o) :
     oc(o), //cancellable_ioh(0), tx_epoch(0),
     rx_ioh(0), tx_ioh(0), tx_block(0), partial_tx_to(0), partial_tx_epoch(0),
     shadow_of(0),
-    ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0)
+    ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0),
+    want_to_expire(false)
     {}
   ~BufferHead() {
     unpin_shadows();
@@ -405,6 +408,7 @@ class ObjectCache {
                 interval_set<block_t>& alloc,
                 map<block_t, BufferHead*>& hits,
                 version_t super_epoch);   // can write to these.
+  void touch_bottom(block_t bstart, block_t blast);
 
   BufferHead *split(BufferHead *bh, block_t off);
 
@@ -509,6 +513,13 @@ class BufferCache {
     } else
       lru_rest.lru_touch(bh);
   }
+  void touch_bottom(BufferHead *bh) {
+    if (bh->is_dirty()) {
+      bh->want_to_expire = true;
+      lru_dirty.lru_bottouch(bh);
+    } else
+      lru_rest.lru_bottouch(bh);
+  }
   void remove_bh(BufferHead *bh) {
     bh->get_oc()->remove_bh(bh);
     stat_sub(bh);
@@ -586,7 +597,10 @@ class BufferCache {
     }
     if (s != BufferHead::STATE_DIRTY && bh->get_state() == BufferHead::STATE_DIRTY) {
       lru_dirty.lru_remove(bh);
-      lru_rest.lru_insert_mid(bh);
+      if (bh->want_to_expire)
+       lru_rest.lru_insert_bot(bh);
+      else
+       lru_rest.lru_insert_mid(bh);
       dirty_bh.erase(bh);
     }
 
index 520a9c7a00e926b55969f444350aa189b153c9e9..a190b83387385e3fbe06f2705dd8bf6a8ef9cf44 100644 (file)
@@ -1212,8 +1212,10 @@ void Ebofs::kick_idle()
 void Ebofs::sync(Context *onsafe)
 {
   ebofs_lock.Lock();
-  if (onsafe) 
+  if (onsafe) {
+    dirty = true;
     commit_waiters[super_epoch].push_back(onsafe);
+  }
   ebofs_lock.Unlock();
 }
 
@@ -1223,22 +1225,14 @@ void Ebofs::sync()
   if (!dirty) {
     dout(7) << "sync in " << super_epoch << ", not dirty" << endl;
   } else {
-    dout(7) << "sync in " << super_epoch << endl;
-    
-    if (!commit_thread_started) {
-      dout(10) << "sync waiting for commit thread to start" << endl;
-      sync_cond.Wait(ebofs_lock);
-    }
-    
-    if (mid_commit) {
-      dout(10) << "sync waiting for commit in progress" << endl;
+    epoch_t start = super_epoch;
+    dout(7) << "sync start in " << start << endl;
+    while (super_epoch == start) {
+      dout(7) << "sync kicking commit in " << super_epoch << endl;
+      dirty = true;
+      commit_cond.Signal();
       sync_cond.Wait(ebofs_lock);
     }
-    
-    commit_cond.Signal();  // trigger a commit
-    
-    sync_cond.Wait(ebofs_lock);  // wait
-    
     dout(10) << "sync finish in " << super_epoch << endl;
   }
   ebofs_lock.Unlock();
@@ -1822,6 +1816,89 @@ bool Ebofs::attempt_read(Onode *on, off_t off, size_t len, bufferlist& bl,
   return true;
 }
 
+
+/*
+ * is_cached -- query whether a object extent is in our cache
+ * return value of -1 if onode isn't loaded.  otherwise, the number
+ * of extents that need to be read (i.e. # of seeks)  
+ */
+int Ebofs::is_cached(object_t oid, off_t off, size_t len)
+{
+  ebofs_lock.Lock();
+  int r = _is_cached(oid, off, len);
+  ebofs_lock.Unlock();
+  return r;
+}
+
+int Ebofs::_is_cached(object_t oid, off_t off, size_t len)
+{
+  Onode *on = 0;
+  if (onode_map.count(oid) == 0) {
+    dout(7) << "_is_cached " << oid << " " << off << "~" << len << " ... onode  " << endl;
+    return -1;  // object dne?
+  } 
+  
+  if (!on->have_oc()) {  
+    // nothing is cached.  return # of extents in file.
+    return on->extent_map.size();
+  }
+  
+  // map
+  block_t bstart = off / EBOFS_BLOCK_SIZE;
+  block_t blast = (len+off-1) / EBOFS_BLOCK_SIZE;
+  block_t blen = blast-bstart+1;
+
+  map<block_t, BufferHead*> hits;
+  map<block_t, BufferHead*> missing;  // read these
+  map<block_t, BufferHead*> rx;       // wait for these
+  map<block_t, BufferHead*> partials;  // ??
+  on->get_oc(&bc)->map_read(bstart, blen, hits, missing, rx, partials);
+  return missing.size() + rx.size() + partials.size();
+
+  // FIXME: actually, we should calculate if these extents are contiguous.
+  // and not using map_read, probably...
+  /* hrmpf
+  block_t dpos = 0;
+  block_t opos = bstart;
+  while (opos < blen) {
+    if (hits.begin()->first == opos) {
+    } else {
+      block_t d;
+      if (missing.begin()->first == opos) d = missing.begin()->second.
+    
+  }
+  */
+}
+
+void Ebofs::trim_from_cache(object_t oid, off_t off, size_t len)
+{
+  ebofs_lock.Lock();
+  _trim_from_cache(oid, off, len);
+  ebofs_lock.Unlock();
+}
+
+void Ebofs::_trim_from_cache(object_t oid, off_t off, size_t len)
+{
+  Onode *on = 0;
+  if (onode_map.count(oid) == 0) {
+    dout(7) << "_trim_from_cache " << oid << " " << off << "~" << len << " ... onode not in cache  " << endl;
+    return; 
+  } 
+  
+  if (!on->have_oc()) 
+    return; // nothing is cached. 
+
+  // map to blocks
+  block_t bstart = off / EBOFS_BLOCK_SIZE;
+  block_t blast = (len+off-1) / EBOFS_BLOCK_SIZE;
+
+  ObjectCache *oc = on->get_oc(&bc);
+  oc->touch_bottom(bstart, blast);
+  
+  return;
+}
+
+
 int Ebofs::read(object_t oid, 
                 off_t off, size_t len,
                 bufferlist& bl)
@@ -1969,6 +2046,15 @@ unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
       }
       break;
 
+    case Transaction::OP_TRIMCACHE:
+      {
+        object_t oid = t.oids.front(); t.oids.pop_front();
+        off_t offset = t.offsets.front(); t.offsets.pop_front();
+        size_t len = t.lengths.front(); t.lengths.pop_front();
+        _trim_from_cache(oid, offset, len);
+      }
+      break;
+
     case Transaction::OP_TRUNCATE:
       {
         object_t oid = t.oids.front(); t.oids.pop_front();
index a8efe3b6a6b4c7be1e4b6cb899fd802de29ff9f4..bf7311e1d4c93820b3cd23cf31ea3fbdf56cdddc 100644 (file)
@@ -242,8 +242,10 @@ class Ebofs : public ObjectStore {
   bool exists(object_t);
   int stat(object_t, struct stat*);
   int read(object_t, off_t off, size_t len, bufferlist& bl);
-  //int write(object_t oid, off_t off, size_t len, bufferlist& bl, bool fsync=true);
+  int is_cached(object_t oid, off_t off, size_t len);
+
   int write(object_t oid, off_t off, size_t len, bufferlist& bl, Context *onsafe);
+  void trim_from_cache(object_t oid, off_t off, size_t len);
   int truncate(object_t oid, off_t size, Context *onsafe=0);
   int truncate_front(object_t oid, off_t size, Context *onsafe=0);
   int remove(object_t oid, Context *onsafe=0);
@@ -298,12 +300,14 @@ class Ebofs : public ObjectStore {
 private:
   // private interface -- use if caller already holds lock
   int _read(object_t oid, off_t off, size_t len, bufferlist& bl);
+  int _is_cached(object_t oid, off_t off, size_t len);
   int _stat(object_t oid, struct stat *st);
   int _getattr(object_t oid, const char *name, void *value, size_t size);
   int _getattrs(object_t oid, map<string,bufferptr> &aset);
 
   bool _write_will_block();
   int _write(object_t oid, off_t off, size_t len, bufferlist& bl);
+  void _trim_from_cache(object_t oid, off_t off, size_t len);
   int _truncate(object_t oid, off_t size);
   int _truncate_front(object_t oid, off_t size);
   int _remove(object_t oid);
index e9de7a9894b2af1f06d709659280fcc69493b964..fbcf4e3c130b6f3b92b163a1c3b24f525927b1f6 100644 (file)
@@ -108,7 +108,7 @@ private:
   class raw_mmap_pages : public raw {
   public:
     raw_mmap_pages(unsigned l) : raw(l) {
-      data = (char*)::mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
+      data = (char*)::mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, -1, 0);
       inc_total_alloc(len);
     }
     ~raw_mmap_pages() {
@@ -123,7 +123,11 @@ private:
   class raw_posix_aligned : public raw {
   public:
     raw_posix_aligned(unsigned l) : raw(l) {
+#ifdef DARWIN
+      data = (char *) valloc (len);
+#else
       ::posix_memalign((void**)&data, BUFFER_PAGE_SIZE, len);
+#endif /* DARWIN */
       inc_total_alloc(len);
     }
     ~raw_posix_aligned() {
index 677589fb33de66c3b4da2d017bde58d490269292..66b086e5ea39f8d49ac6f628839899a52a124812 100644 (file)
@@ -60,6 +60,7 @@ class MDSMap {
     case STATE_STOPPED:  return "up:stopped";
     default: assert(0);
     }
+    return 0;
   }
 
  protected:
index 0b73505642d7d39b1b3e969112f3ab50cb57ef0b..460f9f02e27f411050b3fecec7f0bfc8e3063467 100644 (file)
 class MClientBoot : public Message {
 
  public:
-  MClientBoot() : Message(MSG_CLIENT_BOOT) { 
-  }
+  MClientBoot() : Message(MSG_CLIENT_BOOT) { }
 
-  char *get_type_name() { return "Cboot"; }
+  char *get_type_name() { return "ClientBoot"; }
 
-  virtual void decode_payload(crope& s, int& off) {  
-  }
-  virtual void encode_payload(crope& s) {  
-  }
+  void encode_payload() { }
+  void decode_payload() { }
 };
 
 #endif
index dbfa30c9cb099e958cd937f3f1bab4f6dcdafb46..2399cca73d60c44f5193a3d0075880b00a7f6e86 100644 (file)
 
 class MMonElectionAck : public Message {
  public:
-  int q;
-  int refresh_num;
+  MMonElectionAck() : Message(MSG_MON_ELECTION_ACK) {}
+  
+  virtual char *get_type_name() { return "election_ack"; }
 
-  MMonElectionAck() {}
-  MMonElectionAck(int _q, int _n) :
-    Message(MSG_MON_ELECTION_ACK),
-    q(_q), refresh_num(_n) {}
-  void decode_payload() {
-    int off = 0;
-    payload.copy(off, sizeof(q), (char*)&q);
-    off += sizeof(q);
-    payload.copy(off, sizeof(refresh_num), (char*)&refresh_num);
-    off += sizeof(refresh_num);
-  }
-  void encode_payload() {
-    payload.append((char*)&q, sizeof(q));
-    payload.append((char*)&refresh_num, sizeof(refresh_num));
-  }
-
-  virtual char *get_type_name() { return "MonElAck"; }
+  void encode_payload() {}
+  void decode_payload() {}
 };
 
 #endif
diff --git a/branches/sage/cephmds2/messages/MMonElectionPropose.h b/branches/sage/cephmds2/messages/MMonElectionPropose.h
new file mode 100644 (file)
index 0000000..d9310f2
--- /dev/null
@@ -0,0 +1,32 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MMONELECTIONPROPOSE_H
+#define __MMONELECTIONPROPOSE_H
+
+#include "msg/Message.h"
+
+
+class MMonElectionPropose : public Message {
+ public:
+  MMonElectionPropose() : Message(MSG_MON_ELECTION_PROPOSE) {}
+  
+  virtual char *get_type_name() { return "election_propose"; }
+
+  void encode_payload() {}
+  void decode_payload() {}
+
+};
+
+#endif
diff --git a/branches/sage/cephmds2/messages/MMonElectionVictory.h b/branches/sage/cephmds2/messages/MMonElectionVictory.h
new file mode 100644 (file)
index 0000000..8bdbf2f
--- /dev/null
@@ -0,0 +1,40 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MMONELECTIONVICTORY_H
+#define __MMONELECTIONVICTORY_H
+
+#include "msg/Message.h"
+
+
+class MMonElectionVictory : public Message {
+ public:
+  //set<int> active_set;
+
+  MMonElectionVictory(/*set<int>& as*/) : Message(MSG_MON_ELECTION_VICTORY)//,
+       //active_set(as) 
+       {}
+  
+  virtual char *get_type_name() { return "election_victory"; }
+  
+  void encode_payload() {
+    //::_encode(active_set, payload);
+  }
+  void decode_payload() {
+    //int off = 0;
+    //::_decode(active_set, payload, off);
+  }
+};
+
+#endif
index 5816bf035739e95a6bfa8857622e8aff67246c63..563fcc687489b2cc72893a67a108ccd4eea4e15c 100644 (file)
 
 #include "common/Timer.h"
 
-#include "messages/MMonElectionRefresh.h"
-#include "messages/MMonElectionStatus.h"
+#include "messages/MMonElectionPropose.h"
 #include "messages/MMonElectionAck.h"
-#include "messages/MMonElectionCollect.h"
+#include "messages/MMonElectionVictory.h"
 
 #include "config.h"
 #undef dout
-#define  dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << "mon" << whoami << " "
-#define  derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << "mon" << whoami << " "
+#define  derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector "
+#define  dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << mon->whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".elector "
 
 
+void Elector::start()
+{
+  dout(5) << "start -- can i be leader?" << endl;
 
-class C_Elect_ReadTimer : public Context {
-  Elector *mon;
-public:
-  C_Elect_ReadTimer(Elector *m) : mon(m){}
-  void finish(int r) {
-    mon->read_timer();
+  leader_acked = -1;
+
+  // start by trying to elect me
+  start_stamp = g_clock.now();
+  acked_me.clear();
+  acked_me.insert(whoami);
+  electing_me = true;
+  
+  // bcast to everyone else
+  for (int i=0; i<mon->monmap->num_mon; ++i) {
+       if (i == whoami) continue;
+       mon->messenger->send_message(new MMonElectionPropose,
+                                                                mon->monmap->get_inst(i));
   }
-};
+  
+  reset_timer();
+}
 
-void Elector::read_timer()
+void Elector::defer(int who)
 {
-  read_num++;
-  status_msg_count = 0;
-  old_views = views;   // TODO deep copy
-  for (unsigned i=0; i<processes.size(); i++) {
-       mon->messenger->send_message(new MMonElectionCollect(read_num), 
-                                                                mon->monmap->get_inst(processes[i]));
-  }
-};
+  dout(5) << "defer to " << who << endl;
 
-class C_Elect_TripTimer : public Context {
-  Elector *mon;
-public:
-  C_Elect_TripTimer(Elector *m) : mon(m){}
-  void finish(int r) {
-    mon->trip_timer();
+  if (electing_me) {
+       acked_me.clear();
+       electing_me = false;
   }
-};
-
-void Elector::trip_timer()
-{
-  views[whoami].expired = true;
-  registry[whoami].epoch.s_num++;
-  dout(1) << "Process " << whoami
-                 <<  " timed out (" << ack_msg_count << "/" << (f + 1)
-                 << ") ... increasing epoch. Now epoch is "
-                 << registry[whoami].epoch.s_num
-                 << endl;
-};
 
+  // ack them
+  leader_acked = who;
+  ack_stamp = g_clock.now();
+  mon->messenger->send_message(new MMonElectionAck,
+                                                          mon->monmap->get_inst(who));
+  
+  // set a timer
+  reset_timer();
+}
 
 
-class C_Elect_RefreshTimer : public Context {
-  Elector *mon;
+class C_Mon_ElectionExpire : public Context {
+  Elector *elector;
 public:
-  C_Elect_RefreshTimer(Elector *m) : mon(m) {}
+  C_Mon_ElectionExpire(Elector *e) : elector(e) { }
   void finish(int r) {
-    mon->refresh_timer();
+       elector->expire();
   }
 };
 
-void Elector::refresh_timer()
+void Elector::reset_timer()
 {
-  ack_msg_count = 0;
-  refresh_num++;
-  MMonElectionRefresh *msg = new MMonElectionRefresh(whoami, registry[whoami], refresh_num);
-  for (unsigned i=0; i<processes.size(); i++) {
-       mon->messenger->send_message(msg, mon->monmap->get_inst(processes[i]));
-  }
-  
-  // Start the trip timer
-  //round_trip_timer = new C_Elect_TripTimer(this);
-  mon->timer.add_event_after(trip_delta, new C_Elect_TripTimer(this));
-};
+  // set the timer
+  cancel_timer();
+  expire_event = new C_Mon_ElectionExpire(this);
+  g_timer.add_event_after(g_conf.mon_lease,
+                                                        expire_event);
+}
 
 
+void Elector::cancel_timer()
+{
+  if (expire_event)
+       g_timer.cancel_event(expire_event);
+}
 
-//////////////////////////
+void Elector::expire()
+{
+  dout(5) << "election timer expired" << endl;
+  
+  // did i win?
+  if (electing_me &&
+         acked_me.size() > (unsigned)(mon->monmap->num_mon / 2)) {
+       // i win
+       victory();
+  } else {
+       // whoever i deferred to didn't declare victory quickly enough.
+       start();
+  }
+}
 
 
-Elector::Epoch Elector::get_min_epoch()
+void Elector::victory()
 {
-  assert(!views.empty());
-  Epoch min = views[0].state.epoch;
-  for (unsigned i=1; i<views.size(); i++) {
-    if (views[i].state.epoch < min && !views[i].expired) {
-      min = views[i].state.epoch;
-    }
+  leader_acked = -1;
+  electing_me = false;
+
+  // tell everyone
+  for (int i=0; i<mon->monmap->num_mon; ++i) {
+       if (i == whoami) continue;
+       mon->messenger->send_message(new MMonElectionVictory,
+                                                                mon->monmap->get_inst(i));
   }
-  return min;
+    
+  // tell monitor
+  mon->win_election(acked_me);
 }
 
 
-void Elector::dispatch(Message *m)
+void Elector::handle_propose(MMonElectionPropose *m)
 {
-  switch (m->get_type()) {
-  case MSG_MON_ELECTION_ACK:
-       handle_ack((MMonElectionAck*)m);
-       break;
-       
-  case MSG_MON_ELECTION_STATUS:
-       handle_status((MMonElectionStatus*)m);
-       break;
-    
-  case MSG_MON_ELECTION_COLLECT:
-       handle_collect((MMonElectionCollect*)m);
-       break;
-    
-  case MSG_MON_ELECTION_REFRESH:
-       handle_refresh((MMonElectionRefresh*)m);
-       break;
-       
-  default:
-       assert(0);
+  dout(5) << "handle_propose from " << m->get_source() << endl;
+  int from = m->get_source().num();
+
+  if (from > whoami) {
+       // wait, i should win!
+       if (!electing_me)
+         start();
+  } else {
+       // they would win over me
+       if (leader_acked < 0 ||      // haven't acked anyone yet, or
+               leader_acked > from) {   // they would win over who you did ack
+         defer(from);
+       } else {
+         // ignore them!
+         dout(5) << "no, we already acked " << leader_acked << endl;
+       }
   }
+  
+  delete m;
 }
-
-void Elector::handle_ack(MMonElectionAck* msg)
+void Elector::handle_ack(MMonElectionAck *m)
 {
-  assert(refresh_num >= msg->refresh_num);
+  dout(5) << "handle_ack from " << m->get_source() << endl;
+  int from = m->get_source().num();
   
-  if (refresh_num > msg->refresh_num) {
-    // we got the message too late... discard it
-    return;
-  }
-  ack_msg_count++;
-  if (ack_msg_count >= f + 1) {
-    dout(5) << "Received _f+1 acks, increase freshness" << endl;
-    //mon->timer.cancel_event(round_trip_task);
-    //round_trip_timer->cancel();
-    registry[whoami].freshness++;         
+  if (electing_me) {
+       // thanks
+       acked_me.insert(from);
+       dout(5) << " so far i have " << acked_me << endl;
+       
+       // is that _everyone_?
+       if (acked_me.size() == (unsigned)mon->monmap->num_mon) {
+         // if yes, shortcut to election finish
+         victory();
+       }
+  } else {
+       // ignore, i'm deferring already.
   }
   
-  delete msg;
+  delete m;
 }
 
-void Elector::handle_collect(MMonElectionCollect* msg)
+void Elector::handle_victory(MMonElectionVictory *m)
 {
-  mon->messenger->send_message(new MMonElectionStatus(msg->get_source().num(),
-                                                      msg->read_num,
-                                                      registry),
-                                                          mon->monmap->get_inst(msg->get_source().num()));
-  delete msg;
+  dout(5) << "handle_victory from " << m->get_source() << endl;
+  int from = m->get_source().num();
+  
+  if (from < whoami) {
+       // ok, fine, they win
+       mon->lose_election(from);
+
+       // cancel my timer
+       cancel_timer(); 
+  } else {
+       // no, that makes no sense, i should win.  start over!
+       start();
+  }
 }
 
-void Elector::handle_refresh(MMonElectionRefresh* msg)
-{
-  if (registry[msg->p] < msg->state) {
-    // update local data
-    registry[msg->p] = msg->state;
-
-    // reply to msg
-    mon->messenger->send_message(new MMonElectionAck(msg->p, 
-                                                     msg->refresh_num),
-                                                                mon->monmap->get_inst(msg->get_source().num()));
-  }
 
-  delete msg;
-}
 
 
-void Elector::handle_status(MMonElectionStatus* msg)
+void Elector::dispatch(Message *m)
 {
-  if (read_num != msg->read_num) {
-    dout(1) << "handle_status "
-            << ":DISCARDED B/C OF READNUM(" << read_num << ":"
-            << msg->read_num << ")" 
-            << endl;
-    return;
-  }
-  for (unsigned i=0; i<processes.size(); i++) {
-    int r = processes[i];
-    // Put in the view the max value between then new state and the stored one
-    if ( msg->registry[r] > views[r].state ) {
-      views[r].state = msg->registry[r];
-    }
-  }
-        
-  status_msg_count++;
-  if (status_msg_count >= (int)processes.size() - f) { // Responses from quorum collected
-    for (unsigned i=0; i<processes.size(); i++) {
-      int r = processes[i];
-      // Check if r has refreshed its epoch number
-      if (!( views[r].state > old_views[r].state )) {
-        dout(5) << ":Other process (" << r << ") has expired" << endl;
-        views[r].expired = true;
-      }
-      if (views[r].state.epoch > old_views[r].state.epoch) {
-        views[r].expired = false;
-      }
-    }
-    Epoch leader_epoch = get_min_epoch();
-    leader_id = leader_epoch.p_id;
-    dout(1) << " thinks leader has ID: " << leader_id << endl;
+  switch (m->get_type()) {
+  case MSG_MON_ELECTION_ACK:
+       handle_ack((MMonElectionAck*)m);
+       break;
+    
+  case MSG_MON_ELECTION_PROPOSE:
+       handle_propose((MMonElectionPropose*)m);
+       break;
     
-    // Restarts the timer for the next iteration
-    mon->timer.add_event_after(main_delta + trip_delta, new C_Elect_ReadTimer(this));
+  case MSG_MON_ELECTION_VICTORY:
+       handle_victory((MMonElectionVictory*)m);
+       break;
+       
+  default:
+       assert(0);
   }
 }
 
index 57ca7556170761e9c8164d6fdd90c052d9aa91a5..bc556ae04ea7cb20399b1af81259b3af5601cbec 100644 (file)
@@ -21,142 +21,52 @@ using namespace std;
 #include "include/types.h"
 #include "msg/Message.h"
 
+#include "include/Context.h"
+
+#include "common/Timer.h"
 
 class Monitor;
 
 
 class Elector {
- public:
-
-  //// sub-classes
-
-  // Epoch
-  class Epoch {
-  public:
-    int p_id;
-    int s_num;
-    
-    Epoch(int p_id=0, int s_num=0) {
-      this->p_id = p_id;
-      this->s_num = s_num;
-    }
-  };    
-
-
-  // State
-  class State {
-  public:
-    Epoch epoch;
-    int freshness;
-
-    State() : freshness(0) {};
-    State(Epoch& e, int f) :
-      epoch(e), freshness(f) {}
-  };
-
-
-  class View {
-  public:
-    State state;
-    bool expired;
-    View() : expired(false) {}
-    View(State& s, bool e) : state(s), expired(e) {}
-  };
-
-
-  ///////////////
  private:
   Monitor *mon;
   int whoami;
 
-  // used during refresh phase
-  int ack_msg_count;
-  int refresh_num;
-  
-  // used during read phase
-  int read_num;
-  int status_msg_count;
-  
-  // the leader process id
-  int leader_id;
-  // f-accessible
-  int f;
-  
-  // the processes that compose the group
-  vector<int> processes;
-  // parameters for the process
-  int main_delta;
-  int trip_delta;
-  
-  // state variables
-  map<int, State> registry;
-  map<int, View>  views;
-  map<int, View>  old_views;
+  Context *expire_event;
 
-  // get the minimum epoch in the view map
-  Epoch get_min_epoch();
+  void reset_timer();
+  void cancel_timer();
+
+  // electing me
+  bool     electing_me;
+  utime_t  start_stamp;
+  set<int> acked_me;
+
+  // electing them
+  int     leader_acked;  // who i've acked
+  utime_t ack_stamp;     // and when
   
-  // handlers for election messages
+ public:
+  void start();   // start an electing me
+  void defer(int who);
+  void expire();  // timer goes off
+  void victory();
+   
+  void handle_propose(class MMonElectionPropose *m);
   void handle_ack(class MMonElectionAck *m);
-  void handle_collect(class MMonElectionCollect *m);
-  void handle_refresh(class MMonElectionRefresh *m);
-  void handle_status(class MMonElectionStatus *m);
+  void handle_victory(class MMonElectionVictory *m);
 
+  
  public:  
   Elector(Monitor *m, int w) : mon(m), whoami(w) {
     // initialize all those values!
     // ...
   }
 
-  // timer methods
-  void read_timer();
-  void trip_timer();
-  void refresh_timer();
-  
   void dispatch(Message *m);
-
 };
 
 
-inline bool operator>(const Elector::Epoch& l, const Elector::Epoch& r) {
-  if (l.s_num == r.s_num)
-    return (l.p_id > r.p_id);
-  else
-    return (l.s_num > r.s_num);
-}
-
-inline bool operator<(const Elector::Epoch& l, const Elector::Epoch& r) {
-  if (l.s_num == r.s_num)
-    return (l.p_id < r.p_id);
-  else
-    return (l.s_num < r.s_num);
-}
-
-inline bool operator==(const Elector::Epoch& l, const Elector::Epoch& r) {
-  return ((l.s_num == r.s_num) && (l.p_id > r.p_id));
-}
-
-  
-inline bool operator>(const Elector::State& l, const Elector::State& r) 
-{
-  if (l.epoch == r.epoch)
-    return (l.freshness > r.freshness);
-  else
-    return l.epoch > r.epoch;
-}
-inline bool operator<(const Elector::State& l, const Elector::State& r) 
-{
-  if (l.epoch == r.epoch)
-    return (l.freshness < r.freshness);
-  else
-    return l.epoch < r.epoch;
-}
-inline bool operator==(const Elector::State& l, const Elector::State& r) 
-{
-  return ( (l.epoch == r.epoch) && (l.freshness == r.freshness) );
-}
-
-
 #endif
index a92ab181159051aacf0ecc43b0bacf43856acb45..8261b52cc3fe56002a68716c91d0fae5d1bf7715 100644 (file)
@@ -66,7 +66,18 @@ void Monitor::init()
   
   // start ticker
   reset_tick();
-  
+
+  // call election?
+  if (monmap->num_mon > 1) {
+    assert(monmap->num_mon != 2); 
+    call_election();
+  } else {
+    // we're standalone.
+    set<int> q;
+    q.insert(whoami);
+    win_election(q);
+  }
+
   lock.Unlock();
 }
 
@@ -115,14 +126,35 @@ void Monitor::shutdown()
 
 void Monitor::call_election()
 {
+  if (monmap->num_mon == 1) return;
+
   dout(10) << "call_election" << endl;
   state = STATE_STARTING;
 
+  elector.start();
+
   osdmon->election_starting();
   //mdsmon->election_starting();
 }
 
+void Monitor::win_election(set<int>& active) 
+{
+  state = STATE_LEADER;
+  leader = whoami;
+  quorum = active;
+  dout(10) << "win_election, quorum is " << quorum << endl;
+
+  // init
+  osdmon->election_finished();
+  //mdsmon->election_finished();
+} 
 
+void Monitor::lose_election(int l) 
+{
+  state = STATE_PEON;
+  leader = l;
+  dout(10) << "lose_election, leader is mon" << leader << endl;
+}
 
 
 
@@ -172,10 +204,9 @@ void Monitor::dispatch(Message *m)
 
 
       // elector messages
+    case MSG_MON_ELECTION_PROPOSE:
     case MSG_MON_ELECTION_ACK:
-    case MSG_MON_ELECTION_STATUS:
-    case MSG_MON_ELECTION_COLLECT:
-    case MSG_MON_ELECTION_REFRESH:
+    case MSG_MON_ELECTION_VICTORY:
       elector.dispatch(m);
       break;
 
index a9c05709047175627abed1cf1cd244c45092ccf7..0b8f921720d5b1c1dff3d94ed8de13ca8393418c 100644 (file)
@@ -60,12 +60,12 @@ protected:
   epoch_t  mon_epoch;    // monitor epoch (election instance)
   set<int> quorum;       // current active set of monitors (if !starting)
 
-  void call_election();
+  //void call_election();
 
   // monitor state
-  const static int STATE_STARTING = 0;
-  const static int STATE_LEADER = 1;
-  const static int STATE_PEON =   2;
+  const static int STATE_STARTING = 0; // electing
+  const static int STATE_LEADER =   1;
+  const static int STATE_PEON =     2;
   int state;
 
   int leader;                    // current leader (to best of knowledge)
@@ -88,6 +88,14 @@ protected:
   friend class MDSMonitor;
   friend class ClientMonitor;
 
+  // initiate election
+  void call_election();
+
+  // end election (called by Elector)
+  void win_election(set<int>& q);
+  void lose_election(int l);
+
+
  public:
   Monitor(int w, Messenger *m, MonMap *mm) : 
     whoami(w), 
@@ -101,13 +109,9 @@ protected:
     leader(0),
     osdmon(0), mdsmon(0), clientmon(0)
   {
-    // hack leader, until election works.
-    if (whoami == 0)
-      state = STATE_LEADER;
-    else
-      state = STATE_PEON;
   }
 
+
   void init();
   void shutdown();
   void dispatch(Message *m);
index 7e0fe856a94dfd5d0098a7d0d3dcd50b04cf2f04..7a19fb910a67d8ee18a8d7f07627555e0a8a4f2e 100644 (file)
@@ -104,13 +104,6 @@ void OSDMonitor::init()
 
     // set up pending_inc
     pending_inc.epoch = osdmap.get_epoch()+1;
-
-  } else {
-    // FIXME. when elections work!
-    if (mon->is_leader()) {
-      create_initial();
-      issue_leases();
-    }
   }
 }
 
@@ -627,10 +620,18 @@ void OSDMonitor::election_starting()
 
 void OSDMonitor::election_finished()
 {
-  dout(10) << "election_starting" << endl;
+  dout(10) << "election_finished" << endl;
 
   state = STATE_INIT;
 
+  // map?
+  if (osdmap.get_epoch() == 0 &&
+      mon->is_leader()) {
+    create_initial();
+  }
+
+
+
   if (mon->is_leader()) {
     // leader.
     if (mon->monmap->num_mon == 1) {
@@ -640,8 +641,8 @@ void OSDMonitor::election_finished()
   } 
   else if (mon->is_peon()) {
     // peon. send info
-    messenger->send_message(new MMonOSDMapInfo(osdmap.epoch, osdmap.mon_epoch),
-                           mon->monmap->get_inst(mon->leader));
+    //messenger->send_message(new MMonOSDMapInfo(osdmap.epoch, osdmap.mon_epoch),
+    //                     mon->monmap->get_inst(mon->leader));
   }
   
 }
index 2e17e7f34c45f3d29c26c0d54bbd3993c492876f..0e2381a1823bebff5ff6ed4c73f6f50c95228199 100644 (file)
@@ -20,9 +20,8 @@ using namespace std;
 */
 
 #include "messages/MMonElectionAck.h"
-#include "messages/MMonElectionCollect.h"
-#include "messages/MMonElectionRefresh.h"
-#include "messages/MMonElectionStatus.h"
+#include "messages/MMonElectionPropose.h"
+#include "messages/MMonElectionVictory.h"
 
 #include "messages/MPing.h"
 #include "messages/MPingAck.h"
@@ -157,17 +156,14 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
     break;
        */
 
-  case MSG_MON_ELECTION_ACK:
-    m = new MMonElectionAck();
-    break;
-  case MSG_MON_ELECTION_COLLECT:
-    m = new MMonElectionCollect();
+  case MSG_MON_ELECTION_PROPOSE:
+    m = new MMonElectionPropose;
     break;
-  case MSG_MON_ELECTION_REFRESH:
-    m = new MMonElectionRefresh();
+  case MSG_MON_ELECTION_ACK:
+    m = new MMonElectionAck;
     break;
-  case MSG_MON_ELECTION_STATUS:
-    m = new MMonElectionStatus();
+  case MSG_MON_ELECTION_VICTORY:
+    m = new MMonElectionVictory;
     break;
 
   case MSG_PING:
index adae304b5887066ebd46085587263d187035898a..bb5cd1bc6dd8d9656daed77a6943f476be25a789 100644 (file)
@@ -37,9 +37,8 @@
 
 
 #define MSG_MON_ELECTION_ACK       15
-#define MSG_MON_ELECTION_COLLECT   16
-#define MSG_MON_ELECTION_REFRESH   17
-#define MSG_MON_ELECTION_STATUS    18
+#define MSG_MON_ELECTION_PROPOSE   16
+#define MSG_MON_ELECTION_VICTORY   17
 
 #define MSG_MON_OSDMAP_INFO            20
 #define MSG_MON_OSDMAP_LEASE           21
index f5d0195ae3e6d8ef45b10d4c2e9aac92d4057fb2..c9cef7239e4cb3386a31c62fb2f38bf6ab66c7b9 100644 (file)
@@ -17,6 +17,7 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <fcntl.h>
+#include <signal.h>
 
 #include "config.h"
 
@@ -44,6 +45,23 @@ Rank rank;
  * Accepter
  */
 
+void simplemessenger_sigint(int r)
+{
+  rank.sigint();
+}
+
+void Rank::sigint()
+{
+  lock.Lock();
+  derr(0) << "got control-c, exiting" << endl;
+  ::close(accepter.listen_sd);
+  exit(-1);
+  lock.Unlock();
+}
+
+
+
+
 int Rank::Accepter::start()
 {
   // bind to a socket
@@ -62,10 +80,12 @@ int Rank::Accepter::start()
     derr(0) << "accepter.start unable to bind to " << rank.listen_addr << endl;
   assert(rc >= 0);
 
+  // what port did we get?
   socklen_t llen = sizeof(rank.listen_addr);
   getsockname(listen_sd, (sockaddr*)&rank.listen_addr, &llen);
   
-  int myport = rank.listen_addr.sin_port;
+  int myport = ntohs(rank.listen_addr.sin_port);
+  dout(10) << "accepter.start bound to port " << myport << endl;
 
   // listen!
   rc = ::listen(listen_sd, 1000);
@@ -93,7 +113,7 @@ int Rank::Accepter::start()
     memcpy((char *) &rank.listen_addr.sin_addr.s_addr, 
           myhostname->h_addr_list[0], 
           myhostname->h_length);
-    rank.listen_addr.sin_port = myport;
+    rank.listen_addr.sin_port = htons(myport);
     rank.my_addr.set_addr(rank.listen_addr);
   }
   
@@ -102,6 +122,9 @@ int Rank::Accepter::start()
   
   dout(10) << "accepter.start my addr is " << rank.my_addr << endl;
 
+  // set up signal handler
+  signal(SIGINT, simplemessenger_sigint);
+
   // start thread
   create();
 
@@ -213,9 +236,13 @@ int Rank::Pipe::connect()
   tcpaddr_t tcpaddr;
   peer_addr.make_addr(tcpaddr);
   rc = ::connect(sd, (sockaddr*)&tcpaddr, sizeof(myAddr));
-  if (rc < 0) return rc;
+  if (rc < 0) {
+    dout(10) << "connect error " << peer_addr
+            << ", " << errno << ": " << strerror(errno) << endl;
+    return rc;
+  }
 
-  // identify peer
+  // identify peer ..... FIXME
   entity_addr_t paddr;
   rc = tcp_read(sd, (char*)&paddr, sizeof(paddr));
   if (peer_addr != paddr) {
@@ -377,7 +404,9 @@ void Rank::Pipe::writer()
   if (!server) {
     int rc = connect();
     if (rc < 0) {
-      derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error connecting" << endl;
+      derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error connecting, " 
+             << errno << ": " << strerror(errno)
+             << endl;
       done = true;
       list<Message*> out;
       fail(out);
@@ -413,7 +442,9 @@ void Rank::Pipe::writer()
         
         if (write_message(m) < 0) {
           // failed!
-          derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error sending " << *m << " to " << m->get_dest() << endl;
+          derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error sending " << *m << " to " << m->get_dest()
+                 << ", " << errno << ": " << strerror(errno)
+                 << endl;
           out.push_front(m);
           fail(out);
           done = true;
index 9e2ba28131e8f99d6484ef7e9733732f437a25e7..fb8e3dcee9fe13181dac85ecc096aaeb4beaecf7 100644 (file)
@@ -39,7 +39,10 @@ using namespace __gnu_cxx;
 /* Rank - per-process
  */
 class Rank {
+public:
+  void sigint();
+
+private:
   class EntityMessenger;
   class Pipe;
 
@@ -60,6 +63,8 @@ class Rank {
     }
     int start();
   } accepter;
+
+  void sigint(int r);
   
 
   // pipe
index f38388d456a8cadbc26c48557a2a7cf38f3755a2..65043cda8e2ace2607f663c0f5044f4b94f18d89 100644 (file)
@@ -18,7 +18,7 @@ inline ostream& operator<<(ostream& out, const tcpaddr_t &a)
       << (unsigned)addr[1] << "."
       << (unsigned)addr[2] << "."
       << (unsigned)addr[3] << ":"
-      << (int)a.sin_port;
+      << ntohs(a.sin_port);
   return out;
 }
 
index 8e21a5b871e0be66d7be98224d491c6ef7d1995f..038688c5cdfd536ee48621d531f8054abe3b6e17 100644 (file)
 #include <sys/stat.h>
 #include <fcntl.h>
 
+#ifdef DARWIN
+#include <sys/param.h>
+#include <sys/mount.h>
+#endif // DARWIN
+
 
 int myrand() 
 {
index c2f573a81038fd5fa2db2dbc8bf011a59af45b7e..36dc01127107e3fa87067c6872f3003103a44c0a 100644 (file)
 //#include <sys/xattr.h>
 //#include <sys/vfs.h>
 
+#ifdef DARWIN
+#include <sys/param.h>
+#include <sys/mount.h>
+#endif // DARWIN
+
 #include "config.h"
 #undef dout
 #define  dout(l)    if (l<=g_conf.debug) cout << "osd" << whoami << ".fakestore "
index e7b992d081b0b8a2b17bd565181728efd6097788..70bc92dd653f740fbcef97a8a428c29f7ea23cb0 100644 (file)
 #include "include/Distribution.h"
 
 #include <sys/stat.h>
+
+#ifdef DARWIN
+#include <sys/statvfs.h>
+#else
 #include <sys/vfs.h>    /* or <sys/statfs.h> */
+#endif /* DARWIN */
 
 #include <list>
 using namespace std;
@@ -80,6 +85,8 @@ public:
     static const int OP_RMATTR =       16;  // oid, attrname
     static const int OP_CLONE =        17;  // oid, newoid
 
+    static const int OP_TRIMCACHE =    18;  // oid, offset, len
+
     static const int OP_MKCOLL =       20;  // cid
     static const int OP_RMCOLL =       21;  // cid
     static const int OP_COLL_ADD =     22;  // cid, oid
@@ -138,6 +145,13 @@ public:
       lengths.push_back(len);
       bls.push_back(bl);
     }
+    void trim_from_cache(object_t oid, off_t off, size_t len) {
+      int op = OP_TRIMCACHE;
+      ops.push_back(op);
+      oids.push_back(oid);
+      offsets.push_back(off);
+      lengths.push_back(len);
+    }
     void truncate(object_t oid, off_t off) {
       int op = OP_TRUNCATE;
       ops.push_back(op);
@@ -272,6 +286,15 @@ public:
         }
         break;
 
+      case Transaction::OP_TRIMCACHE:
+        {
+          object_t oid = t.oids.front(); t.oids.pop_front();
+          off_t offset = t.offsets.front(); t.offsets.pop_front();
+          size_t len = t.lengths.front(); t.lengths.pop_front();
+          trim_from_cache(oid, offset, len);
+        }
+        break;
+
       case Transaction::OP_TRUNCATE:
         {
           object_t oid = t.oids.front(); t.oids.pop_front();
@@ -424,6 +447,8 @@ public:
                     off_t offset, size_t len,
                     bufferlist& bl, 
                     Context *onsafe) = 0;//{ return -1; }
+  virtual void trim_from_cache(object_t oid, 
+                              off_t offset, size_t len) { }
 
   virtual int setattr(object_t oid, const char *name,
                       const void *value, size_t size,