#include "mds/MDSContinuation.h"
#include "mds/InoTable.h"
#include "cephfs_features.h"
+#include "osdc/Objecter.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix *_dout << "mds." << mdcache->mds->get_nodeid() << ".cache.ino(" << ino() << ") "
+void CInodeCommitOperation::update(ObjectOperation &op, inode_backtrace_t *bt) {
+ using ceph::encode;
+
+ op.priority = priority;
+ op.create(false);
+
+ bufferlist parent_bl;
+ encode(*bt, parent_bl);
+ op.setxattr("parent", parent_bl);
+
+ // for the old pool there is no need to update the layout
+ if (!update_layout)
+ return;
+
+ bufferlist layout_bl;
+ encode(_layout, layout_bl, _features);
+ op.setxattr("layout", layout_bl);
+}
class CInodeIOContext : public MDSIOContextBase
{
}
};
-void CInode::store_backtrace(MDSContext *fin, int op_prio)
+struct C_IO_Inode_CommitBacktrace : public Context {
+ CInode *in;
+ version_t version;
+ MDSContext *fin;
+ std::vector<CInodeCommitOperation> ops_vec;
+ inode_backtrace_t bt;
+
+ C_IO_Inode_CommitBacktrace(CInode *i, version_t v, MDSContext *f) :
+ in(i), version(v), fin(f) { }
+ void finish(int r) override {
+ in->_commit_ops(r, version, fin, ops_vec, &bt);
+ }
+};
+
+void CInode::_commit_ops(int r, version_t version, MDSContext *fin,
+ std::vector<CInodeCommitOperation> &ops_vec,
+ inode_backtrace_t *bt)
+{
+ dout(10) << __func__ << dendl;
+
+ if (r < 0) {
+ mdcache->mds->handle_write_error_with_lock(r);
+ return;
+ }
+
+ C_GatherBuilder gather(g_ceph_context,
+ new C_OnFinisher(new C_IO_Inode_StoredBacktrace(this,
+ version,
+ fin),
+ mdcache->mds->finisher));
+
+ SnapContext snapc;
+ object_t oid = get_object_name(ino(), frag_t(), "");
+
+ for (auto &op : ops_vec) {
+ ObjectOperation obj_op;
+ object_locator_t oloc(op.get_pool());
+ op.update(obj_op, bt);
+ mdcache->mds->objecter->mutate(oid, oloc, obj_op, snapc,
+ ceph::real_clock::now(),
+ 0, gather.new_sub());
+ }
+ gather.activate();
+}
+
+void CInode::_store_backtrace(std::vector<CInodeCommitOperation> &ops_vec,
+ inode_backtrace_t &bt, int op_prio)
{
dout(10) << __func__ << " on " << *this << dendl;
ceph_assert(is_dirty_parent());
auth_pin(this);
const int64_t pool = get_backtrace_pool();
- inode_backtrace_t bt;
build_backtrace(pool, bt);
- bufferlist parent_bl;
- using ceph::encode;
- encode(bt, parent_bl);
-
- ObjectOperation op;
- op.priority = op_prio;
- op.create(false);
- op.setxattr("parent", parent_bl);
-
- bufferlist layout_bl;
- encode(get_inode()->layout, layout_bl, mdcache->mds->mdsmap->get_up_features());
- op.setxattr("layout", layout_bl);
- SnapContext snapc;
- object_t oid = get_object_name(ino(), frag_t(), "");
- object_locator_t oloc(pool);
- Context *fin2 = new C_OnFinisher(
- new C_IO_Inode_StoredBacktrace(this, get_inode()->backtrace_version, fin),
- mdcache->mds->finisher);
+ ops_vec.emplace_back(op_prio, pool, get_inode()->layout,
+ mdcache->mds->mdsmap->get_up_features());
if (!state_test(STATE_DIRTYPOOL) || get_inode()->old_pools.empty()) {
dout(20) << __func__ << ": no dirtypool or no old pools" << dendl;
- mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
- ceph::real_clock::now(),
- 0, fin2);
return;
}
- C_GatherBuilder gather(g_ceph_context, fin2);
- mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
- ceph::real_clock::now(),
- 0, gather.new_sub());
-
// In the case where DIRTYPOOL is set, we update all old pools backtraces
// such that anyone reading them will see the new pool ID in
// inode_backtrace_t::pool and go read everything else from there.
dout(20) << __func__ << ": updating old pool " << p << dendl;
- ObjectOperation op;
- op.priority = op_prio;
- op.create(false);
- op.setxattr("parent", parent_bl);
-
- object_locator_t oloc(p);
- mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
- ceph::real_clock::now(),
- 0, gather.new_sub());
+ ops_vec.emplace_back(op_prio, p);
}
- gather.activate();
+}
+
+void CInode::store_backtrace(MDSContext *fin, int op_prio)
+{
+ std::vector<CInodeCommitOperation> ops_vec;
+ auto version = get_inode()->backtrace_version;
+
+ auto c = new C_IO_Inode_CommitBacktrace(this, version, fin);
+ _store_backtrace(c->ops_vec, c->bt, op_prio);
+ mdcache->mds->finisher->queue(c);
+}
+
+void CInode::store_backtrace(CInodeCommitOperations &op, int op_prio)
+{
+ op.version = get_inode()->backtrace_version;
+ op.in = this;
+
+ _store_backtrace(op.ops_vec, op.bt, op_prio);
}
void CInode::_stored_backtrace(int r, version_t v, Context *fin)
int wr_caps;
};
+struct CInodeCommitOperation {
+public:
+ CInodeCommitOperation(int prio, int64_t po)
+ : pool(po), priority(prio) {
+ }
+ CInodeCommitOperation(int prio, int64_t po, file_layout_t l, uint64_t f)
+ : pool(po), priority(prio), _layout(l), _features(f) {
+ update_layout = true;
+ }
+
+ void update(ObjectOperation &op, inode_backtrace_t *bt);
+ int64_t get_pool() { return pool; }
+
+private:
+ int64_t pool; ///< pool id
+ int priority;
+ bool update_layout = false;
+ file_layout_t _layout;
+ uint64_t _features;
+};
+
+struct CInodeCommitOperations {
+ std::vector<CInodeCommitOperation> ops_vec;
+ inode_backtrace_t bt;
+ version_t version;
+ CInode *in;
+};
+
/**
* Base class for CInode, containing the backing store data and
* serialization methods. This exists so that we can read and
void fetch(MDSContext *fin);
void _fetched(ceph::buffer::list& bl, ceph::buffer::list& bl2, Context *fin);
+ void _commit_ops(int r, version_t version, MDSContext *fin,
+ std::vector<CInodeCommitOperation> &ops_vec,
+ inode_backtrace_t *bt);
void build_backtrace(int64_t pool, inode_backtrace_t& bt);
+ void _store_backtrace(std::vector<CInodeCommitOperation> &ops_vec,
+ inode_backtrace_t &bt, int op_prio);
+ void store_backtrace(CInodeCommitOperations &op, int op_prio);
void store_backtrace(MDSContext *fin, int op_prio=-1);
void _stored_backtrace(int r, version_t v, Context *fin);
void fetch_backtrace(Context *fin, ceph::buffer::list *backtrace);
// -----------------------
// LogSegment
+struct BatchStoredBacktrace : public MDSContext {
+ MDSContext *fin;
+ MDSRank *mds;
+
+ BatchStoredBacktrace(MDSContext *f, MDSRank *m) : fin(f), mds(m) {}
+ void finish(int r) override {
+ fin->complete(r);
+ }
+ MDSRank *get_mds() override { return mds; };
+};
+
+struct BatchCommitBacktrace : public Context {
+ std::vector<CInodeCommitOperations> ops_vec;
+ MDSContext *con;
+ MDSRank *mds;
+
+ BatchCommitBacktrace(std::vector<CInodeCommitOperations> &ops, MDSContext *c,
+ MDSRank *m) : con(c), mds(m) {
+ ops_vec.swap(ops);
+ }
+ void finish(int r) override {
+ MDSGatherBuilder gather(g_ceph_context);
+
+ for (auto &op : ops_vec) {
+ op.in->_commit_ops(r, op.version, gather.new_sub(), op.ops_vec, &op.bt);
+ }
+ if (gather.has_subs()) {
+ gather.set_finisher(new BatchStoredBacktrace(con, mds));
+ std::scoped_lock l(mds->mds_lock);
+ gather.activate();
+ }
+ }
+};
+
void LogSegment::try_to_expire(MDSRank *mds, MDSGatherBuilder &gather_bld, int op_prio)
{
set<CDir*> commit;
ceph_assert(g_conf()->mds_kill_journal_expire_at != 3);
+ size_t count = 0;
+ for (elist<CInode*>::iterator it = dirty_parent_inodes.begin(); !it.end(); ++it)
+ count++;
+
+ std::vector<CInodeCommitOperations> ops_vec;
+ ops_vec.reserve(count);
// backtraces to be stored/updated
for (elist<CInode*>::iterator p = dirty_parent_inodes.begin(); !p.end(); ++p) {
CInode *in = *p;
ceph_assert(in->is_auth());
if (in->can_auth_pin()) {
dout(15) << "try_to_expire waiting for storing backtrace on " << *in << dendl;
- in->store_backtrace(gather_bld.new_sub(), op_prio);
+ ops_vec.resize(ops_vec.size() + 1);
+ in->store_backtrace(ops_vec.back(), op_prio);
} else {
dout(15) << "try_to_expire waiting for unfreeze on " << *in << dendl;
in->add_waiter(CInode::WAIT_UNFREEZE, gather_bld.new_sub());
}
}
+ if (!ops_vec.empty())
+ mds->finisher->queue(new BatchCommitBacktrace(ops_vec, gather_bld.new_sub(), mds));
ceph_assert(g_conf()->mds_kill_journal_expire_at != 4);