#include "common/safe_io.h"
#include "FileJournal.h"
#include "include/color.h"
+#include "common/ProfLogger.h"
+#include "os/ObjectStore.h"
#include <fcntl.h>
#include <sstream>
// throw out what we have so far
full_state = FULL_FULL;
while (!writeq.empty()) {
- dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl;
- throttle_ops.put(1);
- throttle_bytes.put(writeq.front().bl.length());
+ put_throttle(1, writeq.front().bl.length());
writeq.pop_front();
}
print_header();
{
write_lock.Lock();
while ((!writeq.empty() || writing) && !write_stop) {
- dout(10) << "flush waiting for writeq to empty and writes to complete" << dendl;
+ dout(5) << "flush waiting for writeq to empty and writes to complete" << dendl;
write_empty_cond.Wait(write_lock);
}
write_lock.Unlock();
- dout(10) << "flush waiting for finisher" << dendl;
+ dout(5) << "flush waiting for finisher" << dendl;
finisher->wait_for_empty();
- dout(10) << "flush done" << dendl;
+ dout(5) << "flush done" << dendl;
}
assert(r == 0);
do_write(bl);
- dout(30) << "XXX throttle put " << orig_bytes << dendl;
- uint64_t new_ops = throttle_ops.put(orig_ops);
- uint64_t new_bytes = throttle_bytes.put(orig_bytes);
- dout(10) << "write_thread throttle finished " << orig_ops << " ops and "
- << orig_bytes << " bytes, now "
- << new_ops << " ops and " << new_bytes << " bytes"
- << dendl;
+ put_throttle(orig_ops, orig_bytes);
}
write_empty_cond.Signal();
write_lock.Unlock();
Mutex::Locker locker(write_lock); // ** lock **
// dump on queue
- dout(10) << "submit_entry seq " << seq
+ dout(5) << "submit_entry seq " << seq
<< " len " << e.length()
<< " (" << oncommit << ")" << dendl;
throttle_ops.take(1);
throttle_bytes.take(e.length());
+ if (logger) {
+ logger->set(l_os_jq_max_ops, throttle_ops.get_max());
+ logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
+ logger->set(l_os_jq_ops, throttle_ops.get_current());
+ logger->set(l_os_jq_bytes, throttle_bytes.get_current());
+ }
+
writeq.push_back(write_item(seq, e, alignment));
write_cond.Signal();
} else {
Mutex::Locker locker(write_lock);
if (seq < last_committed_seq) {
- dout(10) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
+ dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
assert(seq >= last_committed_seq);
return;
}
if (seq == last_committed_seq) {
- dout(10) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
+ dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
return;
}
- dout(10) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
+ dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
last_committed_seq = seq;
// adjust start pointer
dout(15) << " dropping committed but unwritten seq " << writeq.front().seq
<< " len " << writeq.front().bl.length()
<< dendl;
- dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl;
- throttle_ops.put(1);
- throttle_bytes.put(writeq.front().bl.length());
+ put_throttle(1, writeq.front().bl.length());
writeq.pop_front();
}
}
+void FileJournal::put_throttle(uint64_t ops, uint64_t bytes)
+{
+ uint64_t new_ops = throttle_ops.put(ops);
+ uint64_t new_bytes = throttle_bytes.put(bytes);
+ dout(5) << "put_throttle finished " << ops << " ops and "
+ << bytes << " bytes, now "
+ << new_ops << " ops and " << new_bytes << " bytes"
+ << dendl;
+
+ if (logger) {
+ logger->inc(l_os_j_ops, ops);
+ logger->inc(l_os_j_bytes, bytes);
+ logger->set(l_os_jq_ops, new_ops);
+ logger->set(l_os_jq_bytes, new_bytes);
+ logger->set(l_os_jq_max_ops, throttle_ops.get_max());
+ logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
+ }
+}
+
void FileJournal::make_writeable()
{
_open(true);
#include "common/errno.h"
#include "common/run_cmd.h"
#include "common/safe_io.h"
+#include "common/ProfLogger.h"
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
stop(false), sync_thread(this),
op_queue_len(0), op_queue_bytes(0), next_finish(0),
op_tp("FileStore::op_tp", g_conf.filestore_op_threads), op_wq(this, &op_tp),
- flusher_queue_len(0), flusher_thread(this)
+ flusher_queue_len(0), flusher_thread(this),
+ logger(NULL)
{
ostringstream oss;
oss << basedir << "/current";
flusher_thread.join();
journal_stop();
+ stop_logger();
op_finisher.stop();
ondisk_finisher.stop();
}
+void FileStore::start_logger(int whoami, utime_t tare)
+{
+ dout(10) << "start_logger" << dendl;
+ assert(!logger);
+
+ static ProfLogType fs_logtype(l_os_first, l_os_last);
+ static bool didit = false;
+ if (!didit) {
+ didit = true;
+
+ fs_logtype.add_inc(l_os_in_ops, "in_o");
+ //fs_logtype.add_inc(l_os_in_bytes, "in_b");
+ fs_logtype.add_inc(l_os_readable_ops, "or_o");
+ fs_logtype.add_inc(l_os_readable_bytes, "or_b");
+ //fs_logtype.add_inc(l_os_commit_bytes, "com_o");
+ //fs_logtype.add_inc(l_os_commit_bytes, "com_b");
+
+ fs_logtype.add_set(l_os_jq_max_ops, "jq_mo");
+ fs_logtype.add_set(l_os_jq_ops, "jq_o");
+ fs_logtype.add_inc(l_os_j_ops, "j_o");
+ fs_logtype.add_set(l_os_jq_max_bytes, "jq_mb");
+ fs_logtype.add_set(l_os_jq_bytes, "jq_b");
+ fs_logtype.add_inc(l_os_j_bytes, "j_b");
+ fs_logtype.add_set(l_os_oq_max_ops, "oq_mo");
+ fs_logtype.add_set(l_os_oq_ops, "oq_o");
+ fs_logtype.add_inc(l_os_ops, "o");
+ fs_logtype.add_set(l_os_oq_max_bytes, "oq_mb");
+ fs_logtype.add_set(l_os_oq_bytes, "oq_b");
+ fs_logtype.add_inc(l_os_bytes, "b");
+ fs_logtype.add_set(l_os_committing, "comitng");
+ }
+
+ char name[80];
+ snprintf(name, sizeof(name), "osd.%d.fs.log", whoami);
+ logger = new ProfLogger(name, (ProfLogType*)&fs_logtype);
+ journal->logger = logger;
+ logger_add(logger);
+ logger_tare(tare);
+ logger_start();
+}
+
+void FileStore::stop_logger()
+{
+ dout(10) << "stop_logger" << dendl;
+ if (logger) {
+ journal->logger = NULL;
+ logger_remove(logger);
+ delete logger;
+ logger = NULL;
+ }
+}
+
+
+
+
/// -----------------------------
void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list<Transaction*>& tls,
op_tp.lock();
osr->queue(o);
+ _op_queue_throttle("queue_op");
op_queue_len++;
op_queue_bytes += bytes;
+ if (logger) {
+ logger->inc(l_os_ops);
+ logger->inc(l_os_bytes, bytes);
+ logger->set(l_os_oq_ops, op_queue_len);
+ logger->set(l_os_oq_bytes, op_queue_bytes);
+ }
+
op_tp.unlock();
- dout(10) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes"
+ dout(5) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes"
<< " (queue has " << op_queue_len << " ops and " << op_queue_bytes << " bytes)"
<< dendl;
op_wq.queue(osr);
void FileStore::op_queue_throttle()
{
op_tp.lock();
+ _op_queue_throttle("op_queue_throttle");
+ op_tp.unlock();
+}
+void FileStore::_op_queue_throttle(const char *caller)
+{
uint64_t max_ops = g_conf.filestore_queue_max_ops;
uint64_t max_bytes = g_conf.filestore_queue_max_bytes;
+
if (is_committing()) {
max_ops += g_conf.filestore_queue_committing_max_ops;
max_bytes += g_conf.filestore_queue_committing_max_bytes;
}
+ if (logger) {
+ logger->set(l_os_oq_max_ops, max_ops);
+ logger->set(l_os_oq_max_bytes, max_bytes);
+ }
+
while ((max_ops && op_queue_len >= max_ops) ||
(max_bytes && op_queue_bytes >= max_bytes)) {
- dout(2) << "op_queue_throttle waiting: "
+ dout(2) << caller << " waiting: "
<< op_queue_len << " > " << max_ops << " ops || "
<< op_queue_bytes << " > " << max_bytes << dendl;
op_tp.wait(op_throttle_cond);
}
- op_tp.unlock();
}
void FileStore::_do_op(OpSequencer *osr)
osr->apply_lock.Lock();
Op *o = osr->peek_queue();
- dout(10) << "_do_op " << o << " " << o->op << " osr " << osr << "/" << osr->parent << " start" << dendl;
+ dout(5) << "_do_op " << o << " " << o->op << " osr " << osr << "/" << osr->parent << " start" << dendl;
int r = do_transactions(o->tls, o->op);
op_apply_finish(o->op);
dout(10) << "_do_op " << o << " " << o->op << " r = " << r
op_queue_bytes -= o->bytes;
op_throttle_cond.Signal();
+ if (logger) {
+ logger->inc(l_os_readable_ops);
+ logger->inc(l_os_readable_bytes, o->bytes);
+ }
+
if (next_finish == o->op) {
dout(10) << "_finish_op " << o->op << " next_finish " << next_finish
<< " queueing " << o->onreadable << " doing " << o->onreadable_sync << dendl;
posr = &default_osr;
if (posr->p) {
osr = (OpSequencer *)posr->p;
- dout(10) << "queue_transactions existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl;
+ dout(5) << "queue_transactions existing osr " << osr << "/" << osr->parent << dendl; //<< " w/ q " << osr->q << dendl;
} else {
osr = new OpSequencer;
osr->parent = posr;
posr->p = osr;
- dout(10) << "queue_transactions new osr " << osr << "/" << osr->parent << dendl;
+ dout(5) << "queue_transactions new osr " << osr << "/" << osr->parent << dendl;
+ }
+
+ if (logger) {
+ logger->inc(l_os_in_ops);
+ //logger->inc(l_os_in_bytes, 1);
}
if (journal && journal->is_writeable()) {
op_queue_throttle(); // make sure the journal isn't getting ahead of our op queue.
uint64_t op = op_submit_start();
- dout(10) << "queue_transactions (parallel) " << op << " " << tls << dendl;
+ dout(5) << "queue_transactions (parallel) " << op << " " << tls << dendl;
_op_journal_transactions(tls, op, ondisk);
op_queue_throttle(); // make sure the journal isn't getting ahead of our op queue.
uint64_t op = op_submit_start();
- dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
+ dout(5) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
osr->queue_journal(op);
_op_journal_transactions(tls, op,
new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
}
uint64_t op = op_submit_start();
- dout(10) << "queue_transactions (trailing journal) " << op << " " << tls << dendl;
+ dout(5) << "queue_transactions (trailing journal) " << op << " " << tls << dendl;
_op_apply_start(op);
int r = do_transactions(tls, op);
op_submit_finish(op);
op_apply_finish(op);
+ if (logger) {
+ logger->inc(l_os_readable_ops);
+ //fixme logger->inc(l_os_readable_bytes, 1);
+ }
+
return r;
}
Context *onreadable, Context *ondisk,
Context *onreadable_sync)
{
- dout(10) << "_journaled_ahead " << op << " " << tls << dendl;
+ dout(5) << "_journaled_ahead " << op << " " << tls << dendl;
op_queue_throttle();
timer.add_event_after(g_conf.filestore_commit_timeout, sync_entry_timeo);
sync_entry_timeo_lock.Unlock();
+ if (logger)
+ logger->set(l_os_committing, 1);
+
// make flusher stop flushing previously queued stuff
sync_epoch++;
dout(10) << "sync_entry commit took " << done << dendl;
commit_finish();
+ if (logger)
+ logger->set(l_os_committing, 0);
+
// remove old snaps?
if (do_snap) {
while (snaps.size() > 2) {