From: Sage Weil Date: Sun, 9 Nov 2008 23:05:37 +0000 (-0800) Subject: osdc: avoid using objecter readx/writex X-Git-Tag: v0.5~43 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7bfde0a78e2a81ed9dac80b04ed17bfd426fed45;p=ceph.git osdc: avoid using objecter readx/writex --- diff --git a/src/client/Client.cc b/src/client/Client.cc index a07dae29bf24..5f126406783f 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -2078,7 +2078,7 @@ void Client::handle_cap_trunc(Inode *in, MClientCaps *m) if (g_conf.client_oc && m->get_size() < in->inode.size) { // map range to objects - list ls; + vector ls; filer->file_to_extents(in->inode.ino, &in->inode.layout, CEPH_NOSNAP, m->get_size(), in->inode.size - m->get_size(), ls); @@ -3699,12 +3699,11 @@ int Client::_read(Fh *f, __s64 offset, __u64 size, bufferlist *bl) // object cache OFF -- non-atomic sync read from osd // do sync read - Objecter::OSDRead *rd = filer->prepare_read(in->inode.ino, &in->inode.layout, in->snapid, - offset, size, bl, 0); + int flags = 0; if (in->hack_balance_reads || g_conf.client_hack_balance_reads) - rd->flags |= CEPH_OSD_OP_BALANCE_READS; - r = objecter->readx(rd, onfinish); - assert(r >= 0); + flags |= CEPH_OSD_OP_BALANCE_READS; + filer->read(in->inode.ino, &in->inode.layout, in->snapid, + offset, size, bl, flags, onfinish); while (!done) cond.Wait(client_lock); @@ -5010,7 +5009,7 @@ int Client::get_stripe_period(int fd) return ceph_file_layout_period(layout); } -int Client::enumerate_layout(int fd, list& result, +int Client::enumerate_layout(int fd, vector& result, loff_t length, loff_t offset) { Mutex::Locker lock(client_lock); diff --git a/src/client/Client.h b/src/client/Client.h index 80592ec4c7d2..a37ff7f71056 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -1011,7 +1011,7 @@ public: int get_stripe_unit(int fd); int get_stripe_width(int fd); int get_stripe_period(int fd); - int enumerate_layout(int fd, list& result, + int enumerate_layout(int fd, vector& result, loff_t length, loff_t offset); int mksnap(const char *path, const char *name); diff --git a/src/client/SyntheticClient.cc b/src/client/SyntheticClient.cc index 639178f8fd74..3a094f285f70 100644 --- a/src/client/SyntheticClient.cc +++ b/src/client/SyntheticClient.cc @@ -1604,7 +1604,7 @@ int SyntheticClient::dump_placement(string& fn) { off_t filesize = stbuf.st_size; // grab the placement info - list extents; + vector extents; off_t offset = 0; client->enumerate_layout(fd, extents, filesize, offset); client->close(fd); @@ -1613,13 +1613,13 @@ int SyntheticClient::dump_placement(string& fn) { // run through all the object extents dout(0) << "file size is " << filesize << dendl; dout(0) << "(osd, start, length) tuples for file " << fn << dendl; - for (list::iterator i = extents.begin(); + for (vector::iterator i = extents.begin(); i != extents.end(); ++i) { int osd = client->osdmap->get_pg_primary(pg_t(i->layout.ol_pgid.v)); // run through all the buffer extents - for (map::iterator j = i ->buffer_extents.begin(); + for (map<__u32,__u32>::iterator j = i ->buffer_extents.begin(); j != i->buffer_extents.end(); ++j) { dout(0) << "OSD " << osd << ", offset " << (*j).first << @@ -1897,7 +1897,7 @@ int SyntheticClient::overload_osd_0(int n, int size, int wrsize) { // See what the primary is for the first object in this file. int SyntheticClient::check_first_primary(int fh) { - list extents; + vector extents; client->enumerate_layout(fh, extents, 1, 0); return client->osdmap->get_pg_primary(pg_t((extents.begin())->layout.ol_pgid)); } diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 18d8c2c30969..04896ac55e55 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -396,22 +396,22 @@ inline ostream& operator<<(ostream& out, const osd_peer_stat_t &stat) { class ObjectExtent { public: object_t oid; // object id - off_t start; // in object - size_t length; // in object + __u32 offset; // in object + __u32 length; // in object ceph_object_layout layout; // object layout (pgid, etc.) - map buffer_extents; // off -> len. extents in buffer being mapped (may be fragmented bc of striping!) + map<__u32, __u32> buffer_extents; // off -> len. extents in buffer being mapped (may be fragmented bc of striping!) - ObjectExtent() : start(0), length(0) {} - ObjectExtent(object_t o, off_t s=0, size_t l=0) : oid(o), start(s), length(l) { } + ObjectExtent() : offset(0), length(0) {} + ObjectExtent(object_t o, __u32 off=0, __u32 l=0) : oid(o), offset(off), length(l) { } }; inline ostream& operator<<(ostream& out, ObjectExtent &ex) { return out << "extent(" << ex.oid << " in " << ex.layout - << " " << ex.start << "~" << ex.length + << " " << ex.offset << "~" << ex.length << ")"; } diff --git a/src/osdc/Filer.cc b/src/osdc/Filer.cc index e3f4ff8ebac1..85c886987e4b 100644 --- a/src/osdc/Filer.cc +++ b/src/osdc/Filer.cc @@ -90,7 +90,7 @@ void Filer::_probe(Probe *probe) // map range onto objects file_to_extents(probe->ino, &probe->layout, probe->snapid, probe->from, probe->probing_len, probe->probing); - for (list::iterator p = probe->probing.begin(); + for (vector::iterator p = probe->probing.begin(); p != probe->probing.end(); p++) { dout(10) << "_probe probing " << p->oid << dendl; @@ -114,13 +114,20 @@ void Filer::_probed(Probe *probe, object_t oid, __u64 size) bool found = false; __u64 end = 0; - if (!probe->fwd) - probe->probing.reverse(); + if (!probe->fwd) { + // reverse + vector r; + for (vector::reverse_iterator p = probe->probing.rbegin(); + p != probe->probing.rend(); + p++) + r.push_back(*p); + probe->probing.swap(r); + } - for (list::iterator p = probe->probing.begin(); + for (vector::iterator p = probe->probing.begin(); p != probe->probing.end(); p++) { - __u64 shouldbe = p->length+p->start; + __u64 shouldbe = p->length + p->offset; dout(10) << "_probed " << probe->ino << " object " << hex << p->oid << dec << " should be " << shouldbe << ", actual is " << probe->known[p->oid] @@ -134,8 +141,8 @@ void Filer::_probed(Probe *probe, object_t oid, __u64 size) // aha, we found the end! // calc offset into buffer_extent to get distance from probe->from. - __u64 oleft = probe->known[p->oid] - p->start; - for (map::iterator i = p->buffer_extents.begin(); + __u64 oleft = probe->known[p->oid] - p->offset; + for (map<__u32,__u32>::iterator i = p->buffer_extents.begin(); i != p->buffer_extents.end(); i++) { if (oleft <= (__u64)i->second) { @@ -187,11 +194,12 @@ void Filer::_probed(Probe *probe, object_t oid, __u64 size) void Filer::file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t snap, __u64 offset, size_t len, - list& extents) + vector& extents) { dout(10) << "file_to_extents " << offset << "~" << len << " on " << hex << ino << dec << dendl; + assert(len > 0); /* we want only one extent per object! * this means that each extent we read may map into different bits of the @@ -239,14 +247,14 @@ void Filer::file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t sn else x_len = left; - if (ex->start + (__u64)ex->length == x_offset) { + if (ex->offset + (__u64)ex->length == x_offset) { // add to extent ex->length += x_len; } else { // new extent assert(ex->length == 0); - assert(ex->start == 0); - ex->start = x_offset; + assert(ex->offset == 0); + ex->offset = x_offset; ex->length = x_len; } ex->buffer_extents[cur-offset] = x_len; diff --git a/src/osdc/Filer.h b/src/osdc/Filer.h index df104e1a8f3a..b2bbc06e6e49 100644 --- a/src/osdc/Filer.h +++ b/src/osdc/Filer.h @@ -54,7 +54,7 @@ class Filer { Context *onfinish; - list probing; + vector probing; __u64 probing_len; map known; @@ -79,18 +79,23 @@ class Filer { return objecter->is_active(); // || (oc && oc->is_active()); } - /*** async file interface ***/ - Objecter::OSDRead *prepare_read(inodeno_t ino, - ceph_file_layout *layout, - snapid_t snapid, - __u64 offset, - size_t len, - bufferlist *bl, - int flags) { - Objecter::OSDRead *rd = objecter->prepare_read(bl, flags); - file_to_extents(ino, layout, snapid, offset, len, rd->extents); - return rd; - } + + /***** mapping *****/ + + /* + * map (ino, layout, offset, len) to a (list of) OSDExtents (byte + * ranges in objects on (primary) osds) + */ + void file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t snap, + __u64 offset, + size_t len, + vector& extents); + + + + + /*** async file interface. scatter/gather as needed. ***/ + int read(inodeno_t ino, ceph_file_layout *layout, snapid_t snapid, @@ -100,10 +105,13 @@ class Filer { int flags, Context *onfinish) { assert(snapid); // (until there is a non-NOSNAP write) - Objecter::OSDRead *rd = prepare_read(ino, layout, snapid, offset, len, bl, flags); - return objecter->readx(rd, onfinish) > 0 ? 0:-1; + vector extents; + file_to_extents(ino, layout, snapid, offset, len, extents); + objecter->sg_read(extents, bl, flags, onfinish); + return 0; } + int write(inodeno_t ino, ceph_file_layout *layout, const SnapContext& snapc, @@ -113,9 +121,10 @@ class Filer { int flags, Context *onack, Context *oncommit) { - Objecter::OSDWrite *wr = objecter->prepare_write(snapc, bl, flags); - file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents); - return objecter->modifyx(wr, onack, oncommit) > 0 ? 0:-1; + vector extents; + file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents); + objecter->sg_write(extents, snapc, bl, flags, onack, oncommit); + return 0; } int zero(inodeno_t ino, @@ -126,9 +135,25 @@ class Filer { int flags, Context *onack, Context *oncommit) { - Objecter::OSDModify *z = objecter->prepare_modify(snapc, CEPH_OSD_OP_ZERO, flags); - file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, z->extents); - return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1; + vector extents; + file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents); + if (extents.size() == 1) { + objecter->zero(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout, + snapc, flags, onack, oncommit); + } else { + C_Gather *gack = 0, *gcom = 0; + if (onack) + gack = new C_Gather(onack); + if (oncommit) + gcom = new C_Gather(oncommit); + for (vector::iterator p = extents.begin(); p != extents.end(); p++) { + objecter->zero(p->oid, p->offset, p->length, p->layout, + snapc, flags, + gack ? gack->new_sub():0, + gcom ? gcom->new_sub():0); + } + } + return 0; } int remove(inodeno_t ino, @@ -139,9 +164,24 @@ class Filer { int flags, Context *onack, Context *oncommit) { - Objecter::OSDModify *z = objecter->prepare_modify(snapc, CEPH_OSD_OP_DELETE, flags); - file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, z->extents); - return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1; + vector extents; + file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents); + if (extents.size() == 1) { + objecter->remove(extents[0].oid, extents[0].layout, + snapc, flags, onack, oncommit); + } else { + C_Gather *gack = 0, *gcom = 0; + if (onack) + gack = new C_Gather(onack); + if (oncommit) + gcom = new C_Gather(oncommit); + for (vector::iterator p = extents.begin(); p != extents.end(); p++) + objecter->remove(p->oid, p->layout, + snapc, flags, + gack ? gack->new_sub():0, + gcom ? gcom->new_sub():0); + } + return 0; } /* @@ -158,18 +198,6 @@ class Filer { int flags, Context *onfinish); - - /***** mapping *****/ - - /* - * map (ino, layout, offset, len) to a (list of) OSDExtents (byte - * ranges in objects on (primary) osds) - */ - void file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t snap, - __u64 offset, - size_t len, - list& extents); - }; diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 3cfb5f6c51c4..164182da61f2 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -131,24 +131,24 @@ void ObjectCacher::Object::try_merge_bh(BufferHead *bh) * map a range of bytes into buffer_heads. * - create missing buffer_heads as necessary. */ -int ObjectCacher::Object::map_read(Objecter::OSDRead *rd, +int ObjectCacher::Object::map_read(OSDRead *rd, map& hits, map& missing, map& rx) { - for (list::iterator ex_it = rd->extents.begin(); + for (vector::iterator ex_it = rd->extents.begin(); ex_it != rd->extents.end(); ex_it++) { if (ex_it->oid != oid) continue; dout(10) << "map_read " << ex_it->oid - << " " << ex_it->start << "~" << ex_it->length << dendl; + << " " << ex_it->offset << "~" << ex_it->length << dendl; - map::iterator p = data.lower_bound(ex_it->start); + map::iterator p = data.lower_bound(ex_it->offset); // p->first >= start - loff_t cur = ex_it->start; + loff_t cur = ex_it->offset; loff_t left = ex_it->length; if (p != data.begin() && @@ -171,7 +171,7 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd, cur += left; left -= left; assert(left == 0); - assert(cur == ex_it->start + (loff_t)ex_it->length); + assert(cur == ex_it->offset + (loff_t)ex_it->length); break; // no more. } @@ -223,23 +223,23 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd, * - break up bufferheads that don't fall completely within the range * //no! - return a bh that includes the write. may also include other dirty data to left and/or right. */ -ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr) +ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr) { BufferHead *final = 0; - for (list::iterator ex_it = wr->extents.begin(); + for (vector::iterator ex_it = wr->extents.begin(); ex_it != wr->extents.end(); ex_it++) { if (ex_it->oid != oid) continue; dout(10) << "map_write oex " << ex_it->oid - << " " << ex_it->start << "~" << ex_it->length << dendl; + << " " << ex_it->offset << "~" << ex_it->length << dendl; - map::iterator p = data.lower_bound(ex_it->start); + map::iterator p = data.lower_bound(ex_it->offset); // p->first >= start - loff_t cur = ex_it->start; + loff_t cur = ex_it->offset; loff_t left = ex_it->length; if (p != data.begin() && @@ -404,7 +404,7 @@ void ObjectCacher::bh_read(BufferHead *bh) C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length()); // go - objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), + objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), &onfinish->bl, 0, onfinish); } @@ -746,13 +746,13 @@ void ObjectCacher::trim(loff_t max) * returns # bytes read (if in cache). onfinish is untouched (caller must delete it) * returns 0 if doing async read */ -int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) +int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish) { bool success = true; list hit_ls; map stripe_map; // final buffer offset -> substring - for (list::iterator ex_it = rd->extents.begin(); + for (vector::iterator ex_it = rd->extents.begin(); ex_it != rd->extents.end(); ex_it++) { dout(10) << "readx " << *ex_it << dendl; @@ -805,11 +805,11 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) // this is over a single ObjectExtent, so we know that // - the bh's are contiguous // - the buffer frags need not be (and almost certainly aren't) - loff_t opos = ex_it->start; + loff_t opos = ex_it->offset; map::iterator bh_it = hits.begin(); assert(bh_it->second->start() <= opos); size_t bhoff = opos - bh_it->second->start(); - map::iterator f_it = ex_it->buffer_extents.begin(); + map<__u32,__u32>::iterator f_it = ex_it->buffer_extents.begin(); size_t foff = 0; while (1) { BufferHead *bh = bh_it->second; @@ -844,7 +844,7 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) if (f_it == ex_it->buffer_extents.end()) break; } assert(f_it == ex_it->buffer_extents.end()); - assert(opos == ex_it->start + (loff_t)ex_it->length); + assert(opos == ex_it->offset + (loff_t)ex_it->length); } } @@ -887,11 +887,11 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) } -int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino) +int ObjectCacher::writex(OSDWrite *wr, inodeno_t ino) { utime_t now = g_clock.now(); - for (list::iterator ex_it = wr->extents.begin(); + for (vector::iterator ex_it = wr->extents.begin(); ex_it != wr->extents.end(); ex_it++) { // get object cache @@ -906,8 +906,8 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino) // - there is one contiguous bh // - the buffer frags need not be (and almost certainly aren't) // note: i assume striping is monotonic... no jumps backwards, ever! - loff_t opos = ex_it->start; - for (map::iterator f_it = ex_it->buffer_extents.begin(); + loff_t opos = ex_it->offset; + for (map<__u32,__u32>::iterator f_it = ex_it->buffer_extents.begin(); f_it != ex_it->buffer_extents.end(); f_it++) { dout(10) << "writex writing " << f_it->first << "~" << f_it->second << " into " << *bh << " at " << opos << dendl; @@ -1019,7 +1019,7 @@ void ObjectCacher::flusher_entry() // blocking. atomic+sync. -int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock) +int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock) { dout(10) << "atomic_sync_readx " << rd << " in " << ino @@ -1030,7 +1030,10 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& // just write synchronously. Cond cond; bool done = false; - objecter->readx(rd, new C_SafeCond(&lock, &cond, &done)); + //objecter->readx(rd, new C_SafeCond(&lock, &cond, &done)); + objecter->read(rd->extents[0].oid, rd->extents[0].offset, rd->extents[0].length, + rd->extents[0].layout, rd->bl, 0, + new C_SafeCond(&lock, &cond, &done)); // block while (!done) cond.Wait(lock); @@ -1039,7 +1042,7 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& // sort by object... map by_oid; - for (list::iterator ex_it = rd->extents.begin(); + for (vector::iterator ex_it = rd->extents.begin(); ex_it != rd->extents.end(); ex_it++) by_oid[ex_it->oid] = *ex_it; @@ -1053,7 +1056,7 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& } // readx will hose rd - list extents = rd->extents; + vector extents = rd->extents; // do the read, into our cache Cond cond; @@ -1064,7 +1067,7 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& while (!done) cond.Wait(lock); // release the locks - for (list::iterator ex_it = extents.begin(); + for (vector::iterator ex_it = extents.begin(); ex_it != extents.end(); ex_it++) { assert(objects.count(ex_it->oid)); @@ -1076,7 +1079,7 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& return 0; } -int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mutex& lock) +int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock) { dout(10) << "atomic_sync_writex " << wr << " in " << ino @@ -1102,7 +1105,8 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute Cond cond; bool done = false; - objecter->modifyx(wr, new C_SafeCond(&lock, &cond, &done), 0); + objecter->sg_write(wr->extents, wr->snapc, wr->bl, 0, + new C_SafeCond(&lock, &cond, &done), 0); // block while (!done) cond.Wait(lock); @@ -1113,7 +1117,7 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute // spans multiple objects, or is big. // sort by object... map by_oid; - for (list::iterator ex_it = wr->extents.begin(); + for (vector::iterator ex_it = wr->extents.begin(); ex_it != wr->extents.end(); ex_it++) by_oid[ex_it->oid] = *ex_it; @@ -1127,14 +1131,14 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute } // writex will hose wr - list extents = wr->extents; + vector extents = wr->extents; // do the write, into our cache writex(wr, ino); // flush // ...and release the locks? - for (list::iterator ex_it = extents.begin(); + for (vector::iterator ex_it = extents.begin(); ex_it != extents.end(); ex_it++) { assert(objects.count(ex_it->oid)); @@ -1548,7 +1552,7 @@ loff_t ObjectCacher::release_set(inodeno_t ino) return unclean; } -void ObjectCacher::truncate_set(inodeno_t ino, list& exls) +void ObjectCacher::truncate_set(inodeno_t ino, vector& exls) { if (objects_by_ino.count(ino) == 0) { dout(10) << "truncate_set on " << ino << " dne" << dendl; @@ -1557,27 +1561,27 @@ void ObjectCacher::truncate_set(inodeno_t ino, list& exls) dout(10) << "truncate_set " << ino << dendl; - for (list::iterator p = exls.begin(); - p != exls.end(); - ++p) { - ObjectExtent &ex = *p; - if (objects.count(ex.oid) == 0) continue; - Object *ob = objects[ex.oid]; - - // purge or truncate? - if (ex.start == 0) { - dout(10) << "truncate_set purging " << *ob << dendl; - purge(ob); - } else { - // hrm, truncate object - dout(10) << "truncate_set truncating " << *ob << " at " << ex.start << dendl; - ob->truncate(ex.start); - - if (ob->can_close()) { - dout(10) << "truncate_set trimming " << *ob << dendl; - close_object(ob); - } - } + for (vector::iterator p = exls.begin(); + p != exls.end(); + ++p) { + ObjectExtent &ex = *p; + if (objects.count(ex.oid) == 0) continue; + Object *ob = objects[ex.oid]; + + // purge or truncate? + if (ex.offset == 0) { + dout(10) << "truncate_set purging " << *ob << dendl; + purge(ob); + } else { + // hrm, truncate object + dout(10) << "truncate_set truncating " << *ob << " at " << ex.offset << dendl; + ob->truncate(ex.offset); + + if (ob->can_close()) { + dout(10) << "truncate_set trimming " << *ob << dendl; + close_object(ob); + } + } } } diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index b4d629adc7a5..34a0f1c59e48 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -23,6 +23,34 @@ class ObjectCacher { class Object; + // read scatter/gather + struct OSDRead { + vector extents; + map read_data; // bits of data as they come back + bufferlist *bl; + int flags; + OSDRead(bufferlist *b, int f) : bl(b), flags(f) {} + }; + + OSDRead *prepare_read(bufferlist *b, int f) { + return new OSDRead(b, f); + } + + // write scatter/gather + struct OSDWrite { + vector extents; + SnapContext snapc; + bufferlist bl; + int flags; + OSDWrite(const SnapContext& sc, bufferlist& b, int f) : snapc(sc), bl(b), flags(f) {} + }; + + OSDWrite *prepare_write(const SnapContext& sc, bufferlist &b, int f) { + return new OSDWrite(sc, b, f); + } + + + // ******* BufferHead ********* class BufferHead : public LRUObject { public: @@ -191,11 +219,11 @@ class ObjectCacher { void merge_left(BufferHead *left, BufferHead *right); void try_merge_bh(BufferHead *bh); - int map_read(Objecter::OSDRead *rd, + int map_read(OSDRead *rd, map& hits, map& missing, map& rx); - BufferHead *map_write(Objecter::OSDWrite *wr); + BufferHead *map_write(OSDWrite *wr); void truncate(loff_t s); @@ -462,11 +490,11 @@ class ObjectCacher { class C_RetryRead : public Context { ObjectCacher *oc; - Objecter::OSDRead *rd; + OSDRead *rd; inodeno_t ino; Context *onfinish; public: - C_RetryRead(ObjectCacher *_oc, Objecter::OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {} + C_RetryRead(ObjectCacher *_oc, OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {} void finish(int) { int r = oc->readx(rd, ino, onfinish); if (r > 0 && onfinish) { @@ -476,16 +504,18 @@ class ObjectCacher { } }; + + // non-blocking. async. - int readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish); - int writex(Objecter::OSDWrite *wr, inodeno_t ino); + int readx(OSDRead *rd, inodeno_t ino, Context *onfinish); + int writex(OSDWrite *wr, inodeno_t ino); // write blocking bool wait_for_write(size_t len, Mutex& lock); // blocking. atomic+sync. - int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock); - int atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mutex& lock); + int atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock); + int atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock); bool set_is_cached(inodeno_t ino); bool set_is_dirty_or_committing(inodeno_t ino); @@ -500,7 +530,7 @@ class ObjectCacher { loff_t release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean) - void truncate_set(inodeno_t ino, list& ex); + void truncate_set(inodeno_t ino, vector& ex); void kick_sync_writers(inodeno_t ino); void kick_sync_readers(inodeno_t ino); @@ -514,7 +544,7 @@ class ObjectCacher { bufferlist *bl, int flags, Context *onfinish) { - Objecter::OSDRead *rd = objecter->prepare_read(bl, flags); + OSDRead *rd = prepare_read(bl, flags); filer.file_to_extents(ino, layout, snapid, offset, len, rd->extents); return readx(rd, ino, onfinish); } @@ -522,7 +552,7 @@ class ObjectCacher { int file_write(inodeno_t ino, ceph_file_layout *layout, const SnapContext& snapc, loff_t offset, size_t len, bufferlist& bl, int flags) { - Objecter::OSDWrite *wr = objecter->prepare_write(snapc, bl, flags); + OSDWrite *wr = prepare_write(snapc, bl, flags); filer.file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents); return writex(wr, ino); } @@ -530,13 +560,13 @@ class ObjectCacher { /*** sync+blocking file interface ***/ - + int file_atomic_sync_read(inodeno_t ino, ceph_file_layout *layout, snapid_t snapid, loff_t offset, size_t len, bufferlist *bl, int flags, Mutex &lock) { - Objecter::OSDRead *rd = objecter->prepare_read(bl, flags); + OSDRead *rd = prepare_read(bl, flags); filer.file_to_extents(ino, layout, snapid, offset, len, rd->extents); return atomic_sync_readx(rd, ino, lock); } @@ -546,7 +576,7 @@ class ObjectCacher { loff_t offset, size_t len, bufferlist& bl, int flags, Mutex &lock) { - Objecter::OSDWrite *wr = objecter->prepare_write(snapc, bl, flags); + OSDWrite *wr = prepare_write(snapc, bl, flags); filer.file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents); return atomic_sync_writex(wr, ino, lock); } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 59359121f572..296a067eb5cd 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -480,7 +480,8 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m) // read ----------------------------------- -tid_t Objecter::read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, bufferlist *bl, int flags, +tid_t Objecter::read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, + bufferlist *bl, int flags, Context *onfinish) { OSDRead *rd = prepare_read(bl, flags); @@ -522,7 +523,7 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry) // send? dout(10) << "readx_submit " << rd << " tid " << last_tid - << " oid " << ex.oid << " " << ex.start << "~" << ex.length + << " oid " << ex.oid << " " << ex.offset << "~" << ex.length << " (" << ex.buffer_extents.size() << " buffer fragments)" << " " << ex.layout << " osd" << pg.acker() @@ -534,7 +535,7 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry) MOSDOp *m = new MOSDOp(client_inc, last_tid, false, ex.oid, ex.layout, osdmap->get_epoch(), flags); - m->read(ex.start, ex.length); + m->read(ex.offset, ex.length); if (inc_lock > 0) { rd->inc_lock = inc_lock; m->set_inc_lock(inc_lock); @@ -641,10 +642,13 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) assert(ox_len <= eit->length); // for each buffer extent we're mapping into... - for (map::iterator bit = eit->buffer_extents.begin(); + for (map<__u32,__u32>::iterator bit = eit->buffer_extents.begin(); bit != eit->buffer_extents.end(); bit++) { - dout(21) << " object " << eit->oid << " extent " << eit->start << "~" << eit->length << " : ox offset " << ox_off << " -> buffer extent " << bit->first << "~" << bit->second << dendl; + dout(21) << " object " << eit->oid + << " extent " << eit->offset << "~" << eit->length + << " : ox offset " << ox_off + << " -> buffer extent " << bit->first << "~" << bit->second << dendl; by_off[bit->first] = new bufferlist; if (ox_off + bit->second <= ox_len) { @@ -779,6 +783,20 @@ tid_t Objecter::zero(object_t oid, __u64 off, size_t len, return last_tid; } +// remove + +tid_t Objecter::remove(object_t oid, + ceph_object_layout ol, const SnapContext& snapc, + int flags, + Context *onack, Context *oncommit) +{ + OSDModify *z = prepare_modify(snapc, CEPH_OSD_OP_DELETE, flags); + z->extents.push_back(ObjectExtent(oid, 0, 0)); + z->extents.front().layout = ol; + modifyx(z, onack, oncommit); + return last_tid; +} + // lock ops @@ -849,7 +867,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) // send? dout(10) << "modifyx_submit " << ceph_osd_op_name(wr->op) << " tid " << tid << " oid " << ex.oid - << " " << ex.start << "~" << ex.length + << " " << ex.offset << "~" << ex.length << " " << ex.layout << " osd" << pg.primary() << dendl; @@ -857,7 +875,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) MOSDOp *m = new MOSDOp(client_inc, tid, true, ex.oid, ex.layout, osdmap->get_epoch(), flags); - m->add_simple_op(wr->op, ex.start, ex.length); + m->add_simple_op(wr->op, ex.offset, ex.length); m->set_snap_seq(wr->snapc.seq); m->get_snaps() = wr->snapc.snaps; if (inc_lock > 0) { @@ -878,7 +896,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) // map buffer segments into this extent // (may be fragmented bc of striping) bufferlist cur; - for (map::iterator bit = ex.buffer_extents.begin(); + for (map<__u32,__u32>::iterator bit = ex.buffer_extents.begin(); bit != ex.buffer_extents.end(); bit++) ((OSDWrite*)wr)->bl.copy(bit->first, bit->second, cur); @@ -1053,6 +1071,121 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) + +// scatter/gather + +void Objecter::_sg_read_finish(vector& extents, vector& resultbl, + bufferlist *bl, Context *onfinish) +{ + // all done + size_t bytes_read = 0; + + dout(15) << "_sg_read_finish" << dendl; + + if (extents.size() > 1) { + /** FIXME This doesn't handle holes efficiently. + * It allocates zero buffers to fill whole buffer, and + * then discards trailing ones at the end. + * + * Actually, this whole thing is pretty messy with temporary bufferlist*'s all over + * the heap. + */ + + // map extents back into buffer + map<__u64, bufferlist*> by_off; // buffer offset -> bufferlist + + // for each object extent... + vector::iterator bit = resultbl.begin(); + for (vector::iterator eit = extents.begin(); + eit != extents.end(); + eit++, bit++) { + bufferlist& ox_buf = *bit; + unsigned ox_len = ox_buf.length(); + unsigned ox_off = 0; + assert(ox_len <= eit->length); + + // for each buffer extent we're mapping into... + for (map<__u32,__u32>::iterator bit = eit->buffer_extents.begin(); + bit != eit->buffer_extents.end(); + bit++) { + dout(21) << " object " << eit->oid + << " extent " << eit->offset << "~" << eit->length + << " : ox offset " << ox_off + << " -> buffer extent " << bit->first << "~" << bit->second << dendl; + by_off[bit->first] = new bufferlist; + + if (ox_off + bit->second <= ox_len) { + // we got the whole bx + by_off[bit->first]->substr_of(ox_buf, ox_off, bit->second); + if (bytes_read < bit->first + bit->second) + bytes_read = bit->first + bit->second; + } else if (ox_off + bit->second > ox_len && ox_off < ox_len) { + // we got part of this bx + by_off[bit->first]->substr_of(ox_buf, ox_off, (ox_len-ox_off)); + if (bytes_read < bit->first + ox_len-ox_off) + bytes_read = bit->first + ox_len-ox_off; + + // zero end of bx + dout(21) << " adding some zeros to the end " << ox_off + bit->second-ox_len << dendl; + bufferptr z(ox_off + bit->second - ox_len); + z.zero(); + by_off[bit->first]->append( z ); + } else { + // we got none of this bx. zero whole thing. + assert(ox_off >= ox_len); + dout(21) << " adding all zeros for this bit " << bit->second << dendl; + bufferptr z(bit->second); + z.zero(); + by_off[bit->first]->append( z ); + } + ox_off += bit->second; + } + assert(ox_off == eit->length); + } + + // sort and string bits together + for (map<__u64, bufferlist*>::iterator it = by_off.begin(); + it != by_off.end(); + it++) { + assert(it->second->length()); + if (it->first < (__u64)bytes_read) { + dout(21) << " concat buffer frag off " << it->first << " len " << it->second->length() << dendl; + bl->claim_append(*(it->second)); + } else { + dout(21) << " NO concat zero buffer frag off " << it->first << " len " << it->second->length() << dendl; + } + delete it->second; + } + + // trim trailing zeros? + if (bl->length() > bytes_read) { + dout(10) << " trimming off trailing zeros . bytes_read=" << bytes_read + << " len=" << bl->length() << dendl; + bl->splice(bytes_read, bl->length() - bytes_read); + assert(bytes_read == bl->length()); + } + + } else { + dout(15) << " only one frag" << dendl; + + // only one fragment, easy + bl->claim(resultbl[0]); + bytes_read = bl->length(); + } + + // finish, clean up + dout(7) << " " << bytes_read << " bytes " + << bl->length() + << dendl; + + // done + if (onfinish) { + onfinish->finish(bytes_read);// > 0 ? bytes_read:m->get_result()); + delete onfinish; + } +} + + void Objecter::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest) { entity_inst_t inst; diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index ab8a49e9ca41..11ad8c18a5e7 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -247,11 +247,80 @@ class Objecter { Context *onack, Context *oncommit); tid_t zero(object_t oid, __u64 off, size_t len, ceph_object_layout ol, const SnapContext& snapc, int flags, Context *onack, Context *oncommit); + tid_t remove(object_t oid, ceph_object_layout ol, const SnapContext& snapc, int flags, + Context *onack, Context *oncommit); // no snapc for lock ops tid_t lock(int op, object_t oid, int flags, ceph_object_layout ol, Context *onack, Context *oncommit); + + // --------------------------- + // some scatter/gather hackery + + void _sg_read_finish(vector& extents, vector& resultbl, + bufferlist *bl, Context *onfinish); + + struct C_SGRead : public Context { + Objecter *objecter; + vector extents; + vector resultbl; + bufferlist *bl; + Context *onfinish; + C_SGRead(Objecter *ob, + vector& e, vector& r, bufferlist *b, Context *c) : + objecter(ob), bl(b), onfinish(c) { + extents.swap(e); + resultbl.swap(r); + } + void finish(int r) { + objecter->_sg_read_finish(extents, resultbl, bl, onfinish); + } + }; + + void sg_read(vector& extents, bufferlist *bl, int flags, Context *onfinish) { + if (extents.size() == 1) { + read(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout, + bl, flags, onfinish); + } else { + C_Gather *g = new C_Gather; + vector resultbl(extents.size()); + int i=0; + for (vector::iterator p = extents.begin(); p != extents.end(); p++) { + read(p->oid, p->offset, p->length, p->layout, + &resultbl[i++], flags, onfinish); + } + g->set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish)); + } + } + + + void sg_write(vector& extents, const SnapContext& snapc, bufferlist bl, + int flags, Context *onack, Context *oncommit) { + if (extents.size() == 1) { + write(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout, + snapc, bl, flags, onack, oncommit); + } else { + C_Gather *gack = 0, *gcom = 0; + if (onack) + gack = new C_Gather(onack); + if (oncommit) + gcom = new C_Gather(oncommit); + for (vector::iterator p = extents.begin(); p != extents.end(); p++) { + bufferlist cur; + for (map<__u32,__u32>::iterator bit = p->buffer_extents.begin(); + bit != p->buffer_extents.end(); + bit++) + bl.copy(bit->first, bit->second, cur); + assert(cur.length() == p->length); + write(p->oid, p->offset, p->length, p->layout, + snapc, cur, flags, + gack ? gack->new_sub():0, + gcom ? gcom->new_sub():0); + } + } + } + void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest); };