]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter: simplify objecter (no scatter/gather, generic ops vector)
authorSage Weil <sage@newdream.net>
Sun, 9 Nov 2008 23:52:02 +0000 (15:52 -0800)
committerSage Weil <sage@newdream.net>
Mon, 10 Nov 2008 00:10:56 +0000 (16:10 -0800)
src/client/SyntheticClient.cc
src/mds/CDir.cc
src/mds/MDSTable.cc
src/mds/SessionMap.cc
src/osdc/Filer.cc
src/osdc/Filer.h
src/osdc/ObjectCacher.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 3a094f285f708fa152e272942e8f3dc2546bd6b5..ad30ecf878bff1dbf40c7dd8d5d80eb4fa9f4ed3 100644 (file)
@@ -1345,7 +1345,7 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only)
       lock.Lock();
       ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0);
       __u64 size;
-      client->objecter->stat(oid, &size, layout, 0, new C_SafeCond(&lock, &cond, &ack));
+      client->objecter->stat(oid, layout, &size, 0, new C_SafeCond(&lock, &cond, &ack));
       while (!ack) cond.Wait(lock);
       lock.Unlock();
     }
@@ -1358,7 +1358,7 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only)
       lock.Lock();
       ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0);
       bufferlist bl;
-      client->objecter->read(oid, off, len, layout, &bl, 0, new C_SafeCond(&lock, &cond, &ack));
+      client->objecter->read(oid, layout, off, len, &bl, 0, new C_SafeCond(&lock, &cond, &ack));
       while (!ack) cond.Wait(lock);
       lock.Unlock();
     }
@@ -1374,7 +1374,7 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only)
       bufferlist bl;
       bl.push_back(bp);
       SnapContext snapc;
-      client->objecter->write(oid, off, len, layout, snapc, bl, 0,
+      client->objecter->write(oid, layout, off, len, snapc, bl, 0,
                              new C_SafeCond(&lock, &cond, &ack),
                              safeg->new_sub());
       while (!ack) cond.Wait(lock);
@@ -1389,7 +1389,7 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only)
       lock.Lock();
       ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0);
       SnapContext snapc;
-      client->objecter->zero(oid, off, len, layout, snapc, 0,
+      client->objecter->zero(oid, layout, off, len, snapc, 0,
                             new C_SafeCond(&lock, &cond, &ack),
                             safeg->new_sub());
       while (!ack) cond.Wait(lock);
@@ -2157,7 +2157,7 @@ int SyntheticClient::create_objects(int nobj, int osize, int inflight)
     
     starts.push_back(g_clock.now());
     client->client_lock.Lock();
-    client->objecter->write(oid, 0, osize, layout, snapc, bl, 0,
+    client->objecter->write(oid, layout, 0, osize, snapc, bl, 0,
                            new C_Ref(lock, cond, &unack),
                            new C_Ref(lock, cond, &unsafe));
     client->client_lock.Unlock();
@@ -2258,13 +2258,13 @@ int SyntheticClient::object_rw(int nobj, int osize, int wrpc,
     utime_t start = g_clock.now();
     if (write) {
       dout(10) << "write to " << oid << dendl;
-      client->objecter->write(oid, 0, osize, layout, snapc, bl, 0,
+      client->objecter->write(oid, layout, 0, osize, snapc, bl, 0,
                              new C_Ref(lock, cond, &unack),
                              new C_Ref(lock, cond, &unsafe));
     } else {
       dout(10) << "read from " << oid << dendl;
       bufferlist inbl;
-      client->objecter->read(oid, 0, osize, layout, &inbl, 0,
+      client->objecter->read(oid, layout, 0, osize, &inbl, 0,
                             new C_Ref(lock, cond, &unack));
     }
     client->client_lock.Unlock();
index 093de51c22060a07a84ea04be4cf078b05067ec0..8980b3cbcf12d05234c2a7f008b4d2b1fa7afd6e 100644 (file)
@@ -1044,9 +1044,9 @@ void CDir::fetch(Context *c, bool ignore_authpinnability)
   // start by reading the first hunk of it
   C_Dir_Fetch *fin = new C_Dir_Fetch(this);
   cache->mds->objecter->read( get_ondisk_object(),
-                             0, 0,   // whole object
                              cache->mds->objecter->osdmap->file_to_object_layout( get_ondisk_object(),
                                                                                   g_default_mds_dir_layout ),
+                             0, 0,   // whole object
                              &fin->bl, 0,
                              fin );
 }
index a4c9bc107f6a54ff943458d32fbf974382aced22..92f4be5cbe6f961c06a59c8f5609c91c39928b41 100644 (file)
@@ -121,9 +121,9 @@ void MDSTable::load(Context *onfinish)
   C_MT_Load *c = new C_MT_Load(this, onfinish);
   object_t oid(ino, 0);
   mds->objecter->read(oid,
-                     0, 0, // whole object
                      mds->objecter->osdmap->file_to_object_layout(oid,
                                                                   g_default_mds_dir_layout),
+                     0, 0, // whole object
                      &c->bl, 0, c);
 }
 
index 5b27dc1ab45068589d40b8f3f0b7b0daaa18aa8f..05a8540c6bdb86387f1fb8c58c5e64310b5a87e0 100644 (file)
@@ -67,9 +67,9 @@ void SessionMap::load(Context *onload)
   C_SM_Load *c = new C_SM_Load(this);
   object_t oid(inode.ino, 0);
   mds->objecter->read(oid,
-                     0, 0, // whole object
                      mds->objecter->osdmap->file_to_object_layout(oid,
                                                                   g_default_mds_dir_layout),
+                     0, 0, // whole object
                      &c->bl, 0,
                      c);
 
index 85c886987e4b402d1a0ed64ba90435486e28bbe3..309bc97db66d58433837518657efff85b6640f65 100644 (file)
@@ -95,7 +95,7 @@ void Filer::_probe(Probe *probe)
        p++) {
     dout(10) << "_probe  probing " << p->oid << dendl;
     C_Probe *c = new C_Probe(this, probe, p->oid);
-    probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, probe->flags, c);
+    probe->ops[p->oid] = objecter->stat(p->oid, p->layout, &c->size, probe->flags, c);
   }
 }
 
index b2bbc06e6e4971a4cfec05bf49c38ca19116b69b..7073a26cad71fa32b735560515a6d28d829051f2 100644 (file)
@@ -138,7 +138,7 @@ class Filer {
     vector<ObjectExtent> extents;
     file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
     if (extents.size() == 1) {
-      objecter->zero(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+      objecter->zero(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length, 
                     snapc, flags, onack, oncommit);
     } else {
       C_Gather *gack = 0, *gcom = 0;
@@ -147,7 +147,7 @@ class Filer {
       if (oncommit)
        gcom = new C_Gather(oncommit);
       for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
-       objecter->zero(p->oid, p->offset, p->length, p->layout,
+       objecter->zero(p->oid, p->layout, p->offset, p->length, 
                       snapc, flags,
                       gack ? gack->new_sub():0,
                       gcom ? gcom->new_sub():0);
index 164182da61f2b103d5076787e32118e71f766839..9a879900fcf015dad9434e00f87c24c3ec673c6f 100644 (file)
@@ -404,7 +404,8 @@ void ObjectCacher::bh_read(BufferHead *bh)
   C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length());
 
   // go
-  objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(),
+  objecter->read(bh->ob->get_oid(), bh->ob->get_layout(), 
+                bh->start(), bh->length(), 
                 &onfinish->bl, 0,
                 onfinish);
 }
@@ -494,7 +495,8 @@ void ObjectCacher::bh_write(BufferHead *bh)
   C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_oid(), bh->start(), bh->length());
 
   // go
-  tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(),
+  tid_t tid = objecter->write(bh->ob->get_oid(), bh->ob->get_layout(),
+                             bh->start(), bh->length(), 
                              bh->snapc, bh->bl, 0,
                              onack, oncommit);
 
@@ -1031,8 +1033,9 @@ int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock)
     Cond cond;
     bool done = false;
     //objecter->readx(rd, new C_SafeCond(&lock, &cond, &done));
-    objecter->read(rd->extents[0].oid, rd->extents[0].offset, rd->extents[0].length,
-                  rd->extents[0].layout, rd->bl, 0,
+    objecter->read(rd->extents[0].oid, rd->extents[0].layout, 
+                  rd->extents[0].offset, rd->extents[0].length,
+                  rd->bl, 0,
                   new C_SafeCond(&lock, &cond, &done));
 
     // block
@@ -1169,7 +1172,7 @@ void ObjectCacher::rdlock(Object *o)
     
     commit->tid = 
       ack->tid = 
-      o->last_write_tid = objecter->lock(CEPH_OSD_OP_RDLOCK, o->get_oid(), 0, o->get_layout(), ack, commit);
+      o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), CEPH_OSD_OP_RDLOCK, 0, ack, commit);
   }
   
   // stake our claim.
@@ -1212,7 +1215,7 @@ void ObjectCacher::wrlock(Object *o)
     
     commit->tid = 
       ack->tid = 
-      o->last_write_tid = objecter->lock(op, o->get_oid(), 0, o->get_layout(), ack, commit);
+      o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), op, 0, ack, commit);
   }
   
   // stake our claim.
@@ -1255,7 +1258,7 @@ void ObjectCacher::rdunlock(Object *o)
   C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0);
   commit->tid = 
     lockack->tid = 
-    o->last_write_tid = objecter->lock(CEPH_OSD_OP_RDUNLOCK, o->get_oid(), 0, o->get_layout(), lockack, commit);
+    o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), CEPH_OSD_OP_RDUNLOCK, 0, lockack, commit);
 }
 
 void ObjectCacher::wrunlock(Object *o)
@@ -1287,7 +1290,7 @@ void ObjectCacher::wrunlock(Object *o)
   C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0);
   commit->tid = 
     lockack->tid = 
-    o->last_write_tid = objecter->lock(op, o->get_oid(), 0, o->get_layout(), lockack, commit);
+    o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), op, 0, lockack, commit);
 }
 
 
index 296a067eb5cd0c7bb6527d5492111e4403126f3a..8dbbd98be68823ad014b809312304f772fb48708 100644 (file)
@@ -248,7 +248,7 @@ void Objecter::kick_requests(set<pg_t>& changed_pgs)
       tid_t tid = *p;
       
       if (op_modify.count(tid)) {
-        OSDModify *wr = op_modify[tid];
+        ModifyOp *wr = op_modify[tid];
         op_modify.erase(tid);
 
        if (wr->onack)
@@ -257,49 +257,25 @@ void Objecter::kick_requests(set<pg_t>& changed_pgs)
          num_uncommitted--;
        
         // WRITE
-       if (wr->waitfor_ack.count(tid)) {
+       if (wr->onack) {
           dout(3) << "kick_requests missing ack, resub write " << tid << dendl;
-          modifyx_submit(wr, wr->waitfor_ack[tid], tid);
+          modify_submit(wr);
         } else {
-         assert(wr->waitfor_commit.count(tid));
-         
-         if (wr->tid_version.count(tid)) {
-           if ((wr->op == CEPH_OSD_OP_WRITE || wr->op == CEPH_OSD_OP_WRITEFULL) &&
-               !g_conf.objecter_buffer_uncommitted) {
-             dout(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << dendl;
-             assert(0);  // crap. fixme.
-           } else {
-             dout(3) << "kick_requests missing commit, replay write " << tid
-                     << " v " << wr->tid_version[tid] << dendl;
-           }
-         } else {
-           dout(3) << "kick_requests missing commit, resub write " << tid << dendl;
-         }
-         modifyx_submit(wr, wr->waitfor_commit[tid], tid);
+         assert(wr->oncommit);
+         dout(3) << "kick_requests missing commit, resub write " << tid << dendl;
+         modify_submit(wr);
         } 
       }
 
       else if (op_read.count(tid)) {
         // READ
-        OSDRead *rd = op_read[tid];
+        ReadOp *rd = op_read[tid];
         op_read.erase(tid);
         dout(3) << "kick_requests resub read " << tid << dendl;
 
         // resubmit
-        readx_submit(rd, rd->ops[tid], true);
-        rd->ops.erase(tid);
+        read_submit(rd);
       }
-
-      else if (op_stat.count(tid)) {
-       OSDStat *st = op_stat[tid];
-       op_stat.erase(tid);
-       
-       dout(3) << "kick_requests resub stat " << tid << dendl;
-               
-        // resubmit
-        stat_submit(st);
-      }
-         
       else 
         assert(0);
     }         
@@ -338,215 +314,57 @@ void Objecter::tick()
 
 void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 {
-  assert(m->ops.size() >= 1);
-  int op = m->ops[0].op;
-
-  // read or modify?
-  switch (op) {
-  case CEPH_OSD_OP_READ:
-    handle_osd_read_reply(m);
-    break;
-
-  case CEPH_OSD_OP_STAT:
-    handle_osd_stat_reply(m);
-    break;
-    
-  default:
-    assert(m->is_modify());
+  if (m->is_modify())
     handle_osd_modify_reply(m);
-    break;
-  }
-}
-
-
-
-// stat -----------------------------------
-
-tid_t Objecter::stat(object_t oid, __u64 *size, ceph_object_layout ol, int flags, Context *onfinish)
-{
-  OSDStat *st = prepare_stat(size, flags);
-  st->extents.push_back(ObjectExtent(oid, 0, 0));
-  st->extents.front().layout = ol;
-  st->onfinish = onfinish;
-
-  return stat_submit(st);
-}
-
-tid_t Objecter::stat_submit(OSDStat *st) 
-{
-  // find OSD
-  ObjectExtent &ex = st->extents.front();
-  PG &pg = get_pg( pg_t(ex.layout.ol_pgid) );
-
-  // pick tid
-  last_tid++;
-  assert(client_inc >= 0);
-
-  // add to gather set
-  st->tid = last_tid;
-  op_stat[last_tid] = st;    
-
-  pg.active_tids.insert(last_tid);
-
-  // send?
-
-  dout(10) << "stat_submit " << st << " tid " << last_tid
-           << " oid " << ex.oid
-           << " " << ex.layout
-           << " osd" << pg.acker() 
-           << dendl;
-
-  if (pg.acker() >= 0) {
-    int flags = st->flags;
-    if (st->onfinish) flags |= CEPH_OSD_OP_ACK;
-
-    MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
-                          ex.oid, ex.layout, osdmap->get_epoch(), 
-                          flags);
-    m->stat();
-    if (inc_lock > 0) {
-      st->inc_lock = inc_lock;
-      m->set_inc_lock(inc_lock);
-    }
-    
-    messenger->send_message(m, osdmap->get_inst(pg.acker()));
-  }
-  
-  return last_tid;
+  else
+    handle_osd_read_reply(m);
 }
 
-void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
-{
-  // get pio
-  tid_t tid = m->get_tid();
-
-  if (op_stat.count(tid) == 0) {
-    dout(7) << "handle_osd_stat_reply " << tid << " ... stray" << dendl;
-    delete m;
-    return;
-  }
-
-  ceph_osd_op& op = m->ops[0];
-
-  dout(7) << "handle_osd_stat_reply " << tid 
-         << " r=" << m->get_result()
-         << " size=" << op.length
-         << dendl;
-  OSDStat *st = op_stat[ tid ];
-  op_stat.erase( tid );
-
-  // remove from osd/tid maps
-  PG& pg = get_pg( m->get_pg() );
-  assert(pg.active_tids.count(tid));
-  pg.active_tids.erase(tid);
-  if (pg.active_tids.empty()) close_pg( m->get_pg() );
-  
-  // success?
-  if (m->get_result() == -EINCLOCKED &&
-      (st->flags & CEPH_OSD_OP_INCLOCK_FAIL) == 0) {
-    dout(7) << " got -EINCLOCKED, resubmitting" << dendl;
-    stat_submit(st);
-    delete m;
-    return;
-  }
-  if (m->get_result() == -EAGAIN) {
-    dout(7) << " got -EAGAIN, resubmitting" << dendl;
-    stat_submit(st);
-    delete m;
-    return;
-  }
-
-  // ok!
-  if (m->get_result() < 0) {
-    *st->size = 0;
-  } else {
-    *st->size = op.length;
-  }
-
-  // finish, clean up
-  Context *onfinish = st->onfinish;
-
-  // done
-  delete st;
-  if (onfinish) {
-    onfinish->finish(m->get_result());
-    delete onfinish;
-  }
-
-  delete m;
-}
 
 
 // read -----------------------------------
 
-
-tid_t Objecter::read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, 
-                    bufferlist *bl, int flags, 
-                     Context *onfinish)
-{
-  OSDRead *rd = prepare_read(bl, flags);
-  rd->extents.push_back(ObjectExtent(oid, off, len));
-  rd->extents.front().layout = ol;
-  readx(rd, onfinish);
-  return last_tid;
-}
-
-
-tid_t Objecter::readx(OSDRead *rd, Context *onfinish)
-{
-  rd->onfinish = onfinish;
-  
-  // issue reads
-  for (list<ObjectExtent>::iterator it = rd->extents.begin();
-       it != rd->extents.end();
-       it++) 
-    readx_submit(rd, *it);
-
-  return last_tid;
-}
-
-tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry) 
+tid_t Objecter::read_submit(ReadOp *rd)
 {
   // find OSD
-  PG &pg = get_pg( pg_t(ex.layout.ol_pgid) );
+  PG &pg = get_pg( pg_t(rd->layout.ol_pgid) );
 
   // pick tid
-  last_tid++;
-  assert(client_inc >= 0);
-
-  // add to gather set
-  rd->ops[last_tid] = ex;
+  rd->tid = ++last_tid;
   op_read[last_tid] = rd;    
 
+  assert(client_inc >= 0);
+
   pg.active_tids.insert(last_tid);
   pg.last = g_clock.now();
 
   // send?
-  dout(10) << "readx_submit " << rd << " tid " << last_tid
-           << " oid " << ex.oid << " " << ex.offset << "~" << ex.length
-           << " (" << ex.buffer_extents.size() << " buffer fragments)" 
-           << " " << ex.layout
+  dout(10) << "read_submit " << rd << " tid " << last_tid
+           << " oid " << rd->oid
+          << " " << rd->ops
+           << " " << rd->layout
            << " osd" << pg.acker() 
            << dendl;
 
   if (pg.acker() >= 0) {
     int flags = rd->flags;
-    if (rd->onfinish) flags |= CEPH_OSD_OP_ACK;
+    if (rd->onfinish)
+      flags |= CEPH_OSD_OP_ACK;
     MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
-                          ex.oid, ex.layout, osdmap->get_epoch(), 
+                          rd->oid, rd->layout, osdmap->get_epoch(), 
                           flags);
-    m->read(ex.offset, ex.length);
+    m->ops = rd->ops;
     if (inc_lock > 0) {
       rd->inc_lock = inc_lock;
       m->set_inc_lock(inc_lock);
     }
-    m->set_retry_attempt(retry);
+    m->set_retry_attempt(rd->attempts++);
     
     int who = pg.acker();
     if (rd->flags & CEPH_OSD_OP_BALANCE_READS) {
       int replica = messenger->get_myname().num() % pg.acting.size();
       who = pg.acting[replica];
-      dout(-10) << "readx_submit reading from random replica " << replica
+      dout(-10) << "read_submit reading from random replica " << replica
                << " = osd" << who <<  dendl;
     }
     messenger->send_message(m, osdmap->get_inst(who));
@@ -569,20 +387,15 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
   }
 
   dout(7) << "handle_osd_read_reply " << tid << dendl;
-  OSDRead *rd = op_read[ tid ];
+  ReadOp *rd = op_read[ tid ];
   op_read.erase( tid );
   
-  ceph_osd_op& op = m->ops[0];
-
   // remove from osd/tid maps
   PG& pg = get_pg( m->get_pg() );
   assert(pg.active_tids.count(tid));
   pg.active_tids.erase(tid);
   if (pg.active_tids.empty()) close_pg( m->get_pg() );
   
-  // our op finished
-  rd->ops.erase(tid);
-
   // fail?
   if (m->get_result() == -EINCLOCKED &&
       rd->flags & CEPH_OSD_OP_INCLOCK_FAIL) {
@@ -600,141 +413,33 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
   if (m->get_result() == -EAGAIN ||
       m->get_result() == -EINCLOCKED) {
     dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl;
-    readx_submit(rd, rd->ops[tid], true);
+    read_submit(rd);
     delete m;
     return;
   }
 
   // what buffer offset are we?
-  dout(7) << " got frag from " << m->get_oid() << " "
-          << op.offset << "~" << op.length
-          << ", still have " << rd->ops.size() << " more ops" << dendl;
-  
-  if (rd->ops.empty()) {
-    // all done
-    size_t bytes_read = 0;
-    
-    if (rd->read_data.size()) {
-      dout(15) << " assembling frags" << dendl;
-
-      /** FIXME This doesn't handle holes efficiently.
-       * It allocates zero buffers to fill whole buffer, and
-       * then discards trailing ones at the end.
-       *
-       * Actually, this whole thing is pretty messy with temporary bufferlist*'s all over
-       * the heap. 
-       */
-
-      // we have other fragments, assemble them all... blech!
-      rd->read_data[m->get_oid()] = new bufferlist;
-      rd->read_data[m->get_oid()]->claim( m->get_data() );
-
-      // map extents back into buffer
-      map<__u64, bufferlist*> by_off;  // buffer offset -> bufferlist
-
-      // for each object extent...
-      for (list<ObjectExtent>::iterator eit = rd->extents.begin();
-           eit != rd->extents.end();
-           eit++) {
-        bufferlist *ox_buf = rd->read_data[eit->oid];
-        unsigned ox_len = ox_buf->length();
-        unsigned ox_off = 0;
-        assert(ox_len <= eit->length);           
-
-        // for each buffer extent we're mapping into...
-        for (map<__u32,__u32>::iterator bit = eit->buffer_extents.begin();
-             bit != eit->buffer_extents.end();
-             bit++) {
-          dout(21) << " object " << eit->oid
-                  << " extent " << eit->offset << "~" << eit->length
-                  << " : ox offset " << ox_off
-                  << " -> buffer extent " << bit->first << "~" << bit->second << dendl;
-          by_off[bit->first] = new bufferlist;
-
-          if (ox_off + bit->second <= ox_len) {
-            // we got the whole bx
-            by_off[bit->first]->substr_of(*ox_buf, ox_off, bit->second);
-            if (bytes_read < bit->first + bit->second) 
-              bytes_read = bit->first + bit->second;
-          } else if (ox_off + bit->second > ox_len && ox_off < ox_len) {
-            // we got part of this bx
-            by_off[bit->first]->substr_of(*ox_buf, ox_off, (ox_len-ox_off));
-            if (bytes_read < bit->first + ox_len-ox_off) 
-              bytes_read = bit->first + ox_len-ox_off;
-
-            // zero end of bx
-            dout(21) << "  adding some zeros to the end " << ox_off + bit->second-ox_len << dendl;
-            bufferptr z(ox_off + bit->second - ox_len);
-                       z.zero();
-            by_off[bit->first]->append( z );
-          } else {
-            // we got none of this bx.  zero whole thing.
-            assert(ox_off >= ox_len);
-            dout(21) << "  adding all zeros for this bit " << bit->second << dendl;
-            bufferptr z(bit->second);
-                       z.zero();
-            by_off[bit->first]->append( z );
-          }
-          ox_off += bit->second;
-        }
-        assert(ox_off == eit->length);
-      }
+  dout(7) << " got reply on " << rd->ops << dendl;
 
-      // sort and string bits together
-      for (map<__u64, bufferlist*>::iterator it = by_off.begin();
-           it != by_off.end();
-           it++) {
-        assert(it->second->length());
-        if (it->first < (__u64)bytes_read) {
-          dout(21) << "  concat buffer frag off " << it->first << " len " << it->second->length() << dendl;
-          rd->bl->claim_append(*(it->second));
-        } else {
-          dout(21) << "  NO concat zero buffer frag off " << it->first << " len " << it->second->length() << dendl;          
-        }
-        delete it->second;
-      }
-
-      // trim trailing zeros?
-      if (rd->bl->length() > bytes_read) {
-        dout(10) << " trimming off trailing zeros . bytes_read=" << bytes_read 
-                 << " len=" << rd->bl->length() << dendl;
-        rd->bl->splice(bytes_read, rd->bl->length() - bytes_read);
-        assert(bytes_read == rd->bl->length());
-      }
-      
-      // hose p->read_data bufferlist*'s
-      for (map<object_t, bufferlist*>::iterator it = rd->read_data.begin();
-           it != rd->read_data.end();
-           it++) {
-        delete it->second;
-      }
-    } else {
-      dout(15) << "  only one frag" << dendl;
+  int bytes_read = m->get_data().length();
 
-      // only one fragment, easy
-      rd->bl->claim( m->get_data() );
-      bytes_read = rd->bl->length();
-    }
-
-    // finish, clean up
-    Context *onfinish = rd->onfinish;
-
-    dout(7) << " " << bytes_read << " bytes " 
-            << rd->bl->length()
-            << dendl;
-    
-    // done
-    delete rd;
-    if (onfinish) {
-      onfinish->finish(bytes_read);// > 0 ? bytes_read:m->get_result());
-      delete onfinish;
-    }
-  } else {
-    // store my bufferlist for later assembling
-    rd->read_data[m->get_oid()] = new bufferlist;
-    rd->read_data[m->get_oid()]->claim( m->get_data() );
+  if (rd->pbl)
+    rd->pbl->claim(m->get_data());
+  if (rd->psize) {
+    ceph_osd_op& op = m->ops[0];
+    *(rd->psize) = op.length;
   }
 
+  // finish, clean up
+  Context *onfinish = rd->onfinish;
+  dout(7) << " " << bytes_read << " bytes " << dendl;
+  
+  // done
+  delete rd;
+  if (onfinish) {
+    onfinish->finish(bytes_read);// > 0 ? bytes_read:m->get_result());
+    delete onfinish;
+  }
   delete m;
 }
 
@@ -742,177 +447,65 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
 
 // write ------------------------------------
 
-tid_t Objecter::write(object_t oid, __u64 off, size_t len,
-                     ceph_object_layout ol, const SnapContext& snapc, 
-                     bufferlist &bl, int flags,
-                      Context *onack, Context *oncommit)
-{
-  OSDWrite *wr = prepare_write(snapc, bl, flags);
-  wr->extents.push_back(ObjectExtent(oid, off, len));
-  wr->extents.front().layout = ol;
-  wr->extents.front().buffer_extents[0] = len;
-  modifyx(wr, onack, oncommit);
-  return last_tid;
-}
-
-tid_t Objecter::write_full(object_t oid, 
-                          ceph_object_layout ol, const SnapContext& snapc,
-                          bufferlist &bl, int flags,
-                          Context *onack, Context *oncommit)
-{
-  OSDWrite *wr = prepare_write_full(snapc, bl, flags);
-  wr->extents.push_back(ObjectExtent(oid, 0, bl.length()));
-  wr->extents.front().layout = ol;
-  wr->extents.front().buffer_extents[0] = bl.length();
-  modifyx(wr, onack, oncommit);
-  return last_tid;
-}
-
-
-// zero
-
-tid_t Objecter::zero(object_t oid, __u64 off, size_t len, 
-                    ceph_object_layout ol, const SnapContext& snapc,
-                    int flags, 
-                     Context *onack, Context *oncommit)
-{
-  OSDModify *z = prepare_modify(snapc, CEPH_OSD_OP_ZERO, flags);
-  z->extents.push_back(ObjectExtent(oid, off, len));
-  z->extents.front().layout = ol;
-  modifyx(z, onack, oncommit);
-  return last_tid;
-}
-
-// remove
-
-tid_t Objecter::remove(object_t oid,
-                      ceph_object_layout ol, const SnapContext& snapc,
-                      int flags, 
-                      Context *onack, Context *oncommit)
-{
-  OSDModify *z = prepare_modify(snapc, CEPH_OSD_OP_DELETE, flags);
-  z->extents.push_back(ObjectExtent(oid, 0, 0));
-  z->extents.front().layout = ol;
-  modifyx(z, onack, oncommit);
-  return last_tid;
-}
-
-
-// lock ops
-
-tid_t Objecter::lock(int op, object_t oid, int flags,
-                    ceph_object_layout ol,
-                     Context *onack, Context *oncommit)
-{
-  SnapContext snapc; // null is fine
-  OSDModify *l = prepare_modify(snapc, op, flags);
-  l->extents.push_back(ObjectExtent(oid, 0, 0));
-  l->extents.front().layout = ol;
-  modifyx(l, onack, oncommit);
-  return last_tid;
-}
-
-
-
-// generic modify -----------------------------------
-
-tid_t Objecter::modifyx(OSDModify *wr, Context *onack, Context *oncommit)
-{
-  wr->onack = onack;
-  wr->oncommit = oncommit;
-
-  // issue writes/whatevers
-  for (list<ObjectExtent>::iterator it = wr->extents.begin();
-       it != wr->extents.end();
-       it++) 
-    modifyx_submit(wr, *it);
-
-  return last_tid;
-}
-
-
-tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
+tid_t Objecter::modify_submit(ModifyOp *wr)
 {
   // find
-  PG &pg = get_pg( pg_t(ex.layout.ol_pgid) );
+  PG &pg = get_pg( pg_t(wr->layout.ol_pgid) );
     
   // pick tid
-  tid_t tid;
-  if (usetid > 0) 
-    tid = usetid;
-  else
-    tid = ++last_tid;
+  if (!wr->tid)
+    wr->tid = ++last_tid;
   assert(client_inc >= 0);
 
   // add to gather set(s)
   int flags = wr->flags;
   if (wr->onack) {
     flags |= CEPH_OSD_OP_ACK;
-    wr->waitfor_ack[tid] = ex;
     ++num_unacked;
   } else {
     dout(20) << " note: not requesting ack" << dendl;
   }
   if (wr->oncommit) {
     flags |= CEPH_OSD_OP_SAFE;
-    wr->waitfor_commit[tid] = ex;
     ++num_uncommitted;
   } else {
     dout(20) << " note: not requesting commit" << dendl;
   }
-  op_modify[tid] = wr;
-  pg.active_tids.insert(tid);
+  op_modify[wr->tid] = wr;
+  pg.active_tids.insert(wr->tid);
   pg.last = g_clock.now();
 
   // send?
-  dout(10) << "modifyx_submit " << ceph_osd_op_name(wr->op) << " tid " << tid
-           << "  oid " << ex.oid
-           << " " << ex.offset << "~" << ex.length 
-           << " " << ex.layout 
+  dout(10) << "modify_submit oid " << wr->oid
+          << " " << wr->ops << " tid " << wr->tid
+           << " " << wr->layout 
            << " osd" << pg.primary()
            << dendl;
   if (pg.primary() >= 0) {
-    MOSDOp *m = new MOSDOp(client_inc, tid, true,
-                          ex.oid, ex.layout, osdmap->get_epoch(),
+    MOSDOp *m = new MOSDOp(client_inc, wr->tid, true,
+                          wr->oid, wr->layout, osdmap->get_epoch(),
                           flags);
-    m->add_simple_op(wr->op, ex.offset, ex.length);
+    m->ops = wr->ops;
     m->set_snap_seq(wr->snapc.seq);
     m->get_snaps() = wr->snapc.snaps;
     if (inc_lock > 0) {
       wr->inc_lock = inc_lock;
       m->set_inc_lock(inc_lock);
     }
-    if (usetid > 0)
-      m->set_retry_attempt(true);
-    
-    if (wr->tid_version.count(tid)) 
-      m->set_version(wr->tid_version[tid]);  // we're replaying this op!
-    
-    // what type of op?
-    switch (wr->op) {
-    case CEPH_OSD_OP_WRITE:
-    case CEPH_OSD_OP_WRITEFULL:
-      {
-       // map buffer segments into this extent
-       // (may be fragmented bc of striping)
-       bufferlist cur;
-       for (map<__u32,__u32>::iterator bit = ex.buffer_extents.begin();
-            bit != ex.buffer_extents.end();
-            bit++) 
-         ((OSDWrite*)wr)->bl.copy(bit->first, bit->second, cur);
-       assert(cur.length() == ex.length);
-       m->set_data(cur);//.claim(cur);
-      }
-      break;
-    }
+    m->set_retry_attempt(wr->attempts++);
     
+    if (wr->version != eversion_t())
+      m->set_version(wr->version);  // we're replaying this op!
+
+    m->set_data(wr->bl);
+
     messenger->send_message(m, osdmap->get_inst(pg.primary()));
   } else 
     maybe_request_map();
   
   dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
   
-  return tid;
+  return wr->tid;
 }
 
 
@@ -934,7 +527,7 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
           << (m->is_safe() ? " commit":" ack")
           << " v " << m->get_version() << " in " << m->get_pg()
           << dendl;
-  OSDModify *wr = op_modify[ tid ];
+  ModifyOp *wr = op_modify[ tid ];
 
   Context *onack = 0;
   Context *oncommit = 0;
@@ -970,11 +563,7 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
     dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl;
     if (wr->onack) num_unacked--;
     if (wr->oncommit) num_uncommitted--;
-    if (wr->waitfor_ack.count(tid)) 
-      modifyx_submit(wr, wr->waitfor_ack[tid]);
-    else if (wr->waitfor_commit.count(tid)) 
-      modifyx_submit(wr, wr->waitfor_commit[tid]);
-    else assert(0);
+    modify_submit(wr);
     delete m;
     return;
   }
@@ -982,75 +571,32 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
   assert(m->get_result() >= 0); // FIXME
 
   // ack|commit -> ack
-  if (wr->waitfor_ack.count(tid)) {
-    wr->waitfor_ack.erase(tid);
-    num_unacked--;
+  if (wr->onack) {
     dout(15) << "handle_osd_modify_reply ack" << dendl;
-    
-    /*
-      osd uses v to reorder during replay, but doesn't preserve it
-    if (wr->tid_version.count(tid) &&
-       wr->tid_version[tid].version != m->get_version().version) {
-      dout(-10) << "handle_osd_modify_reply WARNING: replay of tid " << tid 
-               << " did not achieve previous ordering" << dendl;
-    }
-    */
-    wr->tid_version[tid] = m->get_version();
-    
-    if (wr->waitfor_ack.empty()) {
-      onack = wr->onack;
-      wr->onack = 0;  // only do callback once
-      
-      // buffer uncommitted?
-      if (!g_conf.objecter_buffer_uncommitted &&
-         (wr->op == CEPH_OSD_OP_WRITE || wr->op == CEPH_OSD_OP_WRITEFULL)) {
-       // discard buffer!
-       ((OSDWrite*)wr)->bl.clear();
-      }
-    } else {
-      dout(15) << "handle_osd_modify_reply still need " 
-              << wr->waitfor_ack.size() << " acks" << dendl;
-    }
+    wr->version = m->get_version();
+    onack = wr->onack;
+    wr->onack = 0;  // only do callback once
+    num_unacked--;
   }
-  if (m->is_safe()) {
-    // safe
-    /*
-      osd uses v to reorder during replay, but doesn't preserve it
-    assert(wr->tid_version.count(tid) == 0 ||
-           m->get_version() == wr->tid_version[tid]);
-    */
-
-    wr->waitfor_commit.erase(tid);
-    num_uncommitted--;
+  if (wr->oncommit) {
     dout(15) << "handle_osd_modify_reply safe" << dendl;
-    
-    if (wr->waitfor_commit.empty()) {
-      oncommit = wr->oncommit;
-      wr->oncommit = 0;
-    } else {
-      dout(15) << "handle_osd_modify_reply still need "
-              << wr->waitfor_commit.size() << " safes" << dendl;
-    }
+    oncommit = wr->oncommit;
+    wr->oncommit = 0;
+    num_uncommitted--;
   }
 
   // done?
  done:
 
   // done with this tid?
-  if (wr->waitfor_commit.count(tid) == 0 &&
-      wr->waitfor_ack.count(tid) == 0) {
+  if (!wr->onack && !wr->oncommit) {
     assert(pg.active_tids.count(tid));
     pg.active_tids.erase(tid);
-    dout(15) << "handle_osd_modify_reply pg " << m->get_pg()
+    dout(15) << "handle_osd_modify_reply completed tid " << tid << ", pg " << m->get_pg()
             << " still has " << pg.active_tids << dendl;
     if (pg.active_tids.empty()) 
       close_pg( m->get_pg() );
     op_modify.erase( tid );
-  }
-  
-  // done with this overall op?
-  if (wr->onack == 0 && wr->oncommit == 0) {
-    dout(15) << "handle_osd_modify_reply completed" << dendl;
     delete wr;
   }
   
@@ -1215,11 +761,9 @@ void Objecter::dump_active()
 {
   dout(10) << "dump_active" << dendl;
   
-  for (hash_map<tid_t,OSDStat*>::iterator p = op_stat.begin(); p != op_stat.end(); p++)
-    dout(10) << " stat " << p->first << dendl;
-  for (hash_map<tid_t,OSDRead*>::iterator p = op_read.begin(); p != op_read.end(); p++)
+  for (hash_map<tid_t,ReadOp*>::iterator p = op_read.begin(); p != op_read.end(); p++)
     dout(10) << " read " << p->first << dendl;
-  for (hash_map<tid_t,OSDModify*>::iterator p = op_modify.begin(); p != op_modify.end(); p++)
+  for (hash_map<tid_t,ModifyOp*>::iterator p = op_modify.begin(); p != op_modify.end(); p++)
     dout(10) << " modify " << p->first << dendl;
 
 }
index 11ad8c18a5e7ea375c6097bebf17bdc39ad6af1c..c00fa4c8240125f3143432fe2d2cfe5e12e88788 100644 (file)
@@ -68,86 +68,55 @@ class Objecter {
   /*** track pending operations ***/
   // read
  public:
-  class OSDOp {
-  public:
-    list<ObjectExtent> extents;
-    int inc_lock;
-    OSDOp() : inc_lock(0) {}
-    virtual ~OSDOp() {}
-  };
-
-  class OSDRead : public OSDOp {
-  public:
-    bufferlist *bl;
-    Context *onfinish;
-    map<tid_t, ObjectExtent> ops;
-    map<object_t, bufferlist*> read_data;  // bits of data as they come back
+  struct ReadOp {
+    object_t oid;
+    ceph_object_layout layout;
+    vector<ceph_osd_op> ops;
+    bufferlist *pbl;
+    __u64 *psize;
     int flags;
+    Context *onfinish;
+    int inc_lock;
 
-    OSDRead(bufferlist *b, int f) : OSDOp(), bl(b), onfinish(0), flags(f) {
-      if (bl)
-       bl->clear();
-    }
-  };
-
-  OSDRead *prepare_read(bufferlist *b, int f) {
-    return new OSDRead(b, f);
-  }
-
-  class OSDStat : public OSDOp {
-  public:
     tid_t tid;
-    __u64 *size;  // where the size goes.
-    int flags;
-    Context *onfinish;
-    OSDStat(__u64 *s, int f) : OSDOp(), tid(0), size(s), flags(f), onfinish(0) { }
+    int attempts;
+
+    ReadOp(object_t o, ceph_object_layout& ol, int f, Context *of, int n=0) :
+      oid(o), layout(ol), ops(n),
+      pbl(0), psize(0), flags(f), onfinish(of), inc_lock(-1),
+      tid(0), attempts(0) {
+      for (int i=0; i<n; i++)
+       memset(&ops[i], 0,sizeof(ops[i]));
+    }
   };
 
-  OSDStat *prepare_stat(__u64 *s, int f) {
-    return new OSDStat(s, f);
-  }
 
-  // generic modify
-  class OSDModify : public OSDOp {
-  public:
+  struct ModifyOp {
+    object_t oid;
+    ceph_object_layout layout;
     SnapContext snapc;
-    int op;
-    list<ObjectExtent> extents;
+    vector<ceph_osd_op> ops;
+    bufferlist bl;
     int flags;
-    Context *onack;
-    Context *oncommit;
-    map<tid_t, ObjectExtent> waitfor_ack;
-    map<tid_t, eversion_t>   tid_version;
-    map<tid_t, ObjectExtent> waitfor_commit;
-
-    OSDModify(const SnapContext& sc, int o, int f) : OSDOp(), snapc(sc), op(o), flags(f), onack(0), oncommit(0) {}
-  };
+    Context *onack, *oncommit;
+    int inc_lock;
 
-  OSDModify *prepare_modify(const SnapContext& sc, int o, int f) { 
-    return new OSDModify(sc, o, f); 
-  }
-  
-  // write (includes the bufferlist)
-  class OSDWrite : public OSDModify {
-  public:
-    bufferlist bl;
-    OSDWrite(int op, const SnapContext& sc, bufferlist &b, int f) : OSDModify(sc, op, f), bl(b) {}
+    tid_t tid;
+    eversion_t version;
+    int attempts;
+
+    ModifyOp(object_t o, ceph_object_layout& l, const SnapContext& sc, int f, Context *ac, Context *co, int n=0) :
+      oid(o), layout(l), snapc(sc), ops(n), flags(f), onack(ac), oncommit(co), inc_lock(-1),
+      tid(0), attempts(0) {
+      for (int i=0; i<n; i++)
+       memset(&ops[i], 0,sizeof(ops[i]));
+    }
   };
 
-  OSDWrite *prepare_write(const SnapContext& sc, bufferlist &b, int f) { 
-    return new OSDWrite(CEPH_OSD_OP_WRITE, sc, b, f); 
-  }
-  OSDWrite *prepare_write_full(const SnapContext& sc, bufferlist &b, int f) { 
-    return new OSDWrite(CEPH_OSD_OP_WRITEFULL, sc, b, f); 
-  }
-
-  
-
  private:
   // pending ops
-  hash_map<tid_t,OSDStat*>   op_stat;
-  hash_map<tid_t,OSDRead*>   op_read;
-  hash_map<tid_t,OSDModify*> op_modify;
+  hash_map<tid_t,ReadOp* >  op_read;
+  hash_map<tid_t,ModifyOp*> op_modify;
 
   /**
    * track pending ops by pg
@@ -207,16 +176,14 @@ class Objecter {
  public:
   void dispatch(Message *m);
   void handle_osd_op_reply(class MOSDOpReply *m);
-  void handle_osd_stat_reply(class MOSDOpReply *m);
   void handle_osd_read_reply(class MOSDOpReply *m);
   void handle_osd_modify_reply(class MOSDOpReply *m);
   void handle_osd_lock_reply(class MOSDOpReply *m);
   void handle_osd_map(class MOSDMap *m);
 
  private:
-  tid_t readx_submit(OSDRead *rd, ObjectExtent& ex, bool retry=false);
-  tid_t modifyx_submit(OSDModify *wr, ObjectExtent& ex, tid_t tid=0);
-  tid_t stat_submit(OSDStat *st);
+  tid_t read_submit(ReadOp *rd);
+  tid_t modify_submit(ModifyOp *wr);
 
   // public interface
  public:
@@ -232,26 +199,70 @@ class Objecter {
   void set_inc_lock(int l) { inc_lock = l; }
     
 
-  // med level
-  tid_t readx(OSDRead *read, Context *onfinish);
-  tid_t modifyx(OSDModify *wr, Context *onack, Context *oncommit);
+  // 
+  tid_t stat(object_t oid, ceph_object_layout ol,
+            __u64 *size, int flags, 
+            Context *onfinish) {
+    ReadOp *rd = new ReadOp(oid, ol, flags, onfinish, 1);
+    rd->psize = size;
+    rd->ops[0].op = CEPH_OSD_OP_STAT;
+    return read_submit(rd);
+  }
 
-  // even lazier
-  tid_t read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, bufferlist *bl, int flags,
-             Context *onfinish);
-  tid_t stat(object_t oid, __u64 *size, ceph_object_layout ol, int flags, Context *onfinish);
+  tid_t read(object_t oid, ceph_object_layout ol,
+            __u64 off, size_t len, bufferlist *bl, int flags,
+            Context *onfinish) {
+    ReadOp *rd = new ReadOp(oid, ol, flags, onfinish, 1);
+    rd->pbl = bl;
+    rd->ops[0].op = CEPH_OSD_OP_READ;
+    rd->ops[0].offset = off;
+    rd->ops[0].length = len;
+    return read_submit(rd);
+  }
 
-  tid_t write(object_t oid, __u64 off, size_t len, ceph_object_layout ol, const SnapContext& snapc, bufferlist &bl, int flags,
-              Context *onack, Context *oncommit);
-  tid_t write_full(object_t oid, ceph_object_layout ol, const SnapContext& snapc, bufferlist &bl, int flags,
-              Context *onack, Context *oncommit);
-  tid_t zero(object_t oid, __u64 off, size_t len, ceph_object_layout ol, const SnapContext& snapc, int flags,
-             Context *onack, Context *oncommit);
-  tid_t remove(object_t oid, ceph_object_layout ol, const SnapContext& snapc, int flags,
-              Context *onack, Context *oncommit);
+  tid_t write(object_t oid, ceph_object_layout ol,
+             __u64 off, size_t len, const SnapContext& snapc, bufferlist &bl, int flags,
+              Context *onack, Context *oncommit) {
+    ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1);
+    wr->bl = bl;
+    wr->ops[0].op = CEPH_OSD_OP_WRITE;
+    wr->ops[0].offset = off;
+    wr->ops[0].length = len;
+    return modify_submit(wr);    
+  }
+  tid_t write_full(object_t oid, ceph_object_layout ol,
+                  const SnapContext& snapc, bufferlist &bl, int flags,
+                  Context *onack, Context *oncommit) {
+    ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1);
+    wr->bl = bl;
+    wr->ops[0].op = CEPH_OSD_OP_WRITEFULL;
+    wr->ops[0].offset = 0;
+    wr->ops[0].length = bl.length();
+    return modify_submit(wr);    
+  }
+  tid_t zero(object_t oid, ceph_object_layout ol, 
+            __u64 off, size_t len, const SnapContext& snapc, int flags,
+             Context *onack, Context *oncommit) {
+    ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1);
+    wr->ops[0].op = CEPH_OSD_OP_ZERO;
+    wr->ops[0].offset = 0;
+    wr->ops[0].length = len;
+    return modify_submit(wr);    
+  }
+  tid_t remove(object_t oid, ceph_object_layout ol, 
+              const SnapContext& snapc, int flags,
+              Context *onack, Context *oncommit) {
+    ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1);
+    wr->ops[0].op = CEPH_OSD_OP_DELETE;
+    return modify_submit(wr);    
+  }
 
-  // no snapc for lock ops
-  tid_t lock(int op, object_t oid, int flags, ceph_object_layout ol, Context *onack, Context *oncommit);
+  tid_t lock(object_t oid, ceph_object_layout ol, int op, int flags, Context *onack, Context *oncommit) {
+    SnapContext snapc;  // no snapc for lock ops
+    ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1);
+    wr->ops[0].op = op;
+    return modify_submit(wr);    
+  }
 
 
 
@@ -280,14 +291,14 @@ class Objecter {
 
   void sg_read(vector<ObjectExtent>& extents, bufferlist *bl, int flags, Context *onfinish) {
     if (extents.size() == 1) {
-      read(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+      read(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length,
           bl, flags, onfinish);
     } else {
       C_Gather *g = new C_Gather;
       vector<bufferlist> resultbl(extents.size());
       int i=0;
       for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
-       read(p->oid, p->offset, p->length, p->layout,
+       read(p->oid, p->layout, p->offset, p->length,
             &resultbl[i++], flags, onfinish);
       }
       g->set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
@@ -298,7 +309,7 @@ class Objecter {
   void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc, bufferlist bl,
                int flags, Context *onack, Context *oncommit) {
     if (extents.size() == 1) {
-      write(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+      write(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length,
            snapc, bl, flags, onack, oncommit);
     } else {
       C_Gather *gack = 0, *gcom = 0;
@@ -313,7 +324,7 @@ class Objecter {
             bit++)
          bl.copy(bit->first, bit->second, cur);
        assert(cur.length() == p->length);
-       write(p->oid, p->offset, p->length, p->layout,
+       write(p->oid, p->layout, p->offset, p->length, 
              snapc, cur, flags,
              gack ? gack->new_sub():0,
              gcom ? gcom->new_sub():0);