m_work_queue->queue(on_finish, r);
}
// Accessor added by this change: exposes the m_work_queue pointer so that
// other components (e.g. the create_object_recorder() call site that passes
// get_work_queue() to the ObjectRecorder constructor) can queue contexts on it.
// Returns the stored pointer as-is; no null check is performed here.
+ inline ContextWQ *get_work_queue() {
+ return m_work_queue;
+ }
+
// Returns a reference to the timer obtained by dereferencing m_timer.
// NOTE(review): m_timer is dereferenced unconditionally -- presumably it is
// always set before this accessor is used; confirm against the constructor.
inline SafeTimer &get_timer() {
return *m_timer;
}
// Create one mutex and one object recorder per splay offset. Each mutex is
// named "ObjectRecorder::m_lock::<offset>" and is stored in m_object_locks at
// the index matching its splay offset.
uint8_t splay_width = m_journal_metadata->get_splay_width();
for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
- m_object_locks.push_back(shared_ptr<Mutex>(new Mutex("ObjectRecorder::m_lock::"+
- std::to_string(splay_offset))));
+ m_object_locks.push_back(shared_ptr<Mutex>(
+ new Mutex("ObjectRecorder::m_lock::"+
+ std::to_string(splay_offset))));
// Object numbers within a set are contiguous: set N covers
// [N * splay_width, (N + 1) * splay_width).
uint64_t object_number = splay_offset + (m_current_set * splay_width);
- m_object_ptrs[splay_offset] = create_object_recorder(object_number,
+ m_object_ptrs[splay_offset] = create_object_recorder(
+ object_number,
m_object_locks[splay_offset]);
}
// Walk every tracked recorder and release its per-offset lock on exactly one
// of two paths (this replaces the single bulk unlock_object_recorders() call
// that the patch removes just below):
//  - recorder belongs to a set other than m_current_set: it must already be
//    closed, and it is swapped for a recorder in the active set via the
//    "_unlock" variant (presumably that call releases the lock -- its visible
//    tail ends in append_unlock(); confirm against the full definition);
//  - recorder is already in the active set: unlock its mutex directly.
for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
it != m_object_ptrs.end(); ++it) {
ObjectRecorderPtr object_recorder = it->second;
- if (object_recorder->get_object_number() / splay_width != m_current_set) {
+ uint64_t object_number = object_recorder->get_object_number();
+ if (object_number / splay_width != m_current_set) {
assert(object_recorder->is_closed());
// ready to close object and open object in active set
- create_next_object_recorder(object_recorder);
+ create_next_object_recorder_unlock(object_recorder);
+ } else {
// The lock index is the recorder's splay offset within its set.
+ uint8_t splay_offset = object_number % splay_width;
+ m_object_locks[splay_offset]->Unlock();
}
}
- unlock_object_recorders();
}
bool JournalRecorder::close_object_set(uint64_t active_set) {
uint64_t object_number, shared_ptr<Mutex> lock) {
ObjectRecorderPtr object_recorder(new ObjectRecorder(
m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
- object_number, lock, m_journal_metadata->get_timer(),
- m_journal_metadata->get_timer_lock(), &m_object_handler,
- m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes,
- m_flush_age));
+ object_number, lock, m_journal_metadata->get_work_queue(),
+ m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
+ &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
+ m_flush_bytes, m_flush_age));
return object_recorder;
}
-void JournalRecorder::create_next_object_recorder(
+void JournalRecorder::create_next_object_recorder_unlock(
ObjectRecorderPtr object_recorder) {
assert(m_lock.is_locked());
new_object_recorder->get_object_number());
}
- new_object_recorder->append(append_buffers, false);
+ new_object_recorder->append_unlock(append_buffers);
m_object_ptrs[splay_offset] = new_object_recorder;
}
ObjectRecorderPtr create_object_recorder(uint64_t object_number,
std::shared_ptr<Mutex> lock);
- void create_next_object_recorder(ObjectRecorderPtr object_recorder);
+ void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder);
void handle_update();
// Constructs a recorder for a single journal object.
//
// Parameters (as used by the initializer list and body below):
//   ioctx          - duplicated into m_ioctx; its cct() becomes m_cct
//   oid            - object name, stored in m_oid
//   object_number  - stored in m_object_number
//   lock           - shared mutex stored in m_lock
//   work_queue     - (new in this change) stored in m_op_work_queue
//   timer, timer_lock - stored by reference in m_timer / m_timer_lock
//   handler        - stored in m_handler; asserted non-NULL
//   order          - stored in m_order; soft max size is 1 << m_order
//   flush_interval, flush_bytes, flush_age - stored in the matching m_flush_*
//
// The change also introduces m_aio_scheduled, initialized to false.
//
// NOTE(review): m_soft_max_size is initialized from the member m_order rather
// than the `order` parameter, so it relies on m_order being declared before
// m_soft_max_size in the class -- confirm against the header.
// NOTE(review): `1 << m_order` is evaluated as int; presumably `order` is
// always small enough not to overflow -- confirm the valid range.
ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, shared_ptr<Mutex> lock,
- SafeTimer &timer, Mutex &timer_lock,
- Handler *handler, uint8_t order,
- uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age)
+ ContextWQ *work_queue, SafeTimer &timer,
+ Mutex &timer_lock, Handler *handler,
+ uint8_t order, uint32_t flush_interval,
+ uint64_t flush_bytes, double flush_age)
: RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
- m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock),
- m_handler(handler), m_order(order), m_soft_max_size(1 << m_order),
- m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
- m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL),
- m_lock(lock), m_append_tid(0), m_pending_bytes(0), m_size(0),
- m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
+ m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
+ m_timer_lock(timer_lock), m_handler(handler), m_order(order),
+ m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
+ m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
+ m_append_task(NULL), m_lock(lock), m_append_tid(0), m_pending_bytes(0),
+ m_size(0), m_overflowed(false), m_object_closed(false),
+ m_in_flight_flushes(false), m_aio_scheduled(false) {
// Keep a private copy of the IoCtx, then recover the CephContext from it
// (m_cct was initialized to NULL above and is only valid from here on).
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
// Sanity checks on the freshly constructed state.
assert(m_handler != NULL);
assert(m_append_buffers.empty());
assert(m_in_flight_tids.empty());
assert(m_in_flight_appends.empty());
+ assert(!m_aio_scheduled);
}
// Appends the given buffers to this object and releases *m_lock.
//
// Precondition: the caller holds *m_lock (asserted). After this change the
// function is the only append entry point (the former append(buffers, unlock)
// overload is removed) and every visible path unlocks *m_lock exactly once
// before returning.
//
// Behavior visible below:
//  - if the object has already overflowed, the buffers are only stashed in
//    m_append_buffers and false is returned;
//  - otherwise each buffer is passed to the single-buffer append(); the last
//    buffer for which that call returns true is remembered and flushed while
//    the lock is still held;
//  - if nothing needs flushing, the lock is dropped first and the append task
//    is then scheduled or cancelled depending on schedule_append.
//
// Returns true when the object is neither closed nor overflowed and
// m_size + m_pending_bytes has reached m_soft_max_size.
bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) {
- return append(append_buffers, true);
-}
-
-bool ObjectRecorder::append(const AppendBuffers &append_buffers, bool unlock) {
assert(m_lock->is_locked());
FutureImplPtr last_flushed_future;
bool schedule_append = false;
- {
- if (m_overflowed) {
- m_append_buffers.insert(m_append_buffers.end(),
- append_buffers.begin(), append_buffers.end());
- if (unlock) {
- m_lock->Unlock();
- }
- return false;
- }
-
- for (AppendBuffers::const_iterator iter = append_buffers.begin();
- iter != append_buffers.end(); ++iter) {
- if (append(*iter, &schedule_append)) {
- last_flushed_future = iter->first;
- }
- }
+
// Already overflowed: queue the buffers locally and bail out without sending.
+ if (m_overflowed) {
+ m_append_buffers.insert(m_append_buffers.end(),
+ append_buffers.begin(), append_buffers.end());
+ m_lock->Unlock();
+ return false;
+ }
- if (unlock) {
- m_lock->Unlock();
+ for (AppendBuffers::const_iterator iter = append_buffers.begin();
+ iter != append_buffers.end(); ++iter) {
+ if (append(*iter, &schedule_append)) {
+ last_flushed_future = iter->first;
}
}
// The flush now runs with the lock still held (previously the lock was
// dropped and re-taken around it); the timer-task helpers run unlocked.
if (last_flushed_future) {
- if (unlock) {
- m_lock->Lock();
- }
flush(last_flushed_future);
- if (unlock) {
- m_lock->Unlock();
- }
- } else if (schedule_append) {
- schedule_append_task();
+ m_lock->Unlock();
} else {
- cancel_append_task();
+ m_lock->Unlock();
+ if (schedule_append) {
+ schedule_append_task();
+ } else {
+ cancel_append_task();
+ }
}
// NOTE(review): these members are read after m_lock->Unlock() on every path
// that reaches here. Presumably they are guarded by *m_lock -- confirm whether
// the result should be computed before the lock is released.
return (!m_object_closed && !m_overflowed &&
m_size + m_pending_bytes >= m_soft_max_size);
assert(!m_object_closed);
m_object_closed = true;
- return m_in_flight_tids.empty();
+ return m_in_flight_tids.empty() && !m_aio_scheduled;
}
void ObjectRecorder::handle_append_task() {
}
// notify of overflow once all in-flight ops are complete
- if (m_in_flight_tids.empty()) {
+ if (m_in_flight_tids.empty() && !m_aio_scheduled) {
notify_handler();
}
return;
assert(!append_buffers.empty());
m_in_flight_appends.erase(iter);
- if (m_in_flight_appends.empty() && m_object_closed) {
+ if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
// all remaining unsent appends should be redirected to new object
notify_handler();
}
assert(m_lock->is_locked());
assert(!append_buffers->empty());
// New deferred-send path: instead of issuing the rados op inline, mark every
// buffer's future as flush-in-progress and account its length in m_size now,
// then move the buffers onto m_pending_buffers for send_appends_aio().
// The asserts just above show this runs with *m_lock held and a non-empty
// buffer list.
+ for (AppendBuffers::iterator it = append_buffers->begin();
+ it != append_buffers->end(); ++it) {
+ ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
+ << dendl;
+ it->first->set_flush_in_progress();
+ m_size += it->second.length();
+ }
+
// splice() transfers the elements, leaving *append_buffers empty.
+ m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers,
+ append_buffers->begin(), append_buffers->end());
// Queue at most one work-queue callback at a time; m_aio_scheduled is cleared
// again at the top of send_appends_aio().
// NOTE(review): the lambda captures a raw `this` and no reference is taken
// here. Presumably the !m_aio_scheduled checks added elsewhere in this change
// keep the recorder alive until the callback runs -- confirm.
+ if (!m_aio_scheduled) {
+ m_op_work_queue->queue(new FunctionContext([this] (int r) {
+ send_appends_aio();
+ }));
+ m_aio_scheduled = true;
+ }
+}
+
+void ObjectRecorder::send_appends_aio() {
+ Mutex::Locker locker(*m_lock);
+
+ m_aio_scheduled = false;
+
+ AppendBuffers append_buffers;
+ m_pending_buffers.swap(append_buffers);
+
uint64_t append_tid = m_append_tid++;
ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
<< append_tid << dendl;
librados::ObjectWriteOperation op;
client::guard_append(&op, m_soft_max_size);
- for (AppendBuffers::iterator it = append_buffers->begin();
- it != append_buffers->end(); ++it) {
+ for (AppendBuffers::iterator it = append_buffers.begin();
+ it != append_buffers.end(); ++it) {
ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
<< dendl;
- it->first->set_flush_in_progress();
op.append(it->second);
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
- m_size += it->second.length();
}
m_in_flight_tids.insert(append_tid);
- m_in_flight_appends[append_tid].swap(*append_buffers);
+ m_in_flight_appends[append_tid].swap(append_buffers);
librados::AioCompletion *rados_completion =
librados::Rados::aio_create_completion(append_flush, NULL,
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RefCountedObj.h"
+#include "common/WorkQueue.h"
#include "journal/FutureImpl.h"
#include <list>
#include <map>
ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, std::shared_ptr<Mutex> lock,
- SafeTimer &timer, Mutex &timer_lock, Handler *handler,
- uint8_t order, uint32_t flush_interval, uint64_t flush_bytes,
- double flush_age);
+ ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
+ Handler *handler, uint8_t order, uint32_t flush_interval,
+ uint64_t flush_bytes, double flush_age);
~ObjectRecorder();
inline uint64_t get_object_number() const {
}
bool append_unlock(const AppendBuffers &append_buffers);
- bool append(const AppendBuffers &append_buffers, bool unlock);
void flush(Context *on_safe);
void flush(const FutureImplPtr &future);
object_recorder->put();
}
// Flush-handler callback: forwards the flush request for `future` to the
// owning ObjectRecorder. With this change the handler takes the recorder's
// lock itself (scoped Mutex::Locker) before calling flush(future), so callers
// no longer lock around future flushes (the tests in this change drop their
// explicit Lock()/Unlock() pairs accordingly).
virtual void flush(const FutureImplPtr &future) {
+ Mutex::Locker locker(*(object_recorder->m_lock));
object_recorder->flush(future);
}
};
uint64_t m_object_number;
CephContext *m_cct;
+ ContextWQ *m_op_work_queue;
+
SafeTimer &m_timer;
Mutex &m_timer_lock;
bool m_in_flight_flushes;
Cond m_in_flight_flushes_cond;
+ AppendBuffers m_pending_buffers;
+ bool m_aio_scheduled;
+
void handle_append_task();
void cancel_append_task();
void schedule_append_task();
void handle_append_flushed(uint64_t tid, int r);
void append_overflowed(uint64_t tid);
void send_appends(AppendBuffers *append_buffers);
+ void send_appends_aio();
void notify_handler();
};
#include "librbd/journal/CreateRequest.h"
#include <boost/scope_exit.hpp>
+#include <utility>
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << __func__ << dendl;
- Mutex::Locker locker(m_lock);
- assert(m_journaler != nullptr && is_tag_owner(m_lock));
+ int r;
+ C_SaferCond ctx;
+ Future future;
+ C_SaferCond flush_ctx;
- cls::journal::Client client;
- int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
- if (r < 0) {
- lderr(cct) << this << " " << __func__ << ": "
- << "failed to retrieve client: " << cpp_strerror(r) << dendl;
- return r;
- }
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_journaler != nullptr && is_tag_owner(m_lock));
- assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
- journal::TagPredecessor predecessor;
- predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
- if (!client.commit_position.object_positions.empty()) {
- auto position = client.commit_position.object_positions.front();
- predecessor.commit_valid = true;
- predecessor.tag_tid = position.tag_tid;
- predecessor.entry_tid = position.entry_tid;
- }
+ cls::journal::Client client;
+ r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
+ if (r < 0) {
+ lderr(cct) << this << " " << __func__ << ": "
+ << "failed to retrieve client: " << cpp_strerror(r) << dendl;
+ return r;
+ }
- cls::journal::Tag new_tag;
- r = allocate_journaler_tag(cct, m_journaler, client, m_tag_class,
- predecessor, ORPHAN_MIRROR_UUID, &new_tag);
- if (r < 0) {
- return r;
- }
+ assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
+ journal::TagPredecessor predecessor;
+ predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
+ if (!client.commit_position.object_positions.empty()) {
+ auto position = client.commit_position.object_positions.front();
+ predecessor.commit_valid = true;
+ predecessor.tag_tid = position.tag_tid;
+ predecessor.entry_tid = position.entry_tid;
+ }
- bufferlist::iterator tag_data_bl_it = new_tag.data.begin();
- r = C_DecodeTag::decode(&tag_data_bl_it, &m_tag_data);
- if (r < 0) {
- lderr(cct) << this << " " << __func__ << ": "
- << "failed to decode newly allocated tag" << dendl;
- return r;
- }
+ cls::journal::Tag new_tag;
+ r = allocate_journaler_tag(cct, m_journaler, client, m_tag_class,
+ predecessor, ORPHAN_MIRROR_UUID, &new_tag);
+ if (r < 0) {
+ return r;
+ }
+
+ bufferlist::iterator tag_data_bl_it = new_tag.data.begin();
+ r = C_DecodeTag::decode(&tag_data_bl_it, &m_tag_data);
+ if (r < 0) {
+ lderr(cct) << this << " " << __func__ << ": "
+ << "failed to decode newly allocated tag" << dendl;
+ return r;
+ }
- journal::EventEntry event_entry{journal::DemoteEvent{}};
- bufferlist event_entry_bl;
- ::encode(event_entry, event_entry_bl);
+ journal::EventEntry event_entry{journal::DemoteEvent{}};
+ bufferlist event_entry_bl;
+ ::encode(event_entry, event_entry_bl);
- m_tag_tid = new_tag.tid;
- Future future = m_journaler->append(m_tag_tid, event_entry_bl);
- C_SaferCond ctx;
- future.flush(&ctx);
+ m_tag_tid = new_tag.tid;
+ future = m_journaler->append(m_tag_tid, event_entry_bl);
+ future.flush(&ctx);
+ }
r = ctx.wait();
if (r < 0) {
return r;
}
- m_journaler->committed(future);
- C_SaferCond flush_ctx;
- m_journaler->flush_commit_position(&flush_ctx);
+ {
+ Mutex::Locker l(m_lock);
+ m_journaler->committed(future);
+ m_journaler->flush_commit_position(&flush_ctx);
+ }
r = flush_ctx.wait();
if (r < 0) {
bool flush_entry) {
assert(!bufferlists.empty());
- Futures futures;
uint64_t tid;
{
- {
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_READY);
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_READY);
- tid = ++m_event_tid;
- assert(tid != 0);
- }
+ tid = ++m_event_tid;
+ assert(tid != 0);
+ }
- for (auto &bl : bufferlists) {
- assert(bl.length() <= m_max_append_size);
- futures.push_back(m_journaler->append(m_tag_tid, bl));
- }
+ Futures futures;
+ for (auto &bl : bufferlists) {
+ assert(bl.length() <= m_max_append_size);
+ futures.push_back(m_journaler->append(m_tag_tid, bl));
+ }
+
+ {
Mutex::Locker event_locker(m_event_lock);
m_events[tid] = Event(futures, requests, offset, length);
}
} else {
futures.back().wait(on_safe);
}
+
return tid;
}
#include "include/interval_set.h"
#include "common/Cond.h"
#include "common/Mutex.h"
+#include "common/Cond.h"
#include "journal/Future.h"
#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
int is_resync_requested(bool *do_resync);
// Accessor added by this change: returns the m_work_queue pointer as stored.
// The unit tests in this change call get_work_queue()->drain() after
// appending events, which is the only use visible here.
+ inline ContextWQ *get_work_queue() {
+ return m_work_queue;
+ }
+
private:
ImageCtxT &m_image_ctx;
journal::ObjectRecorderPtr create_object(const std::string &oid,
uint8_t order, shared_ptr<Mutex> lock) {
journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
- m_ioctx, oid, 0, lock, *m_timer, m_timer_lock, &m_handler, order,
- m_flush_interval, m_flush_bytes, m_flush_age));
+ m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler,
+ order, m_flush_interval, m_flush_bytes, m_flush_age));
m_object_recorders.push_back(object);
m_object_recorder_locks.insert(std::make_pair(oid, lock));
m_handler.object_lock = lock;
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
append_buffers = {append_buffer2};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(2U, object->get_pending_appends());
C_SaferCond cond;
- lock->Lock();
append_buffer2.first->flush(&cond);
- ASSERT_TRUE(lock->is_locked());
- lock->Unlock();
ASSERT_EQ(0, cond.wait());
ASSERT_EQ(0U, object->get_pending_appends());
}
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
append_buffers = {append_buffer2};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
append_buffers = {append_buffer2};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
C_SaferCond cond;
append_buffer2.first->wait(&cond);
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
payload);
append_buffers = {append_buffer2};
lock->Lock();
ASSERT_TRUE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
C_SaferCond cond;
append_buffer2.first->wait(&cond);
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond1;
append_buffers = {append_buffer};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond;
ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
// should automatically flush once its attached to the object
C_SaferCond cond;
append_buffers = {append_buffer1};
lock->Lock();
ASSERT_FALSE(object->append_unlock(append_buffers));
- ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
lock->Lock();
append_buffers = {append_buffer1, append_buffer2};
lock1->Lock();
ASSERT_TRUE(object1->append_unlock(append_buffers));
- ASSERT_FALSE(lock1->is_locked());
C_SaferCond cond;
append_buffer2.first->wait(&cond);
lock2->Lock();
ASSERT_FALSE(object2->append_unlock(append_buffers));
- ASSERT_FALSE(lock2->is_locked());
- lock2->Lock();
append_buffer3.first->flush(NULL);
- ASSERT_TRUE(lock2->is_locked());
- lock2->Unlock();
bool overflowed = false;
{
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe1);
ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal));
+ mock_journal.get_work_queue()->drain();
Context *on_journal_safe2;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe2);
ASSERT_EQ(2U, when_append_io_event(mock_image_ctx, mock_journal));
+ mock_journal.get_work_queue()->drain();
// commit journal event followed by IO event (standard)
on_journal_safe1->complete(0);
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
ASSERT_EQ(1U, when_append_write_event(mock_image_ctx, mock_journal, 1 << 17));
+ mock_journal.get_work_queue()->drain();
on_journal_safe->complete(0);
C_SaferCond event_ctx;
expect_wait_future(mock_future, &on_journal_safe);
ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal,
object_request));
+ mock_journal.get_work_queue()->drain();
// commit the event in the journal w/o waiting writeback
expect_future_committed(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal,
object_request));
+ mock_journal.get_work_queue()->drain();
expect_future_is_valid(mock_future);
C_SaferCond flush_ctx;
expect_append_journaler(mock_journaler);
expect_wait_future(mock_future, &on_journal_safe);
ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal));
+ mock_journal.get_work_queue()->drain();
// failed IO remains uncommitted in journal
on_journal_safe->complete(0);