ldout(m_cct, 10) << "opening object set " << m_current_set << dendl;
uint8_t splay_width = m_journal_metadata->get_splay_width();
+ bool overflowed = false;
auto lockers{lock_object_recorders()};
for (const auto& p : m_object_ptrs) {
ceph_assert(object_recorder->is_closed());
// ready to close object and open object in active set
- create_next_object_recorder(object_recorder);
+ if (create_next_object_recorder(object_recorder)) {
+ overflowed = true;
+ }
}
}
+ lockers.clear();
+
+ if (overflowed) {
+ ldout(m_cct, 10) << "object set " << m_current_set << " now full" << dendl;
+ ldout(m_cct, 10) << "" << dendl;
+ close_and_advance_object_set(m_current_set);
+ }
}
bool JournalRecorder::close_object_set(uint64_t active_set) {
// flush out all queued appends and hold future appends
if (!object_recorder->close()) {
++m_in_flight_object_closes;
+ ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " "
+ << "close in-progress" << dendl;
} else {
ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " closed"
<< dendl;
return object_recorder;
}
-void JournalRecorder::create_next_object_recorder(
+bool JournalRecorder::create_next_object_recorder(
ceph::ref_t<ObjectRecorder> object_recorder) {
ceph_assert(ceph_mutex_is_locked(m_lock));
new_object_recorder->get_object_number());
}
- new_object_recorder->append(std::move(append_buffers));
+ bool object_full = new_object_recorder->append(std::move(append_buffers));
+ if (object_full) {
+ ldout(m_cct, 10) << "object " << new_object_recorder->get_oid() << " "
+ << "now full" << dendl;
+ }
+
m_object_ptrs[splay_offset] = std::move(new_object_recorder);
+ return object_full;
}
void JournalRecorder::handle_update() {