]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ebofs buffercache changes to fix behavior of clones.
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 1 Dec 2006 21:44:02 +0000 (21:44 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 1 Dec 2006 21:44:02 +0000 (21:44 +0000)
conflicting reads still need to be tested/resolved.

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@972 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/TODO
ceph/ebofs/BufferCache.cc
ceph/ebofs/BufferCache.h
ceph/ebofs/Ebofs.cc
ceph/ebofs/Onode.h
ceph/ebofs/test.ebofs.cc
ceph/ebofs/types.h

index d81114ecd7fc5015cf2d31c4312466ed37f7d7f1..faeba31409b0dcda67e88de9e69a9d22afc1e479 100644 (file)
--- a/ceph/TODO
+++ b/ceph/TODO
@@ -1,13 +1,10 @@
 
 == todo
 
-papers to read
-- gribble et al 2000, scalable distributed hash table
-- sagiv blink trees
-- johnson and colbrook's DE and DB-trees (maybe fewer locks?)
-
+ebofs
+- verify proper behavior of conflicting/overlapping reads of clones
 
-rados todo
+rados paper todo
 - better experiments
 - flush log only in response to subsequent read or write?
 - better behaving recovery
@@ -16,11 +13,20 @@ rados todo
 - snapshots
 
 rados snapshots
-- write is tagged with current_rev.
-  - if latest object.rev != current_rev, we clone it before applying write.
-- reads are tagged with readv.  
-  - we read from newest object whose snapv >= readv.
-
+- attr.crev is rev we were created in.
+- oid.rev=0 is "live".  defined for attr.crev <= rev.
+- otherwise, defined for attr.crev <= rev < oid.rev  (i.e. oid.rev is upper bound, non-inclusive.)
+
+- write is tagged with op.rev
+  - if latest op.rev != attr.crev,
+    - we clone to oid.rev=rev
+    - clone keeps old crev.
+    - change live attr.crev=rev.
+- delete is tagged with op.rev
+  - rename live to oid.rev = op.rev
+- read is tagged with op.rev
+  - if 0, we read from 0 (if it exists).
+  - otherwise we choose object rev based on op.rev vs oid.rev, and then verifying attr.crev <= op.rev.
 
 
 
index 00ad9e0cd3457110f226618a6d9f427030991eb2..cee7f2c12ce79789d82e1da4fe9265d163c55264 100644 (file)
@@ -282,7 +282,10 @@ int ObjectCache::map_read(block_t start, block_t len,
         partial[cur] = e;
         dout(20) << "map_read partial " << *e << endl;
       }
-      else assert(0);
+      else {
+       dout(0) << "map_read ??? " << *e << endl;
+       assert(0);
+      }
       
       block_t lenfromcur = MIN(e->end() - cur, left);
       cur += lenfromcur;
@@ -565,8 +568,17 @@ void ObjectCache::truncate(block_t blocks, version_t super_epoch)
         bc->bh_cancel_read(bh);
       if (bh->is_tx() && uncom) 
         bc->bh_cancel_write(bh, super_epoch);
-      if (bh->is_partial() && uncom)
-        bc->bh_cancel_partial_write(bh);
+      if (bh->shadow_of) {
+       dout(10) << "truncate " << *bh << " unshadowing " << *bh->shadow_of << endl;
+       // shadow
+       bh->shadow_of->remove_shadow(bh);
+       if (bh->is_partial()) 
+         bc->cancel_shadow_partial(bh->rx_from.start, bh);
+      } else {
+       // normal
+       if (bh->is_partial() && uncom)
+         bc->bh_cancel_partial_write(bh);
+      }
     }
     
     for (map<block_t,list<Context*> >::iterator p = bh->waitfor_read.begin();
@@ -581,6 +593,43 @@ void ObjectCache::truncate(block_t blocks, version_t super_epoch)
 }
 
 
+void ObjectCache::clone_to(Onode *other)
+{
+  ObjectCache *ton = 0;
+
+  for (map<block_t, BufferHead*>::iterator p = data.begin();
+       p != data.end();
+       p++) {
+    BufferHead *bh = p->second;
+    dout(10) << "clone_to ? " << *bh << endl;
+    if (bh->is_dirty() || bh->is_tx() || bh->is_partial()) {
+      // dup dirty or tx bh's
+      if (!ton)
+       ton = other->get_oc(bc);
+      BufferHead *nbh = new BufferHead(ton);
+      nbh->set_start( bh->start() );
+      nbh->set_length( bh->length() );
+      nbh->data = bh->data;      // just copy refs to underlying buffers. 
+      bc->add_bh(nbh);
+
+      if (bh->is_partial()) {
+       dout(0) << "clone_to PARTIAL FIXME NOT FULLY IMPLEMENTED ******" << endl;
+       nbh->partial = bh->partial;
+       bc->mark_partial(nbh);
+       // register as shadow_partial
+       bc->add_shadow_partial(bh->rx_from.start, nbh);
+      } else {
+       // clean buffer will shadow
+       bh->add_shadow(nbh);
+       bc->mark_clean(nbh);
+      }
+
+      dout(10) << "clone_to dup " << *bh << " -> " << *nbh << endl;
+    } 
+  }
+}
+
+
 
 /************** BufferCache ***************/
 
@@ -603,7 +652,7 @@ BufferHead *BufferCache::split(BufferHead *orig, block_t after)
   block_t newleftlen = after - orig->start();
   right->set_start( after );
   right->set_length( orig->length() - newleftlen );
-
+  
   // shorten left
   stat_sub(orig);
   orig->set_length( newleftlen );
@@ -620,6 +669,12 @@ BufferHead *BufferCache::split(BufferHead *orig, block_t after)
     right->rx_from.start += newleftlen;
   }
 
+  // dup shadows
+  for (set<BufferHead*>::iterator p = orig->shadows.begin();
+       p != orig->shadows.end();
+       ++p)
+    right->add_shadow(*p);
+
   // split buffers too
   bufferlist bl;
   bl.claim(orig->data);
@@ -841,7 +896,53 @@ void BufferCache::rx_finish(ObjectCache *oc,
     }
   }
 
-  // 
+  // shadow partials?
+  {
+    list<Context*> waiters;
+    map<block_t, set<BufferHead*> >::iterator sp = shadow_partials.lower_bound(diskstart);
+    while (sp != shadow_partials.end()) {
+      if (sp->first >= diskstart+length) break;
+      assert(sp->first >= diskstart);
+      
+      block_t pblock = sp->first;
+      set<BufferHead*> ls;
+      ls.swap( sp->second );
+      
+      map<block_t, set<BufferHead*> >::iterator t = sp;
+      sp++;
+      shadow_partials.erase(t);
+      
+      for (set<BufferHead*>::iterator p = ls.begin();
+          p != ls.end();
+          ++p) {
+       BufferHead *bh = *p;
+       dout(10) << "rx_finish applying shadow_partial for " << pblock
+                << " to " << *bh << endl;
+       bufferptr bp = buffer::create_page_aligned(EBOFS_BLOCK_SIZE);
+       bh->data.clear();
+       bh->data.push_back( bp );
+       bh->data.copy_in((pblock-diskstart)*EBOFS_BLOCK_SIZE, 
+                        (pblock-diskstart+1)*EBOFS_BLOCK_SIZE, 
+                        bl);
+       bh->apply_partial();
+       bh->set_state(BufferHead::STATE_CLEAN);
+       
+       // trigger waiters
+       for (map<block_t,list<Context*> >::iterator p = bh->waitfor_read.begin();
+            p != bh->waitfor_read.end();
+            p++) {
+         assert(p->first >= bh->start() && p->first < bh->end());
+         waiters.splice(waiters.begin(), p->second);
+       }
+       bh->waitfor_read.clear();
+      }  
+    }
+
+    // kick waiters
+    finish_contexts(waiters);
+  }
+
+  // done.
   ebofs_lock.Unlock();
 }
 
@@ -931,4 +1032,14 @@ void BufferCache::cancel_partial(block_t from, block_t to, version_t epoch)
 }
 
 
+void BufferCache::add_shadow_partial(block_t from, BufferHead *bh)
+{
+  dout(10) << "add_shadow_partial from " << from << " " << *bh << endl;
+  shadow_partials[from].insert(bh);
+}
 
+void BufferCache::cancel_shadow_partial(block_t from, BufferHead *bh)
+{
+  dout(10) << "cancel_shadow_partial from " << from << " " << *bh << endl;
+  shadow_partials[from].erase(bh);
+}
index cdb692af4be340d312bfbca2d969e7143fb1b760..922c5e531ee566837b94f0f158f4f6e063fa688c 100644 (file)
@@ -57,6 +57,9 @@ class BufferHead : public LRUObject {
   map<off_t, bufferlist>     partial;   // partial dirty content overlayed onto incoming data
 
   map< block_t, list<Context*> > waitfor_read;
+  
+  set<BufferHead*>  shadows;     // shadow bh's that clone()ed me.
+  BufferHead*       shadow_of;
 
  private:
   int        ref;
@@ -76,8 +79,12 @@ class BufferHead : public LRUObject {
   BufferHead(ObjectCache *o) :
     oc(o), //cancellable_ioh(0), tx_epoch(0),
     rx_ioh(0), tx_ioh(0), tx_block(0), partial_tx_to(0), partial_tx_epoch(0),
+    shadow_of(0),
     ref(0), state(STATE_MISSING), epoch_modified(0), version(0), last_flushed(0)
     {}
+  ~BufferHead() {
+    unpin_shadows();
+  }
   
   ObjectCache *get_oc() { return oc; }
 
@@ -115,6 +122,11 @@ class BufferHead : public LRUObject {
   void set_state(int s) {
     if (s == STATE_PARTIAL || s == STATE_RX || s == STATE_TX) get();
     if (state == STATE_PARTIAL || state == STATE_RX || state == STATE_TX) put();
+
+    if ((state == STATE_TX && s != STATE_TX) ||
+       (state == STATE_PARTIAL && s != STATE_PARTIAL)) 
+      unpin_shadows();
+
     state = s;
   }
   int get_state() { return state; }
@@ -131,6 +143,26 @@ class BufferHead : public LRUObject {
   //void cancel_partials();
   //void queue_partial_write(block_t b);
 
+  void add_shadow(BufferHead *dup) {
+    shadows.insert(dup);
+    dup->shadow_of = this;
+    dup->get();
+  }
+  void remove_shadow(BufferHead *dup) {
+    shadows.erase(dup);
+    dup->shadow_of = 0;
+    dup->put();
+  }
+  void unpin_shadows() {
+    for (set<BufferHead*>::iterator p = shadows.begin();
+        p != shadows.end();
+        ++p) {
+      //cout << "unpin shadow " << *p << endl;
+      (*p)->shadow_of = 0;
+      (*p)->put();
+    }
+    shadows.clear();
+  }
 
   void copy_partial_substr(off_t start, off_t end, bufferlist& bl) {
     map<off_t, bufferlist>::iterator i = partial.begin();
@@ -386,6 +418,8 @@ class ObjectCache {
   void truncate(block_t blocks, version_t super_epoch);
   //  void tear_down();
 
+  void clone_to(Onode *other);
+
   void dump() {
     for (map<block_t,BufferHead*>::iterator i = data.begin();
          i != data.end();
@@ -441,6 +475,7 @@ class BufferCache {
   };
 
   map<block_t, map<block_t, PartialWrite> > partial_write;  // queued writes w/ partial content
+  map<block_t, set<BufferHead*> >           shadow_partials;
 
  public:
   BufferCache(BlockDevice& d, Mutex& el) : 
@@ -589,6 +624,9 @@ class BufferCache {
   void queue_partial(block_t from, block_t to, map<off_t, bufferlist>& partial, version_t epoch);
   void cancel_partial(block_t from, block_t to, version_t epoch);
 
+  void add_shadow_partial(block_t from, BufferHead *bh);
+  void cancel_shadow_partial(block_t from, BufferHead *bh);
+
   void rx_finish(ObjectCache *oc, ioh_t ioh, block_t start, block_t len, block_t diskstart, bufferlist& bl);
   void tx_finish(ObjectCache *oc, ioh_t ioh, block_t start, block_t len, version_t v, version_t e);
   void partial_tx_finish(version_t epoch);
index eab79e7756ff98d2e92a0c2e5c7157b803f48ded..be67e2d9181fbb8909e02a022d31e1511f82ace2 100644 (file)
@@ -619,6 +619,7 @@ Onode* Ebofs::get_onode(object_t oid)
            << eo->num_extents << " extents" << endl;
       assert(eo->object_id == oid);
     }
+    on->readonly = eo->readonly;
     on->onode_loc = eo->onode_loc;
     on->object_size = eo->object_size;
     on->object_blocks = eo->object_blocks;
@@ -684,6 +685,7 @@ void Ebofs::encode_onode(Onode *on, bufferlist& bl, unsigned& off)
 {
   // onode
   struct ebofs_onode eo;
+  eo.readonly = on->readonly;
   eo.onode_loc = on->onode_loc;
   eo.object_id = on->object_id;
   eo.object_size = on->object_size;
@@ -2156,6 +2158,10 @@ int Ebofs::_write(object_t oid, off_t offset, size_t length, bufferlist& bl)
   // get|create inode
   Onode *on = get_onode(oid);
   if (!on) on = new_onode(oid);    // new inode!
+  if (on->readonly) {
+    put_onode(on);
+    return -EACCES;
+  }
 
   dirty_onode(on);  // dirty onode!
   
@@ -2264,7 +2270,11 @@ int Ebofs::_truncate(object_t oid, off_t size)
   Onode *on = get_onode(oid);
   if (!on) 
     return -ENOENT;
-  
+  if (on->readonly) {
+    put_onode(on);
+    return -EACCES;
+  }
+
   int r = 0;
   if (size > on->object_size) {
     r = -EINVAL;  // whatever
@@ -2285,8 +2295,11 @@ int Ebofs::_truncate(object_t oid, off_t size)
     }
 
     // truncate buffer cache
-    if (on->oc)
+    if (on->oc) {
       on->oc->truncate(on->object_blocks, super_epoch);
+      if (on->oc->is_empty())
+       on->close_oc();
+    }
 
     // update uncommitted
     interval_set<block_t> uncom;
@@ -2346,17 +2359,21 @@ int Ebofs::clone(object_t from, object_t to, Context *onsafe)
 
 int Ebofs::_clone(object_t from, object_t to)
 {
+  dout(7) << "_clone " << from << " -> " << to << endl;
+
   Onode *fon = get_onode(from);
   if (!fon) return -ENOENT;
   Onode *ton = get_onode(to);
   if (ton) {
     put_onode(fon);
+    put_onode(ton);
     return -EEXIST;
   }
   ton = new_onode(to); 
   assert(ton);
   
   // copy easy bits
+  ton->readonly = true;
   ton->object_size = fon->object_size;
   ton->object_blocks = fon->object_blocks;
   ton->attr = fon->attr;
@@ -2375,6 +2392,16 @@ int Ebofs::_clone(object_t from, object_t to)
     allocator.alloc_inc(p->second);
   }
 
+  // clear uncommitted
+  fon->uncommitted.clear();
+
+  // muck with ObjectCache
+  if (fon->oc) 
+    fon->oc->clone_to( ton );
+  
+  // ok!
+  put_onode(ton);
+  put_onode(fon);
   return 0;
 }
 
@@ -2465,6 +2492,10 @@ int Ebofs::_setattr(object_t oid, const char *name, const void *value, size_t si
 
   Onode *on = get_onode(oid);
   if (!on) return -ENOENT;
+  if (on->readonly) {
+    put_onode(on);
+    return -EACCES;
+  }
 
   string n(name);
   on->attr[n] = buffer::copy((char*)value, size);
@@ -2498,6 +2529,10 @@ int Ebofs::_setattrs(object_t oid, map<string,bufferptr>& attrset)
 
   Onode *on = get_onode(oid);
   if (!on) return -ENOENT;
+  if (on->readonly) {
+    put_onode(on);
+    return -EACCES;
+  }
 
   on->attr = attrset;
   dirty_onode(on);
@@ -2577,6 +2612,10 @@ int Ebofs::_rmattr(object_t oid, const char *name)
 
   Onode *on = get_onode(oid);
   if (!on) return -ENOENT;
+  if (on->readonly) {
+    put_onode(on);
+    return -EACCES;
+  }
 
   string n(name);
   on->attr.erase(n);
index a551e99b0554107827e7445b963b3df93d7ab345..f9cd3b7346a2e6ca26d54a12f72102ecb75f14c7 100644 (file)
@@ -42,6 +42,7 @@ public:
   version_t version;      // incremented on each modify.
 
   // data
+  bool     readonly;
   Extent   onode_loc;
   off_t    object_size;
   unsigned object_blocks;
@@ -64,8 +65,9 @@ public:
 
  public:
   Onode(object_t oid) : ref(0), object_id(oid), version(0),
-    object_size(0), object_blocks(0), oc(0),
-    dirty(false), dangling(false), deleted(false) { 
+                       readonly(false),
+                       object_size(0), object_blocks(0), oc(0),
+                       dirty(false), dangling(false), deleted(false) { 
     onode_loc.length = 0;
   }
   ~Onode() {
@@ -79,12 +81,12 @@ public:
   void get() {
     if (ref == 0) lru_pin();
     ref++;
-    //cout << "ebofs.onode.get " << hex << object_id << dec << " " << ref << endl;
+    cout << "ebofs.onode.get " << hex << object_id << dec << " " << ref << endl;
   }
   void put() {
     ref--;
     if (ref == 0) lru_unpin();
-    //cout << "ebofs.onode.put " << hex << object_id << dec << " " << ref << endl;
+    cout << "ebofs.onode.put " << hex << object_id << dec << " " << ref << endl;
   }
 
   void mark_dirty() {
index c6d2dfbd5d75101e3ce3a5ebac6325427f97704a..0e6a7625c502a7fe5661a2711f605a5361fd266b 100644 (file)
@@ -32,7 +32,7 @@ public:
 
     while (!stop) {
       object_t oid;
-      oid.ino = (rand() % 2) + 0x10000000;
+      oid.ino = (rand() % 10) + 0x10000000;
       coll_t cid = rand() % 50;
       off_t off = rand() % 10000;//0;//rand() % 1000000;
       off_t len = 1+rand() % 100000;
@@ -40,9 +40,10 @@ public:
       if (rand() % 2) a = "two";
       int l = 3;//rand() % 10;
 
-      switch (rand() % 9) {
+      switch (rand() % 10) {
       case 0:
         {
+         oid.rev = rand() % 10;
           cout << t << " read " << hex << oid << dec << " at " << off << " len " << len << endl;
           bufferlist bl;
           fs.read(oid, off, len, bl);
@@ -115,6 +116,14 @@ public:
           fs.truncate(oid, 0);
         }
         break;
+
+      case 9:
+       {
+         object_t newoid = oid;
+         newoid.rev = rand() % 10;
+         cout << t << " clone " << oid << " to " << newoid << endl;
+         fs.clone(oid, newoid, 0);
+       }
       }
 
 
@@ -141,6 +150,47 @@ int main(int argc, char **argv)
   Ebofs fs(filename);
   if (fs.mount() < 0) return -1;
 
+
+  // explicit tests
+  if (1) {
+    // verify that clone() plays nice with partial writes
+    object_t oid(1,1);
+    bufferptr bp(10000);
+    bp.zero();
+    bufferlist bl;
+    bl.push_back(bp);
+    fs.write(oid, 0, 10000, bl, 0);
+
+    fs.sync();
+    fs.trim_buffer_cache();
+
+    // induce a partial write
+    bufferlist bl2;
+    bl2.substr_of(bl, 0, 100);
+    fs.write(oid, 100, 100, bl2, 0);
+
+    // clone it
+    object_t oid2;
+    oid2 = oid;
+    oid2.rev = 1;
+    fs.clone(oid, oid2, 0);
+
+    // ... 
+    if (0) {
+      // make sure partial still behaves after orig is removed...
+      fs.remove(oid, 0);
+
+      // or i read for oid2...
+      bufferlist rbl;
+      fs.read(oid2, 0, 200, rbl);
+    }
+    if (1) {
+      // make sure things behave if we remove the clone
+      fs.remove(oid2,0);
+    }
+  }
+  // /explicit tests
+
   list<Tester*> ls;
   for (int i=0; i<threads; i++) {
     Tester *t = new Tester(fs);
index 0e9c20edfa1c7c4c314e96234f1f5cca34cb2d1a..1b85d138ec342ce69b11137feaa8188b05bfbafd 100644 (file)
@@ -109,6 +109,7 @@ struct ebofs_onode {
   object_t   object_id;       /* for kicks */
   off_t      object_size;     /* file size in bytes.  should this be 64-bit? */
   unsigned   object_blocks;
+  bool       readonly;
   
   int        num_collections;
   int        num_attr;        // num attr in onode