static const std::string JOURNAL_HEADER_PREFIX = "journal.";
static const std::string JOURNAL_OBJECT_PREFIX = "journal_data.";
-struct C_DeleteRecorder : public Context {
- JournalRecorder *recorder;
- Context *on_safe;
- C_DeleteRecorder(JournalRecorder *_recorder, Context *_on_safe)
- : recorder(_recorder), on_safe(_on_safe) {
- }
- virtual void finish(int r) {
- delete recorder;
- on_safe->complete(r);
- }
-};
-
} // anonymous namespace
using namespace cls::journal;
Journaler::~Journaler() {
if (m_metadata != nullptr) {
+ assert(!m_metadata->is_initialized());
m_metadata->put();
m_metadata = nullptr;
}
-
- // TODO
- if (m_trimmer != nullptr) {
- C_SaferCond ctx;
- m_trimmer->shut_down(&ctx);
- ctx.wait();
- delete m_trimmer;
- }
+ assert(m_trimmer == nullptr);
assert(m_player == nullptr);
assert(m_recorder == nullptr);
void Journaler::shut_down() {
C_SaferCond ctx;
- m_metadata->shut_down(&ctx);
+ shut_down(&ctx);
ctx.wait();
}
+void Journaler::shut_down(Context *on_finish) {
+ assert(m_player == nullptr);
+ assert(m_recorder == nullptr);
+
+ JournalMetadata *metadata = nullptr;
+ std::swap(metadata, m_metadata);
+ assert(metadata != nullptr);
+
+ on_finish = new FunctionContext([metadata, on_finish](int r) {
+ metadata->put();
+ on_finish->complete(0);
+ });
+
+ JournalTrimmer *trimmer = nullptr;
+ std::swap(trimmer, m_trimmer);
+ if (trimmer == nullptr) {
+ metadata->shut_down(on_finish);
+ return;
+ }
+
+ on_finish = new FunctionContext([trimmer, metadata, on_finish](int r) {
+ delete trimmer;
+ metadata->shut_down(on_finish);
+ });
+ trimmer->shut_down(on_finish);
+}
+
bool Journaler::is_initialized() const {
return m_metadata->is_initialized();
}
}
void Journaler::stop_replay() {
- assert(m_player != NULL);
-
- // TODO
C_SaferCond ctx;
- m_player->shut_down(&ctx);
+ stop_replay(&ctx);
ctx.wait();
+}
- delete m_player;
- m_player = NULL;
+void Journaler::stop_replay(Context *on_finish) {
+ JournalPlayer *player = nullptr;
+ std::swap(player, m_player);
+ assert(player != nullptr);
+
+ on_finish = new FunctionContext([player, on_finish](int r) {
+ delete player;
+ on_finish->complete(r);
+ });
+ player->shut_down(on_finish);
}
void Journaler::committed(const ReplayEntry &replay_entry) {
}
void Journaler::stop_append(Context *on_safe) {
- assert(m_recorder != NULL);
-
- flush_append(new C_DeleteRecorder(m_recorder, on_safe));
- m_recorder = NULL;
+ JournalRecorder *recorder = nullptr;
+ std::swap(recorder, m_recorder);
+ assert(recorder != nullptr);
+
+ on_safe = new FunctionContext([recorder, on_safe](int r) {
+ delete recorder;
+ on_safe->complete(r);
+ });
+ recorder->flush(on_safe);
}
uint64_t Journaler::get_max_append_size() const {