-class C_Client_CloseRelease : public Context {
- Client *cl;
- Inode *in;
-public:
- C_Client_CloseRelease(Client *c, Inode *i) : cl(c), in(i) {
- in->get();
- }
- void finish(int) {
- cl->close_release(in);
- }
-};
-
-
-
ostream& operator<<(ostream &out, Inode &in)
{
out << in.inode.ino << "("
}
if (!dn) {
- Inode *in = new Inode(ist->ino, &ist->layout, objectcacher);
+ Inode *in = new Inode(ist->ino, &ist->layout);
inode_map[ist->ino] = in;
dn = link(dir, dname, in);
dout(12) << " new dentry+node with ino " << ist->ino << dendl;
Inode *curi = 0;
inodeno_t ino = ist[0].ino;
if (!root && ino == 1) {
- curi = root = new Inode(ino, &ist[0].layout, objectcacher);
+ curi = root = new Inode(ino, &ist[0].layout);
dout(10) << "insert_trace new root is " << root << dendl;
inode_map[ino] = root;
root->dir_auth = 0;
// flush dirty data (from objectcache)
+void Client::_release(Inode *in, bool checkafter)
+{
+ if (in->cap_refs[CEPH_CAP_RDCACHE]) {
+ objectcacher->release_set(in->inode.ino);
+ if (checkafter)
+ put_cap_ref(in, CEPH_CAP_RDCACHE);
+ else
+ in->put_cap_ref(CEPH_CAP_RDCACHE);
+ }
+}
+
struct C_Flush : public Context {
Client *client;
Inode *in;
- C_Flush(Client *c, Inode *i) : client(c), in(i) {}
+ bool checkafter;
+ C_Flush(Client *c, Inode *i, bool ch) : client(c), in(i), checkafter(ch) {}
void finish(int r) {
- client->_flushed(in);
+ client->_flushed(in, checkafter);
}
};
-void Client::_flush(Inode *in)
+void Client::_flush(Inode *in, bool checkafter)
{
dout(10) << "_flush " << *in << dendl;
if (in->cap_refs[CEPH_CAP_WRBUFFER] == 1) {
in->get_cap_ref(CEPH_CAP_WRBUFFER); // for the (one!) waiter
- in->fc.flush_dirty(0);
- in->fc.add_safe_waiter(new C_Flush(this, in));
+
+ Context *c = new C_Flush(this, in, checkafter);
+ bool safe = objectcacher->commit_set(in->inode.ino, c);
+ if (safe) {
+ c->finish(0);
+ delete c;
+ }
}
}
-void Client::_flushed(Inode *in)
+void Client::_flushed(Inode *in, bool checkafter)
{
dout(10) << "_flushed " << *in << dendl;
assert(in->cap_refs[CEPH_CAP_WRBUFFER] == 2);
+
+ // release clean pages too, if we dont hold RDCACHE reference
+ if (in->cap_refs[CEPH_CAP_RDCACHE] == 0)
+ objectcacher->release_set(in->inode.ino);
+
put_cap_ref(in, CEPH_CAP_WRBUFFER);
- put_cap_ref(in, CEPH_CAP_WRBUFFER);
+ if (checkafter)
+ put_cap_ref(in, CEPH_CAP_WRBUFFER);
+ else
+ in->put_cap_ref(CEPH_CAP_WRBUFFER);
}
<< " size " << in->inode.size << " -> " << m->get_size()
<< dendl;
// trim filecache?
- if (g_conf.client_oc)
- in->fc.truncate(in->inode.size, m->get_size());
+ if (g_conf.client_oc &&
+ m->get_size() < in->inode.size) {
+ // map range to objects
+ list<ObjectExtent> ls;
+ filer->file_to_extents(in->inode.ino, &in->inode.layout,
+ m->get_size(), in->inode.size - m->get_size(),
+ ls);
+ objectcacher->truncate_set(in->inode.ino, ls);
+ }
in->inode.size = m->get_size();
delete m;
cap.issued = new_caps;
if ((cap.issued & ~new_caps) & CEPH_CAP_RDCACHE)
- in->fc.release_clean();
+ _release(in, false);
if ((used & ~new_caps) & CEPH_CAP_WRBUFFER)
- _flush(in);
+ _flush(in, false);
else {
ack = true;
cap.implemented = new_caps;
p++) {
Inode *in = p->second;
if (!in->caps.empty()) {
- in->fc.release_clean();
- if (in->fc.is_dirty()) {
- dout(10) << "unmount residual caps on " << in->ino() << ", flushing" << dendl;
- in->fc.empty(new C_Client_CloseRelease(this, in));
- } else {
- dout(10) << "unmount residual caps on " << in->ino() << ", releasing" << dendl;
- check_caps(in);
- }
+ _release(in);
+ _flush(in);
}
}
}
if (!dn)
in->get_open_ref(f->mode); // i may have alrady added it above!
- dout(10) << in->inode.ino << " mode " << cmode
- << " dirty " << in->fc.is_dirty() << " cached " << in->fc.is_cached() << dendl;
+ dout(10) << in->inode.ino << " mode " << cmode << dendl;
// add the cap
int mds = reply->get_source().num();
if (new_caps & ~old_caps)
signal_cond_list(in->waitfor_caps);
- if (g_conf.client_oc)
- in->fc.set_caps(new_caps);
-
} else {
dout(7) << "open got SAME caps " << cap_string(new_caps)
<< " for " << in->ino()
-void Client::close_release(Inode *in)
-{
- dout(10) << "close_release on " << in->ino() << dendl;
- dout(10) << in->inode.ino
- << " dirty " << in->fc.is_dirty() << " cached " << in->fc.is_cached() << dendl;
-
- check_caps(in);
- put_inode(in);
-}
-
-void Client::close_safe(Inode *in)
-{
- dout(10) << "close_safe on " << in->ino() << dendl;
- put_inode(in);
- if (unmounting)
- mount_cond.Signal();
-}
-
int Client::close(int fd)
{
bool lazy = f->mode == CEPH_FILE_MODE_LAZY;
// wait for RD cap and/or a valid file size
+ int issued;
while (1) {
+ issued = in->caps_issued();
if (lazy) {
// wait for lazy cap
- if ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
+ if ((issued & CEPH_CAP_LAZYIO) == 0) {
dout(7) << " don't have lazy cap, waiting" << dendl;
goto wait;
}
} else {
// wait for RD cap?
- while ((in->caps_issued() & CEPH_CAP_RD) == 0) {
+ while ((issued & CEPH_CAP_RD) == 0) {
dout(7) << " don't have read cap, waiting" << dendl;
goto wait;
}
}
// async i/o?
- if ((in->caps_issued() & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) {
+ if ((issued & (CEPH_CAP_WRBUFFER|CEPH_CAP_RDCACHE))) {
// FIXME: this logic needs to move info FileCache!
}
break;
} else {
- // unbuffered, sync i/o. defer to osd.
+ // unbuffered, sync i/o. we will defer to osd.
break;
}
-
+
wait:
wait_on_list(in->waitfor_caps);
}
-
+
in->get_cap_ref(CEPH_CAP_RD);
+
+ int rvalue = 0;
+ Cond cond;
+ bool done = false;
+ Context *onfinish = new C_SafeCond(&client_lock, &cond, &done, &rvalue);
int r = 0;
- int rvalue = 0;
-
if (g_conf.client_oc) {
- // object cache ON
- rvalue = r = in->fc.read(offset, size, *bl, client_lock); // may block.
+
+ if (issued & CEPH_CAP_RDCACHE) {
+ // we will populate the cache here
+ if (in->cap_refs[CEPH_CAP_RDCACHE] == 0)
+ in->get_cap_ref(CEPH_CAP_RDCACHE);
+
+ // read (and possibly block)
+ r = objectcacher->file_read(in->inode.ino, &in->inode.layout, offset, size, bl, 0, onfinish);
+
+ if (r == 0) {
+ while (!done)
+ cond.Wait(client_lock);
+ r = rvalue;
+ } else {
+ // it was cached.
+ delete onfinish;
+ }
+ } else {
+ r = objectcacher->file_atomic_sync_read(in->inode.ino, &in->inode.layout, offset, size, bl, 0, client_lock);
+ }
+
} else {
- // object cache OFF -- legacy inconsistent way.
+ // object cache OFF -- non-atomic sync read from osd
// do sync read
- Cond cond;
- bool done = false;
- Context *onfinish = new C_SafeCond(&client_lock, &cond, &done, &rvalue);
-
Objecter::OSDRead *rd = filer->prepare_read(in->inode, offset, size, bl, 0);
if (in->hack_balance_reads || g_conf.client_hack_balance_reads)
rd->flags |= CEPH_OSD_OP_BALANCE_READS;
r = objecter->readx(rd, onfinish);
assert(r >= 0);
- // wait!
while (!done)
cond.Wait(client_lock);
+ r = rvalue;
}
if (movepos) {
// copy into fresh buffer (since our write may be resub, async)
bufferptr bp;
if (size > 0) bp = buffer::copy(buf, size);
- bufferlist blist;
- blist.push_back( bp );
+ bufferlist bl;
+ bl.push_back( bp );
// request larger max_size?
__u64 endoff = offset + size;
in->get_cap_ref(CEPH_CAP_WRBUFFER);
// wait? (this may block!)
- oc->wait_for_write(size, client_lock);
+ objectcacher->wait_for_write(size, client_lock);
// async, caching, non-blocking.
- oc->file_write(ino, &layout, offset, size, blist, 0);
+ objectcacher->file_write(in->inode.ino, &in->inode.layout, offset, size, bl, 0);
} else {
// atomic, synchronous, blocking.
- oc->file_atomic_sync_write(ino, &layout, offset, size, blist, 0, client_lock);
+ objectcacher->file_atomic_sync_write(in->inode.ino, &in->inode.layout, offset, size, bl, 0, client_lock);
}
} else {
// simple, non-atomic sync write
unsafe_sync_write++;
in->get_cap_ref(CEPH_CAP_WRBUFFER);
- filer->write(in->inode, offset, size, blist, 0,
- onfinish, onsafe);
+ filer->write(in->inode, offset, size, bl, 0, onfinish, onsafe);
while (!done)
cond.Wait(client_lock);
Inode *in = f->inode;
- dout(3) << "_fsync(" << f << ", " << (syndataonly ? "dataonly)":"data+metadata)") << dendl;
+ dout(3) << "_fsync(" << f << ", " << (syncdataonly ? "dataonly)":"data+metadata)") << dendl;
// metadata?
if (!syncdataonly)
assert(fd_map.count(fd));
Fh *f = fd_map[fd];
- Inode *in = f->inode;
- if (f->mode & CEPH_FILE_MODE_LAZY) {
- // wait for lazy cap
- while ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
- dout(7) << " don't have lazy cap, waiting" << dendl;
- wait_on_list(in->waitfor_caps);
- }
-
- if (g_conf.client_oc) {
- Cond cond;
- bool done = false;
- in->fc.flush_dirty(new C_SafeCond(&client_lock, &cond, &done));
-
- while (!done)
- cond.Wait(client_lock);
-
- } else {
- // mmm, nothin to do.
- }
- }
+ // for now
+ _fsync(f, true);
client_lock.Unlock();
return 0;
Fh *f = fd_map[fd];
Inode *in = f->inode;
- if (f->mode & CEPH_FILE_MODE_LAZY) {
- // wait for lazy cap
- while ((in->caps_issued() & CEPH_CAP_LAZYIO) == 0) {
- dout(7) << " don't have lazy cap, waiting" << dendl;
- wait_on_list(in->waitfor_caps);
- }
-
- if (g_conf.client_oc) {
- in->fc.flush_dirty(0); // flush to invalidate.
- in->fc.release_clean();
- } else {
- // mm, nothin to do.
- }
- }
+ _fsync(f, true);
+ _release(in);
client_lock.Unlock();
return 0;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "include/types.h"
-
-#include "FileCache.h"
-#include "osdc/ObjectCacher.h"
-
-#include "msg/Messenger.h"
-
-#include "config.h"
-#define dout(x) if (x <= g_conf.debug_client) *_dout << dbeginl << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
-#define derr(x) if (x <= g_conf.debug_client) *_derr << dbeginl << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
-
-
-
-// flush/release/clean
-
-void FileCache::flush_dirty(Context *onflush)
-{
- if (oc->flush_set(ino, onflush)) {
- onflush->finish(0);
- delete onflush;
- }
-}
-
-off_t FileCache::release_clean()
-{
- return oc->release_set(ino);
-}
-
-bool FileCache::is_cached()
-{
- return oc->set_is_cached(ino);
-}
-
-bool FileCache::is_dirty()
-{
- return oc->set_is_dirty_or_committing(ino);
-}
-
-void FileCache::empty(Context *onempty)
-{
- off_t unclean = release_clean();
- bool clean = oc->flush_set(ino, onempty);
- assert(!unclean == clean);
-
- if (clean) {
- onempty->finish(0);
- delete onempty;
- }
-}
-
-
-void FileCache::tear_down()
-{
- off_t unclean = release_clean();
- if (unclean) {
- dout(0) << "tear_down " << unclean << " unclean bytes, purging" << dendl;
- oc->purge_set(ino);
- }
-}
-
-// truncate
-
-void FileCache::truncate(off_t olds, off_t news)
-{
- dout(5) << "truncate " << olds << " -> " << news << dendl;
-
- // map range to objects
- list<ObjectExtent> ls;
- oc->filer.file_to_extents(ino, &layout, news, olds-news, ls);
- oc->truncate_set(ino, ls);
-}
-
-// caps
-
-class C_FC_CheckCaps : public Context {
- FileCache *fc;
-public:
- C_FC_CheckCaps(FileCache *f) : fc(f) {}
- void finish(int r) {
- fc->check_caps();
- }
-};
-
-void FileCache::set_caps(int caps, Context *onimplement)
-{
- if (onimplement) {
- dout(10) << "set_caps setting onimplement context for " << cap_string(caps) << dendl;
- assert(latest_caps & ~caps); // we should be losing caps.
- caps_callbacks[caps].push_back(onimplement);
- }
-
- latest_caps = caps;
- check_caps();
-
- // kick waiters? (did we gain caps?)
- if (can_read() && !waitfor_read.empty())
- for (set<Cond*>::iterator p = waitfor_read.begin();
- p != waitfor_read.end();
- ++p)
- (*p)->Signal();
- if (can_write() && !waitfor_write.empty())
- for (set<Cond*>::iterator p = waitfor_write.begin();
- p != waitfor_write.end();
- ++p)
- (*p)->Signal();
-
-}
-
-int FileCache::get_used_caps()
-{
- int used = 0;
- if (num_reading) used |= CEPH_CAP_RD;
- if (oc->set_is_cached(ino)) used |= CEPH_CAP_RDCACHE;
- if (num_writing) used |= CEPH_CAP_WR;
- if (oc->set_is_dirty_or_committing(ino)) used |= CEPH_CAP_WRBUFFER;
- return used;
-}
-
-void FileCache::check_caps()
-{
- // calc used
- int used = get_used_caps();
- dout(10) << "check_caps used was " << cap_string(used) << dendl;
-
- // try to implement caps?
- // BUG? latest_caps, not least caps i've seen?
- if ((latest_caps & CEPH_CAP_RDCACHE) == 0 &&
- (used & CEPH_CAP_RDCACHE))
- release_clean();
- if ((latest_caps & CEPH_CAP_WRBUFFER) == 0 &&
- (used & CEPH_CAP_WRBUFFER))
- flush_dirty(new C_FC_CheckCaps(this));
-
- used = get_used_caps();
- dout(10) << "check_caps used now " << cap_string(used) << dendl;
-
- // check callbacks
- map<int, list<Context*> >::iterator p = caps_callbacks.begin();
- while (p != caps_callbacks.end()) {
- if (used == 0 || (~(p->first) & used) == 0) {
- // implemented.
- dout(10) << "check_caps used is " << cap_string(used)
- << ", caps " << cap_string(p->first) << " implemented, doing callback(s)" << dendl;
- finish_contexts(p->second);
- map<int, list<Context*> >::iterator o = p;
- p++;
- caps_callbacks.erase(o);
- } else {
- dout(10) << "check_caps used is " << cap_string(used)
- << ", caps " << cap_string(p->first) << " not yet implemented" << dendl;
- p++;
- }
- }
-}
-
-
-
-// read/write
-
-int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock)
-{
- int r = 0;
-
- // can i read?
- while ((latest_caps & CEPH_CAP_RD) == 0) {
- dout(10) << "read doesn't have RD cap, blocking" << dendl;
- Cond c;
- waitfor_read.insert(&c);
- c.Wait(client_lock);
- waitfor_read.erase(&c);
- }
-
- // inc reading counter
- num_reading++;
-
- if (latest_caps & CEPH_CAP_RDCACHE) {
- // read (and block)
- Cond cond;
- bool done = false;
- int rvalue = 0;
- C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
-
- r = oc->file_read(ino, &layout, offset, size, &blist, 0, onfinish);
-
- if (r == 0) {
- // block
- while (!done)
- cond.Wait(client_lock);
- r = rvalue;
- } else {
- // it was cached.
- delete onfinish;
- }
- } else {
- r = oc->file_atomic_sync_read(ino, &layout, offset, size, &blist, 0, client_lock);
- }
-
- // dec reading counter
- num_reading--;
-
- if (num_reading == 0 && !caps_callbacks.empty())
- check_caps();
-
- return r;
-}
-
-void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock)
-{
- // inc writing counter
- num_writing++;
-
- if (size > 0) {
- if (latest_caps & CEPH_CAP_WRBUFFER) { // caps buffered write?
- // wait? (this may block!)
- oc->wait_for_write(size, client_lock);
-
- // async, caching, non-blocking.
- oc->file_write(ino, &layout, offset, size, blist, 0);
- } else {
- // atomic, synchronous, blocking.
- oc->file_atomic_sync_write(ino, &layout, offset, size, blist, 0, client_lock);
- }
- }
-
- // dec writing counter
- num_writing--;
- if (num_writing == 0 && !caps_callbacks.empty())
- check_caps();
-}
-
-bool FileCache::all_safe()
-{
- return !oc->set_is_dirty_or_committing(ino);
-}
-
-void FileCache::add_safe_waiter(Context *c)
-{
- bool safe = oc->commit_set(ino, c);
- if (safe) {
- c->finish(0);
- delete c;
- }
-}