if (in->file_wr_mtime > in->inode.mtime)
m->get_inode().mtime = in->inode.mtime = in->file_wr_mtime;
+
+
if (g_conf.client_oc) {
// caching on, use FileCache.
Context *onimplement = 0;
assert(diri);
assert(diri->inode.mode & INODE_MODE_DIR);
- // add . and ..?
- string dot(".");
- contents[dot] = diri->inode;
- if (diri != root) {
- string dotdot("..");
- contents[dotdot] = diri->dn->dir->parent_inode->inode;
- }
+ // add . and ..?
+ string dot(".");
+ contents[dot] = diri->inode;
+ if (diri != root) {
+ string dotdot("..");
+ contents[dotdot] = diri->dn->dir->parent_inode->inode;
+ }
if (!reply->get_dir_in().empty()) {
// only open dir if we're actually adding stuff to it!
for (list<InodeStat*>::const_iterator pin = reply->get_dir_in().begin();
pin != reply->get_dir_in().end();
++pin, ++pdn) {
-
- if (*pdn == ".")
- continue;
-
- // count entries
+
+ if (*pdn == ".")
+ continue;
+
+ // count entries
res++;
// put in cache
// contents to caller too!
contents[*pdn] = in->inode;
}
- if (dir->is_empty())
- close_dir(dir);
+
+ if (dir->is_empty())
+ close_dir(dir);
}
- // add .. too?
- //if (diri != root && diri->dn && diri->dn->dir) {
- //Inode *parent = diri->dn->dir->parent_inode;
- //contents[".."] = parent->inode;
- //}
// FIXME: remove items in cache that weren't in my readdir?
// ***
tout << size << endl;
tout << offset << endl;
- //assert(offset >= 0);
assert(fh_map.count(fh));
Fh *f = fh_map[fh];
Inode *in = f->inode;
ExtCap *read_ext_cap = capcache->get_cache_cap(in->ino(), uid);
assert(read_ext_cap);
- // do we have read file cap?
- while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) {
- dout(7) << " don't have read cap, waiting" << endl;
- Cond cond;
- in->waitfor_read.push_back(&cond);
- cond.Wait(client_lock);
- }
- // lazy cap?
- while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
- dout(7) << " don't have lazy cap, waiting" << endl;
- Cond cond;
- in->waitfor_lazy.push_back(&cond);
- cond.Wait(client_lock);
- }
-
// determine whether read range overlaps with file
// ...ONLY if we're doing async io
if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) {
rvalue = r = in->fc.read(offset, size, blist, client_lock, read_ext_cap); // may block.
} else {
// object cache OFF -- legacy inconsistent way.
+
+ // do we have read file cap?
+ while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) {
+ dout(7) << " don't have read cap, waiting" << endl;
+ Cond cond;
+ in->waitfor_read.push_back(&cond);
+ cond.Wait(client_lock);
+ }
+ // lazy cap?
+ while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
+ dout(7) << " don't have lazy cap, waiting" << endl;
+ Cond cond;
+ in->waitfor_lazy.push_back(&cond);
+ cond.Wait(client_lock);
+ }
+
+ // do sync read
Cond cond;
bool done = false;
C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
tout << size << endl;
tout << offset << endl;
- //assert(offset >= 0);
assert(fh_map.count(fh));
Fh *f = fh_map[fh];
Inode *in = f->inode;
dout(10) << "cur file size is " << in->inode.size << " wr size " << in->file_wr_size << endl;
- //ExtCap *write_ext_cap = in->get_ext_cap(uid);
- ExtCap *write_ext_cap = capcache->get_cache_cap(in->ino(), uid);
- assert(write_ext_cap);
-
- // do we have write file cap?
- while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) {
- dout(7) << " don't have write cap, waiting" << endl;
- Cond cond;
- in->waitfor_write.push_back(&cond);
- cond.Wait(client_lock);
- }
- while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
- dout(7) << " don't have lazy cap, waiting" << endl;
- Cond cond;
- in->waitfor_lazy.push_back(&cond);
- cond.Wait(client_lock);
- }
-
- // adjust fd pos
- f->pos = offset+size;
+ ExtCap *write_ext_cap = capcache->get_cache_cap(in->ino(), uid);
+ assert(write_ext_cap);
// time it.
utime_t start = g_clock.now();
// write (this may block!)
in->fc.write(offset, size, blist, client_lock, write_ext_cap);
+
+ // adjust fd pos
+ f->pos = offset+size;
} else {
// legacy, inconsistent synchronous write.
dout(7) << "synchronous write" << endl;
+ // do we have write file cap?
+ while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) {
+ dout(7) << " don't have write cap, waiting" << endl;
+ Cond cond;
+ in->waitfor_write.push_back(&cond);
+ cond.Wait(client_lock);
+ }
+ while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
+ dout(7) << " don't have lazy cap, waiting" << endl;
+ Cond cond;
+ in->waitfor_lazy.push_back(&cond);
+ cond.Wait(client_lock);
+ }
+
// prepare write
Cond cond;
bool done = false;
//, 1+((int)g_clock.now()) / 10 //f->pos // hack hack test osd revision snapshots
);
+ // adjust fd pos
+ f->pos = offset+size;
+
while (!done) {
cond.Wait(client_lock);
dout(20) << " sync write bump " << onfinish << endl;
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
#include "config.h"
#include "include/types.h"
#include "crypto/ExtCap.h"
#undef dout
-#define dout(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache "
-#define derr(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache "
+#define dout(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
+#define derr(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
// flush/release/clean
{
off_t unclean = release_clean();
if (unclean) {
- dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
- oc->purge_set(inode.ino);
+ dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
+ oc->purge_set(inode.ino);
}
}
// caps
+class C_FC_CheckCaps : public Context {
+ FileCache *fc;
+public:
+ C_FC_CheckCaps(FileCache *f) : fc(f) {}
+ void finish(int r) {
+ fc->check_caps();
+ }
+};
+
void FileCache::set_caps(int caps, Context *onimplement)
{
if (onimplement) {
+ dout(10) << "set_caps setting onimplement context for " << cap_string(caps) << endl;
assert(latest_caps & ~caps); // we should be losing caps.
caps_callbacks[caps].push_back(onimplement);
}
latest_caps = caps;
check_caps();
+
+ // kick waiters? (did we gain caps?)
+ if (can_read() && !waitfor_read.empty())
+ for (set<Cond*>::iterator p = waitfor_read.begin();
+ p != waitfor_read.end();
+ ++p)
+ (*p)->Signal();
+ if (can_write() && !waitfor_write.empty())
+ for (set<Cond*>::iterator p = waitfor_write.begin();
+ p != waitfor_write.end();
+ ++p)
+ (*p)->Signal();
+
}
void FileCache::check_caps()
{
+ // calc used
int used = 0;
if (num_reading) used |= CAP_FILE_RD;
if (oc->set_is_cached(inode.ino)) used |= CAP_FILE_RDCACHE;
if (oc->set_is_dirty_or_committing(inode.ino)) used |= CAP_FILE_WRBUFFER;
dout(10) << "check_caps used " << cap_string(used) << endl;
+ // try to implement caps?
+ // BUG? latest_caps, not least caps i've seen?
+ if ((latest_caps & CAP_FILE_RDCACHE) == 0 &&
+ (used & CAP_FILE_RDCACHE))
+ release_clean();
+ if ((latest_caps & CAP_FILE_WRBUFFER) == 0 &&
+ (used & CAP_FILE_WRBUFFER))
+ flush_dirty(new C_FC_CheckCaps(this));
+ //if (latest_caps == 0 &&
+ // used != 0)
+ //empty(new C_FC_CheckCaps(this));
+
// check callbacks
map<int, list<Context*> >::iterator p = caps_callbacks.begin();
while (p != caps_callbacks.end()) {
{
int r = 0;
+ // can i read?
+ while ((latest_caps & CAP_FILE_RD) == 0) {
+ dout(10) << "read doesn't have RD cap, blocking" << endl;
+ Cond c;
+ waitfor_read.insert(&c);
+ c.Wait(client_lock);
+ waitfor_read.erase(&c);
+ }
+
// inc reading counter
num_reading++;
void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock, ExtCap *write_ext_cap)
{
+ // can i write
+ while ((latest_caps & CAP_FILE_WR) == 0) {
+ dout(10) << "write doesn't have WR cap, blocking" << endl;
+ Cond c;
+ waitfor_write.insert(&c);
+ c.Wait(client_lock);
+ waitfor_write.erase(&c);
+ }
+
// inc writing counter
num_writing++;
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
#ifndef __FILECACHE_H
#define __FILECACHE_H
//int num_unsafe;
// waiters
- list<Cond*> waitfor_read;
- list<Cond*> waitfor_write;
- //list<Context*> waitfor_safe;
+ set<Cond*> waitfor_read;
+ set<Cond*> waitfor_write;
+
bool waitfor_release;
public:
num_reading(0), num_writing(0),// num_unsafe(0),
waitfor_release(false) {}
~FileCache() {
- tear_down();
+ tear_down();
}
// waiters/waiting
bool can_write() { return latest_caps & CAP_FILE_WR; }
bool all_safe();// { return num_unsafe == 0; }
- void add_read_waiter(Cond *c) { waitfor_read.push_back(c); }
- void add_write_waiter(Cond *c) { waitfor_write.push_back(c); }
- void add_safe_waiter(Context *c);// { waitfor_safe.push_back(c); }
+ void add_safe_waiter(Context *c);
// ...
void flush_dirty(Context *onflush=0);
for (map<string, inode_t>::iterator it = contents.begin();
it != contents.end();
it++) {
+ if (it->first == ".") continue;
+ if (it->first == "..") continue;
string file = basedir + "/" + it->first;
if (time_to_stop()) break;
// start up network
rank.start_rank();
- // start client
- Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
- client->init();
+ list<Client*> clients;
+ list<SyntheticClient*> synclients;
+
+ cout << "mounting and starting " << g_conf.num_client << " syn client(s)" << endl;
+ for (int i=0; i<g_conf.num_client; i++) {
+ // start client
+ Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
+ client->init();
- // start syntheticclient
- SyntheticClient *syn = new SyntheticClient(client);
+ // start syntheticclient
+ SyntheticClient *syn = new SyntheticClient(client);
- // start up fuse
- // use my argc, argv (make sure you pass a mount point!)
- cout << "mounting" << endl;
- client->mount();
-
- cout << "starting syn client" << endl;
- syn->start_thread();
+ client->mount();
+
+ syn->start_thread();
+
+ clients.push_back(client);
+ synclients.push_back(syn);
+ }
+
+ cout << "waiting for client(s) to finish" << endl;
+ while (!clients.empty()) {
+ Client *client = clients.front();
+ SyntheticClient *syn = synclients.front();
+ clients.pop_front();
+ synclients.pop_front();
+
+ // wait
+ syn->join_thread();
- // wait
- syn->join_thread();
+ // unmount
+ client->unmount();
+ client->shutdown();
- // unmount
- client->unmount();
- cout << "unmounted" << endl;
- client->shutdown();
-
- delete client;
-
+ delete syn;
+ delete client;
+ }
+
// wait for messenger to finish
rank.wait();
// unpin dir
dn->dir->auth_unpin();
- // kick waiters
- list<Context*> finished;
- dn->dir->take_waiting(CDIR_WAIT_DNREAD, finished);
- mds->queue_finished(finished);
+ // kick waiters
+ list<Context*> finished;
+ dn->dir->take_waiting(CDIR_WAIT_DNREAD, finished);
+ mds->queue_finished(finished);
}
+
/*
* onfinish->finish() will be called with
* 0 on successful xlock,
CDentry *dn = 0;
// make dentry and inode, xlock dentry.
- //int r = prepare_mknod(req, diri, &in, &dn);
- int r = prepare_mknod(req, diri, &in, &dn, true);
+ bool excl = req->get_iarg() & O_EXCL;
+ int r = prepare_mknod(req, diri, &in, &dn, !excl);
+
if (!r)
return; // wait on something
assert(in);