/**
* Blocking read of JournalPointer for this MDS
*/
-int JournalPointer::load(Objecter *objecter, Mutex *lock)
+int JournalPointer::load(Objecter *objecter)
{
- assert(lock != NULL);
assert(objecter != NULL);
- assert(!lock->is_locked_by_me());
// Blocking read of data
std::string const object_id = get_object_id();
dout(4) << "Reading journal pointer '" << object_id << "'" << dendl;
bufferlist data;
C_SaferCond waiter;
- lock->Lock();
objecter->read_full(object_t(object_id), object_locator_t(pool_id),
CEPH_NOSNAP, &data, 0, &waiter);
- lock->Unlock();
int r = waiter.wait();
// Construct JournalPointer result, null or decoded data
*
* @return objecter write op status code
*/
-int JournalPointer::save(Objecter *objecter, Mutex *lock) const
+int JournalPointer::save(Objecter *objecter) const
{
- assert(lock != NULL);
assert(objecter != NULL);
- assert(!lock->is_locked_by_me());
// It is not valid to persist a null pointer
assert(!is_null());
<< std::hex << front << ":0x" << back << std::dec << dendl;
C_SaferCond waiter;
- lock->Lock();
objecter->write_full(object_t(object_id), object_locator_t(pool_id),
SnapContext(), data, ceph_clock_now(g_ceph_context), 0, NULL, &waiter);
- lock->Unlock();
int write_result = waiter.wait();
if (write_result < 0) {
derr << "Error writing pointer object '" << object_id << "': " << cpp_strerror(write_result) << dendl;
JournalPointer() : node_id(-1), pool_id(-1), front(0), back(0) {}
- int load(Objecter *objecter, Mutex *lock);
- int save(Objecter *objecter, Mutex *lock) const;
+ int load(Objecter *objecter);
+ int save(Objecter *objecter) const;
void save(Objecter *objecter, Context *completion) const;
bool is_null() const {
// If the pointer object is not present, then create it with
// front = default ino and back = null
JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
- int const read_result = jp.load(mds->objecter, &(mds->mds_lock));
+ int const read_result = jp.load(mds->objecter);
if (read_result == -ENOENT) {
inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
jp.front = default_log_ino;
- int write_result = jp.save(mds->objecter, &(mds->mds_lock));
+ int write_result = jp.save(mds->objecter);
// Nothing graceful we can do for this
assert(write_result >= 0);
} else if (read_result != 0) {
} else {
dout(1) << "Successfully erased journal, updating journal pointer" << dendl;
jp.back = 0;
- int write_result = jp.save(mds->objecter, &(mds->mds_lock));
+ int write_result = jp.save(mds->objecter);
// Nothing graceful we can do for this
assert(write_result >= 0);
}
inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid();
jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino);
- int write_result = jp.save(mds->objecter, &(mds->mds_lock));
+ int write_result = jp.save(mds->objecter);
assert(write_result == 0);
/* Create the new Journaler file */
inodeno_t const tmp = jp.front;
jp.front = jp.back;
jp.back = tmp;
- write_result = jp.save(mds->objecter, &(mds->mds_lock));
+ write_result = jp.save(mds->objecter);
assert(write_result == 0);
/* Delete the old journal to free space */
/* Update the pointer to reflect we're back in clean single journal state. */
jp.back = 0;
- write_result = jp.save(mds->objecter, &(mds->mds_lock));
+ write_result = jp.save(mds->objecter);
assert(write_result == 0);
/* Reset the Journaler object to its default state */
#include <map>
using std::map;
+#include "common/Finisher.h"
+
class MDLog {
public:
void Journaler::set_readonly()
{
+ Mutex::Locker l(lock); // acquire the Journaler lock before changing 'readonly'
+
ldout(cct, 1) << "set_readonly" << dendl;
readonly = true;
}
void Journaler::set_writeable()
{
+ Mutex::Locker l(lock); // acquire the Journaler lock before changing 'readonly'
+
ldout(cct, 1) << "set_writeable" << dendl;
readonly = false;
}
void Journaler::create(ceph_file_layout *l, stream_format_t const sf)
{
+ Mutex::Locker lk(lock);
+
assert(!readonly);
state = STATE_ACTIVE;
stream_format = sf;
journal_stream.set_format(sf);
- set_layout(l);
+ _set_layout(l);
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos =
read_pos = requested_pos = received_pos =
<< ", format=" << stream_format << dendl;
}
-void Journaler::set_layout(ceph_file_layout *l)
+void Journaler::set_layout(ceph_file_layout const *l)
+{
+ Mutex::Locker lk(lock); // public entry point: takes the Journaler lock itself
+ _set_layout(l); // variant that takes no lock; create() and _finish_read_head() call it while already holding 'lock'
+}
+
+void Journaler::_set_layout(ceph_file_layout const *l)
{
layout = *l;
void Journaler::recover(Context *onread)
{
+ Mutex::Locker l(lock);
+
ldout(cct, 1) << "recover start" << dendl;
assert(state != STATE_ACTIVE);
assert(readonly);
objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, on_finish);
}
+void Journaler::reread_head(Context *onfinish)
+{
+ Mutex::Locker l(lock); // public entry point: takes the Journaler lock itself
+ _reread_head(onfinish); // does the actual work; reread_head_and_probe() also calls it with 'lock' already held
+}
+
/**
* Re-read the head from disk, and set the write_pos, expire_pos, trimmed_pos
* from the on-disk header. This switches the state to STATE_REREADHEAD for
* Also, don't call this until the Journaler has finished its recovery and has
* gone STATE_ACTIVE!
*/
-void Journaler::reread_head(Context *onfinish)
+void Journaler::_reread_head(Context *onfinish)
{
ldout(cct, 10) << "reread_head" << dendl;
assert(state == STATE_ACTIVE);
void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
{
+ Mutex::Locker l(lock);
+
//read on-disk header into
assert(bl.length() || r < 0 );
void Journaler::_finish_read_head(int r, bufferlist& bl)
{
+ Mutex::Locker l(lock);
+
assert(state == STATE_READHEAD);
if (r!=0) {
trimmed_pos = trimming_pos = h.trimmed_pos;
init_headers(h);
- set_layout(&h.layout);
+ _set_layout(&h.layout);
stream_format = h.stream_format;
journal_stream.set_format(h.stream_format);
void Journaler::reprobe(Context *finish)
{
+ Mutex::Locker l(lock);
+
ldout(cct, 10) << "reprobe" << dendl;
assert(state == STATE_ACTIVE);
}
-void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish) {
+void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish)
+{
+ Mutex::Locker l(lock);
+
assert(new_end >= write_pos || r < 0);
ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
<< " (header had " << write_pos << ")."
void Journaler::_finish_probe_end(int r, uint64_t end)
{
+ Mutex::Locker l(lock);
+
assert(state == STATE_PROBING);
if (r < 0) { // error in probing
goto out;
void Journaler::reread_head_and_probe(Context *onfinish)
{
+ Mutex::Locker l(lock);
+
assert(state == STATE_ACTIVE);
- reread_head(new C_RereadHeadProbe(this, onfinish));
+ _reread_head(new C_RereadHeadProbe(this, onfinish)); // 'lock' is already held here, so call the variant that does not take it again
}
void Journaler::_finish_reread_head_and_probe(int r, Context *onfinish)
{
+ // Second stage of reread_head_and_probe(): the header re-read has
+ // completed with result r, now start the reprobe and hand it onfinish.
+ // Expect to be called back from finish_reread_head, which already takes lock
+ assert(lock.is_locked_by_me());
+
assert(!r); //if we get an error, we're boned
- reprobe(onfinish);
+ // reprobe() acquires 'lock' itself through Mutex::Locker. We hold it
+ // already (asserted above), so calling reprobe() directly would make this
+ // thread lock the same mutex twice. Release it around the call and take it
+ // again afterwards so the caller's Locker still unlocks a held mutex.
+ // NOTE(review): assumes 'lock' is non-recursive and that the caller does
+ // not touch Journaler state after this callback returns -- confirm. A
+ // lock-held _reprobe() helper would remove the unlock/relock window.
+ lock.Unlock();
+ reprobe(onfinish);
+ lock.Lock();
}
};
void Journaler::write_head(Context *oncommit)
+{
+ Mutex::Locker l(lock); // public entry point: takes the Journaler lock itself
+ _write_head(oncommit); // does the actual work without taking the lock
+}
+
+
+void Journaler::_write_head(Context *oncommit)
{
assert(!readonly);
assert(state == STATE_ACTIVE);
void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit)
{
+ Mutex::Locker l(lock);
+
if (r < 0) {
lderr(cct) << "_finish_write_head got " << cpp_strerror(r) << dendl;
handle_write_error(r);
oncommit->complete(r);
}
- trim(); // trim?
+ _trim(); // trim?
}
void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp)
{
+ Mutex::Locker l(lock);
assert(!readonly);
+
if (r < 0) {
lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
handle_write_error(r);
uint64_t Journaler::append_entry(bufferlist& bl)
{
+ Mutex::Locker l(lock);
+
assert(!readonly);
uint32_t s = bl.length();
}
-
void Journaler::wait_for_flush(Context *onsafe)
+{
+ Mutex::Locker l(lock); // public entry point: takes the Journaler lock itself
+ _wait_for_flush(onsafe); // does the actual work; _flush() also calls it directly
+}
+
+void Journaler::_wait_for_flush(Context *onsafe)
{
assert(!readonly);
}
void Journaler::flush(Context *onsafe)
+{
+ Mutex::Locker l(lock); // public entry point: takes the Journaler lock itself
+ _flush(onsafe); // does the actual work without taking the lock
+}
+
+void Journaler::_flush(Context *onsafe)
{
assert(!readonly);
onsafe->complete(0);
}
} else {
- if (1) {
- // maybe buffer
- if (write_buf.length() < cct->_conf->journaler_batch_max) {
- // delay! schedule an event.
- ldout(cct, 20) << "flush delaying flush" << dendl;
- if (delay_flush_event)
- timer->cancel_event(delay_flush_event);
- delay_flush_event = new C_DelayFlush(this);
- timer->add_event_after(cct->_conf->journaler_batch_interval, delay_flush_event);
- } else {
- ldout(cct, 20) << "flush not delaying flush" << dendl;
- _do_flush();
+ // maybe buffer
+ if (write_buf.length() < cct->_conf->journaler_batch_max) {
+ // delay! schedule an event.
+ ldout(cct, 20) << "flush delaying flush" << dendl;
+ if (delay_flush_event) {
+ timer->cancel_event(delay_flush_event);
}
+ delay_flush_event = new C_DelayFlush(this);
+ timer->add_event_after(cct->_conf->journaler_batch_interval, delay_flush_event);
} else {
- // always flush
+ ldout(cct, 20) << "flush not delaying flush" << dendl;
_do_flush();
}
- wait_for_flush(onsafe);
+ _wait_for_flush(onsafe);
}
// write head?
if (last_wrote_head.sec() + cct->_conf->journaler_write_head_interval < ceph_clock_now(cct).sec()) {
- write_head();
+ _write_head();
}
}
uint64_t from, len;
C_Journaler_Prezero(Journaler *j, uint64_t f, uint64_t l) : journaler(j), from(f), len(l) {}
void finish(int r) {
- journaler->_prezeroed(r, from, len);
+ journaler->_finish_prezero(r, from, len);
}
};
}
}
-void Journaler::_prezeroed(int r, uint64_t start, uint64_t len)
+void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
{
+ Mutex::Locker l(lock);
+
ldout(cct, 10) << "_prezeroed to " << start << "~" << len
<< ", prezeroing/prezero was " << prezeroing_pos << "/" << prezero_pos
<< ", pending " << pending_zero
public:
C_RetryRead(Journaler *l) : ls(l) {}
void finish(int r) {
- // kickstart.
+ Mutex::Locker l(ls->lock);
ls->_prefetch();
}
};
void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl)
{
+ Mutex::Locker l(lock);
+
if (r < 0) {
ldout(cct, 0) << "_finish_read got error " << r << dendl;
error = r;
if (requested_pos == safe_pos) {
ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl;
assert(write_pos > requested_pos);
- if (flush_pos == safe_pos)
- flush();
+ if (flush_pos == safe_pos) {
+ _flush(NULL);
+ }
assert(flush_pos > safe_pos);
waitfor_safe[flush_pos].push_back(new C_RetryRead(this));
return;
public:
C_EraseFinish(Journaler *j, Context *c) : journaler(j), completion(c) {}
void finish(int r) {
- journaler->_erase_finish(r, completion);
+ journaler->_finish_erase(r, completion);
}
};
*/
void Journaler::erase(Context *completion)
{
+ Mutex::Locker l(lock);
+
// Async delete the journal data
uint64_t first = trimmed_pos / get_layout_period();
uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
filer.purge_range(ino, &layout, SnapContext(), first, num, ceph_clock_now(cct), 0,
new C_EraseFinish(this, completion));
- // We will not start the operation to delete the header until _erase_finish has
+ // We will not start the operation to delete the header until _finish_erase has
// seen the data deletion succeed: otherwise if there was an error deleting data
// we might prematurely delete the header thereby lose our reference to the data.
}
-void Journaler::_erase_finish(int data_result, Context *completion)
+void Journaler::_finish_erase(int data_result, Context *completion)
{
+ Mutex::Locker l(lock);
+
if (data_result == 0) {
// Async delete the journal header
filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct), 0,
public:
C_Trim(Journaler *l, int64_t t) : ls(l), to(t) {}
void finish(int r) {
- ls->_trim_finish(r, to);
+ ls->_finish_trim(r, to);
}
};
void Journaler::trim()
+{
+ Mutex::Locker l(lock); // public entry point: takes the Journaler lock itself
+ _trim(); // does the actual work; _finish_write_head() also calls it with 'lock' already held
+}
+
+void Journaler::_trim()
{
assert(!readonly);
uint64_t period = get_layout_period();
trimming_pos = trim_to;
}
-void Journaler::_trim_finish(int r, uint64_t to)
+void Journaler::_finish_trim(int r, uint64_t to)
{
+ Mutex::Locker l(lock);
+
assert(!readonly);
- ldout(cct, 10) << "_trim_finish trimmed_pos was " << trimmed_pos
+ ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos
<< ", trimmed/trimming/expire now "
<< to << "/" << trimming_pos << "/" << expire_pos
<< dendl;
if (r < 0 && r != -ENOENT) {
- lderr(cct) << "_trim_finish got " << cpp_strerror(r) << dendl;
+ lderr(cct) << "_finish_trim got " << cpp_strerror(r) << dendl;
handle_write_error(r);
return;
}
class Journaler {
public:
// this goes at the head of the log "file".
- struct Header {
+ class Header {
+ public:
uint64_t trimmed_pos;
uint64_t expire_pos;
uint64_t unused_field;
ls.push_back(new Header());
ls.back()->stream_format = JOURNAL_FORMAT_RESILIENT;
}
- } last_written, last_committed;
+ };
WRITE_CLASS_ENCODER(Header)
uint32_t get_stream_format() const {
private:
// me
CephContext *cct;
+ Mutex lock;
+ Header last_written;
+ Header last_committed;
inodeno_t ino;
int64_t pg_pool;
bool readonly;
public:
C_DelayFlush(Journaler *j) : journaler(j) {}
void finish(int r) {
- journaler->delay_flush_event = 0;
- journaler->_do_flush();
+ journaler->_do_delayed_flush();
}
} *delay_flush_event;
+ /*
+ * Do a flush as a result of a C_DelayFlush context.
+ *
+ * Takes 'lock' itself, so the caller must not already hold it
+ * (C_DelayFlush::finish calls this directly).
+ */
+ void _do_delayed_flush()
+ {
+ Mutex::Locker l(lock);
+ // Only inspect delay_flush_event once the lock is held: _flush() and
+ // reset() assign it while holding 'lock', so checking it before locking
+ // would be an unsynchronized read.
+ assert(delay_flush_event != NULL);
+ delay_flush_event = NULL;
+ _do_flush();
+ }
// my state
static const int STATE_UNDEF = 0;
int state;
int error;
+ void _write_head(Context *oncommit=NULL);
+ void _wait_for_flush(Context *onsafe);
+ void _trim();
+
// header
utime_t last_wrote_head;
void _finish_write_head(int r, Header &wrote, Context *oncommit);
class C_WriteHead;
friend class C_WriteHead;
+ void _reread_head(Context *onfinish);
+ void _set_layout(ceph_file_layout const *l);
list<Context*> waitfor_recover;
void read_head(Context *on_finish, bufferlist *bl);
void _finish_read_head(int r, bufferlist& bl);
std::set<uint64_t> pending_safe;
std::map<uint64_t, std::list<Context*> > waitfor_safe; // when safe through given offset
+ void _flush(Context *onsafe);
void _do_flush(unsigned amount=0);
void _finish_flush(int r, uint64_t start, utime_t stamp);
class C_Flush;
Context *on_write_error;
void _finish_read(int r, uint64_t offset, bufferlist &bl); // read completion callback
+ void _finish_retry_read(int r);
void _assimilate_prefetch();
void _issue_read(uint64_t len); // read some more
void _prefetch(); // maybe read ahead
uint64_t trimmed_pos; // what has been trimmed
map<uint64_t, list<Context*> > waitfor_trim;
- void _trim_finish(int r, uint64_t to);
+ void _finish_trim(int r, uint64_t to);
class C_Trim;
friend class C_Trim;
void _issue_prezero();
- void _prezeroed(int r, uint64_t from, uint64_t len);
+ void _finish_prezero(int r, uint64_t from, uint64_t len);
friend struct C_Journaler_Prezero;
// only init_headers when following or first reading off-disk
bool _is_readable();
+ void _finish_erase(int data_result, Context *completion);
+ class C_EraseFinish;
+ friend class C_EraseFinish;
+
public:
Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim) :
- last_written(mag), last_committed(mag), cct(obj->cct),
+ cct(obj->cct), lock("Journaler"),
+ last_written(mag), last_committed(mag),
ino(ino_), pg_pool(pool), readonly(true),
stream_format(-1), journal_stream(-1),
magic(mag),
* an "erase" method.
*/
void reset() {
+ Mutex::Locker l(lock);
assert(state == STATE_ACTIVE);
+
readonly = true;
- delay_flush_event = 0;
+ delay_flush_event = NULL;
state = STATE_UNDEF;
error = 0;
prezeroing_pos = 0;
waiting_for_zero = false;
}
+ // Asynchronous operations
+ // =======================
void erase(Context *completion);
-private:
- void _erase_finish(int data_result, Context *completion);
- class C_EraseFinish;
- friend class C_EraseFinish;
-
-public:
void create(ceph_file_layout *layout, stream_format_t const sf);
void recover(Context *onfinish);
void reread_head(Context *onfinish);
void reprobe(Context *onfinish);
void reread_head_and_probe(Context *onfinish);
void write_head(Context *onsave=0);
-
- void set_layout(ceph_file_layout *l);
-
- void set_readonly();
- void set_writeable();
- bool is_readonly() { return readonly; }
-
- bool is_active() { return state == STATE_ACTIVE; }
- int get_error() { return error; }
-
- uint64_t get_write_pos() const { return write_pos; }
- uint64_t get_write_safe_pos() const { return safe_pos; }
- uint64_t get_read_pos() const { return read_pos; }
- uint64_t get_expire_pos() const { return expire_pos; }
- uint64_t get_trimmed_pos() const { return trimmed_pos; }
-
- uint64_t get_layout_period() const { return (uint64_t)layout.fl_stripe_count * (uint64_t)layout.fl_object_size; }
- ceph_file_layout& get_layout() { return layout; }
-
- // write
- uint64_t append_entry(bufferlist& bl);
void wait_for_flush(Context *onsafe = 0);
void flush(Context *onsafe = 0);
+ void wait_for_readable(Context *onfinish);
- // read
+ // Synchronous setters
+ // ===================
+ void set_layout(ceph_file_layout const *l);
+ void set_readonly();
+ void set_writeable();
+ void set_write_pos(int64_t p) {
+ Mutex::Locker l(lock); // take the Journaler lock before moving the positions
+ prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p; // all five write-side positions are set to p together
+ }
void set_read_pos(int64_t p) {
+ Mutex::Locker l(lock); // take the Journaler lock before resetting the read positions and read_buf
assert(requested_pos == received_pos); // we can't cope w/ in-progress read right now.
read_pos = requested_pos = received_pos = p;
read_buf.clear();
}
-
- bool is_readable();
- bool try_read_entry(bufferlist& bl);
- void wait_for_readable(Context *onfinish);
-
- void set_write_pos(int64_t p) {
- prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p;
+ uint64_t append_entry(bufferlist& bl);
+ void set_expire_pos(int64_t ep) {
+ Mutex::Locker l(lock); // take the Journaler lock before updating expire_pos
+ expire_pos = ep;
+ }
+ void set_trimmed_pos(int64_t p) {
+ Mutex::Locker l(lock); // take the Journaler lock before updating the trim positions
+ trimming_pos = trimmed_pos = p; // both trim positions are set to p together
}
+ void trim();
+ void trim_tail() {
+ Mutex::Locker l(lock); // public entry point: takes the Journaler lock itself
+
+ assert(!readonly); // only valid on a writeable Journaler
+ _issue_prezero(); // called here with 'lock' held
+ }
/**
* set write error callback
*
* @param c callback/context to trigger on error
*/
void set_write_error_handler(Context *c) {
+ Mutex::Locker l(lock); // take the Journaler lock before checking and storing on_write_error
assert(!on_write_error);
on_write_error = c;
}
- // trim
- void set_expire_pos(int64_t ep) { expire_pos = ep; }
- void set_trimmed_pos(int64_t p) { trimming_pos = trimmed_pos = p; }
-
- void trim();
- void trim_tail() {
- assert(!readonly);
- _issue_prezero();
- }
+ // Synchronous getters
+ // ===================
+ uint64_t get_layout_period() const { return (uint64_t)layout.fl_stripe_count * (uint64_t)layout.fl_object_size; }
+ ceph_file_layout& get_layout() { return layout; }
+ bool is_active() { return state == STATE_ACTIVE; }
+ int get_error() { return error; }
+ bool is_readonly() { return readonly; }
+ bool is_readable();
+ bool try_read_entry(bufferlist& bl);
+ uint64_t get_write_pos() const { return write_pos; }
+ uint64_t get_write_safe_pos() const { return safe_pos; }
+ uint64_t get_read_pos() const { return read_pos; }
+ uint64_t get_expire_pos() const { return expire_pos; }
+ uint64_t get_trimmed_pos() const { return trimmed_pos; }
};
WRITE_CLASS_ENCODER(Journaler::Header)
}
JournalPointer jp(rank, mdsmap->get_metadata_pool());
- int jp_load_result = jp.load(objecter, &lock);
+ int jp_load_result = jp.load(objecter);
if (jp_load_result != 0) {
std::cerr << "Error loading journal: " << cpp_strerror(jp_load_result) << std::endl;
return jp_load_result;
int r;
JournalPointer jp(rank, mdsmap->get_metadata_pool());
- int jp_load_result = jp.load(objecter, &lock);
+ int jp_load_result = jp.load(objecter);
if (jp_load_result != 0) {
std::cerr << "Error loading journal: " << cpp_strerror(jp_load_result) << std::endl;
return;