assert(!m_aio_scheduled);
}
-bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) {
+bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
assert(m_lock->is_locked());
FutureImplPtr last_flushed_future;
bool schedule_append = false;
-
+
if (m_overflowed) {
m_append_buffers.insert(m_append_buffers.end(),
append_buffers.begin(), append_buffers.end());
}
void ObjectRecorder::send_appends_aio() {
- Mutex::Locker locker(*m_lock);
-
- m_aio_scheduled = false;
+ AppendBuffers *append_buffers;
+ uint64_t append_tid;
+ {
+ Mutex::Locker locker(*m_lock);
+ append_tid = m_append_tid++;
+ m_in_flight_tids.insert(append_tid);
- AppendBuffers append_buffers;
- m_pending_buffers.swap(append_buffers);
+ // safe to hold pointer outside lock until op is submitted
+ append_buffers = &m_in_flight_appends[append_tid];
+ append_buffers->swap(m_pending_buffers);
+ }
- uint64_t append_tid = m_append_tid++;
ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
<< append_tid << dendl;
C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);
+ C_Gather *gather_ctx = new C_Gather(m_cct, append_flush);
librados::ObjectWriteOperation op;
client::guard_append(&op, m_soft_max_size);
-
- for (AppendBuffers::iterator it = append_buffers.begin();
- it != append_buffers.end(); ++it) {
+ for (AppendBuffers::iterator it = append_buffers->begin();
+ it != append_buffers->end(); ++it) {
ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
<< dendl;
op.append(it->second);
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
}
- m_in_flight_tids.insert(append_tid);
- m_in_flight_appends[append_tid].swap(append_buffers);
librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(append_flush, NULL,
+ librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
assert(r == 0);
rados_completion->release();
+
+ {
+ Mutex::Locker locker(*m_lock);
+ if (m_pending_buffers.empty()) {
+ m_aio_scheduled = false;
+ if (m_in_flight_appends.empty() && m_object_closed) {
+ // all remaining unsent appends should be redirected to new object
+ notify_handler();
+ }
+ } else {
+ // additional pending items -- reschedule
+ m_op_work_queue->queue(new FunctionContext([this] (int r) {
+ send_appends_aio();
+ }));
+ }
+ }
+
+ // allow append op to complete
+ gather_ctx->activate();
}
void ObjectRecorder::notify_handler() {
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(2U, object->get_pending_appends());
C_SaferCond cond;
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
C_SaferCond cond;
append_buffer2.first->wait(&cond);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
payload);
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_TRUE(object->append_unlock(append_buffers));
+ ASSERT_TRUE(object->append_unlock(std::move(append_buffers)));
C_SaferCond cond;
append_buffer2.first->wait(&cond);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond1;
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond;
lock->Unlock();
ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
// should automatically flush once its attached to the object
C_SaferCond cond;
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
ASSERT_EQ(1U, object->get_pending_appends());
lock->Lock();
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1, append_buffer2};
lock1->Lock();
- ASSERT_TRUE(object1->append_unlock(append_buffers));
+ ASSERT_TRUE(object1->append_unlock(std::move(append_buffers)));
C_SaferCond cond;
append_buffer2.first->wait(&cond);
append_buffers = {append_buffer3};
lock2->Lock();
- ASSERT_FALSE(object2->append_unlock(append_buffers));
+ ASSERT_FALSE(object2->append_unlock(std::move(append_buffers)));
append_buffer3.first->flush(NULL);
bool overflowed = false;