This avoids the need to open two threads per journaler.
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
// vim: ts=8 sw=2 smarttab
#include "journal/FutureImpl.h"
-#include "common/Finisher.h"
#include "journal/JournalMetadata.h"
#include "journal/Utils.h"
}
if (complete && on_safe != NULL) {
- m_journal_metadata->get_finisher().queue(on_safe, m_return_value);
+ m_journal_metadata->queue(on_safe, m_return_value);
} else if (flush_handler) {
// attached to journal object -- instruct it to flush all entries through
// this one. possible to become detached while lock is released, so flush
return;
}
}
- m_journal_metadata->get_finisher().queue(on_safe, m_return_value);
+ m_journal_metadata->queue(on_safe, m_return_value);
}
bool FutureImpl::is_complete() const {
#include "journal/JournalMetadata.h"
#include "journal/Utils.h"
#include "common/errno.h"
-#include "common/Finisher.h"
#include "common/Timer.h"
#include "cls/journal/cls_journal_client.h"
#include <functional>
} // anonymous namespace
-JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
+JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
+ Mutex *timer_lock, librados::IoCtx &ioctx,
const std::string &oid,
const std::string &client_id,
double commit_interval)
: RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
m_client_id(client_id), m_commit_interval(commit_interval), m_order(0),
- m_splay_width(0), m_pool_id(-1), m_initialized(false), m_finisher(NULL),
- m_timer(NULL), m_timer_lock("JournalMetadata::m_timer_lock"),
+ m_splay_width(0), m_pool_id(-1), m_initialized(false),
+ m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
m_watch_handle(0), m_minimum_set(0), m_active_set(0),
m_update_notifications(0), m_commit_position_ctx(NULL),
JournalMetadata::~JournalMetadata() {
if (m_initialized) {
- shutdown();
+ shut_down();
}
}
assert(!m_initialized);
m_initialized = true;
- m_finisher = new Finisher(m_cct);
- m_finisher->start();
-
- m_timer = new SafeTimer(m_cct, m_timer_lock, true);
- m_timer->init();
-
int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
if (r < 0) {
lderr(m_cct) << __func__ << ": failed to watch journal"
get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, ctx);
}
-void JournalMetadata::shutdown() {
+void JournalMetadata::shut_down() {
ldout(m_cct, 20) << __func__ << dendl;
flush_commit_position();
- if (m_timer != NULL) {
- Mutex::Locker locker(m_timer_lock);
- m_timer->shutdown();
- delete m_timer;
- m_timer = NULL;
- }
-
- if (m_finisher != NULL) {
- m_finisher->stop();
- delete m_finisher;
- m_finisher = NULL;
- }
-
librados::Rados rados(m_ioctx);
rados.watch_flush();
m_async_op_tracker.wait_for_ops();
- m_ioctx.aio_flush();
}
void JournalMetadata::get_immutable_metadata(uint8_t *order,
void JournalMetadata::flush_commit_position() {
ldout(m_cct, 20) << __func__ << dendl;
- Mutex::Locker timer_locker(m_timer_lock);
+ Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
if (m_commit_position_ctx == nullptr) {
return;
void JournalMetadata::flush_commit_position(Context *on_safe) {
ldout(m_cct, 20) << __func__ << dendl;
- Mutex::Locker timer_locker(m_timer_lock);
+ Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
if (m_commit_position_ctx == nullptr) {
// nothing to flush
if (on_safe != nullptr) {
- m_finisher->queue(on_safe, 0);
+ m_work_queue->queue(on_safe, 0);
}
return;
}
void JournalMetadata::cancel_commit_task() {
ldout(m_cct, 20) << __func__ << dendl;
- assert(m_timer_lock.is_locked());
+ assert(m_timer_lock->is_locked());
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
assert(m_commit_position_task_ctx != nullptr);
void JournalMetadata::schedule_commit_task() {
ldout(m_cct, 20) << __func__ << dendl;
- assert(m_timer_lock.is_locked());
+ assert(m_timer_lock->is_locked());
assert(m_lock.is_locked());
assert(m_commit_position_ctx != nullptr);
if (m_commit_position_task_ctx == NULL) {
}
void JournalMetadata::handle_commit_position_task() {
- assert(m_timer_lock.is_locked());
+ assert(m_timer_lock->is_locked());
assert(m_lock.is_locked());
ldout(m_cct, 20) << __func__ << dendl;
}
void JournalMetadata::schedule_watch_reset() {
- assert(m_timer_lock.is_locked());
+ assert(m_timer_lock->is_locked());
m_timer->add_event_after(0.1, new C_WatchReset(this));
}
void JournalMetadata::handle_watch_reset() {
- assert(m_timer_lock.is_locked());
+ assert(m_timer_lock->is_locked());
if (!m_initialized) {
return;
}
void JournalMetadata::handle_watch_error(int err) {
lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
- Mutex::Locker timer_locker(m_timer_lock);
+ Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
// release old watch on error
ObjectSetPosition commit_position;
Context *stale_ctx = nullptr;
{
- Mutex::Locker timer_locker(m_timer_lock);
+ Mutex::Locker timer_locker(*m_timer_lock);
Mutex::Locker locker(m_lock);
assert(commit_tid > m_commit_position_tid);
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RefCountedObj.h"
+#include "common/WorkQueue.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/AsyncOpTracker.h"
#include <boost/intrusive_ptr.hpp>
#include <string>
#include "include/assert.h"
-class Finisher;
class SafeTimer;
namespace journal {
virtual void handle_update(JournalMetadata *) = 0;
};
- JournalMetadata(librados::IoCtx &ioctx, const std::string &oid,
+ JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+ librados::IoCtx &ioctx, const std::string &oid,
const std::string &client_id, double commit_interval);
~JournalMetadata();
void init(Context *on_init);
- void shutdown();
+ void shut_down();
void get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
int64_t *pool_id, Context *on_finish);
return m_pool_id;
}
- inline Finisher &get_finisher() {
- return *m_finisher;
+ inline void queue(Context *on_finish, int r) {
+ m_work_queue->queue(on_finish, r);
}
inline SafeTimer &get_timer() {
return *m_timer;
}
inline Mutex &get_timer_lock() {
- return m_timer_lock;
+ return *m_timer_lock;
}
void set_minimum_set(uint64_t object_set);
int64_t m_pool_id;
bool m_initialized;
- Finisher *m_finisher;
+ ContextWQ *m_work_queue;
SafeTimer *m_timer;
- Mutex m_timer_lock;
+ Mutex *m_timer_lock;
mutable Mutex m_lock;
// vim: ts=8 sw=2 smarttab
#include "journal/JournalPlayer.h"
-#include "common/Finisher.h"
#include "journal/Entry.h"
#include "journal/ReplayHandler.h"
#include "journal/Utils.h"
m_watch_scheduled = true;
} else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) {
ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
- m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+ m_journal_metadata->queue(new C_HandleComplete(
m_replay_handler), 0);
}
return false;
lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
m_state = STATE_ERROR;
- m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+ m_journal_metadata->queue(new C_HandleComplete(
m_replay_handler), -ENOMSG);
return false;
}
ObjectPlayerPtr object_player = get_object_player();
if (!object_player->empty()) {
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
- m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+ m_journal_metadata->queue(new C_HandleEntriesAvailable(
m_replay_handler), 0);
} else if (m_watch_enabled) {
object_player->watch(
} else {
ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
<< dendl;
- m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+ m_journal_metadata->queue(new C_HandleComplete(
m_replay_handler), 0);
}
return 0;
uint64_t object_set = object_player->get_object_number() / splay_width;
if (!object_player->empty()) {
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
- m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+ m_journal_metadata->queue(new C_HandleEntriesAvailable(
m_replay_handler), 0);
} else if (object_set == active_set) {
ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
- m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+ m_journal_metadata->queue(new C_HandleComplete(
m_replay_handler), 0);
}
}
// vim: ts=8 sw=2 smarttab
#include "journal/JournalRecorder.h"
-#include "common/Finisher.h"
#include "journal/Entry.h"
#include "journal/Utils.h"
}
if (pending_flushes.dec() == 0) {
// ensure all prior callback have been flushed as well
- journal_metadata->get_finisher().queue(on_finish, ret_val);
+ journal_metadata->queue(on_finish, ret_val);
delete this;
}
}
#include "journal/Utils.h"
#include "common/Cond.h"
#include "common/errno.h"
-#include "common/Finisher.h"
#include <limits>
#define dout_subsys ceph_subsys_journaler
#include "journal/Journaler.h"
#include "include/stringify.h"
#include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
#include "journal/Entry.h"
#include "journal/FutureImpl.h"
#include "journal/JournalMetadata.h"
return JOURNAL_OBJECT_PREFIX + stringify(pool_id) + "." + journal_id + ".";
}
+Journaler::Threads::Threads(CephContext *cct)
+ : timer_lock("Journaler::timer_lock") {
+ thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1);
+ thread_pool->start();
+
+ work_queue = new ContextWQ("Journaler::work_queue", 60, thread_pool);
+
+ timer = new SafeTimer(cct, timer_lock, true);
+ timer->init();
+}
+
+Journaler::Threads::~Threads() {
+ {
+ Mutex::Locker timer_locker(timer_lock);
+ timer->shutdown();
+ }
+ delete timer;
+
+ work_queue->drain();
+ delete work_queue;
+
+ thread_pool->stop();
+ delete thread_pool;
+}
+
Journaler::Journaler(librados::IoCtx &header_ioctx,
+ const std::string &journal_id,
+ const std::string &client_id, double commit_interval)
+ : m_threads(new Threads(reinterpret_cast<CephContext*>(header_ioctx.cct()))),
+ m_client_id(client_id) {
+ set_up(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
+ header_ioctx, journal_id, commit_interval);
+}
+
+Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer,
+ Mutex *timer_lock, librados::IoCtx &header_ioctx,
const std::string &journal_id,
const std::string &client_id, double commit_interval)
- : m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL),
- m_trimmer(NULL)
-{
+ : m_client_id(client_id) {
+ set_up(work_queue, timer, timer_lock, header_ioctx, journal_id,
+ commit_interval);
+}
+
+void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer,
+ Mutex *timer_lock, librados::IoCtx &header_ioctx,
+ const std::string &journal_id, double commit_interval) {
m_header_ioctx.dup(header_ioctx);
m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct());
m_header_oid = header_oid(journal_id);
m_object_oid_prefix = object_oid_prefix(m_header_ioctx.get_id(), journal_id);
- m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id,
+ m_metadata = new JournalMetadata(work_queue, timer, timer_lock,
+ m_header_ioctx, m_header_oid, m_client_id,
commit_interval);
m_metadata->get();
}
Journaler::~Journaler() {
- if (m_metadata != NULL) {
+ if (m_metadata != nullptr) {
m_metadata->put();
- m_metadata = NULL;
+ m_metadata = nullptr;
}
delete m_trimmer;
- assert(m_player == NULL);
- assert(m_recorder == NULL);
+ assert(m_player == nullptr);
+ assert(m_recorder == nullptr);
+
+ delete m_threads;
}
int Journaler::exists(bool *header_exists) const {
return 0;
}
-void Journaler::shutdown() {
- m_metadata->shutdown();
+void Journaler::shut_down() {
+ m_metadata->shut_down();
}
void Journaler::get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
}
int Journaler::remove(bool force) {
- m_metadata->shutdown();
+ m_metadata->shut_down();
ldout(m_cct, 5) << "removing journal: " << m_header_oid << dendl;
int r = m_trimmer->remove_objects(force);
#include <string>
#include "include/assert.h"
+class ContextWQ;
class SafeTimer;
+class ThreadPool;
namespace journal {
class Journaler {
public:
+ struct Threads {
+ Threads(CephContext *cct);
+ ~Threads();
+
+ ThreadPool *thread_pool = nullptr;
+ ContextWQ *work_queue = nullptr;
+
+ SafeTimer *timer = nullptr;
+ Mutex timer_lock;
+ };
+
typedef std::list<cls::journal::Tag> Tags;
typedef std::set<cls::journal::Client> RegisteredClients;
Journaler(librados::IoCtx &header_ioctx, const std::string &journal_id,
const std::string &client_id, double commit_interval);
+ Journaler(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+ librados::IoCtx &header_ioctx, const std::string &journal_id,
+ const std::string &client_id, double commit_interval);
~Journaler();
int exists(bool *header_exists) const;
int remove(bool force);
void init(Context *on_init);
- void shutdown();
+ void shut_down();
void get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
int64_t *pool_id, Context *on_finish);
}
};
+ Threads *m_threads = nullptr;
+
mutable librados::IoCtx m_header_ioctx;
librados::IoCtx m_data_ioctx;
CephContext *m_cct;
std::string m_header_oid;
std::string m_object_oid_prefix;
- JournalMetadata *m_metadata;
- JournalPlayer *m_player;
- JournalRecorder *m_recorder;
- JournalTrimmer *m_trimmer;
+ JournalMetadata *m_metadata = nullptr;
+ JournalPlayer *m_player = nullptr;
+ JournalRecorder *m_recorder = nullptr;
+ JournalTrimmer *m_trimmer = nullptr;
+
+ void set_up(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+ librados::IoCtx &header_ioctx, const std::string &journal_id,
+ double commit_interval);
int init_complete();
void create_player(ReplayHandler *replay_handler);
#include "test/journal/RadosTestFixture.h"
#include "cls/journal/cls_journal_client.h"
#include "include/stringify.h"
+#include "common/WorkQueue.h"
RadosTestFixture::RadosTestFixture()
: m_timer_lock("m_timer_lock"), m_timer(NULL), m_listener(this) {
void RadosTestFixture::SetUpTestCase() {
_pool_name = get_temp_pool_name();
ASSERT_EQ("", create_one_pool_pp(_pool_name, _rados));
+
+ CephContext* cct = reinterpret_cast<CephContext*>(_rados.cct());
+ _thread_pool = new ThreadPool(cct, "RadosTestFixture::_thread_pool",
+ "tp_test", 1);
+ _thread_pool->start();
}
void RadosTestFixture::TearDownTestCase() {
ASSERT_EQ(0, destroy_one_pool_pp(_pool_name, _rados));
+
+ _thread_pool->stop();
+ delete _thread_pool;
}
std::string RadosTestFixture::get_temp_oid() {
void RadosTestFixture::SetUp() {
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx));
- m_timer = new SafeTimer(reinterpret_cast<CephContext*>(m_ioctx.cct()),
- m_timer_lock, true);
+
+ CephContext* cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+ m_work_queue = new ContextWQ("RadosTestFixture::m_work_queue", 60,
+ _thread_pool);
+
+ m_timer = new SafeTimer(cct, m_timer_lock, true);
m_timer->init();
}
m_timer->shutdown();
}
delete m_timer;
+
+ m_work_queue->drain();
+ delete m_work_queue;
}
int RadosTestFixture::create(const std::string &oid, uint8_t order,
const std::string &oid, const std::string &client_id,
double commit_internal) {
journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
- m_ioctx, oid, client_id, commit_internal));
+ m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id,
+ commit_internal));
return metadata;
}
std::string RadosTestFixture::_pool_name;
librados::Rados RadosTestFixture::_rados;
uint64_t RadosTestFixture::_oid_number = 0;
+ThreadPool *RadosTestFixture::_thread_pool = nullptr;
#include "cls/journal/cls_journal_types.h"
#include "gtest/gtest.h"
+class ThreadPool;
+
class RadosTestFixture : public ::testing::Test {
public:
static void SetUpTestCase();
static std::string _pool_name;
static librados::Rados _rados;
static uint64_t _oid_number;
+ static ThreadPool *_thread_pool;
librados::IoCtx m_ioctx;
+ ContextWQ *m_work_queue;
+
Mutex m_timer_lock;
SafeTimer *m_timer;
it != m_metadata_list.end(); ++it) {
(*it)->remove_listener(&m_listener);
}
+ m_metadata_list.clear();
+
RadosTestFixture::TearDown();
}
it != m_metadata_list.end(); ++it) {
(*it)->remove_listener(&m_listener);
}
+ m_metadata_list.clear();
+
for (std::list<journal::JournalTrimmer*>::iterator it = m_trimmers.begin();
it != m_trimmers.end(); ++it) {
delete *it;
virtual void SetUp() {
RadosTestFixture::SetUp();
m_journal_id = get_temp_journal_id();
- m_journaler = new journal::Journaler(m_ioctx, m_journal_id, CLIENT_ID, 5);
+ m_journaler = new journal::Journaler(m_work_queue, m_timer, &m_timer_lock,
+ m_ioctx, m_journal_id, CLIENT_ID, 5);
}
virtual void TearDown() {
}
int register_client(const std::string &client_id, const std::string &desc) {
- journal::Journaler journaler(m_ioctx, m_journal_id, client_id, 5);
+ journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock,
+ m_ioctx, m_journal_id, client_id, 5);
bufferlist data;
data.append(desc);
C_SaferCond cond;
}
int update_client(const std::string &client_id, const std::string &desc) {
- journal::Journaler journaler(m_ioctx, m_journal_id, client_id, 5);
+ journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock,
+ m_ioctx, m_journal_id, client_id, 5);
bufferlist data;
data.append(desc);
C_SaferCond cond;
}
int unregister_client(const std::string &client_id) {
- journal::Journaler journaler(m_ioctx, m_journal_id, client_id, 5);
+ journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock,
+ m_ioctx, m_journal_id, client_id, 5);
C_SaferCond cond;
journaler.unregister_client(&cond);
return cond.wait();
(*it)->flush(&cond);
cond.wait();
}
+ m_object_recorders.clear();
+
RadosTestFixture::TearDown();
}