} else {
dout(10) << "unmounting: trim pass, size still " << lru.lru_get_size()
<< "+" << inode_map.size() << endl;
+ dump_cache();
}
}
// caps included?
int mds = MSG_ADDR_NUM(reply->get_source());
- if (f->inode->caps.empty()) // first caps?
+ if (f->inode->caps.empty()) {// first caps?
+ dout(7) << " first caps on " << hex << f->inode->inode.ino << dec << endl;
f->inode->get();
+ }
int new_caps = reply->get_file_caps();
new_caps &= CAP_FILE_WR|CAP_FILE_RD; // HACK: test synchronous read/write
// create a buffer that refers to *buf, but doesn't try to free it when it's done.
bufferlist blist;
blist.push_back( new buffer(buf, size, BUFFER_MODE_NOCOPY|BUFFER_MODE_NOFREE) );
-
+
// issue write
Cond cond;
bool done = false;
void get() {
ref++;
- //cout << "inode.get on " << hex << inode.ino << dec << " now " << ref << endl;
+ cout << "inode.get on " << hex << inode.ino << dec << " now " << ref << endl; // NOTE(review): trace of the post-increment ref count is now unconditional (plain cout, no debug-level guard) -- confirm this is meant to stay enabled
}
void put() {
ref--; assert(ref >= 0);
- //cout << "inode.put on " << hex << inode.ino << dec << " now " << ref << endl;
+ cout << "inode.put on " << hex << inode.ino << dec << " now " << ref << endl; // NOTE(review): printed after the decrement and the ref >= 0 assertion; unconditional (plain cout, no debug-level guard) -- confirm this is meant to stay enabled
}
Inode(inode_t _inode, ObjectCacher *_oc) :
void FileCache::flush_dirty(Context *onflush)
{
- oc->flush_set(inode.ino, onflush);
+ // Start flushing this inode's dirty data. flush_set() returns true when
+ // there is nothing to wait for, and in that case it does not keep or free
+ // the callback, so it is completed and deleted here.
+ // onflush is optional (it defaults to 0 in the declaration), so only
+ // complete it when the caller actually supplied one.
+ if (oc->flush_set(inode.ino, onflush) && onflush) {
+ onflush->finish(0);
+ delete onflush;
+ }
}
off_t FileCache::release_clean()
void FileCache::empty(Context *onempty)
{
off_t unclean = release_clean();
- bool clean = oc->flush_set(inode.ino);
+ bool clean = oc->flush_set(inode.ino, onempty); // single call: when objects still need flushing, flush_set() queues onempty (via a gather) on their ack waiters and returns false
assert(!unclean == clean);
-
+ // flush_set() returned true: nothing to wait for and it did not keep onempty, so complete and free it here (onempty is dereferenced unguarded -- callers presumably always pass one; confirm)
if (clean) {
onempty->finish(0);
delete onempty;
- } else {
- clean = oc->flush_set(inode.ino, onempty);
- assert(!clean);
}
}
if (num_writing == 0 && !caps_callbacks.empty())
check_caps();
}
+
+// Returns the negation of oc->set_is_dirty_or_committing() for this inode.
+bool FileCache::all_safe()
+{
+ const bool pending = oc->set_is_dirty_or_committing(inode.ino);
+ return !pending;
+}
+
+// Hand c to the object cacher's commit_set() for this inode.
+// A true return means everything was already committed; commit_set() does
+// not keep or free c in that case, so it is finished and deleted here.
+void FileCache::add_safe_waiter(Context *c)
+{
+ if (!oc->commit_set(inode.ino, c))
+ return; // still committing: c was queued by commit_set()
+ c->finish(0);
+ delete c;
+}
int num_reading;
int num_writing;
- int num_unsafe;
+ //int num_unsafe;
// waiters
list<Cond*> waitfor_read;
list<Cond*> waitfor_write;
- list<Context*> waitfor_safe;
+ //list<Context*> waitfor_safe;
bool waitfor_release;
public:
oc(_oc),
inode(_inode),
latest_caps(0),
- num_reading(0), num_writing(0), num_unsafe(0),
+ num_reading(0), num_writing(0),// num_unsafe(0),
waitfor_release(false) {}
// waiters/waiting
bool can_read() { return latest_caps & CAP_FILE_RD; }
bool can_write() { return latest_caps & CAP_FILE_WR; }
- bool all_safe() { return num_unsafe == 0; }
+ bool all_safe();// { return num_unsafe == 0; }
void add_read_waiter(Cond *c) { waitfor_read.push_back(c); }
void add_write_waiter(Cond *c) { waitfor_write.push_back(c); }
- void add_safe_waiter(Context *c) { waitfor_safe.push_back(c); }
+ void add_safe_waiter(Context *c);// { waitfor_safe.push_back(c); }
// ...
void flush_dirty(Context *onflush=0);
// relative time (from startup)
const utime_t& now() {
gettimeofday(&last.timeval(), NULL);
- last -= zero;
+ //last -= zero; // NOTE(review): with this subtraction disabled, now() returns the raw gettimeofday() value instead of a time relative to `zero`; the "relative time (from startup)" comment above no longer matches -- confirm callers expect absolute time
//last = abs_last - start_offset;
return last;
}
debug_mds_log: 1,
debug_buffer: 0,
debug_filer: 0,
+ debug_objecter: 0,
debug_objectcacher: 0,
debug_client: 0,
debug_osd: 0,
g_conf.debug_filer = atoi(args[++i]);
else
g_debug_after_conf.debug_filer = atoi(args[++i]);
+ else if (strcmp(args[i], "--debug_objecter") == 0)
+ if (!g_conf.debug_after)
+ g_conf.debug_objecter = atoi(args[++i]);
+ else
+ g_debug_after_conf.debug_objecter = atoi(args[++i]);
else if (strcmp(args[i], "--debug_objectcacher") == 0)
if (!g_conf.debug_after)
g_conf.debug_objectcacher = atoi(args[++i]);
int debug_mds_log;
int debug_buffer;
int debug_filer;
+ int debug_objecter;
int debug_objectcacher;
int debug_client;
int debug_osd;
// expire -- expire a single item
- LRUObject *lru_expire() {
+ LRUObject *lru_get_next_expire() {
LRUObject *p;
-
+
// look through tail of bot
while (lru_bot.get_length()) {
p = lru_bot.get_tail();
-
- if (!p->lru_pinned)
- return lru_remove(p); // yay.
+ if (!p->lru_pinned) return p;
// move to pintail
lru_bot.remove(p);
// ok, try head then
while (lru_top.get_length()) {
p = lru_top.get_tail();
- if (!p->lru_pinned)
- return lru_remove( p );
+ if (!p->lru_pinned) return p;
// move to pintail
lru_top.remove(p);
// no luck!
return NULL;
}
+
+ // expire -- take the candidate chosen by lru_get_next_expire(), pass it
+ // to lru_remove() and return the result; NULL when there is no candidate.
+ LRUObject *lru_expire() {
+ LRUObject *victim = lru_get_next_expire();
+ if (!victim)
+ return NULL;
+ return lru_remove(victim);
+ }
void lru_status() {
}
Object *ob = objects[oid];
+
+ list<Context*> ls;
assert(tid <= ob->last_write_tid);
if (ob->last_write_tid == tid) {
dout(10) << "lock_ack " << *ob
<< " tid " << tid << endl;
- list<Context*> ls;
-
switch (ob->lock_state) {
case Object::LOCK_RDUNLOCKING:
case Object::LOCK_WRUNLOCKING:
ob->last_ack_tid = tid;
- finish_contexts(ls);
-
if (ob->can_close())
close_object(ob);
} else {
dout(10) << "lock_ack " << *ob
<< " tid " << tid << " obsolete" << endl;
}
+
+ // waiters?
+ if (ob->waitfor_ack.count(tid)) {
+ ls.splice(ls.end(), ob->waitfor_ack[tid]);
+ ob->waitfor_ack.erase(tid);
+ }
+
+ finish_contexts(ls);
+
}
}
}
+// Write back dirty buffer heads in lru_dirty's expire order, stopping when
+// there is no candidate left or the next one was written after `cutoff`.
+// NOTE(review): cutoff is just "now" while the max-dirty-age adjustment
+// below stays disabled, so this presumably writes every dirty bh -- confirm.
+void ObjectCacher::flush()
+{
+ utime_t cutoff = g_clock.now();
+ //cutoff.sec_ref() -= g_conf.client_oc_max_dirty_age;
+
+ dout(10) << "flush" << endl;
+
+ // NOTE(review): lru_get_next_expire() returns the candidate without
+ // removing it, so progress relies on bh_write() taking bh off
+ // lru_dirty -- confirm.
+ for (;;) {
+ BufferHead *bh = (BufferHead*) lru_dirty.lru_get_next_expire();
+ if (!bh || bh->last_write > cutoff)
+ break;
+
+ bh_write(bh->ob, bh);
+ }
+}
+
void ObjectCacher::trim(off_t max)
{
if (max < 0)
int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
{
+ utime_t now = g_clock.now();
+
for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
ex_it != wr->extents.end();
ex_it++) {
// it's dirty.
mark_dirty(bh);
+ touch_bh(bh);
+ bh->last_write = now;
}
+ delete wr;
+
trim();
return 0;
}
Object *o = get_object(i->first, ino);
rdlock(o);
}
-
+
+ // readx will hose rd
+ list<ObjectExtent> extents = rd->extents;
+
// do the read, into our cache
Cond cond;
bool done = false;
while (!done) cond.Wait(lock);
// release the locks
- for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
- ex_it != rd->extents.end();
+ for (list<ObjectExtent>::iterator ex_it = extents.begin();
+ ex_it != extents.end();
ex_it++) {
assert(objects.count(ex_it->oid));
Object *o = objects[ex_it->oid];
wrlock(o);
}
+ // writex will hose wr
+ list<ObjectExtent> extents = wr->extents;
+
// do the write, into our cache
writex(wr, ino);
// flush
// ...and release the locks?
- for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
- ex_it != wr->extents.end();
+ for (list<ObjectExtent>::iterator ex_it = extents.begin();
+ ex_it != extents.end();
ex_it++) {
assert(objects.count(ex_it->oid));
Object *o = objects[ex_it->oid];
wrunlock(o);
}
-
+
return 0;
}
}
// flush. non-blocking, takes callback.
-// returns true if already flushed, and deletes the callback.
+// returns true if already flushed
bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish)
{
if (objects_by_ino.count(ino) == 0) {
dout(10) << "flush_set on " << hex << ino << dec << " dne" << endl;
- delete onfinish;
return true;
}
C_Gather *gather = 0; // we'll need to wait for all objects to flush!
set<Object*>& s = objects_by_ino[ino];
+ bool safe = true;
for (set<Object*>::iterator i = s.begin();
i != s.end();
i++) {
if (!flush(ob)) {
// we'll need to gather...
- if (!gather)
+ if (!gather && onfinish)
gather = new C_Gather(onfinish);
+ safe = false;
dout(10) << "flush_set " << hex << ino << dec << " will wait for ack tid "
<< ob->last_write_tid
<< " on " << *ob
<< endl;
- ob->waitfor_ack[ob->last_write_tid].push_back(gather->new_sub());
+ if (gather)
+ ob->waitfor_ack[ob->last_write_tid].push_back(gather->new_sub());
}
}
- if (!gather) {
+ if (safe) {
dout(10) << "flush_set " << hex << ino << dec << " has no dirty|tx bhs" << endl;
- delete onfinish;
return true;
}
return false;
if (objects_by_ino.count(ino) == 0) {
dout(10) << "commit_set on " << hex << ino << dec << " dne" << endl;
- delete onfinish;
return true;
}
C_Gather *gather = 0; // we'll need to wait for all objects to commit
set<Object*>& s = objects_by_ino[ino];
+ bool safe = true;
for (set<Object*>::iterator i = s.begin();
i != s.end();
i++) {
// make sure it's flushing.
flush_set(ino);
- if (ob->last_write_tid < ob->last_commit_tid) {
+ if (ob->last_write_tid > ob->last_commit_tid) {
dout(10) << "commit_set " << hex << ino << dec << " " << *ob
<< " will finish on commit tid " << ob->last_write_tid
<< endl;
- if (!gather) gather = new C_Gather(onfinish);
- ob->waitfor_commit[ob->last_write_tid].push_back( gather->new_sub() );
+ if (!gather && onfinish) gather = new C_Gather(onfinish);
+ safe = false;
+ if (gather)
+ ob->waitfor_commit[ob->last_write_tid].push_back( gather->new_sub() );
}
}
- if (!gather) {
+ if (safe) {
dout(10) << "commit_set " << hex << ino << dec << " all committed" << endl;
- delete onfinish;
return true;
}
return false;
Object *ob;
bufferlist bl;
tid_t last_write_tid; // version of bh (if non-zero)
+ utime_t last_write;
map< off_t, list<Context*> > waitfor_read;
void bh_write(Object *ob, BufferHead *bh);
void trim(off_t max=-1);
+ void flush();
bool flush(Object *o);
off_t release(Object *o);
#include "config.h"
#undef dout
-#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_filer) cout << messenger->get_myaddr() << ".objecter "
+// Objecter debug output: emitted when the level is at or below either the
+// global debug level or the objecter-specific one. The level argument is
+// parenthesized so an expression argument is compared as a whole.
+#define dout(x) if ((x) <= g_conf.debug || (x) <= g_conf.debug_objecter) cout << messenger->get_myaddr() << ".objecter "
// messages ------------------------------