#undef dout_prefix
#define dout_prefix *_dout << "JournalRecorder: " << this << " "
+using std::shared_ptr;
+
namespace journal {
namespace {
uint8_t splay_width = m_journal_metadata->get_splay_width();
for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+ m_object_locks.push_back(shared_ptr<Mutex>(new Mutex("ObjectRecorder::m_lock::"+
+ std::to_string(splay_offset))));
uint64_t object_number = splay_offset + (m_current_set * splay_width);
- m_object_ptrs[splay_offset] = create_object_recorder(object_number);
+ m_object_ptrs[splay_offset] = create_object_recorder(object_number,
+ m_object_locks[splay_offset]);
}
m_journal_metadata->add_listener(&m_listener);
Future JournalRecorder::append(uint64_t tag_tid,
const bufferlist &payload_bl) {
- Mutex::Locker locker(m_lock);
+ m_lock.Lock();
uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid);
uint8_t splay_width = m_journal_metadata->get_splay_width();
future->init(m_prev_future);
m_prev_future = future;
+ m_object_locks[splay_offset]->Lock();
+ m_lock.Unlock();
+
bufferlist entry_bl;
::encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
entry_bl);
AppendBuffers append_buffers;
append_buffers.push_back(std::make_pair(future, entry_bl));
- bool object_full = object_ptr->append(append_buffers);
+ bool object_full = object_ptr->append_unlock(append_buffers);
if (object_full) {
ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
<< dendl;
+ Mutex::Locker l(m_lock);
close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
}
return Future(future);
it != m_object_ptrs.end(); ++it) {
it->second->flush(ctx);
}
+
}
// avoid holding the lock in case there is nothing to flush
<< dendl;
uint8_t splay_width = m_journal_metadata->get_splay_width();
+
+ lock_object_recorders();
for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
it != m_object_ptrs.end(); ++it) {
ObjectRecorderPtr object_recorder = it->second;
create_next_object_recorder(object_recorder);
}
}
+ unlock_object_recorders();
}
bool JournalRecorder::close_object_set(uint64_t active_set) {
// object recorders will invoke overflow handler as they complete
// closing the object to ensure correct order of future appends
uint8_t splay_width = m_journal_metadata->get_splay_width();
+ lock_object_recorders();
for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
it != m_object_ptrs.end(); ++it) {
ObjectRecorderPtr object_recorder = it->second;
}
}
}
+ unlock_object_recorders();
return (m_in_flight_object_closes == 0);
}
ObjectRecorderPtr JournalRecorder::create_object_recorder(
- uint64_t object_number) {
+ uint64_t object_number, shared_ptr<Mutex> lock) {
ObjectRecorderPtr object_recorder(new ObjectRecorder(
m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
- object_number, m_journal_metadata->get_timer(),
+ object_number, lock, m_journal_metadata->get_timer(),
m_journal_metadata->get_timer_lock(), &m_object_handler,
m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes,
m_flush_age));
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
+ assert(m_object_locks[splay_offset]->is_locked());
+
ObjectRecorderPtr new_object_recorder = create_object_recorder(
- (m_current_set * splay_width) + splay_offset);
+ (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
ldout(m_cct, 10) << __func__ << ": "
<< "old oid=" << object_recorder->get_oid() << ", "
new_object_recorder->get_object_number());
}
- new_object_recorder->append(append_buffers);
+ new_object_recorder->append(append_buffers, false);
m_object_ptrs[splay_offset] = new_object_recorder;
}
uint32_t m_in_flight_object_closes = 0;
uint64_t m_current_set;
ObjectRecorderPtrs m_object_ptrs;
+ std::vector<std::shared_ptr<Mutex>> m_object_locks;
FutureImplPtr m_prev_future;
void close_and_advance_object_set(uint64_t object_set);
- ObjectRecorderPtr create_object_recorder(uint64_t object_number);
+ ObjectRecorderPtr create_object_recorder(uint64_t object_number,
+ std::shared_ptr<Mutex> lock);
void create_next_object_recorder(ObjectRecorderPtr object_recorder);
void handle_update();
void handle_closed(ObjectRecorder *object_recorder);
void handle_overflow(ObjectRecorder *object_recorder);
+
+ void lock_object_recorders() {
+ for (auto& lock : m_object_locks) {
+ lock->Lock();
+ }
+ }
+
+ void unlock_object_recorders() {
+ for (auto& lock : m_object_locks) {
+ lock->Unlock();
+ }
+ }
};
} // namespace journal
#define dout_prefix *_dout << "ObjectRecorder: " << this << " "
using namespace cls::journal;
+using std::shared_ptr;
namespace journal {
ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
- uint64_t object_number,
+ uint64_t object_number, shared_ptr<Mutex> lock,
SafeTimer &timer, Mutex &timer_lock,
Handler *handler, uint8_t order,
uint32_t flush_interval, uint64_t flush_bytes,
m_handler(handler), m_order(order), m_soft_max_size(1 << m_order),
m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL),
- m_lock(utils::unique_lock_name("ObjectRecorder::m_lock", this)),
- m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false),
- m_object_closed(false), m_in_flight_flushes(false) {
+ m_lock(lock), m_append_tid(0), m_pending_bytes(0), m_size(0),
+ m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
assert(m_handler != NULL);
assert(m_in_flight_appends.empty());
}
-bool ObjectRecorder::append(const AppendBuffers &append_buffers) {
+bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) {
+ return append(append_buffers, true);
+}
+
+bool ObjectRecorder::append(const AppendBuffers &append_buffers, bool unlock) {
+ assert(m_lock->is_locked());
+
FutureImplPtr last_flushed_future;
bool schedule_append = false;
{
- Mutex::Locker locker(m_lock);
if (m_overflowed) {
m_append_buffers.insert(m_append_buffers.end(),
append_buffers.begin(), append_buffers.end());
+ if (unlock) {
+ m_lock->Unlock();
+ }
return false;
}
last_flushed_future = iter->first;
}
}
+
+ if (unlock) {
+ m_lock->Unlock();
+ }
}
if (last_flushed_future) {
+ if (unlock) {
+ m_lock->Lock();
+ }
flush(last_flushed_future);
+ if (unlock) {
+ m_lock->Unlock();
+ }
} else if (schedule_append) {
schedule_append_task();
} else {
cancel_append_task();
Future future;
{
- Mutex::Locker locker(m_lock);
+ Mutex::Locker locker(*m_lock);
// if currently handling flush notifications, wait so that
// we notify in the correct order (since lock is dropped on
// callback)
if (m_in_flight_flushes) {
- m_in_flight_flushes_cond.Wait(m_lock);
+ m_in_flight_flushes_cond.Wait(*(m_lock.get()));
}
// attach the flush to the most recent append
ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future
<< dendl;
- Mutex::Locker locker(m_lock);
+ assert(m_lock->is_locked());
+
if (future->get_flush_handler().get() != &m_flush_handler) {
// if we don't own this future, re-issue the flush so that it hits the
// correct journal object owner
void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
- Mutex::Locker locker(m_lock);
+ assert(m_lock->is_locked());
assert(m_in_flight_tids.empty());
assert(m_in_flight_appends.empty());
assert(m_object_closed || m_overflowed);
}
bool ObjectRecorder::close() {
+ assert (m_lock->is_locked());
+
ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
cancel_append_task();
- Mutex::Locker locker(m_lock);
flush_appends(true);
assert(!m_object_closed);
assert(m_timer_lock.is_locked());
m_append_task = NULL;
- Mutex::Locker locker(m_lock);
+ Mutex::Locker locker(*m_lock);
flush_appends(true);
}
bool ObjectRecorder::append(const AppendBuffer &append_buffer,
bool *schedule_append) {
- assert(m_lock.is_locked());
+ assert(m_lock->is_locked());
bool flush_requested = false;
if (!m_object_closed && !m_overflowed) {
}
bool ObjectRecorder::flush_appends(bool force) {
- assert(m_lock.is_locked());
+ assert(m_lock->is_locked());
if (m_object_closed || m_overflowed) {
return true;
}
AppendBuffers append_buffers;
{
- Mutex::Locker locker(m_lock);
+ Mutex::Locker locker(*m_lock);
auto tid_iter = m_in_flight_tids.find(tid);
assert(tid_iter != m_in_flight_tids.end());
m_in_flight_tids.erase(tid_iter);
}
// wake up any flush requests that raced with a RADOS callback
- Mutex::Locker locker(m_lock);
+ Mutex::Locker locker(*m_lock);
m_in_flight_flushes = false;
m_in_flight_flushes_cond.Signal();
}
ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
<< dendl;
- assert(m_lock.is_locked());
+ assert(m_lock->is_locked());
assert(!m_in_flight_appends.empty());
assert(m_in_flight_appends.begin()->first == tid);
}
void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
- assert(m_lock.is_locked());
+ assert(m_lock->is_locked());
assert(!append_buffers->empty());
uint64_t append_tid = m_append_tid++;
}
void ObjectRecorder::notify_handler() {
- assert(m_lock.is_locked());
+ assert(m_lock->is_locked());
for (AppendBuffers::const_iterator it = m_append_buffers.begin();
it != m_append_buffers.end(); ++it) {
}
if (m_object_closed) {
- m_lock.Unlock();
+ m_lock->Unlock();
m_handler->closed(this);
- m_lock.Lock();
+ m_lock->Lock();
} else {
// TODO need to delay completion until after aio_notify completes
- m_lock.Unlock();
+ m_lock->Unlock();
m_handler->overflow(this);
- m_lock.Lock();
+ m_lock->Lock();
}
}
};
ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
- uint64_t object_number, SafeTimer &timer, Mutex &timer_lock,
- Handler *handler, uint8_t order, uint32_t flush_interval,
- uint64_t flush_bytes, double flush_age);
+ uint64_t object_number, std::shared_ptr<Mutex> lock,
+ SafeTimer &timer, Mutex &timer_lock, Handler *handler,
+ uint8_t order, uint32_t flush_interval, uint64_t flush_bytes,
+ double flush_age);
~ObjectRecorder();
inline uint64_t get_object_number() const {
return m_oid;
}
- bool append(const AppendBuffers &append_buffers);
+ bool append_unlock(const AppendBuffers &append_buffers);
+ bool append(const AppendBuffers &append_buffers, bool unlock);
void flush(Context *on_safe);
void flush(const FutureImplPtr &future);
void claim_append_buffers(AppendBuffers *append_buffers);
bool is_closed() const {
- Mutex::Locker locker(m_lock);
+ assert(m_lock->is_locked());
return (m_object_closed && m_in_flight_appends.empty());
}
bool close();
}
inline size_t get_pending_appends() const {
- Mutex::Locker locker(m_lock);
+ Mutex::Locker locker(*m_lock);
return m_append_buffers.size();
}
C_AppendTask *m_append_task;
- mutable Mutex m_lock;
+ mutable std::shared_ptr<Mutex> m_lock;
AppendBuffers m_append_buffers;
uint64_t m_append_tid;
uint32_t m_pending_bytes;
Futures futures;
uint64_t tid;
{
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_READY);
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_READY);
- Mutex::Locker event_locker(m_event_lock);
- tid = ++m_event_tid;
- assert(tid != 0);
+ tid = ++m_event_tid;
+ assert(tid != 0);
+ }
for (auto &bl : bufferlists) {
assert(bl.length() <= m_max_append_size);
futures.push_back(m_journaler->append(m_tag_tid, bl));
}
+ Mutex::Locker event_locker(m_event_lock);
m_events[tid] = Event(futures, requests, offset, length);
}
#include "test/journal/RadosTestFixture.h"
#include <limits>
+using std::shared_ptr;
+
class TestObjectRecorder : public RadosTestFixture {
public:
TestObjectRecorder()
struct Handler : public journal::ObjectRecorder::Handler {
Mutex lock;
+ shared_ptr<Mutex> object_lock;
Cond cond;
bool is_closed = false;
uint32_t overflows = 0;
virtual void overflow(journal::ObjectRecorder *object_recorder) {
Mutex::Locker locker(lock);
journal::AppendBuffers append_buffers;
+ object_lock->Lock();
object_recorder->claim_append_buffers(&append_buffers);
+ object_lock->Unlock();
++overflows;
cond.Signal();
};
typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders;
+ typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap;
ObjectRecorders m_object_recorders;
+ ObjectRecorderLocksMap m_object_recorder_locks;
uint32_t m_flush_interval;
uint64_t m_flush_bytes;
}
journal::ObjectRecorderPtr create_object(const std::string &oid,
- uint8_t order) {
+ uint8_t order, shared_ptr<Mutex> lock) {
journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
- m_ioctx, oid, 0, *m_timer, m_timer_lock, &m_handler, order,
+ m_ioctx, oid, 0, lock, *m_timer, m_timer_lock, &m_handler, order,
m_flush_interval, m_flush_bytes, m_flush_age));
m_object_recorders.push_back(object);
+ m_object_recorder_locks.insert(std::make_pair(oid, lock));
+ m_handler.object_lock = lock;
return object;
}
};
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::ObjectRecorderPtr object = create_object(oid, 24);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(2U, object->get_pending_appends());
C_SaferCond cond;
+ lock->Lock();
append_buffer2.first->flush(&cond);
+ ASSERT_TRUE(lock->is_locked());
+ lock->Unlock();
ASSERT_EQ(0, cond.wait());
ASSERT_EQ(0U, object->get_pending_appends());
}
ASSERT_EQ(0, init_metadata(metadata));
set_flush_interval(2);
- journal::ObjectRecorderPtr object = create_object(oid, 24);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
ASSERT_EQ(0, init_metadata(metadata));
set_flush_bytes(10);
- journal::ObjectRecorderPtr object = create_object(oid, 24);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(0U, object->get_pending_appends());
C_SaferCond cond;
ASSERT_EQ(0, init_metadata(metadata));
set_flush_age(0.1);
- journal::ObjectRecorderPtr object = create_object(oid, 24);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
C_SaferCond cond;
append_buffer2.first->wait(&cond);
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::ObjectRecorderPtr object = create_object(oid, 12);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 12, lock);
std::string payload(2048, '1');
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
payload);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
payload);
append_buffers = {append_buffer2};
- ASSERT_TRUE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_TRUE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
C_SaferCond cond;
append_buffer2.first->wait(&cond);
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::ObjectRecorderPtr object = create_object(oid, 24);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond1;
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::ObjectRecorderPtr object = create_object(oid, 24);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
C_SaferCond cond;
append_buffer.first->wait(&cond);
+ lock->Lock();
object->flush(append_buffer.first);
+ ASSERT_TRUE(lock->is_locked());
+ lock->Unlock();
ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
append_buffer.first->is_complete());
ASSERT_EQ(0, cond.wait());
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::ObjectRecorderPtr object = create_object(oid, 24);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer};
+ lock->Lock();
object->flush(append_buffer.first);
+ ASSERT_TRUE(lock->is_locked());
+ lock->Unlock();
ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
// should automatically flush once its attached to the object
C_SaferCond cond;
ASSERT_EQ(0, init_metadata(metadata));
set_flush_interval(2);
- journal::ObjectRecorderPtr object = create_object(oid, 24);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
- ASSERT_FALSE(object->append(append_buffers));
+ lock->Lock();
+ ASSERT_FALSE(object->append_unlock(append_buffers));
+ ASSERT_FALSE(lock->is_locked());
ASSERT_EQ(1U, object->get_pending_appends());
+ lock->Lock();
ASSERT_FALSE(object->close());
+ ASSERT_TRUE(lock->is_locked());
+ lock->Unlock();
{
Mutex::Locker locker(m_handler.lock);
journal::JournalMetadataPtr metadata = create_metadata(oid);
ASSERT_EQ(0, init_metadata(metadata));
- journal::ObjectRecorderPtr object1 = create_object(oid, 12);
- journal::ObjectRecorderPtr object2 = create_object(oid, 12);
+ shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
+ journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
+ shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
+ journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
std::string payload(2048, '1');
journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
payload);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1, append_buffer2};
- ASSERT_TRUE(object1->append(append_buffers));
+ lock1->Lock();
+ ASSERT_TRUE(object1->append_unlock(append_buffers));
+ ASSERT_FALSE(lock1->is_locked());
C_SaferCond cond;
append_buffer2.first->wait(&cond);
payload);
append_buffers = {append_buffer3};
- ASSERT_FALSE(object2->append(append_buffers));
+ lock2->Lock();
+ ASSERT_FALSE(object2->append_unlock(append_buffers));
+ ASSERT_FALSE(lock2->is_locked());
+ lock2->Lock();
append_buffer3.first->flush(NULL);
+ ASSERT_TRUE(lock2->is_locked());
+ lock2->Unlock();
bool overflowed = false;
{