]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc: avoid using objecter readx/writex
authorSage Weil <sage@newdream.net>
Sun, 9 Nov 2008 23:05:37 +0000 (15:05 -0800)
committerSage Weil <sage@newdream.net>
Mon, 10 Nov 2008 00:10:55 +0000 (16:10 -0800)
src/client/Client.cc
src/client/Client.h
src/client/SyntheticClient.cc
src/osd/osd_types.h
src/osdc/Filer.cc
src/osdc/Filer.h
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h
src/osdc/Objecter.cc
src/osdc/Objecter.h

index a07dae29bf247f82f4b37db8d74488cf7a755913..5f126406783f541c4d160848b04fa7dafc95991f 100644 (file)
@@ -2078,7 +2078,7 @@ void Client::handle_cap_trunc(Inode *in, MClientCaps *m)
   if (g_conf.client_oc &&
       m->get_size() < in->inode.size) {
     // map range to objects
-    list<ObjectExtent> ls;
+    vector<ObjectExtent> ls;
     filer->file_to_extents(in->inode.ino, &in->inode.layout, CEPH_NOSNAP,
                           m->get_size(), in->inode.size - m->get_size(),
                           ls);
@@ -3699,12 +3699,11 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl)
     // object cache OFF -- non-atomic sync read from osd
   
     // do sync read
-    Objecter::OSDRead *rd = filer->prepare_read(in->inode.ino, &in->inode.layout, in->snapid,
-                                               offset, size, bl, 0);
+    int flags = 0;
     if (in->hack_balance_reads || g_conf.client_hack_balance_reads)
-      rd->flags |= CEPH_OSD_OP_BALANCE_READS;
-    r = objecter->readx(rd, onfinish);
-    assert(r >= 0);
+      flags |= CEPH_OSD_OP_BALANCE_READS;
+    filer->read(in->inode.ino, &in->inode.layout, in->snapid,
+               offset, size, bl, flags, onfinish);
 
     while (!done)
       cond.Wait(client_lock);
@@ -5010,7 +5009,7 @@ int Client::get_stripe_period(int fd)
   return ceph_file_layout_period(layout);
 }
 
-int Client::enumerate_layout(int fd, list<ObjectExtent>& result,
+int Client::enumerate_layout(int fd, vector<ObjectExtent>& result,
                             loff_t length, loff_t offset)
 {
   Mutex::Locker lock(client_lock);
index 80592ec4c7d2a948352bc465505676d6785a8afc..a37ff7f71056c74dc52f5c5f6c77e8254ec4116d 100644 (file)
@@ -1011,7 +1011,7 @@ public:
   int get_stripe_unit(int fd);
   int get_stripe_width(int fd);
   int get_stripe_period(int fd);
-  int enumerate_layout(int fd, list<ObjectExtent>& result,
+  int enumerate_layout(int fd, vector<ObjectExtent>& result,
                       loff_t length, loff_t offset);
 
   int mksnap(const char *path, const char *name);
index 639178f8fd74a27508d44960cb6b854a4f1a640c..3a094f285f708fa152e272942e8f3dc2546bd6b5 100644 (file)
@@ -1604,7 +1604,7 @@ int SyntheticClient::dump_placement(string& fn) {
   off_t filesize = stbuf.st_size;
  
   // grab the placement info
-  list<ObjectExtent> extents;
+  vector<ObjectExtent> extents;
   off_t offset = 0;
   client->enumerate_layout(fd, extents, filesize, offset);
   client->close(fd);
@@ -1613,13 +1613,13 @@ int SyntheticClient::dump_placement(string& fn) {
   // run through all the object extents
   dout(0) << "file size is " << filesize << dendl;
   dout(0) << "(osd, start, length) tuples for file " << fn << dendl;
-  for (list<ObjectExtent>::iterator i = extents.begin(); 
+  for (vector<ObjectExtent>::iterator i = extents.begin(); 
        i != extents.end(); ++i) {
     
     int osd = client->osdmap->get_pg_primary(pg_t(i->layout.ol_pgid.v));
 
     // run through all the buffer extents
-    for (map<size_t, size_t>::iterator j = i ->buffer_extents.begin();
+    for (map<__u32,__u32>::iterator j = i ->buffer_extents.begin();
         j != i->buffer_extents.end(); ++j) {
       
       dout(0) << "OSD " << osd << ", offset " << (*j).first <<
@@ -1897,7 +1897,7 @@ int SyntheticClient::overload_osd_0(int n, int size, int wrsize) {
 
 // See what the primary is for the first object in this file.
 int SyntheticClient::check_first_primary(int fh) {
-  list<ObjectExtent> extents;
+  vector<ObjectExtent> extents;
   client->enumerate_layout(fh, extents, 1, 0);  
   return client->osdmap->get_pg_primary(pg_t((extents.begin())->layout.ol_pgid));
 }
index 18d8c2c30969b30f41660154245ee53f59b61667..04896ac55e557be6cd1cfea00ac447c08cd0aa41 100644 (file)
@@ -396,22 +396,22 @@ inline ostream& operator<<(ostream& out, const osd_peer_stat_t &stat) {
 class ObjectExtent {
  public:
   object_t    oid;       // object id
-  off_t       start;     // in object
-  size_t      length;    // in object
+  __u32      offset;    // in object
+  __u32      length;    // in object
 
   ceph_object_layout layout;   // object layout (pgid, etc.)
 
-  map<size_t, size_t>  buffer_extents;  // off -> len.  extents in buffer being mapped (may be fragmented bc of striping!)
+  map<__u32, __u32>  buffer_extents;  // off -> len.  extents in buffer being mapped (may be fragmented bc of striping!)
   
-  ObjectExtent() : start(0), length(0) {}
-  ObjectExtent(object_t o, off_t s=0, size_t l=0) : oid(o), start(s), length(l) { }
+  ObjectExtent() : offset(0), length(0) {}
+  ObjectExtent(object_t o, __u32 off=0, __u32 l=0) : oid(o), offset(off), length(l) { }
 };
 
 inline ostream& operator<<(ostream& out, ObjectExtent &ex)
 {
   return out << "extent(" 
              << ex.oid << " in " << ex.layout
-             << " " << ex.start << "~" << ex.length
+             << " " << ex.offset << "~" << ex.length
              << ")";
 }
 
index e3f4ff8ebac1a6d873bcfa809d8b24c94be56a58..85c886987e4b402d1a0ed64ba90435486e28bbe3 100644 (file)
@@ -90,7 +90,7 @@ void Filer::_probe(Probe *probe)
   // map range onto objects
   file_to_extents(probe->ino, &probe->layout, probe->snapid, probe->from, probe->probing_len, probe->probing);
   
-  for (list<ObjectExtent>::iterator p = probe->probing.begin();
+  for (vector<ObjectExtent>::iterator p = probe->probing.begin();
        p != probe->probing.end();
        p++) {
     dout(10) << "_probe  probing " << p->oid << dendl;
@@ -114,13 +114,20 @@ void Filer::_probed(Probe *probe, object_t oid, __u64 size)
   bool found = false;
   __u64 end = 0;
 
-  if (!probe->fwd)
-    probe->probing.reverse();
+  if (!probe->fwd) {
+    // reverse
+    vector<ObjectExtent> r;
+    for (vector<ObjectExtent>::reverse_iterator p = probe->probing.rbegin();
+        p != probe->probing.rend();
+        p++)
+      r.push_back(*p);
+    probe->probing.swap(r);
+  }
 
-  for (list<ObjectExtent>::iterator p = probe->probing.begin();
+  for (vector<ObjectExtent>::iterator p = probe->probing.begin();
        p != probe->probing.end();
        p++) {
-    __u64 shouldbe = p->length+p->start;
+    __u64 shouldbe = p->length + p->offset;
     dout(10) << "_probed  " << probe->ino << " object " << hex << p->oid << dec
             << " should be " << shouldbe
             << ", actual is " << probe->known[p->oid]
@@ -134,8 +141,8 @@ void Filer::_probed(Probe *probe, object_t oid, __u64 size)
    
     // aha, we found the end!
     // calc offset into buffer_extent to get distance from probe->from.
-    __u64 oleft = probe->known[p->oid] - p->start;
-    for (map<size_t,size_t>::iterator i = p->buffer_extents.begin();
+    __u64 oleft = probe->known[p->oid] - p->offset;
+    for (map<__u32,__u32>::iterator i = p->buffer_extents.begin();
         i != p->buffer_extents.end();
         i++) {
       if (oleft <= (__u64)i->second) {
@@ -187,11 +194,12 @@ void Filer::_probed(Probe *probe, object_t oid, __u64 size)
 
 void Filer::file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t snap,
                             __u64 offset, size_t len,
-                            list<ObjectExtent>& extents)
+                            vector<ObjectExtent>& extents)
 {
   dout(10) << "file_to_extents " << offset << "~" << len 
            << " on " << hex << ino << dec
            << dendl;
+  assert(len > 0);
 
   /* we want only one extent per object!
    * this means that each extent we read may map into different bits of the 
@@ -239,14 +247,14 @@ void Filer::file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t sn
     else
       x_len = left;
     
-    if (ex->start + (__u64)ex->length == x_offset) {
+    if (ex->offset + (__u64)ex->length == x_offset) {
       // add to extent
       ex->length += x_len;
     } else {
       // new extent
       assert(ex->length == 0);
-      assert(ex->start == 0);
-      ex->start = x_offset;
+      assert(ex->offset == 0);
+      ex->offset = x_offset;
       ex->length = x_len;
     }
     ex->buffer_extents[cur-offset] = x_len;
index df104e1a8f3a88f75a5e8246417604bdec589410..b2bbc06e6e4971a4cfec05bf49c38ca19116b69b 100644 (file)
@@ -54,7 +54,7 @@ class Filer {
 
     Context *onfinish;
     
-    list<ObjectExtent> probing;
+    vector<ObjectExtent> probing;
     __u64 probing_len;
     
     map<object_t, __u64> known;
@@ -79,18 +79,23 @@ class Filer {
     return objecter->is_active(); // || (oc && oc->is_active());
   }
 
-  /*** async file interface ***/
-  Objecter::OSDRead *prepare_read(inodeno_t ino,
-                                 ceph_file_layout *layout,
-                                 snapid_t snapid,
-                                 __u64 offset, 
-                                 size_t len, 
-                                 bufferlist *bl, 
-                                 int flags) {
-    Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
-    file_to_extents(ino, layout, snapid, offset, len, rd->extents);
-    return rd;
-  }
+
+  /***** mapping *****/
+
+  /*
+   * map (ino, layout, offset, len) to a (list of) OSDExtents (byte
+   * ranges in objects on (primary) osds)
+   */
+  void file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t snap,
+                      __u64 offset,
+                      size_t len,
+                      vector<ObjectExtent>& extents);
+
+
+  
+
+  /*** async file interface.  scatter/gather as needed. ***/
+
   int read(inodeno_t ino,
           ceph_file_layout *layout,
           snapid_t snapid,
@@ -100,10 +105,13 @@ class Filer {
           int flags,
            Context *onfinish) {
     assert(snapid);  // (until there is a non-NOSNAP write)
-    Objecter::OSDRead *rd = prepare_read(ino, layout, snapid, offset, len, bl, flags);
-    return objecter->readx(rd, onfinish) > 0 ? 0:-1;
+    vector<ObjectExtent> extents;
+    file_to_extents(ino, layout, snapid, offset, len, extents);
+    objecter->sg_read(extents, bl, flags, onfinish);
+    return 0;
   }
 
+
   int write(inodeno_t ino,
            ceph_file_layout *layout,
            const SnapContext& snapc,
@@ -113,9 +121,10 @@ class Filer {
             int flags, 
             Context *onack,
             Context *oncommit) {
-    Objecter::OSDWrite *wr = objecter->prepare_write(snapc, bl, flags);
-    file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents);
-    return objecter->modifyx(wr, onack, oncommit) > 0 ? 0:-1;
+    vector<ObjectExtent> extents;
+    file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
+    objecter->sg_write(extents, snapc, bl, flags, onack, oncommit);
+    return 0;
   }
 
   int zero(inodeno_t ino,
@@ -126,9 +135,25 @@ class Filer {
           int flags,
            Context *onack,
            Context *oncommit) {
-    Objecter::OSDModify *z = objecter->prepare_modify(snapc, CEPH_OSD_OP_ZERO, flags);
-    file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, z->extents);
-    return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
+    vector<ObjectExtent> extents;
+    file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
+    if (extents.size() == 1) {
+      objecter->zero(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+                    snapc, flags, onack, oncommit);
+    } else {
+      C_Gather *gack = 0, *gcom = 0;
+      if (onack)
+       gack = new C_Gather(onack);
+      if (oncommit)
+       gcom = new C_Gather(oncommit);
+      for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
+       objecter->zero(p->oid, p->offset, p->length, p->layout,
+                      snapc, flags,
+                      gack ? gack->new_sub():0,
+                      gcom ? gcom->new_sub():0);
+      }
+    }
+    return 0;
   }
 
   int remove(inodeno_t ino,
@@ -139,9 +164,24 @@ class Filer {
             int flags,
             Context *onack,
             Context *oncommit) {
-    Objecter::OSDModify *z = objecter->prepare_modify(snapc, CEPH_OSD_OP_DELETE, flags);
-    file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, z->extents);
-    return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
+    vector<ObjectExtent> extents;
+    file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
+    if (extents.size() == 1) {
+      objecter->remove(extents[0].oid, extents[0].layout,
+                      snapc, flags, onack, oncommit);
+    } else {
+      C_Gather *gack = 0, *gcom = 0;
+      if (onack)
+       gack = new C_Gather(onack);
+      if (oncommit)
+       gcom = new C_Gather(oncommit);
+      for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++)
+       objecter->remove(p->oid, p->layout,
+                        snapc, flags,
+                        gack ? gack->new_sub():0,
+                        gcom ? gcom->new_sub():0);
+    }
+    return 0;
   }
 
   /*
@@ -158,18 +198,6 @@ class Filer {
            int flags,
            Context *onfinish);
 
-
-  /***** mapping *****/
-
-  /*
-   * map (ino, layout, offset, len) to a (list of) OSDExtents (byte
-   * ranges in objects on (primary) osds)
-   */
-  void file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t snap,
-                      __u64 offset,
-                      size_t len,
-                      list<ObjectExtent>& extents);
-  
 };
 
 
index 3cfb5f6c51c454c49425aca6c066641680f2255c..164182da61f2b103d5076787e32118e71f766839 100644 (file)
@@ -131,24 +131,24 @@ void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
  * map a range of bytes into buffer_heads.
  * - create missing buffer_heads as necessary.
  */
-int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
+int ObjectCacher::Object::map_read(OSDRead *rd,
                                    map<loff_t, BufferHead*>& hits,
                                    map<loff_t, BufferHead*>& missing,
                                    map<loff_t, BufferHead*>& rx)
 {
-  for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
+  for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
        ex_it != rd->extents.end();
        ex_it++) {
     
     if (ex_it->oid != oid) continue;
     
     dout(10) << "map_read " << ex_it->oid 
-             << " " << ex_it->start << "~" << ex_it->length << dendl;
+             << " " << ex_it->offset << "~" << ex_it->length << dendl;
     
-    map<loff_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
+    map<loff_t, BufferHead*>::iterator p = data.lower_bound(ex_it->offset);
     // p->first >= start
     
-    loff_t cur = ex_it->start;
+    loff_t cur = ex_it->offset;
     loff_t left = ex_it->length;
     
     if (p != data.begin() && 
@@ -171,7 +171,7 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
         cur += left;
         left -= left;
         assert(left == 0);
-        assert(cur == ex_it->start + (loff_t)ex_it->length);
+        assert(cur == ex_it->offset + (loff_t)ex_it->length);
         break;  // no more.
       }
       
@@ -223,23 +223,23 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
  * - break up bufferheads that don't fall completely within the range
  * //no! - return a bh that includes the write.  may also include other dirty data to left and/or right.
  */
-ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr)
+ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)
 {
   BufferHead *final = 0;
 
-  for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
+  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
        ex_it != wr->extents.end();
        ex_it++) {
     
     if (ex_it->oid != oid) continue;
     
     dout(10) << "map_write oex " << ex_it->oid
-             << " " << ex_it->start << "~" << ex_it->length << dendl;
+             << " " << ex_it->offset << "~" << ex_it->length << dendl;
     
-    map<loff_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
+    map<loff_t, BufferHead*>::iterator p = data.lower_bound(ex_it->offset);
     // p->first >= start
     
-    loff_t cur = ex_it->start;
+    loff_t cur = ex_it->offset;
     loff_t left = ex_it->length;
     
     if (p != data.begin() && 
@@ -404,7 +404,7 @@ void ObjectCacher::bh_read(BufferHead *bh)
   C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length());
 
   // go
-  objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), 
+  objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(),
                 &onfinish->bl, 0,
                 onfinish);
 }
@@ -746,13 +746,13 @@ void ObjectCacher::trim(loff_t max)
  * returns # bytes read (if in cache).  onfinish is untouched (caller must delete it)
  * returns 0 if doing async read
  */
-int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
+int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish)
 {
   bool success = true;
   list<BufferHead*> hit_ls;
   map<size_t, bufferlist> stripe_map;  // final buffer offset -> substring
 
-  for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
+  for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
        ex_it != rd->extents.end();
        ex_it++) {
     dout(10) << "readx " << *ex_it << dendl;
@@ -805,11 +805,11 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
       // this is over a single ObjectExtent, so we know that
       //  - the bh's are contiguous
       //  - the buffer frags need not be (and almost certainly aren't)
-      loff_t opos = ex_it->start;
+      loff_t opos = ex_it->offset;
       map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
       assert(bh_it->second->start() <= opos);
       size_t bhoff = opos - bh_it->second->start();
-      map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
+      map<__u32,__u32>::iterator f_it = ex_it->buffer_extents.begin();
       size_t foff = 0;
       while (1) {
         BufferHead *bh = bh_it->second;
@@ -844,7 +844,7 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
         if (f_it == ex_it->buffer_extents.end()) break;
       }
       assert(f_it == ex_it->buffer_extents.end());
-      assert(opos == ex_it->start + (loff_t)ex_it->length);
+      assert(opos == ex_it->offset + (loff_t)ex_it->length);
     }
   }
   
@@ -887,11 +887,11 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
 }
 
 
-int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
+int ObjectCacher::writex(OSDWrite *wr, inodeno_t ino)
 {
   utime_t now = g_clock.now();
   
-  for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
+  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
        ex_it != wr->extents.end();
        ex_it++) {
     // get object cache
@@ -906,8 +906,8 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
     //  - there is one contiguous bh
     //  - the buffer frags need not be (and almost certainly aren't)
     // note: i assume striping is monotonic... no jumps backwards, ever!
-    loff_t opos = ex_it->start;
-    for (map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
+    loff_t opos = ex_it->offset;
+    for (map<__u32,__u32>::iterator f_it = ex_it->buffer_extents.begin();
          f_it != ex_it->buffer_extents.end();
          f_it++) {
       dout(10) << "writex writing " << f_it->first << "~" << f_it->second << " into " << *bh << " at " << opos << dendl;
@@ -1019,7 +1019,7 @@ void ObjectCacher::flusher_entry()
 
   
 // blocking.  atomic+sync.
-int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock)
+int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock)
 {
   dout(10) << "atomic_sync_readx " << rd
            << " in " << ino
@@ -1030,7 +1030,10 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex&
     // just write synchronously.
     Cond cond;
     bool done = false;
-    objecter->readx(rd, new C_SafeCond(&lock, &cond, &done));
+    //objecter->readx(rd, new C_SafeCond(&lock, &cond, &done));
+    objecter->read(rd->extents[0].oid, rd->extents[0].offset, rd->extents[0].length,
+                  rd->extents[0].layout, rd->bl, 0,
+                  new C_SafeCond(&lock, &cond, &done));
 
     // block
     while (!done) cond.Wait(lock);
@@ -1039,7 +1042,7 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex&
 
     // sort by object...
     map<object_t,ObjectExtent> by_oid;
-    for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
+    for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
          ex_it != rd->extents.end();
          ex_it++) 
       by_oid[ex_it->oid] = *ex_it;
@@ -1053,7 +1056,7 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex&
     }
 
     // readx will hose rd
-    list<ObjectExtent> extents = rd->extents;
+    vector<ObjectExtent> extents = rd->extents;
 
     // do the read, into our cache
     Cond cond;
@@ -1064,7 +1067,7 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex&
     while (!done) cond.Wait(lock);
     
     // release the locks
-    for (list<ObjectExtent>::iterator ex_it = extents.begin();
+    for (vector<ObjectExtent>::iterator ex_it = extents.begin();
          ex_it != extents.end();
          ex_it++) {
       assert(objects.count(ex_it->oid));
@@ -1076,7 +1079,7 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex&
   return 0;
 }
 
-int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mutex& lock)
+int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock)
 {
   dout(10) << "atomic_sync_writex " << wr
            << " in " << ino
@@ -1102,7 +1105,8 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute
 
       Cond cond;
       bool done = false;
-      objecter->modifyx(wr, new C_SafeCond(&lock, &cond, &done), 0);
+      objecter->sg_write(wr->extents, wr->snapc, wr->bl, 0, 
+                        new C_SafeCond(&lock, &cond, &done), 0);
       
       // block
       while (!done) cond.Wait(lock);
@@ -1113,7 +1117,7 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute
   // spans multiple objects, or is big.
   // sort by object...
   map<object_t,ObjectExtent> by_oid;
-  for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
+  for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
        ex_it != wr->extents.end();
        ex_it++) 
     by_oid[ex_it->oid] = *ex_it;
@@ -1127,14 +1131,14 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute
   }
   
   // writex will hose wr
-  list<ObjectExtent> extents = wr->extents;
+  vector<ObjectExtent> extents = wr->extents;
 
   // do the write, into our cache
   writex(wr, ino);
   
   // flush 
   // ...and release the locks?
-  for (list<ObjectExtent>::iterator ex_it = extents.begin();
+  for (vector<ObjectExtent>::iterator ex_it = extents.begin();
        ex_it != extents.end();
        ex_it++) {
     assert(objects.count(ex_it->oid));
@@ -1548,7 +1552,7 @@ loff_t ObjectCacher::release_set(inodeno_t ino)
   return unclean;
 }
 
-void ObjectCacher::truncate_set(inodeno_t ino, list<ObjectExtent>& exls)
+void ObjectCacher::truncate_set(inodeno_t ino, vector<ObjectExtent>& exls)
 {
   if (objects_by_ino.count(ino) == 0) {
     dout(10) << "truncate_set on " << ino << " dne" << dendl;
@@ -1557,27 +1561,27 @@ void ObjectCacher::truncate_set(inodeno_t ino, list<ObjectExtent>& exls)
   
   dout(10) << "truncate_set " << ino << dendl;
 
-  for (list<ObjectExtent>::iterator p = exls.begin();
-          p != exls.end();
-          ++p) {
-       ObjectExtent &ex = *p;
-       if (objects.count(ex.oid) == 0) continue;
-       Object *ob = objects[ex.oid];
-
-       // purge or truncate?
-       if (ex.start == 0) {
-         dout(10) << "truncate_set purging " << *ob << dendl;
-         purge(ob);
-       } else {
-         // hrm, truncate object
-         dout(10) << "truncate_set truncating " << *ob << " at " << ex.start << dendl;
-         ob->truncate(ex.start);
-
-         if (ob->can_close()) {
-               dout(10) << "truncate_set trimming " << *ob << dendl;
-               close_object(ob);
-         }
-       }
+  for (vector<ObjectExtent>::iterator p = exls.begin();
+       p != exls.end();
+       ++p) {
+    ObjectExtent &ex = *p;
+    if (objects.count(ex.oid) == 0) continue;
+    Object *ob = objects[ex.oid];
+    
+    // purge or truncate?
+    if (ex.offset == 0) {
+      dout(10) << "truncate_set purging " << *ob << dendl;
+      purge(ob);
+    } else {
+      // hrm, truncate object
+      dout(10) << "truncate_set truncating " << *ob << " at " << ex.offset << dendl;
+      ob->truncate(ex.offset);
+      
+      if (ob->can_close()) {
+       dout(10) << "truncate_set trimming " << *ob << dendl;
+       close_object(ob);
+      }
+    }
   }
 }
 
index b4d629adc7a57bc2bd0d55fa64a6237ef2158bac..34a0f1c59e4858510f2c46f7e9af46fd5c8b8ae3 100644 (file)
@@ -23,6 +23,34 @@ class ObjectCacher {
 
   class Object;
 
+  // read scatter/gather  
+  struct OSDRead {
+    vector<ObjectExtent> extents;
+    map<object_t, bufferlist*> read_data;  // bits of data as they come back
+    bufferlist *bl;
+    int flags;
+    OSDRead(bufferlist *b, int f) : bl(b), flags(f) {}
+  };
+
+  OSDRead *prepare_read(bufferlist *b, int f) {
+    return new OSDRead(b, f);
+  }
+  
+  // write scatter/gather  
+  struct OSDWrite {
+    vector<ObjectExtent> extents;
+    SnapContext snapc;
+    bufferlist bl;
+    int flags;
+    OSDWrite(const SnapContext& sc, bufferlist& b, int f) : snapc(sc), bl(b), flags(f) {}
+  };
+
+  OSDWrite *prepare_write(const SnapContext& sc, bufferlist &b, int f) { 
+    return new OSDWrite(sc, b, f); 
+  }
+
+
+
   // ******* BufferHead *********
   class BufferHead : public LRUObject {
   public:
@@ -191,11 +219,11 @@ class ObjectCacher {
     void merge_left(BufferHead *left, BufferHead *right);
     void try_merge_bh(BufferHead *bh);
 
-    int map_read(Objecter::OSDRead *rd,
+    int map_read(OSDRead *rd,
                  map<loff_t, BufferHead*>& hits,
                  map<loff_t, BufferHead*>& missing,
                  map<loff_t, BufferHead*>& rx);
-    BufferHead *map_write(Objecter::OSDWrite *wr);
+    BufferHead *map_write(OSDWrite *wr);
     
     void truncate(loff_t s);
 
@@ -462,11 +490,11 @@ class ObjectCacher {
 
   class C_RetryRead : public Context {
     ObjectCacher *oc;
-    Objecter::OSDRead *rd;
+    OSDRead *rd;
     inodeno_t ino;
     Context *onfinish;
   public:
-    C_RetryRead(ObjectCacher *_oc, Objecter::OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {}
+    C_RetryRead(ObjectCacher *_oc, OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {}
     void finish(int) {
       int r = oc->readx(rd, ino, onfinish);
       if (r > 0 && onfinish) {
@@ -476,16 +504,18 @@ class ObjectCacher {
     }
   };
 
+
+
   // non-blocking.  async.
-  int readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish);
-  int writex(Objecter::OSDWrite *wr, inodeno_t ino);
+  int readx(OSDRead *rd, inodeno_t ino, Context *onfinish);
+  int writex(OSDWrite *wr, inodeno_t ino);
 
   // write blocking
   bool wait_for_write(size_t len, Mutex& lock);
   
   // blocking.  atomic+sync.
-  int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock);
-  int atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mutex& lock);
+  int atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock);
+  int atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock);
 
   bool set_is_cached(inodeno_t ino);
   bool set_is_dirty_or_committing(inodeno_t ino);
@@ -500,7 +530,7 @@ class ObjectCacher {
 
   loff_t release_set(inodeno_t ino);  // returns # of bytes not released (ie non-clean)
 
-  void truncate_set(inodeno_t ino, list<ObjectExtent>& ex);
+  void truncate_set(inodeno_t ino, vector<ObjectExtent>& ex);
 
   void kick_sync_writers(inodeno_t ino);
   void kick_sync_readers(inodeno_t ino);
@@ -514,7 +544,7 @@ class ObjectCacher {
                 bufferlist *bl,
                int flags,
                 Context *onfinish) {
-    Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
+    OSDRead *rd = prepare_read(bl, flags);
     filer.file_to_extents(ino, layout, snapid, offset, len, rd->extents);
     return readx(rd, ino, onfinish);
   }
@@ -522,7 +552,7 @@ class ObjectCacher {
   int file_write(inodeno_t ino, ceph_file_layout *layout, const SnapContext& snapc,
                  loff_t offset, size_t len, 
                  bufferlist& bl, int flags) {
-    Objecter::OSDWrite *wr = objecter->prepare_write(snapc, bl, flags);
+    OSDWrite *wr = prepare_write(snapc, bl, flags);
     filer.file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents);
     return writex(wr, ino);
   }
@@ -530,13 +560,13 @@ class ObjectCacher {
 
 
   /*** sync+blocking file interface ***/
-  
+
   int file_atomic_sync_read(inodeno_t ino, ceph_file_layout *layout, 
                            snapid_t snapid,
                             loff_t offset, size_t len, 
                             bufferlist *bl, int flags,
                             Mutex &lock) {
-    Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
+    OSDRead *rd = prepare_read(bl, flags);
     filer.file_to_extents(ino, layout, snapid, offset, len, rd->extents);
     return atomic_sync_readx(rd, ino, lock);
   }
@@ -546,7 +576,7 @@ class ObjectCacher {
                              loff_t offset, size_t len, 
                              bufferlist& bl, int flags,
                              Mutex &lock) {
-    Objecter::OSDWrite *wr = objecter->prepare_write(snapc, bl, flags);
+    OSDWrite *wr = prepare_write(snapc, bl, flags);
     filer.file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents);
     return atomic_sync_writex(wr, ino, lock);
   }
index 59359121f572761d90ccf066228b2bbb7db4b2e5..296a067eb5cd0c7bb6527d5492111e4403126f3a 100644 (file)
@@ -480,7 +480,8 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
 // read -----------------------------------
 
 
-tid_t Objecter::read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, bufferlist *bl, int flags, 
+tid_t Objecter::read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, 
+                    bufferlist *bl, int flags, 
                      Context *onfinish)
 {
   OSDRead *rd = prepare_read(bl, flags);
@@ -522,7 +523,7 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry)
 
   // send?
   dout(10) << "readx_submit " << rd << " tid " << last_tid
-           << " oid " << ex.oid << " " << ex.start << "~" << ex.length
+           << " oid " << ex.oid << " " << ex.offset << "~" << ex.length
            << " (" << ex.buffer_extents.size() << " buffer fragments)" 
            << " " << ex.layout
            << " osd" << pg.acker() 
@@ -534,7 +535,7 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry)
     MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
                           ex.oid, ex.layout, osdmap->get_epoch(), 
                           flags);
-    m->read(ex.start, ex.length);
+    m->read(ex.offset, ex.length);
     if (inc_lock > 0) {
       rd->inc_lock = inc_lock;
       m->set_inc_lock(inc_lock);
@@ -641,10 +642,13 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
         assert(ox_len <= eit->length);           
 
         // for each buffer extent we're mapping into...
-        for (map<size_t,size_t>::iterator bit = eit->buffer_extents.begin();
+        for (map<__u32,__u32>::iterator bit = eit->buffer_extents.begin();
              bit != eit->buffer_extents.end();
              bit++) {
-          dout(21) << " object " << eit->oid << " extent " << eit->start << "~" << eit->length << " : ox offset " << ox_off << " -> buffer extent " << bit->first << "~" << bit->second << dendl;
+          dout(21) << " object " << eit->oid
+                  << " extent " << eit->offset << "~" << eit->length
+                  << " : ox offset " << ox_off
+                  << " -> buffer extent " << bit->first << "~" << bit->second << dendl;
           by_off[bit->first] = new bufferlist;
 
           if (ox_off + bit->second <= ox_len) {
@@ -779,6 +783,20 @@ tid_t Objecter::zero(object_t oid, __u64 off, size_t len,
   return last_tid;
 }
 
+// remove
+
+tid_t Objecter::remove(object_t oid,
+                      ceph_object_layout ol, const SnapContext& snapc,
+                      int flags, 
+                      Context *onack, Context *oncommit)
+{
+  OSDModify *z = prepare_modify(snapc, CEPH_OSD_OP_DELETE, flags);
+  z->extents.push_back(ObjectExtent(oid, 0, 0));
+  z->extents.front().layout = ol;
+  modifyx(z, onack, oncommit);
+  return last_tid;
+}
+
 
 // lock ops
 
@@ -849,7 +867,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
   // send?
   dout(10) << "modifyx_submit " << ceph_osd_op_name(wr->op) << " tid " << tid
            << "  oid " << ex.oid
-           << " " << ex.start << "~" << ex.length 
+           << " " << ex.offset << "~" << ex.length 
            << " " << ex.layout 
            << " osd" << pg.primary()
            << dendl;
@@ -857,7 +875,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
     MOSDOp *m = new MOSDOp(client_inc, tid, true,
                           ex.oid, ex.layout, osdmap->get_epoch(),
                           flags);
-    m->add_simple_op(wr->op, ex.start, ex.length);
+    m->add_simple_op(wr->op, ex.offset, ex.length);
     m->set_snap_seq(wr->snapc.seq);
     m->get_snaps() = wr->snapc.snaps;
     if (inc_lock > 0) {
@@ -878,7 +896,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
        // map buffer segments into this extent
        // (may be fragmented bc of striping)
        bufferlist cur;
-       for (map<size_t,size_t>::iterator bit = ex.buffer_extents.begin();
+       for (map<__u32,__u32>::iterator bit = ex.buffer_extents.begin();
             bit != ex.buffer_extents.end();
             bit++) 
          ((OSDWrite*)wr)->bl.copy(bit->first, bit->second, cur);
@@ -1053,6 +1071,121 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
 
 
 
+
+// scatter/gather
+
+void Objecter::_sg_read_finish(vector<ObjectExtent>& extents, vector<bufferlist>& resultbl, 
+                              bufferlist *bl, Context *onfinish)
+{
+  // all done
+  size_t bytes_read = 0;
+  
+  dout(15) << "_sg_read_finish" << dendl;
+
+  if (extents.size() > 1) {
+    /** FIXME This doesn't handle holes efficiently.
+     * It allocates zero buffers to fill whole buffer, and
+     * then discards trailing ones at the end.
+     *
+     * Actually, this whole thing is pretty messy with temporary bufferlist*'s all over
+     * the heap. 
+     */
+    
+    // map extents back into buffer
+    map<__u64, bufferlist*> by_off;  // buffer offset -> bufferlist
+    
+    // for each object extent...
+    vector<bufferlist>::iterator bit = resultbl.begin();
+    for (vector<ObjectExtent>::iterator eit = extents.begin();
+        eit != extents.end();
+        eit++, bit++) {
+      bufferlist& ox_buf = *bit;
+      unsigned ox_len = ox_buf.length();
+      unsigned ox_off = 0;
+      assert(ox_len <= eit->length);           
+      
+      // for each buffer extent we're mapping into...
+      for (map<__u32,__u32>::iterator bit = eit->buffer_extents.begin();
+          bit != eit->buffer_extents.end();
+          bit++) {
+       dout(21) << " object " << eit->oid
+                << " extent " << eit->offset << "~" << eit->length
+                << " : ox offset " << ox_off
+                << " -> buffer extent " << bit->first << "~" << bit->second << dendl;
+       by_off[bit->first] = new bufferlist;
+       
+       if (ox_off + bit->second <= ox_len) {
+         // we got the whole bx
+         by_off[bit->first]->substr_of(ox_buf, ox_off, bit->second);
+         if (bytes_read < bit->first + bit->second) 
+           bytes_read = bit->first + bit->second;
+       } else if (ox_off + bit->second > ox_len && ox_off < ox_len) {
+         // we got part of this bx
+         by_off[bit->first]->substr_of(ox_buf, ox_off, (ox_len-ox_off));
+         if (bytes_read < bit->first + ox_len-ox_off) 
+           bytes_read = bit->first + ox_len-ox_off;
+         
+         // zero end of bx
+         dout(21) << "  adding some zeros to the end " << ox_off + bit->second-ox_len << dendl;
+         bufferptr z(ox_off + bit->second - ox_len);
+         z.zero();
+         by_off[bit->first]->append( z );
+       } else {
+         // we got none of this bx.  zero whole thing.
+         assert(ox_off >= ox_len);
+         dout(21) << "  adding all zeros for this bit " << bit->second << dendl;
+         bufferptr z(bit->second);
+         z.zero();
+         by_off[bit->first]->append( z );
+       }
+       ox_off += bit->second;
+      }
+      assert(ox_off == eit->length);
+    }
+    
+    // sort and string bits together
+    for (map<__u64, bufferlist*>::iterator it = by_off.begin();
+        it != by_off.end();
+        it++) {
+      assert(it->second->length());
+      if (it->first < (__u64)bytes_read) {
+       dout(21) << "  concat buffer frag off " << it->first << " len " << it->second->length() << dendl;
+       bl->claim_append(*(it->second));
+      } else {
+       dout(21) << "  NO concat zero buffer frag off " << it->first << " len " << it->second->length() << dendl;          
+      }
+      delete it->second;
+    }
+    
+    // trim trailing zeros?
+    if (bl->length() > bytes_read) {
+      dout(10) << " trimming off trailing zeros . bytes_read=" << bytes_read 
+              << " len=" << bl->length() << dendl;
+      bl->splice(bytes_read, bl->length() - bytes_read);
+      assert(bytes_read == bl->length());
+    }
+    
+  } else {
+    dout(15) << "  only one frag" << dendl;
+  
+    // only one fragment, easy
+    bl->claim(resultbl[0]);
+    bytes_read = bl->length();
+  }
+  
+  // finish, clean up
+  dout(7) << " " << bytes_read << " bytes " 
+         << bl->length()
+         << dendl;
+    
+  // done
+  if (onfinish) {
+    onfinish->finish(bytes_read);// > 0 ? bytes_read:m->get_result());
+    delete onfinish;
+  }
+}
+
+
 void Objecter::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest)
 {
   entity_inst_t inst;
index ab8a49e9ca4181979dc9e0148d8bb8818b1a801f..11ad8c18a5e7ea375c6097bebf17bdc39ad6af1c 100644 (file)
@@ -247,11 +247,80 @@ class Objecter {
               Context *onack, Context *oncommit);
   tid_t zero(object_t oid, __u64 off, size_t len, ceph_object_layout ol, const SnapContext& snapc, int flags,
              Context *onack, Context *oncommit);
+  tid_t remove(object_t oid, ceph_object_layout ol, const SnapContext& snapc, int flags,
+              Context *onack, Context *oncommit);
 
   // no snapc for lock ops
   tid_t lock(int op, object_t oid, int flags, ceph_object_layout ol, Context *onack, Context *oncommit);
 
 
+
+  // ---------------------------
+  // some scatter/gather hackery
+
+  void _sg_read_finish(vector<ObjectExtent>& extents, vector<bufferlist>& resultbl, 
+                      bufferlist *bl, Context *onfinish);
+
+  struct C_SGRead : public Context {
+    Objecter *objecter;
+    vector<ObjectExtent> extents;
+    vector<bufferlist> resultbl;
+    bufferlist *bl;
+    Context *onfinish;
+    C_SGRead(Objecter *ob, 
+            vector<ObjectExtent>& e, vector<bufferlist>& r, bufferlist *b, Context *c) :
+      objecter(ob), bl(b), onfinish(c) {
+      extents.swap(e);
+      resultbl.swap(r);
+    }
+    void finish(int r) {
+      objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
+    }      
+  };
+
+  void sg_read(vector<ObjectExtent>& extents, bufferlist *bl, int flags, Context *onfinish) {
+    if (extents.size() == 1) {
+      read(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+          bl, flags, onfinish);
+    } else {
+      C_Gather *g = new C_Gather;
+      vector<bufferlist> resultbl(extents.size());
+      int i=0;
+      for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
+       read(p->oid, p->offset, p->length, p->layout,
+            &resultbl[i++], flags, onfinish);
+      }
+      g->set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
+    }
+  }
+
+
+  void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc, bufferlist bl,
+               int flags, Context *onack, Context *oncommit) {
+    if (extents.size() == 1) {
+      write(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+           snapc, bl, flags, onack, oncommit);
+    } else {
+      C_Gather *gack = 0, *gcom = 0;
+      if (onack)
+       gack = new C_Gather(onack);
+      if (oncommit)
+       gcom = new C_Gather(oncommit);
+      for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
+       bufferlist cur;
+       for (map<__u32,__u32>::iterator bit = p->buffer_extents.begin();
+            bit != p->buffer_extents.end();
+            bit++)
+         bl.copy(bit->first, bit->second, cur);
+       assert(cur.length() == p->length);
+       write(p->oid, p->offset, p->length, p->layout,
+             snapc, cur, flags,
+             gack ? gack->new_sub():0,
+             gcom ? gcom->new_sub():0);
+      }
+    }
+  }
+
   void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest);
 
 };