- d_revalidate?
- test truncate
- is ino_t really still 32 bits on i386?? hrm!
+- fix file open vs file_cap race
+ - preemptively release caps as part of request if doing utimes/etc. on an open file?
- mds client
/ - handle file caps, ack back to mds, etc.
- actually flush dirty data, too
- handle map changes (resubmit ops)
- mon client
- work out message resend logic..?
-- mount
- - determine suitable local ip automatically
- - this should be determined by which interface/ip would be used to connect to the monitor ip
- unmount
/ - close open file handles, mds sessions
- flush data before unmount.
ceph_msg_put(msg);
}
+
+/*
+ * Translate a CEPH_MSG_* message type code into a short printable name
+ * for debug output.  Any code that is not listed in the table below is
+ * reported as "unknown".
+ */
+const char *ceph_msg_type_name(int type)
+{
+	static const struct {
+		int type;
+		const char *name;
+	} known[] = {
+		{ CEPH_MSG_SHUTDOWN,			"shutdown" },
+		{ CEPH_MSG_PING,			"ping" },
+		{ CEPH_MSG_PING_ACK,			"ping_ack" },
+		{ CEPH_MSG_MON_MAP,			"mon_map" },
+		{ CEPH_MSG_CLIENT_MOUNT,		"client_mount" },
+		{ CEPH_MSG_CLIENT_UNMOUNT,		"client_unmount" },
+		{ CEPH_MSG_STATFS,			"statfs" },
+		{ CEPH_MSG_STATFS_REPLY,		"statfs_reply" },
+		{ CEPH_MSG_MDS_GETMAP,			"mds_getmap" },
+		{ CEPH_MSG_MDS_MAP,			"mds_map" },
+		{ CEPH_MSG_CLIENT_SESSION,		"client_session" },
+		{ CEPH_MSG_CLIENT_RECONNECT,		"client_reconnect" },
+		{ CEPH_MSG_CLIENT_REQUEST,		"client_request" },
+		{ CEPH_MSG_CLIENT_REQUEST_FORWARD,	"client_request_forward" },
+		{ CEPH_MSG_CLIENT_REPLY,		"client_reply" },
+		{ CEPH_MSG_CLIENT_FILECAPS,		"client_filecaps" },
+		{ CEPH_MSG_OSD_GETMAP,			"osd_getmap" },
+		{ CEPH_MSG_OSD_MAP,			"osd_map" },
+		{ CEPH_MSG_OSD_OP,			"osd_op" },
+		{ CEPH_MSG_OSD_OPREPLY,			"osd_opreply" },
+	};
+	unsigned int i;
+
+	/* linear scan: the table is tiny and this is only used for logging */
+	for (i = 0; i < sizeof(known) / sizeof(known[0]); i++) {
+		if (known[i].type == type)
+			return known[i].name;
+	}
+	return "unknown";
+}
return 0;
}
-static int ceph_open_init_private_data(struct inode *inode, struct file *file)
+static int ceph_open_init_private_data(struct inode *inode, struct file *file, int flags)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *cf;
return -ENOMEM;
file->private_data = cf;
- mode = ceph_file_mode(file->f_flags);
+ mode = ceph_file_mode(flags);
+ cf->mode = mode;
ci->i_nr_by_mode[mode]++;
wanted = ceph_caps_wanted(ci);
+ dout(10, "opened %p flags 0%o mode %d nr now %d. wanted %d -> %d\n",
+ file, flags,
+ mode, ci->i_nr_by_mode[mode],
+ ci->i_cap_wanted, ci->i_cap_wanted|wanted);
ci->i_cap_wanted |= wanted; /* FIXME this isn't quite right */
return 0;
return err;
}
- err = ceph_open_init_private_data(inode, file);
+ err = ceph_open_init_private_data(inode, file, file->f_flags);
if (err < 0)
return err;
/* finish the open */
err = proc_open_reply(inode, file, session, &rinfo);
if (err == 0)
- err = ceph_open_init_private_data(inode, file);
+ err = ceph_open_init_private_data(inode, file, nd->intent.open.flags);
out:
ceph_mdsc_put_session(session);
return err;
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *cf = file->private_data;
- int mode, wanted;
+ int mode = cf->mode;
+ int wanted;
- dout(5, "ceph_release inode %p file %p\n", inode, file);
+ dout(5, "release inode %p file %p\n", inode, file);
atomic_dec(&ci->i_cap_count);
- if (cf->rinfo.reply)
- ceph_mdsc_destroy_reply_info(&cf->rinfo);
- kfree(cf);
-
- mode = ceph_file_mode(file->f_flags);
+ /*
+ * FIXME mystery: why is file->f_flags now different than
+ * file->f_flags (actually, nd->intent.open.flags) on
+ * open? e.g., on ceph_lookup_open,
+ * ceph_file: opened 000000006fa3ebd0 flags 0101102 mode 2 nr now 1. wanted 0 -> 30
+ * and on release,
+ * ceph_file: released 000000006fa3ebd0 flags 0100001 mode 3 nr now -1. wanted 30 was 30
+ * for now, store the open mode in ceph_file_info.
+ */
+ mode = cf->mode;
ci->i_nr_by_mode[mode]--;
wanted = ceph_caps_wanted(ci);
- dout(10, "mode %d wanted %d was %d\n", mode, wanted, ci->i_cap_wanted);
+ dout(10, "released %p flags 0%o mode %d nr now %d. wanted %d was %d\n",
+ file, file->f_flags, mode,
+ ci->i_nr_by_mode[mode], wanted, ci->i_cap_wanted);
if (wanted != ci->i_cap_wanted)
ceph_mdsc_update_cap_wanted(ci, wanted);
+ if (cf->rinfo.reply)
+ ceph_mdsc_destroy_reply_info(&cf->rinfo);
+ kfree(cf);
+
return 0;
}
dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d\n", inode, ci, mds, seq);
dout(10, " my wanted = %d\n", wanted);
- /* unwanted? */
- if (wanted == 0) {
- dout(10, "wanted=0, reminding mds\n");
- grant->wanted = cpu_to_le32(0);
- return 1; /* ack */
- }
- if (wanted != le32_to_cpu(grant->wanted)) {
- dout(10, "wanted %d -> %d\n", le32_to_cpu(grant->wanted), wanted);
- grant->wanted = cpu_to_le32(wanted);
- }
+ cap = get_cap_for_mds(inode, mds);
/* new cap? */
- cap = get_cap_for_mds(inode, mds);
if (!cap) {
+ /* unwanted? */
+ if (wanted == 0) {
+ dout(10, "wanted=0, reminding mds\n");
+ grant->wanted = cpu_to_le32(0);
+ return 1; /* ack */
+ }
+ /* hrm */
+ BUG_ON(1);
dout(10, "adding new cap inode %p for mds%d\n", inode, mds);
- cap = ceph_add_cap(inode, session, le32_to_cpu(grant->caps), le32_to_cpu(grant->seq));
+ cap = ceph_add_cap(inode, session,
+ le32_to_cpu(grant->caps),
+ le32_to_cpu(grant->seq));
return ret;
}
+ cap->seq = seq;
+
+ if (wanted != le32_to_cpu(grant->wanted)) {
+ dout(10, "wanted %d -> %d\n", le32_to_cpu(grant->wanted), wanted);
+ grant->wanted = cpu_to_le32(wanted);
+ }
+
/* revocation? */
newcaps = le32_to_cpu(grant->caps);
if (cap->caps & ~newcaps) {
int ceph_setattr(struct dentry *dentry, struct iattr *attr)
{
struct inode *inode = dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_client *client = ceph_sb_to_client(inode->i_sb);
struct ceph_mds_client *mdsc = &client->mdsc;
const unsigned int ia_valid = attr->ia_valid;
/* gratuitous debug output */
if (ia_valid & ATTR_UID)
- dout(10, "uid %d -> %d\n", inode->i_uid, attr->ia_uid);
+ dout(10, "setattr: uid %d -> %d\n", inode->i_uid, attr->ia_uid);
if (ia_valid & ATTR_GID)
- dout(10, "gid %d -> %d\n", inode->i_uid, attr->ia_uid);
+ dout(10, "setattr: gid %d -> %d\n", inode->i_gid, attr->ia_gid);
if (ia_valid & ATTR_MODE)
- dout(10, "mode %d -> %d\n", inode->i_mode, attr->ia_mode);
+ dout(10, "setattr: mode %d -> %d\n", inode->i_mode, attr->ia_mode);
if (ia_valid & ATTR_SIZE)
- dout(10, "size %lld -> %lld\n", inode->i_size, attr->ia_size);
+ dout(10, "setattr: size %lld -> %lld\n", inode->i_size, attr->ia_size);
if (ia_valid & ATTR_ATIME)
- dout(10, "atime %ld.%ld -> %ld.%ld\n",
+ dout(10, "setattr: atime %ld.%ld -> %ld.%ld\n",
inode->i_atime.tv_sec, inode->i_atime.tv_nsec,
attr->ia_atime.tv_sec, attr->ia_atime.tv_nsec);
if (ia_valid & ATTR_MTIME)
- dout(10, "mtime %ld.%ld -> %ld.%ld\n",
+ dout(10, "setattr: mtime %ld.%ld -> %ld.%ld\n",
inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
attr->ia_mtime.tv_sec, attr->ia_mtime.tv_nsec);
if (ia_valid & ATTR_FILE)
- dout(10, "ATTR_FILE ... hrm!\n");
+ dout(10, "setattr: ATTR_FILE ... hrm!\n");
/* chown */
if (ia_valid & (ATTR_UID|ATTR_GID)) {
}
/* truncate? */
- if (ia_valid & ATTR_SIZE) {
+ if (ia_valid & ATTR_SIZE &&
+ attr->ia_size < inode->i_size) { /* fixme? */
+ dout(10, "truncate: ia_size %d i_size %d ci->i_wr_size %d\n",
+ (int)attr->ia_size, (int)inode->i_size, (int)ci->i_wr_size);
if (ia_valid & ATTR_FILE)
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_TRUNCATE,
ceph_ino(dentry->d_inode), "", 0, 0);
#include "super.h"
#include "messenger.h"
+/*
+ * Translate a CEPH_MDS_OP_* opcode into a printable name; opcodes that
+ * are not in the table come back as "unknown".
+ *
+ * note: this also appears in messages/MClientRequest.h,
+ * but i don't want it inline in the kernel.
+ */
+const char* ceph_mds_op_name(int op)
+{
+	static const struct {
+		int op;
+		const char *name;
+	} op_names[] = {
+		{ CEPH_MDS_OP_STAT,	"stat" },
+		{ CEPH_MDS_OP_LSTAT,	"lstat" },
+		{ CEPH_MDS_OP_FSTAT,	"fstat" },
+		{ CEPH_MDS_OP_UTIME,	"utime" },
+		{ CEPH_MDS_OP_CHMOD,	"chmod" },
+		{ CEPH_MDS_OP_CHOWN,	"chown" },
+		{ CEPH_MDS_OP_READDIR,	"readdir" },
+		{ CEPH_MDS_OP_MKNOD,	"mknod" },
+		{ CEPH_MDS_OP_LINK,	"link" },
+		{ CEPH_MDS_OP_UNLINK,	"unlink" },
+		{ CEPH_MDS_OP_RENAME,	"rename" },
+		{ CEPH_MDS_OP_MKDIR,	"mkdir" },
+		{ CEPH_MDS_OP_RMDIR,	"rmdir" },
+		{ CEPH_MDS_OP_SYMLINK,	"symlink" },
+		{ CEPH_MDS_OP_OPEN,	"open" },
+		{ CEPH_MDS_OP_TRUNCATE,	"truncate" },
+		{ CEPH_MDS_OP_FSYNC,	"fsync" },
+	};
+	const unsigned int count = sizeof(op_names) / sizeof(op_names[0]);
+	unsigned int idx = 0;
+
+	/* walk the table until the opcode matches */
+	while (idx < count) {
+		if (op_names[idx].op == op)
+			return op_names[idx].name;
+		idx++;
+	}
+	return "unknown";
+}
static void send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds)
{
/* encode paths */
ceph_encode_filepath(&p, end, ino1, path1);
ceph_encode_filepath(&p, end, ino2, path2);
- dout(10, "create_request op %d -> %p\n", op, req);
+ dout(10, "create_request op %d=%s -> %p\n", op, ceph_mds_op_name(op), req);
if (path1)
dout(10, "create_request path1 %llx/%s\n", ino1, path1);
if (path2)
{
struct ceph_mds_file_caps *fc;
struct ceph_msg *msg;
+
+ dout(10, "send_cap_ack ino %llx caps %d wanted %d seq %u size %llu\n",
+ ino, caps, wanted, (unsigned)seq, size);
msg = ceph_msg_new(CEPH_MSG_CLIENT_FILECAPS, sizeof(*fc), 0, 0, 0);
if (IS_ERR(msg))
__u32 *dir_dname_len;
};
-
+extern const char* ceph_mds_op_name(int op);
extern void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client);
extern void ceph_mdsc_stop(struct ceph_mds_client *mdsc);
ret = read_message_partial(con);
if (ret <= 0) goto done;
- dout(5, "===== %p from %s%d type %d len %d+%d =====\n", con->in_msg,
+ dout(1, "===== %p from %s%d %d=%s len %d+%d =====\n", con->in_msg,
ceph_name_type_str(con->in_msg->hdr.src.name.type), con->in_msg->hdr.src.name.num,
- con->in_msg->hdr.type, con->in_msg->hdr.front_len, con->in_msg->hdr.data_len);
+ con->in_msg->hdr.type, ceph_msg_type_name(con->in_msg->hdr.type),
+ con->in_msg->hdr.front_len, con->in_msg->hdr.data_len);
msgr->dispatch(con->msgr->parent, con->in_msg); /* fixme: use a workqueue */
con->in_msg = 0;
con->in_tag = CEPH_MSGR_TAG_READY;
/* queue */
spin_lock(&con->out_queue_lock);
msg->hdr.seq = ++con->out_seq;
- dout(1, "----- %p to %s%d type %d len %d+%d -----\n", msg,
+ dout(1, "----- %p to %s%d %d=%s len %d+%d -----\n", msg,
ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num,
- msg->hdr.type, msg->hdr.front_len, msg->hdr.data_len);
- dout(1, "ceph_msg_send queuing %p seq %u for %s%d on %p\n", msg, msg->hdr.seq,
+ msg->hdr.type, ceph_msg_type_name(msg->hdr.type),
+ msg->hdr.front_len, msg->hdr.data_len);
+ dout(2, "ceph_msg_send queuing %p seq %u for %s%d on %p\n", msg, msg->hdr.seq,
ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num, con);
ceph_msg_get(msg);
list_add_tail(&msg->list_head, &con->out_queue);
extern int ceph_debug_mdsc;
extern int ceph_debug_osdc;
-# define dout(x, args...) do { \
+#define dout(x, args...) do { \
if (x <= (ceph_debug ? ceph_debug : DOUT_VAR)) \
printk(KERN_INFO "ceph_" DOUT_PREFIX args); \
} while (0)
-# define derr(x, args...) do { \
+#define derr(x, args...) do { \
if (x <= (ceph_debug ? ceph_debug : DOUT_VAR)) \
printk(KERN_ERR "ceph_" DOUT_PREFIX args); \
} while (0)
#define CEPH_BLKSIZE 4096
+
+
/*
* mount options
*/
{
if ((flags & O_DIRECTORY) == O_DIRECTORY)
return FILE_MODE_PIN;
- if ((flags & O_RDWR) == O_RDWR)
+ if ((flags & O_ACCMODE) == O_RDWR)
return FILE_MODE_RDWR;
- if ((flags & O_WRONLY) == O_WRONLY)
+ if ((flags & O_ACCMODE) == O_WRONLY)
return FILE_MODE_WRONLY;
- if ((flags & O_RDONLY) == O_RDONLY)
+ if ((flags & O_ACCMODE) == O_RDONLY)
return FILE_MODE_RDONLY;
BUG_ON(1);
}
*/
struct ceph_file_info {
u32 frag; /* one frag at a time; screw seek_dir() on large dirs */
+ int mode; /* FILE_MODE_* from ceph_file_mode(); set on open, reused on release */
struct ceph_mds_reply_info rinfo;
};
extern void ceph_destroy_client(struct ceph_client *cl);
extern int ceph_mount(struct ceph_client *client, struct ceph_mount_args *args,
struct dentry **pmnt_root);
+extern const char *ceph_msg_type_name(int type);
/* inode.c */
CInode *inode;
int wanted_caps; // what the client wants (ideally)
- map<capseq_t, int> cap_history; // seq -> cap
+ map<capseq_t, int> cap_history; // seq -> cap, [last_recv,last_sent]
capseq_t last_sent, last_recv;
capseq_t last_open;
// most recently issued caps.
int pending() {
+ if (!last_sent)
+ return 0;
if (cap_history.count(last_sent))
- return cap_history[ last_sent ];
- return 0;
+ return cap_history[last_sent];
+ else
+ return 0;
}
// caps client has confirmed receipt of
int confirmed() {
+ if (!last_recv)
+ return 0;
if (cap_history.count(last_recv))
- return cap_history[ last_recv ];
- return 0;
+ return cap_history[last_recv];
+ else
+ return 0;
}
- // caps potentially issued
+ // caps issued, potentially still in hands of client
int issued() {
int c = 0;
- for (capseq_t seq = last_recv; seq <= last_sent; seq++) {
- if (cap_history.count(seq)) {
- c |= cap_history[seq];
- generic_dout(10) << " cap issued: seq " << seq << " " << cap_string(cap_history[seq]) << " -> " << cap_string(c) << dendl;
- }
+ for (map<capseq_t,int>::iterator p = cap_history.begin();
+ p != cap_history.end();
+ p++) {
+ c |= p->second;
+ generic_dout(10) << " cap issued: seq " << p->first << " "
+ << cap_string(p->second) << " -> " << cap_string(c)
+ << dendl;
}
return c;
}
int confirm_receipt(capseq_t seq, int caps) {
int r = 0;
- // old seqs
- while (last_recv < seq) {
- generic_dout(10) << " cap.confirm_receipt forgetting seq " << last_recv << " " << cap_string(cap_history[last_recv]) << dendl;
- r |= cap_history[last_recv];
- cap_history.erase(last_recv);
- ++last_recv;
- }
-
- // release current?
- if (cap_history.count(seq) &&
- cap_history[seq] != caps) {
- generic_dout(10) << " cap.confirm_receipt revising seq " << seq << " " << cap_string(cap_history[seq]) << " -> " << cap_string(caps) << dendl;
- // note what we're releasing..
- assert(cap_history[seq] & ~caps);
- r |= cap_history[seq] & ~caps;
-
- cap_history[seq] = caps; // confirmed() now less than before..
- }
+ generic_dout(10) << " confirm_receipt seq " << seq << " last_recv " << last_recv << " last_sent " << last_sent
+ << " cap_history " << cap_history << dendl;
+
+ assert(last_recv <= last_sent);
+ assert(seq <= last_sent);
+ while (!cap_history.empty()) {
+ map<capseq_t,int>::iterator p = cap_history.begin();
+
+ if (p->first > seq)
+ break;
+
+ if (p->first == seq) {
+ // note what we're releasing..
+ if (p->second & ~caps) {
+ generic_dout(10) << " cap.confirm_receipt revising seq " << seq
+ << " " << cap_string(cap_history[seq]) << " -> " << cap_string(caps)
+ << dendl;
+ r |= cap_history[seq] & ~caps;
+ cap_history[seq] = caps; // confirmed() now less than before..
+ }
+
+ // null?
+ if (caps == 0 && seq == last_sent) {
+ generic_dout(10) << " cap.confirm_receipt making null seq " << last_recv
+ << " " << cap_string(cap_history[last_recv]) << dendl;
+ cap_history.clear(); // viola, null!
+ }
+ break;
+ }
- // null?
- if (caps == 0 &&
- cap_history.size() == 1 &&
- cap_history.count(seq)) {
- cap_history.clear(); // viola, null!
+ generic_dout(10) << " cap.confirm_receipt forgetting seq " << p->first
+ << " " << cap_string(p->second) << dendl;
+ r |= p->second;
+ cap_history.erase(p);
}
-
+ last_recv = seq;
+
return r;
}
<< dendl;
// confirm caps
+ int had2 = cap->issued();
int had = cap->confirm_receipt(m->get_seq(), m->get_caps());
int has = cap->confirmed();
+ dout(10) << "had " << cap_string(had) << " " << cap_string(had2) << " has " << cap_string(has) << dendl;
+ had |= had2;
// update wanted
if (cap->wanted() != wanted) {
out << "clientreq(" << get_client()
<< "." << get_tid()
<< " " << ceph_mds_op_name(get_op());
- if (!get_filepath().empty())
+ //if (!get_filepath().empty())
out << " " << get_filepath();
if (!get_filepath2().empty())
out << " " << get_filepath2();