OPTION(filestore_dev, 0, OPT_STR, 0),
OPTION(filestore_btrfs_trans, 0, OPT_BOOL, true),
OPTION(filestore_flusher, 0, OPT_BOOL, true),
+ OPTION(filestore_flusher_max_fds, 0, OPT_INT, 512),
OPTION(filestore_sync_flush, 0, OPT_BOOL, false),
OPTION(ebofs, 0, OPT_BOOL, false),
OPTION(ebofs_cloneable, 0, OPT_BOOL, true),
derr(0) << "couldn't write to " << fn << " len " << len << " off " << offset << " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
}
- if (g_conf.filestore_flusher)
- queue_flusher(fd, offset, len);
- else {
+ if (!g_conf.filestore_flusher ||
+ !queue_flusher(fd, offset, len)) {
if (g_conf.filestore_sync_flush)
::sync_file_range(fd, offset, len, SYNC_FILE_RANGE_WRITE);
::close(fd);
}
-void FileStore::queue_flusher(int fd, __u64 off, __u64 len)
+bool FileStore::queue_flusher(int fd, __u64 off, __u64 len)
{
+ bool queued;
lock.Lock();
- dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len << dendl;
- flusher_queue.push_back(sync_epoch);
- flusher_queue.push_back(fd);
- flusher_queue.push_back(off);
- flusher_queue.push_back(len);
- flusher_cond.Signal();
+ if (flusher_queue_len < g_conf.filestore_flusher_max_fds) {
+ flusher_queue.push_back(sync_epoch);
+ flusher_queue.push_back(fd);
+ flusher_queue.push_back(off);
+ flusher_queue.push_back(len);
+ flusher_queue_len++;
+ flusher_cond.Signal();
+ dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
+ << " qlen " << flusher_queue_len
+ << dendl;
+ queued = true;
+ } else {
+ dout(10) << "queue_flusher ep " << sync_epoch << " fd " << fd << " " << off << "~" << len
+ << " qlen " << flusher_queue_len
+ << " hit flusher_max_fds " << g_conf.filestore_flusher_max_fds
+ << ", skipping async flush" << dendl;
+ queued = false;
+ }
lock.Unlock();
+ return queued;
}
void FileStore::flusher_entry()
if (!flusher_queue.empty()) {
list<__u64> q;
q.swap(flusher_queue);
-
+
+ int num = flusher_queue_len; // see how many we're taking, here
+
lock.Unlock();
while (!q.empty()) {
__u64 ep = q.front();
::close(fd);
}
lock.Lock();
+ flusher_queue_len -= num; // they're definitely closed, forget
} else {
if (stop)
break;
// flusher thread
Cond flusher_cond;
list<__u64> flusher_queue;
+ int flusher_queue_len;
void flusher_entry();
struct FlusherThread : public Thread {
FileStore *fs;
return 0;
}
} flusher_thread;
- void queue_flusher(int fd, __u64 off, __u64 len);
+ bool queue_flusher(int fd, __u64 off, __u64 len);
int open_journal();
public:
attrs(this), fake_attrs(false),
collections(this), fake_collections(false),
lock("FileStore::lock"),
- sync_epoch(0), stop(false), sync_thread(this), flusher_thread(this) { }
+ sync_epoch(0), stop(false), sync_thread(this), flusher_queue_len(0), flusher_thread(this) { }
int mount();
int umount();