ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
uint64_t object_number, shared_ptr<Mutex> lock,
- ContextWQ *work_queue, SafeTimer &timer,
- Mutex &timer_lock, Handler *handler,
+ ContextWQ *work_queue, Handler *handler,
uint8_t order, uint32_t flush_interval,
uint64_t flush_bytes, double flush_age,
- uint64_t max_in_flight_appends)
+ int32_t max_in_flight_appends)
: RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
- m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
- m_timer_lock(timer_lock), m_handler(handler), m_order(order),
- m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
- m_flush_bytes(flush_bytes), m_flush_age(flush_age),
- m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
- m_lock(lock), m_append_tid(0), m_pending_bytes(0),
- m_size(0), m_overflowed(false), m_object_closed(false),
- m_in_flight_flushes(false), m_aio_scheduled(false) {
+ m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler),
+ m_order(order), m_soft_max_size(1 << m_order),
+ m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
+ m_flush_age(flush_age), m_max_in_flight_appends(max_in_flight_appends),
+ m_flush_handler(this), m_lock(lock), m_last_flush_time(ceph_clock_now()),
+ m_append_tid(0), m_overflowed(false), m_object_closed(false),
+ m_in_flight_flushes(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
ceph_assert(m_handler != NULL);
ObjectRecorder::~ObjectRecorder() {
ldout(m_cct, 20) << dendl;
- ceph_assert(m_append_task == NULL);
- ceph_assert(m_append_buffers.empty());
+ ceph_assert(m_pending_buffers.empty());
ceph_assert(m_in_flight_tids.empty());
ceph_assert(m_in_flight_appends.empty());
- ceph_assert(!m_aio_scheduled);
}
-bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
- ceph_assert(m_lock->is_locked());
+bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
- FutureImplPtr last_flushed_future;
- bool schedule_append = false;
-
- if (m_overflowed) {
- m_append_buffers.insert(m_append_buffers.end(),
- append_buffers.begin(), append_buffers.end());
- m_lock->Unlock();
- ldout(m_cct, 20) << "already overflowed" << dendl;
- return false;
- }
+ ceph_assert(m_lock->is_locked());
- for (AppendBuffers::const_iterator iter = append_buffers.begin();
- iter != append_buffers.end(); ++iter) {
- if (append(*iter, &schedule_append)) {
- last_flushed_future = iter->first;
+ FutureImplPtr last_flushed_future;
+ for (auto& append_buffer : append_buffers) {
+ ldout(m_cct, 20) << *append_buffer.first << ", "
+ << "size=" << append_buffer.second.length() << dendl;
+ bool flush_requested = append_buffer.first->attach(&m_flush_handler);
+ if (flush_requested) {
+ last_flushed_future = append_buffer.first;
}
- }
- if (last_flushed_future) {
- flush(last_flushed_future);
- m_lock->Unlock();
- } else {
- m_lock->Unlock();
- if (schedule_append) {
- schedule_append_task();
- } else {
- cancel_append_task();
- }
+ m_pending_buffers.push_back(append_buffer);
+ m_pending_bytes += append_buffer.second.length();
}
- return (!m_object_closed && !m_overflowed &&
- m_size + m_pending_bytes >= m_soft_max_size);
+
+ return send_appends(!!last_flushed_future, last_flushed_future);
}
void ObjectRecorder::flush(Context *on_safe) {
ldout(m_cct, 20) << dendl;
- cancel_append_task();
Future future;
{
Mutex::Locker locker(*m_lock);
}
// attach the flush to the most recent append
- if (!m_append_buffers.empty()) {
- future = Future(m_append_buffers.rbegin()->first);
-
- flush_appends(true);
- } else if (!m_pending_buffers.empty()) {
+ if (!m_pending_buffers.empty()) {
future = Future(m_pending_buffers.rbegin()->first);
} else if (!m_in_flight_appends.empty()) {
AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
}
if (future.is_valid()) {
- future.flush(on_safe);
+ // cannot be invoked while the same lock context
+ m_op_work_queue->queue(new FunctionContext(
+ [future, on_safe] (int r) mutable {
+ future.flush(on_safe);
+ }));
} else {
on_safe->complete(0);
}
void ObjectRecorder::flush(const FutureImplPtr &future) {
ldout(m_cct, 20) << "flushing " << *future << dendl;
- ceph_assert(m_lock->is_locked());
-
+ m_lock->Lock();
if (future->get_flush_handler().get() != &m_flush_handler) {
// if we don't own this future, re-issue the flush so that it hits the
// correct journal object owner
future->flush();
+ m_lock->Unlock();
return;
} else if (future->is_flush_in_progress()) {
+ m_lock->Unlock();
return;
}
- if (m_object_closed || m_overflowed) {
- return;
- }
-
- AppendBuffers::reverse_iterator r_it;
- for (r_it = m_append_buffers.rbegin(); r_it != m_append_buffers.rend();
- ++r_it) {
- if (r_it->first == future) {
- break;
- }
+ bool overflowed = send_appends(true, future);
+ if (overflowed) {
+ notify_handler_unlock();
+ } else {
+ m_lock->Unlock();
}
- ceph_assert(r_it != m_append_buffers.rend());
-
- auto it = (++r_it).base();
- ceph_assert(it != m_append_buffers.end());
- ++it;
-
- AppendBuffers flush_buffers;
- flush_buffers.splice(flush_buffers.end(), m_append_buffers,
- m_append_buffers.begin(), it);
- send_appends(&flush_buffers);
}
void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
ceph_assert(m_in_flight_tids.empty());
ceph_assert(m_in_flight_appends.empty());
ceph_assert(m_object_closed || m_overflowed);
- append_buffers->splice(append_buffers->end(), m_append_buffers,
- m_append_buffers.begin(), m_append_buffers.end());
+
+ for (auto& append_buffer : m_pending_buffers) {
+ ldout(m_cct, 20) << "detached " << *append_buffer.first << dendl;
+ append_buffer.first->detach();
+ }
+ append_buffers->splice(append_buffers->end(), m_pending_buffers,
+ m_pending_buffers.begin(), m_pending_buffers.end());
}
bool ObjectRecorder::close() {
ceph_assert(m_lock->is_locked());
ldout(m_cct, 20) << dendl;
-
- cancel_append_task();
-
- flush_appends(true);
+ send_appends(true, {});
ceph_assert(!m_object_closed);
m_object_closed = true;
- return (m_in_flight_tids.empty() && !m_in_flight_flushes && !m_aio_scheduled);
-}
-
-void ObjectRecorder::handle_append_task() {
- ceph_assert(m_timer_lock.is_locked());
- m_append_task = NULL;
-
- Mutex::Locker locker(*m_lock);
- flush_appends(true);
-}
-
-void ObjectRecorder::cancel_append_task() {
- Mutex::Locker locker(m_timer_lock);
- if (m_append_task != NULL) {
- ldout(m_cct, 20) << dendl;
- m_timer.cancel_event(m_append_task);
- m_append_task = NULL;
- }
-}
-
-void ObjectRecorder::schedule_append_task() {
- Mutex::Locker locker(m_timer_lock);
- if (m_append_task == nullptr && m_flush_age > 0) {
- ldout(m_cct, 20) << dendl;
- m_append_task = m_timer.add_event_after(
- m_flush_age, new FunctionContext([this](int) {
- handle_append_task();
- }));
- }
-}
-
-bool ObjectRecorder::append(const AppendBuffer &append_buffer,
- bool *schedule_append) {
- ceph_assert(m_lock->is_locked());
- ldout(m_cct, 20) << "bytes=" << append_buffer.second.length() << dendl;
-
- bool flush_requested = false;
- if (!m_object_closed && !m_overflowed) {
- flush_requested = append_buffer.first->attach(&m_flush_handler);
- }
-
- m_append_buffers.push_back(append_buffer);
- m_pending_bytes += append_buffer.second.length();
-
- if (!flush_appends(false)) {
- *schedule_append = true;
- }
- return flush_requested;
-}
-
-bool ObjectRecorder::flush_appends(bool force) {
- ceph_assert(m_lock->is_locked());
- ldout(m_cct, 20) << "force=" << force << dendl;
- if (m_object_closed || m_overflowed) {
- ldout(m_cct, 20) << "already closed or overflowed" << dendl;
- return true;
- }
-
- if (m_append_buffers.empty() ||
- (!force &&
- m_size + m_pending_bytes < m_soft_max_size &&
- (m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) &&
- (m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) {
- ldout(m_cct, 20) << "batching append" << dendl;
- return false;
- }
-
- m_pending_bytes = 0;
- AppendBuffers append_buffers;
- append_buffers.swap(m_append_buffers);
- send_appends(&append_buffers);
- return true;
+ return (m_in_flight_tids.empty() && !m_in_flight_flushes);
}
void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
m_in_flight_tids.erase(tid_iter);
InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
- if (r == -EOVERFLOW || m_overflowed) {
- if (iter != m_in_flight_appends.end()) {
- ldout(m_cct, 10) << "append overflowed" << dendl;
- m_overflowed = true;
- } else {
- // must have seen an overflow on a previous append op
- ceph_assert(r == -EOVERFLOW && m_overflowed);
- }
+ ceph_assert(iter != m_in_flight_appends.end());
+
+ if (r == -EOVERFLOW) {
+ ldout(m_cct, 10) << "append overflowed" << dendl;
+ m_overflowed = true;
// notify of overflow once all in-flight ops are complete
- if (m_in_flight_tids.empty() && !m_aio_scheduled) {
- m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
+ if (m_in_flight_tids.empty()) {
append_overflowed();
notify_handler_unlock();
} else {
return;
}
- ceph_assert(iter != m_in_flight_appends.end());
append_buffers.swap(iter->second);
ceph_assert(!append_buffers.empty());
+ for (auto& append_buffer : append_buffers) {
+ m_object_bytes += append_buffer.second.length();
+ }
+ ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl;
+
m_in_flight_appends.erase(iter);
m_in_flight_flushes = true;
m_lock->Unlock();
}
// Flag the associated futures as complete.
- for (AppendBuffers::iterator buf_it = append_buffers.begin();
- buf_it != append_buffers.end(); ++buf_it) {
- ldout(m_cct, 20) << *buf_it->first << " marked safe" << dendl;
- buf_it->first->safe(r);
+ for (auto& append_buffer : append_buffers) {
+ ldout(m_cct, 20) << *append_buffer.first << " marked safe" << dendl;
+ append_buffer.first->safe(r);
}
// wake up any flush requests that raced with a RADOS callback
m_in_flight_flushes = false;
m_in_flight_flushes_cond.Signal();
- if (!m_aio_scheduled) {
- if (m_in_flight_appends.empty() &&
- (m_object_closed || m_aio_sent_size >= m_soft_max_size)) {
- if (m_aio_sent_size >= m_soft_max_size) {
- ldout(m_cct, 20) << " soft max size reached, notifying overflow"
- << dendl;
- m_overflowed = true;
- }
- // all remaining unsent appends should be redirected to new object
- m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
+ if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) {
+ // all remaining unsent appends should be redirected to new object
+ notify_handler_unlock();
+ } else {
+ bool overflowed = send_appends(false, {});
+ if (overflowed) {
notify_handler_unlock();
- } else if (!m_pending_buffers.empty()) {
- m_aio_scheduled = true;
- m_lock->Unlock();
- send_appends_aio();
} else {
m_lock->Unlock();
}
- } else {
- m_lock->Unlock();
}
}
ceph_assert(m_lock->is_locked());
ceph_assert(!m_in_flight_appends.empty());
- cancel_append_task();
-
InFlightAppends in_flight_appends;
in_flight_appends.swap(m_in_flight_appends);
}
restart_append_buffers.splice(restart_append_buffers.end(),
- m_append_buffers,
- m_append_buffers.begin(),
- m_append_buffers.end());
- restart_append_buffers.swap(m_append_buffers);
-
- for (AppendBuffers::const_iterator it = m_append_buffers.begin();
- it != m_append_buffers.end(); ++it) {
- ldout(m_cct, 20) << "overflowed " << *it->first << dendl;
- it->first->detach();
- }
+ m_pending_buffers,
+ m_pending_buffers.begin(),
+ m_pending_buffers.end());
+ restart_append_buffers.swap(m_pending_buffers);
}
-void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
+bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
ldout(m_cct, 20) << dendl;
- ceph_assert(m_lock->is_locked());
- ceph_assert(!append_buffers->empty());
- for (AppendBuffers::iterator it = append_buffers->begin();
- it != append_buffers->end(); ++it) {
- ldout(m_cct, 20) << "flushing " << *it->first << dendl;
- it->first->set_flush_in_progress();
- m_size += it->second.length();
+ ceph_assert(m_lock->is_locked());
+ if (m_object_closed || m_overflowed) {
+ ldout(m_cct, 20) << "already closed or overflowed" << dendl;
+ return false;
}
- m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers,
- append_buffers->begin(), append_buffers->end());
- if (!m_aio_scheduled) {
- m_op_work_queue->queue(new FunctionContext([this] (int r) {
- send_appends_aio();
- }));
- m_aio_scheduled = true;
+ if (m_pending_buffers.empty()) {
+ ldout(m_cct, 20) << "append buffers empty" << dendl;
+ return false;
}
-}
-void ObjectRecorder::send_appends_aio() {
- ldout(m_cct, 20) << dendl;
- librados::AioCompletion *rados_completion;
- {
- Mutex::Locker locker(*m_lock);
- m_aio_scheduled = false;
+ if (!force &&
+ ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) ||
+ (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) ||
+ (m_flush_age > 0 &&
+ m_last_flush_time + m_flush_age >= ceph_clock_now()))) {
+ ldout(m_cct, 20) << "forcing batch flush" << dendl;
+ force = true;
+ }
- if (m_pending_buffers.empty()) {
- ldout(m_cct, 10) << "pending buffers empty" << dendl;
- return;
+ auto max_in_flight_appends = m_max_in_flight_appends;
+ if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) {
+ if (!force && max_in_flight_appends == 0) {
+ ldout(m_cct, 20) << "attempting to batch AIO appends" << dendl;
+ max_in_flight_appends = 1;
}
+ } else if (max_in_flight_appends < 0) {
+ max_in_flight_appends = 0;
+ }
- if (m_max_in_flight_appends != 0 &&
- m_in_flight_tids.size() >= m_max_in_flight_appends) {
- ldout(m_cct, 10) << "max in flight appends reached" << dendl;
- return;
+ if (!force && max_in_flight_appends != 0 &&
+ static_cast<int32_t>(m_in_flight_tids.size()) >= max_in_flight_appends) {
+ ldout(m_cct, 10) << "max in flight appends reached" << dendl;
+ return false;
+ }
+
+ librados::ObjectWriteOperation op;
+ client::guard_append(&op, m_soft_max_size);
+
+ size_t append_bytes = 0;
+ AppendBuffers append_buffers;
+ for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
+ auto& future = it->first;
+ auto& bl = it->second;
+ auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length();
+ if (size == m_soft_max_size) {
+ ldout(m_cct, 10) << "object at capacity " << *future << dendl;
+ m_overflowed = true;
+ } else if (size > m_soft_max_size) {
+ ldout(m_cct, 10) << "object beyond capacity " << *future << dendl;
+ m_overflowed = true;
+ break;
}
- if (m_aio_sent_size >= m_soft_max_size) {
- ldout(m_cct, 10) << "soft max size reached" << dendl;
- return;
+ bool flush_break = (force && flush_future && flush_future == future);
+ ldout(m_cct, 20) << "flushing " << *future << dendl;
+ future->set_flush_in_progress();
+
+ op.append(bl);
+ op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
+ append_bytes += bl.length();
+ append_buffers.push_back(*it);
+ it = m_pending_buffers.erase(it);
+
+ if (flush_break) {
+ ldout(m_cct, 20) << "stopping at requested flush future" << dendl;
+ break;
}
+ }
+
+ if (append_bytes > 0) {
+ m_last_flush_time = ceph_clock_now();
uint64_t append_tid = m_append_tid++;
m_in_flight_tids.insert(append_tid);
+ m_in_flight_appends[append_tid].swap(append_buffers);
+ m_in_flight_bytes += append_bytes;
- ldout(m_cct, 10) << "flushing journal tid=" << append_tid << dendl;
-
- librados::ObjectWriteOperation op;
- client::guard_append(&op, m_soft_max_size);
- auto append_buffers = &m_in_flight_appends[append_tid];
-
- size_t append_bytes = 0;
- for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
- ldout(m_cct, 20) << "flushing " << *it->first << dendl;
- op.append(it->second);
- op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
- m_aio_sent_size += it->second.length();
- append_bytes += it->second.length();
- append_buffers->push_back(*it);
- it = m_pending_buffers.erase(it);
- if (m_aio_sent_size >= m_soft_max_size) {
- break;
- }
- }
- rados_completion = librados::Rados::aio_create_completion(
- new C_AppendFlush(this, append_tid), nullptr,
- utils::rados_ctx_callback);
+ ceph_assert(m_pending_bytes >= append_bytes);
+ m_pending_bytes -= append_bytes;
+
+ auto rados_completion = librados::Rados::aio_create_completion(
+ new C_AppendFlush(this, append_tid), nullptr, utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
ceph_assert(r == 0);
- ldout(m_cct, 20) << "append_bytes=" << append_bytes << dendl;
+ rados_completion->release();
+ ldout(m_cct, 20) << "flushing journal tid=" << append_tid << ", "
+ << "append_bytes=" << append_bytes << ", "
+ << "in_flight_bytes=" << m_in_flight_bytes << ", "
+ << "pending_bytes=" << m_pending_bytes << dendl;
}
- rados_completion->release();
+
+ return m_overflowed;
}
void ObjectRecorder::notify_handler_unlock() {
RadosTestFixture::TearDown();
}
- inline void set_flush_interval(uint32_t i) {
- m_flush_interval = i;
- }
- inline void set_flush_bytes(uint64_t i) {
- m_flush_bytes = i;
- }
- inline void set_flush_age(double i) {
- m_flush_age = i;
+ inline void set_batch_options(uint32_t flush_interval, uint64_t flush_bytes,
+ double flush_age, int max_in_flight) {
+ m_flush_interval = flush_interval;
+ m_flush_bytes = flush_bytes;
+ m_flush_age = flush_age;
+ m_max_in_flight_appends = max_in_flight;
}
journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
journal::ObjectRecorderPtr create_object(const std::string &oid,
uint8_t order, shared_ptr<Mutex> lock) {
journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
- m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler,
- order, m_flush_interval, m_flush_bytes, m_flush_age,
- m_max_in_flight_appends));
+ m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order, m_flush_interval,
+ m_flush_bytes, m_flush_age, m_max_in_flight_appends));
m_object_recorders.push_back(object);
m_object_recorder_locks.insert(std::make_pair(oid, lock));
m_handler.object_lock = lock;
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
+ set_batch_options(0, 0, 0, 0);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
- ASSERT_EQ(1U, object->get_pending_appends());
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
- ASSERT_EQ(2U, object->get_pending_appends());
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
append_buffer2.first->flush(&cond);
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_flush_interval(2);
+ set_batch_options(2, 0, 0, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_flush_bytes(10);
+ set_batch_options(0, 10, 0, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_flush_age(0.1);
+ set_batch_options(0, 0, 0.1, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
C_SaferCond cond;
append_buffer2.first->wait(&cond);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
payload);
append_buffers = {append_buffer2};
lock->Lock();
- ASSERT_TRUE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_TRUE(object->append(std::move(append_buffers)));
+ lock->Unlock();
C_SaferCond cond;
append_buffer2.first->wait(&cond);
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
+ set_batch_options(0, 10, 0, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond1;
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
+ set_batch_options(0, 10, 0, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond;
append_buffer.first->wait(&cond);
- lock->Lock();
object->flush(append_buffer.first);
- ASSERT_TRUE(lock->is_locked());
- lock->Unlock();
ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
append_buffer.first->is_complete());
ASSERT_EQ(0, cond.wait());
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer};
- lock->Lock();
object->flush(append_buffer.first);
- ASSERT_TRUE(lock->is_locked());
- lock->Unlock();
ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
// should automatically flush once its attached to the object
C_SaferCond cond;
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- set_flush_interval(2);
+ set_batch_options(2, 0, 0, -1);
shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
lock->Lock();
- ASSERT_FALSE(object->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
ASSERT_EQ(1U, object->get_pending_appends());
lock->Lock();
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1, append_buffer2};
lock1->Lock();
- ASSERT_TRUE(object1->append_unlock(std::move(append_buffers)));
+ ASSERT_TRUE(object1->append(std::move(append_buffers)));
+ lock1->Unlock();
C_SaferCond cond;
append_buffer2.first->wait(&cond);
payload);
append_buffers = {append_buffer3};
lock2->Lock();
- ASSERT_FALSE(object2->append_unlock(std::move(append_buffers)));
+ ASSERT_FALSE(object2->append(std::move(append_buffers)));
+ lock2->Unlock();
append_buffer3.first->flush(NULL);
overflowed = false;