]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 14 Dec 2005 04:00:27 +0000 (04:00 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 14 Dec 2005 04:00:27 +0000 (04:00 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@520 29311d96-e01e-0410-9327-a35deaab8ce9

15 files changed:
ceph/Makefile
ceph/config.cc
ceph/ebofs/Allocator.cc
ceph/ebofs/BlockDevice.cc
ceph/ebofs/BlockDevice.h
ceph/ebofs/BufferCache.cc
ceph/ebofs/BufferCache.h
ceph/ebofs/Ebofs.cc
ceph/ebofs/Ebofs.h
ceph/ebofs/Onode.h
ceph/ebofs/Table.h
ceph/ebofs/mkfs.ebofs.cc
ceph/ebofs/nodes.h
ceph/ebofs/types.h
ceph/include/interval_set.h

index 95bb09848c5cf9348e3796a94514c6cf52396494..6ae152ac31a5c1f949c9f4e2da64bf3819c3e056 100644 (file)
@@ -129,9 +129,15 @@ obfstest: tcpsyn.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.cc
        ${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.a
 
 # ebofs
+ebofs: mkfs.ebofs test.ebofs
+
 mkfs.ebofs: ebofs/mkfs.ebofs.cc config.cc common/Clock.o ebofs/ebo.o
        ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
 
+test.ebofs: ebofs/test.ebofs.cc config.cc common/Clock.o ebofs/ebo.o
+       ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
+
+
 
 
 testmpi: test/testmpi.cc msg/MPIMessenger.cc config.o common/Timer.o common/clock.o msg/Messenger.o msg/Dispatcher.o msg/error.o
index ea7c8ed3550179af4b3199b8e7a7f464450199ee..8b818095d754ea4dfdc7301f6efd355331a562ca 100644 (file)
@@ -50,7 +50,7 @@ md_config_t g_conf = {
   fake_osdmap_expand: 0,
   fake_osd_sync: true,
 
-  debug: 100,
+  debug: 10,
   debug_mds_balancer: 1,
   debug_mds_log: 1,
   debug_buffer: 0,
index dacde19a5162ef8dc1db6fcb6b40559d93d71e2d..69bd76466dae7afc7621f7b36bc92ade18f3cda1 100644 (file)
@@ -4,11 +4,12 @@
 
 
 #undef dout
-#define dout(x) if (x <= g_conf.debug) cout << "allocator." 
+#define dout(x) if (x <= g_conf.debug) cout << "ebofs.allocator." 
 
 
 void Allocator::dump_freelist()
 {
+  if (0)
   for (int b=0; b<EBOFS_NUM_FREE_BUCKETS; b++) {
        dout(20) << "dump bucket " << b << endl;
        if (fs->free_tab[b]->get_num_keys() > 0) {
@@ -59,9 +60,11 @@ int Allocator::find(Extent& ex, int bucket, block_t num, block_t near)
 
 int Allocator::allocate(Extent& ex, block_t num, block_t near)
 {
+  /*
   if (!near) {
        near = num/2;  // this is totally wrong and stupid.
   }
+  */
 
   int bucket;
 
@@ -109,7 +112,7 @@ int Allocator::allocate(Extent& ex, block_t num, block_t near)
                }
          }
 
-         dout(1) << "allocator.alloc " << ex << " near " << near << endl;
+         dout(10) << "allocate " << ex << " near " << near << endl;
          dump_freelist();
          return num;
        }
@@ -124,7 +127,7 @@ int Allocator::allocate(Extent& ex, block_t num, block_t near)
          
          fs->free_tab[bucket]->remove(ex.start);
          fs->free_blocks -= ex.length;
-         dout(1) << "allocator.alloc partial " << ex << " near " << near << endl;
+         dout(10) << "allocate partial " << ex << " near " << near << endl;
          dump_freelist();
          return ex.length;
        }       
@@ -137,7 +140,7 @@ int Allocator::allocate(Extent& ex, block_t num, block_t near)
 
 int Allocator::release(Extent& ex)
 {
-  dout(1) << "release " << ex << " (into limbo)" << endl;
+  dout(10) << "release " << ex << " (into limbo)" << endl;
   limbo.insert(ex.start, ex.length);
   return 0;
 }
@@ -157,7 +160,7 @@ int Allocator::release_now(Extent& ex)
 {
   Extent newex = ex;
   
-  dout(1) << "release " << ex << endl;
+  dout(10) << "release " << ex << endl;
 
   // one after us?
   for (int b=0; b<EBOFS_NUM_FREE_BUCKETS; b++) {
index 1906f2104fbab21cb6c46109e4c45014b0b76b44..7f5dc4c6a74134c6c405f7149b6b6a285f0eb549 100644 (file)
@@ -3,13 +3,42 @@
 
 #include "config.h"
 
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/file.h>
+#include <iostream>
+#include <cassert>
+#include <errno.h>
+
+#include <sys/uio.h>
+
+#include <sys/ioctl.h>
+#include <linux/fs.h>
+
 #undef dout
-#define dout(x) if (x <= g_conf.debug) cout << "blockdevice."
+#define dout(x) if (x <= g_conf.debug) cout << "dev."
+
+
+block_t BlockDevice::get_num_blocks() 
+{
+  if (!num_blocks) {
+       // stat
+       struct stat st;
+       assert(fd > 0);
+       int r = ::fstat(fd, &st);
+       assert(r == 0);
+       num_blocks = st.st_size / (block_t)EBOFS_BLOCK_SIZE;
+  }
+  return num_blocks;
+}
 
 
 int BlockDevice::io_thread_entry()
 {
-  dout(1) << "io_thread start" << endl;
+  dout(10) << "io_thread start" << endl;
   
   // elevator nonsense!
   bool dir_forward = true;
@@ -30,36 +59,61 @@ int BlockDevice::io_thread_entry()
                  multimap<block_t,biovec*>::iterator i = io_queue.lower_bound(pos);
                  if (i == io_queue.end()) break;
                  
+                 // merge contiguous ops
+                 list<biovec*> biols;
+                 char type = i->second->type;
                  pos = i->first;
-                 biovec *bio = i->second;
+                 while (pos == i->first && 
+                                type == i->second->type) {
+                       dout(20) << "io_thread dequeue io at " << pos << " " << (void*)i->second << endl;
+                       biovec *bio = i->second;
+                       biols.push_back(bio);
+                       pos += bio->length;
+
+                       multimap<block_t,biovec*>::iterator prev = i;
+                       i++;
+                       io_queue_map.erase(bio);
+                       io_queue.erase(prev);
+
+                       if (i == io_queue.end()) break;
+                 }
 
-                 dout(20) << "io_thread dequeue io at " << pos << endl;
-                 io_queue_map.erase(i->second);
-                 io_queue.erase(i);
-                 
                  lock.Unlock();
-                 do_io(bio);
+                 do_io(biols);
                  lock.Lock();
                }
          } else {
                // reverse sweep
                dout(20) << "io_thread reverse sweep" << endl;
                pos = get_num_blocks();
+
                while (1) {
                  // find i > pos
                  multimap<block_t,biovec*>::iterator i = io_queue.upper_bound(pos);
                  if (i == io_queue.begin()) break;
                  i--;  // and back down one (to get i <= pos)
-
-                 pos = i->first;
-                 biovec *bio = i->second;
-
-                 dout(20) << "io_thread dequeue io at " << pos << endl;
-                 io_queue_map.erase(i->second);
-                 io_queue.erase(i);
                  
+                 // merge continguous ops
+                 list<biovec*> biols;   
+                 char type = i->second->type;
+                 pos = i->first;
+                 while (pos == i->first && type == i->second->type) {
+                       dout(20) << "io_thread dequeue io at " << pos << " " << (void*)i->second << endl;
+                       biovec *bio = i->second;
+                       biols.push_back(bio);
+                       pos += bio->length;
+
+                       multimap<block_t,biovec*>::iterator prev = i;
+                       bool begin = (i == io_queue.begin());
+                       if (!begin) i--;
+                       io_queue_map.erase(bio);
+                       io_queue.erase(prev);
+
+                       if (begin) break;
+                 }
+                       
                  lock.Unlock();
-                 do_io(bio);
+                 do_io(biols);
                  lock.Lock();
                }
          }
@@ -74,30 +128,53 @@ int BlockDevice::io_thread_entry()
   }
   lock.Unlock();
 
-  dout(1) << "io_thread finish" << endl;
+  dout(10) << "io_thread finish" << endl;
   return 0;
 }
 
-void BlockDevice::do_io(biovec *bio)
+void BlockDevice::do_io(list<biovec*>& biols)
 {
   int r;
+  assert(!biols.empty());
+
+  // get full range, type, bl
+  bufferlist bl;
+  bl.claim(biols.front()->bl);
+  block_t start = biols.front()->start;
+  block_t length = biols.front()->length;
+  char type = biols.front()->type;
+
+  list<biovec*>::iterator p = biols.begin();
+  for (p++; p != biols.end(); p++) {
+       length += (*p)->length;
+       bl.claim_append((*p)->bl);
+  }
 
-  if (bio->type == biovec::IO_WRITE) {
-       r = _write(bio->start, bio->length, bio->bl);
-  } else if (bio->type == biovec::IO_READ) {
-       r = _read(bio->start, bio->length, bio->bl);
+  // do it
+  dout(20) << "do_io start " << (type==biovec::IO_WRITE?"write":"read") 
+                  << " " << start << "~" << length << endl;
+  if (type == biovec::IO_WRITE) {
+       r = _write(start, length, bl);
+  } else if (type == biovec::IO_READ) {
+       r = _read(start, length, bl);
   } else assert(0);
-
-  dout(20) << "do_io finish " << (void*)bio << " " << bio->start << "~" << bio->length << " " << (void*)bio->cond << " " << (void*)bio->context << endl;
-
-  if (bio->cond) {
-       bio->cond->Signal();
-       bio->rval = r;
-  }
-  else if (bio->context) {
-       bio->context->finish((int)bio);
-       delete bio->context;
-       delete bio;
+  dout(20) << "do_io finish " << (type==biovec::IO_WRITE?"write":"read") 
+                  << " " << start << "~" << length << endl;
+  
+  // finish
+  for (p = biols.begin(); p != biols.end(); p++) {
+       biovec *bio = *p;
+       if (bio->cond) {
+         //lock.Lock();
+         bio->rval = r;
+         bio->cond->Signal();
+         //lock.Unlock();
+       }
+       else if (bio->context) {
+         bio->context->finish((int)bio);
+         delete bio->context;
+         delete bio;
+       }
   }
 }
 
@@ -107,12 +184,12 @@ void BlockDevice::do_io(biovec *bio)
 void BlockDevice::_submit_io(biovec *b) 
 {
   // NOTE: lock must be held
-  dout(1) << "_submit_io " << (void*)b << endl;
+  dout(15) << "_submit_io " << (void*)b << endl;
   
   // wake up thread?
   if (io_queue.empty()) io_wakeup.Signal();
-  
-  // queue
+
+  // queue anew
   io_queue.insert(pair<block_t,biovec*>(b->start, b));
   io_queue_map[b] = b->start;
 }
@@ -121,11 +198,11 @@ int BlockDevice::_cancel_io(biovec *bio)
 {
   // NOTE: lock must be held
   if (io_queue_map.count(bio) == 0) {
-       dout(1) << "_cancel_io " << (void*)bio << " FAILED" << endl;
+       dout(15) << "_cancel_io " << (void*)bio << " FAILED" << endl;
        return -1;
   }
   
-  dout(1) << "_cancel_io " << (void*)bio << endl;
+  dout(15) << "_cancel_io " << (void*)bio << endl;
 
   block_t b = io_queue_map[bio];
   multimap<block_t,biovec*>::iterator p = io_queue.lower_bound(b);
@@ -150,23 +227,27 @@ int BlockDevice::_read(block_t bno, unsigned num, bufferlist& bl)
   off_t actual = lseek(fd, offset, SEEK_SET);
   assert(actual == offset);
   
-  off_t len = num*EBOFS_BLOCK_SIZE;
-  assert((int)bl.length() >= len);
-  
+  size_t len = num*EBOFS_BLOCK_SIZE;
+  assert(bl.length() >= len);
+
+  struct iovec iov[ bl.buffers().size() ];
+  int n = 0;
+  size_t left = len;
   for (list<bufferptr>::iterator i = bl.buffers().begin();
           i != bl.buffers().end();
           i++) {
        assert(i->length() % EBOFS_BLOCK_SIZE == 0);
        
-       int blen = i->length();
-       if (blen > len) blen = len;
-       
-       int got = ::read(fd, i->c_str(), blen);
-       assert(got <= blen);
-       
-       len -= blen;
-       if (len == 0) break;
+       iov[n].iov_base = i->c_str();
+       iov[n].iov_len = MIN(left, i->length());
+
+       left -= iov[n].iov_len;
+       n++;
+       if (left == 0) break;
   }
+
+  int got = ::readv(fd, iov, n);
+  assert(got <= (int)len);
   
   return 0;
 }
@@ -182,25 +263,33 @@ int BlockDevice::_write(unsigned bno, unsigned num, bufferlist& bl)
   assert(actual == offset);
   
   // write buffers
-  off_t len = num*EBOFS_BLOCK_SIZE;
-  
+  size_t len = num*EBOFS_BLOCK_SIZE;
+
+  struct iovec iov[ bl.buffers().size() ];
+
+  int n = 0;
+  size_t left = len;
   for (list<bufferptr>::iterator i = bl.buffers().begin();
           i != bl.buffers().end();
           i++) {
        assert(i->length() % EBOFS_BLOCK_SIZE == 0);
+
+       iov[n].iov_base = i->c_str();
+       iov[n].iov_len = MIN(left, i->length());
        
-       off_t left = i->length();
-       if (left > len) left = len;
-       int r = ::write(fd, i->c_str(), left);
-       dout(1) << "write " << fd << " " << (void*)i->c_str() << " " << left << endl;
-       if (r < 0) {
-         dout(1) << "couldn't write bno " << bno << " num " << num << " (" << left << " bytes) p=" << (void*)i->c_str() << " r=" << r << " errno " << errno << " " << strerror(errno) << endl;
-       } else {
-         assert(r == left);
-       }
-       
-       len -= left;
-       if (len == 0) break;
+       left -= iov[n].iov_len;
+       n++;
+       if (left == 0) break;
+  }
+
+  int r = ::writev(fd, iov, n);
+  
+  if (r < 0) {
+       dout(1) << "couldn't write bno " << bno << " num " << num 
+                       << " (" << num << " bytes) r=" << r 
+                       << " errno " << errno << " " << strerror(errno) << endl;
+  } else {
+       assert(r == (int)len);
   }
   
   return 0;
@@ -247,15 +336,18 @@ int BlockDevice::close()
   assert(fd>0);
 
   // shut down io thread
-  dout(1) << "close stopping io thread" << endl;
+  dout(10) << "close stopping io thread" << endl;
   lock.Lock();
   io_stop = true;
   io_wakeup.Signal();
   lock.Unlock();       
   io_thread.join();
 
-  dout(1) << "close closing" << endl;
-  return ::close(fd);
+  dout(1) << "close" << endl;
+  ::close(fd);
+  fd = 0;
+
+  return 0;
 }
 
 int BlockDevice::cancel_io(ioh_t ioh) 
@@ -268,7 +360,7 @@ int BlockDevice::cancel_io(ioh_t ioh)
   
   // FIXME?
   if (r == 0 && pbio->context) {
-       //pbio->context->finish(0);                      ******HELP!!!
+       //pbio->context->finish(0);
        delete pbio->context;
        delete pbio;
   }
index 7513069b8b8f49302478bb052773dc22d1e3db01..531799402f0a2b7701247f182b745f2cfeeedc4b 100644 (file)
@@ -9,18 +9,6 @@
 
 #include "types.h"
 
-#include <unistd.h>
-#include <stdlib.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <sys/file.h>
-#include <iostream>
-#include <cassert>
-#include <errno.h>
-
-#include <sys/ioctl.h>
-#include <linux/fs.h>
 
 typedef void *ioh_t;
 
@@ -58,7 +46,7 @@ class BlockDevice {
   
   void _submit_io(biovec *b);
   int _cancel_io(biovec *bio);
-  void do_io(biovec *b);
+  void do_io(list<biovec*>& biols);
    
 
   // io_thread
@@ -89,17 +77,7 @@ class BlockDevice {
   }
 
   // get size in blocks
-  block_t get_num_blocks() {
-       if (!num_blocks) {
-         // stat
-         struct stat st;
-         assert(fd > 0);
-         int r = ::fstat(fd, &st);
-         assert(r == 0);
-         num_blocks = st.st_size / (block_t)EBOFS_BLOCK_SIZE;
-       }
-       return num_blocks;
-  }
+  block_t get_num_blocks();
 
   int open();
   int close();
index 19d24daa2c28253f5dc5c4b9b4d41d9fe96f7d16..3f15e6a0a00380ab5e00d4a1cfc37b8295dfffdc 100644 (file)
@@ -1,17 +1,52 @@
 
 #include "BufferCache.h"
+#include "Onode.h"
 
-#undef dout
-#define dout(x)  if (x <= g_conf.debug) cout << "bc."
 
+/*********** BufferHead **************/
 
 
-/*********** BufferHead **************/
+#undef dout
+#define dout(x)  if (x <= g_conf.debug) cout << "ebofs.bh."
 
 
 
+void BufferHead::finish_partials()
+{
+  dout(10) << "finish_partials on " << *this << endl;
+
+  // submit partial writes
+  for (map<block_t, PartialWrite>::iterator p = partial_write.begin();
+          p != partial_write.end();
+          p++) {
+       dout(10) << "finish_partials submitting queued write to " << p->second.block << endl;
 
+       // copy raw buffer; this may be a past write
+       bufferlist bl;
+       bl.push_back( oc->bc->bufferpool.alloc(EBOFS_BLOCK_SIZE) );
+       bl.copy_in(0, EBOFS_BLOCK_SIZE, data);
+       apply_partial( bl, p->second.partial );
+       
+       oc->bc->dev.write( p->second.block, 1, bl,
+                                          new C_OC_PartialTxFinish( oc, p->second.epoch ));
+  }
+  partial_write.clear();
+}
 
+void BufferHead::queue_partial_write(block_t b)
+{
+  if (partial_write.count(b)) {
+       // overwrite previous partial write
+       // note that it better be same epoch if it's the same block!!
+       assert( partial_write[b].epoch == epoch_modified );
+       partial_write.erase(b);
+  } else {
+       oc->bc->inc_unflushed( epoch_modified );
+  }
+  partial_write[ b ].partial = partial;
+  partial_write[ b ].block = b;
+  partial_write[ b ].epoch = epoch_modified;
+}
 
 
 
 /************ ObjectCache **************/
 
 
+#undef dout
+#define dout(x)  if (x <= g_conf.debug) cout << "ebofs.oc."
+
+
 
 void ObjectCache::rx_finish(ioh_t ioh, block_t start, block_t length)
 {
   list<Context*> waiters;
 
-  bc->lock.Lock();
+  bc->ebofs_lock.Lock();
 
   dout(10) << "rx_finish " << start << "~" << length << endl;
   for (map<block_t, BufferHead*>::iterator p = data.lower_bound(start);
           p != data.end(); 
           p++) {
+       dout(10) << "rx_finish ?" << *p->second << endl;
+
        // past?
        if (p->first >= start+length) break;
+       if (p->second->end() > start+length) break;  // past
+       
+       assert(p->first >= start);
+       assert(p->second->end() <= start+length);
+
+       dout(10) << "rx_finish !" << *p->second << endl;
+
+       if (p->second->rx_ioh == ioh)
+         p->second->rx_ioh = 0;
+
+       if (p->second->is_partial_writes())
+         p->second->finish_partials();
        
        if (p->second->is_rx()) {
-         if (p->second->get_version() == 0) {
-               assert(p->second->end() <= start+length);
-               dout(10) << "rx_finish  rx -> clean on " << *p->second << endl;
-               bc->mark_clean(p->second);
-         }
+         assert(p->second->get_version() == 0);
+         assert(p->second->end() <= start+length);
+         dout(10) << "rx_finish  rx -> clean on " << *p->second << endl;
+         bc->mark_clean(p->second);
        }
        else if (p->second->is_partial()) {
-         dout(10) << "rx_finish  partial -> dirty on " << *p->second << endl;    
+         dout(10) << "rx_finish  partial -> clean on " << *p->second << endl;    
          p->second->apply_partial();
-         bc->mark_dirty(p->second);
+         bc->mark_clean(p->second);
        }
        else {
-         dout(10) << "rx_finish  ignoring " << *p->second << endl;
+         dout(10) << "rx_finish  ignoring status on (dirty|tx) " << *p->second << endl;
+         assert(p->second->is_dirty() || p->second->is_tx());
        }
-       
-       if (p->second->ioh == ioh) p->second->ioh = 0;
 
        // trigger waiters
        waiters.splice(waiters.begin(), p->second->waitfor_read);
@@ -59,27 +110,30 @@ void ObjectCache::rx_finish(ioh_t ioh, block_t start, block_t length)
 
   finish_contexts(waiters);
 
-  bc->lock.Unlock();
+  bc->ebofs_lock.Unlock();
 }
 
 
-void ObjectCache::tx_finish(ioh_t ioh, block_t start, block_t length, version_t version)
+void ObjectCache::tx_finish(ioh_t ioh, block_t start, block_t length, 
+                                                       version_t version, version_t epoch)
 {
   list<Context*> waiters;
-
-  bc->lock.Lock();
-
+  
+  bc->ebofs_lock.Lock();
+  
   dout(10) << "tx_finish " << start << "~" << length << " v" << version << endl;
   for (map<block_t, BufferHead*>::iterator p = data.lower_bound(start);
           p != data.end(); 
           p++) {
        dout(20) << "tx_finish ?bh " << *p->second << endl;
        assert(p->first == p->second->start());
-       //dout(10) << "tx_finish  bh " << *p->second << endl;
 
        // past?
        if (p->first >= start+length) break;
 
+       if (p->second->tx_ioh == ioh)
+         p->second->tx_ioh = 0;
+
        if (!p->second->is_tx()) {
          dout(10) << "tx_finish  bh not marked tx, skipping" << endl;
          continue;
@@ -88,35 +142,56 @@ void ObjectCache::tx_finish(ioh_t ioh, block_t start, block_t length, version_t
        assert(p->second->is_tx());
        assert(p->second->end() <= start+length);
        
-       dout(10) << "tx_finish  tx -> clean on " << *p->second << endl;
-       p->second->set_last_flushed(version);
-       bc->mark_clean(p->second);
+       if (version == p->second->version) {
+         dout(10) << "tx_finish  tx -> clean on " << *p->second << endl;
+         p->second->set_last_flushed(version);
+         bc->mark_clean(p->second);
 
-       if (p->second->ioh == ioh) {
-         p->second->ioh = 0;
-       }
-       else if (p->second->shadow_ioh == ioh) {
-         p->second->shadow_ioh = 0;
+         // trigger waiters
+         waiters.splice(waiters.begin(), p->second->waitfor_flush);
+       } else {
+         dout(10) << "tx_finish  leaving tx, " << p->second->version << " > " << version 
+                          << " on " << *p->second << endl;
        }
-
-       // trigger waiters
-       waiters.splice(waiters.begin(), p->second->waitfor_flush);
   }    
 
   finish_contexts(waiters);
 
-  bc->lock.Unlock();
+  // update unflushed counter
+  assert(bc->get_unflushed(epoch) > 0);
+  bc->dec_unflushed(epoch);
+
+  bc->ebofs_lock.Unlock();
+}
+
+void ObjectCache::partial_tx_finish(version_t epoch) 
+{
+  bc->ebofs_lock.Lock();
+
+  dout(10) << "partial_tx_finish in epoch " << epoch << endl;
+
+  // update unflushed counter
+  assert(bc->get_unflushed(epoch) > 0);
+  bc->dec_unflushed(epoch);
+
+  bc->ebofs_lock.Unlock();
 }
 
 
-int ObjectCache::map_read(block_t start, block_t len, 
+/*
+ * map a range of blocks into buffer_heads.
+ * - create missing buffer_heads as necessary.
+ *  - fragment along disk extent boundaries
+ */
+
+int ObjectCache::map_read(Onode *on,
+                                                 block_t start, block_t len, 
                                                  map<block_t, BufferHead*>& hits,
                                                  map<block_t, BufferHead*>& missing,
                                                  map<block_t, BufferHead*>& rx,
                                                  map<block_t, BufferHead*>& partial) {
   
   map<block_t, BufferHead*>::iterator p = data.lower_bound(start);
-  // p->first >= start
 
   block_t cur = start;
   block_t left = len;
@@ -132,12 +207,17 @@ int ObjectCache::map_read(block_t start, block_t len,
        // at end?
        if (p == data.end()) {
          // rest is a miss.
-         BufferHead *n = new BufferHead(this);
-         bc->add_bh(n);
-         n->set_start( cur );
-         n->set_length( left );
-         data[cur] = n;
-         missing[cur] = n;
+         vector<Extent> exv;
+         on->map_extents(cur, left, exv);          // we might consider some prefetch here.
+         for (unsigned i=0; i<exv.size(); i++) {
+               BufferHead *n = new BufferHead(this);
+               bc->add_bh(n);
+               n->set_start( cur );
+               n->set_length( exv[i].length );
+               data[cur] = n;
+               missing[cur] = n;
+               cur += exv[i].length;
+         }
          break;
        }
        
@@ -172,18 +252,19 @@ int ObjectCache::map_read(block_t start, block_t len,
        } else if (p->first > cur) {
          // gap.. miss
          block_t next = p->first;
-         BufferHead *n = new BufferHead(this);
-         bc->add_bh(n);
-         n->set_start( cur );
-         if (next - cur < left)
-               n->set_length( next - cur );
-         else
-               n->set_length( left );
-         data[cur] = n;
-         missing[cur] = n;
-         
-         cur += n->length();
-         left -= n->length();
+         vector<Extent> exv;
+         on->map_extents(cur, MIN(next-cur, left), exv);   // we might consider some prefetch here
+
+         for (unsigned i=0; i<exv.size(); i++) {
+               BufferHead *n = new BufferHead(this);
+               bc->add_bh(n);
+               n->set_start( cur );
+               n->set_length( exv[i].length );
+               data[cur] = n;
+               missing[cur] = n;
+               cur += n->length();
+               left -= n->length();
+         }
          continue;    // more?
        }
        else 
@@ -198,9 +279,16 @@ int ObjectCache::map_read(block_t start, block_t len,
  * map a range of pages on an object's buffer cache.
  *
  * - break up bufferheads that don't fall completely within the range
+ * - cancel rx ops we obsolete.
+ *   - resubmit rx ops if we split bufferheads
+ *
+ * - leave potentially obsoleted tx ops alone (for now)
+ * - don't worry about disk extent boundaries (yet)
  */
-int ObjectCache::map_write(block_t start, block_t len,
-                                                  map<block_t, BufferHead*>& hits) 
+int ObjectCache::map_write(Onode *on, 
+                                                  block_t start, block_t len,
+                                                  interval_set<block_t>& alloc,
+                                                  map<block_t, BufferHead*>& hits)
 {
   map<block_t, BufferHead*>::iterator p = data.lower_bound(start);
   // p->first >= start
@@ -216,52 +304,117 @@ int ObjectCache::map_write(block_t start, block_t len,
   }
 
   for (; left > 0; p++) {
+       // max for this bh (bc of (re)alloc on disk)
+       block_t max = left;
+       bool newalloc = false;
+
+       // based on alloc/no-alloc boundary ...
+       if (alloc.contains(cur, left)) {
+         if (alloc.contains(cur)) {
+               block_t ends = alloc.end_after(cur);
+               max = MIN(left, ends-cur);
+               newalloc = true;
+         } else {
+               if (alloc.starts_after(cur)) {
+                 block_t st = alloc.start_after(cur);
+                 max = MIN(left, st-cur);
+               } 
+         }
+       } 
+
+       // based on disk extent boundary ...
+       vector<Extent> exv;
+       on->map_extents(cur, max, exv);
+       if (exv.size() > 1) 
+         max = exv[0].length;
+
+       if (newalloc) {
+         dout(10) << "map_write " << cur << "~" << max << " is new alloc on disk" << endl;
+       } else {
+         dout(10) << "map_write " << cur << "~" << max << " keeps old alloc on disk" << endl;
+       }
+       
        // at end?
        if (p == data.end()) {
          BufferHead *n = new BufferHead(this);
          bc->add_bh(n);
          n->set_start( cur );
-         n->set_length( left );
+         n->set_length( max );
          data[cur] = n;
          hits[cur] = n;
          break;
        }
-       
+
        if (p->first <= cur) {
-         // have it (or part of it)
-         BufferHead *e = p->second;
-         
-         if (p->first == cur && p->second->length() <= left) {
-               // whole bufferhead, piece of cake.             
-         } else {
-               if (e->is_clean()) {
-                 // we'll need to cut the buffer!  :(
-                 if (p->first == cur && p->second->length() > left) {
-                       // we want left bit (one splice)
-                       bc->split(e, cur+left);
+         BufferHead *bh = p->second;
+         dout(10) << "map_write bh " << *bh << " intersected" << endl;
+
+         if (p->first < cur) {
+               if (cur+max >= p->first+p->second->length()) {
+                 // we want right bit (one splice)
+                 if (bh->is_rx() && bc->bh_cancel_read(bh)) {
+                       BufferHead *right = bc->split(bh, cur);
+                       bc->bh_read(on, bh);          // reread left bit
+                       bh = right;
+                 } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) {
+                       BufferHead *right = bc->split(bh, cur);
+                       bc->bh_write(on, bh);          // reread left bit
+                       bh = right;
+                 } else {
+                       bh = bc->split(bh, cur);   // just split it
                  }
-                 else if (p->first < cur && cur+left >= p->first+p->second->length()) {
-                       // we want right bit (one splice)
-                       e = bc->split(e, cur);
+                 p++;
+                 assert(p->second == bh);
+               } else {
+                 // we want middle bit (two splices)
+                 if (bh->is_rx() && bc->bh_cancel_read(bh)) {
+                       BufferHead *middle = bc->split(bh, cur);
+                       bc->bh_read(on, bh);                       // reread left
                        p++;
-                       assert(p->second == e);
+                       assert(p->second == middle);
+                       BufferHead *right = bc->split(middle, cur+max);
+                       bc->bh_read(on, right);                    // reread right
+                       bh = middle;
+                 } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) {
+                       BufferHead *middle = bc->split(bh, cur);
+                       bc->bh_write(on, bh);                       // redo left
+                       p++;
+                       assert(p->second == middle);
+                       BufferHead *right = bc->split(middle, cur+max);
+                       bc->bh_write(on, right);                    // redo right
+                       bh = middle;
                  } else {
-                       // we want middle bit (two splices)
-                       e = bc->split(e, cur);
+                       BufferHead *middle = bc->split(bh, cur);
                        p++;
-                       assert(p->second == e);
-                       bc->split(e, cur+left);
+                       assert(p->second == middle);
+                       bc->split(middle, cur+max);
+                       bh = middle;
+                 }
+               }
+         } else if (p->first == cur) {
+               if (p->second->length() <= max) {
+                 // whole bufferhead, piece of cake.
+               } else {
+                 // we want left bit (one splice)
+                 if (bh->is_rx() && bc->bh_cancel_read(bh)) {
+                       BufferHead *right = bc->split(bh, cur+max);
+                       bc->bh_read(on, right);                   // re-rx the right bit
+                 } else if (bh->is_tx() && !newalloc && bc->bh_cancel_write(bh)) {
+                       BufferHead *right = bc->split(bh, cur+max);
+                       bc->bh_write(on, right);                          // re-tx the right bit
+                 } else {
+                       bc->split(bh, cur+max);        // just split
                  }
-               }               
+               }
          }
          
-         // FIXME
-         hits[cur] = e;
+         // put in our map
+         hits[cur] = bh;
                  
          // keep going.
-         block_t lenfromcur = e->length();
-         if (e->start() < cur)
-               lenfromcur -= cur - e->start();
+         block_t lenfromcur = bh->length();
+         if (bh->start() < cur)
+               lenfromcur -= cur - bh->start();
 
          if (lenfromcur < left) {
                cur += lenfromcur;
@@ -273,18 +426,17 @@ int ObjectCache::map_write(block_t start, block_t len,
        } else {
          // gap!
          block_t next = p->first;
+         block_t glen = MIN(next-cur, max);
+         dout(10) << "map_write gap " << cur << "~" << glen << endl;
          BufferHead *n = new BufferHead(this);
          bc->add_bh(n);
          n->set_start( cur );
-         if (next - cur < left)
-               n->set_length( next - cur );
-         else
-               n->set_length( left );
+         n->set_length( glen );
          data[cur] = n;
          hits[cur] = n;
          
-         cur += n->length();
-         left -= n->length();
+         cur += glen;
+         left -= glen;
          continue;    // more?
        }
   }
@@ -326,6 +478,10 @@ int ObjectCache::scan_versions(block_t start, block_t len,
 
 /************** BufferCache ***************/
 
+#undef dout
+#define dout(x)  if (x <= g_conf.debug) cout << "ebofs.bc."
+
+
 
 BufferHead *BufferCache::split(BufferHead *orig, block_t after) 
 {
@@ -352,5 +508,106 @@ BufferHead *BufferCache::split(BufferHead *orig, block_t after)
 }
 
 
+void BufferCache::bh_read(Onode *on, BufferHead *bh)
+{
+  dout(5) << "bh_read " << *on << " on " << *bh << endl;
+
+  if (bh->is_missing())        {
+       mark_rx(bh);
+  } else {
+       assert(bh->is_partial());
+  }
+  
+  // get extent.  there should be only one!
+  vector<Extent> exv;
+  on->map_extents(bh->start(), bh->length(), exv);
+  assert(exv.size() == 1);
+  Extent ex = exv[0];
+  
+  // alloc new buffer
+  bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data);  // new buffers!
+  
+  // this should be empty!!
+  assert(bh->rx_ioh == 0);
+  
+  dout(20) << "bh_read  " << *bh << " from " << ex << endl;
+  
+  bh->rx_ioh = dev.read(ex.start, ex.length, bh->data,
+                                               new C_OC_RxFinish(on->oc, 
+                                                                                 bh->start(), bh->length()));
+}
+
+bool BufferCache::bh_cancel_read(BufferHead *bh)
+{
+  assert(bh->rx_ioh);
+  if (dev.cancel_io(bh->rx_ioh) >= 0) {
+       dout(10) << "bh_cancel_read on " << *bh << endl;
+       bh->rx_ioh = 0;
+       mark_missing(bh);
+       return true;
+  }
+  return false;
+}
+
+void BufferCache::bh_write(Onode *on, BufferHead *bh)
+{
+  dout(5) << "bh_write " << *on << " on " << *bh << endl;
+  assert(bh->get_version() > 0);
+
+  assert(bh->is_dirty());
+  mark_tx(bh);
+  
+  // get extents
+  vector<Extent> exv;
+  on->map_extents(bh->start(), bh->length(), exv);
+  assert(exv.size() == 1);
+  Extent ex = exv[0];
+
+  dout(20) << "bh_write  " << *bh << " to " << ex << endl;
+
+  //assert(bh->tx_ioh == 0);
+
+  bh->tx_ioh = dev.write(ex.start, ex.length, bh->data,
+                                                new C_OC_TxFinish(on->oc, 
+                                                                                  bh->start(), bh->length(),
+                                                                                  bh->get_version(),
+                                                                                  bh->epoch_modified));
+
+  epoch_unflushed[ bh->epoch_modified ]++;
+}
+
+
+bool BufferCache::bh_cancel_write(BufferHead *bh)
+{
+  assert(bh->tx_ioh);
+  if (dev.cancel_io(bh->tx_ioh) >= 0) {
+       dout(10) << "bh_cancel_write on " << *bh << endl;
+       bh->tx_ioh = 0;
+       mark_missing(bh);
+       epoch_unflushed[ bh->epoch_modified ]--;   // assert.. this should be the same epoch!
+       return true;
+  }
+  return false;
+}
+
+
 
+void BufferCache::bh_queue_partial_write(Onode *on, BufferHead *bh)
+{
+  dout(5) << "bh_queue_partial_write " << *on << " on " << *bh << endl;
+  assert(bh->get_version() > 0);
+
+  assert(bh->is_partial());
+  assert(bh->length() == 1);
+  
+  // get the block
+  vector<Extent> exv;
+  on->map_extents(bh->start(), bh->length(), exv);
+  assert(exv.size() == 1);
+  block_t b = exv[0].start;
+  assert(exv[0].length == 1);
+
+  // copy map state, queue for this block
+  bh->queue_partial_write( b );
+}
 
index 2d13648fb75f342484ece0a78ceea74a31c7b370..0ebefbb3603f17f29ad927b5f13bd8f0bff7f851 100644 (file)
 #include "AlignedBufferPool.h"
 #include "BlockDevice.h"
 
-#define BH_STATE_DIRTY  1
+#include "include/interval_set.h"
 
 class ObjectCache;
 class BufferCache;
+class Onode;
 
 class BufferHead : public LRUObject {
  public:
+  /*
+   * - buffer_heads should always break across disk extent boundaries
+   * - partial buffer_heads are always 1 block.
+   */
   const static int STATE_MISSING = 0; //     missing; data is on disk, but not loaded.
   const static int STATE_CLEAN = 1;   // Rw  clean
   const static int STATE_DIRTY = 2;   // RW  dirty
   const static int STATE_TX = 3;      // Rw  flushing to disk
   const static int STATE_RX = 4;      //  w  reading from disk
-  const static int STATE_PARTIAL = 5; // reading from disk, + partial content map.
+  const static int STATE_PARTIAL = 5; // reading from disk, + partial content map.  always 1 block.
+
+  class PartialWrite {
+  public:
+       map<off_t, bufferlist> partial;   // partial dirty content overlayed onto incoming data
+       block_t                block;
+       version_t              epoch;
+  };
 
  public:
   ObjectCache *oc;
 
-  bufferlist data, shadow_data;
-  ioh_t      ioh, shadow_ioh;   // any pending read/write op
-  version_t  tx_epoch;       // epoch this write is in
+  bufferlist data;
+
+  ioh_t     rx_ioh;         // 
+  ioh_t     tx_ioh;         // 
 
   list<Context*> waitfor_read;
   list<Context*> waitfor_flush;
 
  private:
-  map<off_t, bufferlist> partial;   // partial dirty content overlayed onto incoming data
+  map<off_t, bufferlist>     partial;   // partial dirty content overlayed onto incoming data
+
+  map<block_t, PartialWrite> partial_write;  // queued writes w/ partial content
 
   int        ref;
   int        state;
+
+ public:
+  version_t  epoch_modified;
+  
   version_t  version;        // current version in cache
   version_t  last_flushed;   // last version flushed to disk
  
@@ -48,8 +67,9 @@ class BufferHead : public LRUObject {
 
  public:
   BufferHead(ObjectCache *o) :
-       oc(o), ioh(0), shadow_ioh(0), tx_epoch(0),
-       ref(0), state(STATE_MISSING), version(0), last_flushed(0)
+       oc(o), //cancellable_ioh(0), tx_epoch(0),
+       rx_ioh(0), tx_ioh(0), 
+       ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0)
        {}
   
   ObjectCache *get_oc() { return oc; }
@@ -97,17 +117,10 @@ class BufferHead : public LRUObject {
   bool is_rx() { return state == STATE_RX; }
   bool is_partial() { return state == STATE_PARTIAL; }
   
-  /*
-  void substr(block_t start, block_t len, bufferlist& sub) {
-       // determine offset in bufferlist
-       block_t start = object_loc.start - off;  
-       block_t len = num - start;
-       if (start+len > object_loc.length)
-         len = object_loc.length - start;
-       
-       sub.substr_of(data, start*EBOFS_BLOCK_SIZE, len*EBOFS_BLOCK_SIZE);
-  }
-  */
+  bool is_partial_writes() { return !partial_write.empty(); }
+  void finish_partials();
+  void queue_partial_write(block_t b);
+
 
   void copy_partial_substr(off_t start, off_t end, bufferlist& bl) {
        map<off_t, bufferlist>::iterator i = partial.begin();
@@ -194,15 +207,18 @@ class BufferHead : public LRUObject {
        */
   }
   void apply_partial() {
+       apply_partial(data, partial);
+  }
+  void apply_partial(bufferlist& bl, map<off_t, bufferlist>& pm) {
        const off_t bhstart = start() * EBOFS_BLOCK_SIZE;
        //assert(partial_is_complete());
-       for (map<off_t, bufferlist>::iterator i = partial.begin();
-                i != partial.end();
+       for (map<off_t, bufferlist>::iterator i = pm.begin();
+                i != pm.end();
                 i++) {
          int pos = i->first - bhstart;
-         data.copy_in(pos, i->second.length(), i->second);
+         bl.copy_in(pos, i->second.length(), i->second);
        }
-       partial.clear();
+       pm.clear();
   }
   void add_partial(off_t off, bufferlist& p) {
        // trim overlap
@@ -269,10 +285,11 @@ inline ostream& operator<<(ostream& out, BufferHead& bh)
 
 
 class ObjectCache {
- private:
+ public:
   object_t object_id;
   BufferCache *bc;
 
+ private:
   map<block_t, BufferHead*>  data;
 
  public:
@@ -289,13 +306,16 @@ class ObjectCache {
   }
   bool is_empty() { return data.empty(); }
 
-  int map_read(block_t start, block_t len, 
+  int map_read(Onode *on,
+                          block_t start, block_t len, 
                           map<block_t, BufferHead*>& hits,     // hits
                           map<block_t, BufferHead*>& missing,  // read these from disk
                           map<block_t, BufferHead*>& rx,       // wait for these to finish reading from disk
                           map<block_t, BufferHead*>& partial); // (maybe) wait for these to read from disk
   
-  int map_write(block_t start, block_t len,
+  int map_write(Onode *on,
+                               block_t start, block_t len,
+                               interval_set<block_t>& alloc,
                                map<block_t, BufferHead*>& hits);   // can write to these.
 
   BufferHead *split(BufferHead *bh, block_t off);
@@ -304,8 +324,8 @@ class ObjectCache {
                                        version_t& low, version_t& high);
 
   void rx_finish(ioh_t ioh, block_t start, block_t length);
-  void tx_finish(ioh_t ioh, block_t start, block_t length, version_t v);
-
+  void tx_finish(ioh_t ioh, block_t start, block_t length, version_t v, version_t epoch);
+  void partial_tx_finish(version_t epoch);
 };
 
 class C_OC_RxFinish : public Context {
@@ -325,20 +345,36 @@ class C_OC_TxFinish : public Context {
   ObjectCache *oc;
   block_t start, length;
   version_t version;
+  version_t epoch;
 public:
-  C_OC_TxFinish(ObjectCache *o, block_t s, block_t l, version_t v) :
-       oc(o), start(s), length(l), version(v) {}
+  C_OC_TxFinish(ObjectCache *o, block_t s, block_t l, version_t v, version_t e) :
+       oc(o), start(s), length(l), version(v), epoch(e) {}
   void finish(int r) {
        ioh_t ioh = (ioh_t)r;
-       if (ioh) 
-         oc->tx_finish(ioh, start, length, version);
+       if (ioh) {
+         oc->tx_finish(ioh, start, length, version, epoch);
+       }
+  }  
+};
+
+class C_OC_PartialTxFinish : public Context {
+  ObjectCache *oc;
+  version_t epoch;
+public:
+  C_OC_PartialTxFinish(ObjectCache *o, version_t e) :
+       oc(o), epoch(e) {}
+  void finish(int r) {
+       ioh_t ioh = (ioh_t)r;
+       if (ioh) {
+         oc->partial_tx_finish(epoch);
+       }
   }  
 };
 
 
 class BufferCache {
  public:
-  Mutex             &lock;          // hack: ref to global lock
+  Mutex             &ebofs_lock;          // hack: this is a ref to global ebofs_lock
   BlockDevice       &dev;
   AlignedBufferPool &bufferpool;
 
@@ -357,9 +393,11 @@ class BufferCache {
   off_t stat_partial;
   off_t stat_missing;
 
+  map<version_t, int> epoch_unflushed;
+
  public:
-  BufferCache(BlockDevice& d, AlignedBufferPool& bp, Mutex& glock) : 
-       lock(glock), dev(d), bufferpool(bp),
+  BufferCache(BlockDevice& d, AlignedBufferPool& bp, Mutex& el) : 
+       ebofs_lock(el), dev(d), bufferpool(bp),
        stat_waiter(0),
        stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_partial(0), stat_missing(0)
        {}
@@ -416,9 +454,22 @@ class BufferCache {
   off_t get_stat_clean() { return stat_clean; }
   off_t get_stat_partial() { return stat_partial; }
 
+  int get_unflushed(version_t epoch) {
+       return epoch_unflushed[epoch];
+  }
+  void inc_unflushed(version_t epoch) {
+       epoch_unflushed[epoch]++;
+  }
+  void dec_unflushed(version_t epoch) {
+       epoch_unflushed[epoch]--;
+       if (stat_waiter && 
+               epoch_unflushed[epoch] == 0) 
+         stat_cond.Signal();
+  }
+
   void waitfor_stat() {
        stat_waiter++;
-       stat_cond.Wait(lock);
+       stat_cond.Wait(ebofs_lock);
        stat_waiter--;
   }
 
@@ -447,6 +498,7 @@ class BufferCache {
        set_state(bh2, bh1->get_state());
   }
   
+  void mark_missing(BufferHead *bh) { set_state(bh, BufferHead::STATE_MISSING); };
   void mark_clean(BufferHead *bh) { set_state(bh, BufferHead::STATE_CLEAN); };
   void mark_rx(BufferHead *bh) { set_state(bh, BufferHead::STATE_RX); };
   void mark_partial(BufferHead *bh) { set_state(bh, BufferHead::STATE_PARTIAL); };
@@ -457,6 +509,17 @@ class BufferCache {
   };
 
 
+  // io
+  void bh_read(Onode *on, BufferHead *bh);
+  void bh_write(Onode *on, BufferHead *bh);
+  void bh_queue_partial_write(Onode *on, BufferHead *bh);
+
+  bool bh_cancel_read(BufferHead *bh);
+  bool bh_cancel_write(BufferHead *bh);
+
+  friend class C_E_FlushPartial;
+
+
   BufferHead *split(BufferHead *orig, block_t after);
 
 
index 77b4471d09ad0a89b75b1c29d0683f0d69187e5f..89295c672624dbdcfb8008e01893efd77e94056f 100644 (file)
@@ -11,6 +11,8 @@ int Ebofs::mount()
   // note: this will fail in mount -> unmount -> mount type situations, bc
   //       prior state isn't fully cleaned up.
 
+  dout(1) << "mount" << endl;
+
   ebofs_lock.Lock();
   assert(!mounted);
 
@@ -22,8 +24,8 @@ int Ebofs::mount()
 
   struct ebofs_super *sb1 = (struct ebofs_super*)bp1.c_str();
   struct ebofs_super *sb2 = (struct ebofs_super*)bp2.c_str();
-  dout(2) << "mount super @0 epoch " << sb1->epoch << endl;
-  dout(2) << "mount super @1 epoch " << sb2->epoch << endl;
+  dout(3) << "mount super @0 epoch " << sb1->epoch << endl;
+  dout(3) << "mount super @1 epoch " << sb2->epoch << endl;
 
   // pick newest super
   struct ebofs_super *sb = 0;
@@ -32,17 +34,17 @@ int Ebofs::mount()
   else
        sb = sb2;
   super_epoch = sb->epoch;
-  dout(2) << "mount epoch " << super_epoch << endl;
+  dout(3) << "mount epoch " << super_epoch << endl;
   assert(super_epoch == sb->epoch);
 
   // init node pools
-  dout(2) << "mount nodepool" << endl;
+  dout(3) << "mount nodepool" << endl;
   nodepool.init( &sb->nodepool );
   nodepool.read_usemap( dev, super_epoch );
   nodepool.read_clean_nodes( dev );
   
   // open tables
-  dout(2) << "mount opening tables" << endl;
+  dout(3) << "mount opening tables" << endl;
   object_tab = new Table<object_t, Extent>( nodepool, sb->object_tab );
   for (int i=0; i<EBOFS_NUM_FREE_BUCKETS; i++)
        free_tab[i] = new Table<block_t, block_t>( nodepool, sb->free_tab[i] );
@@ -51,10 +53,10 @@ int Ebofs::mount()
   oc_tab = new Table<idpair_t, bool>( nodepool, sb->oc_tab );
   co_tab = new Table<idpair_t, bool>( nodepool, sb->co_tab );
   
-  dout(2) << "mount starting commit thread" << endl;
+  dout(3) << "mount starting commit thread" << endl;
   commit_thread.create();
   
-  dout(2) << "mount mounted" << endl;
+  dout(1) << "mount mounted" << endl;
   mounted = true;
 
   ebofs_lock.Unlock();
@@ -124,7 +126,7 @@ int Ebofs::mkfs()
   write_super(1, superbp1);
   
   // free memory
-  dout(1) << "mkfs: cleaning up" << endl;
+  dout(3) << "mkfs: cleaning up" << endl;
   close_tables();
 
   dout(1) << "mkfs: done" << endl;
@@ -149,6 +151,7 @@ int Ebofs::umount()
   ebofs_lock.Lock();
   
   // mark unmounting
+  dout(1) << "umount start" << endl;
   readonly = true;
   unmounting = true;
   
@@ -156,12 +159,16 @@ int Ebofs::umount()
   commit_cond.Signal();
 
   // wait 
+  dout(2) << "umount stopping commit thread" << endl;
   ebofs_lock.Unlock();
   commit_thread.join();
+  ebofs_lock.Lock();
 
   // free memory
+  dout(2) << "umount cleaning up" << endl;
   close_tables();
 
+  dout(1) << "umount done" << endl;
   ebofs_lock.Unlock();
   return 0;
 }
@@ -172,7 +179,7 @@ void Ebofs::prepare_super(version_t epoch, bufferptr& bp)
 {
   struct ebofs_super sb;
   
-  dout(1) << "prepare_super v" << epoch << endl;
+  dout(10) << "prepare_super v" << epoch << endl;
 
   // fill in super
   memset(&sb, 0, sizeof(sb));
@@ -223,16 +230,18 @@ void Ebofs::write_super(version_t epoch, bufferptr& bp)
 {
   block_t bno = epoch & 1;
   
-  dout(1) << "write_super v" << epoch << " to b" << bno << endl;
+  dout(10) << "write_super v" << epoch << " to b" << bno << endl;
 
   dev.write(bno, 1, bp);
 }
 
 int Ebofs::commit_thread_entry()
-{
-  dout(10) << "commit_thread start" << endl;
-  
+{  
   ebofs_lock.Lock();
+  dout(10) << "commit_thread start" << endl;
+
+  commit_thread_started = true;
+  sync_cond.Signal();
 
   while (mounted) {
        
@@ -272,13 +281,14 @@ int Ebofs::commit_thread_entry()
        ebofs_lock.Unlock();
        write_super(super_epoch, superbp);      
        ebofs_lock.Lock();
-       
+
+       sync_cond.Signal();
+
        dout(10) << "commit_thread commit finish" << endl;
   }
   
-  ebofs_lock.Unlock();
-
   dout(10) << "commit_thread finish" << endl;
+  ebofs_lock.Unlock();
   return 0;
 }
 
@@ -386,7 +396,7 @@ void Ebofs::write_onode(Onode *on, Context *c)
   
   // attr
   unsigned off = sizeof(eo);
-  for (map<string, AttrVal >::iterator i = on->attr.begin();
+  for (map<string, AttrVal>::iterator i = on->attr.begin();
           i != on->attr.end();
           i++) {
        bl.copy_in(off, i->first.length()+1, i->first.c_str());
@@ -618,7 +628,9 @@ void Ebofs::commit_inodes_start()
        inodes_flushing++;
        write_onode(on, new C_E_InodeFlush(this));
        on->mark_clean();
+       on->uncommitted.clear();   // commit allocated blocks
   }
+  dirty_onodes.clear();
 
   // cnodes
   for (set<Cnode*>::iterator i = dirty_cnodes.begin();
@@ -629,6 +641,7 @@ void Ebofs::commit_inodes_start()
        write_cnode(cn, new C_E_InodeFlush(this));
        cn->mark_clean();
   }
+  dirty_cnodes.clear();
 
   dout(10) << "commit_inodes_start writing " << inodes_flushing << " onodes+cnodes" << endl;
 }
@@ -637,7 +650,7 @@ void Ebofs::commit_inodes_wait()
 {
   // caller must hold ebofs_lock
   while (inodes_flushing > 0) {
-       dout(10) << "commit_inodes_wait for " << inodes_flushing << " onodes+cnodes to flush" << endl;
+       dout(10) << "commit_inodes_wait waiting for " << inodes_flushing << " onodes+cnodes to flush" << endl;
        inode_commit_cond.Wait(ebofs_lock);
   }
   dout(10) << "commit_inodes_wait all flushed" << endl;
@@ -651,32 +664,19 @@ void Ebofs::commit_inodes_wait()
 
 // *** buffer cache ***
 
-// ... should already hold lock ...
 void Ebofs::trim_buffer_cache()
 {
-  //ebofs_lock.Lock();
-
-  // flush any dirty items?
-  while (bc.lru_dirty.lru_get_size() > bc.lru_dirty.lru_get_max()) {
-       BufferHead *bh = (BufferHead*) bc.lru_dirty.lru_expire();
-       if (!bh) break;
-
-       bc.lru_dirty.lru_insert_bot(bh);
+  ebofs_lock.Lock();
+  dout(10) << "trim_buffer_cache start: " 
+                  << bc.lru_rest.lru_get_size() << " rest + " 
+                  << bc.lru_dirty.lru_get_size() << " dirty " << endl;
 
-       dout(10) << "trim_buffer_cache dirty " << *bh << endl;
-       assert(bh->is_dirty());
-       
-       Onode *on = get_onode( bh->oc->get_object_id() );
-       bh_write(on, bh);
-       put_onode(on);
-  }
-  
-  // trim bufferheads
+  // trim trimmable bufferheads
   while (bc.lru_rest.lru_get_size() > bc.lru_rest.lru_get_max()) {
        BufferHead *bh = (BufferHead*) bc.lru_rest.lru_expire();
        if (!bh) break;
-
-       dout(10) << "trim_buffer_cache rest " << *bh << endl;
+       
+       dout(10) << "trim_buffer_cache trimming " << *bh << endl;
        assert(bh->is_clean());
        
        ObjectCache *oc = bh->oc;
@@ -690,161 +690,72 @@ void Ebofs::trim_buffer_cache()
          put_onode(on);
        }
   }
-  dout(10) << "trim_buffer_cache " 
+  dout(10) << "trim_buffer_cache finish: 
                   << bc.lru_rest.lru_get_size() << " rest + " 
                   << bc.lru_dirty.lru_get_size() << " dirty " << endl;
-
-  //ebofs_lock.Unlock();
+  
+  ebofs_lock.Unlock();
 }
 
 
-
-void Ebofs::commit_bc_wait(version_t epoch)
-{
-  dout(1) << "commit_bc_wait" << endl;  
-}
-
-void Ebofs::flush_all()
+void Ebofs::sync()
 {
-  // FIXME what about partial heads?
+  ebofs_lock.Lock();
+  dout(3) << "sync in " << super_epoch << endl;
   
-  dout(1) << "flush_all" << endl;
-
-  bc.lock.Lock();
-
-  while (bc.get_stat_dirty() > 0 ||  // not strictly necessary
-                bc.get_stat_tx() > 0 ||
-                bc.get_stat_partial() > 0 ||
-                bc.get_stat_rx() > 0) {
-
-       // write all dirty bufferheads
-       while (!bc.dirty_bh.empty()) {
-         set<BufferHead*>::iterator i = bc.dirty_bh.begin();
-         BufferHead *bh = *i;
-         if (bh->ioh) continue;
-         Onode *on = get_onode(bh->oc->get_object_id());
-         bh_write(on, bh);
-         put_onode(on);
-       }
-
-       // wait for all tx and partial buffers to flush
-       dout(1) << "flush_all waiting for " 
-                       << bc.get_stat_dirty() << " dirty, " 
-                       << bc.get_stat_tx() << " tx, " 
-                       << bc.get_stat_rx() << " rx, " 
-                       << bc.get_stat_partial() << " partial" 
-                       << endl;
-       bc.waitfor_stat();
+  if (!commit_thread_started) {
+       dout(10) << "sync waiting for commit thread to start" << endl;
+       sync_cond.Wait(ebofs_lock);
   }
-  bc.lock.Unlock();
-  dout(1) << "flush_all done" << endl;
-}
-
-
-
 
-// ? is this the best way ?
-class C_E_FlushPartial : public Context {
-  Ebofs *ebofs;
-  Onode *on;
-  BufferHead *bh;
-public:
-  C_E_FlushPartial(Ebofs *e, Onode *o, BufferHead *b) : ebofs(e), on(o), bh(b) {}
-  void finish(int r) {
-       if (r == 0) ebofs->bh_write(on, bh);
-  }
-};
-
-
-void Ebofs::bh_read(Onode *on, BufferHead *bh)
-{
-  dout(5) << "bh_read " << *on << " on " << *bh << endl;
-
-  if (bh->is_missing())        {
-       bc.mark_rx(bh);
-  } else {
-       assert(bh->is_partial());
+  if (mid_commit) {
+       dout(10) << "sync waiting for commit in progress" << endl;
+       sync_cond.Wait(ebofs_lock);
   }
   
-  // get extents
-  vector<Extent> ex;
-  on->map_extents(bh->start(), bh->length(), ex);
-
-  // alloc new buffer
-  bc.bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data);  // new buffers!
+  commit_cond.Signal();  // trigger a commit
   
-  // lay out on disk
-  block_t bhoff = 0;
-  for (unsigned i=0; i<ex.size(); i++) {
-       dout(10) << "bh_read  " << bhoff << ": " << ex[i] << endl;
-       bufferlist sub;
-       sub.substr_of(bh->data, bhoff*EBOFS_BLOCK_SIZE, ex[i].length*EBOFS_BLOCK_SIZE);
-
-       //if (bh->is_partial())  
-       //bh->waitfor_read.push_back(new C_E_FlushPartial(this, on, bh));
+  sync_cond.Wait(ebofs_lock);  // wait
 
-       assert(bh->ioh == 0);
-       bh->ioh = dev.read(ex[i].start, ex[i].length, sub,
-                                          new C_OC_RxFinish(on->oc, 
-                                                                                bhoff + bh->start(), ex[i].length));
-       
-       bhoff += ex[i].length;
-  }
+  dout(3) << "sync finish in " << super_epoch << endl;
+  ebofs_lock.Unlock();
 }
 
-void Ebofs::bh_write(Onode *on, BufferHead *bh)
-{
-  dout(5) << "bh_write " << *on << " on " << *bh << endl;
-  assert(bh->get_version() > 0);
 
-  assert(bh->is_dirty());
-  bc.mark_tx(bh);
-  bh->tx_epoch = super_epoch;   // note the epoch!
+void Ebofs::commit_bc_wait(version_t epoch)
+{
+  dout(10) << "commit_bc_wait on epoch " << epoch << endl;  
+  
+  while (bc.get_unflushed(epoch) > 0) {
+       dout(10) << "commit_bc_wait " << bc.get_unflushed(epoch) << " unflushed in epoch " << epoch << endl;
+       bc.waitfor_stat();
+  }
 
-  // get extents
-  vector<Extent> ex;
-  on->map_extents(bh->start(), bh->length(), ex);
+  dout(10) << "commit_bc_wait all flushed for epoch " << epoch << endl;  
+}
 
-  // lay out on disk
-  block_t bhoff = 0;
-  for (unsigned i=0; i<ex.size(); i++) {
-       dout(10) << "bh_write  bh off " << bhoff << ": " << ex[i] << endl;
 
-       bufferlist sub;
-       sub.substr_of(bh->data, bhoff*EBOFS_BLOCK_SIZE, ex[i].length*EBOFS_BLOCK_SIZE);
 
-       assert(bh->ioh == 0);
-       bh->ioh = dev.write(ex[i].start, ex[i].length, sub,
-                                               new C_OC_TxFinish(on->oc, 
-                                                                                 bhoff + bh->start(), ex[i].length, 
-                                                                                 bh->get_version()));
 
-       bhoff += ex[i].length;
-  }
-}
 
 
 /*
  * allocate a write to blocks on disk.
- * take care to not overwrite any "safe" data blocks.
- * break up bufferheads in bh_hits that span realloc boundaries.
- * final bufferhead set stored in final!
+ * - take care to not overwrite any "safe" data blocks.
+ *  - allocate/map new extents on disk as necessary
  */
 void Ebofs::alloc_write(Onode *on, 
-                                               block_t start, block_t len, 
-                                               map<block_t, BufferHead*>& hits)
+                                               block_t start, block_t len,
+                                               interval_set<block_t>& alloc)
 {
   // first decide what pages to (re)allocate
-  interval_set<block_t> alloc;
   on->map_alloc_regions(start, len, alloc);
 
-  dout(10) << "alloc_write need to alloc " << alloc << endl;
+  dout(10) << "alloc_write need to (re)alloc " << alloc << endl;
 
   // merge alloc into onode uncommitted map
-  cout << "union of " << on->uncommitted << " and " << alloc << endl;
   on->uncommitted.union_of(alloc);
-
-  dout(10) << "alloc_write onode uncommitted now " << on->uncommitted << endl;
+  dout(10) << "alloc_write onode.uncommitted is now " << on->uncommitted << endl;
 
   // allocate the space
   for (map<block_t,block_t>::iterator i = alloc.m.begin();
@@ -868,58 +779,11 @@ void Ebofs::alloc_write(Onode *on,
          cur += ex.length;
        }
   }
-  
-  // now break up the bh's as necessary
-  block_t cur = start;
-  block_t left = len;
-
-  map<block_t,BufferHead*>::iterator bhp = hits.begin();
-  map<block_t,block_t>::iterator ap = alloc.m.begin();
-  
-  block_t aoff = 0;
-  while (left > 0) {
-       assert(cur == bhp->first);
-       BufferHead *bh = bhp->second;
-       assert(cur == bh->start());
-       assert(left >= bh->length());
-       
-       assert(ap->first+aoff == bh->start());
-       if (ap->second-aoff == bh->length()) {
-         // perfect.
-         cur += bh->length();
-         left -= bh->length();
-         ap++;
-         aoff = 0;
-         bhp++;
-         continue;
-       }
-
-       if (bh->length() < ap->second-aoff) {
-         // bh is within alloc range.
-         cur += bh->length();
-         left -= bh->length();
-         aoff += bh->length();
-         bhp++;
-         continue;
-       }
-       
-       // bh spans alloc boundary, split it!
-       assert(bh->length() > ap->second - aoff);
-       BufferHead *n = bc.split(bh, bh->start() + ap->second-aoff);
-       hits[n->start()] = n;   // add new guy to hit map
-
-       // bh is now shortened...
-       cur += bh->length();
-       left -= bh->length();
-       assert(ap->second == aoff + bh->length());
-       aoff = 0;
-       ap++;
-       continue;
-  }
 }
 
 
 
+
 void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
 {
   ObjectCache *oc = on->get_oc(&bc);
@@ -949,12 +813,13 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
   block_t blast = (len+off-1) / EBOFS_BLOCK_SIZE;
   block_t blen = blast-bstart+1;
 
+  // allocate write on disk.
+  interval_set<block_t> alloc;
+  alloc_write(on, bstart, blen, alloc);
+
   // map b range onto buffer_heads
   map<block_t, BufferHead*> hits;
-  oc->map_write(bstart, blen, hits);
-  
-  // allocate write on disk.  break buffer_heads across realloc/no realloc boundaries
-  alloc_write(on, bstart, blen, hits);
+  oc->map_write(on, bstart, blen, alloc, hits);
   
   // get current versions
   version_t lowv, highv;
@@ -970,28 +835,35 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
           i++) {
        BufferHead *bh = i->second;
        bh->set_version(highv+1);
+       bh->epoch_modified = super_epoch;
        
-       // cancel old io?
-       if (bh->is_tx()) {
-         if (bh->tx_epoch == super_epoch) {
-               // try to cancel the old io (just bc it's a waste)
-               dout(10) << "apply_write canceling old io on " << *bh << endl;
-               bc.dev.cancel_io(bh->ioh);
-               bh->ioh = 0;
-         } else {
-               // this tx is from prior epoch!  shadow+copy the buffer before we modify it.
-               bh->shadow_data.claim(bh->data);
-               bc.bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data);  // new buffers!
-               bh->data.copy_in(0, bh->length()*EBOFS_BLOCK_SIZE, bh->shadow_data);
-               bh->shadow_ioh = bh->ioh;
-               bh->ioh = 0;
-         }
+       // old write in progress?
+       if (bh->is_tx()) {        // copy the buffer to avoid munging up in-flight write
+         dout(10) << "apply_write tx pending, copying buffer on " << *bh << endl;
+         bufferlist temp;
+         temp.claim(bh->data);
+         bc.bufferpool.alloc(EBOFS_BLOCK_SIZE*bh->length(), bh->data); 
+         bh->data.copy_in(0, bh->length()*EBOFS_BLOCK_SIZE, temp);
+       }
+
+       // need to split off partial?
+       if (bh->is_missing() && bh->length() > 1 &&
+               (bh->start() == bstart && off % EBOFS_BLOCK_SIZE != 0)) {
+         BufferHead *right = bc.split(bh, bh->start()+1);
+         hits[right->start()] = right;
+         dout(10) << "apply_write split off left block for partial write; rest is " << *right << endl;
+       }
+       if (bh->is_missing() && bh->length() > 1 &&
+               (bh->last() == blast && len+off % EBOFS_BLOCK_SIZE != 0) &&
+               (len+off < on->object_size)) {
+         BufferHead *right = bc.split(bh, bh->last());
+         hits[right->start()] = right;
+         dout(10) << "apply_write split off right block for upcoming partial write; rest is " << *right << endl;
        }
 
        // partial at head or tail?
        if ((bh->start() == bstart && off % EBOFS_BLOCK_SIZE != 0) ||
-               (bh->last() == blast && (len+off) % EBOFS_BLOCK_SIZE != 0) ||
-               (len % EBOFS_BLOCK_SIZE != 0)) {
+               (bh->last() == blast && (len+off) % EBOFS_BLOCK_SIZE != 0)) {
          // locate ourselves in bh
          unsigned off_in_bh = opos - bh->start()*EBOFS_BLOCK_SIZE;
          assert(off_in_bh >= 0);
@@ -1029,27 +901,27 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
                  bh->data.zero();
                  bh->apply_partial();
                  bc.mark_dirty(bh);
-                 if (bh->ioh) {
-                       bc.dev.cancel_io( bh->ioh );
-                       bh->ioh = 0;
-                 }
-                 bh_write(on, bh);
+                 bc.bh_write(on, bh);
                } 
                else if (bh->is_rx()) {
                  dout(10) << "apply_write  rx -> partial " << *bh << endl;
+                 assert(bh->length() == 1);
                  bc.mark_partial(bh);
+                 bc.bh_queue_partial_write(on, bh);              // queue the eventual write
                }
                else if (bh->is_missing()) {
                  dout(10) << "apply_write  missing -> partial " << *bh << endl;
-                 bh_read(on, bh);      
+                 assert(bh->length() == 1);
                  bc.mark_partial(bh);
+                 bc.bh_read(on, bh);   
+                 bc.bh_queue_partial_write(on, bh);              // queue the eventual write
                }
                else if (bh->is_partial()) {
-                 if (bh->ioh == 0) {
-                       dout(10) << "apply_write  submitting rx for partial " << *bh << endl;
-                       bh_read(on, bh);        
-                 }
+                 dout(10) << "apply_write  already partial, no need to submit rx on " << *bh << endl;
+                 bc.bh_queue_partial_write(on, bh);              // queue the eventual write
                }
+
+
          } else {
                assert(bh->is_clean() || bh->is_dirty() || bh->is_tx());
                
@@ -1077,7 +949,7 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
                if (!bh->is_dirty())
                  bc.mark_dirty(bh);
 
-               bh_write(on, bh);
+               bc.bh_write(on, bh);
          }
          continue;
        }
@@ -1118,7 +990,7 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
        if (!bh->is_dirty())
          bc.mark_dirty(bh);
 
-       bh_write(on, bh);
+       bc.bh_write(on, bh);
   }
 
   assert(zleft == 0);
@@ -1155,7 +1027,7 @@ bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond
   map<block_t, BufferHead*> missing;  // read these
   map<block_t, BufferHead*> rx;       // wait for these
   map<block_t, BufferHead*> partials;  // ??
-  oc->map_read(bstart, blen, hits, missing, rx, partials);
+  oc->map_read(on, bstart, blen, hits, missing, rx, partials);
 
   // missing buffers?
   if (!missing.empty()) {
@@ -1163,7 +1035,7 @@ bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond
                 i != missing.end();
                 i++) {
          dout(15) <<"attempt_read missing buffer " << *(i->second) << endl;
-         bh_read(on, i->second);
+         bc.bh_read(on, i->second);
        }
        BufferHead *wait_on = missing.begin()->second;
        wait_on->waitfor_read.push_back(new C_E_Cond(will_wait_on));
index 4830a8ad8c975a08e44bdfc1a236ed531c88dffa..5888594544b96e2424d2199247313fc4c0ad586d 100644 (file)
@@ -38,11 +38,12 @@ class Ebofs : public ObjectStore {
   bool         mounted, unmounting;
   bool         readonly;
   version_t    super_epoch;
+  bool         commit_thread_started, mid_commit;
+  Cond         commit_cond;   // to wake up the commit thread
+  Cond         sync_cond;
 
   void prepare_super(version_t epoch, bufferptr& bp);
   void write_super(version_t epoch, bufferptr& bp);
-
-  Cond         commit_cond;   // to wake up the commit thread
   int commit_thread_entry();
 
   class CommitThread : public Thread {
@@ -120,26 +121,22 @@ class Ebofs : public ObjectStore {
   BufferCache bc;
   pthread_t flushd_thread_id;
 
+  version_t trigger_commit();
   void commit_bc_wait(version_t epoch);
 
  public:
+  void sync();
   void trim_buffer_cache();
-  void flush_all();
+
  protected:
 
   //void zero(Onode *on, size_t len, off_t off, off_t write_thru);
   void alloc_write(Onode *on, 
                                   block_t start, block_t len, 
-                                  map<block_t, BufferHead*>& hits);
+                                  interval_set<block_t>& alloc);
   void apply_write(Onode *on, size_t len, off_t off, bufferlist& bl);
   bool attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond *will_wait_on);
 
-  // io
-  void bh_read(Onode *on, BufferHead *bh);
-  void bh_write(Onode *on, BufferHead *bh);
-
-  friend class C_E_FlushPartial;
-
   int flushd_thread();
   static int flusd_thread_entry(void *p) {
        return ((Ebofs*)p)->flushd_thread();
@@ -148,10 +145,12 @@ class Ebofs : public ObjectStore {
  public:
   Ebofs(BlockDevice& d) : 
        dev(d), 
-       mounted(false), unmounting(false), readonly(false), super_epoch(0),
+       mounted(false), unmounting(false), readonly(false), 
+       super_epoch(0), commit_thread_started(false), mid_commit(false),
        commit_thread(this),
        free_blocks(0), allocator(this),
        bufferpool(EBOFS_BLOCK_SIZE),
+       nodepool(ebofs_lock),
        object_tab(0), collection_tab(0), oc_tab(0), co_tab(0),
        inodes_flushing(0),
        bc(dev, bufferpool, ebofs_lock) {
index 3f747bd7d0163c3de94edb15f880431886c02212..d32fc94017600e3dc4764befd6a9d464296bc2a7 100644 (file)
@@ -19,8 +19,6 @@
  *
  */
 
-
-
 class Onode : public LRUObject {
 private:
   int ref;
@@ -60,12 +58,12 @@ public:
   void get() {
        if (ref == 0) lru_pin();
        ref++;
-       cout << "onode.get " << ref << endl;
+       //cout << "ebofs.onode.get " << ref << endl;
   }
   void put() {
        ref--;
        if (ref == 0) lru_unpin();
-       cout << "onode.put " << ref << endl;
+       //cout << "ebofs.onode.put " << ref << endl;
   }
 
   void mark_dirty() {
@@ -290,7 +288,6 @@ public:
        return sizeof(Extent) * extents.size();
   }
 
-
 };
 
 
index d0812841e41657854328b2a2d7b301e560072617..eb6541d0c18d31dbdc0907e1eb23f2bd823c34dc 100644 (file)
@@ -6,13 +6,11 @@
 
 /** table **/
 
-class _Table {
-  int asdfasdfasdf;
-};
+#define dbtout dout(20)
 
 
 template<class K, class V>
-class Table : public _Table {
+class Table {
  private:
   NodePool &pool;
   
index 733e202f2ca9d7351958075facbc9b730c506014..3bc6b23beecbef22273ad7a9f357bca09c0c5f67 100644 (file)
@@ -6,9 +6,15 @@
 int main(int argc, char **argv)
 {
   // args
-  char *filename = 0;
-  if (argc > 1) filename = argv[1];
-  if (!filename) return -1;
+  vector<char*> args;
+  argv_to_vec(argc, argv, args);
+  parse_config_options(args);
+
+  if (args.size() < 1) {
+       cerr << "usage: mkfs.ebofs [options] <device file>" << endl;
+       return -1;
+  }
+  char *filename = args[0];
 
   // device
   BlockDevice dev(filename);
@@ -21,89 +27,94 @@ int main(int argc, char **argv)
   Ebofs mfs(dev);
   mfs.mkfs();
 
-  // test-o-rama!
-  Ebofs fs(dev);
-  fs.mount();
-
-  if (0) {  // test
-       bufferlist bl;
-       char crap[10000];
-       memset(crap, 0, 10000);
-       bl.append(crap, 10000);
-       fs.write(10, bl.length(), 200, bl, (Context*)0);
-       fs.trim_buffer_cache();
-       fs.write(10, bl.length(), 3222, bl, (Context*)0);
-       fs.trim_buffer_cache();
-       fs.write(10, 5000, 3222, bl, (Context*)0);
-  }
-
-  // test small writes
   if (1) {
-       char crap[10000];
-       memset(crap, 0, 10000);
-       bufferlist bl;
-       bl.append(crap, 10000);
-
-       // write
-       srand(0);
-       for (int i=0; i<100; i++) {
-         off_t off = rand() % 1000000;
-         size_t len = 100;
-         cout << "writing bit at " << off << " len " << len << endl;
-         fs.write(10, len, off, bl, (Context*)0);
-       }
+       // test-o-rama!
+       Ebofs fs(dev);
+       fs.mount();
        
-       if (0) {
-       // read
-       srand(0);
-       for (int i=0; i<100; i++) {
+       if (1) {  // test
          bufferlist bl;
-         off_t off = rand() % 1000000;
-         size_t len = 100;
-         cout << "read bit at " << off << " len " << len << endl;
-         int r = fs.read(10, len, off, bl);
-         assert(bl.length() == len);
-         assert(r == 0);
-       }
+         char crap[10000];
+         memset(crap, 0, 10000);
+         bl.append(crap, 10000);
+         fs.write(10, bl.length(), 200, bl, (Context*)0);
+         sleep(1);
+         fs.trim_buffer_cache();
+         fs.write(10, bl.length(), 5222, bl, (Context*)0);
+         fs.trim_buffer_cache();
+         //fs.write(10, 5000, 3222, bl, (Context*)0);
        }
-
-       // flush
-       fs.flush_all();
-       fs.trim_buffer_cache();
-
+       
+       // test small writes
        if (0) {
-       // read again
-       srand(0);
-       for (int i=0; i<100; i++) {
+         char crap[10000];
+         memset(crap, 0, 10000);
          bufferlist bl;
-         off_t off = rand() % 1000000;
-         size_t len = 100;
-         cout << "read bit at " << off << " len " << len << endl;
-         int r = fs.read(10, len, off, bl);
-         assert(bl.length() == len);
-         assert(r == 0);
+         bl.append(crap, 10000);
+         
+         // write
+         srand(0);
+         for (int i=0; i<100; i++) {
+               off_t off = rand() % 1000000;
+               size_t len = 100;
+               cout << "writing bit at " << off << " len " << len << endl;
+               fs.write(10, len, off, bl, (Context*)0);
+         }
+         
+         if (0) {
+               // read
+               srand(0);
+               for (int i=0; i<100; i++) {
+                 bufferlist bl;
+                 off_t off = rand() % 1000000;
+                 size_t len = 100;
+                 cout << "read bit at " << off << " len " << len << endl;
+                 int r = fs.read(10, len, off, bl);
+                 assert(bl.length() == len);
+                 assert(r == 0);
+               }
+         }
+         
+         // flush
+         fs.sync();
+         fs.trim_buffer_cache();
+         //fs.trim_buffer_cache();
+         
+         if (0) {
+               // read again
+               srand(0);
+               for (int i=0; i<100; i++) {
+                 bufferlist bl;
+                 off_t off = rand() % 1000000;
+                 size_t len = 100;
+                 cout << "read bit at " << off << " len " << len << endl;
+                 int r = fs.read(10, len, off, bl);
+                 assert(bl.length() == len);
+                 assert(r == 0);
+               }
+               
+               // flush
+               fs.sync();
+               fs.trim_buffer_cache();
+         }
+         
+         // write on empty cache
+         srand(0);
+         for (int i=0; i<100; i++) {
+               off_t off = rand() % 1000000;
+               size_t len = 100;
+               cout << "writing bit at " << off << " len " << len << endl;
+               fs.write(10, len, off, bl, (Context*)0);
+         }
+         
        }
-
-       // flush
+       
+       fs.sync();
        fs.trim_buffer_cache();
-       }
-
-       // write on empty cache
-       srand(0);
-       for (int i=0; i<100; i++) {
-         off_t off = rand() % 1000000;
-         size_t len = 100;
-         cout << "writing bit at " << off << " len " << len << endl;
-         fs.write(10, len, off, bl, (Context*)0);
-       }
-
+       fs.trim_onode_cache();
+       
+       fs.umount();
   }
-
-  fs.flush_all();
-  fs.trim_buffer_cache();
-  fs.trim_onode_cache();
-
-  fs.umount();
   dev.close();
 }
 
index d9c54e5fb415ca5157d2fdf5a0639c4a9f4cc4c2..6064f9c7e33a2ca4446be33aaa52b1f4a2014d87 100644 (file)
@@ -126,7 +126,7 @@ class NodePool {
   set<nodeid_t> clean;       // aka used
   set<nodeid_t> limbo;
   
-  Mutex         lock;
+  Mutex        &ebofs_lock;
   Cond          commit_cond;
   int           flushing;
 
@@ -142,7 +142,9 @@ class NodePool {
 
 
  public:
-  NodePool() : bufferpool(EBOFS_NODE_BYTES), 
+  NodePool(Mutex &el) : 
+       bufferpool(EBOFS_NODE_BYTES), 
+       ebofs_lock(el),
        flushing(0) {}
   ~NodePool() {
        // nodes
@@ -227,13 +229,13 @@ class NodePool {
          to read.  so it only really works when called from mount()!
        */
        for (unsigned r=0; r<region_loc.size(); r++) {
-         dout(3) << "read region " << r << " at " << region_loc[r] << endl;
+         dout(3) << "ebofs.nodepool.read region " << r << " at " << region_loc[r] << endl;
          
          for (block_t boff = 0; boff < region_loc[r].length; boff++) {
                nodeid_t nid = make_nodeid(r, boff);
                
                if (!clean.count(nid)) continue;  
-               dout(20) << "read  node " << nid << endl;
+               dout(20) << "ebofs.nodepool.read  node " << nid << endl;
 
                bufferptr bp = bufferpool.alloc(EBOFS_NODE_BYTES);
                dev.read(region_loc[r].start + (block_t)boff, EBOFS_NODE_BLOCKS, 
@@ -241,6 +243,7 @@ class NodePool {
                
                Node *n = new Node(nid, bp, Node::STATE_CLEAN);
                node_map[nid] = n;
+               dout(10) << "ebofs.nodepool.read  node " << n << " at " << (void*)n << endl;
          }
        }
        return 0;
@@ -262,11 +265,11 @@ class NodePool {
   };
   
   void flushed_usemap() {
-       lock.Lock();
+       ebofs_lock.Lock();
        flushing--;
        if (flushing == 0) 
          commit_cond.Signal();
-       lock.Unlock();
+       ebofs_lock.Unlock();
   }
 
  public:
@@ -336,26 +339,29 @@ class NodePool {
   };
 
   void flushed_node(nodeid_t nid) {
-       lock.Lock();
+       ebofs_lock.Lock();
        assert(tx.count(nid));
        
+       // mark nid clean|limbo
        if (tx.count(nid)) {
          tx.erase(nid);
          clean.insert(nid);
+
+         // make node itself clean
+         node_map[nid]->set_state(Node::STATE_CLEAN);
        }
        else {
          assert(limbo.count(nid));
        }
-       
+
        flushing--;
        if (flushing == 0) 
          commit_cond.Signal();
-       lock.Unlock();
+       ebofs_lock.Unlock();
   }
 
  public:
   void commit_start(BlockDevice& dev, version_t version) {
-       lock.Lock();
        assert(!is_committing());
 
        // write map
@@ -392,16 +398,12 @@ class NodePool {
          free.insert(*i);
        }
        limbo.clear();
-
-       lock.Unlock();
   }
 
   void commit_wait() {
-       lock.Lock();
        while (is_committing()) {
-         commit_cond.Wait(lock);
+         commit_cond.Wait(ebofs_lock);
        }
-       lock.Unlock();
   }
 
 
@@ -443,7 +445,7 @@ class NodePool {
   // new node
   Node* new_node(int type) {
        nodeid_t nid = alloc_id();
-       dbtout << "pool.new_node " << nid << endl;
+       dout(15) << "ebofs.nodepool.new_node " << nid << endl;
        
        // alloc node
        bufferptr bp = bufferpool.alloc(EBOFS_NODE_BYTES);
@@ -458,7 +460,7 @@ class NodePool {
 
   void release(Node *n) {
        const nodeid_t nid = n->get_id();
-       dbtout << "pool.release on " << nid << endl;
+       dout(15) << "ebofs.nodepool.release on " << nid << endl;
        node_map.erase(nid);
 
        if (n->is_dirty()) {
@@ -474,15 +476,11 @@ class NodePool {
   }
 
   void release_all() {
-       set<Node*> left;
-       for (map<nodeid_t,Node*>::iterator i = node_map.begin();
-                i != node_map.end();
-                i++) 
-         left.insert(i->second);
-       for (set<Node*>::iterator i = left.begin();
-                i != left.end();
-                i++) 
-         release( *i );
+       while (!node_map.empty()) {
+         map<nodeid_t,Node*>::iterator i = node_map.begin();
+         dout(2) << "ebofs.nodepool.release_all leftover " << i->first << " " << i->second << endl;
+         release( i->second );
+       }
        assert(node_map.empty());
   }
 
@@ -490,7 +488,7 @@ class NodePool {
        // get new node id?
        nodeid_t oldid = n->get_id();
        nodeid_t newid = alloc_id();
-       dbtout << "pool.dirty_node on " << oldid << " now " << newid << endl;
+       dout(2) << "ebofs.nodepool.dirty_node on " << oldid << " now " << newid << endl;
        
        // release old block
        if (n->is_clean()) {
index f5c34a873bc2ffb15471ca24959b37fe04c16cac..b0637bfcd134f63b7593a930e9f448c2eb42fba4 100644 (file)
@@ -11,8 +11,6 @@
 using namespace std;
 using namespace __gnu_cxx;
 
-#define dbtout cout
-
 
 #define MIN(a,b)  ((a)<=(b) ? (a):(b))
 #define MAX(a,b)  ((a)>=(b) ? (a):(b))
index 21fe454fad1f7dd4398fbdee6ca792f9ec0fbeb9..0214630cfda85af0556991dd656453fcbc2da1b6 100644 (file)
@@ -44,7 +44,7 @@ class interval_set {
 
   bool contains(T i) {
        typename map<T,T>::iterator p = find_inc(i);
-       if (p == end()) return false;
+       if (p == m.end()) return false;
        if (p->first > i) return false;
        if (p->first+p->second <= i) return false;
        assert(p->first <= i && p->first+p->second > i);
@@ -52,13 +52,49 @@ class interval_set {
   }
   bool contains(T start, T len) {
        typename map<T,T>::iterator p = find_inc(start);
-       if (p == end()) return false;
+       if (p == m.end()) return false;
        if (p->first > start) return false;
        if (p->first+p->second <= start) return false;
        assert(p->first <= start && p->first+p->second > start);
        if (p->first+p->second < start+len) return false;
        return true;
   }
+
+  // outer range of set
+  bool empty() {
+       return m.empty();
+  }
+  T start() {
+       assert(!empty());
+       typename map<T,T>::iterator p = m.begin();
+       return p->first;
+  }
+  T end() {
+       assert(!empty());
+       typename map<T,T>::iterator p = m.end();
+       p--;
+       return p->first+p->second;
+  }
+
+  // interval start after p (where p not in set)
+  bool starts_after(T i) {
+       assert(!contains(i));
+       typename map<T,T>::iterator p = find_inc(i);
+       if (p == m.end()) return false;
+       return true;
+  }
+  T start_after(T i) {
+       assert(!contains(i));
+       typename map<T,T>::iterator p = find_inc(i);
+       return p->first;
+  }
+
+  // interval end that contains start
+  T end_after(T start) {
+       assert(contains(start));
+       typename map<T,T>::iterator p = find_inc(start);
+       return p->first+p->second;
+  }
   
   void insert(T start, T len) {
        typename map<T,T>::iterator p = find_adj(start);