if (g_conf.client_oc &&
m->get_size() < in->inode.size) {
// map range to objects
- list<ObjectExtent> ls;
+ vector<ObjectExtent> ls;
filer->file_to_extents(in->inode.ino, &in->inode.layout, CEPH_NOSNAP,
m->get_size(), in->inode.size - m->get_size(),
ls);
// object cache OFF -- non-atomic sync read from osd
// do sync read
- Objecter::OSDRead *rd = filer->prepare_read(in->inode.ino, &in->inode.layout, in->snapid,
- offset, size, bl, 0);
+ int flags = 0;
if (in->hack_balance_reads || g_conf.client_hack_balance_reads)
- rd->flags |= CEPH_OSD_OP_BALANCE_READS;
- r = objecter->readx(rd, onfinish);
- assert(r >= 0);
+ flags |= CEPH_OSD_OP_BALANCE_READS;
+ filer->read(in->inode.ino, &in->inode.layout, in->snapid,
+ offset, size, bl, flags, onfinish);
while (!done)
cond.Wait(client_lock);
return ceph_file_layout_period(layout);
}
-int Client::enumerate_layout(int fd, list<ObjectExtent>& result,
+int Client::enumerate_layout(int fd, vector<ObjectExtent>& result,
loff_t length, loff_t offset)
{
Mutex::Locker lock(client_lock);
int get_stripe_unit(int fd);
int get_stripe_width(int fd);
int get_stripe_period(int fd);
- int enumerate_layout(int fd, list<ObjectExtent>& result,
+ int enumerate_layout(int fd, vector<ObjectExtent>& result,
loff_t length, loff_t offset);
int mksnap(const char *path, const char *name);
off_t filesize = stbuf.st_size;
// grab the placement info
- list<ObjectExtent> extents;
+ vector<ObjectExtent> extents;
off_t offset = 0;
client->enumerate_layout(fd, extents, filesize, offset);
client->close(fd);
// run through all the object extents
dout(0) << "file size is " << filesize << dendl;
dout(0) << "(osd, start, length) tuples for file " << fn << dendl;
- for (list<ObjectExtent>::iterator i = extents.begin();
+ for (vector<ObjectExtent>::iterator i = extents.begin();
i != extents.end(); ++i) {
int osd = client->osdmap->get_pg_primary(pg_t(i->layout.ol_pgid.v));
// run through all the buffer extents
- for (map<size_t, size_t>::iterator j = i ->buffer_extents.begin();
+ for (map<__u32,__u32>::iterator j = i ->buffer_extents.begin();
j != i->buffer_extents.end(); ++j) {
dout(0) << "OSD " << osd << ", offset " << (*j).first <<
// See what the primary is for the first object in this file.
int SyntheticClient::check_first_primary(int fh) {
- list<ObjectExtent> extents;
+ vector<ObjectExtent> extents;
client->enumerate_layout(fh, extents, 1, 0);
return client->osdmap->get_pg_primary(pg_t((extents.begin())->layout.ol_pgid));
}
class ObjectExtent {
public:
object_t oid; // object id
- off_t start; // in object
- size_t length; // in object
+ __u32 offset; // in object
+ __u32 length; // in object
ceph_object_layout layout; // object layout (pgid, etc.)
- map<size_t, size_t> buffer_extents; // off -> len. extents in buffer being mapped (may be fragmented bc of striping!)
+ map<__u32, __u32> buffer_extents; // off -> len. extents in buffer being mapped (may be fragmented bc of striping!)
- ObjectExtent() : start(0), length(0) {}
- ObjectExtent(object_t o, off_t s=0, size_t l=0) : oid(o), start(s), length(l) { }
+ ObjectExtent() : offset(0), length(0) {}
+ ObjectExtent(object_t o, __u32 off=0, __u32 l=0) : oid(o), offset(off), length(l) { }
};
inline ostream& operator<<(ostream& out, ObjectExtent &ex)
{
return out << "extent("
<< ex.oid << " in " << ex.layout
- << " " << ex.start << "~" << ex.length
+ << " " << ex.offset << "~" << ex.length
<< ")";
}
// map range onto objects
file_to_extents(probe->ino, &probe->layout, probe->snapid, probe->from, probe->probing_len, probe->probing);
- for (list<ObjectExtent>::iterator p = probe->probing.begin();
+ for (vector<ObjectExtent>::iterator p = probe->probing.begin();
p != probe->probing.end();
p++) {
dout(10) << "_probe probing " << p->oid << dendl;
bool found = false;
__u64 end = 0;
- if (!probe->fwd)
- probe->probing.reverse();
+ if (!probe->fwd) {
+ // reverse
+ vector<ObjectExtent> r;
+ for (vector<ObjectExtent>::reverse_iterator p = probe->probing.rbegin();
+ p != probe->probing.rend();
+ p++)
+ r.push_back(*p);
+ probe->probing.swap(r);
+ }
- for (list<ObjectExtent>::iterator p = probe->probing.begin();
+ for (vector<ObjectExtent>::iterator p = probe->probing.begin();
p != probe->probing.end();
p++) {
- __u64 shouldbe = p->length+p->start;
+ __u64 shouldbe = p->length + p->offset;
dout(10) << "_probed " << probe->ino << " object " << hex << p->oid << dec
<< " should be " << shouldbe
<< ", actual is " << probe->known[p->oid]
// aha, we found the end!
// calc offset into buffer_extent to get distance from probe->from.
- __u64 oleft = probe->known[p->oid] - p->start;
- for (map<size_t,size_t>::iterator i = p->buffer_extents.begin();
+ __u64 oleft = probe->known[p->oid] - p->offset;
+ for (map<__u32,__u32>::iterator i = p->buffer_extents.begin();
i != p->buffer_extents.end();
i++) {
if (oleft <= (__u64)i->second) {
void Filer::file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t snap,
__u64 offset, size_t len,
- list<ObjectExtent>& extents)
+ vector<ObjectExtent>& extents)
{
dout(10) << "file_to_extents " << offset << "~" << len
<< " on " << hex << ino << dec
<< dendl;
+ assert(len > 0);
/* we want only one extent per object!
* this means that each extent we read may map into different bits of the
else
x_len = left;
- if (ex->start + (__u64)ex->length == x_offset) {
+ if (ex->offset + (__u64)ex->length == x_offset) {
// add to extent
ex->length += x_len;
} else {
// new extent
assert(ex->length == 0);
- assert(ex->start == 0);
- ex->start = x_offset;
+ assert(ex->offset == 0);
+ ex->offset = x_offset;
ex->length = x_len;
}
ex->buffer_extents[cur-offset] = x_len;
Context *onfinish;
- list<ObjectExtent> probing;
+ vector<ObjectExtent> probing;
__u64 probing_len;
map<object_t, __u64> known;
return objecter->is_active(); // || (oc && oc->is_active());
}
- /*** async file interface ***/
- Objecter::OSDRead *prepare_read(inodeno_t ino,
- ceph_file_layout *layout,
- snapid_t snapid,
- __u64 offset,
- size_t len,
- bufferlist *bl,
- int flags) {
- Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
- file_to_extents(ino, layout, snapid, offset, len, rd->extents);
- return rd;
- }
+
+ /***** mapping *****/
+
+ /*
+ * map (ino, layout, offset, len) to a (list of) OSDExtents (byte
+ * ranges in objects on (primary) osds)
+ */
+ void file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t snap,
+ __u64 offset,
+ size_t len,
+ vector<ObjectExtent>& extents);
+
+
+
+
+ /*** async file interface. scatter/gather as needed. ***/
+
int read(inodeno_t ino,
ceph_file_layout *layout,
snapid_t snapid,
int flags,
Context *onfinish) {
assert(snapid); // (until there is a non-NOSNAP write)
- Objecter::OSDRead *rd = prepare_read(ino, layout, snapid, offset, len, bl, flags);
- return objecter->readx(rd, onfinish) > 0 ? 0:-1;
+ vector<ObjectExtent> extents;
+ file_to_extents(ino, layout, snapid, offset, len, extents);
+ objecter->sg_read(extents, bl, flags, onfinish);
+ return 0;
}
+
int write(inodeno_t ino,
ceph_file_layout *layout,
const SnapContext& snapc,
int flags,
Context *onack,
Context *oncommit) {
- Objecter::OSDWrite *wr = objecter->prepare_write(snapc, bl, flags);
- file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents);
- return objecter->modifyx(wr, onack, oncommit) > 0 ? 0:-1;
+ vector<ObjectExtent> extents;
+ file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
+ objecter->sg_write(extents, snapc, bl, flags, onack, oncommit);
+ return 0;
}
int zero(inodeno_t ino,
int flags,
Context *onack,
Context *oncommit) {
- Objecter::OSDModify *z = objecter->prepare_modify(snapc, CEPH_OSD_OP_ZERO, flags);
- file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, z->extents);
- return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
+ vector<ObjectExtent> extents;
+ file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
+ if (extents.size() == 1) {
+ objecter->zero(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+ snapc, flags, onack, oncommit);
+ } else {
+ C_Gather *gack = 0, *gcom = 0;
+ if (onack)
+ gack = new C_Gather(onack);
+ if (oncommit)
+ gcom = new C_Gather(oncommit);
+ for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
+ objecter->zero(p->oid, p->offset, p->length, p->layout,
+ snapc, flags,
+ gack ? gack->new_sub():0,
+ gcom ? gcom->new_sub():0);
+ }
+ }
+ return 0;
}
int remove(inodeno_t ino,
int flags,
Context *onack,
Context *oncommit) {
- Objecter::OSDModify *z = objecter->prepare_modify(snapc, CEPH_OSD_OP_DELETE, flags);
- file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, z->extents);
- return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
+ vector<ObjectExtent> extents;
+ file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
+ if (extents.size() == 1) {
+ objecter->remove(extents[0].oid, extents[0].layout,
+ snapc, flags, onack, oncommit);
+ } else {
+ C_Gather *gack = 0, *gcom = 0;
+ if (onack)
+ gack = new C_Gather(onack);
+ if (oncommit)
+ gcom = new C_Gather(oncommit);
+ for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++)
+ objecter->remove(p->oid, p->layout,
+ snapc, flags,
+ gack ? gack->new_sub():0,
+ gcom ? gcom->new_sub():0);
+ }
+ return 0;
}
/*
int flags,
Context *onfinish);
-
- /***** mapping *****/
-
- /*
- * map (ino, layout, offset, len) to a (list of) OSDExtents (byte
- * ranges in objects on (primary) osds)
- */
- void file_to_extents(inodeno_t ino, ceph_file_layout *layout, snapid_t snap,
- __u64 offset,
- size_t len,
- list<ObjectExtent>& extents);
-
};
* map a range of bytes into buffer_heads.
* - create missing buffer_heads as necessary.
*/
-int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
+int ObjectCacher::Object::map_read(OSDRead *rd,
map<loff_t, BufferHead*>& hits,
map<loff_t, BufferHead*>& missing,
map<loff_t, BufferHead*>& rx)
{
- for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
+ for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
ex_it != rd->extents.end();
ex_it++) {
if (ex_it->oid != oid) continue;
dout(10) << "map_read " << ex_it->oid
- << " " << ex_it->start << "~" << ex_it->length << dendl;
+ << " " << ex_it->offset << "~" << ex_it->length << dendl;
- map<loff_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
+ map<loff_t, BufferHead*>::iterator p = data.lower_bound(ex_it->offset);
// p->first >= start
- loff_t cur = ex_it->start;
+ loff_t cur = ex_it->offset;
loff_t left = ex_it->length;
if (p != data.begin() &&
cur += left;
left -= left;
assert(left == 0);
- assert(cur == ex_it->start + (loff_t)ex_it->length);
+ assert(cur == ex_it->offset + (loff_t)ex_it->length);
break; // no more.
}
* - break up bufferheads that don't fall completely within the range
* //no! - return a bh that includes the write. may also include other dirty data to left and/or right.
*/
-ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr)
+ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)
{
BufferHead *final = 0;
- for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
+ for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
ex_it != wr->extents.end();
ex_it++) {
if (ex_it->oid != oid) continue;
dout(10) << "map_write oex " << ex_it->oid
- << " " << ex_it->start << "~" << ex_it->length << dendl;
+ << " " << ex_it->offset << "~" << ex_it->length << dendl;
- map<loff_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
+ map<loff_t, BufferHead*>::iterator p = data.lower_bound(ex_it->offset);
// p->first >= start
- loff_t cur = ex_it->start;
+ loff_t cur = ex_it->offset;
loff_t left = ex_it->length;
if (p != data.begin() &&
C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length());
// go
- objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(),
+ objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(),
&onfinish->bl, 0,
onfinish);
}
* returns # bytes read (if in cache). onfinish is untouched (caller must delete it)
* returns 0 if doing async read
*/
-int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
+int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish)
{
bool success = true;
list<BufferHead*> hit_ls;
map<size_t, bufferlist> stripe_map; // final buffer offset -> substring
- for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
+ for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
ex_it != rd->extents.end();
ex_it++) {
dout(10) << "readx " << *ex_it << dendl;
// this is over a single ObjectExtent, so we know that
// - the bh's are contiguous
// - the buffer frags need not be (and almost certainly aren't)
- loff_t opos = ex_it->start;
+ loff_t opos = ex_it->offset;
map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
assert(bh_it->second->start() <= opos);
size_t bhoff = opos - bh_it->second->start();
- map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
+ map<__u32,__u32>::iterator f_it = ex_it->buffer_extents.begin();
size_t foff = 0;
while (1) {
BufferHead *bh = bh_it->second;
if (f_it == ex_it->buffer_extents.end()) break;
}
assert(f_it == ex_it->buffer_extents.end());
- assert(opos == ex_it->start + (loff_t)ex_it->length);
+ assert(opos == ex_it->offset + (loff_t)ex_it->length);
}
}
}
-int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
+int ObjectCacher::writex(OSDWrite *wr, inodeno_t ino)
{
utime_t now = g_clock.now();
- for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
+ for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
ex_it != wr->extents.end();
ex_it++) {
// get object cache
// - there is one contiguous bh
// - the buffer frags need not be (and almost certainly aren't)
// note: i assume striping is monotonic... no jumps backwards, ever!
- loff_t opos = ex_it->start;
- for (map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
+ loff_t opos = ex_it->offset;
+ for (map<__u32,__u32>::iterator f_it = ex_it->buffer_extents.begin();
f_it != ex_it->buffer_extents.end();
f_it++) {
dout(10) << "writex writing " << f_it->first << "~" << f_it->second << " into " << *bh << " at " << opos << dendl;
// blocking. atomic+sync.
-int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock)
+int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock)
{
dout(10) << "atomic_sync_readx " << rd
<< " in " << ino
// just write synchronously.
Cond cond;
bool done = false;
- objecter->readx(rd, new C_SafeCond(&lock, &cond, &done));
+ //objecter->readx(rd, new C_SafeCond(&lock, &cond, &done));
+ objecter->read(rd->extents[0].oid, rd->extents[0].offset, rd->extents[0].length,
+ rd->extents[0].layout, rd->bl, 0,
+ new C_SafeCond(&lock, &cond, &done));
// block
while (!done) cond.Wait(lock);
// sort by object...
map<object_t,ObjectExtent> by_oid;
- for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
+ for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
ex_it != rd->extents.end();
ex_it++)
by_oid[ex_it->oid] = *ex_it;
}
// readx will hose rd
- list<ObjectExtent> extents = rd->extents;
+ vector<ObjectExtent> extents = rd->extents;
// do the read, into our cache
Cond cond;
while (!done) cond.Wait(lock);
// release the locks
- for (list<ObjectExtent>::iterator ex_it = extents.begin();
+ for (vector<ObjectExtent>::iterator ex_it = extents.begin();
ex_it != extents.end();
ex_it++) {
assert(objects.count(ex_it->oid));
return 0;
}
-int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mutex& lock)
+int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock)
{
dout(10) << "atomic_sync_writex " << wr
<< " in " << ino
Cond cond;
bool done = false;
- objecter->modifyx(wr, new C_SafeCond(&lock, &cond, &done), 0);
+ objecter->sg_write(wr->extents, wr->snapc, wr->bl, 0,
+ new C_SafeCond(&lock, &cond, &done), 0);
// block
while (!done) cond.Wait(lock);
// spans multiple objects, or is big.
// sort by object...
map<object_t,ObjectExtent> by_oid;
- for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
+ for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
ex_it != wr->extents.end();
ex_it++)
by_oid[ex_it->oid] = *ex_it;
}
// writex will hose wr
- list<ObjectExtent> extents = wr->extents;
+ vector<ObjectExtent> extents = wr->extents;
// do the write, into our cache
writex(wr, ino);
// flush
// ...and release the locks?
- for (list<ObjectExtent>::iterator ex_it = extents.begin();
+ for (vector<ObjectExtent>::iterator ex_it = extents.begin();
ex_it != extents.end();
ex_it++) {
assert(objects.count(ex_it->oid));
return unclean;
}
-void ObjectCacher::truncate_set(inodeno_t ino, list<ObjectExtent>& exls)
+void ObjectCacher::truncate_set(inodeno_t ino, vector<ObjectExtent>& exls)
{
if (objects_by_ino.count(ino) == 0) {
dout(10) << "truncate_set on " << ino << " dne" << dendl;
dout(10) << "truncate_set " << ino << dendl;
- for (list<ObjectExtent>::iterator p = exls.begin();
- p != exls.end();
- ++p) {
- ObjectExtent &ex = *p;
- if (objects.count(ex.oid) == 0) continue;
- Object *ob = objects[ex.oid];
-
- // purge or truncate?
- if (ex.start == 0) {
- dout(10) << "truncate_set purging " << *ob << dendl;
- purge(ob);
- } else {
- // hrm, truncate object
- dout(10) << "truncate_set truncating " << *ob << " at " << ex.start << dendl;
- ob->truncate(ex.start);
-
- if (ob->can_close()) {
- dout(10) << "truncate_set trimming " << *ob << dendl;
- close_object(ob);
- }
- }
+ for (vector<ObjectExtent>::iterator p = exls.begin();
+ p != exls.end();
+ ++p) {
+ ObjectExtent &ex = *p;
+ if (objects.count(ex.oid) == 0) continue;
+ Object *ob = objects[ex.oid];
+
+ // purge or truncate?
+ if (ex.offset == 0) {
+ dout(10) << "truncate_set purging " << *ob << dendl;
+ purge(ob);
+ } else {
+ // hrm, truncate object
+ dout(10) << "truncate_set truncating " << *ob << " at " << ex.offset << dendl;
+ ob->truncate(ex.offset);
+
+ if (ob->can_close()) {
+ dout(10) << "truncate_set trimming " << *ob << dendl;
+ close_object(ob);
+ }
+ }
}
}
class Object;
+ // read scatter/gather
+ struct OSDRead {
+ vector<ObjectExtent> extents;
+ map<object_t, bufferlist*> read_data; // bits of data as they come back
+ bufferlist *bl;
+ int flags;
+ OSDRead(bufferlist *b, int f) : bl(b), flags(f) {}
+ };
+
+ OSDRead *prepare_read(bufferlist *b, int f) {
+ return new OSDRead(b, f);
+ }
+
+ // write scatter/gather
+ struct OSDWrite {
+ vector<ObjectExtent> extents;
+ SnapContext snapc;
+ bufferlist bl;
+ int flags;
+ OSDWrite(const SnapContext& sc, bufferlist& b, int f) : snapc(sc), bl(b), flags(f) {}
+ };
+
+ OSDWrite *prepare_write(const SnapContext& sc, bufferlist &b, int f) {
+ return new OSDWrite(sc, b, f);
+ }
+
+
+
// ******* BufferHead *********
class BufferHead : public LRUObject {
public:
void merge_left(BufferHead *left, BufferHead *right);
void try_merge_bh(BufferHead *bh);
- int map_read(Objecter::OSDRead *rd,
+ int map_read(OSDRead *rd,
map<loff_t, BufferHead*>& hits,
map<loff_t, BufferHead*>& missing,
map<loff_t, BufferHead*>& rx);
- BufferHead *map_write(Objecter::OSDWrite *wr);
+ BufferHead *map_write(OSDWrite *wr);
void truncate(loff_t s);
class C_RetryRead : public Context {
ObjectCacher *oc;
- Objecter::OSDRead *rd;
+ OSDRead *rd;
inodeno_t ino;
Context *onfinish;
public:
- C_RetryRead(ObjectCacher *_oc, Objecter::OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {}
+ C_RetryRead(ObjectCacher *_oc, OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {}
void finish(int) {
int r = oc->readx(rd, ino, onfinish);
if (r > 0 && onfinish) {
}
};
+
+
// non-blocking. async.
- int readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish);
- int writex(Objecter::OSDWrite *wr, inodeno_t ino);
+ int readx(OSDRead *rd, inodeno_t ino, Context *onfinish);
+ int writex(OSDWrite *wr, inodeno_t ino);
// write blocking
bool wait_for_write(size_t len, Mutex& lock);
// blocking. atomic+sync.
- int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock);
- int atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mutex& lock);
+ int atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock);
+ int atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock);
bool set_is_cached(inodeno_t ino);
bool set_is_dirty_or_committing(inodeno_t ino);
loff_t release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean)
- void truncate_set(inodeno_t ino, list<ObjectExtent>& ex);
+ void truncate_set(inodeno_t ino, vector<ObjectExtent>& ex);
void kick_sync_writers(inodeno_t ino);
void kick_sync_readers(inodeno_t ino);
bufferlist *bl,
int flags,
Context *onfinish) {
- Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
+ OSDRead *rd = prepare_read(bl, flags);
filer.file_to_extents(ino, layout, snapid, offset, len, rd->extents);
return readx(rd, ino, onfinish);
}
int file_write(inodeno_t ino, ceph_file_layout *layout, const SnapContext& snapc,
loff_t offset, size_t len,
bufferlist& bl, int flags) {
- Objecter::OSDWrite *wr = objecter->prepare_write(snapc, bl, flags);
+ OSDWrite *wr = prepare_write(snapc, bl, flags);
filer.file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents);
return writex(wr, ino);
}
/*** sync+blocking file interface ***/
-
+
int file_atomic_sync_read(inodeno_t ino, ceph_file_layout *layout,
snapid_t snapid,
loff_t offset, size_t len,
bufferlist *bl, int flags,
Mutex &lock) {
- Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
+ OSDRead *rd = prepare_read(bl, flags);
filer.file_to_extents(ino, layout, snapid, offset, len, rd->extents);
return atomic_sync_readx(rd, ino, lock);
}
loff_t offset, size_t len,
bufferlist& bl, int flags,
Mutex &lock) {
- Objecter::OSDWrite *wr = objecter->prepare_write(snapc, bl, flags);
+ OSDWrite *wr = prepare_write(snapc, bl, flags);
filer.file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents);
return atomic_sync_writex(wr, ino, lock);
}
// read -----------------------------------
-tid_t Objecter::read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, bufferlist *bl, int flags,
+tid_t Objecter::read(object_t oid, __u64 off, size_t len, ceph_object_layout ol,
+ bufferlist *bl, int flags,
Context *onfinish)
{
OSDRead *rd = prepare_read(bl, flags);
// send?
dout(10) << "readx_submit " << rd << " tid " << last_tid
- << " oid " << ex.oid << " " << ex.start << "~" << ex.length
+ << " oid " << ex.oid << " " << ex.offset << "~" << ex.length
<< " (" << ex.buffer_extents.size() << " buffer fragments)"
<< " " << ex.layout
<< " osd" << pg.acker()
MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
ex.oid, ex.layout, osdmap->get_epoch(),
flags);
- m->read(ex.start, ex.length);
+ m->read(ex.offset, ex.length);
if (inc_lock > 0) {
rd->inc_lock = inc_lock;
m->set_inc_lock(inc_lock);
assert(ox_len <= eit->length);
// for each buffer extent we're mapping into...
- for (map<size_t,size_t>::iterator bit = eit->buffer_extents.begin();
+ for (map<__u32,__u32>::iterator bit = eit->buffer_extents.begin();
bit != eit->buffer_extents.end();
bit++) {
- dout(21) << " object " << eit->oid << " extent " << eit->start << "~" << eit->length << " : ox offset " << ox_off << " -> buffer extent " << bit->first << "~" << bit->second << dendl;
+ dout(21) << " object " << eit->oid
+ << " extent " << eit->offset << "~" << eit->length
+ << " : ox offset " << ox_off
+ << " -> buffer extent " << bit->first << "~" << bit->second << dendl;
by_off[bit->first] = new bufferlist;
if (ox_off + bit->second <= ox_len) {
return last_tid;
}
+// remove
+
+tid_t Objecter::remove(object_t oid,
+ ceph_object_layout ol, const SnapContext& snapc,
+ int flags,
+ Context *onack, Context *oncommit)
+{
+ OSDModify *z = prepare_modify(snapc, CEPH_OSD_OP_DELETE, flags);
+ z->extents.push_back(ObjectExtent(oid, 0, 0));
+ z->extents.front().layout = ol;
+ modifyx(z, onack, oncommit);
+ return last_tid;
+}
+
// lock ops
// send?
dout(10) << "modifyx_submit " << ceph_osd_op_name(wr->op) << " tid " << tid
<< " oid " << ex.oid
- << " " << ex.start << "~" << ex.length
+ << " " << ex.offset << "~" << ex.length
<< " " << ex.layout
<< " osd" << pg.primary()
<< dendl;
MOSDOp *m = new MOSDOp(client_inc, tid, true,
ex.oid, ex.layout, osdmap->get_epoch(),
flags);
- m->add_simple_op(wr->op, ex.start, ex.length);
+ m->add_simple_op(wr->op, ex.offset, ex.length);
m->set_snap_seq(wr->snapc.seq);
m->get_snaps() = wr->snapc.snaps;
if (inc_lock > 0) {
// map buffer segments into this extent
// (may be fragmented bc of striping)
bufferlist cur;
- for (map<size_t,size_t>::iterator bit = ex.buffer_extents.begin();
+ for (map<__u32,__u32>::iterator bit = ex.buffer_extents.begin();
bit != ex.buffer_extents.end();
bit++)
((OSDWrite*)wr)->bl.copy(bit->first, bit->second, cur);
+
+// scatter/gather
+
+void Objecter::_sg_read_finish(vector<ObjectExtent>& extents, vector<bufferlist>& resultbl,
+ bufferlist *bl, Context *onfinish)
+{
+ // all done
+ size_t bytes_read = 0;
+
+ dout(15) << "_sg_read_finish" << dendl;
+
+ if (extents.size() > 1) {
+ /** FIXME This doesn't handle holes efficiently.
+ * It allocates zero buffers to fill whole buffer, and
+ * then discards trailing ones at the end.
+ *
+ * Actually, this whole thing is pretty messy with temporary bufferlist*'s all over
+ * the heap.
+ */
+
+ // map extents back into buffer
+ map<__u64, bufferlist*> by_off; // buffer offset -> bufferlist
+
+ // for each object extent...
+ vector<bufferlist>::iterator bit = resultbl.begin();
+ for (vector<ObjectExtent>::iterator eit = extents.begin();
+ eit != extents.end();
+ eit++, bit++) {
+ bufferlist& ox_buf = *bit;
+ unsigned ox_len = ox_buf.length();
+ unsigned ox_off = 0;
+ assert(ox_len <= eit->length);
+
+ // for each buffer extent we're mapping into...
+ for (map<__u32,__u32>::iterator bit = eit->buffer_extents.begin();
+ bit != eit->buffer_extents.end();
+ bit++) {
+ dout(21) << " object " << eit->oid
+ << " extent " << eit->offset << "~" << eit->length
+ << " : ox offset " << ox_off
+ << " -> buffer extent " << bit->first << "~" << bit->second << dendl;
+ by_off[bit->first] = new bufferlist;
+
+ if (ox_off + bit->second <= ox_len) {
+ // we got the whole bx
+ by_off[bit->first]->substr_of(ox_buf, ox_off, bit->second);
+ if (bytes_read < bit->first + bit->second)
+ bytes_read = bit->first + bit->second;
+ } else if (ox_off + bit->second > ox_len && ox_off < ox_len) {
+ // we got part of this bx
+ by_off[bit->first]->substr_of(ox_buf, ox_off, (ox_len-ox_off));
+ if (bytes_read < bit->first + ox_len-ox_off)
+ bytes_read = bit->first + ox_len-ox_off;
+
+ // zero end of bx
+ dout(21) << " adding some zeros to the end " << ox_off + bit->second-ox_len << dendl;
+ bufferptr z(ox_off + bit->second - ox_len);
+ z.zero();
+ by_off[bit->first]->append( z );
+ } else {
+ // we got none of this bx. zero whole thing.
+ assert(ox_off >= ox_len);
+ dout(21) << " adding all zeros for this bit " << bit->second << dendl;
+ bufferptr z(bit->second);
+ z.zero();
+ by_off[bit->first]->append( z );
+ }
+ ox_off += bit->second;
+ }
+ assert(ox_off == eit->length);
+ }
+
+ // sort and string bits together
+ for (map<__u64, bufferlist*>::iterator it = by_off.begin();
+ it != by_off.end();
+ it++) {
+ assert(it->second->length());
+ if (it->first < (__u64)bytes_read) {
+ dout(21) << " concat buffer frag off " << it->first << " len " << it->second->length() << dendl;
+ bl->claim_append(*(it->second));
+ } else {
+ dout(21) << " NO concat zero buffer frag off " << it->first << " len " << it->second->length() << dendl;
+ }
+ delete it->second;
+ }
+
+ // trim trailing zeros?
+ if (bl->length() > bytes_read) {
+ dout(10) << " trimming off trailing zeros . bytes_read=" << bytes_read
+ << " len=" << bl->length() << dendl;
+ bl->splice(bytes_read, bl->length() - bytes_read);
+ assert(bytes_read == bl->length());
+ }
+
+ } else {
+ dout(15) << " only one frag" << dendl;
+
+ // only one fragment, easy
+ bl->claim(resultbl[0]);
+ bytes_read = bl->length();
+ }
+
+ // finish, clean up
+ dout(7) << " " << bytes_read << " bytes "
+ << bl->length()
+ << dendl;
+
+ // done
+ if (onfinish) {
+ onfinish->finish(bytes_read);// > 0 ? bytes_read:m->get_result());
+ delete onfinish;
+ }
+}
+
+
void Objecter::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest)
{
entity_inst_t inst;
Context *onack, Context *oncommit);
tid_t zero(object_t oid, __u64 off, size_t len, ceph_object_layout ol, const SnapContext& snapc, int flags,
Context *onack, Context *oncommit);
+ tid_t remove(object_t oid, ceph_object_layout ol, const SnapContext& snapc, int flags,
+ Context *onack, Context *oncommit);
// no snapc for lock ops
tid_t lock(int op, object_t oid, int flags, ceph_object_layout ol, Context *onack, Context *oncommit);
+
+ // ---------------------------
+ // some scatter/gather hackery
+
+ void _sg_read_finish(vector<ObjectExtent>& extents, vector<bufferlist>& resultbl,
+ bufferlist *bl, Context *onfinish);
+
+ struct C_SGRead : public Context {
+ Objecter *objecter;
+ vector<ObjectExtent> extents;
+ vector<bufferlist> resultbl;
+ bufferlist *bl;
+ Context *onfinish;
+ C_SGRead(Objecter *ob,
+ vector<ObjectExtent>& e, vector<bufferlist>& r, bufferlist *b, Context *c) :
+ objecter(ob), bl(b), onfinish(c) {
+ extents.swap(e);
+ resultbl.swap(r);
+ }
+ void finish(int r) {
+ objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
+ }
+ };
+
+ void sg_read(vector<ObjectExtent>& extents, bufferlist *bl, int flags, Context *onfinish) {
+ if (extents.size() == 1) {
+ read(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+ bl, flags, onfinish);
+ } else {
+ C_Gather *g = new C_Gather;
+ vector<bufferlist> resultbl(extents.size());
+ int i=0;
+ for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
+ read(p->oid, p->offset, p->length, p->layout,
+ &resultbl[i++], flags, onfinish);
+ }
+ g->set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
+ }
+ }
+
+
+ void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc, bufferlist bl,
+ int flags, Context *onack, Context *oncommit) {
+ if (extents.size() == 1) {
+ write(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+ snapc, bl, flags, onack, oncommit);
+ } else {
+ C_Gather *gack = 0, *gcom = 0;
+ if (onack)
+ gack = new C_Gather(onack);
+ if (oncommit)
+ gcom = new C_Gather(oncommit);
+ for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
+ bufferlist cur;
+ for (map<__u32,__u32>::iterator bit = p->buffer_extents.begin();
+ bit != p->buffer_extents.end();
+ bit++)
+ bl.copy(bit->first, bit->second, cur);
+ assert(cur.length() == p->length);
+ write(p->oid, p->offset, p->length, p->layout,
+ snapc, cur, flags,
+ gack ? gack->new_sub():0,
+ gcom ? gcom->new_sub():0);
+ }
+ }
+ }
+
void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest);
};