JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
const std::string &object_oid_prefix,
const JournalMetadataPtr& journal_metadata,
- uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age,
uint64_t max_in_flight_appends)
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
- m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
- m_flush_bytes(flush_bytes), m_flush_age(flush_age),
+ m_journal_metadata(journal_metadata),
m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
m_current_set(m_journal_metadata->get_active_set()) {
uint8_t splay_width = m_journal_metadata->get_splay_width();
for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
- m_object_locks.push_back(shared_ptr<Mutex>(
- new Mutex("ObjectRecorder::m_lock::"+
- std::to_string(splay_offset))));
+ shared_ptr<Mutex> object_lock(new Mutex(
+ "ObjectRecorder::m_lock::" + std::to_string(splay_offset)));
+ m_object_locks.push_back(object_lock);
+
uint64_t object_number = splay_offset + (m_current_set * splay_width);
+ Mutex::Locker locker(*object_lock);
m_object_ptrs[splay_offset] = create_object_recorder(
- object_number,
- m_object_locks[splay_offset]);
+ object_number, m_object_locks[splay_offset]);
}
m_journal_metadata->add_listener(&m_listener);
flush(on_safe);
}
+void JournalRecorder::set_append_batch_options(int flush_interval,
+ uint64_t flush_bytes,
+ double flush_age) {
+ ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
+ << "flush_bytes=" << flush_bytes << ", "
+ << "flush_age=" << flush_age << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_flush_interval = flush_interval;
+ m_flush_bytes = flush_bytes;
+ m_flush_age = flush_age;
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+ Mutex::Locker object_locker(*m_object_locks[splay_offset]);
+ auto object_recorder = get_object(splay_offset);
+ object_recorder->set_append_batch_options(flush_interval, flush_bytes,
+ flush_age);
+ }
+}
+
Future JournalRecorder::append(uint64_t tag_tid,
const bufferlist &payload_bl) {
ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl;
ObjectRecorderPtr object_recorder(new ObjectRecorder(
m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
object_number, lock, m_journal_metadata->get_work_queue(),
- &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
- m_flush_bytes, m_flush_age, m_max_in_flight_appends));
+ &m_object_handler, m_journal_metadata->get_order(),
+ m_max_in_flight_appends));
+ object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes,
+ m_flush_age);
return object_recorder;
}
public:
JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
const JournalMetadataPtr &journal_metadata,
- uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age, uint64_t max_in_flight_appends);
+ uint64_t max_in_flight_appends);
~JournalRecorder();
void shut_down(Context *on_safe);
+
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age);
+
Future append(uint64_t tag_tid, const bufferlist &bl);
void flush(Context *on_safe);
JournalMetadataPtr m_journal_metadata;
- uint32_t m_flush_interval;
- uint64_t m_flush_bytes;
- double m_flush_age;
+ uint32_t m_flush_interval = 0;
+ uint64_t m_flush_bytes = 0;
+ double m_flush_age = 0;
uint64_t m_max_in_flight_appends;
Listener m_listener;
m_trimmer->committed(future_impl->get_commit_tid());
}
-void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
- double flush_age, uint64_t max_in_flight_appends) {
+void Journaler::start_append(uint64_t max_in_flight_appends) {
ceph_assert(m_recorder == nullptr);
// TODO verify active object set >= current replay object set
m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
- m_metadata, flush_interval, flush_bytes,
- flush_age, max_in_flight_appends);
+ m_metadata, max_in_flight_appends);
+}
+
+void Journaler::set_append_batch_options(int flush_interval,
+ uint64_t flush_bytes,
+ double flush_age) {
+ ceph_assert(m_recorder != nullptr);
+ m_recorder->set_append_batch_options(flush_interval, flush_bytes, flush_age);
}
void Journaler::stop_append(Context *on_safe) {
void stop_replay(Context *on_finish);
uint64_t get_max_append_size() const;
- void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
- uint64_t max_in_flight_appends);
+ void start_append(uint64_t max_in_flight_appends);
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age);
Future append(uint64_t tag_tid, const bufferlist &bl);
void flush_append(Context *on_safe);
void stop_append(Context *on_safe);
ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, shared_ptr<Mutex> lock,
ContextWQ *work_queue, Handler *handler,
- uint8_t order, uint32_t flush_interval,
- uint64_t flush_bytes, double flush_age,
- int32_t max_in_flight_appends)
+ uint8_t order, int32_t max_in_flight_appends)
: RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler),
m_order(order), m_soft_max_size(1 << m_order),
- m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
- m_flush_age(flush_age), m_max_in_flight_appends(max_in_flight_appends),
- m_flush_handler(this), m_lock(lock), m_last_flush_time(ceph_clock_now()),
- m_append_tid(0), m_overflowed(false), m_object_closed(false),
- m_in_flight_flushes(false) {
+ m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
+ m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0),
+ m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
ceph_assert(m_handler != NULL);
ceph_assert(m_in_flight_appends.empty());
}
+void ObjectRecorder::set_append_batch_options(int flush_interval,
+ uint64_t flush_bytes,
+ double flush_age) {
+ ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
+ << "flush_bytes=" << flush_bytes << ", "
+ << "flush_age=" << flush_age << dendl;
+
+ ceph_assert(m_lock->is_locked());
+ m_flush_interval = flush_interval;
+ m_flush_bytes = flush_bytes;
+ m_flush_age = flush_age;
+}
+
bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, std::shared_ptr<Mutex> lock,
ContextWQ *work_queue, Handler *handler, uint8_t order,
- uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age, int32_t max_in_flight_appends);
+ int32_t max_in_flight_appends);
~ObjectRecorder() override;
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age);
+
inline uint64_t get_object_number() const {
return m_object_number;
}
uint8_t m_order;
uint64_t m_soft_max_size;
- uint32_t m_flush_interval;
- uint64_t m_flush_bytes;
- double m_flush_age;
+ uint32_t m_flush_interval = 0;
+ uint64_t m_flush_bytes = 0;
+ double m_flush_age = 0;
int32_t m_max_in_flight_appends;
FlushHandler m_flush_handler;
template <typename I>
void Journal<I>::start_append() {
ceph_assert(m_lock.is_locked());
+
m_journaler->start_append(
+ m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
+ m_journaler->set_append_batch_options(
m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
- m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"),
- m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
+ m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
+
transition_state(STATE_READY, 0);
}
bufferlist event_entry_bl;
encode(event_entry, event_entry_bl);
- m_journaler->start_append(0, 0, 0, 0);
+ m_journaler->start_append(0);
m_future = m_journaler->append(m_tag_tid, event_entry_bl);
auto ctx = create_context_callback<
bufferlist event_entry_bl;
encode(event_entry, event_entry_bl);
- m_journaler->start_append(0, 0, 0, 0);
+ m_journaler->start_append(0);
m_future = m_journaler->append(m_tag_tid, event_entry_bl);
auto ctx = create_context_callback<
MOCK_METHOD0(stop_replay, void());
MOCK_METHOD1(stop_replay, void(Context *on_finish));
- MOCK_METHOD4(start_append, void(int, uint64_t, double, uint64_t));
+ MOCK_METHOD1(start_append, void(uint64_t));
+ MOCK_METHOD3(set_append_batch_options, void(int, uint64_t, double));
MOCK_CONST_METHOD0(get_max_append_size, uint64_t());
MOCK_METHOD2(append, MockFutureProxy(uint64_t tag_id,
const bufferlist &bl));
MockJournaler::get_instance().stop_replay(on_finish);
}
- void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
- uint64_t max_in_flight_appends) {
- MockJournaler::get_instance().start_append(flush_interval, flush_bytes,
- flush_age,
- max_in_flight_appends);
+ void start_append(uint64_t max_in_flight_appends) {
+ MockJournaler::get_instance().start_append(max_in_flight_appends);
+ }
+
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age) {
+ MockJournaler::get_instance().set_append_batch_options(
+ flush_interval, flush_bytes, flush_age);
}
uint64_t get_max_append_size() const {
journal::JournalRecorder *create_recorder(
const std::string &oid, const journal::JournalMetadataPtr &metadata) {
journal::JournalRecorder *recorder(new journal::JournalRecorder(
- m_ioctx, oid + ".", metadata, 0, std::numeric_limits<uint32_t>::max(),
- 0, 0));
+ m_ioctx, oid + ".", metadata, 0));
+ recorder->set_append_batch_options(0, std::numeric_limits<uint32_t>::max(),
+ 0);
m_recorders.push_back(recorder);
return recorder;
}
journal::ObjectRecorderPtr create_object(const std::string &oid,
uint8_t order, shared_ptr<Mutex> lock) {
journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
- m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order, m_flush_interval,
- m_flush_bytes, m_flush_age, m_max_in_flight_appends));
+ m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order,
+ m_max_in_flight_appends));
+ {
+ Mutex::Locker locker(*lock);
+ object->set_append_batch_options(m_flush_interval, m_flush_bytes,
+ m_flush_age);
+ }
m_object_recorders.push_back(object);
m_object_recorder_locks.insert(std::make_pair(oid, lock));
m_handler.object_lock = lock;
return r;
}
- replay_journaler.start_append(0, 0, 0, 0);
+ replay_journaler.start_append(0);
C_SaferCond replay_ctx;
ReplayHandler replay_handler(&journaler, &replay_journaler,
}
void expect_start_append(::journal::MockJournaler &mock_journaler) {
- EXPECT_CALL(mock_journaler, start_append(_, _, _, _));
+ EXPECT_CALL(mock_journaler, start_append(_));
}
void expect_stop_append(::journal::MockJournaler &mock_journaler, int r) {
}
void expect_start_append(::journal::MockJournaler &mock_journaler) {
- EXPECT_CALL(mock_journaler, start_append(_, _, _, _));
+ EXPECT_CALL(mock_journaler, start_append(_));
+ }
+
+ void expect_set_append_batch_options(::journal::MockJournaler &mock_journaler) {
+ EXPECT_CALL(mock_journaler, set_append_batch_options(_, _, _));
}
void expect_stop_append(::journal::MockJournaler &mock_journaler, int r) {
expect_committed(mock_journaler, 0);
expect_flush_commit_position(mock_journaler);
expect_start_append(mock_journaler);
+ expect_set_append_batch_options(mock_journaler);
ASSERT_EQ(0, when_open(mock_journal));
}
expect_flush_commit_position(mock_journaler);
expect_start_append(mock_journaler);
+ expect_set_append_batch_options(mock_journaler);
ASSERT_EQ(0, when_open(mock_journal));
expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
expect_flush_commit_position(mock_journaler);
expect_start_append(mock_journaler);
+ expect_set_append_batch_options(mock_journaler);
ASSERT_EQ(0, when_open(mock_journal));
expect_stop_append(mock_journaler, 0);
expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
expect_flush_commit_position(mock_journaler);
expect_start_append(mock_journaler);
+ expect_set_append_batch_options(mock_journaler);
ASSERT_EQ(0, when_open(mock_journal));
expect_stop_append(mock_journaler, 0);
expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
expect_flush_commit_position(mock_journaler);
expect_start_append(mock_journaler);
+ expect_set_append_batch_options(mock_journaler);
ASSERT_EQ(0, when_open(mock_journal));
expect_stop_append(mock_journaler, -EINVAL);
expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
expect_flush_commit_position(mock_journaler);
expect_start_append(mock_journaler);
+ expect_set_append_batch_options(mock_journaler);
ASSERT_EQ(0, when_open(mock_journal));
expect_stop_append(mock_journaler, -EINVAL);
expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
expect_flush_commit_position(mock_journaler);
expect_start_append(mock_journaler);
+ expect_set_append_batch_options(mock_journaler);
C_SaferCond ctx;
mock_journal.open(&ctx);
expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
expect_flush_commit_position(mock_journaler);
expect_start_append(mock_journaler);
+ expect_set_append_batch_options(mock_journaler);
C_SaferCond ctx;
mock_journal.open(&ctx);
InSequence seq;
expect_stop_append(mock_journaler, 0);
expect_start_append(mock_journaler);
+ expect_set_append_batch_options(mock_journaler);
expect_shut_down_journaler(mock_journaler);
C_SaferCond start_ctx;
InSequence seq;
expect_stop_append(mock_journaler, -EINVAL);
expect_start_append(mock_journaler);
+ expect_set_append_batch_options(mock_journaler);
expect_shut_down_journaler(mock_journaler);
C_SaferCond start_ctx;
if (r < 0) {
return r;
}
- m_journaler.start_append(0, 0, 0, 0);
+ m_journaler.start_append(0);
int r1 = 0;
bufferlist bl;