handle caps on any inode in reply. no more inode leases.
int ceph_add_cap(struct inode *inode,
struct ceph_mds_session *session,
int fmode, unsigned issued,
- unsigned seq, unsigned mseq,
- void *snapblob, int snapblob_len,
+ unsigned seq, unsigned mseq, u64 realmino,
struct ceph_cap *new_cap)
{
struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *cap;
- struct ceph_snap_realm *realm;
int mds = session->s_mds;
int is_first = 0;
- realm = ceph_update_snap_trace(mdsc, snapblob, snapblob+snapblob_len,
- false /* not a deletion */);
-
dout(10, "add_cap on %p mds%d cap %d seq %d\n", inode,
session->s_mds, issued, seq);
retry:
} else {
spin_unlock(&inode->i_lock);
new_cap = kmalloc(sizeof(*cap), GFP_NOFS);
- if (new_cap == NULL) {
- ceph_put_snap_realm(mdsc, realm);
+ if (new_cap == NULL)
return -ENOMEM;
- }
goto retry;
}
}
}
if (!ci->i_snap_realm) {
+ struct ceph_snap_realm *realm = ceph_get_snap_realm(mdsc,
+ realmino);
ci->i_snap_realm = realm;
list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps);
- } else {
- ceph_put_snap_realm(mdsc, realm);
}
dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n",
*
* caller holds s_mutex, snap_rwsem
*/
-static void handle_cap_import(struct inode *inode, struct ceph_mds_caps *im,
+static void handle_cap_import(struct ceph_mds_client *mdsc,
+ struct inode *inode, struct ceph_mds_caps *im,
struct ceph_mds_session *session,
void *snaptrace, int snaptrace_len)
{
unsigned issued = le32_to_cpu(im->caps);
unsigned seq = le32_to_cpu(im->seq);
unsigned mseq = le32_to_cpu(im->migrate_seq);
+ u64 realmino = le64_to_cpu(im->realm);
if (ci->i_cap_exporting_mds >= 0 &&
ci->i_cap_exporting_mseq < mseq) {
inode, ci, mds, mseq);
}
- ceph_add_cap(inode, session, -1, issued, seq, mseq,
- snaptrace, snaptrace_len, NULL);
+ ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, false);
+ ceph_add_cap(inode, session, -1, issued, seq, mseq, realmino, NULL);
}
break;
case CEPH_CAP_OP_IMPORT:
- handle_cap_import(inode, h, session,
+ handle_cap_import(mdsc, inode, h, session,
msg->front.iov_base + sizeof(*h),
le32_to_cpu(h->snap_trace_len));
up_write(&mdsc->snap_rwsem);
* populate an inode based on info from mds.
* may be called on new or existing inodes.
*/
-int ceph_fill_inode(struct inode *inode,
- struct ceph_mds_reply_info_in *iinfo,
- struct ceph_mds_reply_dirfrag *dirinfo)
+static int fill_inode(struct inode *inode,
+ struct ceph_mds_reply_info_in *iinfo,
+ struct ceph_mds_reply_dirfrag *dirinfo,
+ struct ceph_mds_session *session)
{
struct ceph_mds_reply_inode *info = iinfo->in;
struct ceph_inode_info *ci = ceph_inode(inode);
}
mutex_unlock(&ci->i_fragtree_mutex);
+ /* cap? */
+ if (info->cap.caps) {
+ if (ceph_snap(inode) == CEPH_NOSNAP) {
+ ceph_add_cap(inode, session, -1,
+ info->cap.caps, info->cap.seq,
+ info->cap.mseq, info->cap.realm,
+ NULL);
+ } else {
+
+ }
+ }
+
/* update delegation info? */
if (dirinfo)
ceph_fill_dirfrag(inode, dirinfo);
}
if (vino.ino == 1) {
- err = ceph_fill_inode(in, &rinfo->trace_in[0],
- rinfo->trace_numd ?
- rinfo->trace_dir[0] : NULL);
+ err = fill_inode(in, &rinfo->trace_in[0],
+ rinfo->trace_numd ?
+ rinfo->trace_dir[0] : NULL,
+ session);
if (err < 0)
return err;
if (unlikely(sb->s_root == NULL))
update_inode:
BUG_ON(dn->d_inode != in);
- err = ceph_fill_inode(in,
- &rinfo->trace_in[d+1],
- rinfo->trace_numd <= d ?
- rinfo->trace_dir[d+1] : NULL);
+ err = fill_inode(in,
+ &rinfo->trace_in[d+1],
+ rinfo->trace_numd <= d ?
+ rinfo->trace_dir[d+1] : NULL,
+ session);
if (err < 0) {
- derr(30, "ceph_fill_inode badness\n");
+ derr(30, "fill_inode badness\n");
d_delete(dn);
dn = NULL;
in = NULL;
/*
* prepopulate cache with readdir results, leases, etc.
*/
-int ceph_readdir_prepopulate(struct ceph_mds_request *req)
+int ceph_readdir_prepopulate(struct ceph_mds_request *req,
+ struct ceph_mds_session *session)
{
struct dentry *parent = req->r_last_dentry;
struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
dn = splice_dentry(dn, in, NULL);
}
- if (ceph_fill_inode(in, &rinfo->dir_in[i], NULL) < 0) {
- dout(0, "ceph_fill_inode badness on %p\n", in);
+ if (fill_inode(in, &rinfo->dir_in[i], NULL, session) < 0) {
+ dout(0, "fill_inode badness on %p\n", in);
dput(dn);
continue;
}
/* alloc one big block of memory for all of these arrays */
info->trace_in = kmalloc(numi * (sizeof(*info->trace_in) +
- 2*sizeof(*info->trace_ilease) +
sizeof(*info->trace_dir) +
sizeof(*info->trace_dname) +
sizeof(*info->trace_dname_len)),
err = -ENOMEM;
goto out_bad;
}
- info->trace_ilease = (void *)(info->trace_in + numi);
- info->trace_dir = (void *)(info->trace_ilease + numi);
+ info->trace_dir = (void *)(info->trace_in + numi);
info->trace_dname = (void *)(info->trace_dir + numd);
info->trace_dname_len = (void *)(info->trace_dname + numd);
info->trace_dlease = (void *)(info->trace_dname_len + numd);
err = parse_reply_info_in(p, end, &info->trace_in[numi]);
if (err < 0)
goto out_bad;
- info->trace_ilease[numi] = *p;
- *p += sizeof(struct ceph_mds_reply_lease);
dentry:
if (!numd)
/* alloc large array */
info->dir_nr = num;
info->dir_in = kmalloc(num * (sizeof(*info->dir_in) +
- sizeof(*info->dir_ilease) +
sizeof(*info->dir_dname) +
sizeof(*info->dir_dname_len) +
sizeof(*info->dir_dlease)),
err = -ENOMEM;
goto out_bad;
}
- info->dir_ilease = (void *)(info->dir_in + num);
- info->dir_dname = (void *)(info->dir_ilease + num);
+ info->dir_dname = (void *)(info->dir_in + num);
info->dir_dname_len = (void *)(info->dir_dname + num);
info->dir_dlease = (void *)(info->dir_dname_len + num);
err = parse_reply_info_in(p, end, &info->dir_in[i]);
if (err < 0)
goto out_bad;
- info->dir_ilease[i] = *p;
- *p += sizeof(struct ceph_mds_reply_lease);
i++;
num--;
}
u64 tid;
int err, result;
int mds;
- u32 cap, capseq, mseq;
int took_snap_sem = 0;
if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
result = le32_to_cpu(rinfo->head->result);
dout(10, "handle_reply tid %lld result %d\n", tid, result);
+ /* snap trace */
+ if (rinfo->snapblob_len)
+ ceph_update_snap_trace(mdsc, rinfo->snapblob,
+ rinfo->snapblob + rinfo->snapblob_len,
+ le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
+
/* insert trace into our cache */
err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
if (err)
goto done;
if (result == 0) {
- /* caps? (i.e. this is an open) */
- if (req->r_expected_cap && req->r_last_inode) {
- cap = le32_to_cpu(rinfo->head->file_caps);
- capseq = le32_to_cpu(rinfo->head->file_caps_seq);
- mseq = le32_to_cpu(rinfo->head->file_caps_mseq);
- if (ceph_snap(req->r_last_inode) == CEPH_NOSNAP) {
- /* use our preallocated struct ceph_cap */
- err = ceph_add_cap(req->r_last_inode,
- req->r_session,
- req->r_fmode,
- cap, capseq, mseq,
- rinfo->snapblob,
- rinfo->snapblob_len,
- req->r_expected_cap);
- req->r_expected_cap = NULL;
- if (err)
- goto done;
- } else {
- /* don't bother with full blown caps on snapped
- * metadata, since its read-only and won't
- * change anyway. */
- struct ceph_inode_info *ci =
- ceph_inode(req->r_last_inode);
-
- spin_lock(&req->r_last_inode->i_lock);
- ci->i_snap_caps |= cap;
- __ceph_get_fmode(ci, req->r_fmode);
- spin_unlock(&req->r_last_inode->i_lock);
- }
- }
-
/* readdir result? */
if (rinfo->dir_nr)
- ceph_readdir_prepopulate(req);
+ ceph_readdir_prepopulate(req, req->r_session);
}
+
done:
if (took_snap_sem)
up_write(&mdsc->snap_rwsem);
int trace_numi, trace_numd, trace_snapdirpos;
struct ceph_mds_reply_info_in *trace_in;
- struct ceph_mds_reply_lease **trace_ilease;
struct ceph_mds_reply_dirfrag **trace_dir;
char **trace_dname;
u32 *trace_dname_len;
struct ceph_mds_reply_dirfrag *dir_dir;
int dir_nr;
- struct ceph_mds_reply_lease **dir_ilease;
char **dir_dname;
u32 *dir_dname_len;
struct ceph_mds_reply_lease **dir_dlease;
*
* caller must hold snap_rwsem for write.
*/
-static
struct ceph_snap_realm *ceph_get_snap_realm(struct ceph_mds_client *mdsc,
u64 ino)
{
/* snap.c */
+struct ceph_snap_realm *ceph_get_snap_realm(struct ceph_mds_client *mdsc,
+ u64 ino);
extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
struct ceph_snap_realm *realm);
extern struct ceph_snap_realm *ceph_update_snap_trace(struct ceph_mds_client *m,
extern struct inode *ceph_get_inode(struct super_block *sb,
struct ceph_vino vino);
extern struct inode *ceph_get_snapdir(struct inode *parent);
-extern int ceph_fill_inode(struct inode *inode,
- struct ceph_mds_reply_info_in *iinfo,
- struct ceph_mds_reply_dirfrag *dirinfo);
extern void ceph_fill_file_bits(struct inode *inode, int issued,
u64 truncate_seq, u64 size,
u64 time_warp_seq, struct timespec *ctime,
extern int ceph_fill_trace(struct super_block *sb,
struct ceph_mds_request *req,
struct ceph_mds_session *session);
-extern int ceph_readdir_prepopulate(struct ceph_mds_request *req);
+extern int ceph_readdir_prepopulate(struct ceph_mds_request *req,
+ struct ceph_mds_session *session);
extern int ceph_inode_holds_cap(struct inode *inode, int mask);
extern int ceph_dentry_lease_valid(struct dentry *dentry);
extern int ceph_add_cap(struct inode *inode,
struct ceph_mds_session *session,
int fmode, unsigned issued,
- unsigned cap, unsigned seq,
- void *snapblob, int snapblob_len,
+ unsigned cap, unsigned seq, u64 realmino,
struct ceph_cap *new_cap);
extern void ceph_remove_cap(struct ceph_cap *cap);
extern int ceph_get_cap_mds(struct inode *inode);