basedir_fd(-1), current_fd(-1),
backend(NULL),
index_manager(do_update),
- ondisk_finisher(g_ceph_context),
lock("FileStore::lock"),
force_sync(false),
sync_entry_timeo_lock("sync_entry_timeo_lock"),
stop(false), sync_thread(this),
fdcache(g_ceph_context),
wbthrottle(g_ceph_context),
+ next_osr_id(0),
throttle_ops(g_ceph_context, "filestore_ops",g_conf->filestore_queue_max_ops),
throttle_bytes(g_ceph_context, "filestore_bytes",g_conf->filestore_queue_max_bytes),
- op_finisher(g_ceph_context),
+ // The finisher counts are later used as modulo divisors
+ // (osr->id % m_*_finisher_num) and as the number of Finisher objects
+ // to allocate. A configured value of zero or less would leave the
+ // vectors empty and make that modulo undefined, so fall back to a
+ // single finisher in that case.
+ m_ondisk_finisher_num(g_conf->filestore_ondisk_finisher_threads > 0 ?
+                       g_conf->filestore_ondisk_finisher_threads : 1),
+ m_apply_finisher_num(g_conf->filestore_apply_finisher_threads > 0 ?
+                      g_conf->filestore_apply_finisher_threads : 1),
op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"),
op_wq(this, g_conf->filestore_op_thread_timeout,
g_conf->filestore_op_thread_suicide_timeout, &op_tp),
m_filestore_max_inline_xattrs(0)
{
m_filestore_kill_at.set(g_conf->filestore_kill_at);
+ // Allocate m_ondisk_finisher_num Finisher objects, each named
+ // "filestore-ondisk-<index>", and keep them in ondisk_finishers.
+ for (int idx = 0; idx < m_ondisk_finisher_num; ++idx) {
+   ostringstream finisher_name;
+   finisher_name << "filestore-ondisk-" << idx;
+   ondisk_finishers.push_back(new Finisher(g_ceph_context, finisher_name.str()));
+ }
+ // Allocate m_apply_finisher_num Finisher objects, each named
+ // "filestore-apply-<index>", and keep them in apply_finishers.
+ for (int idx = 0; idx < m_apply_finisher_num; ++idx) {
+   ostringstream finisher_name;
+   finisher_name << "filestore-apply-" << idx;
+   apply_finishers.push_back(new Finisher(g_ceph_context, finisher_name.str()));
+ }
ostringstream oss;
oss << basedir << "/current";
FileStore::~FileStore()
{
+ // Free each ondisk finisher and null its slot in the vector.
+ for (size_t slot = 0; slot < ondisk_finishers.size(); ++slot) {
+   delete ondisk_finishers[slot];
+   ondisk_finishers[slot] = NULL;
+ }
+ // Free each apply finisher and null its slot in the vector.
+ for (size_t slot = 0; slot < apply_finishers.size(); ++slot) {
+   delete apply_finishers[slot];
+   apply_finishers[slot] = NULL;
+ }
g_ceph_context->_conf->remove_observer(this);
g_ceph_context->get_perfcounters_collection()->remove(logger);
journal_start();
op_tp.start();
- op_finisher.start();
- ondisk_finisher.start();
+ // Start every ondisk finisher, in vector order.
+ for (size_t slot = 0; slot < ondisk_finishers.size(); ++slot) {
+   ondisk_finishers[slot]->start();
+ }
+ // Start every apply finisher, in vector order.
+ for (size_t slot = 0; slot < apply_finishers.size(); ++slot) {
+   apply_finishers[slot]->start();
+ }
timer.init();
if (!(generic_flags & SKIP_JOURNAL_REPLAY))
journal_write_close();
- op_finisher.stop();
- ondisk_finisher.stop();
+ // Stop every ondisk finisher, in vector order.
+ for (size_t slot = 0; slot < ondisk_finishers.size(); ++slot) {
+   ondisk_finishers[slot]->stop();
+ }
+ // Stop every apply finisher, in vector order.
+ for (size_t slot = 0; slot < apply_finishers.size(); ++slot) {
+   apply_finishers[slot]->stop();
+ }
if (fsid_fd >= 0) {
VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
o->onreadable_sync->complete(0);
}
if (o->onreadable) {
- op_finisher.queue(o->onreadable);
+ apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
}
if (!to_queue.empty()) {
- op_finisher.queue(to_queue);
+ apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
}
delete o;
}
osr = static_cast<OpSequencer *>(posr->p.get());
dout(5) << "queue_transactions existing " << osr << " " << *osr << dendl;
} else {
- osr = new OpSequencer;
+ osr = new OpSequencer(next_osr_id.inc());
osr->set_cct(g_ceph_context);
osr->parent = posr;
posr->p = osr;
if (onreadable_sync) {
onreadable_sync->complete(r);
}
- op_finisher.queue(onreadable, r);
+ apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
submit_manager.op_submit_finish(op);
apply_manager.op_apply_finish(op);
// getting blocked behind an ondisk completion.
if (ondisk) {
dout(10) << " queueing ondisk " << ondisk << dendl;
- ondisk_finisher.queue(ondisk);
+ ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
}
if (!to_queue.empty()) {
- ondisk_finisher.queue(to_queue);
+ ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
}
}
dout(10) << "_flush_op_queue draining op tp" << dendl;
op_wq.drain();
dout(10) << "_flush_op_queue waiting for apply finisher" << dendl;
- op_finisher.wait_for_empty();
+ // Wait on each apply finisher in turn until its queue is empty.
+ for (size_t slot = 0; slot < apply_finishers.size(); ++slot) {
+   apply_finishers[slot]->wait_for_empty();
+ }
}
/*
if (journal)
journal->flush();
dout(10) << "flush draining ondisk finisher" << dendl;
- ondisk_finisher.wait_for_empty();
+ // Wait on each ondisk finisher in turn until its queue is empty.
+ for (size_t slot = 0; slot < ondisk_finishers.size(); ++slot) {
+   ondisk_finishers[slot]->wait_for_empty();
+ }
}
_flush_op_queue();
// ObjectMap
boost::scoped_ptr<ObjectMap> object_map;
- Finisher ondisk_finisher;
-
// helper fns
int get_cdir(coll_t cid, char *s, int len);
public:
Sequencer *parent;
Mutex apply_lock; // for apply mutual exclusion
+ int id;
/// get_max_uncompleted
bool _get_max_uncompleted(
}
}
- OpSequencer()
+ /// Construct a sequencer with no parent and the given identifier.
+ ///
+ /// @param i  value stored in `id`; FileStore uses `id` modulo the
+ ///           finisher count to choose which Finisher receives this
+ ///           sequencer's completions.
+ ///
+ /// Declared explicit so that a plain int can never be implicitly
+ /// converted into an OpSequencer.
+ explicit OpSequencer(int i)
: qlock("FileStore::OpSequencer::qlock", false, false),
parent(0),
- apply_lock("FileStore::OpSequencer::apply_lock", false, false) {}
+ apply_lock("FileStore::OpSequencer::apply_lock", false, false),
+ id(i) {}
~OpSequencer() {
assert(q.empty());
}
FDCache fdcache;
WBThrottle wbthrottle;
+ atomic_t next_osr_id;
deque<OpSequencer*> op_queue;
Throttle throttle_ops, throttle_bytes;
- Finisher op_finisher;
+ const int m_ondisk_finisher_num;
+ const int m_apply_finisher_num;
+ vector<Finisher*> ondisk_finishers;
+ vector<Finisher*> apply_finishers;
ThreadPool op_tp;
struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {