dout(2) << "boot_start " << step << ": opening mds log" << dendl;
mdlog->open(gather.new_sub());
+ if (is_starting()) {
+ dout(2) << "boot_start " << step << ": opening purge queue" << dendl;
+ purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
+ } else if (!standby_replaying) {
+ dout(2) << "boot_start " << step << ": opening purge queue (async)" << dendl;
+ purge_queue.open(NULL);
+ }
+
if (mdsmap->get_tableserver() == whoami) {
dout(2) << "boot_start " << step << ": opening snap table" << dendl;
snapserver->set_rank(whoami);
mdcache->open_mydir_inode(gather.new_sub());
- purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
-
if (is_starting() ||
whoami == mdsmap->get_root()) { // load root inode off disk if we are auth
mdcache->open_root_inode(gather.new_sub());
break;
case MDS_BOOT_PREPARE_LOG:
if (is_any_replay()) {
- dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
- mdlog->replay(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
+ dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
+ MDSGatherBuilder gather(g_ceph_context,
+ new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
+
+ if (!standby_replaying && !purge_queue.is_recovered()) {
+ dout(2) << "boot_start " << step << ": waiting for purge queue recovered" << dendl;
+ purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub()));
+ }
+
+ mdlog->replay(gather.new_sub());
+ gather.activate();
} else {
dout(2) << "boot_start " << step << ": positioning at end of old mds log" << dendl;
mdlog->append();
new C_MDS_StandbyReplayRestartFinish(
this,
mdlog->get_journaler()->get_read_pos()));
+
+ dout(1) << " opening purge queue (async)" << dendl;
+ purge_queue.open(NULL);
} else {
dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
<< " (which blacklists prior instance)" << dendl;
max_purge_ops(0),
drain_initial(0),
draining(false),
- delayed_flush(nullptr)
+ delayed_flush(nullptr),
+ recovered(false)
{
assert(cct != nullptr);
assert(on_error != nullptr);
Mutex::Locker l(lock);
- journaler.recover(new FunctionContext([this, completion](int r){
+ if (completion)
+ waiting_for_recovery.push_back(completion);
+
+ journaler.recover(new FunctionContext([this](int r){
if (r == -ENOENT) {
dout(1) << "Purge Queue not found, assuming this is an upgrade and "
"creating it." << dendl;
- create(completion);
+ create(NULL);
} else if (r == 0) {
Mutex::Locker l(lock);
dout(4) << "open complete" << dendl;
if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
dout(4) << "recovering write_pos" << dendl;
journaler.set_read_pos(journaler.last_committed.write_pos);
- _recover(completion);
+ _recover();
return;
}
journaler.set_writeable();
- completion->complete(0);
+ recovered = true;
+ finish_contexts(g_ceph_context, waiting_for_recovery);
} else {
derr << "Error " << r << " loading Journaler" << dendl;
on_error->complete(r);
}));
}
+// Takes `lock` and returns the current value of the `recovered` flag.
+bool PurgeQueue::is_recovered()
+{
+  Mutex::Locker guard(lock);
+  const bool done = recovered;
+  return done;
+}
-void PurgeQueue::_recover(Context *completion)
+// Arrange for `c` to be completed once the purge queue is recovered.
+//
+// @param c  context to complete with result 0; must not be null.
+//
+// The `recovered` check and the enqueue happen under the same hold of
+// `lock`. A caller that tests is_recovered() first drops the lock before
+// calling here, so recovery may finish in between; the waiter list is
+// drained at the point `recovered` is set, and a context queued after
+// that drain would never be completed.
+void PurgeQueue::wait_for_recovery(Context* c)
+{
+  Mutex::Locker l(lock);
+  if (recovered) {
+    // Already recovered: complete right away (with `lock` held, as the
+    // finish_contexts() calls on waiting_for_recovery also do).
+    c->complete(0);
+    return;
+  }
+  waiting_for_recovery.push_back(c);
+}
+
+void PurgeQueue::_recover()
{
assert(lock.is_locked_by_me());
if (!journaler.is_readable() &&
!journaler.get_error() &&
journaler.get_read_pos() < journaler.get_write_pos()) {
- journaler.wait_for_readable(new FunctionContext([this, completion](int r) {
+ journaler.wait_for_readable(new FunctionContext([this](int r) {
Mutex::Locker l(lock);
- _recover(completion);
+ _recover();
}));
return;
}
// restore original read_pos
journaler.set_read_pos(journaler.last_committed.expire_pos);
journaler.set_writeable();
- completion->complete(0);
+ recovered = true;
+ finish_contexts(g_ceph_context, waiting_for_recovery);
return;
}
dout(4) << "creating" << dendl;
Mutex::Locker l(lock);
+ if (fin)
+ waiting_for_recovery.push_back(fin);
+
file_layout_t layout = file_layout_t::get_default();
layout.pool_id = metadata_pool;
journaler.set_writeable();
journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
- journaler.write_head(fin);
+ journaler.write_head(new FunctionContext([this](int r) {
+ Mutex::Locker l(lock);
+ recovered = true;
+ finish_contexts(g_ceph_context, waiting_for_recovery);
+ }));
}
/**
bool draining;
// recover the journal write_pos (drop any partial written entry)
- void _recover(Context *completion);
+ void _recover();
/**
* @return true if we were in a position to try and consume something:
void _execute_item_complete(
uint64_t expire_to);
+ bool recovered;
+ std::list<Context*> waiting_for_recovery;
public:
void init();
// Read the Journaler header for an existing queue and start consuming
void open(Context *completion);
+ bool is_recovered(); // returns the `recovered` flag, read while holding `lock`
+ void wait_for_recovery(Context *c); // c is completed when purge queue recovery finishes
+
// Submit one entry to the work queue. Call back when it is persisted
// to the queue (there is no callback for when it is executed)
void push(const PurgeItem &pi, Context *completion);