#include "PurgeQueue.h"
+#include <string.h>
#define dout_context cct
#define dout_subsys ceph_subsys_mds
void PurgeQueue::activate()
{
std::lock_guard l(lock);
+
+ if (readonly) {
+ dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
+ return;
+ }
+
if (journaler.get_read_pos() == journaler.get_write_pos())
return;
finish_contexts(g_ceph_context, waiting_for_recovery);
} else {
derr << "Error " << r << " loading Journaler" << dendl;
- on_error->complete(r);
+ _go_readonly(r);
}
}));
}
void PurgeQueue::wait_for_recovery(Context* c)
{
std::lock_guard l(lock);
- if (recovered)
+ if (recovered) {
c->complete(0);
- else
+ } else if (readonly) {
+ dout(10) << "cannot wait for recovery: PurgeQueue is readonly" << dendl;
+ c->complete(-EROFS);
+ } else {
waiting_for_recovery.push_back(c);
+ }
}
void PurgeQueue::_recover()
if (journaler.get_error()) {
int r = journaler.get_error();
derr << "Error " << r << " recovering write_pos" << dendl;
- on_error->complete(r);
+ _go_readonly(r);
return;
}
journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
journaler.write_head(new FunctionContext([this](int r) {
std::lock_guard l(lock);
- recovered = true;
- finish_contexts(g_ceph_context, waiting_for_recovery);
+ if (r) {
+ _go_readonly(r);
+ } else {
+ recovered = true;
+ finish_contexts(g_ceph_context, waiting_for_recovery);
+ }
}));
}
dout(4) << "pushing inode " << pi.ino << dendl;
std::lock_guard l(lock);
+ if (readonly) {
+ dout(10) << "cannot push inode: PurgeQueue is readonly" << dendl;
+ completion->complete(-EROFS);
+ return;
+ }
+
// Callers should have waited for open() before using us
ceph_assert(!journaler.is_readonly());
bool PurgeQueue::_can_consume()
{
+ if (readonly) {
+ dout(10) << "can't consume: PurgeQueue is readonly" << dendl;
+ return false;
+ }
+
dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
<< in_flight.size() << "/" << g_conf()->mds_max_purge_files
<< " files" << dendl;
}
}
+void PurgeQueue::_go_readonly(int r)
+{
+ if (readonly) return;
+ dout(1) << "going readonly because internal IO failed: " << strerror(-r) << dendl;
+ readonly = true;
+ on_error->complete(r);
+ on_error = nullptr;
+ journaler.set_readonly();
+ finish_contexts(g_ceph_context, waiting_for_recovery, r);
+}
+
bool PurgeQueue::_consume()
{
ceph_assert(lock.is_locked_by_me());
if (int r = journaler.get_error()) {
derr << "Error " << r << " recovering write_pos" << dendl;
- on_error->complete(r);
+ _go_readonly(r);
return could_consume;
}
if (r == 0) {
_consume();
} else if (r != -EAGAIN) {
- on_error->complete(r);
+ _go_readonly(r);
}
}));
}
} catch (const buffer::error &err) {
derr << "Decode error at read_pos=0x" << std::hex
<< journaler.get_read_pos() << dendl;
- on_error->complete(0);
+ _go_readonly(EIO);
}
dout(20) << " executing item (" << item.ino << ")" << dendl;
_execute_item(item, journaler.get_read_pos());
{
std::lock_guard l(lock);
+ if (readonly) {
+ dout(10) << "skipping; PurgeQueue is readonly" << dendl;
+ return;
+ }
+
uint64_t pg_count = 0;
objecter->with_osdmap([&](const OSDMap& o) {
// Number of PGs across all data pools
{
std::lock_guard l(lock);
+ if (readonly) {
+ dout(10) << "skipping drain; PurgeQueue is readonly" << dendl;
+ return true;
+ }
+
ceph_assert(progress != nullptr);
ceph_assert(progress_total != nullptr);
ceph_assert(in_flight_count != nullptr);