]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 1 Jan 2006 21:24:12 +0000 (21:24 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 1 Jan 2006 21:24:12 +0000 (21:24 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@558 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/TODO
ceph/client/SyntheticClient.cc
ceph/client/SyntheticClient.h
ceph/config.cc
ceph/ebofs/Allocator.cc
ceph/ebofs/Ebofs.cc
ceph/ebofs/Ebofs.h
ceph/osd/OSD.cc

index 42cdb3c17e6bb8347e238a834563cf13a8263498..e9945d1a699457608a13b65050adc9bcf76e9b7c 100644 (file)
--- a/ceph/TODO
+++ b/ceph/TODO
@@ -5,9 +5,26 @@ client
 
 
 ebofs
+- full fs
+- zero regions?
 - journaling
 - combine inodes into same blocks
 
+osd
+- unordered ops
+- rep ops trickyness?
+
+- ardos validation
+ - what happens if entire pg set changes, recovery doesn't finish, modifications are made, and pg set changes again?  is anybody "complete" anymore?
+
+- osdmap history distribution
+
+- handle down osds
+- pg_bit changes
+
+- 'dirty' log on primary?
+- fast recovery from degraded mode
+
 
 cluster issues
 - general problem: how to do posix ordering on object boundaries using an object store
@@ -26,7 +43,6 @@ cluster issues
 - deleting objects
  - osd's that rejoin
  - must keep stray replicas clean
- - 
 
 - communications failure model.. is it appropriate?
   - reliable, ordered, buffered and flushed on 'down' boundaries?
@@ -40,16 +56,6 @@ fakestore
 
 
 osd foo
-- osdmap history distribution
-
-- handle down osds
-- pg_bit changes
-
-- thread (and unordered!) ops
- - rep ops tricky
-
-- 'dirty' log on primary?
-- fast recovery from degraded mode
 
 
 
index 366e61b0d65c106e601795b24529d3b7ba38e274..909d9d1e6f48488fb1f9e673c49d4a1196afd21a 100644 (file)
@@ -45,6 +45,15 @@ void parse_syn_options(vector<char*>& args)
                syn_modes.push_back( SYNCLIENT_MODE_READFILE );
                syn_iargs.push_back( atoi(args[++i]) );
                syn_iargs.push_back( atoi(args[++i]) );
+         } else if (strcmp(args[i],"rw") == 0) {
+               int a = atoi(args[++i]);
+               int b = atoi(args[++i]);
+               syn_modes.push_back( SYNCLIENT_MODE_WRITEFILE );
+               syn_iargs.push_back( a );
+               syn_iargs.push_back( b );
+               syn_modes.push_back( SYNCLIENT_MODE_READFILE );
+               syn_iargs.push_back( a );
+               syn_iargs.push_back( b );
          } else if (strcmp(args[i],"makedirs") == 0) {
                syn_modes.push_back( SYNCLIENT_MODE_MAKEDIRS );
                syn_iargs.push_back( atoi(args[++i]) );
@@ -700,6 +709,20 @@ int SyntheticClient::read_file(string& fn, int size, int rdsize)   // size is in
        if (time_to_stop()) break;
        dout(2) << "reading block " << i << "/" << chunks << endl;
        client->read(fd, buf, rdsize, i*rdsize);
+
+       // verify fingerprint
+       int *p = (int*)buf;
+       while ((char*)p < buf + rdsize) {
+         assert(*p == (char*)p - buf);
+         p++;
+         assert(*p == (int)i);
+         p++;
+         assert(*p == client->get_nodeid());
+         p++;
+         assert(*p == 0);
+         p++;
+       }
+
   }
   
   client->close(fd);
index 7894d54752707c4a2dfb51692d995b920cd88ce4..17f39e5600667557fe1811fe2df70e4187201542 100644 (file)
@@ -28,6 +28,8 @@
 
 #define SYNCLIENT_MODE_OPENTEST     30
 
+
+
 void parse_syn_options(vector<char*>& args);
 
 class SyntheticClient {
index 07537eea2d05df866ee14db44013577c085e9223..766c5c7036c9e2e64fde45d50839cd999b2c5226 100644 (file)
@@ -120,8 +120,8 @@ md_config_t g_conf = {
   // --- ebofs ---
   ebofs: 0,
   ebofs_commit_interval: 2,    // seconds.  0 = no timeout (for debugging/tracing)
-  ebofs_oc_size:      100,
-  ebofs_cc_size:      100,
+  ebofs_oc_size:      10,
+  ebofs_cc_size:      10,
   ebofs_bc_size:      (5 *256),  // measured in 4k blocks, or *256 for MB
   ebofs_bc_max_dirty: (1 *256),  // before write() will wait for data to flush
   
index d15b356ba87d2b4c026988304a027674f214ae24..63ed3aa2866a8be51f8b42bc3cb633ce0618bd6e 100644 (file)
@@ -156,7 +156,8 @@ int Allocator::allocate(Extent& ex, block_t num, block_t near)
        }       
   }
 
-  dout(1) << "allocate failed, fs full!  " << fs->free_blocks << endl;
+  dout(1) << "allocate failed, fs completely full!  " << fs->free_blocks << endl;
+  assert(0);
   dump_freelist();
   return -1;
 }
index 89d4485a487ab220316755326727b716e4f2e611..91c207574344d716f259a153f1eff3263a25eb33 100644 (file)
@@ -582,6 +582,41 @@ public:
   }
 };
 
+
+void Ebofs::encode_onode(Onode *on, bufferlist& bl, unsigned& off)
+{
+  // onode
+  struct ebofs_onode eo;
+  eo.onode_loc = on->onode_loc;
+  eo.object_id = on->object_id;
+  eo.object_size = on->object_size;
+  eo.object_blocks = on->object_blocks;
+  eo.num_attr = on->attr.size();
+  eo.num_extents = on->extents.size();
+  bl.copy_in(off, sizeof(eo), (char*)&eo);
+  off += sizeof(eo);
+
+  // attr
+  for (map<string, AttrVal>::iterator i = on->attr.begin();
+          i != on->attr.end();
+          i++) {
+       bl.copy_in(off, i->first.length()+1, i->first.c_str());
+       off += i->first.length()+1;
+       bl.copy_in(off, sizeof(int), (char*)&i->second.len);
+       off += sizeof(int);
+       bl.copy_in(off, i->second.len, i->second.data);
+       off += i->second.len;
+       dout(15) << "write_onode " << *on  << " attr " << i->first << " len " << i->second.len << endl;
+  }
+  
+  // extents
+  for (unsigned i=0; i<on->extents.size(); i++) {
+       bl.copy_in(off, sizeof(Extent), (char*)&on->extents[i]);
+       off += sizeof(Extent);
+       dout(15) << "write_onode " << *on  << " ex " << i << ": " << on->extents[i] << endl;
+  }
+}
+
 void Ebofs::write_onode(Onode *on)
 {
   // buffer
@@ -607,34 +642,9 @@ void Ebofs::write_onode(Onode *on)
 
   dout(10) << "write_onode " << *on << " to " << on->onode_loc << endl;
 
-  struct ebofs_onode *eo = (struct ebofs_onode*)bl.c_str();
-  eo->onode_loc = on->onode_loc;
-  eo->object_id = on->object_id;
-  eo->object_size = on->object_size;
-  eo->object_blocks = on->object_blocks;
-  eo->num_attr = on->attr.size();
-  eo->num_extents = on->extents.size();
-  
-  // attr
-  unsigned off = sizeof(*eo);
-  for (map<string, AttrVal>::iterator i = on->attr.begin();
-          i != on->attr.end();
-          i++) {
-       bl.copy_in(off, i->first.length()+1, i->first.c_str());
-       off += i->first.length()+1;
-       bl.copy_in(off, sizeof(int), (char*)&i->second.len);
-       off += sizeof(int);
-       bl.copy_in(off, i->second.len, i->second.data);
-       off += i->second.len;
-       dout(15) << "write_onode " << *on  << " attr " << i->first << " len " << i->second.len << endl;
-  }
-  
-  // extents
-  for (unsigned i=0; i<on->extents.size(); i++) {
-       bl.copy_in(off, sizeof(Extent), (char*)&on->extents[i]);
-       off += sizeof(Extent);
-       dout(15) << "write_onode " << *on  << " ex " << i << ": " << on->extents[i] << endl;
-  }
+  unsigned off = 0;
+  encode_onode(on, bl, off);
+  assert(off == bytes);
 
   // write
   dev.write( on->onode_loc.start, on->onode_loc.length, bl, 
@@ -874,6 +884,31 @@ Cnode* Ebofs::get_cnode(object_t cid)
   }
 }
 
+void Ebofs::encode_cnode(Cnode *cn, bufferlist& bl, unsigned& off)
+{
+  // cnode
+  struct ebofs_cnode ec;
+  ec.cnode_loc = cn->cnode_loc;
+  ec.coll_id = cn->coll_id;
+  ec.num_attr = cn->attr.size();
+  bl.copy_in(off, sizeof(ec), (char*)&ec);
+  off += sizeof(ec);
+  
+  // attr
+  for (map<string, AttrVal >::iterator i = cn->attr.begin();
+          i != cn->attr.end();
+          i++) {
+       bl.copy_in(off, i->first.length()+1, i->first.c_str());
+       off += i->first.length()+1;
+       bl.copy_in(off, sizeof(int), (char*)&i->second.len);
+       off += sizeof(int);
+       bl.copy_in(off, i->second.len, i->second.data);
+       off += i->second.len;
+
+       dout(15) << "write_cnode " << *cn  << " attr " << i->first << " len " << i->second.len << endl;
+  }
+}
+
 void Ebofs::write_cnode(Cnode *cn)
 {
   // allocate buffer
@@ -895,28 +930,10 @@ void Ebofs::write_cnode(Cnode *cn)
   
   dout(10) << "write_cnode " << *cn << " to " << cn->cnode_loc << endl;
 
-  struct ebofs_cnode ec;
-  ec.cnode_loc = cn->cnode_loc;
-  ec.coll_id = cn->coll_id;
-  ec.num_attr = cn->attr.size();
-  
-  bl.copy_in(0, sizeof(ec), (char*)&ec);
-  
-  // attr
-  unsigned off = sizeof(ec);
-  for (map<string, AttrVal >::iterator i = cn->attr.begin();
-          i != cn->attr.end();
-          i++) {
-       bl.copy_in(off, i->first.length()+1, i->first.c_str());
-       off += i->first.length()+1;
-       bl.copy_in(off, sizeof(int), (char*)&i->second.len);
-       off += sizeof(int);
-       bl.copy_in(off, i->second.len, i->second.data);
-       off += i->second.len;
+  unsigned off = 0;
+  encode_cnode(cn, bl, off);
+  assert(off == bytes);
 
-       dout(15) << "write_cnode " << *cn  << " attr " << i->first << " len " << i->second.len << endl;
-  }
-  
   // write
   dev.write( cn->cnode_loc.start, cn->cnode_loc.length, bl, 
                         new C_E_InodeFlush(this), "write_cnode" );
index 589f3d5a0dd6115e8b1ed9766d0a5bfb19eb00e8..d18f66c8a5609e22d097c1a09be1cedb222c97b8 100644 (file)
@@ -102,10 +102,11 @@ class Ebofs : public ObjectStore {
 
   Onode* new_onode(object_t oid);     // make new onode.  ref++.
   Onode* get_onode(object_t oid);     // get cached onode, or read from disk.  ref++.
-  void write_onode(Onode *on);
   void remove_onode(Onode *on);
   void put_onode(Onode* o);         // put it back down.  ref--.
   void dirty_onode(Onode* o);
+  void encode_onode(Onode *on, bufferlist& bl, unsigned& off);
+  void write_onode(Onode *on);
 
   // ** cnodes **
   hash_map<coll_t, Cnode*>    cnode_map;
@@ -115,10 +116,11 @@ class Ebofs : public ObjectStore {
 
   Cnode* new_cnode(coll_t cid);
   Cnode* get_cnode(coll_t cid);
-  void write_cnode(Cnode *cn);
   void remove_cnode(Cnode *cn);
   void put_cnode(Cnode *cn);
   void dirty_cnode(Cnode *cn);
+  void encode_cnode(Cnode *cn, bufferlist& bl, unsigned& off);
+  void write_cnode(Cnode *cn);
 
   // ** onodes+cnodes = inodes **
   int                         inodes_flushing;
index 1591b5f6aef43ac4a1f28edaea14237493b3a6ef..4243181ffc769495ffceb5585ff645974b114cc8 100644 (file)
@@ -242,6 +242,7 @@ void OSD::dispatch(Message *m)
   // check clock regularly
   g_clock.now();
 
+  osd_lock.Lock();
 
   switch (m->get_type()) {
 
@@ -277,14 +278,12 @@ void OSD::dispatch(Message *m)
        {
          // no map?  starting up?
          if (!osdmap) {
-               osd_lock.Lock();
                dout(7) << "no OSDMap, asking MDS" << endl;
                if (waiting_for_osdmap.empty()) 
                  messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), 
                                                                  MSG_ADDR_MDS(0), MDS_PORT_MAIN);
                waiting_for_osdmap.push_back(m);
-               osd_lock.Unlock();
-               return;
+               break;
          }
 
          // need OSDMap
@@ -326,13 +325,18 @@ void OSD::dispatch(Message *m)
   if (!finished.empty()) {
        list<Message*> waiting;
        waiting.splice(waiting.begin(), finished);
+
+       osd_lock.Unlock();
+       
        for (list<Message*>::iterator it = waiting.begin();
                 it != waiting.end();
                 it++) {
          dispatch(*it);
        }
+       return;
   }
-
+  
+  osd_lock.Unlock();
 }
 
 
@@ -538,9 +542,7 @@ void OSD::wait_for_new_map(Message *m)
   messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), 
                                                  MSG_ADDR_MDS(0), MDS_PORT_MAIN);
 
-  osd_lock.Lock();
   waiting_for_osdmap.push_back(m);
-  osd_lock.Unlock();
 }
 
 
@@ -616,14 +618,8 @@ void OSD::handle_osd_map(MOSDMap *m)
   // wait for ops to finish
   wait_for_no_ops();
 
-  osd_lock.Lock();     // actually, don't need this if we finish all ops?
-
   if (m->is_mkfs()) {
        dout(1) << "MKFS" << endl;
-  /* done on init() now
-       if (!g_conf.osd_mkfs)
-         store->mkfs();
-  */
   }
 
   if (!osdmap ||
@@ -640,8 +636,6 @@ void OSD::handle_osd_map(MOSDMap *m)
        dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
   }
   
-  osd_lock.Unlock();
-
   if (m->is_mkfs()) {
        // ack
        messenger->send_message(new MGenericMessage(MSG_OSD_MKFS_ACK),
@@ -1460,35 +1454,37 @@ void OSD::pull_replica(PG *pg, object_t oid)
 
 void OSD::op_rep_pull(MOSDOp *op)
 {
-  dout(7) << "rep_pull on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
+  long got = 0;
   lock_object(op->get_oid());
-  
-  // get object size
-  struct stat st;
-  int r = store->stat(op->get_oid(), &st);
-  assert(r == 0);
-
-  // check version
-  version_t v = 0;
-  store->getattr(op->get_oid(), "version", &v, sizeof(v));
-  assert(v == op->get_version());
-  
-  // read
-  bufferlist bl;
-  long got = store->read(op->get_oid(), 
-                                                st.st_size, 0,
-                                                bl);
-  assert(got == st.st_size);
-  
-  // reply
-  MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true); 
-  reply->set_result(0);
-  reply->set_data(bl);
-  reply->set_length(got);
-  reply->set_offset(0);
-  
-  messenger->send_message(reply, op->get_asker());
-
+  {
+       dout(7) << "rep_pull on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl;
+       
+       // get object size
+       struct stat st;
+       int r = store->stat(op->get_oid(), &st);
+       assert(r == 0);
+       
+       // check version
+       version_t v = 0;
+       store->getattr(op->get_oid(), "version", &v, sizeof(v));
+       assert(v == op->get_version());
+       
+       // read
+       bufferlist bl;
+       got = store->read(op->get_oid(), 
+                                                  st.st_size, 0,
+                                                  bl);
+       assert(got == st.st_size);
+       
+       // reply
+       MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true); 
+       reply->set_result(0);
+       reply->set_data(bl);
+       reply->set_length(got);
+       reply->set_offset(0);
+       
+       messenger->send_message(reply, op->get_asker());
+  }
   unlock_object(op->get_oid());
   delete op;
 
@@ -1503,22 +1499,19 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op)
 
   dout(7) << "rep_pull_reply " << hex << o << dec << " v " << v << " size " << op->get_length() << endl;
 
-  osd_lock.Lock();
   PGPeer *p = pull_ops[op->get_tid()];
   PG *pg = p->pg;
   assert(p);   // FIXME: how will this work?
   assert(p->is_pulling(o));
   assert(p->pulling_version(o) == v);
-  osd_lock.Unlock();
 
   // write it and add it to the PG
   store->write(o, op->get_length(), 0, op->get_data());
   p->pg->add_object(store, o);
-
+  
   store->setattr(o, "version", &v, sizeof(v));
 
   // close out pull op.
-  osd_lock.Lock();
   pull_ops.erase(op->get_tid());
 
   pg->pulled(o, v, p);
@@ -1545,8 +1538,6 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op)
   // more?
   do_recovery(pg);
 
-  osd_lock.Unlock();
-  
   delete op;
 }
 
@@ -1622,40 +1613,42 @@ void OSD::push_replica(PG *pg, object_t oid)
 
 void OSD::op_rep_push(MOSDOp *op)
 {
-  dout(7) << "rep_push on " << hex << op->get_oid() << dec << " v " << op->get_version() <<  endl;
   lock_object(op->get_oid());
+  {
+       dout(7) << "rep_push on " << hex << op->get_oid() << dec << " v " << op->get_version() <<  endl;
+       
+       PG *pg = get_pg(op->get_pg());
+       assert(pg);
 
-  PG *pg = get_pg(op->get_pg());
-  assert(pg);
-
-  // exists?
-  if (store->exists(op->get_oid())) {
-       store->truncate(op->get_oid(), 0);
-
-       version_t ov = 0;
-       store->getattr(op->get_oid(), "version", &ov, sizeof(ov));
-       assert(ov <= op->get_version());
+       // exists?
+       if (store->exists(op->get_oid())) {
+         store->truncate(op->get_oid(), 0);
+         
+         version_t ov = 0;
+         store->getattr(op->get_oid(), "version", &ov, sizeof(ov));
+         assert(ov <= op->get_version());
+       }
+       
+       logger->inc("r_push");
+       logger->inc("r_pushb", op->get_length());
+       
+       // write out buffers
+       int r = store->write(op->get_oid(),
+                                                op->get_length(), 0,
+                                                op->get_data(),
+                                                false);       // FIXME
+       pg->add_object(store, op->get_oid());
+       assert(r >= 0);
+       
+       // set version
+       version_t v = op->get_version();
+       store->setattr(op->get_oid(), "version", &v, sizeof(v));
+       
+       // reply
+       MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
+       messenger->send_message(reply, op->get_asker());
+       
   }
-
-  logger->inc("r_push");
-  logger->inc("r_pushb", op->get_length());
-
-  // write out buffers
-  int r = store->write(op->get_oid(),
-                                          op->get_length(), 0,
-                                          op->get_data(),
-                                          false);       // FIXME
-  pg->add_object(store, op->get_oid());
-  assert(r >= 0);
-
-  // set version
-  version_t v = op->get_version();
-  store->setattr(op->get_oid(), "version", &v, sizeof(v));
-
-  // reply
-  MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
-  messenger->send_message(reply, op->get_asker());
-  
   unlock_object(op->get_oid());
   delete op;
 }
@@ -1667,7 +1660,6 @@ void OSD::op_rep_push_reply(MOSDOpReply *op)
 
   dout(7) << "rep_push_reply " << hex << oid << dec << endl;
 
-  osd_lock.Lock();
   PGPeer *p = push_ops[op->get_tid()];
   PG *pg = p->pg;
   assert(p);   // FIXME: how will this work?
@@ -1695,8 +1687,6 @@ void OSD::op_rep_push_reply(MOSDOpReply *op)
   // more?
   do_recovery(pg);
 
-  osd_lock.Unlock();
-  
   delete op;
 }
 
@@ -1753,25 +1743,26 @@ void OSD::remove_replica(PG *pg, object_t oid)
 
 void OSD::op_rep_remove(MOSDOp *op)
 {
-  dout(7) << "rep_remove on " << hex << op->get_oid() << dec << " v " << op->get_version() <<  endl;
   lock_object(op->get_oid());
-  
-  // sanity checks
-  assert(store->exists(op->get_oid()));
-
-  version_t v = 0;
-  store->getattr(op->get_oid(), "version", &v, sizeof(v));
-  assert(v == op->get_version());
-
-  // remove
-  store->collection_remove(op->get_pg(), op->get_oid());
-  int r = store->remove(op->get_oid());
-  assert(r == 0);
-
-  // reply
-  messenger->send_message(new MOSDOpReply(op, r, osdmap, true), 
-                                                 op->get_asker());
-
+  {
+       dout(7) << "rep_remove on " << hex << op->get_oid() << dec << " v " << op->get_version() <<  endl;
+       
+       // sanity checks
+       assert(store->exists(op->get_oid()));
+       
+       version_t v = 0;
+       store->getattr(op->get_oid(), "version", &v, sizeof(v));
+       assert(v == op->get_version());
+       
+       // remove
+       store->collection_remove(op->get_pg(), op->get_oid());
+       int r = store->remove(op->get_oid());
+       assert(r == 0);
+       
+       // reply
+       messenger->send_message(new MOSDOpReply(op, r, osdmap, true), 
+                                                       op->get_asker());
+  }
   unlock_object(op->get_oid());
   delete op;
 }
@@ -1782,7 +1773,6 @@ void OSD::op_rep_remove_reply(MOSDOpReply *op)
   version_t v = op->get_version();
   dout(7) << "rep_remove_reply " << hex << oid << dec << endl;
 
-  osd_lock.Lock();
   PGPeer *p = remove_ops[op->get_tid()];
   PG *pg = p->pg;
   assert(p);   // FIXME: how will this work?
@@ -1802,8 +1792,6 @@ void OSD::op_rep_remove_reply(MOSDOpReply *op)
   // more?
   do_recovery(pg);
 
-  osd_lock.Unlock();
-  
   delete op;
 }
 
@@ -1820,62 +1808,68 @@ public:
 
 void OSD::op_rep_modify_sync(MOSDOp *op)
 {
-  osd_lock.Lock();
+  object_t oid = op->get_oid();
+  lock_object(oid);
   {
        dout(2) << "rep_modify_sync on op " << op << endl;
        MOSDOpReply *ack2 = new MOSDOpReply(op, 0, osdmap, true);
        messenger->send_message(ack2, op->get_asker());
        delete op;
   }
-  osd_lock.Unlock();
+  unlock_object(oid);
 }
 
 void OSD::op_rep_modify(MOSDOp *op)
 { 
   // when we introduce unordered messaging.. FIXME
   object_t oid = op->get_oid();
-  version_t ov = 0;
-  if (store->exists(oid)) 
-       store->getattr(oid, "version", &ov, sizeof(ov));
-  //dout(15) << "rep_modify old versoin is " << ov << "  msg sez " << op->get_old_version() << endl;
-  assert(op->get_old_version() == ov);
 
-  // PG
-  PG *pg = get_pg(op->get_pg());
-  assert(pg);
-  
-  dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
-
-  int r = 0;
-  Context *onsync = 0;
-  if (op->get_op() == OSD_OP_REP_WRITE) {
-       // write
-       assert(op->get_data().length() == op->get_length());
-       onsync = new C_OSD_RepModifySync(this, op);
-       r = apply_write(op, op->get_version(), onsync);
-       if (ov == 0) pg->add_object(store, oid);
-
-       logger->inc("r_wr");
-       logger->inc("r_wrb", op->get_length());
-  } else if (op->get_op() == OSD_OP_REP_DELETE) {
-       // delete
-       store->collection_remove(pg->get_pgid(), op->get_oid());
-       r = store->remove(oid);
-  } else if (op->get_op() == OSD_OP_REP_TRUNCATE) {
-       // truncate
-       r = store->truncate(oid, op->get_offset());
-  } else assert(0);
-  
-  if (onsync) {
-       // ack
-       MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
-       messenger->send_message(ack, op->get_asker());
-  } else {
-       // sync, safe
-       MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, true);
-       messenger->send_message(ack, op->get_asker());
-       delete op;
+  lock_object(oid);
+  {
+       version_t ov = 0;
+       if (store->exists(oid)) 
+         store->getattr(oid, "version", &ov, sizeof(ov));
+       //dout(15) << "rep_modify old versoin is " << ov << "  msg sez " << op->get_old_version() << endl;
+       assert(op->get_old_version() == ov);
+       
+       // PG
+       PG *pg = get_pg(op->get_pg());
+       assert(pg);
+       
+       dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
+       
+       int r = 0;
+       Context *onsync = 0;
+       if (op->get_op() == OSD_OP_REP_WRITE) {
+         // write
+         assert(op->get_data().length() == op->get_length());
+         onsync = new C_OSD_RepModifySync(this, op);
+         r = apply_write(op, op->get_version(), onsync);
+         if (ov == 0) pg->add_object(store, oid);
+         
+         logger->inc("r_wr");
+         logger->inc("r_wrb", op->get_length());
+       } else if (op->get_op() == OSD_OP_REP_DELETE) {
+         // delete
+         store->collection_remove(pg->get_pgid(), op->get_oid());
+         r = store->remove(oid);
+       } else if (op->get_op() == OSD_OP_REP_TRUNCATE) {
+         // truncate
+         r = store->truncate(oid, op->get_offset());
+       } else assert(0);
+       
+       if (onsync) {
+         // ack
+         MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
+         messenger->send_message(ack, op->get_asker());
+       } else {
+         // sync, safe
+         MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, true);
+         messenger->send_message(ack, op->get_asker());
+         delete op;
+       }
   }
+  unlock_object(oid);
 }
 
 
@@ -1886,8 +1880,6 @@ void OSD::op_rep_modify(MOSDOp *op)
 
 void OSD::handle_op(MOSDOp *op)
 {
-  osd_lock.Lock();
-
   pg_t pgid = op->get_pg();
   PG *pg = get_pg(pgid);
 
@@ -1900,7 +1892,6 @@ void OSD::handle_op(MOSDOp *op)
          // op's is newer
          dout(7) << "op map " << op->get_map_version() << " > " << osdmap->get_version() << endl;
          wait_for_new_map(op);
-         osd_lock.Unlock();
          return;
        }
 
@@ -1918,7 +1909,6 @@ void OSD::handle_op(MOSDOp *op)
                                                                op->get_asker());
                delete op;
          }
-         osd_lock.Unlock();
          return;
        }
 
@@ -1926,7 +1916,6 @@ void OSD::handle_op(MOSDOp *op)
        if (!pg) {
          dout(7) << "hit non-existent pg " << hex << op->get_pg() << dec << ", waiting" << endl;
          waiting_for_pg[pgid].push_back(op);
-         osd_lock.Unlock();
          return;
        }
        else {
@@ -1936,7 +1925,6 @@ void OSD::handle_op(MOSDOp *op)
          if (!pg->is_peered()) {
                dout(7) << "op_write " << *pg << " not peered (yet)" << endl;
                pg->waiting_for_peered.push_back(op);
-               osd_lock.Unlock();
                return;
          }
 
@@ -1950,7 +1938,6 @@ void OSD::handle_op(MOSDOp *op)
                  if (!pg->objects_pulling.count(oid)) 
                        pull_replica(pg, oid);
                  pg->waiting_for_missing_object[oid].push_back(op);
-                 osd_lock.Unlock();
                  return;
                }
          }       
@@ -1966,7 +1953,6 @@ void OSD::handle_op(MOSDOp *op)
                  pg->waiting_for_clean_object[oid].push_back(op);
                  if (pg->objects_pushing.count(oid) == 0)
                        push_replica(pg, oid);
-                 osd_lock.Unlock();
                  return;
                }
 
@@ -1978,7 +1964,6 @@ void OSD::handle_op(MOSDOp *op)
                  pg->waiting_for_clean_object[oid].push_back(op);
                  if (pg->objects_removing.count(oid) == 0)
                        remove_replica(pg, oid);
-                 osd_lock.Unlock();
                  return;
                }
          }
@@ -2002,7 +1987,6 @@ void OSD::handle_op(MOSDOp *op)
                dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary changed on pg " << hex << op->get_pg() << dec << endl;
                MOSDOpReply *fail = new MOSDOpReply(op, -1, osdmap, false);
                messenger->send_message(fail, op->get_asker());
-               osd_lock.Unlock();
                return;
          } else {
                dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary same on pg " << hex << op->get_pg() << dec << endl;
@@ -2011,25 +1995,16 @@ void OSD::handle_op(MOSDOp *op)
   }
 
   // queue op
-  if (g_conf.osd_maxthreads < 1) {
-       pending_ops++;
-       do_op(op);
-  } else
-       queue_op(op);
-
-  osd_lock.Unlock();
-}
-
-void OSD::queue_op(MOSDOp *op) {
-  // inc pending count
-  osd_lock.Lock();
   pending_ops++;
-  osd_lock.Unlock();
-
-  threadpool->put_op(op);
+  if (g_conf.osd_maxthreads < 1) {
+       osd_lock.Unlock();
+       do_op(op);   // or, just do it now
+       osd_lock.Lock();
+  } else {
+       threadpool->put_op(op);
+  }
 }
 
-
 void doop(OSD *u, MOSDOp *p) {
   u->do_op(p);
 }
@@ -2096,7 +2071,6 @@ void OSD::do_op(MOSDOp *op)
 
 void OSD::wait_for_no_ops()
 {
-  osd_lock.Lock();
   if (pending_ops > 0) {
        dout(7) << "wait_for_no_ops - waiting for " << pending_ops << endl;
        waiting_for_no_ops = true;
@@ -2105,7 +2079,6 @@ void OSD::wait_for_no_ops()
        assert(pending_ops == 0);
   } 
   dout(7) << "wait_for_no_ops - none" << endl;
-  osd_lock.Unlock();
 }
 
 
@@ -2123,59 +2096,59 @@ void OSD::op_read(MOSDOp *op)
 {
   object_t oid = op->get_oid();
   lock_object(oid);
-  
-  // read into a buffer
-  bufferlist bl;
-  long got = store->read(oid, 
-                                                op->get_length(), op->get_offset(),
-                                                bl);
-  // set up reply
-  MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true); 
-  if (got >= 0) {
-       reply->set_result(0);
-       reply->set_data(bl);
-       reply->set_length(got);
-
-       logger->inc("c_rd");
-       logger->inc("c_rdb", got);
-
-  } else {
-       reply->set_result(got);   // error
-       reply->set_length(0);
+  {
+       // read into a buffer
+       bufferlist bl;
+       long got = store->read(oid, 
+                                                  op->get_length(), op->get_offset(),
+                                                  bl);
+       // set up reply
+       MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true); 
+       if (got >= 0) {
+         reply->set_result(0);
+         reply->set_data(bl);
+         reply->set_length(got);
+         
+         logger->inc("c_rd");
+         logger->inc("c_rdb", got);
+         
+       } else {
+         reply->set_result(got);   // error
+         reply->set_length(0);
+       }
+       
+       dout(12) << "read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
+       
+       logger->inc("rd");
+       if (got >= 0) logger->inc("rdb", got);
+       
+       // send it
+       messenger->send_message(reply, op->get_asker());
   }
-  
-  dout(12) << "read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
-
-  logger->inc("rd");
-  if (got >= 0) logger->inc("rdb", got);
-  
-  // send it
-  messenger->send_message(reply, op->get_asker());
-
-  delete op;
-
   unlock_object(oid);
+  delete op;
 }
 
 void OSD::op_stat(MOSDOp *op)
 {
   object_t oid = op->get_oid();
   lock_object(oid);
+  {
+       struct stat st;
+       memset(&st, sizeof(st), 0);
+       int r = store->stat(oid, &st);
+       
+       dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl;
+       
+       MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap, true);
+       reply->set_object_size(st.st_size);
+       messenger->send_message(reply, op->get_asker());
+       
+       logger->inc("stat");
+  }
+  unlock_object(oid);
 
-  struct stat st;
-  memset(&st, sizeof(st), 0);
-  int r = store->stat(oid, &st);
-  
-  dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl;
-         
-  MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap, true);
-  reply->set_object_size(st.st_size);
-  messenger->send_message(reply, op->get_asker());
-         
-  logger->inc("stat");
   delete op;
-
-  unlock_object(oid);
 }
 
 
@@ -2269,13 +2242,14 @@ public:
 
 void OSD::op_modify_sync(OSDReplicaOp *repop)
 {
-  osd_lock.Lock();
+  object_t oid = repop->op->get_oid();
+  lock_object(oid);
   {
        dout(2) << "op_modify_sync on op " << repop->op << endl;
 
        repop->local_sync = true;
        if (repop->can_send_sync()) {
-         dout(2) << "op_modify_sync on " << hex << repop->op->get_oid() << dec << " op " << repop->op << endl;
+         dout(2) << "op_modify_sync on " << hex << oid << dec << " op " << repop->op << endl;
          if (repop->op->wants_safe()) {
                MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap, true);
                messenger->send_message(reply, repop->op->get_asker());
@@ -2286,7 +2260,7 @@ void OSD::op_modify_sync(OSDReplicaOp *repop)
          delete repop;
        }
   }
-  osd_lock.Unlock();
+  unlock_object(oid);
 }
 
 void OSD::op_modify(MOSDOp *op)
@@ -2299,63 +2273,59 @@ void OSD::op_modify(MOSDOp *op)
   if (op->get_op() == OSD_OP_TRUNCATE) opname = "op_truncate";
 
   lock_object(oid);
-
-  // version?  clean?
-  version_t ov = 0;  // 0 == dne (yet)
-  store->getattr(oid, "version", &ov, sizeof(ov));
-  version_t nv = messenger->peek_lamport();
-  assert(nv > ov);
-
-  dout(12) << opname << " " << hex << oid << dec << " v " << nv << "  off " << op->get_offset() << " len " << op->get_length() << endl;  
-
-  // issue replica writes
-  OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov);
-
-  osd_lock.Lock();
-  PG *pg = get_pg(op->get_pg());
-  for (unsigned i=1; i<pg->acting.size(); i++) {
-       issue_replica_op(pg, repop, pg->acting[i]);
-  }
-  osd_lock.Unlock();
-
-  // pre-ack
-  //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
-  //messenger->send_message(reply, op->get_asker());
-  
-  // do it
-  int r;
-  if (op->get_op() == OSD_OP_WRITE) {
-       // write
-       assert(op->get_data().length() == op->get_length());
-       Context *onsync = new C_OSD_WriteSync(this, repop);
-       r = apply_write(op, nv, onsync);
-       
-       // put new object in proper collection
-       if (ov == 0) pg->add_object(store, oid);
-
-       repop->local_ack = true;
-
-       logger->inc("c_wr");
-       logger->inc("c_wrb", op->get_length());
-  } 
-  else if (op->get_op() == OSD_OP_TRUNCATE) {
-       // truncate
-       r = store->truncate(oid, op->get_offset());
-       repop->local_ack = true;
-       repop->local_sync = true;
-  }
-  else if (op->get_op() == OSD_OP_DELETE) {
-       // delete
-       pg->remove_object(store, op->get_oid());
-       r = store->remove(oid);
-       repop->local_ack = true;
-       repop->local_sync = true;
-  }
-  else assert(0);
-
-  // can we reply yet?
-  osd_lock.Lock();
   {
+       // version?  clean?
+       version_t ov = 0;  // 0 == dne (yet)
+       store->getattr(oid, "version", &ov, sizeof(ov));
+       version_t nv = messenger->peek_lamport();
+       assert(nv > ov);
+       
+       dout(12) << opname << " " << hex << oid << dec << " v " << nv << "  off " << op->get_offset() << " len " << op->get_length() << endl;  
+       
+       // issue replica writes
+       OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov);
+       
+       PG *pg = get_pg(op->get_pg());
+       for (unsigned i=1; i<pg->acting.size(); i++) {
+         issue_replica_op(pg, repop, pg->acting[i]);
+       }
+       
+       // pre-ack
+       //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
+       //messenger->send_message(reply, op->get_asker());
+       
+       // do it
+       int r;
+       if (op->get_op() == OSD_OP_WRITE) {
+         // write
+         assert(op->get_data().length() == op->get_length());
+         Context *onsync = new C_OSD_WriteSync(this, repop);
+         r = apply_write(op, nv, onsync);
+         
+         // put new object in proper collection
+         if (ov == 0) pg->add_object(store, oid);
+         
+         repop->local_ack = true;
+         
+         logger->inc("c_wr");
+         logger->inc("c_wrb", op->get_length());
+       } 
+       else if (op->get_op() == OSD_OP_TRUNCATE) {
+         // truncate
+         r = store->truncate(oid, op->get_offset());
+         repop->local_ack = true;
+         repop->local_sync = true;
+       }
+       else if (op->get_op() == OSD_OP_DELETE) {
+         // delete
+         pg->remove_object(store, op->get_oid());
+         r = store->remove(oid);
+         repop->local_ack = true;
+         repop->local_sync = true;
+       }
+       else assert(0);
+       
+       // can we reply yet?
        if (repop->can_send_sync()) {
          dout(10) << opname << " sending sync on " << op << endl;
          MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
@@ -2367,8 +2337,6 @@ void OSD::op_modify(MOSDOp *op)
          messenger->send_message(reply, op->get_asker());
        }
   }
-  osd_lock.Unlock();
-
   unlock_object(oid);
 }