* active (no longer projected). if the passed dnl is projected,
* don't link in, and do that work later in pop_projected_linkage().
*/
-void CDentry::link_remote(CDentry::linkage_t *dnl, CInode *in)
+void CDentry::link_remote(CDentry::linkage_t *dnl, CInode *remote_in, CInode *referent_in)
{
- ceph_assert(dnl->is_remote());
- ceph_assert(in->ino() == dnl->get_remote_ino());
- dnl->inode = in;
+ ceph_assert(dnl->is_remote() || dnl->is_referent_remote());
+ ceph_assert(remote_in->ino() == dnl->get_remote_ino());
+ dnl->inode = remote_in;
+
+ if (referent_in) {
+ dnl->referent_inode = referent_in;
+ dnl->referent_ino = referent_in->ino();
+ }
if (dnl == &linkage)
- in->add_remote_parent(this);
+ remote_in->add_remote_parent(this);
// check for reintegration
dir->mdcache->eval_remote(this);
void CDentry::unlink_remote(CDentry::linkage_t *dnl)
{
- ceph_assert(dnl->is_remote());
+ ceph_assert(dnl->is_remote() || dnl->is_referent_remote());
ceph_assert(dnl->inode);
if (dnl == &linkage)
}
}
+void CDentry::push_projected_linkage(CInode *referent_inode, inodeno_t remote_ino, inodeno_t referent_ino)
+{
+ ceph_assert(remote_ino);
+ ceph_assert(referent_inode);
+ ceph_assert(referent_ino);
+
+ linkage_t *p = _project_linkage();
+ p->referent_inode = referent_inode;
+ referent_inode->push_projected_parent(this);
+ p->referent_ino = referent_ino;
+
+ p->remote_ino = remote_ino;
+ p->remote_d_type = referent_inode->d_type();
+}
void CDentry::push_projected_linkage(CInode *inode)
{
* much).
*/
- if (n.remote_ino) {
+ if (n.is_remote()) {
dir->link_remote_inode(this, n.remote_ino, n.remote_d_type);
if (n.inode) {
linkage.inode = n.inode;
linkage.inode->add_remote_parent(this);
}
+ } else if (n.is_referent_remote()){
+ dir->link_referent_inode(this, n.referent_inode, n.remote_ino, n.remote_d_type);
+ if (n.inode) {
+ linkage.inode = n.inode;
+ linkage.inode->add_remote_parent(this);
+ }
+ n.referent_inode->pop_projected_parent();
} else {
if (n.inode) {
dir->link_primary_inode(this, n.inode);
CInode *inode = nullptr;
inodeno_t remote_ino = 0;
unsigned char remote_d_type = 0;
+ CInode *referent_inode = nullptr;
+ inodeno_t referent_ino = 0;
linkage_t() {}
// dentry type is primary || remote || null
// inode ptr is required for primary, optional for remote, undefined for null
bool is_primary() const { return remote_ino == 0 && inode != 0; }
- bool is_remote() const { return remote_ino > 0; }
- bool is_null() const { return remote_ino == 0 && inode == 0; }
+ bool is_remote() const { return remote_ino > 0 && referent_inode == nullptr && referent_ino == 0; }
+ bool is_null() const { return remote_ino == 0 && inode == 0 && referent_ino == 0 && referent_inode == nullptr; }
+ bool is_referent_remote() const {return remote_ino > 0 && referent_ino != 0 && referent_inode != nullptr;}
CInode *get_inode() { return inode; }
const CInode *get_inode() const { return inode; }
inodeno_t get_remote_ino() const { return remote_ino; }
unsigned char get_remote_d_type() const { return remote_d_type; }
std::string get_remote_d_type_string() const;
+ CInode *get_referent_inode() { return referent_inode; }
+ const CInode *get_referent_inode() const { return referent_inode; }
+ inodeno_t get_referent_ino() const { return referent_ino; }
void set_remote(inodeno_t ino, unsigned char d_type) {
remote_ino = ino;
{}
CDentry(std::string_view n, __u32 h,
mempool::mds_co::string alternate_name,
- inodeno_t ino, unsigned char dt,
- snapid_t f, snapid_t l) :
+ inodeno_t ino, inodeno_t referent_ino,
+ unsigned char dt, snapid_t f, snapid_t l) :
hash(h),
first(f), last(l),
item_dirty(this),
{
linkage.remote_ino = ino;
linkage.remote_d_type = dt;
+ linkage.referent_ino = referent_ino;
}
~CDentry() override {
p->remote_d_type = d_type;
}
void push_projected_linkage(CInode *inode);
+ void push_projected_linkage(CInode *referent_inode, inodeno_t remote_ino, inodeno_t referent_ino);
linkage_t *pop_projected_linkage();
bool is_projected() const { return !projected.empty(); }
int get_num_dir_auth_pins() const;
// remote links
- void link_remote(linkage_t *dnl, CInode *in);
+ void link_remote(linkage_t *dnl, CInode *remote_in, CInode *ref_in=nullptr);
void unlink_remote(linkage_t *dnl);
// copy cons
return dn;
}
-CDentry* CDir::add_remote_dentry(std::string_view dname, inodeno_t ino, unsigned char d_type,
- mempool::mds_co::string alternate_name,
+// This also adds referent remote if referent inode is passed
+CDentry* CDir::add_remote_dentry(std::string_view dname, CInode *ref_in, inodeno_t ino,
+ unsigned char d_type, mempool::mds_co::string alternate_name,
snapid_t first, snapid_t last)
{
// foreign
ceph_assert(lookup_exact_snap(dname, last) == 0);
+ inodeno_t referent_ino = ref_in ? ref_in->ino() : inodeno_t(0);
+
// create dentry
- CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), std::move(alternate_name), ino, d_type, first, last);
+ CDentry* dn = new CDentry(dname, inode->hash_dentry_name(dname), std::move(alternate_name), ino, referent_ino, d_type, first, last);
dn->dir = this;
dn->version = get_projected_version();
dn->check_corruption(true);
//assert(null_items.count(dn->get_name()) == 0);
items[dn->key()] = dn;
+
+ //link referent inode
+ if (ref_in) {
+ dn->get_linkage()->referent_inode = ref_in;
+ link_inode_work(dn, ref_in);
+ }
+
if (last == CEPH_NOSNAP)
num_head_items++;
else
ceph_assert(get_num_any() == items.size());
}
+void CDir::link_null_referent_inode(CDentry *dn, inodeno_t referent_ino, inodeno_t rino, unsigned char d_type)
+{
+ dout(12) << __func__ << " " << *dn << " referent_ino " << referent_ino << " remote " << rino << dendl;
+ ceph_assert(dn->get_linkage()->is_null());
+
+ dn->get_linkage()->set_remote(rino, d_type);
+ dn->get_linkage()->referent_ino = referent_ino;
+}
+
+/*
+ * The linking fun - It can be done in following different ways
+ * 1. add_remote_dentry()
+ * - single step, if referent CInode is available and dentry needs to be created.
+ * 2. link_referent_inode()
+ * - if referent CInode is available and dentry needs to be created, usually in
+ * referent inode creation phase.
+ * e.g., pop_projected_linkage() preceded by push_projected_linkage()
+ * 3. add_null_dentry() -> link_referent_inode()
+ * - two step, if referent CInode is not available and dentry needs to be created,
+ * usually in journal replay.
+ * 4. link_null_referent_inode() -> link_referent_inode()
+ * - two step, if referent CInode is not available and dentry exists, usually in
+ * migration.
+ * e.g., decode_replica_dentry() followed by decode_replica_inode()
+ */
+void CDir::link_referent_inode(CDentry *dn, CInode *ref_in, inodeno_t rino, unsigned char d_type)
+{
+ ceph_assert(ref_in);
+ dout(12) << __func__ << " " << *dn << " remote " << rino << " referent inode " << *ref_in << dendl;
+
+ // The link_referent_inode could be called after add_null_dentry or link_null_referent_inode.
+ // So linkage need not be always null
+ ceph_assert(dn->get_linkage()->is_null() || dn->get_linkage()->get_referent_ino() > 0);
+ ceph_assert(!dn->get_linkage()->get_referent_inode());
+
+ // set linkage
+ dn->get_linkage()->set_remote(rino, d_type);
+ dn->get_linkage()->referent_inode = ref_in;
+ dn->get_linkage()->referent_ino = ref_in->ino();
+
+ link_inode_work(dn, ref_in);
+
+ if (dn->state_test(CDentry::STATE_BOTTOMLRU)) {
+ mdcache->bottom_lru.lru_remove(dn);
+ mdcache->lru.lru_insert_mid(dn);
+ dn->state_clear(CDentry::STATE_BOTTOMLRU);
+ }
+
+ if (dn->last == CEPH_NOSNAP) {
+ num_head_items++;
+ num_head_null--;
+ } else {
+ num_snap_items++;
+ num_snap_null--;
+ }
+ ceph_assert(get_num_any() == items.size());
+}
+
void CDir::link_primary_inode(CDentry *dn, CInode *in)
{
dout(12) << __func__ << " " << *dn << " " << *in << dendl;
void CDir::link_inode_work( CDentry *dn, CInode *in)
{
- ceph_assert(dn->get_linkage()->get_inode() == in);
+ ceph_assert(dn->get_linkage()->get_inode() == in || dn->get_linkage()->get_referent_inode() == in);
in->set_primary_parent(dn);
// set inode version
dn->unlink_remote(dn->get_linkage());
dn->get_linkage()->set_remote(0, 0);
+ } else if(dn->get_linkage()->is_referent_remote()) {
+ // referent remote
+ CInode *ref_in = dn->get_linkage()->get_referent_inode();
+
+ if (ref_in->get_num_ref())
+ dn->put(CDentry::PIN_INODEPIN);
+
+ if (ref_in->state_test(CInode::STATE_TRACKEDBYOFT))
+ mdcache->open_file_table.notify_unlink(ref_in);
+ if (ref_in->is_any_caps())
+ adjust_num_inodes_with_caps(-1);
+
+ // unlink auth_pin count
+ if (ref_in->auth_pins)
+ dn->adjust_nested_auth_pins(-ref_in->auth_pins, nullptr);
+
+ if (ref_in->is_freezing_inode())
+ ref_in->item_freezing_inode.remove_myself();
+ else if (ref_in->is_frozen_inode() || ref_in->is_frozen_auth_pin())
+ num_frozen_inodes--;
+
+ // detach inode
+ ref_in->remove_primary_parent(dn);
+ if (in)
+ dn->unlink_remote(dn->get_linkage());
+
+ dn->get_linkage()->set_remote(0, 0);
+ dn->get_linkage()->referent_inode = 0;
+ dn->get_linkage()->referent_ino = 0;
} else if (dn->get_linkage()->is_primary()) {
// primary
// unpin dentry?
}
} else {
// (remote) link
- dn = add_remote_dentry(dname, ino, d_type, std::move(alternate_name), first, last);
+ dn = add_remote_dentry(dname, nullptr, ino, d_type, std::move(alternate_name), first, last);
// link to inode?
CInode *in = mdcache->get_inode(ino); // we may or may not have it.
snapid_t first=2, snapid_t last=CEPH_NOSNAP);
CDentry* add_primary_dentry(std::string_view dname, CInode *in, mempool::mds_co::string alternate_name,
snapid_t first=2, snapid_t last=CEPH_NOSNAP);
- CDentry* add_remote_dentry(std::string_view dname, inodeno_t ino, unsigned char d_type,
- mempool::mds_co::string alternate_name,
+ CDentry* add_remote_dentry(std::string_view dname, CInode *ref_in, inodeno_t ino,
+ unsigned char d_type, mempool::mds_co::string alternate_name,
snapid_t first=2, snapid_t last=CEPH_NOSNAP);
void remove_dentry( CDentry *dn ); // delete dentry
void link_remote_inode( CDentry *dn, inodeno_t ino, unsigned char d_type);
void link_remote_inode( CDentry *dn, CInode *in );
void link_primary_inode( CDentry *dn, CInode *in );
+ void link_null_referent_inode(CDentry *dn, inodeno_t referent_ino, inodeno_t rino, unsigned char d_type);
+ void link_referent_inode(CDentry *dn, CInode *in, inodeno_t ino, unsigned char d_type);
void unlink_inode(CDentry *dn, bool adjust_lru=true);
void try_remove_unlinked_dn(CDentry *dn);
dn->first = dir_follows+1;
if (realm->has_snaps_in_range(oldfirst, dir_follows)) {
CDir *dir = dn->dir;
- CDentry *olddn = dir->add_remote_dentry(dn->get_name(), in->ino(), in->d_type(), dn->alternate_name, oldfirst, dir_follows);
+ // TODO: What does this mean for referent inode ?? Passing nullptr for now.
+ CDentry *olddn = dir->add_remote_dentry(dn->get_name(), nullptr, in->ino(), in->d_type(), dn->alternate_name, oldfirst, dir_follows);
dout(10) << " olddn " << *olddn << dendl;
ceph_assert(dir->is_projected());
olddn->set_projected_version(dir->get_projected_version());
mut->add_cow_dentry(olddn);
} else {
ceph_assert(dnl->is_remote());
- CDentry *olddn = dir->add_remote_dentry(dn->get_name(), dnl->get_remote_ino(), dnl->get_remote_d_type(), dn->alternate_name, oldfirst, follows);
+ //No need to journal referent inode for cow
+ CDentry *olddn = dir->add_remote_dentry(dn->get_name(), nullptr, dnl->get_remote_ino(), dnl->get_remote_d_type(), dn->alternate_name, oldfirst, follows);
dout(10) << " olddn " << *olddn << dendl;
olddn->set_projected_version(dir->get_projected_version());
}
if (!dn) {
if (d.is_remote()) {
- dn = dir->add_remote_dentry(ss.name, d.remote_ino, d.remote_d_type, mempool::mds_co::string(d.alternate_name), d.first, ss.snapid);
+ //TODO: Fix for referent remote
+ dn = dir->add_remote_dentry(ss.name, nullptr, d.remote_ino, d.remote_d_type, mempool::mds_co::string(d.alternate_name), d.first, ss.snapid);
} else if (d.is_null()) {
dn = dir->add_null_dentry(ss.name, d.first, ss.snapid);
} else {
for (const auto& rb : lump.get_dremote()) {
CDentry *dn = dir->lookup_exact_snap(rb.dn, rb.dnlast);
if (!dn) {
- dn = dir->add_remote_dentry(rb.dn, rb.ino, rb.d_type, mempool::mds_co::string(rb.alternate_name), rb.dnfirst, rb.dnlast);
+ //TODO: Fix for referent inodes
+ dn = dir->add_remote_dentry(rb.dn, nullptr, rb.ino, rb.d_type, mempool::mds_co::string(rb.alternate_name), rb.dnfirst, rb.dnlast);
dn->set_version(rb.dnv);
if (rb.dirty) dn->_mark_dirty(logseg);
dout(10) << "EMetaBlob.replay added " << *dn << dendl;