]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: limit size of flusher queue
authorSage Weil <sage@newdream.net>
Mon, 7 Dec 2009 22:28:45 +0000 (14:28 -0800)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Mon, 7 Dec 2009 23:43:31 +0000 (15:43 -0800)
Limit the size of the flusher queue to avoid hitting max open files
limit.  If we hit the max, just don't queue async flush, so we close the
fd immediately.

src/config.cc
src/config.h
src/os/FileStore.cc
src/os/FileStore.h

index 6bca011f74d1a3566c5b844493086a59a6d37de0..4f6bd40e9262ea3eb502de65a85ca77f4ea2e5bc 100644 (file)
@@ -522,6 +522,7 @@ static struct config_option config_optionsp[] = {
        OPTION(filestore_dev, 0, OPT_STR, 0),
        OPTION(filestore_btrfs_trans, 0, OPT_BOOL, true),
        OPTION(filestore_flusher, 0, OPT_BOOL, true),
+       OPTION(filestore_flusher_max_fds, 0, OPT_INT, 512),
        OPTION(filestore_sync_flush, 0, OPT_BOOL, false),
        OPTION(ebofs, 0, OPT_BOOL, false),
        OPTION(ebofs_cloneable, 0, OPT_BOOL, true),
index ad21e6ae8932e41a1694da3731a6e2e0663953c5..f7f9f7e87e43264a027d705214eb37de003d91af 100644 (file)
@@ -334,6 +334,7 @@ struct md_config_t {
   const char  *filestore_dev;
   bool filestore_btrfs_trans;
   bool filestore_flusher;
+  int filestore_flusher_max_fds;
   bool filestore_sync_flush;
   
   // ebofs
index dbdd11ce051d34c4aadfd6bc34da9bf1576fbcb7..7b0c78261fd3e4fe691eb2347b8145c9e087a764 100644 (file)
@@ -1450,9 +1450,8 @@ int FileStore::_write(coll_t cid, const sobject_t& oid,
       derr(0) << "couldn't write to " << fn << " len " << len << " off " << offset << " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
     }
 
-    if (g_conf.filestore_flusher)
-      queue_flusher(fd, offset, len);
-    else {
+    if (!g_conf.filestore_flusher ||
+       !queue_flusher(fd, offset, len)) {
       if (g_conf.filestore_sync_flush)
        ::sync_file_range(fd, offset, len, SYNC_FILE_RANGE_WRITE);
       ::close(fd);
@@ -1585,16 +1584,30 @@ int FileStore::_clone_range(coll_t cid, const sobject_t& oldoid, const sobject_t
 }
 
 
-void FileStore::queue_flusher(int fd, __u64 off, __u64 len)
+bool FileStore::queue_flusher(int fd, __u64 off, __u64 len)
 {
+  bool queued;
   lock.Lock();
-  dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len << dendl;
-  flusher_queue.push_back(sync_epoch);
-  flusher_queue.push_back(fd);
-  flusher_queue.push_back(off);
-  flusher_queue.push_back(len);
-  flusher_cond.Signal();
+  if (flusher_queue_len < g_conf.filestore_flusher_max_fds) {
+    flusher_queue.push_back(sync_epoch);
+    flusher_queue.push_back(fd);
+    flusher_queue.push_back(off);
+    flusher_queue.push_back(len);
+    flusher_queue_len++;
+    flusher_cond.Signal();
+    dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
+            << " qlen " << flusher_queue_len
+            << dendl;
+    queued = true;
+  } else {
+    dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
+            << " qlen " << flusher_queue_len 
+            << " hit flusher_max_fds " << g_conf.filestore_flusher_max_fds
+            << ", skipping async flush" << dendl;
+    queued = false;
+  }
   lock.Unlock();
+  return queued;
 }
 
 void FileStore::flusher_entry()
@@ -1605,7 +1618,9 @@ void FileStore::flusher_entry()
     if (!flusher_queue.empty()) {
       list<__u64> q;
       q.swap(flusher_queue);
-      
+
+      int num = flusher_queue_len;  // see how many we're taking, here
+
       lock.Unlock();
       while (!q.empty()) {
        __u64 ep = q.front();
@@ -1625,6 +1640,7 @@ void FileStore::flusher_entry()
        ::close(fd);
       }
       lock.Lock();
+      flusher_queue_len -= num;   // they're definitely closed, forget
     } else {
       if (stop)
        break;
index 53a82b411ed04c0050445d7a121be57e2f697198..a74469598e3bd730fee9b7e0d94895d08c16d16f 100644 (file)
@@ -84,6 +84,7 @@ class FileStore : public JournalingObjectStore {
   // flusher thread
   Cond flusher_cond;
   list<__u64> flusher_queue;
+  int flusher_queue_len;
   void flusher_entry();
   struct FlusherThread : public Thread {
     FileStore *fs;
@@ -93,7 +94,7 @@ class FileStore : public JournalingObjectStore {
       return 0;
     }
   } flusher_thread;
-  void queue_flusher(int fd, __u64 off, __u64 len);
+  bool queue_flusher(int fd, __u64 off, __u64 len);
   int open_journal();
 
  public:
@@ -104,7 +105,7 @@ class FileStore : public JournalingObjectStore {
     attrs(this), fake_attrs(false), 
     collections(this), fake_collections(false),
     lock("FileStore::lock"),
-    sync_epoch(0), stop(false), sync_thread(this), flusher_thread(this) { }
+    sync_epoch(0), stop(false), sync_thread(this), flusher_queue_len(0), flusher_thread(this) { }
 
   int mount();
   int umount();