*
*/
-#include "Journaler.h"
-
-#include "include/Context.h"
#include "common/ProfLogger.h"
+#include "common/dout.h"
+#include "include/Context.h"
#include "msg/Messenger.h"
-
-#include "common/config.h"
+#include "osdc/Journaler.h"
#define DOUT_SUBSYS journaler
#undef dout_prefix
void Journaler::set_readonly()
{
- dout(1) << "set_readonly" << dendl;
+ ldout(cct, 1) << "set_readonly" << dendl;
readonly = true;
}
void Journaler::set_writeable()
{
- dout(1) << "set_writeable" << dendl;
+ ldout(cct, 1) << "set_writeable" << dendl;
readonly = false;
}
void Journaler::create(ceph_file_layout *l)
{
assert(!readonly);
- dout(1) << "create blank journal" << dendl;
+ ldout(cct, 1) << "create blank journal" << dendl;
state = STATE_ACTIVE;
set_layout(l);
// prefetch intelligently.
// (watch out, this is big if you use big objects or weird striping)
- uint64_t periods = g_conf->journaler_prefetch_periods;
+ uint64_t periods = cct->_conf->journaler_prefetch_periods;
if (periods < 2)
periods = 2; // we need at least 2 periods to make progress.
fetch_len = layout.fl_stripe_count * layout.fl_object_size * periods;
void Journaler::recover(Context *onread)
{
- dout(1) << "recover start" << dendl;
+ ldout(cct, 1) << "recover start" << dendl;
assert(state != STATE_ACTIVE);
assert(readonly);
waitfor_recover.push_back(onread);
if (state != STATE_UNDEF) {
- dout(1) << "recover - already recoverying" << dendl;
+ ldout(cct, 1) << "recover - already recoverying" << dendl;
return;
}
- dout(1) << "read_head" << dendl;
+ ldout(cct, 1) << "read_head" << dendl;
state = STATE_READHEAD;
C_ReadHead *fin = new C_ReadHead(this);
read_head(fin, &fin->bl);
*/
void Journaler::reread_head(Context *onfinish)
{
- dout(10) << "reread_head" << dendl;
+ ldout(cct, 10) << "reread_head" << dendl;
assert(state == STATE_ACTIVE);
state = STATE_REREADHEAD;
assert(state == STATE_READHEAD);
if (bl.length() == 0) {
- dout(1) << "_finish_read_head r=" << r << " read 0 bytes, assuming empty log" << dendl;
+ ldout(cct, 1) << "_finish_read_head r=" << r << " read 0 bytes, assuming empty log" << dendl;
state = STATE_ACTIVE;
list<Context*> ls;
ls.swap(waitfor_recover);
- finish_contexts(g_ceph_context, ls, 0);
+ finish_contexts(cct, ls, 0);
return;
}
::decode(h, p);
if (h.magic != magic) {
- dout(0) << "on disk magic '" << h.magic << "' != my magic '"
+ ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '"
<< magic << "'" << dendl;
list<Context*> ls;
ls.swap(waitfor_recover);
- finish_contexts(g_ceph_context, ls, -EINVAL);
+ finish_contexts(cct, ls, -EINVAL);
return;
}
init_headers(h);
set_layout(&h.layout);
- dout(1) << "_finish_read_head " << h << ". probing for end of log (from " << write_pos << ")..." << dendl;
+ ldout(cct, 1) << "_finish_read_head " << h << ". probing for end of log (from " << write_pos << ")..." << dendl;
C_ProbeEnd *fin = new C_ProbeEnd(this);
state = STATE_PROBING;
probe(fin, &fin->end);
void Journaler::probe(Context *finish, uint64_t *end)
{
- dout(1) << "probing for end of the log" << dendl;
+ ldout(cct, 1) << "probing for end of the log" << dendl;
assert(state == STATE_PROBING || state == STATE_REPROBING);
// probe the log
filer.probe(ino, &layout, CEPH_NOSNAP,
void Journaler::reprobe(Context *finish)
{
- dout(10) << "reprobe" << dendl;
+ ldout(cct, 10) << "reprobe" << dendl;
assert(state == STATE_ACTIVE);
state = STATE_REPROBING;
void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish) {
assert(new_end >= write_pos);
assert(r >= 0);
- dout(1) << "_finish_reprobe new_end = " << new_end
+ ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
<< " (header had " << write_pos << ")."
<< dendl;
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = new_end;
if (((int64_t)end) == -1) {
end = write_pos;
- dout(1) << "_finish_probe_end write_pos = " << end
+ ldout(cct, 1) << "_finish_probe_end write_pos = " << end
<< " (header had " << write_pos << "). log was empty. recovered."
<< dendl;
assert(0); // hrm.
} else {
assert(end >= write_pos);
assert(r >= 0);
- dout(1) << "_finish_probe_end write_pos = " << end
+ ldout(cct, 1) << "_finish_probe_end write_pos = " << end
<< " (header had " << write_pos << "). recovered."
<< dendl;
}
// done.
list<Context*> ls;
ls.swap(waitfor_recover);
- finish_contexts(g_ceph_context, ls, 0);
+ finish_contexts(cct, ls, 0);
}
class Journaler::C_RereadHeadProbe : public Context
last_written.expire_pos = expire_pos;
last_written.unused_field = expire_pos;
last_written.write_pos = safe_pos;
- dout(10) << "write_head " << last_written << dendl;
+ ldout(cct, 10) << "write_head " << last_written << dendl;
- last_wrote_head = ceph_clock_now(g_ceph_context);
+ last_wrote_head = ceph_clock_now(cct);
bufferlist bl;
::encode(last_written, bl);
object_t oid = file_object_t(ino, 0);
object_locator_t oloc(pg_pool);
- objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(g_ceph_context), 0,
+ objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(cct), 0,
NULL,
new C_WriteHead(this, last_written, oncommit));
}
{
assert(r >= 0); // we can't really recover from write errors here
assert(!readonly);
- dout(10) << "_finish_write_head " << wrote << dendl;
+ ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
last_committed = wrote;
if (oncommit) {
oncommit->finish(0);
// calc latency?
if (logger) {
- utime_t lat = ceph_clock_now(g_ceph_context);
+ utime_t lat = ceph_clock_now(cct);
lat -= stamp;
logger->favg(logger_key_lat, lat);
}
else
safe_pos = *pending_safe.begin();
- dout(10) << "_finish_flush safe from " << start
+ ldout(cct, 10) << "_finish_flush safe from " << start
<< ", pending_safe " << pending_safe
<< ", (prezeroing/prezero)/write/flush/safe positions now "
<< "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
while (!waitfor_safe.empty()) {
if (waitfor_safe.begin()->first > safe_pos)
break;
- finish_contexts(g_ceph_context, waitfor_safe.begin()->second);
+ finish_contexts(cct, waitfor_safe.begin()->second);
waitfor_safe.erase(waitfor_safe.begin());
}
}
assert(!readonly);
uint32_t s = bl.length();
- if (!g_conf->journaler_allow_split_entries) {
+ if (!cct->_conf->journaler_allow_split_entries) {
// will we span a stripe boundary?
int p = layout.fl_stripe_unit;
if (write_pos / p != (write_pos + (int64_t)(bl.length() + sizeof(s))) / p) {
// now flush.
flush();
- dout(12) << "append_entry skipped " << (write_pos-owp) << " bytes to " << write_pos << " to avoid spanning stripe boundary" << dendl;
+ ldout(cct, 12) << "append_entry skipped " << (write_pos-owp) << " bytes to " << write_pos << " to avoid spanning stripe boundary" << dendl;
}
}
- dout(10) << "append_entry len " << bl.length() << " to " << write_pos << "~" << (bl.length() + sizeof(uint32_t)) << dendl;
+ ldout(cct, 10) << "append_entry len " << bl.length() << " to " << write_pos << "~" << (bl.length() + sizeof(uint32_t)) << dendl;
// append
::encode(s, write_buf);
uint64_t write_obj = write_pos / su;
uint64_t flush_obj = flush_pos / su;
if (write_obj != flush_obj) {
- dout(10) << " flushing completed object(s) (su " << su << " wro " << write_obj << " flo " << flush_obj << ")" << dendl;
+ ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro " << write_obj << " flo " << flush_obj << ")" << dendl;
_do_flush(write_buf.length() - write_off);
}
int64_t newlen = prezero_pos - flush_pos - period;
if (newlen <= 0) {
- dout(10) << "_do_flush wanted to do " << flush_pos << "~" << len
+ ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
<< " already too close to prezero_pos " << prezero_pos << ", zeroing first" << dendl;
waiting_for_zero = true;
return;
}
if (newlen < len) {
- dout(10) << "_do_flush wanted to do " << flush_pos << "~" << len << " but hit prezero_pos " << prezero_pos
+ ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len << " but hit prezero_pos " << prezero_pos
<< ", will do " << flush_pos << "~" << newlen << dendl;
len = newlen;
}
}
- dout(10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
+ ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
// submit write for anything pending
// flush _start_ pos to _finish_flush
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
SnapContext snapc;
Context *onsafe = new C_Flush(this, flush_pos, now); // on COMMIT
}
filer.write(ino, &layout, snapc,
- flush_pos, len, write_bl, ceph_clock_now(g_ceph_context),
+ flush_pos, len, write_bl, ceph_clock_now(cct),
0,
NULL, onsafe);
flush_pos += len;
assert(write_buf.length() == write_pos - flush_pos);
- dout(10) << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at "
+ ldout(cct, 10) << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at "
<< "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
_issue_prezero();
// all flushed and safe?
if (write_pos == safe_pos) {
assert(write_buf.length() == 0);
- dout(10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at "
+ ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at "
<< "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
if (onsafe) {
onsafe->finish(0);
if (write_pos == flush_pos) {
assert(write_buf.length() == 0);
- dout(10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at "
+ ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at "
<< "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
if (onsafe) {
onsafe->finish(0);
} else {
if (1) {
// maybe buffer
- if (write_buf.length() < g_conf->journaler_batch_max) {
+ if (write_buf.length() < cct->_conf->journaler_batch_max) {
// delay! schedule an event.
- dout(20) << "flush delaying flush" << dendl;
+ ldout(cct, 20) << "flush delaying flush" << dendl;
if (delay_flush_event)
timer->cancel_event(delay_flush_event);
delay_flush_event = new C_DelayFlush(this);
- timer->add_event_after(g_conf->journaler_batch_interval, delay_flush_event);
+ timer->add_event_after(cct->_conf->journaler_batch_interval, delay_flush_event);
} else {
- dout(20) << "flush not delaying flush" << dendl;
+ ldout(cct, 20) << "flush not delaying flush" << dendl;
_do_flush();
}
} else {
}
// write head?
- if (last_wrote_head.sec() + g_conf->journaler_write_head_interval < ceph_clock_now(g_ceph_context).sec()) {
+ if (last_wrote_head.sec() + cct->_conf->journaler_write_head_interval < ceph_clock_now(cct).sec()) {
write_head();
}
}
// we need to zero at least two periods, minimum, to ensure that we have a full
// empty object/period in front of us.
- uint64_t num_periods = MAX(2, g_conf->journaler_prezero_periods);
+ uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods);
/*
* issue zero requests based on write_pos, even though the invariant
to -= to % period;
if (prezeroing_pos >= to) {
- dout(20) << "_issue_prezero target " << to << " <= prezeroing_pos " << prezeroing_pos << dendl;
+ ldout(cct, 20) << "_issue_prezero target " << to << " <= prezeroing_pos " << prezeroing_pos << dendl;
return;
}
uint64_t len;
if (prezeroing_pos % period == 0) {
len = period;
- dout(10) << "_issue_prezero removing " << prezeroing_pos << "~" << period << " (full period)" << dendl;
+ ldout(cct, 10) << "_issue_prezero removing " << prezeroing_pos << "~" << period << " (full period)" << dendl;
} else {
len = period - (prezeroing_pos % period);
- dout(10) << "_issue_prezero zeroing " << prezeroing_pos << "~" << len << " (partial period)" << dendl;
+ ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~" << len << " (partial period)" << dendl;
}
SnapContext snapc;
Context *c = new C_Journaler_Prezero(this, prezeroing_pos, len);
- filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(g_ceph_context), 0, NULL, c);
+ filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(cct), 0, NULL, c);
prezeroing_pos += len;
}
}
void Journaler::_prezeroed(int r, uint64_t start, uint64_t len)
{
- dout(10) << "_prezeroed to " << start << "~" << len
+ ldout(cct, 10) << "_prezeroed to " << start << "~" << len
<< ", prezeroing/prezero was " << prezeroing_pos << "/" << prezero_pos
<< ", pending " << pending_zero
<< dendl;
} else {
pending_zero.insert(start, len);
}
- dout(10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos << "/" << prezero_pos
+ ldout(cct, 10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos << "/" << prezero_pos
<< ", pending " << pending_zero
<< dendl;
}
void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl)
{
if (r < 0) {
- dout(0) << "_finish_read got error " << r << dendl;
+ ldout(cct, 0) << "_finish_read got error " << r << dendl;
error = r;
if (on_readable) {
Context *f = on_readable;
}
assert(r>=0);
- dout(10) << "_finish_read got " << offset << "~" << bl.length() << dendl;
+ ldout(cct, 10) << "_finish_read got " << offset << "~" << bl.length() << dendl;
prefetch_buf[offset].swap(bl);
_assimilate_prefetch();
map<uint64_t,bufferlist>::iterator p = prefetch_buf.begin();
if (p->first != received_pos) {
uint64_t gap = p->first - received_pos;
- dout(10) << "_assimilate_prefetch gap of " << gap << " from received_pos " << received_pos
+ ldout(cct, 10) << "_assimilate_prefetch gap of " << gap << " from received_pos " << received_pos
<< " to first prefetched buffer " << p->first << dendl;
break;
}
- dout(10) << "_assimilate_prefetch " << p->first << "~" << p->second.length() << dendl;
+ ldout(cct, 10) << "_assimilate_prefetch " << p->first << "~" << p->second.length() << dendl;
received_pos += p->second.length();
read_buf.claim_append(p->second);
assert(received_pos <= requested_pos);
}
if (got_any)
- dout(10) << "_assimilate_prefetch read_buf now " << read_pos << "~" << read_buf.length()
+ ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~" << read_buf.length()
<< ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos
<< dendl;
if ((got_any && !was_readable && _is_readable()) ||
read_pos == write_pos) {
// readable!
- dout(10) << "_finish_read now readable (or at journal end)" << dendl;
+ ldout(cct, 10) << "_finish_read now readable (or at journal end)" << dendl;
if (on_readable) {
Context *f = on_readable;
on_readable = 0;
// (this is needed if we are reading the tail of a journal we are also writing to)
assert(requested_pos <= safe_pos);
if (requested_pos == safe_pos) {
- dout(10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl;
+ ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl;
assert(write_pos > requested_pos);
if (flush_pos == safe_pos)
flush();
// don't read too much
if (requested_pos + len > safe_pos) {
len = safe_pos - requested_pos;
- dout(10) << "_issue_read reading only up to safe_pos " << safe_pos << dendl;
+ ldout(cct, 10) << "_issue_read reading only up to safe_pos " << safe_pos << dendl;
}
// go.
- dout(10) << "_issue_read reading " << requested_pos << "~" << len
+ ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len
<< ", read pointers " << read_pos << "/" << received_pos << "/" << (requested_pos+len)
<< dendl;
// prefetch
uint64_t pf;
if (temp_fetch_len) {
- dout(10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl;
+ ldout(cct, 10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl;
pf = temp_fetch_len;
temp_fetch_len = 0;
} else {
if (requested_pos < target) {
uint64_t len = target - requested_pos;
- dout(10) << "_prefetch " << pf << " requested_pos " << requested_pos << " < target " << target
+ ldout(cct, 10) << "_prefetch " << pf << " requested_pos " << requested_pos << " < target " << target
<< " (" << raw_target << "), prefetching " << len << dendl;
_issue_read(len);
}
// partial fragment at the end?
if (received_pos == write_pos) {
- dout(10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << dendl;
+ ldout(cct, 10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << dendl;
// adjust write_pos
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = read_pos;
uint64_t need = (sizeof(s)+s-read_buf.length());
if (need > fetch_len) {
- dout(10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
+ ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
<< " for len " << s << " entry" << dendl;
temp_fetch_len = need;
}
bool Journaler::try_read_entry(bufferlist& bl)
{
if (!is_readable()) { // this may start a read.
- dout(10) << "try_read_entry at " << read_pos << " not readable" << dendl;
+ ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable" << dendl;
return false;
}
}
assert(read_buf.length() >= sizeof(s) + s);
- dout(10) << "try_read_entry at " << read_pos << " reading "
+ ldout(cct, 10) << "try_read_entry at " << read_pos << " reading "
<< read_pos << "~" << (sizeof(s)+s) << " (have " << read_buf.length() << ")" << dendl;
if (s == 0) {
- dout(0) << "try_read_entry got 0 len entry at offset " << read_pos << dendl;
+ ldout(cct, 0) << "try_read_entry got 0 len entry at offset " << read_pos << dendl;
error = -EINVAL;
return false;
}
void Journaler::wait_for_readable(Context *onreadable)
{
- dout(10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl;
+ ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl;
assert(!_is_readable());
assert(on_readable == 0);
on_readable = onreadable;
uint64_t period = get_layout_period();
uint64_t trim_to = last_committed.expire_pos;
trim_to -= trim_to % period;
- dout(10) << "trim last_commited head was " << last_committed
+ ldout(cct, 10) << "trim last_commited head was " << last_committed
<< ", can trim to " << trim_to
<< dendl;
if (trim_to == 0 || trim_to == trimming_pos) {
- dout(10) << "trim already trimmed/trimming to "
+ ldout(cct, 10) << "trim already trimmed/trimming to "
<< trimmed_pos << "/" << trimming_pos << dendl;
return;
}
if (trimming_pos > trimmed_pos) {
- dout(10) << "trim already trimming atm, try again later. trimmed/trimming is "
+ ldout(cct, 10) << "trim already trimming atm, try again later. trimmed/trimming is "
<< trimmed_pos << "/" << trimming_pos << dendl;
return;
}
assert(trim_to <= write_pos);
assert(trim_to <= expire_pos);
assert(trim_to > trimming_pos);
- dout(10) << "trim trimming to " << trim_to
+ ldout(cct, 10) << "trim trimming to " << trim_to
<< ", trimmed/trimming/expire are "
<< trimmed_pos << "/" << trimming_pos << "/" << expire_pos
<< dendl;
uint64_t first = trimming_pos / period;
uint64_t num = (trim_to - trimming_pos) / period;
SnapContext snapc;
- filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(g_ceph_context), 0,
+ filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(cct), 0,
new C_Trim(this, trim_to));
trimming_pos = trim_to;
}
void Journaler::_trim_finish(int r, uint64_t to)
{
assert(!readonly);
- dout(10) << "_trim_finish trimmed_pos was " << trimmed_pos
+ ldout(cct, 10) << "_trim_finish trimmed_pos was " << trimmed_pos
<< ", trimmed/trimming/expire now "
<< to << "/" << trimming_pos << "/" << expire_pos
<< dendl;
// finishers?
while (!waitfor_trim.empty() &&
waitfor_trim.begin()->first <= trimmed_pos) {
- finish_contexts(g_ceph_context, waitfor_trim.begin()->second, 0);
+ finish_contexts(cct, waitfor_trim.begin()->second, 0);
waitfor_trim.erase(waitfor_trim.begin());
}
}