// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+#include "common/PriorityCache.h"
+#include "include/stringify.h"
#include "journal/JournalPlayer.h"
#include "journal/Entry.h"
#include "journal/ReplayHandler.h"
+#include "journal/Types.h"
#include "journal/Utils.h"
#define dout_subsys ceph_subsys_journaler
namespace {
+static const uint64_t MIN_FETCH_BYTES = 32768;
+
struct C_HandleComplete : public Context {
ReplayHandler *replay_handler;
JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
const std::string &object_oid_prefix,
const JournalMetadataPtr& journal_metadata,
- ReplayHandler *replay_handler)
+ ReplayHandler *replay_handler,
+ CacheManagerHandler *cache_manager_handler)
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
- m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0),
- m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0) {
+ m_cache_manager_handler(cache_manager_handler),
+ m_cache_rebalance_handler(this), m_lock("JournalPlayer::m_lock"),
+ m_state(STATE_INIT), m_splay_offset(0), m_watch_enabled(false),
+ m_watch_scheduled(false), m_watch_interval(0) {
m_replay_handler->get();
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
m_commit_positions[splay_offset] = position;
}
}
+
+ if (m_cache_manager_handler != nullptr) {
+ m_cache_name = "JournalPlayer/" + stringify(m_ioctx.get_id()) + "/" +
+ m_object_oid_prefix;
+ auto order = m_journal_metadata->get_order();
+ auto splay_width = m_journal_metadata->get_splay_width();
+ uint64_t min_size = MIN_FETCH_BYTES * splay_width;
+ uint64_t max_size = (2 << order) * splay_width;
+
+ m_cache_manager_handler->register_cache(m_cache_name, min_size, max_size,
+ &m_cache_rebalance_handler);
+ m_max_fetch_bytes = 0;
+ } else {
+ m_max_fetch_bytes = 2 << m_journal_metadata->get_order();
+ }
}
JournalPlayer::~JournalPlayer() {
ceph_assert(!m_watch_scheduled);
}
m_replay_handler->put();
+
+ if (m_cache_manager_handler != nullptr) {
+ m_cache_manager_handler->unregister_cache(m_cache_name);
+ }
}
void JournalPlayer::prefetch() {
Mutex::Locker locker(m_lock);
ceph_assert(m_state == STATE_INIT);
+
+ if (m_cache_manager_handler != nullptr && m_max_fetch_bytes == 0) {
+ m_state = STATE_WAITCACHE;
+ return;
+ }
+
m_state = STATE_PREFETCH;
m_active_set = m_journal_metadata->get_active_set();
ObjectPlayerPtr object_player(new ObjectPlayer(
m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(),
- m_journal_metadata->get_settings().max_fetch_bytes));
+ m_max_fetch_bytes));
- uint8_t splay_width = m_journal_metadata->get_splay_width();
+ auto splay_width = m_journal_metadata->get_splay_width();
m_object_players[object_num % splay_width] = object_player;
fetch(object_player);
}
m_replay_handler), r);
}
+void JournalPlayer::handle_cache_rebalanced(uint64_t new_cache_bytes) {
+ Mutex::Locker locker(m_lock);
+
+ if (m_state == STATE_ERROR) {
+ return;
+ }
+
+ auto splay_width = m_journal_metadata->get_splay_width();
+ m_max_fetch_bytes = p2align<uint64_t>(new_cache_bytes / splay_width, 4096);
+
+ ldout(m_cct, 10) << __func__ << ": new_cache_bytes=" << new_cache_bytes
+ << ", max_fetch_bytes=" << m_max_fetch_bytes << dendl;
+
+ uint64_t min_bytes = MIN_FETCH_BYTES;
+
+ if (m_state == STATE_WAITCACHE) {
+ m_state = STATE_INIT;
+ if (m_max_fetch_bytes >= min_bytes) {
+ auto ctx = new FunctionContext(
+ [this](int r) {
+ prefetch();
+ });
+ m_journal_metadata->queue(ctx, 0);
+ return;
+ }
+ } else {
+ min_bytes = p2align<uint64_t>(min_bytes - (rand() % min_bytes) / 2, 4096);
+ }
+
+ if (m_max_fetch_bytes < min_bytes) {
+ lderr(m_cct) << __func__ << ": can't allocate enough memory from cache"
+ << dendl;
+ m_state = STATE_ERROR;
+ notify_complete(-ENOMEM);
+ return;
+ }
+
+ for (auto &pair : m_object_players) {
+ pair.second->set_max_fetch_bytes(m_max_fetch_bytes);
+ }
+}
+
+
} // namespace journal
#include "common/Mutex.h"
#include "journal/JournalMetadata.h"
#include "journal/ObjectPlayer.h"
+#include "journal/Types.h"
#include "cls/journal/cls_journal_types.h"
#include <boost/none.hpp>
#include <boost/optional.hpp>
namespace journal {
+class CacheManagerHandler;
class Entry;
class ReplayHandler;
JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
const JournalMetadataPtr& journal_metadata,
- ReplayHandler *replay_handler);
+ ReplayHandler *replay_handler, CacheManagerHandler *cache_manager_handler);
~JournalPlayer();
void prefetch();
enum State {
STATE_INIT,
+ STATE_WAITCACHE,
STATE_PREFETCH,
STATE_PLAYBACK,
STATE_ERROR
}
};
+ struct CacheRebalanceHandler : public journal::CacheRebalanceHandler {
+ JournalPlayer *player;
+
+ CacheRebalanceHandler(JournalPlayer *player) : player(player) {
+ }
+
+ void handle_cache_rebalanced(uint64_t new_cache_bytes) override {
+ player->handle_cache_rebalanced(new_cache_bytes);
+ }
+ };
+
librados::IoCtx m_ioctx;
CephContext *m_cct;
std::string m_object_oid_prefix;
- JournalMetadataPtr m_journal_metadata;
-
+ JournalMetadataPtr m_journal_metadata;
ReplayHandler *m_replay_handler;
+ CacheManagerHandler *m_cache_manager_handler;
+
+ std::string m_cache_name;
+ CacheRebalanceHandler m_cache_rebalance_handler;
+ uint64_t m_max_fetch_bytes;
AsyncOpTracker m_async_op_tracker;
void notify_entries_available();
void notify_complete(int r);
+
+ void handle_cache_rebalanced(uint64_t new_cache_bytes);
};
} // namespace journal
Journaler::Journaler(librados::IoCtx &header_ioctx,
const std::string &journal_id,
- const std::string &client_id, const Settings &settings)
+ const std::string &client_id, const Settings &settings,
+ CacheManagerHandler *cache_manager_handler)
: m_threads(new Threads(reinterpret_cast<CephContext*>(header_ioctx.cct()))),
- m_client_id(client_id) {
+ m_client_id(client_id), m_cache_manager_handler(cache_manager_handler) {
set_up(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
header_ioctx, journal_id, settings);
}
Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer,
Mutex *timer_lock, librados::IoCtx &header_ioctx,
const std::string &journal_id,
- const std::string &client_id, const Settings &settings)
- : m_client_id(client_id) {
+ const std::string &client_id, const Settings &settings,
+ CacheManagerHandler *cache_manager_handler)
+ : m_client_id(client_id), m_cache_manager_handler(cache_manager_handler) {
set_up(work_queue, timer, timer_lock, header_ioctx, journal_id,
settings);
}
void Journaler::create_player(ReplayHandler *replay_handler) {
ceph_assert(m_player == nullptr);
m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata,
- replay_handler);
+ replay_handler, m_cache_manager_handler);
}
void Journaler::get_metadata(uint8_t *order, uint8_t *splay_width,
namespace journal {
+struct CacheManagerHandler;
+
class JournalMetadata;
class JournalPlayer;
class JournalRecorder;
const std::string &journal_id);
Journaler(librados::IoCtx &header_ioctx, const std::string &journal_id,
- const std::string &client_id, const Settings &settings);
+ const std::string &client_id, const Settings &settings,
+ CacheManagerHandler *cache_manager_handler);
Journaler(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
librados::IoCtx &header_ioctx, const std::string &journal_id,
- const std::string &client_id, const Settings &settings);
+ const std::string &client_id, const Settings &settings,
+ CacheManagerHandler *cache_manager_handler);
~Journaler();
void exists(Context *on_finish) const;
librados::IoCtx m_data_ioctx;
CephContext *m_cct;
std::string m_client_id;
+ CacheManagerHandler *m_cache_manager_handler;
std::string m_header_oid;
std::string m_object_oid_prefix;
m_refetch_state = refetch_state;
}
+ inline void set_max_fetch_bytes(uint64_t max_fetch_bytes) {
+ Mutex::Locker locker(m_lock);
+ m_max_fetch_bytes = max_fetch_bytes;
+ }
+
private:
typedef std::pair<uint64_t, uint64_t> EntryKey;
typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_TYPES_H
+#define CEPH_JOURNAL_TYPES_H
+
+namespace journal {
+
+struct CacheRebalanceHandler {
+ virtual ~CacheRebalanceHandler() {
+ }
+
+ virtual void handle_cache_rebalanced(uint64_t new_cache_bytes) = 0;
+};
+
+struct CacheManagerHandler {
+ virtual ~CacheManagerHandler() {
+ }
+
+ virtual void register_cache(const std::string &cache_name,
+ uint64_t min_size, uint64_t max_size,
+ CacheRebalanceHandler* handler) = 0;
+ virtual void unregister_cache(const std::string &cache_name) = 0;
+};
+
+} // namespace journal
+
+#endif // # CEPH_JOURNAL_TYPES_H
op_work_queue(op_work_queue), on_finish(on_finish),
cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
journaler(new Journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID,
- {})) {
+ {}, nullptr)) {
}
void finish(int r) override {
C_GetTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
std::string *mirror_uuid, Context *on_finish)
: mirror_uuid(mirror_uuid), on_finish(on_finish),
- journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}) {
+ journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}, nullptr) {
}
virtual void finish(int r) {
CephContext *cct = image_ctx->cct;
ldout(cct, 20) << __func__ << dendl;
- Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {});
+ Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {},
+ nullptr);
Mutex lock("lock");
journal::ImageClientMeta client_meta;
m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
m_image_ctx.md_ctx, m_image_ctx.id,
- IMAGE_CLIENT_ID, settings);
+ IMAGE_CLIENT_ID, settings, nullptr);
m_journaler->add_listener(&m_metadata_listener);
Context *ctx = create_async_context_callback(
ldout(m_cct, 20) << this << " " << __func__ << dendl;
ImageCtx::get_timer_instance(m_cct, &m_timer, &m_timer_lock);
- m_journaler = new Journaler(m_op_work_queue, m_timer, m_timer_lock,
- m_ioctx, m_image_id, m_image_client_id, {});
+ m_journaler = new Journaler(m_op_work_queue, m_timer, m_timer_lock, m_ioctx,
+ m_image_id, m_image_client_id, {}, nullptr);
using klass = CreateRequest<I>;
Context *ctx = create_context_callback<klass, &klass::handle_create_journal>(this);
ldout(cct, 20) << dendl;
m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id,
- Journal<>::IMAGE_CLIENT_ID, {});
+ Journal<>::IMAGE_CLIENT_ID, {}, nullptr);
auto ctx = create_async_context_callback(
m_image_ctx, create_context_callback<
DemoteRequest<I>, &DemoteRequest<I>::handle_open_journaler>(this));
ldout(cct, 20) << dendl;
m_journaler = new Journaler(m_image_ctx->md_ctx, m_image_ctx->id,
- Journal<>::IMAGE_CLIENT_ID, {});
+ Journal<>::IMAGE_CLIENT_ID, {}, nullptr);
Context *ctx = create_async_context_callback(
*m_image_ctx, create_context_callback<
PromoteRequest<I>, &PromoteRequest<I>::handle_open>(this));
ldout(m_cct, 20) << this << " " << __func__ << dendl;
ImageCtx::get_timer_instance(m_cct, &m_timer, &m_timer_lock);
- m_journaler = new Journaler(m_op_work_queue, m_timer, m_timer_lock,
- m_ioctx, m_image_id, m_image_client_id, {});
+ m_journaler = new Journaler(m_op_work_queue, m_timer, m_timer_lock, m_ioctx,
+ m_image_id, m_image_client_id, {}, nullptr);
using klass = RemoveRequest<I>;
Context *ctx = create_context_callback<klass, &klass::handle_stat_journal>(this);
void ResetRequest<I>::init_journaler() {
ldout(m_cct, 10) << dendl;
- m_journaler = new Journaler(m_io_ctx, m_image_id, m_client_id, {});
+ m_journaler = new Journaler(m_io_ctx, m_image_id, m_client_id, {}, nullptr);
Context *ctx = create_context_callback<
ResetRequest<I>, &ResetRequest<I>::handle_init_journaler>(this);
m_journaler->init(ctx);
template <typename IoCtxT>
MockJournalerProxy(IoCtxT &header_ioctx, const std::string &,
- const std::string &, const Settings&) {
+ const std::string &, const Settings&,
+ journal::CacheManagerHandler *) {
MockJournaler::get_instance().construct();
}
MockJournalerProxy(WorkQueue *work_queue, Timer *timer, Mutex *timer_lock,
librados::IoCtx &header_ioctx,
const std::string &journal_id,
- const std::string &client_id, const Settings&) {
+ const std::string &client_id, const Settings&,
+ journal::CacheManagerHandler *) {
MockJournaler::get_instance().construct();
}
journal::JournalPlayer *create_player(const std::string &oid,
const journal::JournalMetadataPtr &metadata) {
journal::JournalPlayer *player(new journal::JournalPlayer(
- m_ioctx, oid + ".", metadata, &m_replay_hander));
+ m_ioctx, oid + ".", metadata, &m_replay_hander, nullptr));
m_players.push_back(player);
return player;
}
RadosTestFixture::SetUp();
m_journal_id = get_temp_journal_id();
m_journaler = new journal::Journaler(m_work_queue, m_timer, &m_timer_lock,
- m_ioctx, m_journal_id, CLIENT_ID, {});
+ m_ioctx, m_journal_id, CLIENT_ID, {},
+ nullptr);
}
void TearDown() override {
}
int register_client(const std::string &client_id, const std::string &desc) {
- journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock,
- m_ioctx, m_journal_id, client_id, {});
+ journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock, m_ioctx,
+ m_journal_id, client_id, {}, nullptr);
bufferlist data;
data.append(desc);
C_SaferCond cond;
}
int update_client(const std::string &client_id, const std::string &desc) {
- journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock,
- m_ioctx, m_journal_id, client_id, {});
+ journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock, m_ioctx,
+ m_journal_id, client_id, {}, nullptr);
bufferlist data;
data.append(desc);
C_SaferCond cond;
}
int unregister_client(const std::string &client_id) {
- journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock,
- m_ioctx, m_journal_id, client_id, {});
+ journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock, m_ioctx,
+ m_journal_id, client_id, {}, nullptr);
C_SaferCond cond;
journaler.unregister_client(&cond);
return cond.wait();
return r;
}
- journal::Journaler journaler(io_ctx, image_id, JOURNAL_CLIENT_ID, {});
+ journal::Journaler journaler(io_ctx, image_id, JOURNAL_CLIENT_ID, {},
+ nullptr);
r = journaler.register_client(bufferlist());
if (r < 0) {
simple_err("failed to register journal client", r);
return r;
}
- journal::Journaler journaler(io_ctx, image_id, JOURNAL_CLIENT_ID, {});
+ journal::Journaler journaler(io_ctx, image_id, JOURNAL_CLIENT_ID, {},
+ nullptr);
r = journaler.unregister_client();
if (r < 0) {
simple_err("failed to unregister journal client", r);
return r;
}
- journal::Journaler journaler(io_ctx, image_id, JOURNAL_CLIENT_ID, {});
+ journal::Journaler journaler(io_ctx, image_id, JOURNAL_CLIENT_ID, {},
+ nullptr);
C_SaferCond init_ctx;
journaler.init(&init_ctx);
BOOST_SCOPE_EXIT_ALL( (&journaler) ) {
return r;
}
- journal::Journaler replay_journaler(io_ctx, replay_image_id, "", {});
+ journal::Journaler replay_journaler(io_ctx, replay_image_id, "", {},
+ nullptr);
C_SaferCond replay_init_ctx;
replay_journaler.init(&replay_init_ctx);
journal::Journaler *create_journaler(librbd::ImageCtx *ictx) {
journal::Journaler *journaler = new journal::Journaler(
- ictx->md_ctx, ictx->id, "dummy client", {});
+ ictx->md_ctx, ictx->id, "dummy client", {}, nullptr);
int r = journaler->register_client(bufferlist());
if (r < 0) {
"remote-image-id", {{{}, "sync-point-snap", boost::none}}, {});
librbd::journal::ClientData client_data(peer_client_meta);
- journal::Journaler journaler(io_ctx, image_id, "peer-client", {});
+ journal::Journaler journaler(io_ctx, image_id, "peer-client", {}, nullptr);
C_SaferCond init_ctx;
journaler.init(&init_ctx);
ASSERT_EQ(-ENOENT, init_ctx.wait());
m_remote_journaler = new ::journal::Journaler(
m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
- m_remote_io_ctx, m_remote_image_ctx->id, "mirror-uuid", {});
+ m_remote_io_ctx, m_remote_image_ctx->id, "mirror-uuid", {}, nullptr);
m_client_meta = {"image-id"};
public:
Journaler(librados::IoCtx& io_ctx, const std::string& journal_id,
const std::string &client_id) :
- ::journal::Journaler(io_ctx, journal_id, client_id, {}) {
+ ::journal::Journaler(io_ctx, journal_id, client_id, {}, nullptr) {
}
int init() {
&m_threads->timer_lock,
m_remote_parent_io_ctx,
m_remote_parent_spec.image_id,
- m_local_parent_mirror_uuid, {});
+ m_local_parent_mirror_uuid, {}, nullptr);
Context *ctx = create_async_context_callback(
m_threads->work_queue, create_context_callback<
*m_remote_journaler = new Journaler(m_threads->work_queue, m_threads->timer,
&m_threads->timer_lock, m_remote_io_ctx,
*m_remote_image_id, m_local_mirror_uuid,
- m_journal_settings);
+ m_journal_settings, nullptr);
Context *ctx = create_async_context_callback(
m_threads->work_queue, create_context_callback<