Caller provides an ObjectSet* to group objects into.
Later we can put other info here, like truncate_seq and
truncate_size.
}
-void client_flush_set_callback(void *p, inodeno_t ino)
+void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset)
{
Client *client = (Client*)p;
- client->flush_set_callback(ino);
+ client->flush_set_callback(oset);
}
remove_all_caps(in);
//cout << "put_inode deleting " << in << " " << in->ino << std::endl;
- objectcacher->release_set(in->ino);
+ objectcacher->release_set(&in->oset);
if (in->snapdir_parent)
put_inode(in->snapdir_parent);
inode_map.erase(in->vino());
void Client::_release(Inode *in, bool checkafter)
{
if (in->cap_refs[CEPH_CAP_FILE_CACHE]) {
- objectcacher->release_set(in->ino);
+ objectcacher->release_set(&in->oset);
if (checkafter)
put_cap_ref(in, CEPH_CAP_FILE_CACHE);
else
dout(10) << "_flush " << *in << dendl;
if (!onfinish)
onfinish = new C_NoopContext;
- bool safe = objectcacher->commit_set(in->ino, onfinish);
+ bool safe = objectcacher->commit_set(&in->oset, onfinish);
if (safe && onfinish) {
onfinish->finish(0);
delete onfinish;
}
}
-void Client::flush_set_callback(inodeno_t ino)
+void Client::flush_set_callback(ObjectCacher::ObjectSet *oset)
{
// Mutex::Locker l(client_lock);
assert(client_lock.is_locked()); // will be called via dispatch() -> objecter -> ...
- Inode *in = inode_map[vinodeno_t(ino,CEPH_NOSNAP)];
+ Inode *in = (Inode *)oset->parent;
if (in)
_flushed(in);
}
// release clean pages too, if we dont hold RDCACHE reference
if (in->cap_refs[CEPH_CAP_FILE_CACHE] == 0)
- objectcacher->release_set(in->ino);
+ objectcacher->release_set(&in->oset);
put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
}
filer->file_to_extents(in->ino, &in->layout,
m->get_size(), in->size - m->get_size(),
ls);
- objectcacher->truncate_set(in->ino, ls);
+ objectcacher->truncate_set(&in->oset, ls);
}
in->reported_size = in->size = m->get_size();
<< " min " << min
<< " (caller wants " << off << "~" << len << ")" << dendl;
if (l > (loff_t)len) {
- if (objectcacher->file_is_cached(in->ino, &in->layout, in->snapid, off, min))
+ if (objectcacher->file_is_cached(&in->oset, &in->layout, in->snapid, off, min))
dout(20) << "readahead already have min" << dendl;
else {
- objectcacher->file_read(in->ino, &in->layout, in->snapid, off, l, NULL, 0, 0);
+ objectcacher->file_read(&in->oset, &in->layout, in->snapid, off, l, NULL, 0, 0);
dout(20) << "readahead initiated" << dendl;
}
}
bool done = false;
Context *onfinish = new C_SafeCond(&flock, &cond, &done, &rvalue);
if (in->snapid == CEPH_NOSNAP)
- r = objectcacher->file_read(in->ino, &in->layout, in->snapid,
+ r = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
off, len, bl, 0, onfinish);
else
- r = objectcacher->file_read(in->ino, &in->layout, in->snapid,
+ r = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
off, len, bl, 0, onfinish);
if (r == 0) {
while (!done)
objectcacher->wait_for_write(size, client_lock);
// async, caching, non-blocking.
- objectcacher->file_write(in->ino, &in->layout, in->snaprealm->get_snap_context(),
+ objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
offset, size, bl, g_clock.now(), 0);
} else {
/*
using namespace __gnu_cxx;
+#include "osdc/ObjectCacher.h"
class MClientSession;
class MClientRequest;
map<int,int> open_by_mode;
map<int,int> cap_refs;
+ ObjectCacher::ObjectSet oset;
+
__u64 reported_size, wanted_max_size, requested_max_size;
int ref; // ref count. 1 for each dentry, fh that links to me.
exporting_issued(0), exporting_mds(-1), exporting_mseq(0),
cap_item(this), flushing_cap_item(this), last_flush_tid(0),
snaprealm(0), snaprealm_item(this), snapdir_parent(0),
+ oset((void *)this, ino),
reported_size(0), wanted_max_size(0), requested_max_size(0),
ref(0), ll_ref(0),
dir(0), dn(0),
void _release(Inode *in, bool checkafter=true);
void _flush(Inode *in, Context *onfinish=NULL);
void _flushed(Inode *in);
- void flush_set_callback(inodeno_t ino);
+ void flush_set_callback(ObjectCacher::ObjectSet *oset);
void close_release(Inode *in);
void close_safe(Inode *in);
// ok!
objects.erase(ob->get_soid());
- objects_by_ino[ob->get_ino()].erase(ob);
- if (objects_by_ino[ob->get_ino()].empty())
- objects_by_ino.erase(ob->get_ino());
delete ob;
}
oncommit->tid = tid;
bh->ob->last_write_tid = tid;
bh->last_write_tid = tid;
- if (commit_set_callback) {
- uncommitted_by_ino[bh->ob->get_ino()].push_back(&bh->ob->uncommitted_item);
- }
+ if (commit_set_callback)
+ bh->ob->oset->uncommitted.push_back(&bh->ob->uncommitted_item);
mark_tx(bh);
}
}
// is the entire object set now clean?
- if (flush_set_callback &&
- dirty_tx_by_ino[ob->get_ino()] == 0) {
- flush_set_callback(flush_set_callback_arg, ob->get_ino());
- dirty_tx_by_ino.erase(ob->get_ino());
+ if (flush_set_callback && ob->oset->dirty_tx == 0) {
+ flush_set_callback(flush_set_callback_arg, ob->oset);
}
}
//lock.Unlock();
if (commit_set_callback &&
ob->last_commit_tid == ob->last_write_tid) {
ob->uncommitted_item.remove_myself();
- inodeno_t ino = ob->get_ino();
+ ObjectSet *oset = ob->oset;
if (ob->can_close())
close_object(ob);
- if (uncommitted_by_ino[ino].empty()) { // no uncommitted in flight
- uncommitted_by_ino.erase(ino);
- if (dirty_tx_by_ino[ino] == 0) // AND nothing dirty/tx
- commit_set_callback(flush_set_callback_arg, ino);
+ if (oset->uncommitted.empty()) { // no uncommitted in flight
+ if (oset->dirty_tx == 0) // AND nothing dirty/tx
+ commit_set_callback(flush_set_callback_arg, oset);
}
}
}
/* public */
-bool ObjectCacher::is_cached(inodeno_t ino, vector<ObjectExtent>& extents, snapid_t snapid)
+bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, snapid_t snapid)
{
for (vector<ObjectExtent>::iterator ex_it = extents.begin();
ex_it != extents.end();
// get Object cache
sobject_t soid(ex_it->oid, snapid);
- Object *o = get_object_maybe(soid, ino, ex_it->layout);
+ Object *o = get_object_maybe(soid, ex_it->layout);
if (!o)
return false;
if (!o->is_cached(ex_it->offset, ex_it->length))
* returns # bytes read (if in cache). onfinish is untouched (caller must delete it)
* returns 0 if doing async read
*/
-int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish)
+int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
{
bool success = true;
list<BufferHead*> hit_ls;
// get Object cache
sobject_t soid(ex_it->oid, rd->snap);
- Object *o = get_object(soid, ino, ex_it->layout);
+ Object *o = get_object(soid, oset, ex_it->layout);
// map extent into bufferheads
map<loff_t, BufferHead*> hits, missing, rx;
if (success && onfinish) {
dout(10) << "readx missed, waiting on " << *bh_it->second
<< " off " << bh_it->first << dendl;
- bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, ino, onfinish) );
+ bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
}
success = false;
}
if (success && onfinish) {
dout(10) << "readx missed, waiting on " << *bh_it->second
<< " off " << bh_it->first << dendl;
- bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, ino, onfinish) );
+ bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
}
success = false;
}
}
-int ObjectCacher::writex(OSDWrite *wr, inodeno_t ino)
+int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset)
{
utime_t now = g_clock.now();
ex_it++) {
// get object cache
sobject_t soid(ex_it->oid, CEPH_NOSNAP);
- Object *o = get_object(soid, ino, ex_it->layout);
+ Object *o = get_object(soid, oset, ex_it->layout);
// map it all into a single bufferhead.
BufferHead *bh = o->map_write(wr);
// blocking. atomic+sync.
-int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock)
+int ObjectCacher::atomic_sync_readx(OSDRead *rd, ObjectSet *oset, Mutex& lock)
{
dout(10) << "atomic_sync_readx " << rd
- << " in " << ino
+ << " in " << oset
<< dendl;
if (rd->extents.size() == 1) {
i != by_oid.end();
i++) {
sobject_t soid(i->first, rd->snap);
- Object *o = get_object(soid, ino, i->second.layout);
+ Object *o = get_object(soid, oset, i->second.layout);
rdlock(o);
}
Mutex flock("ObjectCacher::atomic_sync_readx flock 2");
Cond cond;
bool done = false;
- readx(rd, ino, new C_SafeCond(&flock, &cond, &done));
+ readx(rd, oset, new C_SafeCond(&flock, &cond, &done));
// block
while (!done) cond.Wait(lock);
return 0;
}
-int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock)
+int ObjectCacher::atomic_sync_writex(OSDWrite *wr, ObjectSet *oset, Mutex& lock)
{
dout(10) << "atomic_sync_writex " << wr
- << " in " << ino
+ << " in " << oset
<< dendl;
if (wr->extents.size() == 1 &&
// make sure we aren't already locking/locked...
sobject_t oid(wr->extents.front().oid, CEPH_NOSNAP);
Object *o = 0;
- if (objects.count(oid)) o = get_object(oid, ino, wr->extents.front().layout);
+ if (objects.count(oid))
+ o = get_object(oid, oset, wr->extents.front().layout);
if (!o ||
(o->lock_state != Object::LOCK_WRLOCK &&
o->lock_state != Object::LOCK_WRLOCKING &&
o->lock_state != Object::LOCK_UPGRADING)) {
// just write synchronously.
dout(10) << "atomic_sync_writex " << wr
- << " in " << ino
+ << " in " << oset
<< " doing sync write"
<< dendl;
i != by_oid.end();
i++) {
sobject_t soid(i->first, CEPH_NOSNAP);
- Object *o = get_object(soid, ino, i->second.layout);
+ Object *o = get_object(soid, oset, i->second.layout);
wrlock(o);
}
vector<ObjectExtent> extents = wr->extents;
// do the write, into our cache
- writex(wr, ino);
+ writex(wr, oset);
// flush
// ...and release the locks?
// -------------------------------------------------
-bool ObjectCacher::set_is_cached(inodeno_t ino)
+bool ObjectCacher::set_is_cached(ObjectSet *oset)
{
- if (objects_by_ino.count(ino) == 0)
+ if (oset->objects.empty())
return false;
- set<Object*>& s = objects_by_ino[ino];
- for (set<Object*>::iterator i = s.begin();
- i != s.end();
- i++) {
- Object *ob = *i;
- for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
- p != ob->data.end();
- p++) {
- BufferHead *bh = p->second;
+ for (xlist<Object*>::iterator p = oset->objects.begin();
+ !p.end(); ++p) {
+ Object *ob = *p;
+ for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
+ q != ob->data.end();
+ q++) {
+ BufferHead *bh = q->second;
if (!bh->is_dirty() && !bh->is_tx())
return true;
}
return false;
}
-bool ObjectCacher::set_is_dirty_or_committing(inodeno_t ino)
+bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
{
- if (objects_by_ino.count(ino) == 0)
+ if (oset->objects.empty())
return false;
- set<Object*>& s = objects_by_ino[ino];
- for (set<Object*>::iterator i = s.begin();
- i != s.end();
- i++) {
+ for (xlist<Object*>::iterator i = oset->objects.begin();
+ !i.end(); ++i) {
Object *ob = *i;
for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
// flush. non-blocking, takes callback.
// returns true if already flushed
-bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish)
+bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
{
- if (objects_by_ino.count(ino) == 0) {
- dout(10) << "flush_set on " << ino << " dne" << dendl;
+ if (oset->objects.empty()) {
+ dout(10) << "flush_set on " << oset << " dne" << dendl;
return true;
}
- dout(10) << "flush_set " << ino << dendl;
+ dout(10) << "flush_set " << oset << dendl;
C_Gather *gather = 0; // we'll need to wait for all objects to flush!
- set<Object*>& s = objects_by_ino[ino];
bool safe = true;
- for (set<Object*>::iterator i = s.begin();
- i != s.end();
- i++) {
+ for (xlist<Object*>::iterator i = oset->objects.begin();
+ !i.end(); ++i) {
Object *ob = *i;
if (!flush(ob)) {
gather = new C_Gather(onfinish);
safe = false;
- dout(10) << "flush_set " << ino << " will wait for ack tid "
+ dout(10) << "flush_set " << oset << " will wait for ack tid "
<< ob->last_write_tid
<< " on " << *ob
<< dendl;
}
if (safe) {
- dout(10) << "flush_set " << ino << " has no dirty|tx bhs" << dendl;
+ dout(10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl;
return true;
}
return false;
// commit. non-blocking, takes callback.
// return true if already flushed.
-bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish)
+bool ObjectCacher::commit_set(ObjectSet *oset, Context *onfinish)
{
assert(onfinish); // doesn't make any sense otherwise.
- if (objects_by_ino.count(ino) == 0) {
- dout(10) << "commit_set on " << ino << " dne" << dendl;
+ if (oset->objects.empty()) {
+ dout(10) << "commit_set on " << oset << " dne" << dendl;
return true;
}
- dout(10) << "commit_set " << ino << dendl;
+ dout(10) << "commit_set " << oset << dendl;
// make sure it's flushing.
- flush_set(ino);
+ flush_set(oset);
C_Gather *gather = 0; // we'll need to wait for all objects to commit
- set<Object*>& s = objects_by_ino[ino];
bool safe = true;
- for (set<Object*>::iterator i = s.begin();
- i != s.end();
- i++) {
+ for (xlist<Object*>::iterator i = oset->objects.begin();
+ !i.end(); ++i) {
Object *ob = *i;
if (ob->last_write_tid > ob->last_commit_tid) {
- dout(10) << "commit_set " << ino << " " << *ob
+ dout(10) << "commit_set " << oset << " " << *ob
<< " will finish on commit tid " << ob->last_write_tid
<< dendl;
if (!gather && onfinish) gather = new C_Gather(onfinish);
}
if (safe) {
- dout(10) << "commit_set " << ino << " all committed" << dendl;
+ dout(10) << "commit_set " << oset << " all committed" << dendl;
return true;
}
return false;
}
-void ObjectCacher::purge_set(inodeno_t ino)
+void ObjectCacher::purge_set(ObjectSet *oset)
{
- if (objects_by_ino.count(ino) == 0) {
- dout(10) << "purge_set on " << ino << " dne" << dendl;
+ if (oset->objects.empty()) {
+ dout(10) << "purge_set on " << oset << " dne" << dendl;
return;
}
- dout(10) << "purge_set " << ino << dendl;
+ dout(10) << "purge_set " << oset << dendl;
- set<Object*>& s = objects_by_ino[ino];
- for (set<Object*>::iterator i = s.begin();
- i != s.end();
- i++) {
+ for (xlist<Object*>::iterator i = oset->objects.begin();
+ !i.end(); ++i) {
Object *ob = *i;
purge(ob);
}
return o_unclean;
}
-loff_t ObjectCacher::release_set(inodeno_t ino)
+loff_t ObjectCacher::release_set(ObjectSet *oset)
{
// return # bytes not clean (and thus not released).
loff_t unclean = 0;
- if (objects_by_ino.count(ino) == 0) {
- dout(10) << "release_set on " << ino << " dne" << dendl;
+ if (oset->objects.empty()) {
+ dout(10) << "release_set on " << oset << " dne" << dendl;
return 0;
}
- dout(10) << "release_set " << ino << dendl;
+ dout(10) << "release_set " << oset << dendl;
- set<Object*> s = objects_by_ino[ino];
- for (set<Object*>::iterator i = s.begin();
- i != s.end();
- i++) {
- Object *ob = *i;
+ for (xlist<Object*>::iterator p = oset->objects.begin();
+ !p.end(); ++p) {
+ Object *ob = *p;
loff_t o_unclean = release(ob);
unclean += o_unclean;
if (o_unclean)
- dout(10) << "release_set " << ino << " " << *ob
+ dout(10) << "release_set " << oset << " " << *ob
<< " has " << o_unclean << " bytes left"
<< dendl;
}
if (unclean) {
- dout(10) << "release_set " << ino
+ dout(10) << "release_set " << oset
<< ", " << unclean << " bytes left" << dendl;
}
-void ObjectCacher::truncate_set(inodeno_t ino, vector<ObjectExtent>& exls)
+void ObjectCacher::truncate_set(ObjectSet *oset, vector<ObjectExtent>& exls)
{
- if (objects_by_ino.count(ino) == 0) {
- dout(10) << "truncate_set on " << ino << " dne" << dendl;
+ if (oset->objects.empty()) {
+ dout(10) << "truncate_set on " << oset << " dne" << dendl;
return;
}
- dout(10) << "truncate_set " << ino << dendl;
+ dout(10) << "truncate_set " << oset << dendl;
for (vector<ObjectExtent>::iterator p = exls.begin();
p != exls.end();
}
-void ObjectCacher::kick_sync_writers(inodeno_t ino)
+void ObjectCacher::kick_sync_writers(ObjectSet *oset)
{
- if (objects_by_ino.count(ino) == 0) {
- dout(10) << "kick_sync_writers on " << ino << " dne" << dendl;
+ if (oset->objects.empty()) {
+ dout(10) << "kick_sync_writers on " << oset << " dne" << dendl;
return;
}
- dout(10) << "kick_sync_writers on " << ino << dendl;
+ dout(10) << "kick_sync_writers on " << oset << dendl;
list<Context*> ls;
- set<Object*>& s = objects_by_ino[ino];
- for (set<Object*>::iterator i = s.begin();
- i != s.end();
- i++) {
+ for (xlist<Object*>::iterator i = oset->objects.begin();
+ !i.end(); ++i) {
Object *ob = *i;
ls.splice(ls.begin(), ob->waitfor_wr);
finish_contexts(ls);
}
-void ObjectCacher::kick_sync_readers(inodeno_t ino)
+void ObjectCacher::kick_sync_readers(ObjectSet *oset)
{
- if (objects_by_ino.count(ino) == 0) {
- dout(10) << "kick_sync_readers on " << ino << " dne" << dendl;
+ if (oset->objects.empty()) {
+ dout(10) << "kick_sync_readers on " << oset << " dne" << dendl;
return;
}
- dout(10) << "kick_sync_readers on " << ino << dendl;
+ dout(10) << "kick_sync_readers on " << oset << dendl;
list<Context*> ls;
- set<Object*>& s = objects_by_ino[ino];
- for (set<Object*>::iterator i = s.begin();
- i != s.end();
- i++) {
+ for (xlist<Object*>::iterator i = oset->objects.begin();
+ !i.end(); ++i) {
Object *ob = *i;
ls.splice(ls.begin(), ob->waitfor_rd);
class ObjectCacher {
public:
- typedef void (*flush_set_callback_t) (void *p, inodeno_t ino);
-
class Object;
+ class ObjectSet;
+
+ typedef void (*flush_set_callback_t) (void *p, ObjectSet *oset);
// read scatter/gather
struct OSDRead {
// ObjectCacher::Object fields
ObjectCacher *oc;
sobject_t oid;
- inodeno_t ino;
+ public:
+ ObjectSet *oset;
+ xlist<Object*>::item set_item;
+ private:
ceph_object_layout layout;
+ friend class ObjectSet;
+
public:
map<loff_t, BufferHead*> data;
int rdlock_ref; // how many ppl want or are using a READ lock
public:
- Object(ObjectCacher *_oc, sobject_t o, inodeno_t i, ceph_object_layout& l) :
+ Object(ObjectCacher *_oc, sobject_t o, ObjectSet *os, ceph_object_layout& l) :
oc(_oc),
- oid(o), ino(i), layout(l),
+ oid(o), oset(os), set_item(this), layout(l),
last_write_tid(0), last_ack_tid(0), last_commit_tid(0),
uncommitted_item(this),
- lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0)
- {}
+ lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0) {
+ // add to set
+ os->objects.push_back(&set_item);
+ }
~Object() {
assert(data.empty());
+ set_item.remove_myself();
}
sobject_t get_soid() { return oid; }
object_t get_oid() { return oid.oid; }
snapid_t get_snap() { return oid.snap; }
- inodeno_t get_ino() { return ino; }
+ ObjectSet *get_object_set() { return oset; }
ceph_object_layout& get_layout() { return layout; }
void set_layout(ceph_object_layout& l) { layout = l; }
};
+
+ struct ObjectSet {
+ void *parent;
+
+ inodeno_t ino;
+ __u64 truncate_seq, truncate_size;
+
+ xlist<Object*> objects;
+ xlist<Object*> uncommitted;
+
+ int dirty_tx;
+
+ ObjectSet(void *p, inodeno_t i) : parent(p), ino(i), truncate_seq(0), truncate_size(0), dirty_tx(0) {}
+ };
+
+
// ******* ObjectCacher *********
// ObjectCacher fields
public:
void *flush_set_callback_arg;
hash_map<sobject_t, Object*> objects;
- hash_map<inodeno_t, set<Object*> > objects_by_ino;
- hash_map<inodeno_t, int> dirty_tx_by_ino;
- hash_map<inodeno_t, xlist<Object*> > uncommitted_by_ino;
set<BufferHead*> dirty_bh;
LRU lru_dirty, lru_rest;
// objects
- Object *get_object_maybe(sobject_t oid, inodeno_t ino, ceph_object_layout &l) {
+ Object *get_object_maybe(sobject_t oid, ceph_object_layout &l) {
// have it?
if (objects.count(oid))
return objects[oid];
return NULL;
}
- Object *get_object(sobject_t oid, inodeno_t ino, ceph_object_layout &l) {
+ Object *get_object(sobject_t oid, ObjectSet *oset, ceph_object_layout &l) {
// have it?
if (objects.count(oid))
return objects[oid];
// create it.
- Object *o = new Object(this, oid, ino, l);
+ Object *o = new Object(this, oid, oset, l);
objects[oid] = o;
- objects_by_ino[ino].insert(o);
return o;
}
void close_object(Object *ob);
case BufferHead::STATE_CLEAN: stat_clean += bh->length(); break;
case BufferHead::STATE_DIRTY:
stat_dirty += bh->length();
- dirty_tx_by_ino[bh->ob->get_ino()] += bh->length();
+ bh->ob->oset->dirty_tx += bh->length();
break;
case BufferHead::STATE_TX:
stat_tx += bh->length();
- dirty_tx_by_ino[bh->ob->get_ino()] += bh->length();
+ bh->ob->oset->dirty_tx += bh->length();
break;
case BufferHead::STATE_RX: stat_rx += bh->length(); break;
}
case BufferHead::STATE_CLEAN: stat_clean -= bh->length(); break;
case BufferHead::STATE_DIRTY:
stat_dirty -= bh->length();
- dirty_tx_by_ino[bh->ob->get_ino()] -= bh->length();
+ bh->ob->oset->dirty_tx -= bh->length();
break;
case BufferHead::STATE_TX:
stat_tx -= bh->length();
- dirty_tx_by_ino[bh->ob->get_ino()] -= bh->length();
+ bh->ob->oset->dirty_tx -= bh->length();
break;
case BufferHead::STATE_RX: stat_rx -= bh->length(); break;
}
class C_RetryRead : public Context {
ObjectCacher *oc;
OSDRead *rd;
- inodeno_t ino;
+ ObjectSet *oset;
Context *onfinish;
public:
- C_RetryRead(ObjectCacher *_oc, OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {}
+ C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c) : oc(_oc), rd(r), oset(os), onfinish(c) {}
void finish(int) {
- int r = oc->readx(rd, ino, onfinish);
+ int r = oc->readx(rd, oset, onfinish);
if (r > 0 && onfinish) {
onfinish->finish(r);
delete onfinish;
// non-blocking. async.
- int readx(OSDRead *rd, inodeno_t ino, Context *onfinish);
- int writex(OSDWrite *wr, inodeno_t ino);
- bool is_cached(inodeno_t ino, vector<ObjectExtent>& extents, snapid_t snapid);
+ int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish);
+ int writex(OSDWrite *wr, ObjectSet *oset);
+ bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, snapid_t snapid);
// write blocking
bool wait_for_write(__u64 len, Mutex& lock);
// blocking. atomic+sync.
- int atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock);
- int atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock);
+ int atomic_sync_readx(OSDRead *rd, ObjectSet *oset, Mutex& lock);
+ int atomic_sync_writex(OSDWrite *wr, ObjectSet *oset, Mutex& lock);
- bool set_is_cached(inodeno_t ino);
- bool set_is_dirty_or_committing(inodeno_t ino);
+ bool set_is_cached(ObjectSet *oset);
+ bool set_is_dirty_or_committing(ObjectSet *oset);
- bool flush_set(inodeno_t ino, Context *onfinish=0);
+ bool flush_set(ObjectSet *oset, Context *onfinish=0);
void flush_all(Context *onfinish=0);
- bool commit_set(inodeno_t ino, Context *oncommit);
+ bool commit_set(ObjectSet *oset, Context *oncommit);
void commit_all(Context *oncommit=0);
- void purge_set(inodeno_t ino);
+ void purge_set(ObjectSet *oset);
- loff_t release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean)
+ loff_t release_set(ObjectSet *oset); // returns # of bytes not released (ie non-clean)
__u64 release_all();
- void truncate_set(inodeno_t ino, vector<ObjectExtent>& ex);
+ void truncate_set(ObjectSet *oset, vector<ObjectExtent>& ex);
- void kick_sync_writers(inodeno_t ino);
- void kick_sync_readers(inodeno_t ino);
+ void kick_sync_writers(ObjectSet *oset);
+ void kick_sync_readers(ObjectSet *oset);
// file functions
/*** async+caching (non-blocking) file interface ***/
- int file_is_cached(inodeno_t ino, ceph_file_layout *layout, snapid_t snapid,
+ int file_is_cached(ObjectSet *oset, ceph_file_layout *layout, snapid_t snapid,
loff_t offset, __u64 len) {
vector<ObjectExtent> extents;
- filer.file_to_extents(ino, layout, offset, len, extents);
- return is_cached(ino, extents, snapid);
+ filer.file_to_extents(oset->ino, layout, offset, len, extents);
+ return is_cached(oset, extents, snapid);
}
- int file_read(inodeno_t ino, ceph_file_layout *layout, snapid_t snapid,
+ int file_read(ObjectSet *oset, ceph_file_layout *layout, snapid_t snapid,
loff_t offset, __u64 len,
bufferlist *bl,
int flags,
Context *onfinish) {
OSDRead *rd = prepare_read(snapid, bl, flags);
- filer.file_to_extents(ino, layout, offset, len, rd->extents);
- return readx(rd, ino, onfinish);
+ filer.file_to_extents(oset->ino, layout, offset, len, rd->extents);
+ return readx(rd, oset, onfinish);
}
- int file_write(inodeno_t ino, ceph_file_layout *layout, const SnapContext& snapc,
+ int file_write(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,
loff_t offset, __u64 len,
bufferlist& bl, utime_t mtime, int flags) {
OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
- filer.file_to_extents(ino, layout, offset, len, wr->extents);
- return writex(wr, ino);
+ filer.file_to_extents(oset->ino, layout, offset, len, wr->extents);
+ return writex(wr, oset);
}
/*** sync+blocking file interface ***/
- int file_atomic_sync_read(inodeno_t ino, ceph_file_layout *layout,
+ int file_atomic_sync_read(ObjectSet *oset, ceph_file_layout *layout,
snapid_t snapid,
loff_t offset, __u64 len,
bufferlist *bl, int flags,
Mutex &lock) {
OSDRead *rd = prepare_read(snapid, bl, flags);
- filer.file_to_extents(ino, layout, offset, len, rd->extents);
- return atomic_sync_readx(rd, ino, lock);
+ filer.file_to_extents(oset->ino, layout, offset, len, rd->extents);
+ return atomic_sync_readx(rd, oset, lock);
}
- int file_atomic_sync_write(inodeno_t ino, ceph_file_layout *layout,
+ int file_atomic_sync_write(ObjectSet *oset, ceph_file_layout *layout,
const SnapContext& snapc,
loff_t offset, __u64 len,
bufferlist& bl, utime_t mtime, int flags,
Mutex &lock) {
OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
- filer.file_to_extents(ino, layout, offset, len, wr->extents);
- return atomic_sync_writex(wr, ino, lock);
+ filer.file_to_extents(oset->ino, layout, offset, len, wr->extents);
+ return atomic_sync_writex(wr, oset, lock);
}
};
inline ostream& operator<<(ostream& out, ObjectCacher::Object &ob)
{
out << "object["
- << ob.get_soid() << " ino " << hex << ob.get_ino() << dec
+ << ob.get_soid() << " oset " << ob.oset << dec
<< " wr " << ob.last_write_tid << "/" << ob.last_ack_tid << "/" << ob.last_commit_tid;
switch (ob.lock_state) {
struct StatfsOp {
tid_t tid;
- ceph_statfs *stats;
+ struct ceph_statfs *stats;
Context *onfinish;
utime_t last_submit;
void fs_stats_submit(StatfsOp *op);
public:
void handle_fs_stats_reply(MStatfsReply *m);
- void get_fs_stats(ceph_statfs& result, Context *onfinish);
+ void get_fs_stats(struct ceph_statfs& result, Context *onfinish);
// ---------------------------
// some scatter/gather hackery