void Journaler::recover(Context *onread)
{
lock_guard l(lock);
- if (is_stopping()) {
+ if (state == STATE_STOPPING) {
onread->complete(-EAGAIN);
return;
}
void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
{
lock_guard l(lock);
- if (is_stopping()) {
+ if (state == STATE_STOPPING) {
finish->complete(-EAGAIN);
return;
}
void Journaler::_finish_read_head(int r, bufferlist& bl)
{
lock_guard l(lock);
- if (is_stopping())
+ if (state == STATE_STOPPING)
return;
ceph_assert(state == STATE_READHEAD);
C_OnFinisher *onfinish)
{
lock_guard l(lock);
- if (is_stopping()) {
+ if (state == STATE_STOPPING) {
onfinish->complete(-EAGAIN);
return;
}
void Journaler::_finish_probe_end(int r, uint64_t end)
{
lock_guard l(lock);
- if (is_stopping())
+ if (state == STATE_STOPPING)
return;
ceph_assert(state == STATE_PROBING);
{
// Expect to be called back from finish_reread_head, which already takes lock
// lock is locked
- if (is_stopping()) {
+ if (state == STATE_STOPPING) {
onfinish->complete(-EAGAIN);
return;
}
write_pos += wrote;
// flush previous object?
- uint64_t su = get_layout_period();
+ uint64_t su = layout.get_period();
ceph_assert(su > 0);
uint64_t write_off = write_pos % su;
uint64_t write_obj = write_pos / su;
void Journaler::_do_flush(unsigned amount)
{
- if (is_stopping())
+ if (state == STATE_STOPPING)
return;
if (write_pos == flush_pos)
return;
// zero at least two full periods ahead. this ensures
// that the next object will not exist.
- uint64_t period = get_layout_period();
+ uint64_t period = layout.get_period();
if (flush_pos + len + 2*period > prezero_pos) {
_issue_prezero();
void Journaler::wait_for_flush(Context *onsafe)
{
lock_guard l(lock);
- if (is_stopping()) {
+ if (state == STATE_STOPPING) {
if (onsafe)
onsafe->complete(-EAGAIN);
return;
void Journaler::flush(Context *onsafe)
{
lock_guard l(lock);
- if (is_stopping()) {
+ if (state == STATE_STOPPING) {
if (onsafe)
onsafe->complete(-EAGAIN);
return;
* issue zero requests based on write_pos, even though the invariant
* is that we zero ahead of flush_pos.
*/
- uint64_t period = get_layout_period();
+ uint64_t period = layout.get_period();
uint64_t to = write_pos + period * num_periods + period - 1;
to -= to % period;
// here because it will wait for all object reads to complete before
// giving us back any data. this way we can process whatever bits
// come in that are contiguous.
- uint64_t period = get_layout_period();
+ uint64_t period = layout.get_period();
while (len > 0) {
uint64_t e = requested_pos + period;
e -= e % period;
void Journaler::_prefetch()
{
- if (is_stopping())
+ if (state == STATE_STOPPING)
return;
ldout(cct, 10) << "_prefetch" << dendl;
uint64_t raw_target = read_pos + pf;
// read full log segments, so increase if necessary
- uint64_t period = get_layout_period();
+ uint64_t period = layout.get_period();
uint64_t remainder = raw_target % period;
uint64_t adjustment = remainder ? period - remainder : 0;
uint64_t target = raw_target + adjustment;
lock_guard l(lock);
// Async delete the journal data
- uint64_t first = trimmed_pos / get_layout_period();
- uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
+ uint64_t first = trimmed_pos / layout.get_period();
+ uint64_t num = (write_pos - trimmed_pos) / layout.get_period() + 2;
filer.purge_range(ino, &layout, SnapContext(), first, num,
ceph::real_clock::now(), 0,
wrap_finisher(new C_EraseFinish(
void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
{
lock_guard l(lock);
- if (is_stopping()) {
+ if (state == STATE_STOPPING) {
completion->complete(-EAGAIN);
return;
}
void Journaler::_wait_for_readable(Context *onreadable)
{
- if (is_stopping()) {
+ if (state == STATE_STOPPING) {
finisher->queue(onreadable, -EAGAIN);
return;
}
void Journaler::_trim()
{
- if (is_stopping())
+ if (state == STATE_STOPPING)
return;
ceph_assert(!readonly);
- uint64_t period = get_layout_period();
+ uint64_t period = layout.get_period();
uint64_t trim_to = last_committed.expire_pos;
trim_to -= trim_to % period;
ldout(cct, 10) << "trim last_commited head was " << last_committed
{
std::unique_lock l(lock);
while (!_is_readable() &&
- get_read_pos() < get_write_pos() &&
- !get_error()) {
+ read_pos < write_pos &&
+ !error) {
C_SaferCond readable_waiter;
_wait_for_readable(&readable_waiter);
l.unlock();
private:
// me
CephContext *cct;
- std::mutex lock;
+ mutable ceph::mutex lock;
const std::string name;
- typedef std::lock_guard<std::mutex> lock_guard;
- typedef std::unique_lock<std::mutex> unique_lock;
+ typedef std::lock_guard<ceph::mutex> lock_guard;
+ typedef std::unique_lock<ceph::mutex> unique_lock;
Finisher *finisher;
Header last_written;
inodeno_t ino;
Journaler(const std::string &name_, inodeno_t ino_, int64_t pool,
const char *mag, Objecter *obj, PerfCounters *l, int lkey, Finisher *f) :
last_committed(mag),
- cct(obj->cct), name(name_), finisher(f), last_written(mag),
+ cct(obj->cct), lock(ceph::make_mutex("Journaler::" + name_)), name(name_), finisher(f), last_written(mag),
ino(ino_), pg_pool(pool), readonly(true),
stream_format(-1), journal_stream(-1),
magic(mag),
// Synchronous getters
// ===================
- // TODO: need some locks on reads for true safety
uint64_t get_layout_period() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
return layout.get_period();
}
- file_layout_t& get_layout() { return layout; }
- bool is_active() { return state == STATE_ACTIVE; }
- bool is_stopping() { return state == STATE_STOPPING; }
- int get_error() { return error; }
- bool is_readonly() { return readonly; }
+ file_layout_t get_layout() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
+ return layout;
+ }
+ bool is_active() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
+ return state == STATE_ACTIVE;
+ }
+ bool is_stopping() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
+ return state == STATE_STOPPING;
+ }
+ int get_error() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
+ return error;
+ }
+ bool is_readonly() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
+ return readonly;
+ }
bool is_readable();
bool _is_readable();
bool try_read_entry(bufferlist& bl);
- uint64_t get_write_pos() const { return write_pos; }
- uint64_t get_write_safe_pos() const { return safe_pos; }
- uint64_t get_read_pos() const { return read_pos; }
- uint64_t get_expire_pos() const { return expire_pos; }
- uint64_t get_trimmed_pos() const { return trimmed_pos; }
+ uint64_t get_write_pos() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
+ return write_pos;
+ }
+ uint64_t get_write_safe_pos() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
+ return safe_pos;
+ }
+ uint64_t get_read_pos() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
+ return read_pos;
+ }
+ uint64_t get_expire_pos() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
+ return expire_pos;
+ }
+ uint64_t get_trimmed_pos() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
+ return trimmed_pos;
+ }
size_t get_journal_envelope_size() const {
+ ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+ lock_guard l(lock);
return journal_stream.get_envelope_size();
}
void check_isreadable();