From: Sage Weil Date: Thu, 11 Feb 2010 23:11:23 +0000 (-0800) Subject: objectcacher: use ObjectSet container instead of inodeno_t hash_maps X-Git-Tag: v0.19~47 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f85f60f24aac0fc76b62c08dae9f56c66060d674;p=ceph.git objectcacher: use ObjectSet container instead of inodeno_t hash_maps Caller provides an ObjectSet* to group objects into. Later we can put other info here, like truncate_seq and truncate_size. --- diff --git a/src/client/Client.cc b/src/client/Client.cc index a2367cb0f7aa..1153b5f75a96 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -95,10 +95,10 @@ ostream& operator<<(ostream &out, Inode &in) } -void client_flush_set_callback(void *p, inodeno_t ino) +void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset) { Client *client = (Client*)p; - client->flush_set_callback(ino); + client->flush_set_callback(oset); } @@ -1528,7 +1528,7 @@ void Client::put_inode(Inode *in, int n) remove_all_caps(in); //cout << "put_inode deleting " << in << " " << in->ino << std::endl; - objectcacher->release_set(in->ino); + objectcacher->release_set(&in->oset); if (in->snapdir_parent) put_inode(in->snapdir_parent); inode_map.erase(in->vino()); @@ -1961,7 +1961,7 @@ void Client::wake_inode_waiters(int mds_num) void Client::_release(Inode *in, bool checkafter) { if (in->cap_refs[CEPH_CAP_FILE_CACHE]) { - objectcacher->release_set(in->ino); + objectcacher->release_set(&in->oset); if (checkafter) put_cap_ref(in, CEPH_CAP_FILE_CACHE); else @@ -1975,18 +1975,18 @@ void Client::_flush(Inode *in, Context *onfinish) dout(10) << "_flush " << *in << dendl; if (!onfinish) onfinish = new C_NoopContext; - bool safe = objectcacher->commit_set(in->ino, onfinish); + bool safe = objectcacher->commit_set(&in->oset, onfinish); if (safe && onfinish) { onfinish->finish(0); delete onfinish; } } -void Client::flush_set_callback(inodeno_t ino) +void Client::flush_set_callback(ObjectCacher::ObjectSet *oset) { // Mutex::Locker l(client_lock); assert(client_lock.is_locked()); // will be called via dispatch() -> objecter -> ... - Inode *in = inode_map[vinodeno_t(ino,CEPH_NOSNAP)]; + Inode *in = (Inode *)oset->parent; if (in) _flushed(in); } @@ -1998,7 +1998,7 @@ void Client::_flushed(Inode *in) // release clean pages too, if we dont hold RDCACHE reference if (in->cap_refs[CEPH_CAP_FILE_CACHE] == 0) - objectcacher->release_set(in->ino); + objectcacher->release_set(&in->oset); put_cap_ref(in, CEPH_CAP_FILE_BUFFER); } @@ -2555,7 +2555,7 @@ void Client::handle_cap_trunc(Inode *in, MClientCaps *m) filer->file_to_extents(in->ino, &in->layout, m->get_size(), in->size - m->get_size(), ls); - objectcacher->truncate_set(in->ino, ls); + objectcacher->truncate_set(&in->oset, ls); } in->reported_size = in->size = m->get_size(); @@ -4361,10 +4361,10 @@ int Client::_read_async(Fh *f, __u64 off, __u64 len, bufferlist *bl) << " min " << min << " (caller wants " << off << "~" << len << ")" << dendl; if (l > (loff_t)len) { - if (objectcacher->file_is_cached(in->ino, &in->layout, in->snapid, off, min)) + if (objectcacher->file_is_cached(&in->oset, &in->layout, in->snapid, off, min)) dout(20) << "readahead already have min" << dendl; else { - objectcacher->file_read(in->ino, &in->layout, in->snapid, off, l, NULL, 0, 0); + objectcacher->file_read(&in->oset, &in->layout, in->snapid, off, l, NULL, 0, 0); dout(20) << "readahead initiated" << dendl; } } @@ -4377,10 +4377,10 @@ int Client::_read_async(Fh *f, __u64 off, __u64 len, bufferlist *bl) bool done = false; Context *onfinish = new C_SafeCond(&flock, &cond, &done, &rvalue); if (in->snapid == CEPH_NOSNAP) - r = objectcacher->file_read(in->ino, &in->layout, in->snapid, + r = objectcacher->file_read(&in->oset, &in->layout, in->snapid, off, len, bl, 0, onfinish); else - r = objectcacher->file_read(in->ino, &in->layout, in->snapid, + r = objectcacher->file_read(&in->oset, &in->layout, in->snapid, off, len, bl, 0, onfinish); if (r == 0) { while (!done) @@ -4563,7 +4563,7 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf) objectcacher->wait_for_write(size, client_lock); // async, caching, non-blocking. - objectcacher->file_write(in->ino, &in->layout, in->snaprealm->get_snap_context(), + objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(), offset, size, bl, g_clock.now(), 0); } else { /* diff --git a/src/client/Client.h b/src/client/Client.h index e00c23fb9da4..f25f9e338a55 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -62,6 +62,7 @@ using std::fstream; using namespace __gnu_cxx; +#include "osdc/ObjectCacher.h" class MClientSession; class MClientRequest; @@ -404,6 +405,8 @@ class Inode { map open_by_mode; map cap_refs; + ObjectCacher::ObjectSet oset; + __u64 reported_size, wanted_max_size, requested_max_size; int ref; // ref count. 1 for each dentry, fh that links to me. @@ -480,6 +483,7 @@ class Inode { exporting_issued(0), exporting_mds(-1), exporting_mseq(0), cap_item(this), flushing_cap_item(this), last_flush_tid(0), snaprealm(0), snaprealm_item(this), snapdir_parent(0), + oset((void *)this, ino), reported_size(0), wanted_max_size(0), requested_max_size(0), ref(0), ll_ref(0), dir(0), dn(0), @@ -1138,7 +1142,7 @@ protected: void _release(Inode *in, bool checkafter=true); void _flush(Inode *in, Context *onfinish=NULL); void _flushed(Inode *in); - void flush_set_callback(inodeno_t ino); + void flush_set_callback(ObjectCacher::ObjectSet *oset); void close_release(Inode *in); void close_safe(Inode *in); diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 3916410f87be..4e6c4df63fd0 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -419,9 +419,6 @@ void ObjectCacher::close_object(Object *ob) // ok! objects.erase(ob->get_soid()); - objects_by_ino[ob->get_ino()].erase(ob); - if (objects_by_ino[ob->get_ino()].empty()) - objects_by_ino.erase(ob->get_ino()); delete ob; } @@ -539,9 +536,8 @@ void ObjectCacher::bh_write(BufferHead *bh) oncommit->tid = tid; bh->ob->last_write_tid = tid; bh->last_write_tid = tid; - if (commit_set_callback) { - uncommitted_by_ino[bh->ob->get_ino()].push_back(&bh->ob->uncommitted_item); - } + if (commit_set_callback) + bh->ob->oset->uncommitted.push_back(&bh->ob->uncommitted_item); mark_tx(bh); } @@ -668,10 +664,8 @@ void ObjectCacher::bh_write_ack(sobject_t oid, loff_t start, __u64 length, tid_t } // is the entire object set now clean? - if (flush_set_callback && - dirty_tx_by_ino[ob->get_ino()] == 0) { - flush_set_callback(flush_set_callback_arg, ob->get_ino()); - dirty_tx_by_ino.erase(ob->get_ino()); + if (flush_set_callback && ob->oset->dirty_tx == 0) { + flush_set_callback(flush_set_callback_arg, ob->oset); } } //lock.Unlock(); @@ -708,13 +702,12 @@ void ObjectCacher::bh_write_commit(sobject_t oid, loff_t start, __u64 length, ti if (commit_set_callback && ob->last_commit_tid == ob->last_write_tid) { ob->uncommitted_item.remove_myself(); - inodeno_t ino = ob->get_ino(); + ObjectSet *oset = ob->oset; if (ob->can_close()) close_object(ob); - if (uncommitted_by_ino[ino].empty()) { // no uncommitted in flight - uncommitted_by_ino.erase(ino); - if (dirty_tx_by_ino[ino] == 0) // AND nothing dirty/tx - commit_set_callback(flush_set_callback_arg, ino); + if (oset->uncommitted.empty()) { // no uncommitted in flight + if (oset->dirty_tx == 0) // AND nothing dirty/tx + commit_set_callback(flush_set_callback_arg, oset); } } } @@ -782,7 +775,7 @@ void ObjectCacher::trim(loff_t max) /* public */ -bool ObjectCacher::is_cached(inodeno_t ino, vector& extents, snapid_t snapid) +bool ObjectCacher::is_cached(ObjectSet *oset, vector& extents, snapid_t snapid) { for (vector::iterator ex_it = extents.begin(); ex_it != extents.end(); @@ -791,7 +784,7 @@ bool ObjectCacher::is_cached(inodeno_t ino, vector& extents, snapi // get Object cache sobject_t soid(ex_it->oid, snapid); - Object *o = get_object_maybe(soid, ino, ex_it->layout); + Object *o = get_object_maybe(soid, ex_it->layout); if (!o) return false; if (!o->is_cached(ex_it->offset, ex_it->length)) @@ -805,7 +798,7 @@ bool ObjectCacher::is_cached(inodeno_t ino, vector& extents, snapi * returns # bytes read (if in cache). onfinish is untouched (caller must delete it) * returns 0 if doing async read */ -int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish) +int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish) { bool success = true; list hit_ls; @@ -818,7 +811,7 @@ int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish) // get Object cache sobject_t soid(ex_it->oid, rd->snap); - Object *o = get_object(soid, ino, ex_it->layout); + Object *o = get_object(soid, oset, ex_it->layout); // map extent into bufferheads map hits, missing, rx; @@ -833,7 +826,7 @@ int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish) if (success && onfinish) { dout(10) << "readx missed, waiting on " << *bh_it->second << " off " << bh_it->first << dendl; - bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, ino, onfinish) ); + bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) ); } success = false; } @@ -846,7 +839,7 @@ int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish) if (success && onfinish) { dout(10) << "readx missed, waiting on " << *bh_it->second << " off " << bh_it->first << dendl; - bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, ino, onfinish) ); + bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) ); } success = false; } @@ -947,7 +940,7 @@ int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish) } -int ObjectCacher::writex(OSDWrite *wr, inodeno_t ino) +int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset) { utime_t now = g_clock.now(); @@ -956,7 +949,7 @@ int ObjectCacher::writex(OSDWrite *wr, inodeno_t ino) ex_it++) { // get object cache sobject_t soid(ex_it->oid, CEPH_NOSNAP); - Object *o = get_object(soid, ino, ex_it->layout); + Object *o = get_object(soid, oset, ex_it->layout); // map it all into a single bufferhead. BufferHead *bh = o->map_write(wr); @@ -1080,10 +1073,10 @@ void ObjectCacher::flusher_entry() // blocking. atomic+sync. -int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock) +int ObjectCacher::atomic_sync_readx(OSDRead *rd, ObjectSet *oset, Mutex& lock) { dout(10) << "atomic_sync_readx " << rd - << " in " << ino + << " in " << oset << dendl; if (rd->extents.size() == 1) { @@ -1114,7 +1107,7 @@ int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock) i != by_oid.end(); i++) { sobject_t soid(i->first, rd->snap); - Object *o = get_object(soid, ino, i->second.layout); + Object *o = get_object(soid, oset, i->second.layout); rdlock(o); } @@ -1125,7 +1118,7 @@ int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock) Mutex flock("ObjectCacher::atomic_sync_readx flock 2"); Cond cond; bool done = false; - readx(rd, ino, new C_SafeCond(&flock, &cond, &done)); + readx(rd, oset, new C_SafeCond(&flock, &cond, &done)); // block while (!done) cond.Wait(lock); @@ -1144,10 +1137,10 @@ int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock) return 0; } -int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock) +int ObjectCacher::atomic_sync_writex(OSDWrite *wr, ObjectSet *oset, Mutex& lock) { dout(10) << "atomic_sync_writex " << wr - << " in " << ino + << " in " << oset << dendl; if (wr->extents.size() == 1 && @@ -1157,14 +1150,15 @@ int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock) // make sure we aren't already locking/locked... sobject_t oid(wr->extents.front().oid, CEPH_NOSNAP); Object *o = 0; - if (objects.count(oid)) o = get_object(oid, ino, wr->extents.front().layout); + if (objects.count(oid)) + o = get_object(oid, oset, wr->extents.front().layout); if (!o || (o->lock_state != Object::LOCK_WRLOCK && o->lock_state != Object::LOCK_WRLOCKING && o->lock_state != Object::LOCK_UPGRADING)) { // just write synchronously. dout(10) << "atomic_sync_writex " << wr - << " in " << ino + << " in " << oset << " doing sync write" << dendl; @@ -1193,7 +1187,7 @@ int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock) i != by_oid.end(); i++) { sobject_t soid(i->first, CEPH_NOSNAP); - Object *o = get_object(soid, ino, i->second.layout); + Object *o = get_object(soid, oset, i->second.layout); wrlock(o); } @@ -1201,7 +1195,7 @@ int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock) vector extents = wr->extents; // do the write, into our cache - writex(wr, ino); + writex(wr, oset); // flush // ...and release the locks? @@ -1364,20 +1358,18 @@ void ObjectCacher::wrunlock(Object *o) // ------------------------------------------------- -bool ObjectCacher::set_is_cached(inodeno_t ino) +bool ObjectCacher::set_is_cached(ObjectSet *oset) { - if (objects_by_ino.count(ino) == 0) + if (oset->objects.empty()) return false; - set& s = objects_by_ino[ino]; - for (set::iterator i = s.begin(); - i != s.end(); - i++) { - Object *ob = *i; - for (map::iterator p = ob->data.begin(); - p != ob->data.end(); - p++) { - BufferHead *bh = p->second; + for (xlist::iterator p = oset->objects.begin(); + !p.end(); ++p) { + Object *ob = *p; + for (map::iterator q = ob->data.begin(); + q != ob->data.end(); + q++) { + BufferHead *bh = q->second; if (!bh->is_dirty() && !bh->is_tx()) return true; } @@ -1386,15 +1378,13 @@ bool ObjectCacher::set_is_cached(inodeno_t ino) return false; } -bool ObjectCacher::set_is_dirty_or_committing(inodeno_t ino) +bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset) { - if (objects_by_ino.count(ino) == 0) + if (oset->objects.empty()) return false; - set& s = objects_by_ino[ino]; - for (set::iterator i = s.begin(); - i != s.end(); - i++) { + for (xlist::iterator i = oset->objects.begin(); + !i.end(); ++i) { Object *ob = *i; for (map::iterator p = ob->data.begin(); @@ -1453,22 +1443,20 @@ bool ObjectCacher::flush(Object *ob) // flush. non-blocking, takes callback. // returns true if already flushed -bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish) +bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish) { - if (objects_by_ino.count(ino) == 0) { - dout(10) << "flush_set on " << ino << " dne" << dendl; + if (oset->objects.empty()) { + dout(10) << "flush_set on " << oset << " dne" << dendl; return true; } - dout(10) << "flush_set " << ino << dendl; + dout(10) << "flush_set " << oset << dendl; C_Gather *gather = 0; // we'll need to wait for all objects to flush! - set& s = objects_by_ino[ino]; bool safe = true; - for (set::iterator i = s.begin(); - i != s.end(); - i++) { + for (xlist::iterator i = oset->objects.begin(); + !i.end(); ++i) { Object *ob = *i; if (!flush(ob)) { @@ -1477,7 +1465,7 @@ bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish) gather = new C_Gather(onfinish); safe = false; - dout(10) << "flush_set " << ino << " will wait for ack tid " + dout(10) << "flush_set " << oset << " will wait for ack tid " << ob->last_write_tid << " on " << *ob << dendl; @@ -1487,7 +1475,7 @@ bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish) } if (safe) { - dout(10) << "flush_set " << ino << " has no dirty|tx bhs" << dendl; + dout(10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl; return true; } return false; @@ -1496,31 +1484,29 @@ bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish) // commit. non-blocking, takes callback. // return true if already flushed. -bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish) +bool ObjectCacher::commit_set(ObjectSet *oset, Context *onfinish) { assert(onfinish); // doesn't make any sense otherwise. - if (objects_by_ino.count(ino) == 0) { - dout(10) << "commit_set on " << ino << " dne" << dendl; + if (oset->objects.empty()) { + dout(10) << "commit_set on " << oset << " dne" << dendl; return true; } - dout(10) << "commit_set " << ino << dendl; + dout(10) << "commit_set " << oset << dendl; // make sure it's flushing. - flush_set(ino); + flush_set(oset); C_Gather *gather = 0; // we'll need to wait for all objects to commit - set& s = objects_by_ino[ino]; bool safe = true; - for (set::iterator i = s.begin(); - i != s.end(); - i++) { + for (xlist::iterator i = oset->objects.begin(); + !i.end(); ++i) { Object *ob = *i; if (ob->last_write_tid > ob->last_commit_tid) { - dout(10) << "commit_set " << ino << " " << *ob + dout(10) << "commit_set " << oset << " " << *ob << " will finish on commit tid " << ob->last_write_tid << dendl; if (!gather && onfinish) gather = new C_Gather(onfinish); @@ -1531,25 +1517,23 @@ bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish) } if (safe) { - dout(10) << "commit_set " << ino << " all committed" << dendl; + dout(10) << "commit_set " << oset << " all committed" << dendl; return true; } return false; } -void ObjectCacher::purge_set(inodeno_t ino) +void ObjectCacher::purge_set(ObjectSet *oset) { - if (objects_by_ino.count(ino) == 0) { - dout(10) << "purge_set on " << ino << " dne" << dendl; + if (oset->objects.empty()) { + dout(10) << "purge_set on " << oset << " dne" << dendl; return; } - dout(10) << "purge_set " << ino << dendl; + dout(10) << "purge_set " << oset << dendl; - set& s = objects_by_ino[ino]; - for (set::iterator i = s.begin(); - i != s.end(); - i++) { + for (xlist::iterator i = oset->objects.begin(); + !i.end(); ++i) { Object *ob = *i; purge(ob); } @@ -1586,36 +1570,34 @@ loff_t ObjectCacher::release(Object *ob) return o_unclean; } -loff_t ObjectCacher::release_set(inodeno_t ino) +loff_t ObjectCacher::release_set(ObjectSet *oset) { // return # bytes not clean (and thus not released). loff_t unclean = 0; - if (objects_by_ino.count(ino) == 0) { - dout(10) << "release_set on " << ino << " dne" << dendl; + if (oset->objects.empty()) { + dout(10) << "release_set on " << oset << " dne" << dendl; return 0; } - dout(10) << "release_set " << ino << dendl; + dout(10) << "release_set " << oset << dendl; - set s = objects_by_ino[ino]; - for (set::iterator i = s.begin(); - i != s.end(); - i++) { - Object *ob = *i; + for (xlist::iterator p = oset->objects.begin(); + !p.end(); ++p) { + Object *ob = *p; loff_t o_unclean = release(ob); unclean += o_unclean; if (o_unclean) - dout(10) << "release_set " << ino << " " << *ob + dout(10) << "release_set " << oset << " " << *ob << " has " << o_unclean << " bytes left" << dendl; } if (unclean) { - dout(10) << "release_set " << ino + dout(10) << "release_set " << oset << ", " << unclean << " bytes left" << dendl; } @@ -1654,14 +1636,14 @@ __u64 ObjectCacher::release_all() -void ObjectCacher::truncate_set(inodeno_t ino, vector& exls) +void ObjectCacher::truncate_set(ObjectSet *oset, vector& exls) { - if (objects_by_ino.count(ino) == 0) { - dout(10) << "truncate_set on " << ino << " dne" << dendl; + if (oset->objects.empty()) { + dout(10) << "truncate_set on " << oset << " dne" << dendl; return; } - dout(10) << "truncate_set " << ino << dendl; + dout(10) << "truncate_set " << oset << dendl; for (vector::iterator p = exls.begin(); p != exls.end(); @@ -1690,21 +1672,19 @@ void ObjectCacher::truncate_set(inodeno_t ino, vector& exls) } -void ObjectCacher::kick_sync_writers(inodeno_t ino) +void ObjectCacher::kick_sync_writers(ObjectSet *oset) { - if (objects_by_ino.count(ino) == 0) { - dout(10) << "kick_sync_writers on " << ino << " dne" << dendl; + if (oset->objects.empty()) { + dout(10) << "kick_sync_writers on " << oset << " dne" << dendl; return; } - dout(10) << "kick_sync_writers on " << ino << dendl; + dout(10) << "kick_sync_writers on " << oset << dendl; list ls; - set& s = objects_by_ino[ino]; - for (set::iterator i = s.begin(); - i != s.end(); - i++) { + for (xlist::iterator i = oset->objects.begin(); + !i.end(); ++i) { Object *ob = *i; ls.splice(ls.begin(), ob->waitfor_wr); @@ -1713,21 +1693,19 @@ void ObjectCacher::kick_sync_writers(inodeno_t ino) finish_contexts(ls); } -void ObjectCacher::kick_sync_readers(inodeno_t ino) +void ObjectCacher::kick_sync_readers(ObjectSet *oset) { - if (objects_by_ino.count(ino) == 0) { - dout(10) << "kick_sync_readers on " << ino << " dne" << dendl; + if (oset->objects.empty()) { + dout(10) << "kick_sync_readers on " << oset << " dne" << dendl; return; } - dout(10) << "kick_sync_readers on " << ino << dendl; + dout(10) << "kick_sync_readers on " << oset << dendl; list ls; - set& s = objects_by_ino[ino]; - for (set::iterator i = s.begin(); - i != s.end(); - i++) { + for (xlist::iterator i = oset->objects.begin(); + !i.end(); ++i) { Object *ob = *i; ls.splice(ls.begin(), ob->waitfor_rd); diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index 0447c1dc05a4..f9c93cde7724 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -19,9 +19,10 @@ class Objecter; class ObjectCacher { public: - typedef void (*flush_set_callback_t) (void *p, inodeno_t ino); - class Object; + class ObjectSet; + + typedef void (*flush_set_callback_t) (void *p, ObjectSet *oset); // read scatter/gather struct OSDRead { @@ -131,9 +132,14 @@ class ObjectCacher { // ObjectCacher::Object fields ObjectCacher *oc; sobject_t oid; - inodeno_t ino; + public: + ObjectSet *oset; + xlist::item set_item; + private: ceph_object_layout layout; + friend class ObjectSet; + public: map data; @@ -163,21 +169,24 @@ class ObjectCacher { int rdlock_ref; // how many ppl want or are using a READ lock public: - Object(ObjectCacher *_oc, sobject_t o, inodeno_t i, ceph_object_layout& l) : + Object(ObjectCacher *_oc, sobject_t o, ObjectSet *os, ceph_object_layout& l) : oc(_oc), - oid(o), ino(i), layout(l), + oid(o), oset(os), set_item(this), layout(l), last_write_tid(0), last_ack_tid(0), last_commit_tid(0), uncommitted_item(this), - lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0) - {} + lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0) { + // add to set + os->objects.push_back(&set_item); + } ~Object() { assert(data.empty()); + set_item.remove_myself(); } sobject_t get_soid() { return oid; } object_t get_oid() { return oid.oid; } snapid_t get_snap() { return oid.snap; } - inodeno_t get_ino() { return ino; } + ObjectSet *get_object_set() { return oset; } ceph_object_layout& get_layout() { return layout; } void set_layout(ceph_object_layout& l) { layout = l; } @@ -235,6 +244,22 @@ class ObjectCacher { }; + + struct ObjectSet { + void *parent; + + inodeno_t ino; + __u64 truncate_seq, truncate_size; + + xlist objects; + xlist uncommitted; + + int dirty_tx; + + ObjectSet(void *p, inodeno_t i) : parent(p), ino(i), truncate_seq(0), truncate_size(0), dirty_tx(0) {} + }; + + // ******* ObjectCacher ********* // ObjectCacher fields public: @@ -248,9 +273,6 @@ class ObjectCacher { void *flush_set_callback_arg; hash_map objects; - hash_map > objects_by_ino; - hash_map dirty_tx_by_ino; - hash_map > uncommitted_by_ino; set dirty_bh; LRU lru_dirty, lru_rest; @@ -270,22 +292,21 @@ class ObjectCacher { // objects - Object *get_object_maybe(sobject_t oid, inodeno_t ino, ceph_object_layout &l) { + Object *get_object_maybe(sobject_t oid, ceph_object_layout &l) { // have it? if (objects.count(oid)) return objects[oid]; return NULL; } - Object *get_object(sobject_t oid, inodeno_t ino, ceph_object_layout &l) { + Object *get_object(sobject_t oid, ObjectSet *oset, ceph_object_layout &l) { // have it? if (objects.count(oid)) return objects[oid]; // create it. - Object *o = new Object(this, oid, ino, l); + Object *o = new Object(this, oid, oset, l); objects[oid] = o; - objects_by_ino[ino].insert(o); return o; } void close_object(Object *ob); @@ -306,11 +327,11 @@ class ObjectCacher { case BufferHead::STATE_CLEAN: stat_clean += bh->length(); break; case BufferHead::STATE_DIRTY: stat_dirty += bh->length(); - dirty_tx_by_ino[bh->ob->get_ino()] += bh->length(); + bh->ob->oset->dirty_tx += bh->length(); break; case BufferHead::STATE_TX: stat_tx += bh->length(); - dirty_tx_by_ino[bh->ob->get_ino()] += bh->length(); + bh->ob->oset->dirty_tx += bh->length(); break; case BufferHead::STATE_RX: stat_rx += bh->length(); break; } @@ -322,11 +343,11 @@ class ObjectCacher { case BufferHead::STATE_CLEAN: stat_clean -= bh->length(); break; case BufferHead::STATE_DIRTY: stat_dirty -= bh->length(); - dirty_tx_by_ino[bh->ob->get_ino()] -= bh->length(); + bh->ob->oset->dirty_tx -= bh->length(); break; case BufferHead::STATE_TX: stat_tx -= bh->length(); - dirty_tx_by_ino[bh->ob->get_ino()] -= bh->length(); + bh->ob->oset->dirty_tx -= bh->length(); break; case BufferHead::STATE_RX: stat_rx -= bh->length(); break; } @@ -504,12 +525,12 @@ class ObjectCacher { class C_RetryRead : public Context { ObjectCacher *oc; OSDRead *rd; - inodeno_t ino; + ObjectSet *oset; Context *onfinish; public: - C_RetryRead(ObjectCacher *_oc, OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {} + C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c) : oc(_oc), rd(r), oset(os), onfinish(c) {} void finish(int) { - int r = oc->readx(rd, ino, onfinish); + int r = oc->readx(rd, oset, onfinish); if (r > 0 && onfinish) { onfinish->finish(r); delete onfinish; @@ -520,87 +541,87 @@ class ObjectCacher { // non-blocking. async. - int readx(OSDRead *rd, inodeno_t ino, Context *onfinish); - int writex(OSDWrite *wr, inodeno_t ino); - bool is_cached(inodeno_t ino, vector& extents, snapid_t snapid); + int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish); + int writex(OSDWrite *wr, ObjectSet *oset); + bool is_cached(ObjectSet *oset, vector& extents, snapid_t snapid); // write blocking bool wait_for_write(__u64 len, Mutex& lock); // blocking. atomic+sync. - int atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock); - int atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock); + int atomic_sync_readx(OSDRead *rd, ObjectSet *oset, Mutex& lock); + int atomic_sync_writex(OSDWrite *wr, ObjectSet *oset, Mutex& lock); - bool set_is_cached(inodeno_t ino); - bool set_is_dirty_or_committing(inodeno_t ino); + bool set_is_cached(ObjectSet *oset); + bool set_is_dirty_or_committing(ObjectSet *oset); - bool flush_set(inodeno_t ino, Context *onfinish=0); + bool flush_set(ObjectSet *oset, Context *onfinish=0); void flush_all(Context *onfinish=0); - bool commit_set(inodeno_t ino, Context *oncommit); + bool commit_set(ObjectSet *oset, Context *oncommit); void commit_all(Context *oncommit=0); - void purge_set(inodeno_t ino); + void purge_set(ObjectSet *oset); - loff_t release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean) + loff_t release_set(ObjectSet *oset); // returns # of bytes not released (ie non-clean) __u64 release_all(); - void truncate_set(inodeno_t ino, vector& ex); + void truncate_set(ObjectSet *oset, vector& ex); - void kick_sync_writers(inodeno_t ino); - void kick_sync_readers(inodeno_t ino); + void kick_sync_writers(ObjectSet *oset); + void kick_sync_readers(ObjectSet *oset); // file functions /*** async+caching (non-blocking) file interface ***/ - int file_is_cached(inodeno_t ino, ceph_file_layout *layout, snapid_t snapid, + int file_is_cached(ObjectSet *oset, ceph_file_layout *layout, snapid_t snapid, loff_t offset, __u64 len) { vector extents; - filer.file_to_extents(ino, layout, offset, len, extents); - return is_cached(ino, extents, snapid); + filer.file_to_extents(oset->ino, layout, offset, len, extents); + return is_cached(oset, extents, snapid); } - int file_read(inodeno_t ino, ceph_file_layout *layout, snapid_t snapid, + int file_read(ObjectSet *oset, ceph_file_layout *layout, snapid_t snapid, loff_t offset, __u64 len, bufferlist *bl, int flags, Context *onfinish) { OSDRead *rd = prepare_read(snapid, bl, flags); - filer.file_to_extents(ino, layout, offset, len, rd->extents); - return readx(rd, ino, onfinish); + filer.file_to_extents(oset->ino, layout, offset, len, rd->extents); + return readx(rd, oset, onfinish); } - int file_write(inodeno_t ino, ceph_file_layout *layout, const SnapContext& snapc, + int file_write(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc, loff_t offset, __u64 len, bufferlist& bl, utime_t mtime, int flags) { OSDWrite *wr = prepare_write(snapc, bl, mtime, flags); - filer.file_to_extents(ino, layout, offset, len, wr->extents); - return writex(wr, ino); + filer.file_to_extents(oset->ino, layout, offset, len, wr->extents); + return writex(wr, oset); } /*** sync+blocking file interface ***/ - int file_atomic_sync_read(inodeno_t ino, ceph_file_layout *layout, + int file_atomic_sync_read(ObjectSet *oset, ceph_file_layout *layout, snapid_t snapid, loff_t offset, __u64 len, bufferlist *bl, int flags, Mutex &lock) { OSDRead *rd = prepare_read(snapid, bl, flags); - filer.file_to_extents(ino, layout, offset, len, rd->extents); - return atomic_sync_readx(rd, ino, lock); + filer.file_to_extents(oset->ino, layout, offset, len, rd->extents); + return atomic_sync_readx(rd, oset, lock); } - int file_atomic_sync_write(inodeno_t ino, ceph_file_layout *layout, + int file_atomic_sync_write(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc, loff_t offset, __u64 len, bufferlist& bl, utime_t mtime, int flags, Mutex &lock) { OSDWrite *wr = prepare_write(snapc, bl, mtime, flags); - filer.file_to_extents(ino, layout, offset, len, wr->extents); - return atomic_sync_writex(wr, ino, lock); + filer.file_to_extents(oset->ino, layout, offset, len, wr->extents); + return atomic_sync_writex(wr, oset, lock); } }; @@ -625,7 +646,7 @@ inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh) inline ostream& operator<<(ostream& out, ObjectCacher::Object &ob) { out << "object[" - << ob.get_soid() << " ino " << hex << ob.get_ino() << dec + << ob.get_soid() << " oset " << ob.oset << dec << " wr " << ob.last_write_tid << "/" << ob.last_ack_tid << "/" << ob.last_commit_tid; switch (ob.lock_state) { diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 8042a39eeaac..789bc13fbda8 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -329,7 +329,7 @@ public: struct StatfsOp { tid_t tid; - ceph_statfs *stats; + struct ceph_statfs *stats; Context *onfinish; utime_t last_submit; @@ -653,7 +653,7 @@ private: void fs_stats_submit(StatfsOp *op); public: void handle_fs_stats_reply(MStatfsReply *m); - void get_fs_stats(ceph_statfs& result, Context *onfinish); + void get_fs_stats(struct ceph_statfs& result, Context *onfinish); // --------------------------- // some scatter/gather hackery