const std::string &object_oid_prefix,
const JournalMetadataPtr& journal_metadata,
uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age)
+ double flush_age,
+ uint64_t max_in_flight_appends)
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
- m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this),
+ m_flush_bytes(flush_bytes), m_flush_age(flush_age),
+ m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
m_current_set(m_journal_metadata->get_active_set()) {
object_number, lock, m_journal_metadata->get_work_queue(),
m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
&m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
- m_flush_bytes, m_flush_age));
+ m_flush_bytes, m_flush_age, m_max_in_flight_appends));
return object_recorder;
}
JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
const JournalMetadataPtr &journal_metadata,
uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age);
+ double flush_age, uint64_t max_in_flight_appends);
~JournalRecorder();
Future append(uint64_t tag_tid, const bufferlist &bl);
uint32_t m_flush_interval;
uint64_t m_flush_bytes;
double m_flush_age;
+ uint64_t m_max_in_flight_appends;
Listener m_listener;
ObjectHandler m_object_handler;
}
void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
- double flush_age) {
+ double flush_age, uint64_t max_in_flight_appends) {
ceph_assert(m_recorder == nullptr);
// TODO verify active object set >= current replay object set
m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
m_metadata, flush_interval, flush_bytes,
- flush_age);
+ flush_age, max_in_flight_appends);
}
void Journaler::stop_append(Context *on_safe) {
void stop_replay(Context *on_finish);
uint64_t get_max_append_size() const;
- void start_append(int flush_interval, uint64_t flush_bytes, double flush_age);
+ void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
+ uint64_t max_in_flight_appends);
Future append(uint64_t tag_tid, const bufferlist &bl);
void flush_append(Context *on_safe);
void stop_append(Context *on_safe);
ContextWQ *work_queue, SafeTimer &timer,
Mutex &timer_lock, Handler *handler,
uint8_t order, uint32_t flush_interval,
- uint64_t flush_bytes, double flush_age)
+ uint64_t flush_bytes, double flush_age,
+ uint64_t max_in_flight_appends)
: RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
m_timer_lock(timer_lock), m_handler(handler), m_order(order),
m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
- m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
+ m_flush_bytes(flush_bytes), m_flush_age(flush_age),
+ m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
m_lock(lock), m_append_tid(0), m_pending_bytes(0),
m_size(0), m_overflowed(false), m_object_closed(false),
m_in_flight_flushes(false), m_aio_scheduled(false) {
future = Future(m_append_buffers.rbegin()->first);
flush_appends(true);
+ } else if (!m_pending_buffers.empty()) {
+ future = Future(m_pending_buffers.rbegin()->first);
} else if (!m_in_flight_appends.empty()) {
AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
ceph_assert(!append_buffers.empty());
}
bool ObjectRecorder::close() {
- assert (m_lock->is_locked());
+ ceph_assert(m_lock->is_locked());
ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
// notify of overflow once all in-flight ops are complete
if (m_in_flight_tids.empty() && !m_aio_scheduled) {
+ m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
append_overflowed();
notify_handler_unlock();
} else {
m_in_flight_flushes = false;
m_in_flight_flushes_cond.Signal();
- if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
- // all remaining unsent appends should be redirected to new object
- notify_handler_unlock();
+ if (!m_aio_scheduled) {
+ if (m_in_flight_appends.empty() && m_object_closed) {
+ // all remaining unsent appends should be redirected to new object
+ m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
+ notify_handler_unlock();
+ } else {
+ m_aio_scheduled = true;
+ m_lock->Unlock();
+ send_appends_aio();
+ }
} else {
m_lock->Unlock();
}
}
void ObjectRecorder::send_appends_aio() {
- librados::ObjectWriteOperation op;
- client::guard_append(&op, m_soft_max_size);
- C_Gather *gather_ctx;
+ librados::AioCompletion *rados_completion;
{
Mutex::Locker locker(*m_lock);
+ m_aio_scheduled = false;
+
+ if (m_pending_buffers.empty()) {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " pending buffers empty"
+ << dendl;
+ return;
+ }
+
+ if (m_max_in_flight_appends != 0 &&
+ m_in_flight_tids.size() >= m_max_in_flight_appends) {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid
+ << " max in flight appends reached" << dendl;
+ return;
+ }
+
uint64_t append_tid = m_append_tid++;
m_in_flight_tids.insert(append_tid);
ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
<< append_tid << dendl;
- gather_ctx = new C_Gather(m_cct, new C_AppendFlush(this, append_tid));
+ librados::ObjectWriteOperation op;
+ client::guard_append(&op, m_soft_max_size);
auto append_buffers = &m_in_flight_appends[append_tid];
for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
break;
}
}
+ rados_completion = librados::Rados::aio_create_completion(
+ new C_AppendFlush(this, append_tid), nullptr,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
+ ceph_assert(r == 0);
}
-
- librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr,
- utils::rados_ctx_callback);
- int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
- ceph_assert(r == 0);
rados_completion->release();
-
- {
- m_lock->Lock();
- if (m_pending_buffers.empty()) {
- m_aio_scheduled = false;
- if (m_in_flight_appends.empty() && m_object_closed) {
- // all remaining unsent appends should be redirected to new object
- notify_handler_unlock();
- } else {
- m_lock->Unlock();
- }
- } else {
- // additional pending items -- reschedule
- m_op_work_queue->queue(new FunctionContext([this] (int r) {
- send_appends_aio();
- }));
- m_lock->Unlock();
- }
- }
-
- // allow append op to complete
- gather_ctx->activate();
}
void ObjectRecorder::notify_handler_unlock() {
uint64_t object_number, std::shared_ptr<Mutex> lock,
ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
Handler *handler, uint8_t order, uint32_t flush_interval,
- uint64_t flush_bytes, double flush_age);
+ uint64_t flush_bytes, double flush_age,
+ uint64_t max_in_flight_appends);
~ObjectRecorder() override;
inline uint64_t get_object_number() const {
uint32_t m_flush_interval;
uint64_t m_flush_bytes;
double m_flush_age;
+ uint32_t m_max_in_flight_appends;
FlushHandler m_flush_handler;
MOCK_METHOD0(stop_replay, void());
MOCK_METHOD1(stop_replay, void(Context *on_finish));
- MOCK_METHOD3(start_append, void(int flush_interval, uint64_t flush_bytes,
- double flush_age));
+ MOCK_METHOD4(start_append, void(int, uint64_t, double, uint64_t));
MOCK_CONST_METHOD0(get_max_append_size, uint64_t());
MOCK_METHOD2(append, MockFutureProxy(uint64_t tag_id,
const bufferlist &bl));
MockJournaler::get_instance().stop_replay(on_finish);
}
- void start_append(int flush_interval, uint64_t flush_bytes, double flush_age) {
+ void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
+ uint64_t max_in_flight_appends) {
MockJournaler::get_instance().start_append(flush_interval, flush_bytes,
- flush_age);
+ flush_age,
+ max_in_flight_appends);
}
uint64_t get_max_append_size() const {
RadosTestFixture::TearDown();
}
- journal::JournalRecorder *create_recorder(const std::string &oid,
- const journal::JournalMetadataPtr &metadata) {
+ journal::JournalRecorder *create_recorder(
+ const std::string &oid, const journal::JournalMetadataPtr &metadata) {
journal::JournalRecorder *recorder(new journal::JournalRecorder(
- m_ioctx, oid + ".", metadata, 0, std::numeric_limits<uint32_t>::max(), 0));
+ m_ioctx, oid + ".", metadata, 0, std::numeric_limits<uint32_t>::max(),
+ 0, 0));
m_recorders.push_back(recorder);
return recorder;
}
uint32_t m_flush_interval;
uint64_t m_flush_bytes;
double m_flush_age;
+ uint64_t m_max_in_flight_appends = 0;
Handler m_handler;
void TearDown() override {
uint8_t order, shared_ptr<Mutex> lock) {
journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler,
- order, m_flush_interval, m_flush_bytes, m_flush_age));
+ order, m_flush_interval, m_flush_bytes, m_flush_age,
+ m_max_in_flight_appends));
m_object_recorders.push_back(object);
m_object_recorder_locks.insert(std::make_pair(oid, lock));
m_handler.object_lock = lock;