} __attribute__ ((packed));
struct ceph_mds_request_release {
- __le64 ino;
+ __le64 ino, cap_id;
__le32 caps;
- __le32 seq;
+ __le32 seq, mseq;
+ __le32 dname_seq;
__le32 dname_len; /* if releasing a dentry lease, too. string follows. */
} __attribute__ ((packed));
*
* Called with i_lock held.
*/
-static struct ceph_cap *__get_cap_for_mds(struct inode *inode, int mds)
+static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
- struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *cap;
struct rb_node *n = ci->i_caps.rb_node;
retry:
spin_lock(&inode->i_lock);
- cap = __get_cap_for_mds(inode, mds);
+ cap = __get_cap_for_mds(ci, mds);
if (!cap) {
if (new_cap) {
cap = new_cap;
/* the rest require a cap */
spin_lock(&inode->i_lock);
- cap = __get_cap_for_mds(inode, mds);
+ cap = __get_cap_for_mds(ceph_inode(inode), mds);
if (!cap) {
dout(10, "no cap on %p ino %llx.%llx from mds%d, releasing\n",
inode, ceph_ino(inode), ceph_snap(inode), mds);
ceph_check_caps(ci, 0, 0, NULL);
}
+/*
+ * Helpers for embedding cap and dentry lease releases into mds
+ * requests.
+ */
+int ceph_encode_inode_release(void **p, struct inode *inode,
+ int mds, int drop, int unless)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_cap *cap;
+ struct ceph_mds_request_release *rel = *p;
+ int ret = 0;
+
+ dout(10, "encode_inode_release %p mds%d drop %s unless %s\n", inode,
+ mds, ceph_cap_string(drop), ceph_cap_string(unless));
+
+ spin_lock(&inode->i_lock);
+ cap = __get_cap_for_mds(ci, mds);
+ if (cap && __cap_is_valid(cap)) {
+ if ((cap->issued & drop) &&
+ (cap->issued & unless) == 0) {
+ dout(10, "encode_inode_release %p cap %p %s -> %s\n",
+ inode, cap, ceph_cap_string(cap->issued),
+ ceph_cap_string(cap->issued & ~drop));
+ cap->issued &= ~drop;
+ cap->implemented &= ~drop;
+
+ rel->ino = cpu_to_le64(ceph_ino(inode));
+ rel->cap_id = cpu_to_le64(cap->cap_id);
+ rel->seq = cpu_to_le32(cap->seq);
+ rel->mseq = cpu_to_le32(cap->mseq);
+ rel->caps = cpu_to_le32(cap->issued);
+ rel->dname_len = 0;
+ rel->dname_seq = 0;
+ *p += sizeof(*rel);
+ ret = 1;
+ } else {
+ dout(10, "encode_inode_release %p cap %p %s\n",
+ inode, cap, ceph_cap_string(cap->issued));
+ }
+ }
+ spin_unlock(&inode->i_lock);
+ return ret;
+}
+
+int ceph_encode_dentry_release(void **p, struct dentry *dentry,
+ int mds, int drop, int unless)
+{
+ struct inode *dir = dentry->d_parent->d_inode;
+ struct ceph_mds_request_release *rel = *p;
+ struct ceph_dentry_info *di = ceph_dentry(dentry);
+ int ret;
+
+ ret = ceph_encode_inode_release(p, dir, mds, drop, unless);
+
+ /* drop dentry lease too? */
+ spin_lock(&dentry->d_lock);
+ if (ret && di->lease_session && di->lease_session->s_mds == mds) {
+ dout(10, "encode_dentry_release %p mds%d seq %d\n",
+ dentry, mds, (int)di->lease_seq);
+ rel->dname_len = cpu_to_le32(dentry->d_name.len);
+ memcpy(*p, dentry->d_name.name, dentry->d_name.len);
+ *p += dentry->d_name.len;
+ rel->dname_seq = cpu_to_le32(di->lease_seq);
+ }
+ spin_unlock(&dentry->d_lock);
+
+ return ret;
+}
}
static int ceph_mknod(struct inode *dir, struct dentry *dentry,
- int mode, dev_t rdev)
+ int mode, dev_t rdev)
{
struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
struct ceph_mds_client *mdsc = &client->mdsc;
dout(5, "mknod in dir %p dentry %p mode 0%o rdev %d\n",
dir, dentry, mode, rdev);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
-
if (IS_ERR(req)) {
d_drop(dentry);
return PTR_ERR(req);
req->r_locked_dir = dir;
req->r_args.mknod.mode = cpu_to_le32(mode);
req->r_args.mknod.rdev = cpu_to_le32(rdev);
- if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL))
- ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
+ req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+ req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
d_drop(dentry);
return PTR_ERR(req);
}
-
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
req->r_path2 = dest;
req->r_locked_dir = dir;
- if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL))
- ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
+ req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+ req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
req->r_num_caps = 2;
req->r_locked_dir = dir;
req->r_args.mkdir.mode = cpu_to_le32(mode);
-
- if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL))
- ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
+ req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+ req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
req->r_num_caps = 2;
req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */
req->r_locked_dir = dir;
-
- if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL))
- ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
+ req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+ req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
err = ceph_mdsc_do_request(mdsc, dir, req);
if (err) {
d_drop(dentry);
* than PIN) we don't specifically want (due to the file still being
* open).
*/
-static void drop_caps_for_unlink(struct inode *inode)
+static int drop_caps_for_unlink(struct inode *inode)
{
int drop = CEPH_CAP_LINK_RDCACHE | CEPH_CAP_LINK_EXCL;
spin_lock(&inode->i_lock);
- if (inode->i_nlink == 1)
+ if (inode->i_nlink == 1)
drop |= ~(__ceph_caps_wanted(ceph_inode(inode)) | CEPH_CAP_PIN);
spin_unlock(&inode->i_lock);
- ceph_release_caps(inode, drop);
+ return drop;
}
/*
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
req->r_locked_dir = dir;
-
- if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL))
- ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
- ceph_mdsc_lease_release(mdsc, dir, dentry, CEPH_LOCK_DN);
- drop_caps_for_unlink(inode);
+ req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+ req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+ req->r_inode_drop = drop_caps_for_unlink(inode);
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
d_delete(dentry);
req->r_num_caps = 2;
req->r_old_dentry = dget(old_dentry);
req->r_locked_dir = new_dir;
- if (!ceph_caps_issued_mask(ceph_inode(old_dir), CEPH_CAP_FILE_EXCL))
- ceph_release_caps(old_dir, CEPH_CAP_FILE_RDCACHE);
- ceph_mdsc_lease_release(mdsc, old_dir, old_dentry, CEPH_LOCK_DN);
- if (!ceph_caps_issued_mask(ceph_inode(new_dir), CEPH_CAP_FILE_EXCL))
- ceph_release_caps(new_dir, CEPH_CAP_FILE_RDCACHE);
- ceph_mdsc_lease_release(mdsc, new_dir, new_dentry, CEPH_LOCK_DN);
+ req->r_old_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+ req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
+ req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+ req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
/* release LINK_RDCACHE on source inode (mds will lock it) */
- ceph_release_caps(old_dentry->d_inode, CEPH_CAP_LINK_RDCACHE);
+ req->r_old_inode_drop = CEPH_CAP_LINK_RDCACHE;
if (new_dentry->d_inode)
- drop_caps_for_unlink(new_dentry->d_inode);
+ req->r_inode_drop = drop_caps_for_unlink(new_dentry->d_inode);
err = ceph_mdsc_do_request(mdsc, old_dir, req);
if (!err && !req->r_reply_info.head->is_dentry) {
/*
spin_unlock(&inode->i_lock);
dout(10, "open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
- if (!ceph_caps_issued_mask(ceph_inode(inode), CEPH_CAP_FILE_EXCL))
- ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE);
req = prepare_open_request(inode->i_sb, flags, 0);
if (IS_ERR(req)) {
err = PTR_ERR(req);
return ERR_PTR(PTR_ERR(req));
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
- if ((flags & O_CREAT) &&
- (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL)))
- ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
+ if (flags & O_CREAT) {
+ req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+ req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+ }
req->r_locked_dir = dir; /* caller holds dir->i_mutex */
err = ceph_mdsc_do_request(mdsc, parent_inode, req);
dentry = ceph_finish_lookup(req, dentry, err);
spin_unlock(&inode->i_lock);
if (mask) {
- if (release)
- ceph_release_caps(inode, release);
req->r_inode = igrab(inode);
+ req->r_inode_drop = release;
req->r_args.setattr.mask = mask;
req->r_num_caps = 1;
err = ceph_mdsc_do_request(mdsc, parent_inode, req);
if (IS_ERR(req))
return PTR_ERR(req);
req->r_inode = igrab(inode);
+ req->r_inode_drop = CEPH_CAP_XATTR_RDCACHE;
req->r_num_caps = 1;
req->r_args.setxattr.flags = cpu_to_le32(flags);
req->r_request->hdr.data_len = cpu_to_le32(size);
req->r_request->hdr.data_off = cpu_to_le16(0);
- ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE);
err = ceph_mdsc_do_request(mdsc, parent_inode, req);
ceph_mdsc_put_request(req);
if (IS_ERR(req))
return PTR_ERR(req);
req->r_inode = igrab(inode);
+ req->r_inode_drop = CEPH_CAP_XATTR_RDCACHE;
req->r_num_caps = 1;
- ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE);
+
err = ceph_mdsc_do_request(mdsc, parent_inode, req);
ceph_mdsc_put_request(req);
return err;
if (IS_ERR(req))
return PTR_ERR(req);
req->r_inode = igrab(inode);
+ req->r_inode_drop = CEPH_CAP_FILE_RDCACHE | CEPH_CAP_FILE_EXCL;
req->r_args.setlayout.layout = layout;
- ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE);
+
err = ceph_mdsc_do_request(mdsc, parent_inode, req);
ceph_mdsc_put_request(req);
return err;
if (!req)
return ERR_PTR(-ENOMEM);
+
req->r_started = jiffies;
req->r_resend_mds = -1;
INIT_LIST_HEAD(&req->r_unsafe_dir_item);
const char *path2 = req->r_path2;
u64 ino1, ino2;
int pathlen1, pathlen2;
- int pathlen;
+ int len;
int freepath1, freepath2;
+ u32 releases;
void *p, *end;
int ret;
goto out_free1;
}
- pathlen = pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64));
- msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, sizeof(*head) + pathlen,
- 0, 0, NULL);
+ len = sizeof(*head) +
+ pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64));
+
+ /* calculate (max) length for cap releases */
+ len += sizeof(struct ceph_mds_request_release) *
+ (!!req->r_inode_drop + !!req->r_dentry_drop +
+ !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
+ if (req->r_dentry_drop)
+ len += req->r_dentry->d_name.len;
+ if (req->r_old_dentry_drop)
+ len += req->r_old_dentry->d_name.len;
+
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
if (IS_ERR(msg))
goto out_free2;
#endif
head->args = req->r_args;
- head->num_releases = 0;
ceph_encode_filepath(&p, end, ino1, path1);
ceph_encode_filepath(&p, end, ino2, path2);
- BUG_ON(p != end);
+ /* cap releases */
+ releases = 0;
+ if (req->r_inode_drop)
+ releases += ceph_encode_inode_release(&p,
+ req->r_inode ? req->r_inode : req->r_dentry->d_inode,
+ mds, req->r_inode_drop, req->r_inode_unless);
+ if (req->r_dentry_drop)
+ releases += ceph_encode_dentry_release(&p, req->r_dentry,
+ mds, req->r_dentry_drop, req->r_dentry_unless);
+ if (req->r_old_dentry_drop)
+ releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
+ mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
+ if (req->r_old_inode_drop)
+ releases += ceph_encode_inode_release(&p,
+ req->r_old_dentry->d_inode,
+ mds, req->r_old_inode_drop, req->r_old_inode_unless);
+ head->num_releases = cpu_to_le32(releases);
+
+ BUG_ON(p > end);
+ msg->front.iov_len = p - msg->front.iov_base;
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
out_free2:
if (freepath2)
struct ceph_vino r_ino1, r_ino2;
union ceph_mds_request_args r_args;
+ int r_inode_drop, r_inode_unless;
+ int r_dentry_drop, r_dentry_unless;
+ int r_old_dentry_drop, r_old_dentry_unless;
+ struct inode *r_old_inode;
+ int r_old_inode_drop, r_old_inode_unless;
+
struct inode *r_target_inode;
struct ceph_msg *r_request; /* original request */
{
ceph_check_caps(ceph_inode(inode), 1, mask, NULL);
}
+extern int ceph_encode_inode_release(void **p, struct inode *inode,
+ int mds, int drop, int unless);
+extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
+ int mds, int drop, int unless);
extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got,
loff_t endoff);
delete m;
}
+void Locker::process_cap_update(int client, inodeno_t ino, __u64 cap_id, int caps, int seq, int mseq,
+ const nstring& dname)
+{
+ CInode *in = mdcache->get_inode(ino);
+ if (!in)
+ return;
+ Capability *cap = in->get_client_cap(client);
+ if (cap) {
+ dout(10) << "process_cap_update client" << client << " " << ccap_string(caps) << " on " << *in << dendl;
+
+ if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
+ dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
+ return;
+ }
+
+ cap->confirm_receipt(seq, caps);
+
+ eval_cap_gather(in);
+ if (in->filelock.is_stable())
+ file_eval(&in->filelock);
+ if (in->authlock.is_stable())
+ eval(&in->authlock);
+ }
+
+ if (dname.length()) {
+ frag_t fg = in->pick_dirfrag(dname);
+ CDir *dir = in->get_dirfrag(fg);
+ if (dir) {
+ CDentry *dn = dir->lookup(dname);
+ MDSCacheObject *p = dn;
+ ClientLease *l = p->get_client_lease(client);
+ if (l)
+ p->remove_client_lease(l, l->mask, this);
+ }
+ }
+
+}
+
/*
* update inode based on cap flush|flushsnap|wanted.
* adjust max_size, if needed.
public:
void mark_updated_scatterlock(ScatterLock *lock);
+
+ // caps
+ void process_cap_update(int client, inodeno_t ino, __u64 cap_id, int caps, int seq, int mseq,
+ const nstring& dname);
+
+ protected:
+ void handle_client_caps(class MClientCaps *m);
+ bool _do_cap_update(CInode *in, Capability *cap, int had, int wanted, snapid_t follows, MClientCaps *m,
+ MClientCaps *ack=0);
+ void handle_client_cap_release(class MClientCapRelease *m);
+
+
// local
+public:
void local_wrlock_grab(LocalLock *lock, Mutation *mut);
protected:
bool local_wrlock_start(LocalLock *lock, MDRequest *mut);
void resume_stale_caps(Session *session);
void remove_stale_leases(Session *session);
- protected:
- void handle_client_caps(class MClientCaps *m);
- bool _do_cap_update(CInode *in, Capability *cap, int had, int wanted, snapid_t follows, MClientCaps *m,
- MClientCaps *ack=0);
- void handle_client_cap_release(class MClientCapRelease *m);
-
public:
void request_inode_file_caps(CInode *in);
protected:
session->trim_completed_requests(req->get_oldest_client_tid());
}
+ // process embedded cap releases?
+ if (req->get_source().is_client()) {
+ int client = req->get_source().num();
+ for (vector<MClientRequest::Release>::iterator p = req->releases.begin();
+ p != req->releases.end();
+ p++) {
+ mds->locker->process_cap_update(client, inodeno_t((__u64)p->item.ino), p->item.cap_id, p->item.caps,
+ p->item.seq, p->item.mseq, p->dname);
+ }
+ }
+
// register + dispatch
MDRequest *mdr = mdcache->request_start(req);
if (!mdr) return;