uint32_t bl_offset = bl.length();
bl.claim_append(data_bl);
encode(crc, bl);
- assert(get_fixed_size() + m_data.length() + bl_offset == bl.length());
+ ceph_assert(get_fixed_size() + m_data.length() + bl_offset == bl.length());
}
void Entry::decode(bufferlist::const_iterator &iter) {
}
void Future::wait(Context *on_safe) {
- assert(on_safe != NULL);
+ ceph_assert(on_safe != NULL);
m_future_impl->wait(on_safe);
}
FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers,
Mutex &lock) {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
if (m_flush_state == FLUSH_STATE_NONE) {
m_flush_state = FLUSH_STATE_REQUESTED;
}
void FutureImpl::wait(Context *on_safe) {
- assert(on_safe != NULL);
+ ceph_assert(on_safe != NULL);
{
Mutex::Locker locker(m_lock);
if (!m_safe || !m_consistent) {
int FutureImpl::get_return_value() const {
Mutex::Locker locker(m_lock);
- assert(m_safe && m_consistent);
+ ceph_assert(m_safe && m_consistent);
return m_return_value;
}
bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) {
Mutex::Locker locker(m_lock);
- assert(!m_flush_handler);
+ ceph_assert(!m_flush_handler);
m_flush_handler = flush_handler;
return m_flush_state != FLUSH_STATE_NONE;
}
void FutureImpl::safe(int r) {
m_lock.Lock();
- assert(!m_safe);
+ ceph_assert(!m_safe);
m_safe = true;
if (m_return_value == 0) {
m_return_value = r;
void FutureImpl::consistent(int r) {
m_lock.Lock();
- assert(!m_consistent);
+ ceph_assert(!m_consistent);
m_consistent = true;
m_prev_future.reset();
if (m_return_value == 0) {
}
void FutureImpl::finish_unlock() {
- assert(m_lock.is_locked());
- assert(m_safe && m_consistent);
+ ceph_assert(m_lock.is_locked());
+ ceph_assert(m_safe && m_consistent);
Contexts contexts;
contexts.swap(m_contexts);
}
inline void set_flush_in_progress() {
Mutex::Locker locker(m_lock);
- assert(m_flush_handler);
+ ceph_assert(m_flush_handler);
m_flush_handler.reset();
m_flush_state = FLUSH_STATE_IN_PROGRESS;
}
C_GetClient, &C_GetClient::handle_get_client>);
int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
out_bl.clear();
int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
C_AllocateTag, &C_AllocateTag::handle_tag_create>);
int r = ioctx.aio_operate(oid, comp, &op);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
out_bl.clear();
int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
C_GetTag, &C_GetTag::handle_get_tag>);
int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
out_bl.clear();
int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
C_AssertActiveTag, &C_AssertActiveTag::handle_send>);
int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
JournalMetadata::~JournalMetadata() {
Mutex::Locker locker(m_lock);
- assert(!m_initialized);
+ ceph_assert(!m_initialized);
}
void JournalMetadata::init(Context *on_finish) {
{
Mutex::Locker locker(m_lock);
- assert(!m_initialized);
+ ceph_assert(!m_initialized);
m_initialized = true;
}
librados::AioCompletion *comp = librados::Rados::aio_create_completion(
on_finish, nullptr, utils::rados_ctx_callback);
int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
librados::AioCompletion *comp = librados::Rados::aio_create_completion(
on_finish, nullptr, utils::rados_ctx_callback);
r = rados.aio_watch_flush(comp);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
});
on_finish = new FunctionContext([this, on_finish](int r) {
librados::AioCompletion *comp = librados::Rados::aio_create_completion(
on_finish, nullptr, utils::rados_ctx_callback);
int r = m_ioctx.aio_unwatch(watch_handle, comp);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
} else {
on_finish->complete(0);
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
void JournalMetadata::unregister_client(Context *on_finish) {
- assert(!m_client_id.empty());
+ ceph_assert(!m_client_id.empty());
ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
librados::ObjectWriteOperation op;
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
m_minimum_set = object_set;
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
m_active_set = object_set;
return false;
}
- assert(it->second > 0);
+ ceph_assert(it->second > 0);
*entry_tid = it->second - 1;
return true;
}
}
Contexts refresh_ctxs;
- assert(m_refreshes_in_progress > 0);
+ ceph_assert(m_refreshes_in_progress > 0);
--m_refreshes_in_progress;
if (m_refreshes_in_progress == 0) {
std::swap(refresh_ctxs, m_refresh_ctxs);
void JournalMetadata::cancel_commit_task() {
ldout(m_cct, 20) << __func__ << dendl;
- assert(m_timer_lock->is_locked());
- assert(m_lock.is_locked());
- assert(m_commit_position_ctx != nullptr);
- assert(m_commit_position_task_ctx != nullptr);
+ ceph_assert(m_timer_lock->is_locked());
+ ceph_assert(m_lock.is_locked());
+ ceph_assert(m_commit_position_ctx != nullptr);
+ ceph_assert(m_commit_position_task_ctx != nullptr);
m_timer->cancel_event(m_commit_position_task_ctx);
m_commit_position_task_ctx = NULL;
}
void JournalMetadata::schedule_commit_task() {
ldout(m_cct, 20) << __func__ << dendl;
- assert(m_timer_lock->is_locked());
- assert(m_lock.is_locked());
- assert(m_commit_position_ctx != nullptr);
+ ceph_assert(m_timer_lock->is_locked());
+ ceph_assert(m_lock.is_locked());
+ ceph_assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == nullptr) {
m_commit_position_task_ctx =
m_timer->add_event_after(m_settings.commit_interval,
}
void JournalMetadata::handle_commit_position_task() {
- assert(m_timer_lock->is_locked());
- assert(m_lock.is_locked());
+ ceph_assert(m_timer_lock->is_locked());
+ ceph_assert(m_lock.is_locked());
ldout(m_cct, 20) << __func__ << ": "
<< "client_id=" << m_client_id << ", "
<< "commit_position=" << m_commit_position << dendl;
Context* ctx = new FunctionContext([this, commit_position_ctx](int r) {
Contexts flush_commit_position_ctxs;
m_lock.Lock();
- assert(m_flush_commits_in_progress > 0);
+ ceph_assert(m_flush_commits_in_progress > 0);
--m_flush_commits_in_progress;
if (m_flush_commits_in_progress == 0) {
std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
auto comp = librados::Rados::aio_create_completion(ctx, nullptr,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
void JournalMetadata::schedule_watch_reset() {
- assert(m_timer_lock->is_locked());
+ ceph_assert(m_timer_lock->is_locked());
m_timer->add_event_after(1, new C_WatchReset(this));
}
void JournalMetadata::handle_watch_reset() {
- assert(m_timer_lock->is_locked());
+ ceph_assert(m_timer_lock->is_locked());
if (!m_initialized) {
return;
}
Mutex::Locker locker(m_lock);
auto it = m_pending_commit_tids.find(commit_tid);
- assert(it != m_pending_commit_tids.end());
- assert(it->second.object_num < object_num);
+ ceph_assert(it != m_pending_commit_tids.end());
+ ceph_assert(it->second.object_num < object_num);
ldout(m_cct, 20) << __func__ << ": "
<< "commit_tid=" << commit_tid << ", "
Mutex::Locker locker(m_lock);
auto it = m_pending_commit_tids.find(commit_tid);
- assert(it != m_pending_commit_tids.end());
+ ceph_assert(it != m_pending_commit_tids.end());
*object_num = it->second.object_num;
*tag_tid = it->second.tag_tid;
{
Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
- assert(commit_tid > m_commit_position_tid);
+ ceph_assert(commit_tid > m_commit_position_tid);
if (!m_commit_position.object_positions.empty()) {
// in-flight commit position update
}
CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
- assert(it != m_pending_commit_tids.end());
+ ceph_assert(it != m_pending_commit_tids.end());
CommitEntry &commit_entry = it->second;
commit_entry.committed = true;
bufferlist bl;
int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
auto comp = librados::Rados::aio_create_completion(
ctx, nullptr, utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
});
}
}
JournalPlayer::~JournalPlayer() {
- assert(m_async_op_tracker.empty());
+ ceph_assert(m_async_op_tracker.empty());
{
Mutex::Locker locker(m_lock);
- assert(m_shut_down);
- assert(m_fetch_object_numbers.empty());
- assert(!m_watch_scheduled);
+ ceph_assert(m_shut_down);
+ ceph_assert(m_fetch_object_numbers.empty());
+ ceph_assert(!m_watch_scheduled);
}
m_replay_handler->put();
}
void JournalPlayer::prefetch() {
Mutex::Locker locker(m_lock);
- assert(m_state == STATE_INIT);
+ ceph_assert(m_state == STATE_INIT);
m_state = STATE_PREFETCH;
m_active_set = m_journal_metadata->get_active_set();
// active set)
std::map<uint8_t, uint64_t> splay_offset_to_objects;
for (auto &position : m_commit_positions) {
- assert(splay_offset_to_objects.count(position.first) == 0);
+ ceph_assert(splay_offset_to_objects.count(position.first) == 0);
splay_offset_to_objects[position.first] = position.second.object_number;
}
ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
- assert(!m_shut_down);
+ ceph_assert(!m_shut_down);
m_shut_down = true;
m_watch_enabled = false;
}
ObjectPlayerPtr object_player = get_object_player();
- assert(object_player && !object_player->empty());
+ ceph_assert(object_player && !object_player->empty());
object_player->front(entry);
object_player->pop_front();
ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", "
<< "r=" << r << dendl;
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
if (r >= 0) {
switch (m_state) {
case STATE_PREFETCH:
int JournalPlayer::process_prefetch(uint64_t object_number) {
ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
}
bool prefetch_complete = false;
- assert(m_object_players.count(splay_offset) == 1);
+ ceph_assert(m_object_players.count(splay_offset) == 1);
ObjectPlayerPtr object_player = m_object_players[splay_offset];
// prefetch in-order since a newer splay object could prefetch first
int JournalPlayer::process_playback(uint64_t object_number) {
ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
if (verify_playback_ready()) {
notify_entries_available();
}
bool JournalPlayer::is_object_set_ready() const {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
if (m_watch_scheduled || !m_fetch_object_numbers.empty()) {
ldout(m_cct, 20) << __func__ << ": waiting for in-flight fetch" << dendl;
return false;
}
bool JournalPlayer::verify_playback_ready() {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
while (true) {
if (!is_object_set_ready()) {
}
ObjectPlayerPtr object_player = get_object_player();
- assert(object_player);
+ ceph_assert(object_player);
uint64_t object_num = object_player->get_object_number();
// Verify is the active object player has another entry available
ldout(m_cct, 20) << __func__ << ": "
<< "object_num=" << object_num << ", "
<< "entry: " << entry << dendl;
- assert(entry.get_tag_tid() == *m_active_tag_tid);
+ ceph_assert(entry.get_tag_tid() == *m_active_tag_tid);
return true;
}
} else {
}
void JournalPlayer::prune_tag(uint64_t tag_tid) {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag "
<< tag_tid << dendl;
}
void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) {
- assert(m_lock.is_locked());
- assert(m_active_tag_tid);
+ ceph_assert(m_lock.is_locked());
+ ceph_assert(m_active_tag_tid);
uint64_t active_tag_tid = *m_active_tag_tid;
if (tag_tid) {
}
ObjectPlayerPtr JournalPlayer::get_object_player() const {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
SplayedObjectPlayers::const_iterator it = m_object_players.find(
m_splay_offset);
- assert(it != m_object_players.end());
+ ceph_assert(it != m_object_players.end());
return it->second;
}
ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
auto splay_it = m_object_players.find(splay_offset);
- assert(splay_it != m_object_players.end());
+ ceph_assert(splay_it != m_object_players.end());
ObjectPlayerPtr object_player = splay_it->second;
- assert(object_player->get_object_number() == object_number);
+ ceph_assert(object_player->get_object_number() == object_number);
return object_player;
}
void JournalPlayer::advance_splay_object() {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
++m_splay_offset;
m_splay_offset %= m_journal_metadata->get_splay_width();
m_watch_step = WATCH_STEP_FETCH_CURRENT;
}
bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
- assert(m_lock.is_locked());
- assert(!m_watch_scheduled);
+ ceph_assert(m_lock.is_locked());
+ ceph_assert(!m_watch_scheduled);
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint64_t object_set = player->get_object_number() / splay_width;
}
void JournalPlayer::fetch(uint64_t object_num) {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
ObjectPlayerPtr object_player(new ObjectPlayer(
m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
}
void JournalPlayer::fetch(const ObjectPlayerPtr &object_player) {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
uint64_t object_num = object_player->get_object_number();
std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
- assert(m_fetch_object_numbers.count(object_num) == 0);
+ ceph_assert(m_fetch_object_numbers.count(object_num) == 0);
m_fetch_object_numbers.insert(object_num);
ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
<< ": r=" << r << dendl;
Mutex::Locker locker(m_lock);
- assert(m_fetch_object_numbers.count(object_num) == 1);
+ ceph_assert(m_fetch_object_numbers.count(object_num) == 1);
m_fetch_object_numbers.erase(object_num);
if (m_shut_down) {
void JournalPlayer::refetch(bool immediate) {
ldout(m_cct, 10) << __func__ << dendl;
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
m_handler_notified = false;
// if watching the object, handle the periodic re-fetch
void JournalPlayer::schedule_watch(bool immediate) {
ldout(m_cct, 10) << __func__ << dendl;
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
if (m_watch_scheduled) {
return;
}
void JournalPlayer::handle_watch(uint64_t object_num, int r) {
ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
Mutex::Locker locker(m_lock);
- assert(m_watch_scheduled);
+ ceph_assert(m_watch_scheduled);
m_watch_scheduled = false;
if (m_shut_down || r == -ECANCELED) {
ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
Mutex::Locker locker(m_lock);
- assert(m_watch_scheduled);
+ ceph_assert(m_watch_scheduled);
m_watch_scheduled = false;
if (r == -ESTALE) {
}
void JournalPlayer::notify_entries_available() {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
if (m_handler_notified) {
return;
}
}
void JournalPlayer::notify_complete(int r) {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
m_handler_notified = true;
ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
m_journal_metadata->remove_listener(&m_listener);
Mutex::Locker locker(m_lock);
- assert(m_in_flight_advance_sets == 0);
- assert(m_in_flight_object_closes == 0);
+ ceph_assert(m_in_flight_advance_sets == 0);
+ ceph_assert(m_in_flight_object_closes == 0);
}
Future JournalRecorder::append(uint64_t tag_tid,
bufferlist entry_bl;
encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
entry_bl);
- assert(entry_bl.length() <= m_journal_metadata->get_object_size());
+ ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size());
bool object_full = object_ptr->append_unlock({{future, entry_bl}});
if (object_full) {
}
ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset];
- assert(object_recoder != NULL);
+ ceph_assert(object_recoder != NULL);
return object_recoder;
}
void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
// entry overflow from open object
if (m_current_set != object_set) {
// we shouldn't overflow upon append if already closed and we
// shouldn't receive an overflowed callback if already closed
- assert(m_in_flight_advance_sets == 0);
- assert(m_in_flight_object_closes == 0);
+ ceph_assert(m_in_flight_advance_sets == 0);
+ ceph_assert(m_in_flight_object_closes == 0);
uint64_t active_set = m_journal_metadata->get_active_set();
- assert(m_current_set == active_set);
+ ceph_assert(m_current_set == active_set);
++m_current_set;
++m_in_flight_advance_sets;
}
void JournalRecorder::advance_object_set() {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
- assert(m_in_flight_object_closes == 0);
+ ceph_assert(m_in_flight_object_closes == 0);
ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set
<< dendl;
m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
Mutex::Locker locker(m_lock);
ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
- assert(m_in_flight_advance_sets > 0);
+ ceph_assert(m_in_flight_advance_sets > 0);
--m_in_flight_advance_sets;
if (r < 0 && r != -ESTALE) {
}
void JournalRecorder::open_object_set() {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set
<< dendl;
ObjectRecorderPtr object_recorder = it->second;
uint64_t object_number = object_recorder->get_object_number();
if (object_number / splay_width != m_current_set) {
- assert(object_recorder->is_closed());
+ ceph_assert(object_recorder->is_closed());
// ready to close object and open object in active set
create_next_object_recorder_unlock(object_recorder);
}
bool JournalRecorder::close_object_set(uint64_t active_set) {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
// object recorders will invoke overflow handler as they complete
// closing the object to ensure correct order of future appends
void JournalRecorder::create_next_object_recorder_unlock(
ObjectRecorderPtr object_recorder) {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
uint64_t object_number = object_recorder->get_object_number();
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
- assert(m_object_locks[splay_offset]->is_locked());
+ ceph_assert(m_object_locks[splay_offset]->is_locked());
ObjectRecorderPtr new_object_recorder = create_object_recorder(
(m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
- assert(active_object_recorder->get_object_number() == object_number);
+ ceph_assert(active_object_recorder->get_object_number() == object_number);
- assert(m_in_flight_object_closes > 0);
+ ceph_assert(m_in_flight_object_closes > 0);
--m_in_flight_object_closes;
// object closed after advance active set committed
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
- assert(active_object_recorder->get_object_number() == object_number);
+ ceph_assert(active_object_recorder->get_object_number() == object_number);
ldout(m_cct, 20) << __func__ << ": object "
<< active_object_recorder->get_oid() << " overflowed"
}
JournalTrimmer::~JournalTrimmer() {
- assert(m_shutdown);
+ ceph_assert(m_shutdown);
}
void JournalTrimmer::shut_down(Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;
{
Mutex::Locker locker(m_lock);
- assert(!m_shutdown);
+ ceph_assert(!m_shutdown);
m_shutdown = true;
}
}
void JournalTrimmer::trim_objects(uint64_t minimum_set) {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
ldout(m_cct, 20) << __func__ << ": min_set=" << minimum_set << dendl;
if (minimum_set <= m_journal_metadata->get_minimum_set()) {
}
void JournalTrimmer::remove_set(uint64_t object_set) {
- assert(m_lock.is_locked());
+ ceph_assert(m_lock.is_locked());
m_async_op_tracker.start_op();
uint8_t splay_width = m_journal_metadata->get_splay_width();
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_remove(oid, comp);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
}
Journaler::~Journaler() {
if (m_metadata != nullptr) {
- assert(!m_metadata->is_initialized());
+ ceph_assert(!m_metadata->is_initialized());
if (!m_initialized) {
// never initialized -- ensure any in-flight ops are complete
// since we wouldn't expect shut_down to be invoked
m_metadata->put();
m_metadata = nullptr;
}
- assert(m_trimmer == nullptr);
- assert(m_player == nullptr);
- assert(m_recorder == nullptr);
+ ceph_assert(m_trimmer == nullptr);
+ ceph_assert(m_player == nullptr);
+ ceph_assert(m_recorder == nullptr);
delete m_threads;
m_threads = nullptr;
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback);
int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op, nullptr);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
}
void Journaler::shut_down(Context *on_finish) {
- assert(m_player == nullptr);
- assert(m_recorder == nullptr);
+ ceph_assert(m_player == nullptr);
+ ceph_assert(m_recorder == nullptr);
JournalMetadata *metadata = nullptr;
- assert(m_metadata != nullptr);
+ ceph_assert(m_metadata != nullptr);
std::swap(metadata, m_metadata);
- assert(metadata != nullptr);
+ ceph_assert(metadata != nullptr);
on_finish = new FunctionContext([metadata, on_finish](int r) {
metadata->put();
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback);
int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
}
librados::AioCompletion *comp = librados::Rados::aio_create_completion(
on_finish, nullptr, utils::rados_ctx_callback);
r = m_header_ioctx.aio_remove(m_header_oid, comp);
- assert(r == 0);
+ ceph_assert(r == 0);
comp->release();
});
bool Journaler::try_pop_front(ReplayEntry *replay_entry,
uint64_t *tag_tid) {
- assert(m_player != nullptr);
+ ceph_assert(m_player != nullptr);
Entry entry;
uint64_t commit_tid;
void Journaler::stop_replay(Context *on_finish) {
JournalPlayer *player = nullptr;
- assert(m_player != nullptr);
+ ceph_assert(m_player != nullptr);
std::swap(player, m_player);
- assert(player != nullptr);
+ ceph_assert(player != nullptr);
on_finish = new FunctionContext([player, on_finish](int r) {
delete player;
void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
double flush_age) {
- assert(m_recorder == nullptr);
+ ceph_assert(m_recorder == nullptr);
// TODO verify active object set >= current replay object set
void Journaler::stop_append(Context *on_safe) {
JournalRecorder *recorder = nullptr;
- assert(m_recorder != nullptr);
+ ceph_assert(m_recorder != nullptr);
std::swap(recorder, m_recorder);
- assert(recorder != nullptr);
+ ceph_assert(recorder != nullptr);
on_safe = new FunctionContext([recorder, on_safe](int r) {
delete recorder;
}
void Journaler::create_player(ReplayHandler *replay_handler) {
- assert(m_player == nullptr);
+ ceph_assert(m_player == nullptr);
m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata,
replay_handler);
}
void Journaler::get_metadata(uint8_t *order, uint8_t *splay_width,
int64_t *pool_id) {
- assert(m_metadata != nullptr);
+ ceph_assert(m_metadata != nullptr);
*order = m_metadata->get_order();
*splay_width = m_metadata->get_splay_width();
{
Mutex::Locker timer_locker(m_timer_lock);
Mutex::Locker locker(m_lock);
- assert(!m_fetch_in_progress);
- assert(m_watch_ctx == nullptr);
+ ceph_assert(!m_fetch_in_progress);
+ ceph_assert(m_watch_ctx == nullptr);
}
}
ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
Mutex::Locker locker(m_lock);
- assert(!m_fetch_in_progress);
+ ceph_assert(!m_fetch_in_progress);
m_fetch_in_progress = true;
C_Fetch *context = new C_Fetch(this, on_finish);
librados::Rados::aio_create_completion(context, utils::rados_ctx_callback,
NULL);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
- assert(r == 0);
+ ceph_assert(r == 0);
rados_completion->release();
}
Mutex::Locker timer_locker(m_timer_lock);
m_watch_interval = interval;
- assert(m_watch_ctx == nullptr);
+ ceph_assert(m_watch_ctx == nullptr);
m_watch_ctx = on_fetch;
schedule_watch();
Context *watch_ctx = nullptr;
{
Mutex::Locker timer_locker(m_timer_lock);
- assert(!m_unwatched);
+ ceph_assert(!m_unwatched);
m_unwatched = true;
if (!cancel_watch()) {
void ObjectPlayer::front(Entry *entry) const {
Mutex::Locker locker(m_lock);
- assert(!m_entries.empty());
+ ceph_assert(!m_entries.empty());
*entry = m_entries.front();
}
void ObjectPlayer::pop_front() {
Mutex::Locker locker(m_lock);
- assert(!m_entries.empty());
+ ceph_assert(!m_entries.empty());
auto &entry = m_entries.front();
m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()});
}
Mutex::Locker locker(m_lock);
- assert(m_fetch_in_progress);
+ ceph_assert(m_fetch_in_progress);
m_read_off += bl.length();
m_read_bl.append(bl);
m_refetch_state = REFETCH_STATE_REQUIRED;
}
void ObjectPlayer::schedule_watch() {
- assert(m_timer_lock.is_locked());
+ ceph_assert(m_timer_lock.is_locked());
if (m_watch_ctx == NULL) {
return;
}
ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
- assert(m_watch_task == nullptr);
+ ceph_assert(m_watch_task == nullptr);
m_watch_task = m_timer.add_event_after(
m_watch_interval,
new FunctionContext([this](int) {
}
bool ObjectPlayer::cancel_watch() {
- assert(m_timer_lock.is_locked());
+ ceph_assert(m_timer_lock.is_locked());
ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
if (m_watch_task != nullptr) {
bool canceled = m_timer.cancel_event(m_watch_task);
- assert(canceled);
+ ceph_assert(canceled);
m_watch_task = nullptr;
return true;
}
void ObjectPlayer::handle_watch_task() {
- assert(m_timer_lock.is_locked());
+ ceph_assert(m_timer_lock.is_locked());
ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
- assert(m_watch_ctx != nullptr);
- assert(m_watch_task != nullptr);
+ ceph_assert(m_watch_ctx != nullptr);
+ ceph_assert(m_watch_task != nullptr);
m_watch_task = nullptr;
fetch(new C_WatchFetch(this));
m_in_flight_flushes(false), m_aio_scheduled(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
- assert(m_handler != NULL);
+ ceph_assert(m_handler != NULL);
}
ObjectRecorder::~ObjectRecorder() {
- assert(m_append_task == NULL);
- assert(m_append_buffers.empty());
- assert(m_in_flight_tids.empty());
- assert(m_in_flight_appends.empty());
- assert(!m_aio_scheduled);
+ ceph_assert(m_append_task == NULL);
+ ceph_assert(m_append_buffers.empty());
+ ceph_assert(m_in_flight_tids.empty());
+ ceph_assert(m_in_flight_appends.empty());
+ ceph_assert(!m_aio_scheduled);
}
bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
- assert(m_lock->is_locked());
+ ceph_assert(m_lock->is_locked());
FutureImplPtr last_flushed_future;
bool schedule_append = false;
flush_appends(true);
} else if (!m_in_flight_appends.empty()) {
AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
- assert(!append_buffers.empty());
+ ceph_assert(!append_buffers.empty());
future = Future(append_buffers.rbegin()->first);
}
}
ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future
<< dendl;
- assert(m_lock->is_locked());
+ ceph_assert(m_lock->is_locked());
if (future->get_flush_handler().get() != &m_flush_handler) {
// if we don't own this future, re-issue the flush so that it hits the
break;
}
}
- assert(r_it != m_append_buffers.rend());
+ ceph_assert(r_it != m_append_buffers.rend());
auto it = (++r_it).base();
- assert(it != m_append_buffers.end());
+ ceph_assert(it != m_append_buffers.end());
++it;
AppendBuffers flush_buffers;
void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
- assert(m_lock->is_locked());
- assert(m_in_flight_tids.empty());
- assert(m_in_flight_appends.empty());
- assert(m_object_closed || m_overflowed);
+ ceph_assert(m_lock->is_locked());
+ ceph_assert(m_in_flight_tids.empty());
+ ceph_assert(m_in_flight_appends.empty());
+ ceph_assert(m_object_closed || m_overflowed);
append_buffers->splice(append_buffers->end(), m_append_buffers,
m_append_buffers.begin(), m_append_buffers.end());
}
flush_appends(true);
- assert(!m_object_closed);
+ ceph_assert(!m_object_closed);
m_object_closed = true;
return (m_in_flight_tids.empty() && !m_in_flight_flushes && !m_aio_scheduled);
}
void ObjectRecorder::handle_append_task() {
- assert(m_timer_lock.is_locked());
+ ceph_assert(m_timer_lock.is_locked());
m_append_task = NULL;
Mutex::Locker locker(*m_lock);
bool ObjectRecorder::append(const AppendBuffer &append_buffer,
bool *schedule_append) {
- assert(m_lock->is_locked());
+ ceph_assert(m_lock->is_locked());
bool flush_requested = false;
if (!m_object_closed && !m_overflowed) {
}
bool ObjectRecorder::flush_appends(bool force) {
- assert(m_lock->is_locked());
+ ceph_assert(m_lock->is_locked());
if (m_object_closed || m_overflowed) {
return true;
}
{
m_lock->Lock();
auto tid_iter = m_in_flight_tids.find(tid);
- assert(tid_iter != m_in_flight_tids.end());
+ ceph_assert(tid_iter != m_in_flight_tids.end());
m_in_flight_tids.erase(tid_iter);
InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
m_overflowed = true;
} else {
// must have seen an overflow on a previous append op
- assert(r == -EOVERFLOW && m_overflowed);
+ ceph_assert(r == -EOVERFLOW && m_overflowed);
}
// notify of overflow once all in-flight ops are complete
return;
}
- assert(iter != m_in_flight_appends.end());
+ ceph_assert(iter != m_in_flight_appends.end());
append_buffers.swap(iter->second);
- assert(!append_buffers.empty());
+ ceph_assert(!append_buffers.empty());
m_in_flight_appends.erase(iter);
m_in_flight_flushes = true;
ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
<< dendl;
- assert(m_lock->is_locked());
- assert(!m_in_flight_appends.empty());
+ ceph_assert(m_lock->is_locked());
+ ceph_assert(!m_in_flight_appends.empty());
cancel_append_task();
}
void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
- assert(m_lock->is_locked());
- assert(!append_buffers->empty());
+ ceph_assert(m_lock->is_locked());
+ ceph_assert(!append_buffers->empty());
for (AppendBuffers::iterator it = append_buffers->begin();
it != append_buffers->end(); ++it) {
librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
- assert(r == 0);
+ ceph_assert(r == 0);
rados_completion->release();
{
}
void ObjectRecorder::notify_handler_unlock() {
- assert(m_lock->is_locked());
+ ceph_assert(m_lock->is_locked());
if (m_object_closed) {
m_lock->Unlock();
m_handler->closed(this);
void claim_append_buffers(AppendBuffers *append_buffers);
bool is_closed() const {
- assert(m_lock->is_locked());
+ ceph_assert(m_lock->is_locked());
return (m_object_closed && m_in_flight_appends.empty());
}
bool close();