uint64_t commit_tid)
: RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid),
m_commit_tid(commit_tid),
- m_lock("FutureImpl::m_lock", false, false), m_safe(false),
+ m_safe(false),
m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
m_consistent_ack(this) {
}
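The constructor above illustrates the conversion pattern that recurs throughout this diff: the legacy Mutex had to be named in every initializer list, while ceph::mutex takes a default member initializer via ceph::make_mutex(), so the lock entry simply drops out of the list. A minimal standalone sketch of the resulting shape (the type and member names are illustrative, not from this tree):

    #include <mutex>
    #include "common/ceph_mutex.h"

    struct Example {
      Example() = default;               // no m_lock entry needed here
      bool ready() const {
        std::lock_guard locker{m_lock};  // C++17 CTAD replaces Mutex::Locker
        return m_ready;
      }
      mutable ceph::mutex m_lock = ceph::make_mutex("Example::m_lock");
      bool m_ready = false;
    };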
FlushHandlers flush_handlers;
FutureImplPtr prev_future;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
complete = (m_safe && m_consistent);
if (!complete) {
if (on_safe != nullptr) {
}
FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return prepare_flush(flush_handlers, m_lock);
}
FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers,
- Mutex &lock) {
- ceph_assert(m_lock.is_locked());
+ ceph::mutex &lock) {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
if (m_flush_state == FLUSH_STATE_NONE) {
m_flush_state = FLUSH_STATE_REQUESTED;
void FutureImpl::wait(Context *on_safe) {
ceph_assert(on_safe != NULL);
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (!m_safe || !m_consistent) {
m_contexts.push_back(on_safe);
return;
}
bool FutureImpl::is_complete() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return m_safe && m_consistent;
}
int FutureImpl::get_return_value() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(m_safe && m_consistent);
return m_return_value;
}
bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_flush_handler);
m_flush_handler = flush_handler;
return m_flush_state != FLUSH_STATE_NONE;
}
void FutureImpl::safe(int r) {
- m_lock.Lock();
+ m_lock.lock();
ceph_assert(!m_safe);
m_safe = true;
if (m_return_value == 0) {
if (m_consistent) {
finish_unlock();
} else {
- m_lock.Unlock();
+ m_lock.unlock();
}
}
void FutureImpl::consistent(int r) {
- m_lock.Lock();
+ m_lock.lock();
ceph_assert(!m_consistent);
m_consistent = true;
m_prev_future.reset();
if (m_safe) {
finish_unlock();
} else {
- m_lock.Unlock();
+ m_lock.unlock();
}
}
void FutureImpl::finish_unlock() {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ceph_assert(m_safe && m_consistent);
Contexts contexts;
contexts.swap(m_contexts);
- m_lock.Unlock();
+ m_lock.unlock();
for (Contexts::iterator it = contexts.begin();
it != contexts.end(); ++it) {
(*it)->complete(m_return_value);
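finish_unlock() is one half of the completion protocol: waiters are swapped out while the lock is held, the lock is dropped, and the callbacks run unlocked so they can safely re-enter the future. A guard-based sketch of the same drain-and-complete shape (the real code enters with m_lock already held and unlocks manually):

    #include <list>
    #include <mutex>
    #include "common/ceph_mutex.h"
    #include "include/Context.h"

    struct Example {                      // illustrative, not from this tree
      ceph::mutex m_lock = ceph::make_mutex("Example::m_lock");
      std::list<Context*> m_contexts;
      int m_return_value = 0;

      void drain_and_complete() {
        std::list<Context*> contexts;
        {
          std::lock_guard locker{m_lock};
          contexts.swap(m_contexts);      // steal the waiters under the lock
        }
        for (auto *ctx : contexts) {
          ctx->complete(m_return_value);  // never run callbacks under m_lock
        }
      }
    };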
#define CEPH_JOURNAL_FUTURE_IMPL_H
#include "include/int_types.h"
-#include "common/Mutex.h"
#include "common/RefCountedObj.h"
#include "include/Context.h"
#include "journal/Future.h"
int get_return_value() const;
inline bool is_flush_in_progress() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return (m_flush_state == FLUSH_STATE_IN_PROGRESS);
}
inline void set_flush_in_progress() {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(m_flush_handler);
m_flush_handler.reset();
m_flush_state = FLUSH_STATE_IN_PROGRESS;
bool attach(const FlushHandlerPtr &flush_handler);
inline void detach() {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_flush_handler.reset();
}
inline FlushHandlerPtr get_flush_handler() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return m_flush_handler;
}
uint64_t m_entry_tid;
uint64_t m_commit_tid;
- mutable Mutex m_lock;
+ mutable ceph::mutex m_lock = ceph::make_mutex("FutureImpl::m_lock", false);
FutureImplPtr m_prev_future;
bool m_safe;
bool m_consistent;
Contexts m_contexts;
FutureImplPtr prepare_flush(FlushHandlers *flush_handlers);
- FutureImplPtr prepare_flush(FlushHandlers *flush_handlers, Mutex &lock);
+ FutureImplPtr prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock);
void consistent(int r);
void finish_unlock();
} // anonymous namespace
JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
- Mutex *timer_lock, librados::IoCtx &ioctx,
+ ceph::mutex *timer_lock, librados::IoCtx &ioctx,
const std::string &oid,
const std::string &client_id,
const Settings &settings)
m_client_id(client_id), m_settings(settings), m_order(0),
m_splay_width(0), m_pool_id(-1), m_initialized(false),
m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
- m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
+ m_commit_tid(0), m_watch_ctx(this),
m_watch_handle(0), m_minimum_set(0), m_active_set(0),
m_update_notifications(0), m_commit_position_ctx(NULL),
m_commit_position_task_ctx(NULL) {
}
JournalMetadata::~JournalMetadata() {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_initialized);
}
void JournalMetadata::init(Context *on_finish) {
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_initialized);
m_initialized = true;
}
if (r < 0) {
lderr(m_cct) << __func__ << ": failed to watch journal"
<< cpp_strerror(r) << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_watch_handle = 0;
on_finish->complete(r);
return;
uint64_t watch_handle = 0;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_initialized = false;
std::swap(watch_handle, m_watch_handle);
}
}
void JournalMetadata::add_listener(JournalMetadataListener *listener) {
- Mutex::Locker locker(m_lock);
- while (m_update_notifications > 0) {
- m_update_cond.Wait(m_lock);
- }
+ std::unique_lock locker{m_lock};
+ m_update_cond.wait(locker, [this] {
+ return m_update_notifications == 0;
+ });
m_listeners.push_back(listener);
}
void JournalMetadata::remove_listener(JournalMetadataListener *listener) {
- Mutex::Locker locker(m_lock);
- while (m_update_notifications > 0) {
- m_update_cond.Wait(m_lock);
- }
+ std::unique_lock locker{m_lock};
+ m_update_cond.wait(locker, [this] {
+ return m_update_notifications == 0;
+ });
m_listeners.remove(listener);
}
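add_listener() and remove_listener() show the Cond conversion: the old while (...) Wait(...) loop folds into the predicate overload of ceph::condition_variable::wait(), which re-checks the condition under the lock on every wakeup, spurious or not. A self-contained sketch (names illustrative; ceph::mutex and ceph::condition_variable are the std types in release builds):

    #include <condition_variable>
    #include <cstddef>
    #include <mutex>

    std::mutex lock;
    std::condition_variable cond;
    std::size_t notifications = 0;

    void wait_for_quiesce() {
      std::unique_lock locker{lock};
      cond.wait(locker, [] { return notifications == 0; });
      // lock is held again here; safe to mutate the guarded state
    }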
void JournalMetadata::set_minimum_set(uint64_t object_set) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
<< ", new=" << object_set << dendl;
}
void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
<< ", new=" << object_set << dendl;
}
void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid,
m_async_op_tracker,
void JournalMetadata::flush_commit_position(Context *on_safe) {
ldout(m_cct, 20) << __func__ << dendl;
- Mutex::Locker timer_locker(*m_timer_lock);
- Mutex::Locker locker(m_lock);
+ std::scoped_lock locker{*m_timer_lock, m_lock};
if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
// nothing to flush
if (on_safe != nullptr) {
}
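flush_commit_position() collapses two stacked Mutex::Locker guards into a single std::scoped_lock, which acquires both mutexes with the std::lock deadlock-avoidance algorithm instead of relying on a fixed nesting order. Sketch:

    #include <mutex>

    std::mutex timer_lock, state_lock;   // illustrative stand-ins

    void flush_sketch() {
      std::scoped_lock locker{timer_lock, state_lock};
      // both locks held; released together at scope exit
    }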
void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
if (allocated_entry_tid <= entry_tid) {
allocated_entry_tid = entry_tid + 1;
bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid,
uint64_t *entry_tid) const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid);
if (it == m_allocated_entry_tids.end()) {
ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (on_complete != nullptr) {
m_refresh_ctxs.push_back(on_complete);
}
void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;
- m_lock.Lock();
+ m_lock.lock();
if (r == 0) {
Client client(m_client_id, bufferlist());
RegisteredClients::iterator it = refresh->registered_clients.find(client);
m_client = *it;
++m_update_notifications;
- m_lock.Unlock();
+ m_lock.unlock();
for (Listeners::iterator it = m_listeners.begin();
it != m_listeners.end(); ++it) {
(*it)->handle_update(this);
}
- m_lock.Lock();
+ m_lock.lock();
if (--m_update_notifications == 0) {
- m_update_cond.Signal();
+ m_update_cond.notify_all();
}
} else {
lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
if (m_refreshes_in_progress == 0) {
std::swap(refresh_ctxs, m_refresh_ctxs);
}
- m_lock.Unlock();
+ m_lock.unlock();
for (auto ctx : refresh_ctxs) {
ctx->complete(r);
void JournalMetadata::cancel_commit_task() {
ldout(m_cct, 20) << __func__ << dendl;
- ceph_assert(m_timer_lock->is_locked());
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ceph_assert(m_commit_position_ctx != nullptr);
ceph_assert(m_commit_position_task_ctx != nullptr);
m_timer->cancel_event(m_commit_position_task_ctx);
void JournalMetadata::schedule_commit_task() {
ldout(m_cct, 20) << __func__ << dendl;
- ceph_assert(m_timer_lock->is_locked());
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ceph_assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == nullptr) {
m_commit_position_task_ctx =
}
void JournalMetadata::handle_commit_position_task() {
- ceph_assert(m_timer_lock->is_locked());
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ldout(m_cct, 20) << __func__ << ": "
<< "client_id=" << m_client_id << ", "
<< "commit_position=" << m_commit_position << dendl;
Context* ctx = new FunctionContext([this, commit_position_ctx](int r) {
Contexts flush_commit_position_ctxs;
- m_lock.Lock();
+ m_lock.lock();
ceph_assert(m_flush_commits_in_progress > 0);
--m_flush_commits_in_progress;
if (m_flush_commits_in_progress == 0) {
std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
}
- m_lock.Unlock();
+ m_lock.unlock();
commit_position_ctx->complete(0);
for (auto ctx : flush_commit_position_ctxs) {
ctx = new FunctionContext([this, ctx](int r) {
// manually kick off a refresh in case the notification is missed
// and ignore the next notification that we are about to send
- m_lock.Lock();
+ m_lock.lock();
++m_ignore_watch_notifies;
- m_lock.Unlock();
+ m_lock.unlock();
refresh(ctx);
});
}
void JournalMetadata::schedule_watch_reset() {
- ceph_assert(m_timer_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
m_timer->add_event_after(1, new C_WatchReset(this));
}
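Every is_locked() assertion becomes ceph_assert(ceph_mutex_is_locked(...)). One caveat when reading these hunks: in release builds ceph::mutex is a plain std::mutex and ceph_mutex_is_locked() expands to true, so the check only has teeth in debug builds, where ceph::mutex is ceph::mutex_debug. A minimal sketch of the convention:

    #include "common/ceph_mutex.h"
    #include "include/ceph_assert.h"

    struct Example {                 // illustrative, not from this tree
      ceph::mutex m_lock = ceph::make_mutex("Example::m_lock");
      int m_value = 0;

      // caller must already hold m_lock
      void set_value_locked(int v) {
        ceph_assert(ceph_mutex_is_locked(m_lock));  // no-op in release builds
        m_value = v;
      }
    };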
void JournalMetadata::handle_watch_reset() {
- ceph_assert(m_timer_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
if (!m_initialized) {
return;
}
m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_ignore_watch_notifies > 0) {
--m_ignore_watch_notifies;
return;
lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
}
- Mutex::Locker timer_locker(*m_timer_lock);
- Mutex::Locker locker(m_lock);
+ std::scoped_lock locker{*m_timer_lock, m_lock};
// release old watch on error
if (m_watch_handle != 0) {
uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
uint64_t tag_tid,
uint64_t entry_tid) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
uint64_t commit_tid = ++m_commit_tid;
m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid,
entry_tid);
void JournalMetadata::overflow_commit_tid(uint64_t commit_tid,
uint64_t object_num) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
auto it = m_pending_commit_tids.find(commit_tid);
ceph_assert(it != m_pending_commit_tids.end());
void JournalMetadata::get_commit_entry(uint64_t commit_tid,
uint64_t *object_num,
uint64_t *tag_tid, uint64_t *entry_tid) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
auto it = m_pending_commit_tids.find(commit_tid);
ceph_assert(it != m_pending_commit_tids.end());
ObjectSetPosition commit_position;
Context *stale_ctx = nullptr;
{
- Mutex::Locker timer_locker(*m_timer_lock);
- Mutex::Locker locker(m_lock);
+ std::scoped_lock locker{*m_timer_lock, m_lock};
ceph_assert(commit_tid > m_commit_position_tid);
if (!m_commit_position.object_positions.empty()) {
Context *ctx = on_finish;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
for (auto &c : m_registered_clients) {
if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
c.id == m_client_id ||
std::ostream &operator<<(std::ostream &os,
const JournalMetadata &jm) {
- Mutex::Locker locker(jm.m_lock);
+ std::lock_guard locker{jm.m_lock};
os << "[oid=" << jm.m_oid << ", "
<< "initialized=" << jm.m_initialized << ", "
<< "order=" << (int)jm.m_order << ", "
#include "include/rados/librados.hpp"
#include "common/AsyncOpTracker.h"
#include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "cls/journal/cls_journal_types.h"
typedef std::set<Client> RegisteredClients;
typedef std::list<Tag> Tags;
- JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+ JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
librados::IoCtx &ioctx, const std::string &oid,
const std::string &client_id, const Settings &settings);
~JournalMetadata() override;
inline SafeTimer &get_timer() {
return *m_timer;
}
- inline Mutex &get_timer_lock() {
+ inline ceph::mutex &get_timer_lock() {
return *m_timer_lock;
}
void set_minimum_set(uint64_t object_set);
inline uint64_t get_minimum_set() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return m_minimum_set;
}
int set_active_set(uint64_t object_set);
void set_active_set(uint64_t object_set, Context *on_finish);
inline uint64_t get_active_set() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return m_active_set;
}
void flush_commit_position();
void flush_commit_position(Context *on_safe);
void get_commit_position(ObjectSetPosition *commit_position) const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
*commit_position = m_client.commit_position;
}
void get_registered_clients(RegisteredClients *registered_clients) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
*registered_clients = m_registered_clients;
}
inline uint64_t allocate_entry_tid(uint64_t tag_tid) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return m_allocated_entry_tids[tag_tid]++;
}
void reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid);
journal_metadata->m_async_op_tracker.finish_op();
}
void finish(int r) override {
- Mutex::Locker locker(journal_metadata->m_lock);
+ std::lock_guard locker{journal_metadata->m_lock};
journal_metadata->handle_commit_position_task();
};
};
C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish)
: journal_metadata(_journal_metadata), on_finish(_on_finish) {
- Mutex::Locker locker(journal_metadata->m_lock);
+ std::lock_guard locker{journal_metadata->m_lock};
journal_metadata->m_async_op_tracker.start_op();
}
~C_ImmutableMetadata() override {
C_Refresh(JournalMetadata *_journal_metadata)
: journal_metadata(_journal_metadata), minimum_set(0), active_set(0) {
- Mutex::Locker locker(journal_metadata->m_lock);
+ std::lock_guard locker{journal_metadata->m_lock};
journal_metadata->m_async_op_tracker.start_op();
}
~C_Refresh() override {
ContextWQ *m_work_queue;
SafeTimer *m_timer;
- Mutex *m_timer_lock;
+ ceph::mutex *m_timer_lock;
- mutable Mutex m_lock;
+ mutable ceph::mutex m_lock = ceph::make_mutex("JournalMetadata::m_lock");
uint64_t m_commit_tid;
CommitTids m_pending_commit_tids;
AllocatedEntryTids m_allocated_entry_tids;
size_t m_update_notifications;
- Cond m_update_cond;
+ ceph::condition_variable m_update_cond;
size_t m_ignore_watch_notifies = 0;
size_t m_refreshes_in_progress = 0;
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
m_cache_manager_handler(cache_manager_handler),
- m_cache_rebalance_handler(this), m_lock("JournalPlayer::m_lock"),
+ m_cache_rebalance_handler(this),
m_state(STATE_INIT), m_splay_offset(0), m_watch_enabled(false),
m_watch_scheduled(false), m_watch_interval(0) {
m_replay_handler->get();
JournalPlayer::~JournalPlayer() {
ceph_assert(m_async_op_tracker.empty());
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(m_shut_down);
ceph_assert(m_fetch_object_numbers.empty());
ceph_assert(!m_watch_scheduled);
}
void JournalPlayer::prefetch() {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(m_state == STATE_INIT);
if (m_cache_manager_handler != nullptr && m_max_fetch_bytes == 0) {
void JournalPlayer::prefetch_and_watch(double interval) {
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_watch_enabled = true;
m_watch_interval = interval;
m_watch_step = WATCH_STEP_FETCH_CURRENT;
void JournalPlayer::shut_down(Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_shut_down);
m_shut_down = true;
bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
ldout(m_cct, 20) << __func__ << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_state != STATE_PLAYBACK) {
m_handler_notified = false;
ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", "
<< "r=" << r << dendl;
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
if (r >= 0) {
switch (m_state) {
case STATE_PREFETCH:
int JournalPlayer::process_prefetch(uint64_t object_number) {
ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
int JournalPlayer::process_playback(uint64_t object_number) {
ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
if (verify_playback_ready()) {
notify_entries_available();
}
bool JournalPlayer::is_object_set_ready() const {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
if (m_watch_scheduled || !m_fetch_object_numbers.empty()) {
ldout(m_cct, 20) << __func__ << ": waiting for in-flight fetch" << dendl;
return false;
}
bool JournalPlayer::verify_playback_ready() {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
while (true) {
if (!is_object_set_ready()) {
}
void JournalPlayer::prune_tag(uint64_t tag_tid) {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag "
<< tag_tid << dendl;
}
void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ceph_assert(m_active_tag_tid);
uint64_t active_tag_tid = *m_active_tag_tid;
}
ObjectPlayerPtr JournalPlayer::get_object_player() const {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
SplayedObjectPlayers::const_iterator it = m_object_players.find(
m_splay_offset);
}
ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
}
void JournalPlayer::advance_splay_object() {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
++m_splay_offset;
m_splay_offset %= m_journal_metadata->get_splay_width();
m_watch_step = WATCH_STEP_FETCH_CURRENT;
}
bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ceph_assert(!m_watch_scheduled);
uint8_t splay_width = m_journal_metadata->get_splay_width();
}
void JournalPlayer::fetch(uint64_t object_num) {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ObjectPlayerPtr object_player(new ObjectPlayer(
m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
}
void JournalPlayer::fetch(const ObjectPlayerPtr &object_player) {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
uint64_t object_num = object_player->get_object_number();
std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
<< utils::get_object_name(m_object_oid_prefix, object_num)
<< ": r=" << r << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(m_fetch_object_numbers.count(object_num) == 1);
m_fetch_object_numbers.erase(object_num);
void JournalPlayer::refetch(bool immediate) {
ldout(m_cct, 10) << __func__ << dendl;
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
m_handler_notified = false;
// if watching the object, handle the periodic re-fetch
void JournalPlayer::schedule_watch(bool immediate) {
ldout(m_cct, 10) << __func__ << dendl;
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
if (m_watch_scheduled) {
return;
}
void JournalPlayer::handle_watch(uint64_t object_num, int r) {
ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(m_watch_scheduled);
m_watch_scheduled = false;
void JournalPlayer::handle_watch_assert_active(int r) {
ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(m_watch_scheduled);
m_watch_scheduled = false;
}
void JournalPlayer::notify_entries_available() {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
if (m_handler_notified) {
return;
}
}
void JournalPlayer::notify_complete(int r) {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
m_handler_notified = true;
ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
}
void JournalPlayer::handle_cache_rebalanced(uint64_t new_cache_bytes) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_state == STATE_ERROR || m_shut_down) {
return;
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/AsyncOpTracker.h"
-#include "common/Mutex.h"
#include "journal/JournalMetadata.h"
#include "journal/ObjectPlayer.h"
#include "journal/Types.h"
AsyncOpTracker m_async_op_tracker;
- mutable Mutex m_lock;
+ mutable ceph::mutex m_lock = ceph::make_mutex("JournalPlayer::m_lock");
State m_state;
uint8_t m_splay_offset;
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
m_journal_metadata(journal_metadata),
m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
- m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
- m_current_set(m_journal_metadata->get_active_set()) {
-
- Mutex::Locker locker(m_lock);
+ m_object_handler(this),
+ m_lock(ceph::make_mutex("JournalerRecorder::m_lock")),
+ m_current_set(m_journal_metadata->get_active_set()),
+ m_object_locks{ceph::make_lock_container<ceph::mutex>(
+ journal_metadata->get_splay_width(), [](const size_t splay_offset) {
+ return ceph::make_mutex("ObjectRecorder::m_lock::" +
+ std::to_string(splay_offset));
+ })}
+{
+
+ std::lock_guard locker{m_lock};
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
uint8_t splay_width = m_journal_metadata->get_splay_width();
for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
- shared_ptr<Mutex> object_lock(new Mutex(
- "ObjectRecorder::m_lock::" + std::to_string(splay_offset)));
- m_object_locks.push_back(object_lock);
-
uint64_t object_number = splay_offset + (m_current_set * splay_width);
- Mutex::Locker locker(*object_lock);
+ std::lock_guard locker{m_object_locks[splay_offset]};
m_object_ptrs[splay_offset] = create_object_recorder(
- object_number, m_object_locks[splay_offset]);
+ object_number, &m_object_locks[splay_offset]);
}
m_journal_metadata->add_listener(&m_listener);
JournalRecorder::~JournalRecorder() {
m_journal_metadata->remove_listener(&m_listener);
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(m_in_flight_advance_sets == 0);
ceph_assert(m_in_flight_object_closes == 0);
}
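The object-lock storage changes shape as well: ceph::mutex is neither copyable nor movable, so the old std::vector<std::shared_ptr<Mutex>> becomes a ceph::containers::tiny_vector<ceph::mutex> built once in the initializer list by ceph::make_lock_container<>, which takes an element count plus a factory invoked per index. A standalone sketch of that helper in use (the count and names are illustrative):

    #include <cstddef>
    #include <mutex>
    #include <string>
    #include "common/ceph_mutex.h"
    #include "common/containers.h"

    // construct four independently named mutexes in place
    auto locks = ceph::make_lock_container<ceph::mutex>(
        4, [](const std::size_t i) {
          return ceph::make_mutex("Example::m_lock::" + std::to_string(i));
        });

    void use_lock(std::size_t splay_offset) {
      std::lock_guard locker{locks[splay_offset]};  // index by splay offset
    }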
[this, on_safe](int r) {
Context *ctx = nullptr;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_in_flight_advance_sets != 0) {
ceph_assert(m_on_object_set_advanced == nullptr);
m_on_object_set_advanced = new FunctionContext(
<< "flush_bytes=" << flush_bytes << ", "
<< "flush_age=" << flush_age << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_flush_interval = flush_interval;
m_flush_bytes = flush_bytes;
m_flush_age = flush_age;
uint8_t splay_width = m_journal_metadata->get_splay_width();
for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
- Mutex::Locker object_locker(*m_object_locks[splay_offset]);
+ std::lock_guard object_locker{m_object_locks[splay_offset]};
auto object_recorder = get_object(splay_offset);
object_recorder->set_append_batch_options(flush_interval, flush_bytes,
flush_age);
const bufferlist &payload_bl) {
ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl;
- m_lock.Lock();
+ m_lock.lock();
uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid);
uint8_t splay_width = m_journal_metadata->get_splay_width();
future->init(m_prev_future);
m_prev_future = future;
- m_object_locks[splay_offset]->Lock();
- m_lock.Unlock();
+ m_object_locks[splay_offset].lock();
+ m_lock.unlock();
bufferlist entry_bl;
encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size());
bool object_full = object_ptr->append({{future, entry_bl}});
- m_object_locks[splay_offset]->Unlock();
+ m_object_locks[splay_offset].unlock();
if (object_full) {
ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
<< dendl;
- Mutex::Locker l(m_lock);
+ std::lock_guard l{m_lock};
close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
}
return Future(future);
C_Flush *ctx;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
}
ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset];
ceph_assert(object_recoder != NULL);
}
void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
// entry overflow from open object
if (m_current_set != object_set) {
}
void JournalRecorder::advance_object_set() {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ceph_assert(m_in_flight_object_closes == 0);
ldout(m_cct, 10) << "advance to object set " << m_current_set << dendl;
void JournalRecorder::handle_advance_object_set(int r) {
Context *on_object_set_advanced = nullptr;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
ceph_assert(m_in_flight_advance_sets > 0);
}
void JournalRecorder::open_object_set() {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ldout(m_cct, 10) << "opening object set " << m_current_set << dendl;
bool JournalRecorder::close_object_set(uint64_t active_set) {
ldout(m_cct, 10) << "active_set=" << active_set << dendl;
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
// object recorders will invoke overflow handler as they complete
// closing the object to ensure correct order of future appends
}
ObjectRecorderPtr JournalRecorder::create_object_recorder(
- uint64_t object_number, shared_ptr<Mutex> lock) {
+ uint64_t object_number, ceph::mutex* lock) {
ldout(m_cct, 10) << "object_number=" << object_number << dendl;
ObjectRecorderPtr object_recorder(new ObjectRecorder(
m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
void JournalRecorder::create_next_object_recorder(
ObjectRecorderPtr object_recorder) {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
uint64_t object_number = object_recorder->get_object_number();
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
ldout(m_cct, 10) << "object_number=" << object_number << dendl;
- ceph_assert(m_object_locks[splay_offset]->is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_object_locks[splay_offset]));
ObjectRecorderPtr new_object_recorder = create_object_recorder(
- (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
+ (m_current_set * splay_width) + splay_offset, &m_object_locks[splay_offset]);
ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", "
<< "new oid=" << new_object_recorder->get_oid() << dendl;
}
void JournalRecorder::handle_update() {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
uint64_t active_set = m_journal_metadata->get_active_set();
if (m_current_set < active_set) {
void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
ldout(m_cct, 10) << object_recorder->get_oid() << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
uint64_t object_number = object_recorder->get_object_number();
uint8_t splay_width = m_journal_metadata->get_splay_width();
void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
ldout(m_cct, 10) << object_recorder->get_oid() << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
uint64_t object_number = object_recorder->get_object_number();
uint8_t splay_width = m_journal_metadata->get_splay_width();
#include "include/int_types.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
+#include "common/containers.h"
#include "journal/Future.h"
#include "journal/FutureImpl.h"
#include "journal/JournalMetadata.h"
Listener m_listener;
ObjectHandler m_object_handler;
- Mutex m_lock;
+ ceph::mutex m_lock;
uint32_t m_in_flight_advance_sets = 0;
uint32_t m_in_flight_object_closes = 0;
uint64_t m_current_set;
ObjectRecorderPtrs m_object_ptrs;
- std::vector<std::shared_ptr<Mutex>> m_object_locks;
+ ceph::containers::tiny_vector<ceph::mutex> m_object_locks;
FutureImplPtr m_prev_future;
void close_and_advance_object_set(uint64_t object_set);
ObjectRecorderPtr create_object_recorder(uint64_t object_number,
- std::shared_ptr<Mutex> lock);
+ ceph::mutex* lock);
void create_next_object_recorder(ObjectRecorderPtr object_recorder);
void handle_update();
void lock_object_recorders() {
for (auto& lock : m_object_locks) {
- lock->Lock();
+ lock.lock();
}
}
void unlock_object_recorders() {
for (auto& lock : m_object_locks) {
- lock->Unlock();
+ lock.unlock();
}
}
};
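lock_object_recorders()/unlock_object_recorders() still take every object lock in ascending splay order, which is what keeps the plain loops deadlock-free; only the syntax changes from pointer to reference. If exception safety mattered here, an RAII variant over the same container could look like this sketch (not part of the change):

    #include <mutex>
    #include <vector>
    #include "common/ceph_mutex.h"
    #include "common/containers.h"

    // acquire every lock in container order; the guards release automatically
    // when the returned vector is destroyed (release order is irrelevant to
    // deadlock freedom, only acquisition order matters)
    std::vector<std::unique_lock<ceph::mutex>>
    lock_all(ceph::containers::tiny_vector<ceph::mutex> &locks) {
      std::vector<std::unique_lock<ceph::mutex>> guards;
      for (auto &lock : locks) {
        guards.emplace_back(lock);
      }
      return guards;
    }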
struct JournalTrimmer::C_RemoveSet : public Context {
JournalTrimmer *journal_trimmer;
uint64_t object_set;
- Mutex lock;
+ ceph::mutex lock;
uint32_t refs;
int return_value;
const JournalMetadataPtr &journal_metadata)
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
m_journal_metadata(journal_metadata), m_metadata_listener(this),
- m_lock("JournalTrimmer::m_lock"), m_remove_set_pending(false),
+ m_remove_set_pending(false),
m_remove_set(0), m_remove_set_ctx(NULL) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
void JournalTrimmer::shut_down(Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_shutdown);
m_shutdown = true;
}
ldout(m_cct, 20) << __func__ << dendl;
on_finish = new FunctionContext([this, force, on_finish](int r) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_remove_set_pending) {
on_finish->complete(-EBUSY);
}
void JournalTrimmer::trim_objects(uint64_t minimum_set) {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
ldout(m_cct, 20) << __func__ << ": min_set=" << minimum_set << dendl;
if (minimum_set <= m_journal_metadata->get_minimum_set()) {
}
void JournalTrimmer::remove_set(uint64_t object_set) {
- ceph_assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
m_async_op_tracker.start_op();
uint8_t splay_width = m_journal_metadata->get_splay_width();
void JournalTrimmer::handle_metadata_updated() {
ldout(m_cct, 20) << __func__ << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
JournalMetadata::RegisteredClients registered_clients;
m_journal_metadata->get_registered_clients(®istered_clients);
ldout(m_cct, 20) << __func__ << ": r=" << r << ", set=" << object_set << ", "
<< "trim=" << m_remove_set << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_remove_set_pending = false;
if (r == -ENOENT) {
uint64_t _object_set,
uint8_t _splay_width)
: journal_trimmer(_journal_trimmer), object_set(_object_set),
- lock(utils::unique_lock_name("C_RemoveSet::lock", this)),
+ lock(ceph::make_mutex(utils::unique_lock_name("C_RemoveSet::lock", this))),
refs(_splay_width), return_value(-ENOENT) {
}
void JournalTrimmer::C_RemoveSet::complete(int r) {
- lock.Lock();
+ lock.lock();
if (r < 0 && r != -ENOENT &&
(return_value == -ENOENT || return_value == 0)) {
return_value = r;
if (--refs == 0) {
finish(return_value);
- lock.Unlock();
+ lock.unlock();
delete this;
} else {
- lock.Unlock();
+ lock.unlock();
}
}
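C_RemoveSet::complete() keeps explicit lock()/unlock() calls rather than a guard, and deliberately so: when the last reference drops, the mutex must be released before delete this destroys it, an ordering a scope-based guard cannot express. A condensed stand-in for the same pattern:

    #include <cstdint>
    #include "common/ceph_mutex.h"

    struct RefCountedCompletion {      // condensed stand-in for C_RemoveSet
      ceph::mutex lock = ceph::make_mutex("RefCountedCompletion::lock");
      uint32_t refs;

      explicit RefCountedCompletion(uint32_t n) : refs(n) {}

      void complete() {
        lock.lock();
        if (--refs == 0) {
          // a std::lock_guard would unlock after `delete this` has already
          // destroyed the mutex, so the release must happen by hand first
          lock.unlock();
          delete this;
        } else {
          lock.unlock();
        }
      }
    };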
#include "include/rados/librados.hpp"
#include "include/Context.h"
#include "common/AsyncOpTracker.h"
-#include "common/Mutex.h"
#include "journal/JournalMetadata.h"
#include "cls/journal/cls_journal_types.h"
#include <functional>
AsyncOpTracker m_async_op_tracker;
- Mutex m_lock;
+ ceph::mutex m_lock = ceph::make_mutex("JournalTrimmer::m_lock");
bool m_remove_set_pending;
uint64_t m_remove_set;
return JOURNAL_OBJECT_PREFIX + stringify(pool_id) + "." + journal_id + ".";
}
-Journaler::Threads::Threads(CephContext *cct)
- : timer_lock("Journaler::timer_lock") {
+Journaler::Threads::Threads(CephContext *cct) {
thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1);
thread_pool->start();
Journaler::Threads::~Threads() {
{
- Mutex::Locker timer_locker(timer_lock);
+ std::lock_guard timer_locker{timer_lock};
timer->shutdown();
}
delete timer;
}
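Threads now owns timer_lock directly as a default-initialized ceph::mutex, so its constructor loses the initializer list entirely; the shutdown ordering is unchanged. Worth spelling out: SafeTimer::shutdown() expects the timer lock to be held, and the timer object is deleted only after the guard releases. A sketch, assuming the post-conversion SafeTimer(CephContext*, ceph::mutex&, bool) constructor:

    #include <mutex>
    #include "common/Timer.h"
    #include "common/ceph_mutex.h"

    void timer_lifecycle_sketch(CephContext *cct) {  // illustrative wrapper
      ceph::mutex timer_lock = ceph::make_mutex("Example::timer_lock");
      auto *timer = new SafeTimer(cct, timer_lock, true /* safe_callbacks */);
      timer->init();
      // ... schedule events while holding timer_lock ...
      {
        std::lock_guard timer_locker{timer_lock};
        timer->shutdown();  // asserts that the lock is held
      }
      delete timer;         // only after the guard released timer_lock
    }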
Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer,
- Mutex *timer_lock, librados::IoCtx &header_ioctx,
+ ceph::mutex *timer_lock, librados::IoCtx &header_ioctx,
const std::string &journal_id,
const std::string &client_id, const Settings &settings,
CacheManagerHandler *cache_manager_handler)
}
void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer,
- Mutex *timer_lock, librados::IoCtx &header_ioctx,
+ ceph::mutex *timer_lock, librados::IoCtx &header_ioctx,
const std::string &journal_id,
const Settings &settings) {
m_header_ioctx.dup(header_ioctx);
ThreadPool *thread_pool = nullptr;
ContextWQ *work_queue = nullptr;
- SafeTimer *timer = nullptr;
- Mutex timer_lock;
+ SafeTimer *timer = nullptr;
+ ceph::mutex timer_lock = ceph::make_mutex("Journaler::timer_lock");
};
typedef cls::journal::Tag Tag;
Journaler(librados::IoCtx &header_ioctx, const std::string &journal_id,
const std::string &client_id, const Settings &settings,
CacheManagerHandler *cache_manager_handler);
- Journaler(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+ Journaler(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
librados::IoCtx &header_ioctx, const std::string &journal_id,
const std::string &client_id, const Settings &settings,
CacheManagerHandler *cache_manager_handler);
JournalRecorder *m_recorder = nullptr;
JournalTrimmer *m_trimmer = nullptr;
- void set_up(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+ void set_up(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
librados::IoCtx &header_ioctx, const std::string &journal_id,
const Settings &settings);
ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
const std::string &object_oid_prefix,
uint64_t object_num, SafeTimer &timer,
- Mutex &timer_lock, uint8_t order,
+ ceph::mutex &timer_lock, uint8_t order,
uint64_t max_fetch_bytes)
: RefCountedObject(NULL, 0), m_object_num(object_num),
m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
m_watch_interval(0), m_watch_task(NULL),
- m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
+ m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this))),
m_fetch_in_progress(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
ObjectPlayer::~ObjectPlayer() {
{
- Mutex::Locker timer_locker(m_timer_lock);
- Mutex::Locker locker(m_lock);
+ std::lock_guard timer_locker{m_timer_lock};
+ std::lock_guard locker{m_lock};
ceph_assert(!m_fetch_in_progress);
ceph_assert(m_watch_ctx == nullptr);
}
void ObjectPlayer::fetch(Context *on_finish) {
ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_fetch_in_progress);
m_fetch_in_progress = true;
void ObjectPlayer::watch(Context *on_fetch, double interval) {
ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
- Mutex::Locker timer_locker(m_timer_lock);
+ std::lock_guard timer_locker{m_timer_lock};
m_watch_interval = interval;
ceph_assert(m_watch_ctx == nullptr);
ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
Context *watch_ctx = nullptr;
{
- Mutex::Locker timer_locker(m_timer_lock);
+ std::lock_guard timer_locker{m_timer_lock};
ceph_assert(!m_unwatched);
m_unwatched = true;
}
void ObjectPlayer::front(Entry *entry) const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_entries.empty());
*entry = m_entries.front();
}
void ObjectPlayer::pop_front() {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_entries.empty());
auto &entry = m_entries.front();
return 0;
}
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(m_fetch_in_progress);
m_read_off += bl.length();
m_read_bl.append(bl);
}
void ObjectPlayer::schedule_watch() {
- ceph_assert(m_timer_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_timer_lock));
if (m_watch_ctx == NULL) {
return;
}
}
bool ObjectPlayer::cancel_watch() {
- ceph_assert(m_timer_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_timer_lock));
ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
if (m_watch_task != nullptr) {
bool canceled = m_timer.cancel_event(m_watch_task);
}
void ObjectPlayer::handle_watch_task() {
- ceph_assert(m_timer_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_timer_lock));
ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
ceph_assert(m_watch_ctx != nullptr);
Context *watch_ctx = nullptr;
{
- Mutex::Locker timer_locker(m_timer_lock);
+ std::lock_guard timer_locker{m_timer_lock};
std::swap(watch_ctx, m_watch_ctx);
if (m_unwatched) {
r = object_player->handle_fetch_complete(r, read_bl, &refetch);
{
- Mutex::Locker locker(object_player->m_lock);
+ std::lock_guard locker{object_player->m_lock};
object_player->m_fetch_in_progress = false;
}
#include "include/Context.h"
#include "include/interval_set.h"
#include "include/rados/librados.hpp"
-#include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/RefCountedObj.h"
#include "journal/Entry.h"
#include <list>
};
ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
- uint64_t object_num, SafeTimer &timer, Mutex &timer_lock,
+ uint64_t object_num, SafeTimer &timer, ceph::mutex &timer_lock,
uint8_t order, uint64_t max_fetch_bytes);
~ObjectPlayer() override;
void front(Entry *entry) const;
void pop_front();
inline bool empty() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return m_entries.empty();
}
inline void get_entries(Entries *entries) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
*entries = m_entries;
}
inline void get_invalid_ranges(InvalidRanges *invalid_ranges) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
*invalid_ranges = m_invalid_ranges;
}
}
inline void set_max_fetch_bytes(uint64_t max_fetch_bytes) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_max_fetch_bytes = max_fetch_bytes;
}
CephContext *m_cct;
SafeTimer &m_timer;
- Mutex &m_timer_lock;
+ ceph::mutex &m_timer_lock;
uint8_t m_order;
uint64_t m_max_fetch_bytes;
double m_watch_interval;
Context *m_watch_task;
- mutable Mutex m_lock;
+ mutable ceph::mutex m_lock;
bool m_fetch_in_progress;
bufferlist m_read_bl;
uint32_t m_read_off = 0;
namespace journal {
ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
- uint64_t object_number, shared_ptr<Mutex> lock,
+ uint64_t object_number, ceph::mutex* lock,
ContextWQ *work_queue, Handler *handler,
uint8_t order, int32_t max_in_flight_appends)
: RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
<< "flush_bytes=" << flush_bytes << ", "
<< "flush_age=" << flush_age << dendl;
- ceph_assert(m_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_lock));
m_flush_interval = flush_interval;
m_flush_bytes = flush_bytes;
m_flush_age = flush_age;
bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
- ceph_assert(m_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_lock));
FutureImplPtr last_flushed_future;
for (auto& append_buffer : append_buffers) {
Future future;
{
- Mutex::Locker locker(*m_lock);
+ std::unique_lock locker{*m_lock};
// if currently handling flush notifications, wait so that
// we notify in the correct order (since lock is dropped on
// callback)
- if (m_in_flight_flushes) {
- m_in_flight_flushes_cond.Wait(*(m_lock.get()));
- }
+ m_in_flight_flushes_cond.wait(locker, [this] {
+ return !m_in_flight_flushes;
+ });
// attach the flush to the most recent append
void ObjectRecorder::flush(const FutureImplPtr &future) {
ldout(m_cct, 20) << "flushing " << *future << dendl;
- m_lock->Lock();
+ m_lock->lock();
if (future->get_flush_handler().get() != &m_flush_handler) {
// if we don't own this future, re-issue the flush so that it hits the
// correct journal object owner
future->flush();
- m_lock->Unlock();
+ m_lock->unlock();
return;
} else if (future->is_flush_in_progress()) {
- m_lock->Unlock();
+ m_lock->unlock();
return;
}
if (overflowed) {
notify_handler_unlock();
} else {
- m_lock->Unlock();
+ m_lock->unlock();
}
}
void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
ldout(m_cct, 20) << dendl;
- ceph_assert(m_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_lock));
ceph_assert(m_in_flight_tids.empty());
ceph_assert(m_in_flight_appends.empty());
ceph_assert(m_object_closed || m_overflowed);
}
bool ObjectRecorder::close() {
- ceph_assert(m_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_lock));
ldout(m_cct, 20) << dendl;
send_appends(true, {});
AppendBuffers append_buffers;
{
- m_lock->Lock();
+ m_lock->lock();
auto tid_iter = m_in_flight_tids.find(tid);
ceph_assert(tid_iter != m_in_flight_tids.end());
m_in_flight_tids.erase(tid_iter);
append_overflowed();
notify_handler_unlock();
} else {
- m_lock->Unlock();
+ m_lock->unlock();
}
return;
}
m_in_flight_appends.erase(iter);
m_in_flight_flushes = true;
- m_lock->Unlock();
+ m_lock->unlock();
}
// Flag the associated futures as complete.
}
// wake up any flush requests that raced with a RADOS callback
- m_lock->Lock();
+ m_lock->lock();
m_in_flight_flushes = false;
- m_in_flight_flushes_cond.Signal();
+ m_in_flight_flushes_cond.notify_all();
if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) {
// all remaining unsent appends should be redirected to new object
if (overflowed) {
notify_handler_unlock();
} else {
- m_lock->Unlock();
+ m_lock->unlock();
}
}
}
void ObjectRecorder::append_overflowed() {
ldout(m_cct, 10) << dendl;
- ceph_assert(m_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_lock));
ceph_assert(!m_in_flight_appends.empty());
InFlightAppends in_flight_appends;
bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
ldout(m_cct, 20) << dendl;
- ceph_assert(m_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_lock));
if (m_object_closed || m_overflowed) {
ldout(m_cct, 20) << "already closed or overflowed" << dendl;
return false;
}
void ObjectRecorder::notify_handler_unlock() {
- ceph_assert(m_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_lock));
if (m_object_closed) {
- m_lock->Unlock();
+ m_lock->unlock();
m_handler->closed(this);
} else {
// TODO need to delay completion until after aio_notify completes
- m_lock->Unlock();
+ m_lock->unlock();
m_handler->overflow(this);
}
}
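notify_handler_unlock() preserves the unlock-before-callback discipline: m_lock (now a raw ceph::mutex*) is released before closed() or overflow() runs, because both handlers re-enter JournalRecorder, which takes its own m_lock and the object locks again. The generic shape, sketched:

    #include <utility>
    #include "common/ceph_mutex.h"
    #include "include/ceph_assert.h"

    // drop the lock, then invoke a callback that may re-acquire it
    // (directly or through another component) without self-deadlock
    template <typename Callback>
    void unlock_then_notify(ceph::mutex &lock, Callback &&cb) {
      ceph_assert(ceph_mutex_is_locked(lock));  // debug-build check only
      lock.unlock();
      std::forward<Callback>(cb)();
    }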
#include "include/utime.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
-#include "common/Cond.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "journal/FutureImpl.h"
};
ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
- uint64_t object_number, std::shared_ptr<Mutex> lock,
+ uint64_t object_number, ceph::mutex* lock,
ContextWQ *work_queue, Handler *handler, uint8_t order,
int32_t max_in_flight_appends);
~ObjectRecorder() override;
void claim_append_buffers(AppendBuffers *append_buffers);
bool is_closed() const {
- ceph_assert(m_lock->is_locked());
+ ceph_assert(ceph_mutex_is_locked(*m_lock));
return (m_object_closed && m_in_flight_appends.empty());
}
bool close();
}
inline size_t get_pending_appends() const {
- Mutex::Locker locker(*m_lock);
+ std::lock_guard locker{*m_lock};
return m_pending_buffers.size();
}
FlushHandler m_flush_handler;
- mutable std::shared_ptr<Mutex> m_lock;
+ mutable ceph::mutex* m_lock;
AppendBuffers m_pending_buffers;
uint64_t m_pending_bytes = 0;
utime_t m_last_flush_time;
bufferlist m_prefetch_bl;
bool m_in_flight_flushes;
- Cond m_in_flight_flushes_cond;
+ ceph::condition_variable m_in_flight_flushes_cond;
uint64_t m_in_flight_bytes = 0;
bool send_appends(bool force, FutureImplPtr flush_future);