#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_objectcacher) *_dout << dbeginl << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".objectcacher.object(" << oid << ") "
-ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, off_t off)
+ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, loff_t off)
{
dout(20) << "split " << *left << " at " << off << dendl;
right->set_state(left->get_state());
right->snapc = left->snapc;
- off_t newleftlen = off - left->start();
+ loff_t newleftlen = off - left->start();
right->set_start(off);
right->set_length(left->length() - newleftlen);
// move read waiters
if (!left->waitfor_read.empty()) {
- map<off_t, list<Context*> >::iterator o, p = left->waitfor_read.end();
+ map<loff_t, list<Context*> >::iterator o, p = left->waitfor_read.end();
p--;
while (p != left->waitfor_read.begin()) {
if (p->first < right->start()) break;
left->last_write = MAX( left->last_write, right->last_write );
// waiters
- for (map<off_t, list<Context*> >::iterator p = right->waitfor_read.begin();
+ for (map<loff_t, list<Context*> >::iterator p = right->waitfor_read.begin();
p != right->waitfor_read.end();
p++)
left->waitfor_read[p->first].splice( left->waitfor_read[p->first].begin(),
dout(10) << "try_merge_bh " << *bh << dendl;
// to the left?
- map<off_t,BufferHead*>::iterator p = data.find(bh->start());
+ map<loff_t,BufferHead*>::iterator p = data.find(bh->start());
assert(p->second == bh);
if (p != data.begin()) {
p--;
* - create missing buffer_heads as necessary.
*/
int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
- map<off_t, BufferHead*>& hits,
- map<off_t, BufferHead*>& missing,
- map<off_t, BufferHead*>& rx)
+ map<loff_t, BufferHead*>& hits,
+ map<loff_t, BufferHead*>& missing,
+ map<loff_t, BufferHead*>& rx)
{
for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
ex_it != rd->extents.end();
dout(10) << "map_read " << ex_it->oid
<< " " << ex_it->start << "~" << ex_it->length << dendl;
- map<off_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
+ map<loff_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
// p->first >= start
- off_t cur = ex_it->start;
- off_t left = ex_it->length;
+ loff_t cur = ex_it->start;
+ loff_t left = ex_it->length;
if (p != data.begin() &&
(p == data.end() || p->first > cur)) {
cur += left;
left -= left;
assert(left == 0);
- assert(cur == ex_it->start + (off_t)ex_it->length);
+ assert(cur == ex_it->start + (loff_t)ex_it->length);
break; // no more.
}
}
else assert(0);
- off_t lenfromcur = MIN(e->end() - cur, left);
+ loff_t lenfromcur = MIN(e->end() - cur, left);
cur += lenfromcur;
left -= lenfromcur;
p++;
} else if (p->first > cur) {
// gap.. miss
- off_t next = p->first;
+ loff_t next = p->first;
BufferHead *n = new BufferHead(this);
n->set_start( cur );
n->set_length( MIN(next - cur, left) );
dout(10) << "map_write oex " << ex_it->oid
<< " " << ex_it->start << "~" << ex_it->length << dendl;
- map<off_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
+ map<loff_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
// p->first >= start
- off_t cur = ex_it->start;
- off_t left = ex_it->length;
+ loff_t cur = ex_it->start;
+ loff_t left = ex_it->length;
if (p != data.begin() &&
(p == data.end() || p->first > cur)) {
}
while (left > 0) {
- off_t max = left;
+ loff_t max = left;
// at end ?
if (p == data.end()) {
}
// keep going.
- off_t lenfromcur = final->end() - cur;
+ loff_t lenfromcur = final->end() - cur;
cur += lenfromcur;
left -= lenfromcur;
p++;
continue;
} else {
// gap!
- off_t next = p->first;
- off_t glen = MIN(next - cur, max);
+ loff_t next = p->first;
+ loff_t glen = MIN(next - cur, max);
dout(10) << "map_write gap " << cur << "~" << glen << dendl;
if (final) {
final->set_length( final->length() + glen );
}
-void ObjectCacher::Object::truncate(off_t s)
+void ObjectCacher::Object::truncate(loff_t s)
{
dout(10) << "truncate to " << s << dendl;
onfinish);
}
-void ObjectCacher::bh_read_finish(object_t oid, off_t start, size_t length, bufferlist &bl)
+void ObjectCacher::bh_read_finish(object_t oid, loff_t start, size_t length, bufferlist &bl)
{
//lock.Lock();
dout(7) << "bh_read_finish "
Object *ob = objects[oid];
// apply to bh's!
- off_t opos = start;
- map<off_t, BufferHead*>::iterator p = ob->data.lower_bound(opos);
+ loff_t opos = start;
+ map<loff_t, BufferHead*>::iterator p = ob->data.lower_bound(opos);
while (p != ob->data.end() &&
- opos < start+(off_t)length) {
+ opos < start+(loff_t)length) {
BufferHead *bh = p->second;
if (bh->start() > opos) {
assert(opos >= bh->start());
assert(bh->start() == opos); // we don't merge rx bh's... yet!
- assert(bh->length() <= start+(off_t)length-opos);
+ assert(bh->length() <= start+(loff_t)length-opos);
bh->bl.substr_of(bl,
opos-bh->start(),
// finishers?
// called with lock held.
list<Context*> ls;
- for (map<off_t, list<Context*> >::iterator p = bh->waitfor_read.begin();
+ for (map<loff_t, list<Context*> >::iterator p = bh->waitfor_read.begin();
p != bh->waitfor_read.end();
p++)
ls.splice(ls.end(), p->second);
}
}
-void ObjectCacher::bh_write_ack(object_t oid, off_t start, size_t length, tid_t tid)
+void ObjectCacher::bh_write_ack(object_t oid, loff_t start, size_t length, tid_t tid)
{
//lock.Lock();
Object *ob = objects[oid];
// apply to bh's!
- for (map<off_t, BufferHead*>::iterator p = ob->data.lower_bound(start);
+ for (map<loff_t, BufferHead*>::iterator p = ob->data.lower_bound(start);
p != ob->data.end();
p++) {
BufferHead *bh = p->second;
- if (bh->start() > start+(off_t)length) break;
+ if (bh->start() > start+(loff_t)length) break;
if (bh->start() < start &&
- bh->end() > start+(off_t)length) {
+ bh->end() > start+(loff_t)length) {
dout(20) << "bh_write_ack skipping " << *bh << dendl;
continue;
}
//lock.Unlock();
}
-void ObjectCacher::bh_write_commit(object_t oid, off_t start, size_t length, tid_t tid)
+void ObjectCacher::bh_write_commit(object_t oid, loff_t start, size_t length, tid_t tid)
{
//lock.Lock();
}
-void ObjectCacher::flush(off_t amount)
+void ObjectCacher::flush(loff_t amount)
{
utime_t cutoff = g_clock.now();
//cutoff.sec_ref() -= g_conf.client_oc_max_dirty_age;
* tail item. Then we call bh_write, which moves it to the other LRU, so that we
* can call lru_dirty.lru_get_next_expire() again.
*/
- off_t did = 0;
+ loff_t did = 0;
while (amount == 0 || did < amount) {
BufferHead *bh = (BufferHead*) lru_dirty.lru_get_next_expire();
if (!bh) break;
}
-void ObjectCacher::trim(off_t max)
+void ObjectCacher::trim(loff_t max)
{
if (max < 0)
max = g_conf.client_oc_size;
Object *o = get_object(ex_it->oid, ino, ex_it->layout);
// map extent into bufferheads
- map<off_t, BufferHead*> hits, missing, rx;
+ map<loff_t, BufferHead*> hits, missing, rx;
o->map_read(rd, hits, missing, rx);
if (!missing.empty() || !rx.empty()) {
// read missing
- for (map<off_t, BufferHead*>::iterator bh_it = missing.begin();
+ for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
bh_it != missing.end();
bh_it++) {
bh_read(bh_it->second);
}
// bump rx
- for (map<off_t, BufferHead*>::iterator bh_it = rx.begin();
+ for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
bh_it != rx.end();
bh_it++) {
touch_bh(bh_it->second); // bump in lru, so we don't lose it.
assert(!hits.empty());
// make a plain list
- for (map<off_t, BufferHead*>::iterator bh_it = hits.begin();
+ for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
bh_it != hits.end();
bh_it++) {
dout(10) << "readx hit bh " << *bh_it->second << dendl;
// this is over a single ObjectExtent, so we know that
// - the bh's are contiguous
// - the buffer frags need not be (and almost certainly aren't)
- off_t opos = ex_it->start;
- map<off_t, BufferHead*>::iterator bh_it = hits.begin();
+ loff_t opos = ex_it->start;
+ map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
assert(bh_it->second->start() <= opos);
size_t bhoff = opos - bh_it->second->start();
map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
size_t foff = 0;
while (1) {
BufferHead *bh = bh_it->second;
- assert(opos == (off_t)(bh->start() + bhoff));
+ assert(opos == (loff_t)(bh->start() + bhoff));
dout(10) << "readx rmap opos " << opos
<< ": " << *bh << " +" << bhoff
if (f_it == ex_it->buffer_extents.end()) break;
}
assert(f_it == ex_it->buffer_extents.end());
- assert(opos == ex_it->start + (off_t)ex_it->length);
+ assert(opos == ex_it->start + (loff_t)ex_it->length);
}
}
// - there is one contiguous bh
// - the buffer frags need not be (and almost certainly aren't)
// note: i assume striping is monotonic... no jumps backwards, ever!
- off_t opos = ex_it->start;
+ loff_t opos = ex_it->start;
for (map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
f_it != ex_it->buffer_extents.end();
f_it++) {
lock.Lock();
while (!flusher_stop) {
while (!flusher_stop) {
- off_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() + get_stat_dirty();
+ loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() + get_stat_dirty();
dout(11) << "flusher "
<< all << " / " << g_conf.client_oc_size << ": "
<< get_stat_tx() << " tx, "
i != s.end();
i++) {
Object *ob = *i;
- for (map<off_t,BufferHead*>::iterator p = ob->data.begin();
+ for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
p != ob->data.end();
p++) {
BufferHead *bh = p->second;
i++) {
Object *ob = *i;
- for (map<off_t,BufferHead*>::iterator p = ob->data.begin();
+ for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
p != ob->data.end();
p++) {
BufferHead *bh = p->second;
bool ObjectCacher::flush(Object *ob)
{
bool clean = true;
- for (map<off_t,BufferHead*>::iterator p = ob->data.begin();
+ for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
p != ob->data.end();
p++) {
BufferHead *bh = p->second;
}
-off_t ObjectCacher::release(Object *ob)
+loff_t ObjectCacher::release(Object *ob)
{
list<BufferHead*> clean;
- off_t o_unclean = 0;
+ loff_t o_unclean = 0;
- for (map<off_t,BufferHead*>::iterator p = ob->data.begin();
+ for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
p != ob->data.end();
p++) {
BufferHead *bh = p->second;
return o_unclean;
}
-off_t ObjectCacher::release_set(inodeno_t ino)
+loff_t ObjectCacher::release_set(inodeno_t ino)
{
// return # bytes not clean (and thus not released).
- off_t unclean = 0;
+ loff_t unclean = 0;
if (objects_by_ino.count(ino) == 0) {
dout(10) << "release_set on " << ino << " dne" << dendl;
i++) {
Object *ob = *i;
- off_t o_unclean = release(ob);
+ loff_t o_unclean = release(ob);
unclean += o_unclean;
if (o_unclean)
int state;
int ref;
struct {
- off_t start, length; // bh extent in object
+ loff_t start, length; // bh extent in object
} ex;
public:
utime_t last_write;
SnapContext snapc;
- map< off_t, list<Context*> > waitfor_read;
+ map< loff_t, list<Context*> > waitfor_read;
public:
// cons
last_write_tid(0) {}
// extent
- off_t start() { return ex.start; }
- void set_start(off_t s) { ex.start = s; }
- off_t length() { return ex.length; }
- void set_length(off_t l) { ex.length = l; }
- off_t end() { return ex.start + ex.length; }
- off_t last() { return end() - 1; }
+ loff_t start() { return ex.start; }
+ void set_start(loff_t s) { ex.start = s; }
+ loff_t length() { return ex.length; }
+ void set_length(loff_t l) { ex.length = l; }
+ loff_t end() { return ex.start + ex.length; }
+ loff_t last() { return end() - 1; }
// states
void set_state(int s) {
ceph_object_layout layout;
public:
- map<off_t, BufferHead*> data;
+ map<loff_t, BufferHead*> data;
tid_t last_write_tid; // version of bh (if non-zero)
tid_t last_ack_tid; // last update acked.
if (0) { // sanity check FIXME DEBUG
//cout << "add_bh " << bh->start() << "~" << bh->length() << endl;
- map<off_t,BufferHead*>::iterator p = data.lower_bound(bh->start());
+ map<loff_t,BufferHead*>::iterator p = data.lower_bound(bh->start());
if (p != data.end()) {
//cout << " after " << *p->second << endl;
//cout << " after starts at " << p->first << endl;
bool is_empty() { return data.empty(); }
// mid-level
- BufferHead *split(BufferHead *bh, off_t off);
+ BufferHead *split(BufferHead *bh, loff_t off);
void merge_left(BufferHead *left, BufferHead *right);
void try_merge_bh(BufferHead *bh);
int map_read(Objecter::OSDRead *rd,
- map<off_t, BufferHead*>& hits,
- map<off_t, BufferHead*>& missing,
- map<off_t, BufferHead*>& rx);
+ map<loff_t, BufferHead*>& hits,
+ map<loff_t, BufferHead*>& missing,
+ map<loff_t, BufferHead*>& rx);
BufferHead *map_write(Objecter::OSDWrite *wr);
- void truncate(off_t s);
+ void truncate(loff_t s);
};
Cond stat_cond;
int stat_waiter;
- off_t stat_clean;
- off_t stat_dirty;
- off_t stat_rx;
- off_t stat_tx;
- off_t stat_missing;
+ loff_t stat_clean;
+ loff_t stat_dirty;
+ loff_t stat_rx;
+ loff_t stat_tx;
+ loff_t stat_missing;
void bh_stat_add(BufferHead *bh) {
switch (bh->get_state()) {
case BufferHead::STATE_RX: stat_rx -= bh->length(); break;
}
}
- off_t get_stat_tx() { return stat_tx; }
- off_t get_stat_rx() { return stat_rx; }
- off_t get_stat_dirty() { return stat_dirty; }
- off_t get_stat_clean() { return stat_clean; }
+ loff_t get_stat_tx() { return stat_tx; }
+ loff_t get_stat_rx() { return stat_rx; }
+ loff_t get_stat_dirty() { return stat_dirty; }
+ loff_t get_stat_clean() { return stat_clean; }
void touch_bh(BufferHead *bh) {
if (bh->is_dirty())
void bh_read(BufferHead *bh);
void bh_write(BufferHead *bh);
- void trim(off_t max=-1);
- void flush(off_t amount=0);
+ void trim(loff_t max=-1);
+ void flush(loff_t amount=0);
bool flush(Object *o);
- off_t release(Object *o);
+ loff_t release(Object *o);
void purge(Object *o);
void rdlock(Object *o);
void wrunlock(Object *o);
public:
- void bh_read_finish(object_t oid, off_t offset, size_t length, bufferlist &bl);
- void bh_write_ack(object_t oid, off_t offset, size_t length, tid_t t);
- void bh_write_commit(object_t oid, off_t offset, size_t length, tid_t t);
+ void bh_read_finish(object_t oid, loff_t offset, size_t length, bufferlist &bl);
+ void bh_write_ack(object_t oid, loff_t offset, size_t length, tid_t t);
+ void bh_write_commit(object_t oid, loff_t offset, size_t length, tid_t t);
void lock_ack(list<object_t>& oids, tid_t tid);
class C_ReadFinish : public Context {
ObjectCacher *oc;
object_t oid;
- off_t start;
+ loff_t start;
size_t length;
public:
bufferlist bl;
- C_ReadFinish(ObjectCacher *c, object_t o, off_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
+ C_ReadFinish(ObjectCacher *c, object_t o, loff_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
void finish(int r) {
oc->bh_read_finish(oid, start, length, bl);
}
class C_WriteAck : public Context {
ObjectCacher *oc;
object_t oid;
- off_t start;
+ loff_t start;
size_t length;
public:
tid_t tid;
- C_WriteAck(ObjectCacher *c, object_t o, off_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
+ C_WriteAck(ObjectCacher *c, object_t o, loff_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
void finish(int r) {
oc->bh_write_ack(oid, start, length, tid);
}
class C_WriteCommit : public Context {
ObjectCacher *oc;
object_t oid;
- off_t start;
+ loff_t start;
size_t length;
public:
tid_t tid;
- C_WriteCommit(ObjectCacher *c, object_t o, off_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
+ C_WriteCommit(ObjectCacher *c, object_t o, loff_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
void finish(int r) {
oc->bh_write_commit(oid, start, length, tid);
}
void purge_set(inodeno_t ino);
- off_t release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean)
+ loff_t release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean)
void truncate_set(inodeno_t ino, list<ObjectExtent>& ex);
/*** async+caching (non-blocking) file interface ***/
int file_read(inodeno_t ino, ceph_file_layout *layout, snapid_t snapid,
- off_t offset, size_t len,
+ loff_t offset, size_t len,
bufferlist *bl,
int flags,
Context *onfinish) {
}
int file_write(inodeno_t ino, ceph_file_layout *layout, const SnapContext& snapc,
- off_t offset, size_t len,
+ loff_t offset, size_t len,
bufferlist& bl, int flags) {
Objecter::OSDWrite *wr = objecter->prepare_write(snapc, bl, flags);
filer.file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, wr->extents);
int file_atomic_sync_read(inodeno_t ino, ceph_file_layout *layout,
snapid_t snapid,
- off_t offset, size_t len,
+ loff_t offset, size_t len,
bufferlist *bl, int flags,
Mutex &lock) {
Objecter::OSDRead *rd = objecter->prepare_read(bl, flags);
int file_atomic_sync_write(inodeno_t ino, ceph_file_layout *layout,
const SnapContext& snapc,
- off_t offset, size_t len,
+ loff_t offset, size_t len,
bufferlist& bl, int flags,
Mutex &lock) {
Objecter::OSDWrite *wr = objecter->prepare_write(snapc, bl, flags);