From: John Spray Date: Wed, 23 Jul 2014 11:48:57 +0000 (+0100) Subject: osdc: locking for Journaler X-Git-Tag: v0.86~213^2~50 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=db7369bb2ce16b92bed88cc475b606d26080d858;p=ceph.git osdc: locking for Journaler also remove the lock pointers that JournalPointer load/save used to require in order to use the objecter. Signed-off-by: John Spray --- diff --git a/src/mds/JournalPointer.cc b/src/mds/JournalPointer.cc index 8c8c54c08de9..0ceb758a9b75 100644 --- a/src/mds/JournalPointer.cc +++ b/src/mds/JournalPointer.cc @@ -40,21 +40,17 @@ std::string JournalPointer::get_object_id() const /** * Blocking read of JournalPointer for this MDS */ -int JournalPointer::load(Objecter *objecter, Mutex *lock) +int JournalPointer::load(Objecter *objecter) { - assert(lock != NULL); assert(objecter != NULL); - assert(!lock->is_locked_by_me()); // Blocking read of data std::string const object_id = get_object_id(); dout(4) << "Reading journal pointer '" << object_id << "'" << dendl; bufferlist data; C_SaferCond waiter; - lock->Lock(); objecter->read_full(object_t(object_id), object_locator_t(pool_id), CEPH_NOSNAP, &data, 0, &waiter); - lock->Unlock(); int r = waiter.wait(); // Construct JournalPointer result, null or decoded data @@ -73,11 +69,9 @@ int JournalPointer::load(Objecter *objecter, Mutex *lock) * * @return objecter write op status code */ -int JournalPointer::save(Objecter *objecter, Mutex *lock) const +int JournalPointer::save(Objecter *objecter) const { - assert(lock != NULL); assert(objecter != NULL); - assert(!lock->is_locked_by_me()); // It is not valid to persist a null pointer assert(!is_null()); @@ -91,10 +85,8 @@ int JournalPointer::save(Objecter *objecter, Mutex *lock) const << std::hex << front << ":0x" << back << std::dec << dendl; C_SaferCond waiter; - lock->Lock(); objecter->write_full(object_t(object_id), object_locator_t(pool_id), SnapContext(), data, ceph_clock_now(g_ceph_context), 0, NULL, &waiter); - lock->Unlock(); int write_result = waiter.wait(); if (write_result < 0) { derr << "Error writing pointer object '" << object_id << "': " << cpp_strerror(write_result) << dendl; diff --git a/src/mds/JournalPointer.h b/src/mds/JournalPointer.h index 7724de35ef3c..559543b44a1f 100644 --- a/src/mds/JournalPointer.h +++ b/src/mds/JournalPointer.h @@ -57,8 +57,8 @@ class JournalPointer { JournalPointer() : node_id(-1), pool_id(-1), front(0), back(0) {} - int load(Objecter *objecter, Mutex *lock); - int save(Objecter *objecter, Mutex *lock) const; + int load(Objecter *objecter); + int save(Objecter *objecter) const; void save(Objecter *objecter, Context *completion) const; bool is_null() const { diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 1eb6096665e3..7e44ea273f37 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -700,11 +700,11 @@ void MDLog::_recovery_thread(Context *completion) // If the pointer object is not present, then create it with // front = default ino and back = null JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool()); - int const read_result = jp.load(mds->objecter, &(mds->mds_lock)); + int const read_result = jp.load(mds->objecter); if (read_result == -ENOENT) { inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid(); jp.front = default_log_ino; - int write_result = jp.save(mds->objecter, &(mds->mds_lock)); + int write_result = jp.save(mds->objecter); // Nothing graceful we can do for this assert(write_result >= 0); } else if (read_result != 0) { @@ -753,7 +753,7 @@ void MDLog::_recovery_thread(Context *completion) } else { dout(1) << "Successfully erased journal, updating journal pointer" << dendl; jp.back = 0; - int write_result = jp.save(mds->objecter, &(mds->mds_lock)); + int write_result = jp.save(mds->objecter); // Nothing graceful we can do for this assert(write_result >= 0); } @@ -823,7 +823,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid(); inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid(); jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino); - int write_result = jp.save(mds->objecter, &(mds->mds_lock)); + int write_result = jp.save(mds->objecter); assert(write_result == 0); /* Create the new Journaler file */ @@ -907,7 +907,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa inodeno_t const tmp = jp.front; jp.front = jp.back; jp.back = tmp; - write_result = jp.save(mds->objecter, &(mds->mds_lock)); + write_result = jp.save(mds->objecter); assert(write_result == 0); /* Delete the old journal to free space */ @@ -922,7 +922,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa /* Update the pointer to reflect we're back in clean single journal state. */ jp.back = 0; - write_result = jp.save(mds->objecter, &(mds->mds_lock)); + write_result = jp.save(mds->objecter); assert(write_result == 0); /* Reset the Journaler object to its default state */ diff --git a/src/mds/MDLog.h b/src/mds/MDLog.h index ac27efc094cc..f3b3a04f989d 100644 --- a/src/mds/MDLog.h +++ b/src/mds/MDLog.h @@ -59,6 +59,8 @@ class PerfCounters; #include using std::map; +#include "common/Finisher.h" + class MDLog { public: diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index 94a7fd2a2559..b4b51b19a7fd 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -27,24 +27,30 @@ void Journaler::set_readonly() { + Mutex::Locker l(lock); + ldout(cct, 1) << "set_readonly" << dendl; readonly = true; } void Journaler::set_writeable() { + Mutex::Locker l(lock); + ldout(cct, 1) << "set_writeable" << dendl; readonly = false; } void Journaler::create(ceph_file_layout *l, stream_format_t const sf) { + Mutex::Locker lk(lock); + assert(!readonly); state = STATE_ACTIVE; stream_format = sf; journal_stream.set_format(sf); - set_layout(l); + _set_layout(l); prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = read_pos = requested_pos = received_pos = @@ -54,7 +60,13 @@ void Journaler::create(ceph_file_layout *l, stream_format_t const sf) << ", format=" << stream_format << dendl; } -void Journaler::set_layout(ceph_file_layout *l) +void Journaler::set_layout(ceph_file_layout const *l) +{ + Mutex::Locker lk(lock); + _set_layout(l); +} + +void Journaler::_set_layout(ceph_file_layout const *l) { layout = *l; @@ -127,6 +139,8 @@ public: void Journaler::recover(Context *onread) { + Mutex::Locker l(lock); + ldout(cct, 1) << "recover start" << dendl; assert(state != STATE_ACTIVE); assert(readonly); @@ -154,6 +168,12 @@ void Journaler::read_head(Context *on_finish, bufferlist *bl) objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, on_finish); } +void Journaler::reread_head(Context *onfinish) +{ + Mutex::Locker l(lock); + _reread_head(onfinish); +} + /** * Re-read the head from disk, and set the write_pos, expire_pos, trimmed_pos * from the on-disk header. This switches the state to STATE_REREADHEAD for @@ -162,7 +182,7 @@ void Journaler::read_head(Context *on_finish, bufferlist *bl) * Also, don't call this until the Journaler has finished its recovery and has * gone STATE_ACTIVE! */ -void Journaler::reread_head(Context *onfinish) +void Journaler::_reread_head(Context *onfinish) { ldout(cct, 10) << "reread_head" << dendl; assert(state == STATE_ACTIVE); @@ -174,6 +194,8 @@ void Journaler::reread_head(Context *onfinish) void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) { + Mutex::Locker l(lock); + //read on-disk header into assert(bl.length() || r < 0 ); @@ -191,6 +213,8 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) void Journaler::_finish_read_head(int r, bufferlist& bl) { + Mutex::Locker l(lock); + assert(state == STATE_READHEAD); if (r!=0) { @@ -238,7 +262,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) trimmed_pos = trimming_pos = h.trimmed_pos; init_headers(h); - set_layout(&h.layout); + _set_layout(&h.layout); stream_format = h.stream_format; journal_stream.set_format(h.stream_format); @@ -259,6 +283,8 @@ void Journaler::probe(Context *finish, uint64_t *end) void Journaler::reprobe(Context *finish) { + Mutex::Locker l(lock); + ldout(cct, 10) << "reprobe" << dendl; assert(state == STATE_ACTIVE); @@ -268,7 +294,10 @@ void Journaler::reprobe(Context *finish) } -void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish) { +void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish) +{ + Mutex::Locker l(lock); + assert(new_end >= write_pos || r < 0); ldout(cct, 1) << "_finish_reprobe new_end = " << new_end << " (header had " << write_pos << ")." @@ -280,6 +309,8 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish) { void Journaler::_finish_probe_end(int r, uint64_t end) { + Mutex::Locker l(lock); + assert(state == STATE_PROBING); if (r < 0) { // error in probing goto out; @@ -322,12 +353,17 @@ public: void Journaler::reread_head_and_probe(Context *onfinish) { + Mutex::Locker l(lock); + assert(state == STATE_ACTIVE); - reread_head(new C_RereadHeadProbe(this, onfinish)); + _reread_head(new C_RereadHeadProbe(this, onfinish)); } void Journaler::_finish_reread_head_and_probe(int r, Context *onfinish) { + // Expect to be called back from finish_reread_head, which already takes lock + assert(lock.is_locked_by_me()); + assert(!r); //if we get an error, we're boned reprobe(onfinish); } @@ -347,6 +383,13 @@ public: }; void Journaler::write_head(Context *oncommit) +{ + Mutex::Locker l(lock); + _write_head(oncommit); +} + + +void Journaler::_write_head(Context *oncommit) { assert(!readonly); assert(state == STATE_ACTIVE); @@ -376,6 +419,8 @@ void Journaler::write_head(Context *oncommit) void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit) { + Mutex::Locker l(lock); + if (r < 0) { lderr(cct) << "_finish_write_head got " << cpp_strerror(r) << dendl; handle_write_error(r); @@ -388,7 +433,7 @@ void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit) oncommit->complete(r); } - trim(); // trim? + _trim(); // trim? } @@ -407,7 +452,9 @@ public: void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp) { + Mutex::Locker l(lock); assert(!readonly); + if (r < 0) { lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl; handle_write_error(r); @@ -452,6 +499,8 @@ void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp) uint64_t Journaler::append_entry(bufferlist& bl) { + Mutex::Locker l(lock); + assert(!readonly); uint32_t s = bl.length(); @@ -569,8 +618,13 @@ void Journaler::_do_flush(unsigned amount) } - void Journaler::wait_for_flush(Context *onsafe) +{ + Mutex::Locker l(lock); + _wait_for_flush(onsafe); +} + +void Journaler::_wait_for_flush(Context *onsafe) { assert(!readonly); @@ -592,6 +646,12 @@ void Journaler::wait_for_flush(Context *onsafe) } void Journaler::flush(Context *onsafe) +{ + Mutex::Locker l(lock); + _flush(onsafe); +} + +void Journaler::_flush(Context *onsafe) { assert(!readonly); @@ -603,29 +663,25 @@ void Journaler::flush(Context *onsafe) onsafe->complete(0); } } else { - if (1) { - // maybe buffer - if (write_buf.length() < cct->_conf->journaler_batch_max) { - // delay! schedule an event. - ldout(cct, 20) << "flush delaying flush" << dendl; - if (delay_flush_event) - timer->cancel_event(delay_flush_event); - delay_flush_event = new C_DelayFlush(this); - timer->add_event_after(cct->_conf->journaler_batch_interval, delay_flush_event); - } else { - ldout(cct, 20) << "flush not delaying flush" << dendl; - _do_flush(); + // maybe buffer + if (write_buf.length() < cct->_conf->journaler_batch_max) { + // delay! schedule an event. + ldout(cct, 20) << "flush delaying flush" << dendl; + if (delay_flush_event) { + timer->cancel_event(delay_flush_event); } + delay_flush_event = new C_DelayFlush(this); + timer->add_event_after(cct->_conf->journaler_batch_interval, delay_flush_event); } else { - // always flush + ldout(cct, 20) << "flush not delaying flush" << dendl; _do_flush(); } - wait_for_flush(onsafe); + _wait_for_flush(onsafe); } // write head? if (last_wrote_head.sec() + cct->_conf->journaler_write_head_interval < ceph_clock_now(cct).sec()) { - write_head(); + _write_head(); } } @@ -637,7 +693,7 @@ struct C_Journaler_Prezero : public Context { uint64_t from, len; C_Journaler_Prezero(Journaler *j, uint64_t f, uint64_t l) : journaler(j), from(f), len(l) {} void finish(int r) { - journaler->_prezeroed(r, from, len); + journaler->_finish_prezero(r, from, len); } }; @@ -678,8 +734,10 @@ void Journaler::_issue_prezero() } } -void Journaler::_prezeroed(int r, uint64_t start, uint64_t len) +void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) { + Mutex::Locker l(lock); + ldout(cct, 10) << "_prezeroed to " << start << "~" << len << ", prezeroing/prezero was " << prezeroing_pos << "/" << prezero_pos << ", pending " << pending_zero @@ -733,13 +791,15 @@ class Journaler::C_RetryRead : public Context { public: C_RetryRead(Journaler *l) : ls(l) {} void finish(int r) { - // kickstart. + Mutex::Locker l(ls->lock); ls->_prefetch(); } }; void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl) { + Mutex::Locker l(lock); + if (r < 0) { ldout(cct, 0) << "_finish_read got error " << r << dendl; error = r; @@ -809,8 +869,9 @@ void Journaler::_issue_read(uint64_t len) if (requested_pos == safe_pos) { ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl; assert(write_pos > requested_pos); - if (flush_pos == safe_pos) - flush(); + if (flush_pos == safe_pos) { + _flush(NULL); + } assert(flush_pos > safe_pos); waitfor_safe[flush_pos].push_back(new C_RetryRead(this)); return; @@ -939,7 +1000,7 @@ class Journaler::C_EraseFinish : public Context { public: C_EraseFinish(Journaler *j, Context *c) : journaler(j), completion(c) {} void finish(int r) { - journaler->_erase_finish(r, completion); + journaler->_finish_erase(r, completion); } }; @@ -949,19 +1010,23 @@ class Journaler::C_EraseFinish : public Context { */ void Journaler::erase(Context *completion) { + Mutex::Locker l(lock); + // Async delete the journal data uint64_t first = trimmed_pos / get_layout_period(); uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2; filer.purge_range(ino, &layout, SnapContext(), first, num, ceph_clock_now(cct), 0, new C_EraseFinish(this, completion)); - // We will not start the operation to delete the header until _erase_finish has + // We will not start the operation to delete the header until _finish_erase has // seen the data deletion succeed: otherwise if there was an error deleting data // we might prematurely delete the header thereby lose our reference to the data. } -void Journaler::_erase_finish(int data_result, Context *completion) +void Journaler::_finish_erase(int data_result, Context *completion) { + Mutex::Locker l(lock); + if (data_result == 0) { // Async delete the journal header filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct), 0, @@ -1019,11 +1084,17 @@ class Journaler::C_Trim : public Context { public: C_Trim(Journaler *l, int64_t t) : ls(l), to(t) {} void finish(int r) { - ls->_trim_finish(r, to); + ls->_finish_trim(r, to); } }; void Journaler::trim() +{ + Mutex::Locker l(lock); + _trim(); +} + +void Journaler::_trim() { assert(!readonly); uint64_t period = get_layout_period(); @@ -1062,15 +1133,17 @@ void Journaler::trim() trimming_pos = trim_to; } -void Journaler::_trim_finish(int r, uint64_t to) +void Journaler::_finish_trim(int r, uint64_t to) { + Mutex::Locker l(lock); + assert(!readonly); - ldout(cct, 10) << "_trim_finish trimmed_pos was " << trimmed_pos + ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos << ", trimmed/trimming/expire now " << to << "/" << trimming_pos << "/" << expire_pos << dendl; if (r < 0 && r != -ENOENT) { - lderr(cct) << "_trim_finish got " << cpp_strerror(r) << dendl; + lderr(cct) << "_finish_trim got " << cpp_strerror(r) << dendl; handle_write_error(r); return; } diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 19e724d38094..76192353aea6 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -109,7 +109,8 @@ class JournalStream class Journaler { public: // this goes at the head of the log "file". - struct Header { + class Header { + public: uint64_t trimmed_pos; uint64_t expire_pos; uint64_t unused_field; @@ -186,7 +187,7 @@ public: ls.push_back(new Header()); ls.back()->stream_format = JOURNAL_FORMAT_RESILIENT; } - } last_written, last_committed; + }; WRITE_CLASS_ENCODER(Header) uint32_t get_stream_format() const { @@ -196,6 +197,9 @@ public: private: // me CephContext *cct; + Mutex lock; + Header last_written; + Header last_committed; inodeno_t ino; int64_t pg_pool; bool readonly; @@ -220,11 +224,20 @@ private: public: C_DelayFlush(Journaler *j) : journaler(j) {} void finish(int r) { - journaler->delay_flush_event = 0; - journaler->_do_flush(); + journaler->_do_delayed_flush(); } } *delay_flush_event; + /* + * Do a flush as a result of a C_DelayFlush context. + */ + void _do_delayed_flush() + { + assert(delay_flush_event != NULL); + Mutex::Locker l(lock); + delay_flush_event = NULL; + _do_flush(); + } // my state static const int STATE_UNDEF = 0; @@ -237,12 +250,18 @@ private: int state; int error; + void _write_head(Context *oncommit=NULL); + void _wait_for_flush(Context *onsafe); + void _trim(); + // header utime_t last_wrote_head; void _finish_write_head(int r, Header &wrote, Context *oncommit); class C_WriteHead; friend class C_WriteHead; + void _reread_head(Context *onfinish); + void _set_layout(ceph_file_layout const *l); list waitfor_recover; void read_head(Context *on_finish, bufferlist *bl); void _finish_read_head(int r, bufferlist& bl); @@ -275,6 +294,7 @@ private: std::set pending_safe; std::map > waitfor_safe; // when safe through given offset + void _flush(Context *onsafe); void _do_flush(unsigned amount=0); void _finish_flush(int r, uint64_t start, utime_t stamp); class C_Flush; @@ -297,6 +317,7 @@ private: Context *on_write_error; void _finish_read(int r, uint64_t offset, bufferlist &bl); // read completion callback + void _finish_retry_read(int r); void _assimilate_prefetch(); void _issue_read(uint64_t len); // read some more void _prefetch(); // maybe read ahead @@ -311,12 +332,12 @@ private: uint64_t trimmed_pos; // what has been trimmed map > waitfor_trim; - void _trim_finish(int r, uint64_t to); + void _finish_trim(int r, uint64_t to); class C_Trim; friend class C_Trim; void _issue_prezero(); - void _prezeroed(int r, uint64_t from, uint64_t len); + void _finish_prezero(int r, uint64_t from, uint64_t len); friend struct C_Journaler_Prezero; // only init_headers when following or first reading off-disk @@ -338,9 +359,14 @@ private: bool _is_readable(); + void _finish_erase(int data_result, Context *completion); + class C_EraseFinish; + friend class C_EraseFinish; + public: Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim) : - last_written(mag), last_committed(mag), cct(obj->cct), + cct(obj->cct), lock("Journaler"), + last_written(mag), last_committed(mag), ino(ino_), pg_pool(pool), readonly(true), stream_format(-1), journal_stream(-1), magic(mag), @@ -363,9 +389,11 @@ public: * an "erase" method. */ void reset() { + Mutex::Locker l(lock); assert(state == STATE_ACTIVE); + readonly = true; - delay_flush_event = 0; + delay_flush_event = NULL; state = STATE_UNDEF; error = 0; prezeroing_pos = 0; @@ -384,58 +412,51 @@ public: waiting_for_zero = false; } + // Asynchronous operations + // ======================= void erase(Context *completion); -private: - void _erase_finish(int data_result, Context *completion); - class C_EraseFinish; - friend class C_EraseFinish; - -public: void create(ceph_file_layout *layout, stream_format_t const sf); void recover(Context *onfinish); void reread_head(Context *onfinish); void reprobe(Context *onfinish); void reread_head_and_probe(Context *onfinish); void write_head(Context *onsave=0); - - void set_layout(ceph_file_layout *l); - - void set_readonly(); - void set_writeable(); - bool is_readonly() { return readonly; } - - bool is_active() { return state == STATE_ACTIVE; } - int get_error() { return error; } - - uint64_t get_write_pos() const { return write_pos; } - uint64_t get_write_safe_pos() const { return safe_pos; } - uint64_t get_read_pos() const { return read_pos; } - uint64_t get_expire_pos() const { return expire_pos; } - uint64_t get_trimmed_pos() const { return trimmed_pos; } - - uint64_t get_layout_period() const { return (uint64_t)layout.fl_stripe_count * (uint64_t)layout.fl_object_size; } - ceph_file_layout& get_layout() { return layout; } - - // write - uint64_t append_entry(bufferlist& bl); void wait_for_flush(Context *onsafe = 0); void flush(Context *onsafe = 0); + void wait_for_readable(Context *onfinish); - // read + // Synchronous setters + // =================== + void set_layout(ceph_file_layout const *l); + void set_readonly(); + void set_writeable(); + void set_write_pos(int64_t p) { + Mutex::Locker l(lock); + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p; + } void set_read_pos(int64_t p) { + Mutex::Locker l(lock); assert(requested_pos == received_pos); // we can't cope w/ in-progress read right now. read_pos = requested_pos = received_pos = p; read_buf.clear(); } - - bool is_readable(); - bool try_read_entry(bufferlist& bl); - void wait_for_readable(Context *onfinish); - - void set_write_pos(int64_t p) { - prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p; + uint64_t append_entry(bufferlist& bl); + void set_expire_pos(int64_t ep) { + Mutex::Locker l(lock); + expire_pos = ep; + } + void set_trimmed_pos(int64_t p) { + Mutex::Locker l(lock); + trimming_pos = trimmed_pos = p; } + void trim(); + void trim_tail() { + Mutex::Locker l(lock); + + assert(!readonly); + _issue_prezero(); + } /** * set write error callback * @@ -451,19 +472,25 @@ public: * @param c callback/context to trigger on error */ void set_write_error_handler(Context *c) { + Mutex::Locker l(lock); assert(!on_write_error); on_write_error = c; } - // trim - void set_expire_pos(int64_t ep) { expire_pos = ep; } - void set_trimmed_pos(int64_t p) { trimming_pos = trimmed_pos = p; } - - void trim(); - void trim_tail() { - assert(!readonly); - _issue_prezero(); - } + // Synchronous getters + // =================== + uint64_t get_layout_period() const { return (uint64_t)layout.fl_stripe_count * (uint64_t)layout.fl_object_size; } + ceph_file_layout& get_layout() { return layout; } + bool is_active() { return state == STATE_ACTIVE; } + int get_error() { return error; } + bool is_readonly() { return readonly; } + bool is_readable(); + bool try_read_entry(bufferlist& bl); + uint64_t get_write_pos() const { return write_pos; } + uint64_t get_write_safe_pos() const { return safe_pos; } + uint64_t get_read_pos() const { return read_pos; } + uint64_t get_expire_pos() const { return expire_pos; } + uint64_t get_trimmed_pos() const { return trimmed_pos; } }; WRITE_CLASS_ENCODER(Journaler::Header) diff --git a/src/tools/cephfs/Dumper.cc b/src/tools/cephfs/Dumper.cc index 665fa88a5050..5070f5ea14a9 100644 --- a/src/tools/cephfs/Dumper.cc +++ b/src/tools/cephfs/Dumper.cc @@ -40,7 +40,7 @@ int Dumper::init(int rank_) } JournalPointer jp(rank, mdsmap->get_metadata_pool()); - int jp_load_result = jp.load(objecter, &lock); + int jp_load_result = jp.load(objecter); if (jp_load_result != 0) { std::cerr << "Error loading journal: " << cpp_strerror(jp_load_result) << std::endl; return jp_load_result; diff --git a/src/tools/cephfs/Resetter.cc b/src/tools/cephfs/Resetter.cc index fde2dddd161a..bcfa10eec231 100644 --- a/src/tools/cephfs/Resetter.cc +++ b/src/tools/cephfs/Resetter.cc @@ -31,7 +31,7 @@ void Resetter::reset(int rank) int r; JournalPointer jp(rank, mdsmap->get_metadata_pool()); - int jp_load_result = jp.load(objecter, &lock); + int jp_load_result = jp.load(objecter); if (jp_load_result != 0) { std::cerr << "Error loading journal: " << cpp_strerror(jp_load_result) << std::endl; return;