git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
new objectstore transactions. way slicker!
author sage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 18 May 2006 18:31:00 +0000 (18:31 +0000)
committer sage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 18 May 2006 18:31:00 +0000 (18:31 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@770 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/ebofs/Ebofs.cc
ceph/ebofs/Ebofs.h
ceph/osd/Fake.h
ceph/osd/OSD.cc
ceph/osd/ObjectStore.h

index 8206c07a3c71d1ad19fc209f4463b939519a32e5..23f97d3c6bf4dbadc4595c3308d21753cec9dd3a 100644 (file)
@@ -1737,6 +1737,192 @@ bool Ebofs::write_will_block()
 }
 
 
+// apply_transaction
+//
+// Apply every op queued in t.ops, in list order, while holding ebofs_lock.
+// Each op pops its arguments off the front of the matching argument lists
+// in t (oids, offsets, lengths, bls, attrnames, attrvals, cids), so those
+// lists are consumed by this call.
+//
+// Returns a bit field: bit i is set if the i'th op failed; 0 means every
+// recorded op succeeded.  Only as many ops as 'unsigned' has bits can be
+// reported individually.  A failing op does not stop later ops from being
+// applied.
+//
+// onsafe (may be null) is queued on commit_waiters[super_epoch] when the
+// result is 0, and deleted otherwise.
+unsigned Ebofs::apply_transaction(Transaction& t, Context *onsafe)
+{
+  ebofs_lock.Lock();
+  dout(7) << "apply_transaction start (" << t.ops.size() << " ops)" << endl;
+
+  // do ops
+  unsigned r = 0;    // bit fields indicate which ops failed.
+  unsigned bit = 1;  // mask for the current op; same type as r so the shift below is well defined
+  for (list<int>::iterator p = t.ops.begin();
+       p != t.ops.end();
+       p++) {
+    switch (*p) {
+    case Transaction::OP_WRITE:
+      {
+        object_t oid = t.oids.front(); t.oids.pop_front();
+        off_t offset = t.offsets.front(); t.offsets.pop_front();
+        size_t len = t.lengths.front(); t.lengths.pop_front();
+        bufferlist bl = t.bls.front(); t.bls.pop_front();
+        if (_write(oid, len, offset, bl) < 0) {
+          dout(7) << "apply_transaction fail on _write" << endl;
+          r |= bit;   // was '&=', which can never set a bit in r
+        }
+      }
+      break;
+
+    case Transaction::OP_TRUNCATE:
+      {
+        object_t oid = t.oids.front(); t.oids.pop_front();
+        size_t len = t.lengths.front(); t.lengths.pop_front();
+        if (_truncate(oid, len) < 0) {
+          dout(7) << "apply_transaction fail on _truncate" << endl;
+          r |= bit;
+        }
+      }
+      break;
+
+    case Transaction::OP_REMOVE:
+      {
+        object_t oid = t.oids.front(); t.oids.pop_front();
+        if (_remove(oid) < 0) {
+          dout(7) << "apply_transaction fail on _remove" << endl;
+          r |= bit;
+        }
+      }
+      break;
+
+    case Transaction::OP_SETATTR:
+      {
+        object_t oid = t.oids.front(); t.oids.pop_front();
+        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+        pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
+        if (_setattr(oid, attrname, attrval.first, attrval.second) < 0) {
+          dout(7) << "apply_transaction fail on _setattr" << endl;
+          r |= bit;
+        }
+      }
+      break;
+
+    case Transaction::OP_RMATTR:
+      {
+        object_t oid = t.oids.front(); t.oids.pop_front();
+        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+        if (_rmattr(oid, attrname) < 0) {
+          dout(7) << "apply_transaction fail on _rmattr" << endl;
+          r |= bit;
+        }
+      }
+      break;
+
+    case Transaction::OP_MKCOLL:
+      {
+        coll_t cid = t.cids.front(); t.cids.pop_front();
+        if (_create_collection(cid) < 0) {
+          dout(7) << "apply_transaction fail on _create_collection" << endl;
+          r |= bit;
+        }
+      }
+      break;
+
+    case Transaction::OP_RMCOLL:
+      {
+        coll_t cid = t.cids.front(); t.cids.pop_front();
+        if (_destroy_collection(cid) < 0) {
+          dout(7) << "apply_transaction fail on _destroy_collection" << endl;
+          r |= bit;
+        }
+      }
+      break;
+
+    case Transaction::OP_COLL_ADD:
+      {
+        coll_t cid = t.cids.front(); t.cids.pop_front();
+        object_t oid = t.oids.front(); t.oids.pop_front();
+        if (_collection_add(cid, oid) < 0) {
+          // failure is deliberately not recorded for this op.
+          // NOTE(review): presumably because _collection_add also reports
+          // an error when the object is already a member -- confirm.
+          //dout(7) << "apply_transaction fail on _collection_add" << endl;
+          //r |= bit;
+        }
+      }
+      break;
+
+    case Transaction::OP_COLL_REMOVE:
+      {
+        coll_t cid = t.cids.front(); t.cids.pop_front();
+        object_t oid = t.oids.front(); t.oids.pop_front();
+        if (_collection_remove(cid, oid) < 0) {
+          dout(7) << "apply_transaction fail on _collection_remove" << endl;
+          r |= bit;
+        }
+      }
+      break;
+
+    case Transaction::OP_COLL_SETATTR:
+      {
+        coll_t cid = t.cids.front(); t.cids.pop_front();
+        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+        pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
+        if (_collection_setattr(cid, attrname, attrval.first, attrval.second) < 0) {
+          dout(7) << "apply_transaction fail on _collection_setattr" << endl;
+          r |= bit;
+        }
+      }
+      break;
+
+    case Transaction::OP_COLL_RMATTR:
+      {
+        coll_t cid = t.cids.front(); t.cids.pop_front();
+        const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+        if (_collection_rmattr(cid, attrname) < 0) {
+          dout(7) << "apply_transaction fail on _collection_rmattr" << endl;
+          r |= bit;
+        }
+      }
+      break;
+
+    default:
+      cerr << "bad op " << *p << endl;
+      assert(0);
+    }
+
+    // advance the mask to the next op's bit
+    bit = bit << 1;
+  }
+
+  dout(7) << "apply_transaction finish (r = " << r << ")" << endl;
+
+  // set up commit waiter
+  if (r == 0) {
+    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+  } else {
+    // at least one op failed: the callback is dropped, not queued
+    if (onsafe) delete onsafe;
+  }
+
+  ebofs_lock.Unlock();
+  return r;
+}
+
+
+
+// _write
+//
+// Lock-free worker behind write(): it does not touch ebofs_lock itself.
+// Writes 'length' bytes from bl at 'offset' into object oid, creating the
+// onode if the object does not exist yet.
+//
+// Returns length on success, or -ENOSPC if the space estimate reaches or
+// exceeds free_blocks.
+int Ebofs::_write(object_t oid, size_t length, off_t offset, bufferlist& bl)
+{
+  dout(7) << "_write " << hex << oid << dec << " len " << length << " off " << offset << endl;
+
+  // space check.  the estimate is very conservative (assumes we have to
+  // rewrite): every block up to offset+length, some slack, plus the number
+  // of dirty onodes and cnodes.
+  unsigned needed = (length+offset) / EBOFS_BLOCK_SIZE + 10;
+  needed += dirty_onodes.size() + dirty_cnodes.size();
+  if (needed >= free_blocks) {
+    dout(1) << "write failing, only " << free_blocks << " blocks free, may need up to " << needed << endl;
+    return -ENOSPC;
+  }
+
+  // look up the onode; make a fresh one if the object is new
+  Onode *onode = get_onode(oid);
+  if (onode == 0)
+    onode = new_onode(oid);
+  dirty_onode(onode);
+
+  // push the data into the buffer cache
+  apply_write(onode, length, offset, bl);
+
+  // drop our onode ref and trim the cache
+  put_onode(onode);
+  trim_bc();
+
+  return length;
+}
+
+
 int Ebofs::write(object_t oid, 
                                 size_t len, off_t off, 
                                 bufferlist& bl, bool fsync)
@@ -1769,146 +1955,78 @@ int Ebofs::write(object_t oid,
 int Ebofs::write(object_t oid, 
                                 size_t len, off_t off, 
                                 bufferlist& bl, Context *onsafe)
-{
-  static map<const char*, pair<void*,int> > null_setattrs;
-  static map<coll_t, map<const char*, pair<void*,int> > > null_cmods;
-  return write_transaction(oid, len, off, bl, null_setattrs, null_cmods, onsafe);
-}
-
-int Ebofs::write_transaction(object_t oid, 
-                                                        size_t len, off_t off, 
-                                                        bufferlist& bl, 
-                                                        map<const char*, pair<void*,int> >& setattrs,
-                                                        map<coll_t, map<const char*, pair<void*,int> > >& cmods,
-                                                        Context *onsafe) 
 {
   ebofs_lock.Lock();
-  dout(7) << "write " << hex << oid << dec << " len " << len << " off " << off << endl;
   assert(len > 0);
 
   // too much unflushed dirty data?  (if so, block!)
   if (_write_will_block()) {
-       dout(10) << "write blocking" << endl;
+       dout(10) << "write blocking " 
+                        << hex << oid << dec << " len " << len << " off " << off << endl;
+
        while (_write_will_block()) 
          bc.waitfor_stat();  // waits on ebofs_lock
-       dout(7) << "write unblocked " << hex << oid << dec << " len " << len << " off " << off << endl;
-  }
 
-  // out of space?
-  //unsigned max = ((len+off) - MIN(off, on->object_size)) / EBOFS_BLOCK_SIZE + 10;
-  unsigned max = (len+off) / EBOFS_BLOCK_SIZE + 10;  // very conservative; assumes we have to rewrite
-  max += dirty_onodes.size() + dirty_cnodes.size();
-  if (max >= free_blocks) {
-       dout(1) << "write failing, only " << free_blocks << " blocks free, may need up to " << max << endl;
-       if (onsafe) delete onsafe;
-       ebofs_lock.Unlock();
-       return -ENOSPC;
+       dout(7) << "write unblocked " 
+                       << hex << oid << dec << " len " << len << " off " << off << endl;
   }
-  
-  // get|create inode
-  Onode *on = get_onode(oid);
-  if (!on) on = new_onode(oid);        // new inode!
 
-  dirty_onode(on);  // dirty onode!
-  
-  // apply write to buffer cache
-  apply_write(on, len, off, bl);
-
-  // join collections?
-  for (map<coll_t, map<const char*, pair<void*,int> > >::iterator cit = cmods.begin();
-          cit != cmods.end();
-          cit++) {
-       const coll_t cid = cit->first;
-       if (_collection_exists(cid)) {
-         if (oc_tab->lookup(idpair_t(oid,cid)) < 0) {
-               oc_tab->insert(idpair_t(oid,cid), true);
-               co_tab->insert(idpair_t(cid,oid), true);
-         }
-       }
-  }
+  // go
+  int r = _write(oid, len, off, bl);
 
-  // apply attribute changes
-  do_setattrs(on, setattrs);
-  do_csetattrs(cmods);
-
-  // prepare (eventual) journal entry?
-
-  // set up commit waiter
-  if (onsafe) {
-       // commit on next full fs commit.
-       // FIXME when we add journaling.
-       commit_waiters[super_epoch].push_back(onsafe);
-       //on->commit_waiters.push_back(onsafe);        // in case we delete the object.
+  // commit waiter
+  if (r > 0) {
+       assert((size_t)r == len);
+       if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+  } else {
+       if (onsafe) delete onsafe;
   }
 
-  // done
-  put_onode(on);
-
-  trim_bc();
-
   ebofs_lock.Unlock();
-  return len;
+  return r;
 }
 
 
-
-int Ebofs::remove(object_t oid, Context *onsafe)
+int Ebofs::_remove(object_t oid)
 {
-  static map<coll_t, map<const char*, pair<void*,int> > > null_cmods;
-  return remove_transaction(oid, null_cmods, onsafe);
-}
+  dout(7) << "_remove " << hex << oid << dec << endl;
 
-int Ebofs::remove_transaction(object_t oid, 
-                                                         map<coll_t, map<const char*, pair<void*,int> > >& cmods,
-                                                         Context *onsafe)
-{
-  ebofs_lock.Lock();
-  dout(7) << "remove " << hex << oid << dec << endl;
-  
   // get inode
   Onode *on = get_onode(oid);
-  if (!on) {
-       ebofs_lock.Unlock();
-       return -ENOENT;
-  }
+  if (!on) return -ENOENT;
 
   // ok remove it!
   remove_onode(on);
 
-  // do collection mods
-  do_csetattrs(cmods);
+  return 0;
+}
+
+
+int Ebofs::remove(object_t oid, Context *onsafe)
+{
+  ebofs_lock.Lock();
+
+  // do it
+  int r = _remove(oid);
 
   // set up commit waiter
-  if (onsafe) {
-       // commit on next full fs commit.
-       // FIXME when we add journaling.
-       commit_waiters[super_epoch].push_back(onsafe);
+  if (r >= 0) {
+       if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+  } else {
+       if (onsafe) delete onsafe;
   }
 
   ebofs_lock.Unlock();
-  return 0;
+  return r;
 }
 
-int Ebofs::truncate(object_t oid, off_t size, Context *onsafe)
+int Ebofs::_truncate(object_t oid, off_t size)
 {
-  static map<const char*, pair<void*,int> > null_setattrs;
-  static map<coll_t, map<const char*, pair<void*,int> > > null_cmods;
-  return truncate_transaction(oid, size, null_setattrs, null_cmods, onsafe);
-}
+  dout(7) << "_truncate " << hex << oid << dec << " size " << size << endl;
 
-int Ebofs::truncate_transaction(object_t oid, off_t size,
-                                                               map<const char*, pair<void*,int> >& setattrs,
-                                                               map<coll_t, map<const char*, pair<void*,int> > >& cmods,
-                                                               Context *onsafe)
-{
-  ebofs_lock.Lock();
-  dout(7) << "truncate " << hex << oid << dec << " size " << size << endl;
-  
   Onode *on = get_onode(oid);
-  if (!on) {
-       ebofs_lock.Unlock();
+  if (!on) 
        return -ENOENT;
-  }
   
   int r = 0;
   if (size > on->object_size) {
@@ -1943,15 +2061,24 @@ int Ebofs::truncate_transaction(object_t oid, off_t size,
        assert(size == on->object_size);
   }
 
-  // apply attribute changes
-  do_setattrs(on, setattrs);
-  do_csetattrs(cmods);
+  put_onode(on);
+  return r;
+}
 
-  // set up commit waiter
-  if (onsafe) 
-       commit_waiters[super_epoch].push_back(onsafe);
+
+int Ebofs::truncate(object_t oid, off_t size, Context *onsafe)
+{
+  ebofs_lock.Lock();
   
-  put_onode(on);
+  int r = _truncate(oid, size);
+
+  // set up commit waiter
+  if (r >= 0) {
+       if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+  } else {
+       if (onsafe) delete onsafe;
+  }
+
   ebofs_lock.Unlock();
   return r;
 }
@@ -1985,76 +2112,36 @@ int Ebofs::stat(object_t oid, struct stat *st)
   return 0;
 }
 
-// attributes
 
-void Ebofs::do_setattrs(Onode *on, map<const char*, pair<void*,int> > &setattrs)
+int Ebofs::_setattr(object_t oid, const char *name, const void *value, size_t size) 
 {
-  for (map<const char*, pair<void*,int> >::iterator it = setattrs.begin();
-          it != setattrs.end();
-          it++) {
-       string n(it->first);
-       if (it->second.first) {
-         AttrVal val((char*)it->second.first, it->second.second);
-         on->attr[n] = val;
-       } else {
-         on->attr.erase(n);
-       }
-  }
-}
-                                               
-void Ebofs::do_csetattrs(map<coll_t, map<const char*, pair<void*,int> > > &cmods)
-{
-  for (map<coll_t, map<const char*, pair<void*,int> > >::iterator cit = cmods.begin();
-          cit != cmods.end();
-          cit++) {
-       const coll_t cid = cit->first;
-
-       if (!cit->second.empty()) {
-         // attrs
-         Cnode *cn = get_cnode(cid);
-         if (cn) {
-               for (map<const char*, pair<void*,int> >::iterator ait = cit->second.begin();
-                        ait != cit->second.end();
-                        ait++) {
-                 string n(ait->first);
-                 if (ait->second.first) {
-                       AttrVal val((char*)ait->second.first, ait->second.second);
-                       cn->attr[n] = val;
-                 } else {
-                       cn->attr.erase(n);
-                 }
-               }
-               dirty_cnode(cn);
-               put_cnode(cn);
-         } else {
-               dout(0) << "warning: collection " << hex << cid << dec << " dne for csetattrs" << endl;
-         }
-       }       
-  }
-}
-
-int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe)
-{
-  ebofs_lock.Lock();
   dout(8) << "setattr " << hex << oid << dec << " '" << name << "' len " << size << endl;
 
   Onode *on = get_onode(oid);
-  if (!on) {
-       ebofs_lock.Unlock();
-       return -ENOENT;
-  }
+  if (!on) return -ENOENT;
 
   string n(name);
   AttrVal val((char*)value, size);
   on->attr[n] = val;
   dirty_onode(on);
+  put_onode(on);
+  return 0;
+}
 
-  if (onsafe) 
-       commit_waiters[super_epoch].push_back(onsafe);
+int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe)
+{
+  ebofs_lock.Lock();
+  int r = _setattr(oid, name, value, size);
+
+  // set up commit waiter
+  if (r >= 0) {
+       if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+  } else {
+       if (onsafe) delete onsafe;
+  }
 
-  put_onode(on);
   ebofs_lock.Unlock();
-  return 0;
+  return r;
 }
 
 int Ebofs::getattr(object_t oid, const char *name, void *value, size_t size)
@@ -2081,29 +2168,38 @@ int Ebofs::getattr(object_t oid, const char *name, void *value, size_t size)
   return r;
 }
 
-int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe) 
+
+int Ebofs::_rmattr(object_t oid, const char *name) 
 {
-  ebofs_lock.Lock();
-  dout(8) << "rmattr " << hex << oid << dec << " '" << name << "'" << endl;
+  dout(8) << "_rmattr " << hex << oid << dec << " '" << name << "'" << endl;
 
   Onode *on = get_onode(oid);
-  if (!on) {
-       ebofs_lock.Unlock();
-       return -ENOENT;
-  }
+  if (!on) return -ENOENT;
 
   string n(name);
   on->attr.erase(n);
-
-  if (onsafe) 
-       commit_waiters[super_epoch].push_back(onsafe);
-
   dirty_onode(on);
   put_onode(on);
-  ebofs_lock.Unlock();
   return 0;
 }
 
+// rmattr
+//  public entry point: runs _rmattr() with ebofs_lock held and returns its
+//  result.  onsafe (may be null) is queued on commit_waiters[super_epoch]
+//  when the result is >= 0 and deleted when it is negative.
+int Ebofs::rmattr(object_t oid, const char *name, Context *onsafe)
+{
+  ebofs_lock.Lock();
+  const int result = _rmattr(oid, name);
+
+  if (onsafe) {
+    if (result < 0)
+      delete onsafe;     // failed: drop the callback instead of queueing it
+    else
+      commit_waiters[super_epoch].push_back(onsafe);
+  }
+
+  ebofs_lock.Unlock();
+  return result;
+}
+
+
 int Ebofs::listattr(object_t oid, vector<string>& attrs)
 {
   ebofs_lock.Lock();
@@ -2151,32 +2247,42 @@ int Ebofs::list_collections(list<coll_t>& ls)
   return num;
 }
 
-int Ebofs::create_collection(coll_t cid)
+int Ebofs::_create_collection(coll_t cid)
 {
-  ebofs_lock.Lock();
-  dout(9) << "create_collection " << hex << cid << dec << endl;
+  dout(9) << "_create_collection " << hex << cid << dec << endl;
   
-  if (_collection_exists(cid)) {
-       ebofs_lock.Unlock();
+  if (_collection_exists(cid)) 
        return -EEXIST;
-  }
 
   Cnode *cn = new_cnode(cid);
   put_cnode(cn);
+  
+  return 0;  
+}
+
+int Ebofs::create_collection(coll_t cid, Context *onsafe)
+{
+  ebofs_lock.Lock();
+
+  int r = _create_collection(cid);
+
+  // set up commit waiter
+  if (r >= 0) {
+       if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+  } else {
+       if (onsafe) delete onsafe;
+  }
 
   ebofs_lock.Unlock();
-  return 0;
+  return r;
 }
 
-int Ebofs::destroy_collection(coll_t cid)
+int Ebofs::_destroy_collection(coll_t cid)
 {
-  ebofs_lock.Lock();
-  dout(9) << "destroy_collection " << hex << cid << dec << endl;
+  dout(9) << "_destroy_collection " << hex << cid << dec << endl;
 
-  if (!_collection_exists(cid)) {
-       ebofs_lock.Unlock();
+  if (!_collection_exists(cid)) 
        return -ENOENT;
-  }
 
   Cnode *cn = get_cnode(cid);
   assert(cn);
@@ -2192,15 +2298,30 @@ int Ebofs::destroy_collection(coll_t cid)
   }
 
   remove_cnode(cn);
-  ebofs_lock.Lock();
   return 0;
 }
 
+// destroy_collection
+//  public entry point: runs _destroy_collection() with ebofs_lock held and
+//  returns its result.  onsafe (may be null) is queued on
+//  commit_waiters[super_epoch] when the result is >= 0 and deleted when it
+//  is negative.
+int Ebofs::destroy_collection(coll_t cid, Context *onsafe)
+{
+  ebofs_lock.Lock();
+  const int result = _destroy_collection(cid);
+
+  if (onsafe) {
+    if (result < 0)
+      delete onsafe;     // failed: drop the callback instead of queueing it
+    else
+      commit_waiters[super_epoch].push_back(onsafe);
+  }
+
+  ebofs_lock.Unlock();
+  return result;
+}
+
+
 bool Ebofs::collection_exists(coll_t cid)
 {
   ebofs_lock.Lock();
   dout(10) << "collection_exists " << hex << cid << dec << endl;
-
   bool r = _collection_exists(cid);
   ebofs_lock.Unlock();
   return r;
@@ -2210,32 +2331,46 @@ bool Ebofs::_collection_exists(coll_t cid)
   return (collection_tab->lookup(cid) == 0);
 }
 
-int Ebofs::collection_add(coll_t cid, object_t oid)
+int Ebofs::_collection_add(coll_t cid, object_t oid)
 {
-  ebofs_lock.Lock();
-  dout(9) << "collection_add " << hex << cid << " object " << oid << dec << endl;
+  dout(9) << "_collection_add " << hex << cid << " object " << oid << dec << endl;
 
-  if (!_collection_exists(cid)) {
-       ebofs_lock.Unlock();
+  if (!_collection_exists(cid)) 
        return -ENOENT;
-  }
+
   if (oc_tab->lookup(idpair_t(oid,cid)) < 0) {
        oc_tab->insert(idpair_t(oid,cid), true);
        co_tab->insert(idpair_t(cid,oid), true);
+       return 0;
+  } else {
+       return -ENOENT;  // FIXME?
   }
+}
+
+// collection_add
+//  public entry point: runs _collection_add() with ebofs_lock held.
+//  onsafe (may be null) is queued on commit_waiters[super_epoch] on
+//  success and deleted on failure.
+//  Returns the result of _collection_add(), so a caller whose onsafe was
+//  deleted also sees a negative return value.
+int Ebofs::collection_add(coll_t cid, object_t oid, Context *onsafe)
+{
+  ebofs_lock.Lock();
+
+  int r = _collection_add(cid, oid);
+
+  // set up commit waiter
+  if (r >= 0) {
+    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+  } else {
+    if (onsafe) delete onsafe;
+  }
+
   ebofs_lock.Unlock();
-  return 0;
+  return r;
 }
 
-int Ebofs::collection_remove(coll_t cid, object_t oid)
+int Ebofs::_collection_remove(coll_t cid, object_t oid)
 {
-  ebofs_lock.Lock();
-  dout(9) << "collection_remove " << hex << cid << " object " << oid << dec << endl;
+  dout(9) << "_collection_remove " << hex << cid << " object " << oid << dec << endl;
 
-  if (!_collection_exists(cid)) {
-       ebofs_lock.Unlock();
+  if (!_collection_exists(cid)) 
        return -ENOENT;
-  }
+
   oc_tab->remove(idpair_t(oid,cid));
   co_tab->remove(idpair_t(cid,oid));
 
@@ -2245,6 +2380,23 @@ int Ebofs::collection_remove(coll_t cid, object_t oid)
        cnode_map.erase(cid);
        cnode_lru.lru_remove(cn);
        delete cn;
+       return 0;
+  } else {
+       return -ENOENT;  // FIXME?
+  } 
+}
+
+int Ebofs::collection_remove(coll_t cid, object_t oid, Context *onsafe)
+{
+  ebofs_lock.Lock();
+
+  int r = _collection_remove(cid, oid);
+
+  // set up commit waiter
+  if (r >= 0) {
+       if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+  } else {
+       if (onsafe) delete onsafe;
   }
 
   ebofs_lock.Unlock();
@@ -2280,23 +2432,36 @@ int Ebofs::collection_list(coll_t cid, list<object_t>& ls)
 }
 
 
-int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, size_t size)
+int Ebofs::_collection_setattr(coll_t cid, const char *name, const void *value, size_t size)
 {
-  ebofs_lock.Lock();
-  dout(10) << "collection_setattr " << hex << cid << dec << " '" << name << "' len " << size << endl;
+  dout(10) << "_collection_setattr " << hex << cid << dec << " '" << name << "' len " << size << endl;
 
   Cnode *cn = get_cnode(cid);
-  if (!cn) {
-       ebofs_lock.Unlock();
-       return -ENOENT;
-  }
+  if (!cn) return -ENOENT;
 
   string n(name);
   AttrVal val((char*)value, size);
   cn->attr[n] = val;
   dirty_cnode(cn);
-
   put_cnode(cn);
+
+  return 0;
+}
+
+// collection_setattr
+//  public entry point: runs _collection_setattr() with ebofs_lock held.
+//  onsafe (may be null) is queued on commit_waiters[super_epoch] on
+//  success and deleted on failure.
+//  Returns the result of _collection_setattr() (0, or -ENOENT when the
+//  collection's cnode cannot be found) rather than an unconditional 0.
+int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, size_t size, Context *onsafe)
+{
+  ebofs_lock.Lock();
+  dout(10) << "collection_setattr " << hex << cid << dec << " '" << name << "' len " << size << endl;
+
+  int r = _collection_setattr(cid, name, value, size);
+
+  // set up commit waiter
+  if (r >= 0) {
+    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+  } else {
+    if (onsafe) delete onsafe;
+  }
+
   ebofs_lock.Unlock();
-  return 0;
+  return r;
 }
@@ -2326,22 +2491,35 @@ int Ebofs::collection_getattr(coll_t cid, const char *name, void *value, size_t
   return r;
 }
 
-int Ebofs::collection_rmattr(coll_t cid, const char *name) 
+int Ebofs::_collection_rmattr(coll_t cid, const char *name) 
 {
-  ebofs_lock.Lock();
-  dout(10) << "collection_rmattr " << hex << cid << dec << " '" << name << "'" << endl;
+  dout(10) << "_collection_rmattr " << hex << cid << dec << " '" << name << "'" << endl;
 
   Cnode *cn = get_cnode(cid);
-  if (!cn) {
-       ebofs_lock.Unlock();
-       return -ENOENT;
-  }
+  if (!cn) return -ENOENT;
 
   string n(name);
   cn->attr.erase(n);
 
   dirty_cnode(cn);
   put_cnode(cn);
+
+  return 0;
+}
+
+// collection_rmattr
+//  public entry point: runs _collection_rmattr() with ebofs_lock held.
+//  onsafe (may be null) is queued on commit_waiters[super_epoch] on
+//  success and deleted on failure.
+//  Returns the result of _collection_rmattr() (0, or -ENOENT when the
+//  collection's cnode cannot be found) rather than an unconditional 0.
+int Ebofs::collection_rmattr(coll_t cid, const char *name, Context *onsafe) 
+{
+  ebofs_lock.Lock();
+
+  int r = _collection_rmattr(cid, name);
+
+  // set up commit waiter
+  if (r >= 0) {
+    if (onsafe) commit_waiters[super_epoch].push_back(onsafe);
+  } else {
+    if (onsafe) delete onsafe;
+  }
+
   ebofs_lock.Unlock();
-  return 0;
+  return r;
 }
index cacca9b5de32d975ff1ee0407591227505d173ea..ed2b4c4d2ffefb7d446a0047c1619b88c96badca 100644 (file)
@@ -196,7 +196,6 @@ class Ebofs : public ObjectStore {
   } finisher_thread;
 
 
-  bool _write_will_block();
   void alloc_more_node_space();
 
   void do_csetattrs(map<coll_t, map<const char*, pair<void*,int> > > &cmods);
@@ -232,63 +231,56 @@ class Ebofs : public ObjectStore {
   
   int statfs(struct statfs *buf);
 
+  // atomic transaction
+  unsigned apply_transaction(Transaction& t, Context *onsafe);
 
   // object interface
   bool exists(object_t);
   int stat(object_t, struct stat*);
   int read(object_t, size_t len, off_t off, bufferlist& bl);
-  int write(object_t oid, 
-                       size_t len, off_t off, 
-                       bufferlist& bl, bool fsync=true);
-  int write(object_t oid, 
-                       size_t len, off_t offset, 
-                       bufferlist& bl, 
-                       Context *onsafe);
-  int write_transaction(object_t oid, 
-                                               size_t len, off_t offset, 
-                                               bufferlist& bl, 
-                                               map<const char*, pair<void*,int> >& setattrs,
-                                               map<coll_t, map<const char*, pair<void*,int> > >& cmods,
-                                               Context *onsafe);
-
-  int truncate(object_t oid, off_t size,
-                          Context *onsafe=0);
-  int truncate_transaction(object_t oid, off_t size, 
-                                                  map<const char*, pair<void*,int> >& setattrs,
-                                                  map<coll_t, map<const char*, pair<void*,int> > >& cmods,
-                                                  Context *onsafe);
-
-  int remove(object_t oid,
-                        Context *onsafe=0);
-  int remove_transaction(object_t,
-                                                map<coll_t, map<const char*, pair<void*,int> > >& cmods,
-                                                Context *onsafe=0);
-
+  int write(object_t oid, size_t len, off_t off, bufferlist& bl, bool fsync=true);
+  int write(object_t oid, size_t len, off_t offset, bufferlist& bl, Context *onsafe);
+  int truncate(object_t oid, off_t size, Context *onsafe=0);
+  int remove(object_t oid, Context *onsafe=0);
   bool write_will_block();
 
   // object attr
-  int setattr(object_t oid, const char *name, const void *value, size_t size,
-                         Context *onsafe=0);
+  int setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe=0);
   int getattr(object_t oid, const char *name, void *value, size_t size);
-  int rmattr(object_t oid, const char *name,
-                        Context *onsafe=0);
+  int rmattr(object_t oid, const char *name, Context *onsafe=0);
   int listattr(object_t oid, vector<string>& attrs);
-  
+
   // collections
   int list_collections(list<coll_t>& ls);
-  //int collection_stat(coll_t c, struct stat *st);
-  int create_collection(coll_t c);
-  int destroy_collection(coll_t c);
-
-  bool _collection_exists(coll_t c);
   bool collection_exists(coll_t c);
-  int collection_add(coll_t c, object_t o);
-  int collection_remove(coll_t c, object_t o);
+
+  int create_collection(coll_t c, Context *onsafe);
+  int destroy_collection(coll_t c, Context *onsafe);
+  int collection_add(coll_t c, object_t o, Context *onsafe);
+  int collection_remove(coll_t c, object_t o, Context *onsafe);
+
   int collection_list(coll_t c, list<object_t>& o);
   
-  int collection_setattr(object_t oid, const char *name, const void *value, size_t size);
+  int collection_setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe);
   int collection_getattr(object_t oid, const char *name, void *value, size_t size);
-  int collection_rmattr(coll_t cid, const char *name);
+  int collection_rmattr(coll_t cid, const char *name, Context *onsafe);
   int collection_listattr(object_t oid, vector<string>& attrs);
   
+
+private:
+  // private interface -- use if caller already holds lock
+  bool _write_will_block();
+  int _write(object_t oid, size_t len, off_t offset, bufferlist& bl);
+  int _truncate(object_t oid, off_t size);
+  int _remove(object_t oid);
+  int _setattr(object_t oid, const char *name, const void *value, size_t size);
+  int _rmattr(object_t oid, const char *name);
+  bool _collection_exists(coll_t c);
+  int _create_collection(coll_t c);
+  int _destroy_collection(coll_t c);
+  int _collection_add(coll_t c, object_t o);
+  int _collection_remove(coll_t c, object_t o);
+  int _collection_setattr(object_t oid, const char *name, const void *value, size_t size);
+  int _collection_rmattr(coll_t cid, const char *name);
+  
 };
index 3d20fe723927609cb14e8105f32bd7eb55b72d44..44bde8b42ab1d85efd4d5c2429e05c4faa78927b 100644 (file)
@@ -45,19 +45,23 @@ class FakeStoreCollections {
        return r;
   }
 
-  int create_collection(coll_t c) {
+  int create_collection(coll_t c,
+                                               Context *onsafe=0) {
        faker_lock.Lock();
        fakecollections[c].size();
+       if (onsafe) g_timer.add_event_after(2.0,onsafe);
        faker_lock.Unlock();
        return 0;
   }
 
-  int destroy_collection(coll_t c) {
+  int destroy_collection(coll_t c,
+                                                Context *onsafe=0) {
        int r = 0;
        faker_lock.Lock();
        if (fakecollections.count(c)) {
          fakecollections.erase(c);
          //fakecattr.erase(c);
+         if (onsafe) g_timer.add_event_after(2.0,onsafe);
        } else 
          r = -1;
        faker_lock.Unlock();
@@ -75,16 +79,20 @@ class FakeStoreCollections {
        return r;
   }
 
-  int collection_add(coll_t c, object_t o) {
+  int collection_add(coll_t c, object_t o,
+                                        Context *onsafe=0) {
        faker_lock.Lock();
        fakecollections[c].insert(o);
+       if (onsafe) g_timer.add_event_after(2.0,onsafe);
        faker_lock.Unlock();
        return 0;
   }
 
-  int collection_remove(coll_t c, object_t o) {
+  int collection_remove(coll_t c, object_t o,
+                                               Context *onsafe=0) {
        faker_lock.Lock();
        fakecollections[c].erase(o);
+       if (onsafe) g_timer.add_event_after(2.0,onsafe);
        faker_lock.Unlock();
        return 0;
   }
@@ -182,9 +190,19 @@ class FakeStoreAttrs {
   }
 
   int collection_setattr(coll_t c, const char *name,
-                                                void *value, size_t size) {
+                                                void *value, size_t size,
+                                                Context *onsafe=0) {
        faker_lock.Lock();
        int r = fakecattrs[c].setattr(name, value, size);
+       if (onsafe) g_timer.add_event_after(2.0,onsafe);
+       faker_lock.Unlock();
+       return r;
+  }
+  // Remove attribute 'name' from the fake attr set kept for collection c,
+  // under faker_lock, and return whatever rmattr() returned.
+  // If onsafe is given it is handed to g_timer.add_event_after(2.0, ...)
+  // regardless of the rmattr() result.
+  int collection_rmattr(coll_t c, const char *name,
+                                               Context *onsafe=0) {
+       faker_lock.Lock();
+       int r = fakecattrs[c].rmattr(name);
+       if (onsafe) g_timer.add_event_after(2.0,onsafe);
        faker_lock.Unlock();
        return r;
   }
index 08925e221b1a58fe86a0f55d15f2f878fa91afe8..49b7fe6b1ca47cdd31ad568b049c8a8d05f720ed 100644 (file)
@@ -2041,40 +2041,36 @@ void OSD::op_apply(MOSDOp *op, version_t version, Context* oncommit)
 {
   object_t oid = op->get_oid();
   pg_t pgid = op->get_pg();
-  int r;
   
   // if the target object is locked for writing by another client, put 'op' to the waiting queue
   // for _any_ op type -- eg only the locker can unlock!
   if (block_if_wrlocked(op)) 
        return; // op will be handled later, after the object becomes unlocked
 
+  // prepare the transaction
+  ObjectStore::Transaction t;
   
-  // everybody will want to update the version.
-  map<const char*, pair<void*,int> > setattrs;
-  setattrs["version"] = pair<void*,int>(&version, sizeof(version));
-
-  // and pg also gets matching version update.
-  map<coll_t, map<const char*, pair<void*,int> > > cmods;
-  cmods[pgid] = setattrs;
-
-
-  // do the op!
+  // the op
   switch (op->get_op()) {
   case OSD_OP_WRLOCK:
   case OSD_OP_REP_WRLOCK:
-       // lock object
-       r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
+       { // lock object
+         //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
+         t.setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t));
+       }
        break;  
 
   case OSD_OP_WRUNLOCK:
   case OSD_OP_REP_WRUNLOCK:
-       // unlock objects
-       r = store->rmattr(oid, "wrlock", oncommit);
-
-       //unblock all operations that were waiting for this object to become unlocked
-       if (waiting_for_wr_unlock.count(oid)) {
-         take_waiters(waiting_for_wr_unlock[oid]);
-         waiting_for_wr_unlock.erase(oid);             
+       { // unlock objects
+         //r = store->rmattr(oid, "wrlock", oncommit);
+         t.rmattr(oid, "wrlock");
+         
+         // unblock all operations that were waiting for this object to become unlocked
+         if (waiting_for_wr_unlock.count(oid)) {
+               take_waiters(waiting_for_wr_unlock[oid]);
+               waiting_for_wr_unlock.erase(oid);               
+         }
        }
        break;
 
@@ -2086,45 +2082,60 @@ void OSD::op_apply(MOSDOp *op, version_t version, Context* oncommit)
          //bl = op->get_data();
          bl.claim( op->get_data() );
          
-         /* old way
-               r = store->write(op->get_oid(),
-               op->get_length(),
-               op->get_offset(),
-               bl,
-               oncommit);
-               store->setattr(op->get_oid(), "version", &v, sizeof(v));
-               store->collection_add(pgid, oid);        // FIXME : be careful w/ locking
-         */
-         
          // go
-         r = store->write_transaction(op->get_oid(),
-                                                                  op->get_length(), op->get_offset(),
-                                                                  bl,
-                                                                  setattrs, cmods,
-                                                                  oncommit);
+         //r = store->write(op->get_oid(),
+         //                                               op->get_length(), op->get_offset(),
+         //                                               bl, oncommit);
+         t.write( oid, op->get_offset(), op->get_length(), bl );
        }
        break;
 
   case OSD_OP_TRUNCATE:
   case OSD_OP_REP_TRUNCATE:
-       {       // truncate
-         map<const char*, pair<void*,int> > setattrs;
-         setattrs["version"] = pair<void*,int>(&version, sizeof(version));
-         r = store->truncate_transaction(oid, op->get_offset(), 
-                                                                         setattrs, cmods, 
-                                                                         oncommit);
+       { // truncate
+         //r = store->truncate(oid, op->get_offset());
+         t.truncate(oid, op->get_length() );
        }
        break;
        
   case OSD_OP_DELETE:
   case OSD_OP_REP_DELETE:
-       // delete
-       r = store->remove_transaction(oid, 
-                                                                 cmods,
-                                                                 oncommit);
+       { // delete: queue removal of the object in the transaction
+         //r = store->remove(oid);
+         t.remove(oid);
+       }
        break;
-
+       
   default:
        assert(0);
   }
+
+
+  // log in pg log
+  // FIXME
+
+
+  // object collection, version
+  if (op->get_op() == OSD_OP_TRUNCATE ||
+         op->get_op() == OSD_OP_REP_TRUNCATE) {
+       // remove object from c
+       t.collection_remove(pgid, oid);
+  } else {
+       // add object to c
+       t.collection_add(pgid, oid);
+
+       // object version
+       t.setattr(oid, "version", &version, sizeof(version));
+  }
+
+  // inc pg version
+  t.collection_setattr(pgid, "version", &version, sizeof(version));
+  
+  // ok, go!
+  // r is a bit field of failed ops.  per the original comments, 0 means no
+  // errors and 2 means only the collection_add failed; both are tolerated,
+  // anything else is fatal.
+  unsigned r = store->apply_transaction(t, oncommit);
+  if (r != 0 &&   // no errors
+         r != 2) {   // or error on collection_add
+       cerr << "error applying transaction: r = " << r << endl;
+       assert(r == 0);
+  }
 }
index bff84b1e8bbca659b977322ee50c3701baecbd3d..03c6ab2a784796bd5b2635466bf00b88e64f2563 100644 (file)
@@ -35,7 +35,224 @@ using namespace std;
  * low-level interface to the local OSD file system
  */
 
+
+
 class ObjectStore {
+public:
+
+  /*********************************
+   * transaction
+   */
+  class Transaction {
+  public:
+       static const int OP_WRITE =         1;  // oid, offset, len, bl
+       static const int OP_TRUNCATE =      2;  // oid, len
+       static const int OP_REMOVE =        3;  // oid
+       static const int OP_SETATTR =       4;  // oid, attrname, attrval
+       static const int OP_RMATTR =        5;  // oid, attrname
+       static const int OP_MKCOLL =        6;  // cid
+       static const int OP_RMCOLL =        7;  // cid
+       static const int OP_COLL_ADD =      8;  // cid, oid
+       static const int OP_COLL_REMOVE =   9;  // cid, oid
+       static const int OP_COLL_SETATTR = 10;  // cid, attrname, attrval
+       static const int OP_COLL_RMATTR =  11;  // cid, attrname
+
+       list<int> ops;
+       list<bufferlist> bls;
+       list<object_t> oids;
+       list<coll_t>   cids;
+       list<off_t>    offsets;
+       list<size_t>   lengths;
+       list<const char*> attrnames;
+       list< pair<const void*,int> > attrvals;
+
+       void write(object_t oid, off_t off, size_t len, bufferlist& bl) {
+         int op = OP_WRITE;
+         ops.push_back(op);
+         oids.push_back(oid);
+         offsets.push_back(off);
+         lengths.push_back(len);
+         bls.push_back(bl);
+       }
+       void truncate(object_t oid, off_t off) {
+         int op = OP_TRUNCATE;
+         ops.push_back(op);
+         oids.push_back(oid);
+         offsets.push_back(off);
+       }
+       void remove(object_t oid) {
+         int op = OP_REMOVE;
+         ops.push_back(op);
+         oids.push_back(oid);
+       }
+       void setattr(object_t oid, const char* name, const void* val, int len) {
+         int op = OP_SETATTR;
+         ops.push_back(op);
+         oids.push_back(oid);
+         attrnames.push_back(name);
+         attrvals.push_back(pair<const void*,int>(val,len));
+       }
+       void rmattr(object_t oid, const char* name) {
+         int op = OP_RMATTR;
+         ops.push_back(op);
+         oids.push_back(oid);
+         attrnames.push_back(name);
+       }
+       void create_collection(coll_t cid) {
+         int op = OP_MKCOLL;
+         ops.push_back(op);
+         cids.push_back(cid);
+       }
+       void remove_collection(coll_t cid) {
+         int op = OP_RMCOLL;
+         ops.push_back(op);
+         cids.push_back(cid);
+       }
+       void collection_add(coll_t cid, object_t oid) {
+         int op = OP_COLL_ADD;
+         ops.push_back(op);
+         cids.push_back(cid);
+         oids.push_back(oid);
+       }
+       void collection_remove(coll_t cid, object_t oid) {
+         int op = OP_COLL_REMOVE;
+         ops.push_back(op);
+         cids.push_back(cid);
+         oids.push_back(oid);
+       }
+       void collection_setattr(coll_t cid, const char* name, const void* val, int len) {
+         int op = OP_COLL_SETATTR;
+         ops.push_back(op);
+         cids.push_back(cid);
+         attrnames.push_back(name);
+         attrvals.push_back(pair<const void*,int>(val,len));
+       }
+       void collection_rmattr(coll_t cid, const char* name) {
+         int op = OP_COLL_RMATTR;
+         ops.push_back(op);
+         cids.push_back(cid);
+         attrnames.push_back(name);
+       }
+
+       // etc.
+  };
+
+
+
+  /* this implementation is here only for naive ObjectStores that
+   * do not do atomic transactions natively.  it is not atomic.
+   */
+  virtual unsigned apply_transaction(Transaction& t, Context *onsafe) {
+       // non-atomic implementation
+       for (list<int>::iterator p = t.ops.begin();
+                p != t.ops.end();
+                p++) {
+         Context *last = 0;
+         if (p == t.ops.end()) last = onsafe;
+
+         switch (*p) {
+         case Transaction::OP_WRITE:
+               {
+                 object_t oid = t.oids.front(); t.oids.pop_front();
+                 off_t offset = t.offsets.front(); t.offsets.pop_front();
+                 size_t len = t.lengths.front(); t.lengths.pop_front();
+                 bufferlist bl = t.bls.front(); t.bls.pop_front();
+                 write(oid, len, offset, bl, last);
+               }
+               break;
+
+         case Transaction::OP_TRUNCATE:
+               {
+                 object_t oid = t.oids.front(); t.oids.pop_front();
+                 size_t len = t.lengths.front(); t.lengths.pop_front();
+                 truncate(oid, len, last);
+               }
+               break;
+
+         case Transaction::OP_REMOVE:
+               {
+                 object_t oid = t.oids.front(); t.oids.pop_front();
+                 remove(oid, last);
+               }
+               break;
+
+         case Transaction::OP_SETATTR:
+               {
+                 object_t oid = t.oids.front(); t.oids.pop_front();
+                 const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+                 pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
+                 setattr(oid, attrname, attrval.first, attrval.second, last);
+               }
+               break;
+
+         case Transaction::OP_RMATTR:
+               {
+                 object_t oid = t.oids.front(); t.oids.pop_front();
+                 const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+                 rmattr(oid, attrname, last);
+               }
+               break;
+
+         case Transaction::OP_MKCOLL:
+               {
+                 coll_t cid = t.cids.front(); t.cids.pop_front();
+                 create_collection(cid, last);
+               }
+               break;
+
+         case Transaction::OP_RMCOLL:
+               {
+                 coll_t cid = t.cids.front(); t.cids.pop_front();
+                 destroy_collection(cid, last);
+               }
+               break;
+
+         case Transaction::OP_COLL_ADD:
+               {
+                 coll_t cid = t.cids.front(); t.cids.pop_front();
+                 object_t oid = t.oids.front(); t.oids.pop_front();
+                 collection_add(cid, oid, last);
+               }
+               break;
+
+         case Transaction::OP_COLL_REMOVE:
+               {
+                 coll_t cid = t.cids.front(); t.cids.pop_front();
+                 object_t oid = t.oids.front(); t.oids.pop_front();
+                 collection_remove(cid, oid, last);
+               }
+               break;
+
+         case Transaction::OP_COLL_SETATTR:
+               {
+                 coll_t cid = t.cids.front(); t.cids.pop_front();
+                 const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+                 pair<const void*,int> attrval = t.attrvals.front(); t.attrvals.pop_front();
+                 collection_setattr(cid, attrname, attrval.first, attrval.second, last);
+               }
+               break;
+
+         case Transaction::OP_COLL_RMATTR:
+               {
+                 coll_t cid = t.cids.front(); t.cids.pop_front();
+                 const char *attrname = t.attrnames.front(); t.attrnames.pop_front();
+                 collection_rmattr(cid, attrname, last);
+               }
+               break;
+
+
+         default:
+               cerr << "bad op " << *p << endl;
+               assert(0);
+         }
+       }
+       return 0;  // FIXME count errors
+  }
+
+  /*********************************************/
+
+
+
  public:
   ObjectStore() {}
   virtual ~ObjectStore() {}
@@ -141,16 +358,23 @@ class ObjectStore {
   
   // collections
   virtual int list_collections(list<coll_t>& ls) {return 0;}//= 0;
-  virtual int create_collection(coll_t c) {return 0;}//= 0;
-  virtual int destroy_collection(coll_t c) {return 0;}//= 0;
+  virtual int create_collection(coll_t c,
+                                                               Context *onsafe=0) {return 0;}//= 0;
+  virtual int destroy_collection(coll_t c,
+                                                                Context *onsafe=0) {return 0;}//= 0;
   virtual bool collection_exists(coll_t c) {return 0;}
   virtual int collection_stat(coll_t c, struct stat *st) {return 0;}//= 0;
-  virtual int collection_add(coll_t c, object_t o) {return 0;}//= 0;
-  virtual int collection_remove(coll_t c, object_t o) {return 0;}// = 0;
+  virtual int collection_add(coll_t c, object_t o,
+                                                        Context *onsafe=0) {return 0;}//= 0;
+  virtual int collection_remove(coll_t c, object_t o,
+                                                               Context *onsafe=0) {return 0;}// = 0;
   virtual int collection_list(coll_t c, list<object_t>& o) {return 0;}//= 0;
 
   virtual int collection_setattr(coll_t cid, const char *name,
-                                                                const void *value, size_t size) {return 0;} //= 0;
+                                                                const void *value, size_t size,
+                                                                Context *onsafe=0) {return 0;} //= 0;
+  virtual int collection_rmattr(coll_t cid, const char *name,
+                                                               Context *onsafe=0) {return 0;} //= 0;
   virtual int collection_getattr(coll_t cid, const char *name,
                                                                 void *value, size_t size) {return 0;} //= 0;
   virtual int collection_listattr(coll_t cid, char *attrs, size_t size) {return 0;} //= 0;