]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
semi-busted client progress.
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 18 May 2006 18:31:16 +0000 (18:31 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 18 May 2006 18:31:16 +0000 (18:31 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@771 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/client/Client.cc
ceph/client/Client.h
ceph/osdc/ObjectCacher.cc
ceph/osdc/ObjectCacher.h

index 7ce3aeceff7c3c7c789ff99848d6aa66dec4ec83..9e4c8db293458a69f2807af6e7de42d1adbcd4df 100644 (file)
@@ -698,7 +698,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
   if (g_conf.client_oc &&
          !(in->file_caps() & CAP_FILE_WRBUFFER)) {
 
-       // **** write me ****
+       //in->fc->flush_dirty();  // FIXME don't block
 
   } 
   
@@ -738,19 +738,45 @@ void Client::handle_file_caps(MClientFileCaps *m)
   }
 }
 
-void Client::finish_flush(Inode *in)
+void Client::async_flush_inode_buffers(Inode *in)
 {
-  dout(5) << "finish_flush on ino " << in->ino() << endl;
+  dout(5) << "async_flush_inode_buffers " << hex << in->ino() << dec << endl;
+  
+  in->get();
+  if (objectcacher->flush_set(in->ino(),
+                                                         new C_Client_Flushed(this, in)))
+       finish_flush(in);
+}
 
-  /*Filecache *fc = bc->get_fc(in);
-  assert(!fc->is_dirty() &&
-                !fc->is_inflight());
-  */
+void Client::flush_inode_buffers(Inode *in)
+{
+  dout(5) << "flush_inode_buffers " << hex << in->ino() << dec << endl;
+
+  Cond cond;
+  bool done = false;
+  if (!objectcacher->flush_set(in->ino(),
+                                                          new C_Cond(&cond, &done))) {
+       // wait for callback
+       while (!done) cond.Wait(client_lock);
+  }
+}
+
+void Client::release_inode_buffers(Inode *in)
+{
+  dout(5) << "release_inode_buffers " << hex << in->ino() << dec << endl;
+  flush_inode_buffers(in);
+  int left = objectcacher->release_set(in->ino());
+  assert(left == 0);
+}
 
+void Client::finish_flush(Inode *in)
+{
+  dout(5) << "finish_flush " << hex << in->ino() << dec << endl;
+  
   // release all buffers?
   if (!(in->file_caps() & CAP_FILE_RDCACHE)) {
        dout(5) << "flush_finish releasing all buffers on ino " << hex << in->ino() << dec << endl;
-       //release_inode_buffers(in);
+       release_inode_buffers(in);
   }
   
   if (in->num_rd == 0 && in->num_wr == 0) {
@@ -1585,6 +1611,9 @@ int Client::open(const char *relpath, int mode)
   return result;
 }
 
+
+
+
 int Client::close(fh_t fh)
 {
   client_lock.Lock();
@@ -1616,27 +1645,16 @@ int Client::close(fh_t fh)
 
   // release caps right away?
   dout(10) << "num_rd " << in->num_rd << "  num_wr " << in->num_wr << endl;
-  if (in->num_rd == 0 && in->num_wr == 0) {
-       
-       // flush anything?
-       // ** WRITE ME **
-       dout(10) << "  flushing dirty buffers on " << hex << in->ino() << dec << endl;
-       
-
-       /*if (g_conf.client_oc && 
-               (fc->is_dirty() || fc->is_inflight())) {
-         // flushing.
-         dout(10) << "  waiting for inflight buffers on " << hex << in->ino() << dec << endl;
-         in->get();
-         fc->add_inflight_waiter(  new C_Client_Flushed(this, in) );
-         } else {*/
-         // all clean!
-       dout(10) << "  releasing buffers and caps on " << hex << in->ino() << dec << endl;
-         release_inode_buffers(in);  // free buffers
-         release_caps(in);               // release caps now.
-         //}
+  if (in->num_wr == 0) {
+       //dout(10) << "  starting flush of dirty buffers on " << hex << in->ino() << dec << endl;
+       async_flush_inode_buffers(in);
   }
-
+  if (in->num_rd == 0) {
+       //dout(10) << "  releasing buffers and caps on " << hex << in->ino() << dec << endl;
+       release_inode_buffers(in);
+       release_caps(in);                 // release caps now.
+  }
+       
   put_inode( in );
   int result = 0;
 
@@ -1917,15 +1935,19 @@ int Client::fsync(fh_t fh, bool syncdataonly)
   Inode *in = f->inode;
 
   dout(3) << "fsync fh " << fh << " ino " << hex << in->inode.ino << dec << " syncdataonly " << syncdataonly << endl;
-  // blocking flush
 
-  assert(0);  // WRITE ME
+  // metadata?
+  if (!syncdataonly) {
+       dout(0) << "fsync - not syncing metadata yet.. implement me" << endl;
+  }
 
-  if (syncdataonly &&
-         (in->file_caps() & CAP_FILE_WR)) {
-       // flush metadata too.. size, mtime
-       // ... WRITE ME ...
+  // data?
+  Cond cond;
+  bool done = false;
+  if (!objectcacher->commit_set(in->ino(),
+                                                               new C_Cond(&cond, &done))) {
+       // wait for callback
+       while (!done) cond.Wait(client_lock);
   }
 
   client_lock.Unlock();
index 48f176dc1a4e369fddc9197f36651b17d544fa16..cf18c9402ff06d6bc86085e3ec624ce7089505a8 100644 (file)
@@ -138,10 +138,10 @@ class Inode {
   Dentry    *dn;      // if i'm linked to a dentry.
   string    *symlink; // symlink content, if it's a symlink
 
+  //FileCache *fc;
+
   list<Cond*>       waitfor_write;
   list<Cond*>       waitfor_read;
-  list<Cond*>       waitfor_flushed;
-  set<bufferlist*>  inflight_buffers;
 
   void get() { 
        ref++; 
@@ -156,7 +156,8 @@ class Inode {
        valid_until(0),
        dir_auth(-1), dir_hashed(false), dir_replicated(false), 
        file_wr_mtime(0), file_wr_size(0), num_rd(0), num_wr(0),
-       ref(0), dir(0), dn(0), symlink(0) { }
+       ref(0), dir(0), dn(0), symlink(0)
+  { }
   ~Inode() {
        if (symlink) { delete symlink; symlink = 0; }
   }
@@ -247,6 +248,7 @@ class Inode {
        }
        return dir;
   }
+
 };
 
 
@@ -431,20 +433,9 @@ protected:
   // find dentry based on filepath
   Dentry *lookup(filepath& path);
 
-
-  // blocking mds call
+  // make blocking mds request
   MClientReply *make_request(MClientRequest *req, bool auth_best=false, int use_auth=-1);
-
   
-  // -- buffer cache --
-  void flush_inode_buffers(Inode *in) {     // flush buffered writes
-       // write me
-  }
-  void release_inode_buffers(Inode *in) {   // release cached reads
-       // write me
-  }
-               
-
   // friends
   friend class SyntheticClient;
 
@@ -465,12 +456,18 @@ protected:
   void handle_file_caps(class MClientFileCaps *m);
   void release_caps(Inode *in, int retain=0);
   void update_caps_wanted(Inode *in);
+
+  // data cache
+  void async_flush_inode_buffers(Inode *in);  // start flushing buffered writes.  won't block.
+  void flush_inode_buffers(Inode *in);   // flush buffered writes.  may block.
+  void release_inode_buffers(Inode *in); // release cached reads, +flush as necessary.  may block.
   void finish_flush(Inode *in);
 
   // metadata cache
   Inode* insert_inode_info(Dir *dir, c_inode_info *in_info);
   void insert_trace(const vector<c_inode_info*>& trace);
 
+
   // ----------------------
   // fs ops.
   int mount(int mkfs=0);
index e7da2e20eda43c2757411d367b75f56520b521fb..42371b09207ae976c6bb8e78b8f1c90c82fc67f1 100644 (file)
@@ -30,7 +30,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off)
   oc->bh_stat_add(bh);
   
   // add right
-  add_bh(right);
+  oc->bh_add(this, right);
   
   // split buffers too
   bufferlist bl;
@@ -67,7 +67,10 @@ void ObjectCacher::Object::merge(BufferHead *left, BufferHead *right)
   assert(left->get_state() == right->get_state());
 
   dout(10) << "merge " << *left << " + " << *right << endl;
+  oc->bh_remove(this, right);
+  oc->bh_stat_sub(left);
   left->set_length( left->length() + right->length());
+  oc->bh_stat_add(left);
 
   // data
   left->bl.claim_append(right->bl);
@@ -84,7 +87,6 @@ void ObjectCacher::Object::merge(BufferHead *left, BufferHead *right)
                                                                                 p->second );
   
   // hose right
-  data.erase(right->start());
   delete right;
 
   dout(10) << "merge result " << *left << endl;
@@ -128,7 +130,7 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
                BufferHead *n = new BufferHead();
                n->set_start( cur );
                n->set_length( left );
-               add_bh(n);
+               oc->bh_add(this, n);
                missing[cur] = n;
                dout(20) << "map_read miss " << left << " left, " << *n << endl;
                cur += left;
@@ -166,7 +168,7 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
                BufferHead *n = new BufferHead();
                n->set_start( cur );
                n->set_length( MIN(next - cur, left) );
-               add_bh(n);
+               oc->bh_add(this,n);
                missing[cur] = n;
                cur += MIN(left, n->length());
                left -= MIN(left, n->length());
@@ -220,7 +222,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr
           final = new BufferHead();
           final->set_start( cur );
           final->set_length( max );
-          add_bh(final);
+          oc->bh_add(this, final);
         } else {
           final->set_length( final->length() + max );
         }
@@ -279,7 +281,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr
                  final = new BufferHead();
                  final->set_start( cur );
                  final->set_length( glen );
-                 add_bh(final);
+                 oc->bh_add(this, final);
                }
         
         cur += glen;
@@ -757,3 +759,33 @@ bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish)
 }
 
 
+int ObjectCacher::release_set(inodeno_t ino)
+{
+  // return # bytes not clean (and thus not released).
+  int unclean = 0;
+
+  if (objects.count(ino) == 0) {
+       dout(10) << "release_set on " << hex << ino << dec << " dne" << endl;
+       return 0;
+  }
+
+  Object *ob = objects[ino];
+  dout(10) << "release_set " << *ob << endl;
+
+  for (map<off_t,BufferHead*>::iterator p = ob->data.begin();
+          p != ob->data.end();
+          p++) {
+       BufferHead *bh = p->second;
+       if (bh->is_clean()) 
+         bh_remove(ob, bh);
+       else 
+         unclean += bh->length();
+  }
+
+  if (unclean) {
+       dout(10) << "release_set " << *ob 
+                        << ", " << unclean << " bytes left" << endl;
+  }
+
+  return unclean;
+}
index 8ebfa1b724e4de06e04fb5898da1422253039829..48e24a9ecba3807199140e2812d8d0d348e886a1 100644 (file)
@@ -249,7 +249,22 @@ class ObjectCacher {
        //bh->set_dirty_stamp(g_clock.now());
   };
 
-
+  void bh_add(Object *ob, BufferHead *bh) {
+       ob->add_bh(bh);
+       if (bh->is_dirty())
+         lru_dirty.lru_insert_top(bh);
+       else
+         lru_rest.lru_insert_top(bh);
+       bh_stat_add(bh);
+  }
+  void bh_remove(Object *ob, BufferHead *bh) {
+       ob->remove_bh(bh);
+       if (bh->is_dirty())
+         lru_dirty.lru_remove(bh);
+       else
+         lru_rest.lru_remove(bh);
+       bh_stat_sub(bh);
+  }
 
   // io
   void bh_read(Object *ob, BufferHead *bh);
@@ -341,6 +356,9 @@ class ObjectCacher {
   bool commit_set(inodeno_t ino, Context *oncommit);
   void commit_all(Context *oncommit=0);
 
+  int release_set(inodeno_t ino);  // returns # of bytes not released (ie non-clean)
+
+
   // file functions
 
   /*** async+caching (non-blocking) file interface ***/