#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
-#define dout_prefix *_dout << "JournalRecorder: " << this << " "
+#define dout_prefix *_dout << "JournalRecorder: " << this << " " << __func__ \
+ << ": "
using std::shared_ptr;
Future JournalRecorder::append(uint64_t tag_tid,
const bufferlist &payload_bl) {
+ ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl;
m_lock.Lock();
}
void JournalRecorder::flush(Context *on_safe) {
+ ldout(m_cct, 20) << dendl;
+
C_Flush *ctx;
{
Mutex::Locker locker(m_lock);
// entry overflow from open object
if (m_current_set != object_set) {
- ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl;
+ ldout(m_cct, 20) << "close already in-progress" << dendl;
return;
}
++m_current_set;
++m_in_flight_advance_sets;
- ldout(m_cct, 20) << __func__ << ": closing active object set "
- << object_set << dendl;
+ ldout(m_cct, 10) << "closing active object set " << object_set << dendl;
if (close_object_set(m_current_set)) {
advance_object_set();
}
ceph_assert(m_lock.is_locked());
ceph_assert(m_in_flight_object_closes == 0);
- ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set
- << dendl;
+ ldout(m_cct, 10) << "advance to object set " << m_current_set << dendl;
m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
this));
}
--m_in_flight_advance_sets;
if (r < 0 && r != -ESTALE) {
- lderr(m_cct) << __func__ << ": failed to advance object set: "
- << cpp_strerror(r) << dendl;
+ lderr(m_cct) << "failed to advance object set: " << cpp_strerror(r)
+ << dendl;
}
if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
void JournalRecorder::open_object_set() {
ceph_assert(m_lock.is_locked());
- ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set
- << dendl;
+ ldout(m_cct, 10) << "opening object set " << m_current_set << dendl;
uint8_t splay_width = m_journal_metadata->get_splay_width();
}
bool JournalRecorder::close_object_set(uint64_t active_set) {
+ ldout(m_cct, 10) << "active_set=" << active_set << dendl;
ceph_assert(m_lock.is_locked());
// object recorders will invoke overflow handler as they complete
it != m_object_ptrs.end(); ++it) {
ObjectRecorderPtr object_recorder = it->second;
if (object_recorder->get_object_number() / splay_width != active_set) {
- ldout(m_cct, 10) << __func__ << ": closing object "
- << object_recorder->get_oid() << dendl;
+ ldout(m_cct, 10) << "closing object " << object_recorder->get_oid()
+ << dendl;
// flush out all queued appends and hold future appends
if (!object_recorder->close()) {
++m_in_flight_object_closes;
} else {
- ldout(m_cct, 20) << __func__ << ": object "
- << object_recorder->get_oid() << " closed" << dendl;
+ ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " closed"
+ << dendl;
}
}
}
ObjectRecorderPtr JournalRecorder::create_object_recorder(
uint64_t object_number, shared_ptr<Mutex> lock) {
+ ldout(m_cct, 10) << "object_number=" << object_number << dendl;
ObjectRecorderPtr object_recorder(new ObjectRecorder(
m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
object_number, lock, m_journal_metadata->get_work_queue(),
uint64_t object_number = object_recorder->get_object_number();
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
+ ldout(m_cct, 10) << "object_number=" << object_number << dendl;
ceph_assert(m_object_locks[splay_offset]->is_locked());
ObjectRecorderPtr new_object_recorder = create_object_recorder(
(m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
- ldout(m_cct, 10) << __func__ << ": "
- << "old oid=" << object_recorder->get_oid() << ", "
+ ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", "
<< "new oid=" << new_object_recorder->get_oid() << dendl;
AppendBuffers append_buffers;
object_recorder->claim_append_buffers(&append_buffers);
uint64_t active_set = m_journal_metadata->get_active_set();
if (m_current_set < active_set) {
// peer journal client advanced the active set
- ldout(m_cct, 20) << __func__ << ": "
- << "current_set=" << m_current_set << ", "
+ ldout(m_cct, 10) << "current_set=" << m_current_set << ", "
<< "active_set=" << active_set << dendl;
uint64_t current_set = m_current_set;
m_current_set = active_set;
if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
- ldout(m_cct, 20) << __func__ << ": closing current object set "
- << current_set << dendl;
+ ldout(m_cct, 10) << "closing current object set " << current_set << dendl;
if (close_object_set(active_set)) {
open_object_set();
}
}
void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
- ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
+ ldout(m_cct, 10) << object_recorder->get_oid() << dendl;
Mutex::Locker locker(m_lock);
--m_in_flight_object_closes;
// object closed after advance active set committed
- ldout(m_cct, 20) << __func__ << ": object "
- << active_object_recorder->get_oid() << " closed" << dendl;
+ ldout(m_cct, 10) << "object " << active_object_recorder->get_oid()
+ << " closed" << dendl;
if (m_in_flight_object_closes == 0) {
if (m_in_flight_advance_sets == 0) {
// peer forced closing of object set
}
void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
- ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
+ ldout(m_cct, 10) << object_recorder->get_oid() << dendl;
Mutex::Locker locker(m_lock);
ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
ceph_assert(active_object_recorder->get_object_number() == object_number);
- ldout(m_cct, 20) << __func__ << ": object "
- << active_object_recorder->get_oid() << " overflowed"
- << dendl;
+ ldout(m_cct, 10) << "object " << active_object_recorder->get_oid()
+ << " overflowed" << dendl;
close_and_advance_object_set(object_number / splay_width);
}
#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
-#define dout_prefix *_dout << "ObjectRecorder: " << this << " "
+#define dout_prefix *_dout << "ObjectRecorder: " << this << " " \
+ << __func__ << " (" << m_oid << "): "
using namespace cls::journal;
using std::shared_ptr;
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
ceph_assert(m_handler != NULL);
+ ldout(m_cct, 20) << dendl;
}
ObjectRecorder::~ObjectRecorder() {
+ ldout(m_cct, 20) << dendl;
ceph_assert(m_append_task == NULL);
ceph_assert(m_append_buffers.empty());
ceph_assert(m_in_flight_tids.empty());
bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
ceph_assert(m_lock->is_locked());
+ ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
FutureImplPtr last_flushed_future;
bool schedule_append = false;
m_append_buffers.insert(m_append_buffers.end(),
append_buffers.begin(), append_buffers.end());
m_lock->Unlock();
+ ldout(m_cct, 20) << "already overflowed" << dendl;
return false;
}
}
void ObjectRecorder::flush(Context *on_safe) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+ ldout(m_cct, 20) << dendl;
cancel_append_task();
Future future;
}
void ObjectRecorder::flush(const FutureImplPtr &future) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future
- << dendl;
+ ldout(m_cct, 20) << "flushing " << *future << dendl;
ceph_assert(m_lock->is_locked());
}
void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+ ldout(m_cct, 20) << dendl;
ceph_assert(m_lock->is_locked());
ceph_assert(m_in_flight_tids.empty());
bool ObjectRecorder::close() {
ceph_assert(m_lock->is_locked());
- ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+ ldout(m_cct, 20) << dendl;
cancel_append_task();
void ObjectRecorder::cancel_append_task() {
Mutex::Locker locker(m_timer_lock);
if (m_append_task != NULL) {
+ ldout(m_cct, 20) << dendl;
m_timer.cancel_event(m_append_task);
m_append_task = NULL;
}
void ObjectRecorder::schedule_append_task() {
Mutex::Locker locker(m_timer_lock);
if (m_append_task == nullptr && m_flush_age > 0) {
+ ldout(m_cct, 20) << dendl;
m_append_task = m_timer.add_event_after(
m_flush_age, new FunctionContext([this](int) {
handle_append_task();
bool ObjectRecorder::append(const AppendBuffer &append_buffer,
bool *schedule_append) {
ceph_assert(m_lock->is_locked());
+ ldout(m_cct, 20) << "bytes=" << append_buffer.second.length() << dendl;
bool flush_requested = false;
if (!m_object_closed && !m_overflowed) {
bool ObjectRecorder::flush_appends(bool force) {
ceph_assert(m_lock->is_locked());
+ ldout(m_cct, 20) << "force=" << force << dendl;
if (m_object_closed || m_overflowed) {
+ ldout(m_cct, 20) << "already closed or overflowed" << dendl;
return true;
}
m_size + m_pending_bytes < m_soft_max_size &&
(m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) &&
(m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) {
+ ldout(m_cct, 20) << "batching append" << dendl;
return false;
}
}
void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
- ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid
- << ", r=" << r << dendl;
+ ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl;
AppendBuffers append_buffers;
{
InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
if (r == -EOVERFLOW || m_overflowed) {
if (iter != m_in_flight_appends.end()) {
+ ldout(m_cct, 10) << "append overflowed" << dendl;
m_overflowed = true;
} else {
// must have seen an overflow on a previous append op
// Flag the associated futures as complete.
for (AppendBuffers::iterator buf_it = append_buffers.begin();
buf_it != append_buffers.end(); ++buf_it) {
- ldout(m_cct, 20) << __func__ << ": " << *buf_it->first << " marked safe"
- << dendl;
+ ldout(m_cct, 20) << *buf_it->first << " marked safe" << dendl;
buf_it->first->safe(r);
}
if (m_in_flight_appends.empty() &&
(m_object_closed || m_aio_sent_size >= m_soft_max_size)) {
if (m_aio_sent_size >= m_soft_max_size) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid
- << " soft max size reached, notifying overflow"
+      ldout(m_cct, 20) << "soft max size reached, notifying overflow"
<< dendl;
m_overflowed = true;
}
}
void ObjectRecorder::append_overflowed() {
- ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
- << dendl;
+ ldout(m_cct, 10) << dendl;
ceph_assert(m_lock->is_locked());
ceph_assert(!m_in_flight_appends.empty());
for (AppendBuffers::const_iterator it = m_append_buffers.begin();
it != m_append_buffers.end(); ++it) {
- ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
- << dendl;
+ ldout(m_cct, 20) << "overflowed " << *it->first << dendl;
it->first->detach();
}
}
void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
+ ldout(m_cct, 20) << dendl;
ceph_assert(m_lock->is_locked());
ceph_assert(!append_buffers->empty());
for (AppendBuffers::iterator it = append_buffers->begin();
it != append_buffers->end(); ++it) {
- ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
- << dendl;
+ ldout(m_cct, 20) << "flushing " << *it->first << dendl;
it->first->set_flush_in_progress();
m_size += it->second.length();
}
}
void ObjectRecorder::send_appends_aio() {
+ ldout(m_cct, 20) << dendl;
librados::AioCompletion *rados_completion;
{
Mutex::Locker locker(*m_lock);
m_aio_scheduled = false;
if (m_pending_buffers.empty()) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid << " pending buffers empty"
- << dendl;
+ ldout(m_cct, 10) << "pending buffers empty" << dendl;
return;
}
if (m_max_in_flight_appends != 0 &&
m_in_flight_tids.size() >= m_max_in_flight_appends) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid
- << " max in flight appends reached" << dendl;
+ ldout(m_cct, 10) << "max in flight appends reached" << dendl;
return;
}
if (m_aio_sent_size >= m_soft_max_size) {
- ldout(m_cct, 20) << __func__ << ": " << m_oid
- << " soft max size reached" << dendl;
+ ldout(m_cct, 10) << "soft max size reached" << dendl;
return;
}
uint64_t append_tid = m_append_tid++;
m_in_flight_tids.insert(append_tid);
- ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
- << append_tid << dendl;
+ ldout(m_cct, 10) << "flushing journal tid=" << append_tid << dendl;
librados::ObjectWriteOperation op;
client::guard_append(&op, m_soft_max_size);
auto append_buffers = &m_in_flight_appends[append_tid];
+ size_t append_bytes = 0;
for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
- ldout(m_cct, 20) << __func__ << ": flushing " << *it->first << dendl;
+ ldout(m_cct, 20) << "flushing " << *it->first << dendl;
op.append(it->second);
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
m_aio_sent_size += it->second.length();
+ append_bytes += it->second.length();
append_buffers->push_back(*it);
it = m_pending_buffers.erase(it);
if (m_aio_sent_size >= m_soft_max_size) {
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
ceph_assert(r == 0);
+ ldout(m_cct, 20) << "append_bytes=" << append_bytes << dendl;
}
rados_completion->release();
}