in = diri;
pdn = in->get_parent_dn();
}
- vector<ceph_file_layout>::iterator i = inode.old_layouts.begin();
- while(i != inode.old_layouts.end()) {
+ vector<int64_t>::iterator i = inode.old_pools.begin();
+ while(i != inode.old_pools.end()) {
// don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
- if (i->fl_pg_pool == location)
- continue;
- bt->old_pools.insert(i->fl_pg_pool);
+ // Filter with the condition instead of `continue`: a `continue` here would
+ // jump over the i++ below and spin forever on the first matching pool id.
+ if (*i != location)
+   bt->old_pools.insert(*i);
i++;
}
}
void store_backtrace_update(MDS *mds, BacktraceInfo *info, Context *fin);
void _stored_backtrace(BacktraceInfo *info, Context *fin);
unsigned encode_parent_mutation(ObjectOperation& m, BacktraceInfo *info);
- };
+};
#endif
}
};
+// Completion context that forwards its result, together with the backtrace
+// it carries, to MDCache::_purge_forwarding_pointers().
+class C_MDC_PurgeForwardingPointers : public Context {
+  MDCache *cache;   // cache whose _purge_forwarding_pointers() is invoked
+  CDentry *dn;      // dentry handed through to the callback
+  Context *fin;     // follow-up context handed through to the callback
+
+public:
+  // Public so the code that creates this context can pass its address to
+  // whatever fills in the backtrace before finish() runs.
+  inode_backtrace_t backtrace;
+
+  C_MDC_PurgeForwardingPointers(MDCache *mdcache, CDentry *dentry, Context *onfinish)
+    : cache(mdcache), dn(dentry), fin(onfinish)
+  {}
+
+  // r: result code, passed on unchanged.
+  void finish(int r) {
+    cache->_purge_forwarding_pointers(&backtrace, dn, r, fin);
+  }
+};
+
+// Completion context that calls MDCache::_purge_stray() for a dentry,
+// passing along the result code it was finished with.
+class C_MDC_PurgeStray : public Context {
+  MDCache *cache;   // cache whose _purge_stray() is invoked
+  CDentry *dn;      // dentry to purge
+
+public:
+  C_MDC_PurgeStray(MDCache *mdcache, CDentry *dentry)
+    : cache(mdcache), dn(dentry)
+  {}
+
+  // r: result code, passed on unchanged.
+  void finish(int r) {
+    cache->_purge_stray(dn, r);
+  }
+};
+
+// Issue a remove for the backtrace object of `backtrace->ino` in every pool
+// listed in `backtrace->old_pools` (the forwarding-pointer "sentinel"
+// objects), then run `fin`.
+//
+//   backtrace  supplies the ino and the set of old pool ids
+//   d          not used by this function
+//   r          result of the preceding step; must be 0 or -ENOENT
+//   fin        completion to run afterwards. If removals were issued it is
+//              installed as the gather's finisher (presumably released by the
+//              gather -- confirm against C_GatherBuilder); otherwise it is
+//              finished and deleted here.
+void MDCache::_purge_forwarding_pointers(inode_backtrace_t *backtrace, CDentry *d, int r, Context *fin)
+{
+  assert(r == 0 || r == -ENOENT);
+  // setup gathering context
+  C_GatherBuilder gather_bld(g_ceph_context);
+
+  // remove all the objects with forwarding pointer backtraces (aka sentinels)
+  for (set<int64_t>::const_iterator i = backtrace->old_pools.begin();
+       i != backtrace->old_pools.end();
+       ++i) {
+    SnapContext snapc;
+    object_t oid = CInode::get_object_name(backtrace->ino, frag_t(), "");
+    object_locator_t oloc(*i);
+
+    mds->objecter->remove(oid, oloc, snapc, ceph_clock_now(g_ceph_context), 0,
+                          NULL, gather_bld.new_sub());
+  }
+
+  if (gather_bld.has_subs()) {
+    gather_bld.set_finisher(fin);
+    gather_bld.activate();
+  } else {
+    // Nothing to wait for: run the completion right away. finish() alone does
+    // not release the heap-allocated context, so free it here to avoid
+    // leaking it on every purge of a file that never changed pools.
+    fin->finish(r);
+    delete fin;
+  }
+}
+
+// Purge the data objects of the inode linked (projected) from stray dentry
+// `dn`. The object count is derived from the larger of the inode size and
+// its max size, divided (rounding up) by object_size * stripe_count. If
+// that yields nothing to purge, _purge_stray_purged() is called directly;
+// otherwise a range purge is issued with a C_MDC_PurgeStrayPurged completion.
+//
+//   dn  stray dentry being purged
+//   r   not consulted by this function
+void MDCache::_purge_stray(CDentry *dn, int r)
+{
+  // purge the strays
+  CInode *in = dn->get_projected_linkage()->get_inode();
+  dout(10) << "_purge_stray " << *dn << " " << *in << dendl;
+
+  // use the realm's snap context when there is one, else an empty context
+  SnapRealm *realm = in->find_snaprealm();
+  SnapContext nullsnap;
+  const SnapContext *snapc = &nullsnap;
+  if (realm) {
+    dout(10) << " realm " << *realm << dendl;
+    snapc = &realm->get_snap_context();
+  } else {
+    dout(10) << " NO realm, using null context" << dendl;
+    assert(in->last == CEPH_NOSNAP);
+  }
+
+  uint64_t stripe_period = (uint64_t)in->inode.layout.fl_object_size * (uint64_t)in->inode.layout.fl_stripe_count;
+  uint64_t max_size = in->inode.get_max_size();
+  uint64_t to = MAX(in->inode.size, max_size);
+
+  if (!to || !stripe_period) {
+    // nothing on disk to remove
+    dout(10) << "purge_stray 0 objects snapc " << snapc << " on " << *in << dendl;
+    _purge_stray_purged(dn);
+    return;
+  }
+
+  // round up so a trailing partial period is purged as well
+  uint64_t num_objects = (to + stripe_period - 1) / stripe_period;
+  dout(10) << "purge_stray 0~" << to << " objects 0~" << num_objects << " snapc " << snapc << " on " << *in << dendl;
+  mds->filer->purge_range(in->inode.ino, &in->inode.layout, *snapc,
+                          0, num_objects, ceph_clock_now(g_ceph_context), 0,
+                          new C_MDC_PurgeStrayPurged(this, dn));
+}
+
void MDCache::purge_stray(CDentry *dn)
{
CDentry::linkage_t *dnl = dn->get_projected_linkage();
// that is implicit in the dentry's presence and non-use in the stray
// dir. on recovery, we'll need to re-eval all strays anyway.
- SnapRealm *realm = in->find_snaprealm();
- SnapContext nullsnap;
- const SnapContext *snapc;
- if (realm) {
- dout(10) << " realm " << *realm << dendl;
- snapc = &realm->get_snap_context();
- } else {
- dout(10) << " NO realm, using null context" << dendl;
- snapc = &nullsnap;
- assert(in->last == CEPH_NOSNAP);
- }
-
if (in->is_dir()) {
dout(10) << "purge_stray dir ... implement me!" << dendl; // FIXME XXX
- _purge_stray_purged(dn);
+ // remove the backtrace
+ SnapContext snapc;
+ object_t oid = CInode::get_object_name(in->ino(), frag_t(), "");
+ object_locator_t oloc(mds->mdsmap->get_metadata_pool());
+
+ mds->objecter->removexattr(oid, oloc, "parent", snapc, ceph_clock_now(g_ceph_context), 0,
+ NULL, new C_MDC_PurgeStrayPurged(this, dn));
} else if (in->is_file()) {
- uint64_t period = (uint64_t)in->inode.layout.fl_object_size * (uint64_t)in->inode.layout.fl_stripe_count;
- uint64_t cur_max_size = in->inode.get_max_size();
- uint64_t to = MAX(in->inode.size, cur_max_size);
- if (to && period) {
- uint64_t num = (to + period - 1) / period;
- dout(10) << "purge_stray 0~" << to << " objects 0~" << num << " snapc " << snapc << " on " << *in << dendl;
- mds->filer->purge_range(in->inode.ino, &in->inode.layout, *snapc,
- 0, num, ceph_clock_now(g_ceph_context), 0,
- new C_MDC_PurgeStrayPurged(this, dn));
- } else {
- dout(10) << "purge_stray 0 objects snapc " << snapc << " on " << *in << dendl;
- _purge_stray_purged(dn);
- }
+ // get the backtrace before blowing away the object
+ C_MDC_PurgeStray *strayfin = new C_MDC_PurgeStray(this, dn);
+ C_MDC_PurgeForwardingPointers *fpfin = new C_MDC_PurgeForwardingPointers(this, dn, strayfin);
+ in->fetch_backtrace(&fpfin->backtrace, fpfin);
} else {
// not a dir or file; purged!
_purge_stray_purged(dn);
eval_stray(dn);
}
protected:
+ void _purge_forwarding_pointers(inode_backtrace_t *backtrace, CDentry *dn, int r, Context *fin);
+ void _purge_stray(CDentry *dn, int r);
void purge_stray(CDentry *dn);
void _purge_stray_purged(CDentry *dn, int r=0);
void _purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls);
void _purge_stray_logged_truncate(CDentry *dn, LogSegment *ls);
+ friend class C_MDC_PurgeForwardingPointers;
+ friend class C_MDC_PurgeStray;
friend class C_MDC_PurgeStrayLogged;
friend class C_MDC_PurgeStrayLoggedTruncate;
friend class C_MDC_PurgeStrayPurged;
reply_request(mdr, 0, cur, dn);
}
-
-
class C_MDS_openc_finish : public Context {
MDS *mds;
MDRequest *mdr;
MClientReply *reply = new MClientReply(mdr->client_request, 0);
reply->set_extra_bl(mdr->reply_extra_bl);
mds->server->reply_request(mdr, reply);
+
+ mdr->ls->queue_backtrace_update(newi, newi->inode.layout.fl_pg_pool);
+ assert(g_conf->mds_kill_openc_at != 1);
}
};
CInode *in = prepare_new_inode(mdr, dn->get_dir(), inodeno_t(req->head.ino),
req->head.args.open.mode | S_IFREG, &layout);
assert(in);
-
+
// it's a file.
dn->push_projected_linkage(in);
void finish(int r) {
assert(r == 0);
+ int64_t old_pool = in->inode.layout.fl_pg_pool;
+
// apply
in->pop_and_dirty_projected_inode(mdr->ls);
mdr->apply();
if (changed_ranges)
mds->locker->share_inode_max_size(in);
+
+ // if pool changed, queue a new backtrace and set forward pointer on old
+ if (old_pool != in->inode.layout.fl_pg_pool) {
+ mdr->ls->remove_pending_backtraces(in->ino(), in->inode.layout.fl_pg_pool);
+ mdr->ls->queue_backtrace_update(in, in->inode.layout.fl_pg_pool);
+
+ // set forwarding pointer on old backtrace
+ mdr->ls->remove_pending_backtraces(in->ino(), old_pool);
+ mdr->ls->queue_backtrace_update(in, old_pool, in->inode.layout.fl_pg_pool);
+ }
}
};
// validate layout
ceph_file_layout layout = cur->get_projected_inode()->layout;
+ // save existing layout for later
+ int64_t old_pool = layout.fl_pg_pool;
if (req->head.args.setlayout.layout.fl_object_size > 0)
layout.fl_object_size = req->head.args.setlayout.layout.fl_object_size;
// project update
inode_t *pi = cur->project_inode();
pi->layout = layout;
+ // add the old pool to the inode
+ pi->add_old_pool(old_pool);
pi->version = cur->pre_dirty();
pi->ctime = ceph_clock_now(g_ceph_context);
EUpdate *le = new EUpdate(mdlog, "setlayout");
mdlog->start_entry(le);
le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
+ // add the old pool to the metablob to indicate the pool changed with this event
+ le->metablob.add_old_pool(old_pool);
mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY, false);
mdcache->journal_dirty_inode(mdr, &le->metablob, cur);
name.find("ceph.dir.layout") == 0) {
inode_t *pi;
string rest;
+ int64_t old_pool = -1;
if (name.find("ceph.dir.layout") == 0) {
if (!cur->is_dir()) {
reply_request(mdr, -EINVAL);
return;
pi = cur->project_inode();
+ old_pool = pi->layout.fl_pg_pool;
+ pi->add_old_pool(old_pool);
pi->layout = layout;
pi->ctime = ceph_clock_now(g_ceph_context);
}
EUpdate *le = new EUpdate(mdlog, "set vxattr layout");
mdlog->start_entry(le);
le->metablob.add_client_req(req->get_reqid(), req->get_oldest_client_tid());
+ if (cur->is_file()) {
+ assert(old_pool != -1);
+ le->metablob.add_old_pool(old_pool);
+ }
mdcache->predirty_journal_parents(mdr, &le->metablob, cur, 0, PREDIRTY_PRIMARY, false);
mdcache->journal_dirty_inode(mdr, &le->metablob, cur);
// hit pop
mds->balancer->hit_inode(mdr->now, newi, META_POP_IWR);
+ // store the backtrace on the 'parent' xattr
+ if (newi->inode.is_dir()) {
+ // if it's a dir, put it in the metadata pool
+ mdr->ls->queue_backtrace_update(newi, mds->mdsmap->get_metadata_pool());
+ } else {
+ // if it's a file, put it in the data pool for that file
+ mdr->ls->queue_backtrace_update(newi, newi->inode.layout.fl_pg_pool);
+ }
+
// reply
MClientReply *reply = new MClientReply(mdr->client_request, 0);
reply->set_result(0);
mds->balancer->hit_inode(mdr->now, destdnl->get_inode(), META_POP_IWR);
// did we import srci? if so, explicitly ack that import that, before we unlock and reply.
-
+
+ // backtrace
+ if (destdnl->inode->is_dir()) {
+ // replace previous backtrace on this inode with myself
+ mdr->ls->remove_pending_backtraces(destdnl->inode->ino(), mds->mdsmap->get_metadata_pool());
+ // queue an updated backtrace
+ mdr->ls->queue_backtrace_update(destdnl->inode, mds->mdsmap->get_metadata_pool());
+
+ } else {
+ // remove all pending backtraces going to the same pool
+ mdr->ls->remove_pending_backtraces(destdnl->inode->ino(), destdnl->inode->inode.layout.fl_pg_pool);
+ // queue an updated backtrace
+ mdr->ls->queue_backtrace_update(destdnl->inode, destdnl->inode->inode.layout.fl_pg_pool);
+ }
// reply
MClientReply *reply = new MClientReply(mdr->client_request, 0);
mdcache->project_subtree_rename(oldin, destdn->get_dir(), straydn->get_dir());
if (srci->is_dir())
mdcache->project_subtree_rename(srci, srcdn->get_dir(), destdn->get_dir());
+
+ // always update the backtrace
+ metablob->update_backtrace();
}
// idempotent op(s)
list<pair<metareqid_t,uint64_t> > client_reqs;
+ int64_t old_pool;
+ bool update_bt;
+
public:
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& bl);
void add_dir_context(CDir *dir, int mode = TO_AUTH_SUBTREE_ROOT);
+ void add_old_pool(int64_t pool) {  // remember the pool in use before this event changed it
+ old_pool = pool;
+ }
+ void update_backtrace() {  // flag this blob so a backtrace update is queued for it (used by rename)
+ update_bt = true;
+ }
+
void print(ostream& out) const {
out << "[metablob";
if (!lump_order.empty())
if (pool == -1) pool = location;
bt.pool = pool;
- i->build_backtrace(l, bt);
+ i->build_backtrace(l, &bt);
ls->update_backtraces.push_back(&item_logseg);
}
EMetaBlob::EMetaBlob(MDLog *mdlog) : opened_ino(0), renamed_dirino(0),
inotablev(0), sessionmapv(0),
allocated_ino(0),
+ old_pool(-1),  // -1 means no pool change has been recorded via add_old_pool()
+ update_bt(false),  // switched on by update_backtrace()
last_subtree_map(mdlog ? mdlog->get_last_segment_offset() : 0),
my_offset(mdlog ? mdlog->get_write_pos() : 0) //, _segment(0)
{ }
*/
void EMetaBlob::encode(bufferlist& bl) const
{
- ENCODE_START(5, 5, bl);
+ ENCODE_START(6, 5, bl);  // first argument bumped 5 -> 6 for the two fields appended below
::encode(lump_order, bl);
::encode(lump_map, bl);
::encode(roots, bl);
::encode(client_reqs, bl);
::encode(renamed_dirino, bl);
::encode(renamed_dir_frags, bl);
+ ::encode(old_pool, bl);  // appended last; decode() reads it only when struct_v >= 6
+ ::encode(update_bt, bl);  // appended last; decode() reads it only when struct_v >= 6
ENCODE_FINISH(bl);
}
void EMetaBlob::decode(bufferlist::iterator &bl)
{
- DECODE_START_LEGACY_COMPAT_LEN(5, 5, 5, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(6, 5, 5, bl);  // first argument now 6, matching ENCODE_START(6, 5, bl) in encode()
::decode(lump_order, bl);
::decode(lump_map, bl);
if (struct_v >= 4) {
::decode(renamed_dirino, bl);
::decode(renamed_dir_frags, bl);
}
+ if (struct_v >= 6) {  // old_pool / update_bt are only present when struct_v is at least 6
+ ::decode(old_pool, bl);
+ ::decode(update_bt, bl);
+ }
DECODE_FINISH(bl);
}
if (!in)
in = new CInode(mds->mdcache, true);
(*p)->update_inode(mds, in);
+
if (isnew)
mds->mdcache->add_inode(in);
if ((*p)->dirty) in->_mark_dirty(logseg);
assert(in->first == p->dnfirst ||
(in->is_multiversion() && in->first > p->dnfirst));
}
+
+ // store backtrace for allocated inos (create, mkdir, symlink, mknod)
+ if (allocated_ino || used_preallocated_ino) {
+ if (in->inode.is_dir()) {
+ logseg->queue_backtrace_update(in, mds->mdsmap->get_metadata_pool());
+ } else {
+ logseg->queue_backtrace_update(in, in->inode.layout.fl_pg_pool);
+ }
+ }
+ // handle change of pool with backtrace update
+ if (old_pool != -1 && old_pool != in->inode.layout.fl_pg_pool) {
+ // update backtrace on new data pool
+ logseg->queue_backtrace_update(in, in->inode.layout.fl_pg_pool);
+
+ // set forwarding pointer on old backtrace
+ logseg->queue_backtrace_update(in, old_pool, in->inode.layout.fl_pg_pool);
+ }
+ // handle backtrace update if specified (used by rename)
+ if (update_bt) {
+ if (in->is_dir()) {
+ // replace previous backtrace on this inode with myself
+ logseg->remove_pending_backtraces(in->ino(), mds->mdsmap->get_metadata_pool());
+ logseg->queue_backtrace_update(in, mds->mdsmap->get_metadata_pool());
+ } else {
+ // remove all pending backtraces going to the same pool
+ logseg->remove_pending_backtraces(in->ino(), in->inode.layout.fl_pg_pool);
+ logseg->queue_backtrace_update(in, in->inode.layout.fl_pg_pool);
+ }
+ }
}
// remote dentries
f->close_section();
f->open_array_section("old_pools");
- vector<ceph_file_layout>::const_iterator i = old_pools.begin();
+ vector<int64_t>::const_iterator i = old_pools.begin();
while(i != old_pools.end()) {
- ::dump(*i, f);
+ f->dump_int("pool", *i);
+ // advance the iterator; without this the loop never terminates once
+ // old_pools holds at least one entry
+ ++i;
}
f->close_section();