if (g_conf.client_oc &&
!(in->file_caps() & CAP_FILE_WRBUFFER)) {
- // **** write me ****
+ //in->fc->flush_dirty(); // FIXME don't block
}
}
}
-void Client::finish_flush(Inode *in)
+void Client::async_flush_inode_buffers(Inode *in)
{
- dout(5) << "finish_flush on ino " << in->ino() << endl;
+ dout(5) << "async_flush_inode_buffers " << hex << in->ino() << dec << endl;
+
+ in->get();
+ if (objectcacher->flush_set(in->ino(),
+ new C_Client_Flushed(this, in)))
+ finish_flush(in);
+}
- /*Filecache *fc = bc->get_fc(in);
- assert(!fc->is_dirty() &&
- !fc->is_inflight());
- */
+void Client::flush_inode_buffers(Inode *in)
+{
+ dout(5) << "flush_inode_buffers " << hex << in->ino() << dec << endl;
+
+ Cond cond;
+ bool done = false;
+ if (!objectcacher->flush_set(in->ino(),
+ new C_Cond(&cond, &done))) {
+ // wait for callback
+ while (!done) cond.Wait(client_lock);
+ }
+}
+
+void Client::release_inode_buffers(Inode *in)
+{
+ dout(5) << "release_inode_buffers " << hex << in->ino() << dec << endl;
+ flush_inode_buffers(in);
+ int left = objectcacher->release_set(in->ino());
+ assert(left == 0);
+}
+void Client::finish_flush(Inode *in)
+{
+ dout(5) << "finish_flush " << hex << in->ino() << dec << endl;
+
// release all buffers?
if (!(in->file_caps() & CAP_FILE_RDCACHE)) {
dout(5) << "flush_finish releasing all buffers on ino " << hex << in->ino() << dec << endl;
- //release_inode_buffers(in);
+ release_inode_buffers(in);
}
if (in->num_rd == 0 && in->num_wr == 0) {
return result;
}
+
+
+
int Client::close(fh_t fh)
{
client_lock.Lock();
// release caps right away?
dout(10) << "num_rd " << in->num_rd << " num_wr " << in->num_wr << endl;
- if (in->num_rd == 0 && in->num_wr == 0) {
-
- // flush anything?
- // ** WRITE ME **
- dout(10) << " flushing dirty buffers on " << hex << in->ino() << dec << endl;
-
-
- /*if (g_conf.client_oc &&
- (fc->is_dirty() || fc->is_inflight())) {
- // flushing.
- dout(10) << " waiting for inflight buffers on " << hex << in->ino() << dec << endl;
- in->get();
- fc->add_inflight_waiter( new C_Client_Flushed(this, in) );
- } else {*/
- // all clean!
- dout(10) << " releasing buffers and caps on " << hex << in->ino() << dec << endl;
- release_inode_buffers(in); // free buffers
- release_caps(in); // release caps now.
- //}
+ if (in->num_wr == 0) {
+ //dout(10) << " starting flush of dirty buffers on " << hex << in->ino() << dec << endl;
+ async_flush_inode_buffers(in);
}
-
+ if (in->num_rd == 0) {
+ //dout(10) << " releasing buffers and caps on " << hex << in->ino() << dec << endl;
+ release_inode_buffers(in);
+ release_caps(in); // release caps now.
+ }
+
put_inode( in );
int result = 0;
Inode *in = f->inode;
dout(3) << "fsync fh " << fh << " ino " << hex << in->inode.ino << dec << " syncdataonly " << syncdataonly << endl;
-
- // blocking flush
- assert(0); // WRITE ME
+ // metadata?
+ if (!syncdataonly) {
+ dout(0) << "fsync - not syncing metadata yet.. implement me" << endl;
+ }
- if (syncdataonly &&
- (in->file_caps() & CAP_FILE_WR)) {
- // flush metadata too.. size, mtime
- // ... WRITE ME ...
+ // data?
+ Cond cond;
+ bool done = false;
+ if (!objectcacher->commit_set(in->ino(),
+ new C_Cond(&cond, &done))) {
+ // wait for callback
+ while (!done) cond.Wait(client_lock);
}
client_lock.Unlock();
Dentry *dn; // if i'm linked to a dentry.
string *symlink; // symlink content, if it's a symlink
+ //FileCache *fc;
+
list<Cond*> waitfor_write;
list<Cond*> waitfor_read;
- list<Cond*> waitfor_flushed;
- set<bufferlist*> inflight_buffers;
void get() {
ref++;
valid_until(0),
dir_auth(-1), dir_hashed(false), dir_replicated(false),
file_wr_mtime(0), file_wr_size(0), num_rd(0), num_wr(0),
- ref(0), dir(0), dn(0), symlink(0) { }
+ ref(0), dir(0), dn(0), symlink(0)
+ { }
~Inode() {
if (symlink) { delete symlink; symlink = 0; }
}
}
return dir;
}
+
};
// find dentry based on filepath
Dentry *lookup(filepath& path);
-
- // blocking mds call
+ // make blocking mds request
MClientReply *make_request(MClientRequest *req, bool auth_best=false, int use_auth=-1);
-
- // -- buffer cache --
- void flush_inode_buffers(Inode *in) { // flush buffered writes
- // write me
- }
- void release_inode_buffers(Inode *in) { // release cached reads
- // write me
- }
-
-
// friends
friend class SyntheticClient;
void handle_file_caps(class MClientFileCaps *m);
void release_caps(Inode *in, int retain=0);
void update_caps_wanted(Inode *in);
+
+ // data cache
+ void async_flush_inode_buffers(Inode *in); // start flushing buffered writes. won't block.
+ void flush_inode_buffers(Inode *in); // flush buffered writes. may block.
+ void release_inode_buffers(Inode *in); // release cached reads, +flush as necessary. may block.
void finish_flush(Inode *in);
// metadata cache
Inode* insert_inode_info(Dir *dir, c_inode_info *in_info);
void insert_trace(const vector<c_inode_info*>& trace);
+
// ----------------------
// fs ops.
int mount(int mkfs=0);
oc->bh_stat_add(bh);
// add right
- add_bh(right);
+ oc->bh_add(this, right);
// split buffers too
bufferlist bl;
assert(left->get_state() == right->get_state());
dout(10) << "merge " << *left << " + " << *right << endl;
+ oc->bh_remove(this, right);
+ oc->bh_stat_sub(left);
left->set_length( left->length() + right->length());
+ oc->bh_stat_add(left);
// data
left->bl.claim_append(right->bl);
p->second );
// hose right
- data.erase(right->start());
delete right;
dout(10) << "merge result " << *left << endl;
BufferHead *n = new BufferHead();
n->set_start( cur );
n->set_length( left );
- add_bh(n);
+ oc->bh_add(this, n);
missing[cur] = n;
dout(20) << "map_read miss " << left << " left, " << *n << endl;
cur += left;
BufferHead *n = new BufferHead();
n->set_start( cur );
n->set_length( MIN(next - cur, left) );
- add_bh(n);
+ oc->bh_add(this,n);
missing[cur] = n;
cur += MIN(left, n->length());
left -= MIN(left, n->length());
final = new BufferHead();
final->set_start( cur );
final->set_length( max );
- add_bh(final);
+ oc->bh_add(this, final);
} else {
final->set_length( final->length() + max );
}
final = new BufferHead();
final->set_start( cur );
final->set_length( glen );
- add_bh(final);
+ oc->bh_add(this, final);
}
cur += glen;
}
+int ObjectCacher::release_set(inodeno_t ino)
+{
+ // return # bytes not clean (and thus not released).
+ int unclean = 0;
+
+ if (objects.count(ino) == 0) {
+ dout(10) << "release_set on " << hex << ino << dec << " dne" << endl;
+ return 0;
+ }
+
+ Object *ob = objects[ino];
+ dout(10) << "release_set " << *ob << endl;
+
+ for (map<off_t,BufferHead*>::iterator p = ob->data.begin();
+ p != ob->data.end();
+ p++) {
+ BufferHead *bh = p->second;
+ if (bh->is_clean())
+ bh_remove(ob, bh);
+ else
+ unclean += bh->length();
+ }
+
+ if (unclean) {
+ dout(10) << "release_set " << *ob
+ << ", " << unclean << " bytes left" << endl;
+ }
+
+ return unclean;
+}
//bh->set_dirty_stamp(g_clock.now());
};
-
+ void bh_add(Object *ob, BufferHead *bh) {
+ ob->add_bh(bh);
+ if (bh->is_dirty())
+ lru_dirty.lru_insert_top(bh);
+ else
+ lru_rest.lru_insert_top(bh);
+ bh_stat_add(bh);
+ }
+ void bh_remove(Object *ob, BufferHead *bh) {
+ ob->remove_bh(bh);
+ if (bh->is_dirty())
+ lru_dirty.lru_remove(bh);
+ else
+ lru_rest.lru_remove(bh);
+ bh_stat_sub(bh);
+ }
// io
void bh_read(Object *ob, BufferHead *bh);
bool commit_set(inodeno_t ino, Context *oncommit);
void commit_all(Context *oncommit=0);
+ int release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean)
+
+
// file functions
/*** async+caching (non-blocking) file interface ***/