From d793b0f752ca33ffc4fee19048ea8515c8601595 Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 16 Nov 2007 21:30:33 +0000 Subject: [PATCH] reworked message encoding/decoding header vs payload vs data payload, alignment preservation git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2075 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/client/Client.cc | 30 ++-- trunk/ceph/include/ceph_fs.h | 93 ++++++------ trunk/ceph/kernel/client.c | 18 ++- trunk/ceph/kernel/ktcp.c | 7 +- trunk/ceph/kernel/ktcp.h | 2 +- trunk/ceph/kernel/messenger.c | 30 ++-- trunk/ceph/kernel/messenger.h | 2 + trunk/ceph/mds/Server.cc | 46 +++--- trunk/ceph/messages/MClientRequest.h | 88 ++++++----- trunk/ceph/messages/MLock.h | 14 +- trunk/ceph/messages/MOSDOp.h | 42 +----- trunk/ceph/messages/MOSDOpReply.h | 22 +-- trunk/ceph/msg/FakeMessenger.cc | 10 +- trunk/ceph/msg/Message.cc | 19 +-- trunk/ceph/msg/Message.h | 64 +++----- trunk/ceph/msg/SimpleMessenger.cc | 211 ++++++++++++--------------- 16 files changed, 303 insertions(+), 395 deletions(-) diff --git a/trunk/ceph/client/Client.cc b/trunk/ceph/client/Client.cc index 9a494f92ad06b..2f82cb5f935c5 100644 --- a/trunk/ceph/client/Client.cc +++ b/trunk/ceph/client/Client.cc @@ -1720,7 +1720,7 @@ int Client::_mkdir(const char *path, mode_t mode) { MClientRequest *req = new MClientRequest(MDS_OP_MKDIR, messenger->get_myinst()); req->set_path(path); - req->args.mkdir.mode = mode; + req->head.args.mkdir.mode = mode; // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); @@ -1886,7 +1886,7 @@ int Client::_do_lstat(const char *path, int mask, Inode **in) //req->set_caller_gid(fc->gid); req = new MClientRequest(MDS_OP_LSTAT, messenger->get_myinst()); - req->args.stat.mask = mask; + req->head.args.stat.mask = mask; req->set_path(fpath); MClientReply *reply = make_request(req); @@ -2022,7 +2022,7 @@ int Client::_chmod(const char *path, mode_t mode) dout(3) << "_chmod(" << path << ", 0" << oct << mode << dec << ")" << dendl; MClientRequest *req = new MClientRequest(MDS_OP_CHMOD, messenger->get_myinst()); req->set_path(path); - req->args.chmod.mode = mode; + req->head.args.chmod.mode = mode; // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); @@ -2056,8 +2056,8 @@ int Client::_chown(const char *path, uid_t uid, gid_t gid) dout(3) << "_chown(" << path << ", " << uid << ", " << gid << ")" << dendl; MClientRequest *req = new MClientRequest(MDS_OP_CHOWN, messenger->get_myinst()); req->set_path(path); - req->args.chown.uid = uid; - req->args.chown.gid = gid; + req->head.args.chown.uid = uid; + req->head.args.chown.gid = gid; // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); @@ -2094,8 +2094,8 @@ int Client::_utimes(const char *path, utime_t mtime, utime_t atime) dout(3) << "_utimes(" << path << ", " << mtime << ", " << atime << ")" << dendl; MClientRequest *req = new MClientRequest(MDS_OP_UTIME, messenger->get_myinst()); req->set_path(path); - req->args.utime.mtime = mtime.tv_ref(); - req->args.utime.atime = atime.tv_ref(); + req->head.args.utime.mtime = mtime.tv_ref(); + req->head.args.utime.atime = atime.tv_ref(); // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); @@ -2132,8 +2132,8 @@ int Client::_mknod(const char *path, mode_t mode, dev_t rdev) MClientRequest *req = new MClientRequest(MDS_OP_MKNOD, messenger->get_myinst()); req->set_path(path); - req->args.mknod.mode = mode; - req->args.mknod.rdev = rdev; + req->head.args.mknod.mode = mode; + req->head.args.mknod.rdev = rdev; // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); @@ -2289,7 +2289,7 @@ int Client::_readdir_get_frag(DirResult *dirp) MClientRequest *req = new MClientRequest(MDS_OP_READDIR, messenger->get_myinst()); req->set_path(dirp->path); - req->args.readdir.frag = fg; + req->head.args.readdir.frag = fg; // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); @@ -2516,8 +2516,8 @@ int Client::_open(const char *path, int flags, mode_t mode, Fh **fhp) // go MClientRequest *req = new MClientRequest(MDS_OP_OPEN, messenger->get_myinst()); req->set_path(path); - req->args.open.flags = flags; - req->args.open.mode = mode; + req->head.args.open.flags = flags; + req->head.args.open.mode = mode; int cmode = req->get_open_file_mode(); @@ -3073,7 +3073,7 @@ int Client::_truncate(const char *file, off_t length) { MClientRequest *req = new MClientRequest(MDS_OP_TRUNCATE, messenger->get_myinst()); req->set_path(file); - req->args.truncate.length = length; + req->head.args.truncate.length = length; // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); @@ -3103,8 +3103,8 @@ int Client::ftruncate(int fd, off_t length) int Client::_ftruncate(Fh *fh, off_t length) { MClientRequest *req = new MClientRequest(MDS_OP_TRUNCATE, messenger->get_myinst()); - req->args.truncate.ino = fh->inode->inode.ino; - req->args.truncate.length = length; + req->head.args.truncate.ino = fh->inode->inode.ino; + req->head.args.truncate.length = length; // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); diff --git a/trunk/ceph/include/ceph_fs.h b/trunk/ceph/include/ceph_fs.h index db8aaa1cbdb56..e98f44950901f 100644 --- a/trunk/ceph/include/ceph_fs.h +++ b/trunk/ceph/include/ceph_fs.h @@ -206,14 +206,6 @@ struct ceph_entity_inst { /* * message header */ -struct ceph_message_header { /* old one, just for now */ - __u32 seq; /* message seq# for this session */ - __u32 type; /* message type */ - struct ceph_entity_inst src, dst; - __u32 nchunks; -}; - -/* new way */ struct ceph_msg_header { __u32 seq; /* message seq# for this session */ __u32 type; /* message type */ @@ -278,50 +270,52 @@ struct ceph_client_request_head { __u32 op; __u32 caller_uid, caller_gid; ceph_ino_t cwd_ino; -}; -union ceph_client_request_args { - struct { - __u32 mask; - } stat; - struct { - ceph_ino_t ino; - __u32 mask; - } fstat; - struct { - ceph_frag_t frag; - } readdir; - struct { - struct ceph_timeval mtime; - struct ceph_timeval atime; - } utime; - struct { - __u32 mode; - } chmod; - struct { - uid_t uid; - gid_t gid; - } chown; - struct { - __u32 mode; - __u32 rdev; - } mknod; - struct { - __u32 mode; - } mkdir; - struct { - __u32 flags; - __u32 mode; - } open; - struct { - ceph_ino_t ino; // optional - __s64 length; - } truncate; - struct { - ceph_ino_t ino; - } fsync; + // fixed size arguments. in a union. + union { + struct { + __u32 mask; + } stat; + struct { + ceph_ino_t ino; + __u32 mask; + } fstat; + struct { + ceph_frag_t frag; + } readdir; + struct { + struct ceph_timeval mtime; + struct ceph_timeval atime; + } utime; + struct { + __u32 mode; + } chmod; + struct { + uid_t uid; + gid_t gid; + } chown; + struct { + __u32 mode; + __u32 rdev; + } mknod; + struct { + __u32 mode; + } mkdir; + struct { + __u32 flags; + __u32 mode; + } open; + struct { + ceph_ino_t ino; // optional + __s64 length; + } truncate; + struct { + ceph_ino_t ino; + } fsync; + } args; }; + /* client reply */ struct ceph_client_reply_head { @@ -342,7 +336,6 @@ struct ceph_client_reply_inode { __u32 rdev; __u32 mask; char *symlink; - struct ceph_frag_tree dirfragdir; }; /* followed by symlink string, then dirfragtree */ diff --git a/trunk/ceph/kernel/client.c b/trunk/ceph/kernel/client.c index 451ae1a804b86..c1a7fa7866107 100644 --- a/trunk/ceph/kernel/client.c +++ b/trunk/ceph/kernel/client.c @@ -10,6 +10,8 @@ int ceph_debug = 10; +void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg); + /* * create a fresh client instance @@ -17,6 +19,7 @@ int ceph_debug = 10; static struct ceph_client *create_client(struct ceph_mount_args *args) { struct ceph_client *cl; + int err; cl = kzalloc(sizeof(*cl), GFP_KERNEL); if (cl == NULL) @@ -26,12 +29,25 @@ static struct ceph_client *create_client(struct ceph_mount_args *args) init_waitqueue_head(&cl->mount_wq); spin_lock_init(&cl->sb_lock); + /* messenger */ + cl->msgr = ceph_messenger_create(); + if (IS_ERR(cl->msgr)) { + err = PTR_ERR(cl->msgr); + goto fail; + } + cl->msgr->parent = cl; + cl->msgr->dispatch = (ceph_messenger_dispatch_t)ceph_dispatch; + cl->whoami = -1; ceph_monc_init(&cl->monc); ceph_mdsc_init(&cl->mdsc, cl); ceph_osdc_init(&cl->osdc); return cl; + +fail: + kfree(cl); + return ERR_PTR(err); } /* @@ -175,7 +191,7 @@ void ceph_put_client(struct ceph_client *cl) * * should be fast and non-blocking, as it is called with locks held. */ -static void dispatch(struct ceph_client *client, struct ceph_msg *msg) +void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg) { dout(5, "dispatch %p type %d\n", (void*)msg, msg->hdr.type); diff --git a/trunk/ceph/kernel/ktcp.c b/trunk/ceph/kernel/ktcp.c index 2fbec52c17247..9bf03272e1403 100644 --- a/trunk/ceph/kernel/ktcp.c +++ b/trunk/ceph/kernel/ktcp.c @@ -28,17 +28,16 @@ struct socket * _kconnect(struct sockaddr *saddr) return(sd); } -struct socket * _klisten(struct sockaddr *saddr) +struct socket * _klisten(struct sockaddr_in *in_addr) { int ret; struct socket *sd = NULL; - struct sockaddr_in *in_addr = (struct sockaddr_in *)saddr; ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd); if (ret < 0) { printk(KERN_INFO "sock_create_kern error: %d\n", ret); - return(NULL); + return ERR_PTR(ret); } /* no user specified address given so create, will allow arg to mount */ @@ -55,7 +54,7 @@ struct socket * _klisten(struct sockaddr *saddr) if (ret < 0) { printk("Failed to set SO_REUSEADDR: %d\n", ret); } */ - ret = sd->ops->bind(sd, saddr, sizeof(saddr)); + ret = sd->ops->bind(sd, (struct sockaddr*)in_addr, sizeof(in_addr)); /* TBD: probaby want to tune the backlog queue .. */ ret = sd->ops->listen(sd, NUM_BACKUP); if (ret < 0) { diff --git a/trunk/ceph/kernel/ktcp.h b/trunk/ceph/kernel/ktcp.h index a6f6d1f0a8b57..279a56f090d42 100644 --- a/trunk/ceph/kernel/ktcp.h +++ b/trunk/ceph/kernel/ktcp.h @@ -3,7 +3,7 @@ /* prototype definitions */ struct socket * _kconnect(struct sockaddr *); -struct socket * _klisten(struct sockaddr *); +struct socket * _klisten(struct sockaddr_in *); struct socket *_kaccept(struct socket *); int _krecvmsg(struct socket *, void *, size_t , unsigned); int _ksendmsg(struct socket *, struct kvec *, size_t, size_t, unsigned); diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index 8ffba6815fda8..8c43d1c060727 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -730,25 +730,37 @@ done: * create a new messenger instance, saddr is address specified from mount arg. * If null, will get created by _klisten() */ -struct ceph_messenger *ceph_create_messenger(struct sockaddr *saddr) +struct ceph_messenger *ceph_messenger_create() { struct ceph_messenger *msgr; + struct sockaddr_in saddr; msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); if (msgr == NULL) - return NULL; + return ERR_PTR(-ENOMEM); spin_lock_init(&msgr->con_lock); /* create listening socket */ - msgr->listen_sock = _klisten(saddr); - if (msgr->listen_sock == NULL) { + msgr->listen_sock = _klisten(&saddr); + if (IS_ERR(msgr->listen_sock)) { + int err = PTR_ERR(msgr->listen_sock); kfree(msgr); - return NULL; + return ERR_PTR(err); } + + /* determine my ip:port */ + msgr->inst.addr.ipaddr.sin_family = saddr.sin_family; + msgr->inst.addr.ipaddr.sin_port = saddr.sin_port; + msgr->inst.addr.ipaddr.sin_addr = saddr.sin_addr; + /* TBD: setup callback for accept */ INIT_WORK(&msgr->awork, try_accept); /* setup work structure */ - return msgr; + + dout(1, "ceph_messenger_create listening on %x:%d\n", + ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr), + ntohl(msgr->inst.addr.ipaddr.sin_port)); + return msgr; } @@ -773,14 +785,14 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) return PTR_ERR(con); dout(5, "opening new connection to peer %x:%d\n", ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), - msg->hdr.dst.addr.ipaddr.sin_port); + ntohl(msg->hdr.dst.addr.ipaddr.sin_port)); con->peer_addr = msg->hdr.dst.addr; con->state = CONNECTING; add_connection(msgr, con); } else { dout(5, "had connection to peer %x:%d\n", - msg->hdr.dst.addr.ipaddr.sin_addr.s_addr, - msg->hdr.dst.addr.ipaddr.sin_port); + ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), + ntohl(msg->hdr.dst.addr.ipaddr.sin_port)); } spin_unlock(&msgr->con_lock); diff --git a/trunk/ceph/kernel/messenger.h b/trunk/ceph/kernel/messenger.h index 4a4b509f0e8ce..37f218370498e 100644 --- a/trunk/ceph/kernel/messenger.h +++ b/trunk/ceph/kernel/messenger.h @@ -106,6 +106,8 @@ struct ceph_connection { }; +extern struct ceph_messenger *ceph_messenger_create(void); + extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off); static __inline__ void ceph_msg_get(struct ceph_msg *msg) { atomic_inc(&msg->nref); diff --git a/trunk/ceph/mds/Server.cc b/trunk/ceph/mds/Server.cc index 5a2429d117f2e..efb566c7cee29 100644 --- a/trunk/ceph/mds/Server.cc +++ b/trunk/ceph/mds/Server.cc @@ -488,19 +488,19 @@ void Server::handle_client_request(MClientRequest *req) // some ops are on ino's switch (req->get_op()) { case MDS_OP_FSTAT: - ref = mdcache->get_inode(req->args.fstat.ino); + ref = mdcache->get_inode(req->head.args.fstat.ino); assert(ref); break; case MDS_OP_TRUNCATE: - if (!req->args.truncate.ino) + if (!req->head.args.truncate.ino) break; // can be called w/ either fh OR path - ref = mdcache->get_inode(req->args.truncate.ino); + ref = mdcache->get_inode(req->head.args.truncate.ino); assert(ref); break; case MDS_OP_FSYNC: - ref = mdcache->get_inode(req->args.fsync.ino); // fixme someday no ino needed? + ref = mdcache->get_inode(req->head.args.fsync.ino); // fixme someday no ino needed? assert(ref); break; } @@ -563,7 +563,7 @@ void Server::dispatch_client_request(MDRequest *mdr) // funky. case MDS_OP_OPEN: - if (req->args.open.flags & O_CREAT) + if (req->head.args.open.flags & O_CREAT) handle_client_openc(mdr); else handle_client_open(mdr); @@ -1331,7 +1331,7 @@ void Server::handle_client_stat(MDRequest *mdr) set wrlocks = mdr->wrlocks; set xlocks = mdr->xlocks; - int mask = req->args.stat.mask; + int mask = req->head.args.stat.mask; if (mask & STAT_MASK_LINK) rdlocks.insert(&ref->linklock); if (mask & STAT_MASK_AUTH) rdlocks.insert(&ref->authlock); if (ref->is_file() && @@ -1407,8 +1407,8 @@ void Server::handle_client_utime(MDRequest *mdr) // project update inode_t *pi = cur->project_inode(); - pi->mtime = req->args.utime.mtime; - pi->atime = req->args.utime.atime; + pi->mtime = req->head.args.utime.mtime; + pi->atime = req->head.args.utime.atime; pi->version = cur->pre_dirty(); pi->ctime = g_clock.real_now(); @@ -1448,7 +1448,7 @@ void Server::handle_client_chmod(MDRequest *mdr) inode_t *pi = cur->project_inode(); pi->mode = (pi->mode & ~04777) | - (req->args.chmod.mode & 04777); + (req->head.args.chmod.mode & 04777); pi->version = cur->pre_dirty(); pi->ctime = g_clock.real_now(); @@ -1486,8 +1486,8 @@ void Server::handle_client_chown(MDRequest *mdr) // project update inode_t *pi = cur->project_inode(); - pi->uid = MAX(req->args.chown.uid, 0); - pi->gid = MAX(req->args.chown.gid, 0); + pi->uid = MAX(req->head.args.chown.uid, 0); + pi->gid = MAX(req->head.args.chown.gid, 0); pi->version = cur->pre_dirty(); pi->ctime = g_clock.real_now(); @@ -1525,7 +1525,7 @@ void Server::handle_client_readdir(MDRequest *mdr) } // which frag? - frag_t fg = req->args.readdir.frag; + frag_t fg = req->head.args.readdir.frag; // does the frag exist? if (diri->dirfragtree[fg] != fg) { @@ -1677,8 +1677,8 @@ void Server::handle_client_mknod(MDRequest *mdr) assert(newi); // it's a file. - newi->inode.rdev = req->args.mknod.rdev; - newi->inode.mode = req->args.mknod.mode; + newi->inode.rdev = req->head.args.mknod.rdev; + newi->inode.mode = req->head.args.mknod.mode; newi->inode.mode &= ~INODE_TYPE_MASK; newi->inode.mode |= INODE_MODE_FILE; newi->inode.version = dn->pre_dirty() - 1; @@ -1713,7 +1713,7 @@ void Server::handle_client_mkdir(MDRequest *mdr) assert(newi); // it's a directory. - newi->inode.mode = req->args.mkdir.mode; + newi->inode.mode = req->head.args.mkdir.mode; newi->inode.mode &= ~INODE_TYPE_MASK; newi->inode.mode |= INODE_MODE_DIR; newi->inode.layout = g_OSD_MDDirLayout; @@ -3624,7 +3624,7 @@ void Server::handle_client_truncate(MDRequest *mdr) return; // already small enough? - if (cur->inode.size <= req->args.truncate.length) { + if (cur->inode.size <= req->head.args.truncate.length) { reply_request(mdr, 0); return; } @@ -3633,19 +3633,19 @@ void Server::handle_client_truncate(MDRequest *mdr) version_t pdv = cur->pre_dirty(); utime_t ctime = g_clock.real_now(); Context *fin = new C_MDS_truncate_logged(mds, mdr, cur, - pdv, req->args.truncate.length, ctime); + pdv, req->head.args.truncate.length, ctime); // log + wait mdr->ls = mdlog->get_current_segment(); EUpdate *le = new EUpdate(mdlog, "truncate"); le->metablob.add_client_req(mdr->reqid); le->metablob.add_dir_context(cur->get_parent_dir()); - le->metablob.add_inode_truncate(cur->ino(), req->args.truncate.length, cur->inode.size); + le->metablob.add_inode_truncate(cur->ino(), req->head.args.truncate.length, cur->inode.size); inode_t *pi = le->metablob.add_dentry(cur->parent, true); pi->mtime = ctime; pi->ctime = ctime; pi->version = pdv; - pi->size = req->args.truncate.length; + pi->size = req->head.args.truncate.length; mdlog->submit_entry(le, fin); @@ -3659,7 +3659,7 @@ void Server::handle_client_open(MDRequest *mdr) { MClientRequest *req = mdr->client_request; - int flags = req->args.open.flags; + int flags = req->head.args.open.flags; int cmode = req->get_open_file_mode(); bool need_auth = ((cmode != FILE_MODE_R && cmode != FILE_MODE_LAZY) || (flags & O_TRUNC)); @@ -3913,13 +3913,13 @@ void Server::handle_client_openc(MDRequest *mdr) dout(7) << "open w/ O_CREAT on " << req->get_filepath() << dendl; - bool excl = (req->args.open.flags & O_EXCL); + bool excl = (req->head.args.open.flags & O_EXCL); CDentry *dn = rdlock_path_xlock_dentry(mdr, !excl, false); if (!dn) return; if (!dn->is_null()) { // it existed. - if (req->args.open.flags & O_EXCL) { + if (req->head.args.open.flags & O_EXCL) { dout(10) << "O_EXCL, target exists, failing with -EEXIST" << dendl; reply_request(mdr, -EEXIST, dn->get_dir()->get_inode()); return; @@ -3938,7 +3938,7 @@ void Server::handle_client_openc(MDRequest *mdr) assert(in); // it's a file. - in->inode.mode = req->args.open.mode; + in->inode.mode = req->head.args.open.mode; in->inode.mode |= INODE_MODE_FILE; in->inode.version = dn->pre_dirty() - 1; diff --git a/trunk/ceph/messages/MClientRequest.h b/trunk/ceph/messages/MClientRequest.h index d8784c981df2f..5336f8cb51ece 100644 --- a/trunk/ceph/messages/MClientRequest.h +++ b/trunk/ceph/messages/MClientRequest.h @@ -73,39 +73,36 @@ class MClientRequest : public Message { - struct ceph_client_request_head st; +public: + struct ceph_client_request_head head; // path arguments filepath path; string sarg; public: - // fixed size arguments. in a union. - union ceph_client_request_args args; - // cons MClientRequest() : Message(CEPH_MSG_CLIENT_REQUEST) {} MClientRequest(int op, entity_inst_t ci) : Message(CEPH_MSG_CLIENT_REQUEST) { - memset(&st, 0, sizeof(st)); - memset(&args, 0, sizeof(args)); - this->st.op = op; - this->st.client_inst.name = ci.name.v; - this->st.client_inst.addr = ci.addr.v; + memset(&head, 0, sizeof(head)); + this->head.op = op; + this->head.client_inst.name = ci.name.v; + this->head.client_inst.addr = ci.addr.v; } metareqid_t get_reqid() { // FIXME: for now, assume clients always have 1 incarnation - return metareqid_t(st.client_inst.name.num, st.tid); + return metareqid_t(head.client_inst.name.num, head.tid); } int get_open_file_mode() { - if (args.open.flags & O_LAZY) + if (head.args.open.flags & O_LAZY) return FILE_MODE_LAZY; - if (args.open.flags & O_WRONLY) + if (head.args.open.flags & O_WRONLY) return FILE_MODE_W; - if (args.open.flags & O_RDWR) + if (head.args.open.flags & O_RDWR) return FILE_MODE_RW; - if (args.open.flags & O_APPEND) + if (head.args.open.flags & O_APPEND) return FILE_MODE_W; return FILE_MODE_R; } @@ -113,17 +110,17 @@ class MClientRequest : public Message { return get_open_file_mode() == FILE_MODE_R; } bool is_idempotent() { - if (st.op == MDS_OP_OPEN) + if (head.op == MDS_OP_OPEN) return open_file_mode_is_readonly(); - return (st.op < 1000); + return (head.op < 1000); } bool auth_is_best() { if (!is_idempotent()) return true; - if (st.op == MDS_OP_READDIR) return true; + if (head.op == MDS_OP_READDIR) return true; return false; } bool follow_trailing_symlink() { - switch (st.op) { + switch (head.op) { case MDS_OP_LSTAT: case MDS_OP_FSTAT: case MDS_OP_LINK: @@ -155,58 +152,55 @@ class MClientRequest : public Message { // normal fields - void set_tid(tid_t t) { st.tid = t; } - void set_oldest_client_tid(tid_t t) { st.oldest_client_tid = t; } - void inc_num_fwd() { st.num_fwd++; } - void set_retry_attempt(int a) { st.retry_attempt = a; } + void set_tid(tid_t t) { head.tid = t; } + void set_oldest_client_tid(tid_t t) { head.oldest_client_tid = t; } + void inc_num_fwd() { head.num_fwd++; } + void set_retry_attempt(int a) { head.retry_attempt = a; } void set_path(string& p) { path.set_path(p); } void set_path(const char *p) { path.set_path(p); } void set_path(const filepath& fp) { path = fp; } - void set_caller_uid(int u) { st.caller_uid = u; } - void set_caller_gid(int g) { st.caller_gid = g; } + void set_caller_uid(int u) { head.caller_uid = u; } + void set_caller_gid(int g) { head.caller_gid = g; } void set_sarg(string& arg) { this->sarg = arg; } void set_sarg(const char *arg) { this->sarg = arg; } void set_mds_wants_replica_in_dirino(inodeno_t dirino) { - st.mds_wants_replica_in_dirino = dirino; } + head.mds_wants_replica_in_dirino = dirino; } void set_client_inst(const entity_inst_t& i) { - st.client_inst.name = i.name.v; - st.client_inst.addr = i.addr.v; + head.client_inst.name = i.name.v; + head.client_inst.addr = i.addr.v; } entity_inst_t get_client_inst() { - return entity_inst_t(st.client_inst); + return entity_inst_t(head.client_inst); } - int get_client() { return st.client_inst.name.num; } - tid_t get_tid() { return st.tid; } - tid_t get_oldest_client_tid() { return st.oldest_client_tid; } - int get_num_fwd() { return st.num_fwd; } - int get_retry_attempt() { return st.retry_attempt; } - int get_op() { return st.op; } - int get_caller_uid() { return st.caller_uid; } - int get_caller_gid() { return st.caller_gid; } - //inodeno_t get_ino() { return st.ino; } + int get_client() { return head.client_inst.name.num; } + tid_t get_tid() { return head.tid; } + tid_t get_oldest_client_tid() { return head.oldest_client_tid; } + int get_num_fwd() { return head.num_fwd; } + int get_retry_attempt() { return head.retry_attempt; } + int get_op() { return head.op; } + int get_caller_uid() { return head.caller_uid; } + int get_caller_gid() { return head.caller_gid; } + //inodeno_t get_ino() { return head.ino; } const string& get_path() { return path.get_path(); } filepath& get_filepath() { return path; } string& get_sarg() { return sarg; } inodeno_t get_mds_wants_replica_in_dirino() { - return st.mds_wants_replica_in_dirino; } + return head.mds_wants_replica_in_dirino; } - inodeno_t get_cwd_ino() { return st.cwd_ino ? st.cwd_ino:MDS_INO_ROOT; } + inodeno_t get_cwd_ino() { return head.cwd_ino ? head.cwd_ino:MDS_INO_ROOT; } void decode_payload() { int off = 0; - payload.copy(off, sizeof(st), (char*)&st); - off += sizeof(st); - payload.copy(off, sizeof(args), (char*)&args); - off += sizeof(args); + payload.copy(off, sizeof(head), (char*)&head); + off += sizeof(head); path._decode(payload, off); ::_decode(sarg, payload, off); } void encode_payload() { - payload.append((char*)&st, sizeof(st)); - payload.append((char*)&args, sizeof(args)); + payload.append((char*)&head, sizeof(head)); path._encode(payload); ::_encode(sarg, payload); } @@ -267,8 +261,8 @@ class MClientRequest : public Message { out << " " << get_path(); if (get_sarg().length()) out << " " << get_sarg(); - if (st.retry_attempt) - out << " RETRY=" << st.retry_attempt; + if (head.retry_attempt) + out << " RETRY=" << head.retry_attempt; out << ")"; } diff --git a/trunk/ceph/messages/MLock.h b/trunk/ceph/messages/MLock.h index 95c3e5f325212..4bfd6444cc06e 100644 --- a/trunk/ceph/messages/MLock.h +++ b/trunk/ceph/messages/MLock.h @@ -62,10 +62,10 @@ class MLock : public Message { char lock_type; // lock object type MDSCacheObjectInfo object_info; - bufferlist data; // and possibly some data + bufferlist lockdata; // and possibly some data public: - bufferlist& get_data() { return data; } + bufferlist& get_data() { return lockdata; } int get_asker() { return asker; } int get_action() { return action; } metareqid_t get_reqid() { return reqid; } @@ -88,7 +88,7 @@ class MLock : public Message { Message(MSG_MDS_LOCK), action(ac), asker(as), lock_type(lock->get_type()) { lock->get_parent()->set_object_info(object_info); - data.claim(bl); + lockdata.claim(bl); } virtual char *get_type_name() { return "ILock"; } void print(ostream& out) { @@ -99,8 +99,8 @@ class MLock : public Message { } void set_reqid(metareqid_t ri) { reqid = ri; } - void set_data(const bufferlist& data) { - this->data = data; + void set_data(const bufferlist& lockdata) { + this->lockdata = lockdata; } void decode_payload() { @@ -110,7 +110,7 @@ class MLock : public Message { ::_decode(reqid, payload, off); ::_decode(lock_type, payload, off); object_info._decode(payload, off); - ::_decode(data, payload, off); + ::_decode(lockdata, payload, off); } virtual void encode_payload() { ::_encode(asker, payload); @@ -118,7 +118,7 @@ class MLock : public Message { ::_encode(reqid, payload); ::_encode(lock_type, payload); object_info._encode(payload); - ::_encode(data, payload); + ::_encode(lockdata, payload); } }; diff --git a/trunk/ceph/messages/MOSDOp.h b/trunk/ceph/messages/MOSDOp.h index 3ff3fe6402290..d23ad745acf90 100644 --- a/trunk/ceph/messages/MOSDOp.h +++ b/trunk/ceph/messages/MOSDOp.h @@ -118,7 +118,6 @@ private: osd_peer_stat_t peer_stat; } st; - bufferlist data; map attrset; @@ -178,13 +177,6 @@ public: void inc_shed_count() { st.shed_count++; } int get_shed_count() { return st.shed_count; } - void set_data(bufferlist &d) { - data.claim(d); - } - bufferlist& get_data() { - return data; - } - off_t get_data_len() { return data.length(); } MOSDOp(entity_inst_t asker, int inc, long tid, @@ -221,44 +213,14 @@ public: // marshalling virtual void decode_payload() { int off = 0; - payload.copy(off, sizeof(st), (char*)&st); - off += sizeof(st); + ::_decode(st, payload, off); ::_decode(attrset, payload, off); - ::_decode(data, payload, off); - } - - static void add_payload_chunk_breaks(int from, int off, int len, - list& breaks) { - if (len > 0 && - len & 4095 == 0 && - off & 4095 == 0) { - // page-sized and aligned data? easy. - breaks.push_back(from); - } else if (len > 8192) { - // there is at least 1 full page in there. somewhere. - int p = 0; - - // leading partial page? - if (off & 4095 != 0) - p = 4096 - (off & 4095); - - // full page(s) - breaks.push_back(from + p); - p += (len - p) & (~4095); - - // tail bit? - if (p != len) - breaks.push_back(from + p); - } } virtual void encode_payload() { ::_encode(st, payload); ::_encode(attrset, payload); - add_payload_chunk_breaks(payload.length() + 4, - st.offset, data.length(), - chunk_payload_at); - ::_encode(data, payload); + env.data_off = st.offset; } virtual char *get_type_name() { return "osd_op"; } diff --git a/trunk/ceph/messages/MOSDOpReply.h b/trunk/ceph/messages/MOSDOpReply.h index 7b3a5a4150363..0a39395565395 100644 --- a/trunk/ceph/messages/MOSDOpReply.h +++ b/trunk/ceph/messages/MOSDOpReply.h @@ -30,7 +30,7 @@ */ class MOSDOpReply : public Message { - struct { + struct st_t { // req osdreqid_t reqid; @@ -55,7 +55,6 @@ class MOSDOpReply : public Message { osd_peer_stat_t peer_stat; } st; - bufferlist data; map attrset; public: @@ -90,14 +89,6 @@ class MOSDOpReply : public Message { void set_peer_stat(const osd_peer_stat_t& stat) { st.peer_stat = stat; } const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; } - // data payload - void set_data(bufferlist &d) { - data.claim(d); - } - bufferlist& get_data() { - return data; - } - // osdmap epoch_t get_map_epoch() { return st.map_epoch; } @@ -126,19 +117,14 @@ public: // marshalling virtual void decode_payload() { - payload.copy(0, sizeof(st), (char*)&st); - payload.splice(0, sizeof(st)); int off = 0; + ::_decode(st, payload, off); ::_decode(attrset, payload, off); - ::_decode(data, payload, off); } virtual void encode_payload() { - payload.append((char*)&st, sizeof(st)); + ::_encode(st, payload); ::_encode(attrset, payload); - MOSDOp::add_payload_chunk_breaks(payload.length() + 4, - st.offset, data.length(), - chunk_payload_at); - ::_encode(data, payload); + env.data_off = st.offset; } virtual char *get_type_name() { return "osd_op_reply"; } diff --git a/trunk/ceph/msg/FakeMessenger.cc b/trunk/ceph/msg/FakeMessenger.cc index 590b3214eb351..7d984c23d2eab 100644 --- a/trunk/ceph/msg/FakeMessenger.cc +++ b/trunk/ceph/msg/FakeMessenger.cc @@ -217,15 +217,17 @@ int fakemessenger_do_loop_2() // encode if (m->empty_payload()) m->encode_payload(); - ceph_message_header env = m->get_envelope(); - bufferlist bl; - bl.claim( m->get_payload() ); + ceph_msg_header env = m->get_env(); + bufferlist front; + front.claim( m->get_payload() ); + bufferlist data; + data.claim( m->get_data() ); //bl.c_str(); // condense into 1 buffer delete m; // decode - m = decode_message(env, bl); + m = decode_message(env, front, data); assert(m); } diff --git a/trunk/ceph/msg/Message.cc b/trunk/ceph/msg/Message.cc index 872b4414cc273..6e5724a2dc697 100644 --- a/trunk/ceph/msg/Message.cc +++ b/trunk/ceph/msg/Message.cc @@ -60,7 +60,6 @@ using namespace std; #include "messages/MMDSResolve.h" #include "messages/MMDSResolveAck.h" #include "messages/MMDSCacheRejoin.h" -//#include "messages/MMDSCacheRejoinAck.h" #include "messages/MDirUpdate.h" #include "messages/MDiscover.h" @@ -105,7 +104,7 @@ using namespace std; Message * -decode_message(ceph_message_header& env, bufferlist& payload) +decode_message(ceph_msg_header& env, bufferlist& front, bufferlist& data) { // make message Message *m = 0; @@ -149,15 +148,6 @@ decode_message(ceph_message_header& env, bufferlist& payload) m = new MMonMap; break; - /* - case MSG_FAILURE: - m = new MFailure(); - break; - case MSG_FAILURE_ACK: - m = new MFailureAck(); - break; - */ - case MSG_OSD_BOOT: m = new MOSDBoot(); break; @@ -360,11 +350,10 @@ decode_message(ceph_message_header& env, bufferlist& payload) assert(0); } - // env - m->set_envelope(env); + m->set_env(env); + m->set_payload(front); + m->set_data(data); - // decode - m->set_payload(payload); m->decode_payload(); // done! diff --git a/trunk/ceph/msg/Message.h b/trunk/ceph/msg/Message.h index 244c07a57fce0..01f492f0df626 100644 --- a/trunk/ceph/msg/Message.h +++ b/trunk/ceph/msg/Message.h @@ -104,63 +104,47 @@ using std::list; class Message { - private: - - protected: - ceph_message_header env; // envelope - bufferlist payload; // payload - list chunk_payload_at; +protected: + ceph_msg_header env; // envelope + bufferlist payload; // "front" unaligned blob + bufferlist data; // data payload (page-alignment will be preserved where possible) utime_t recv_stamp; friend class Messenger; -public: - public: - Message() { - env.nchunks = 0; - }; +public: + Message() { }; Message(int t) { - env.nchunks = 0; env.type = t; } - virtual ~Message() { - } + virtual ~Message() { } + ceph_msg_header &get_env() { return env; } + void set_env(const ceph_msg_header &e) { env = e; } void clear_payload() { payload.clear(); } bool empty_payload() { return payload.length() == 0; } - bufferlist& get_payload() { - return payload; - } - void set_payload(bufferlist& bl) { - payload.claim(bl); - } - void copy_payload(const bufferlist& bl) { - payload = bl; - } - const list& get_chunk_payload_at() const { return chunk_payload_at; } - void set_chunk_payload_at(list& o) { chunk_payload_at.swap(o); } - ceph_message_header& get_envelope() { - return env; - } - void set_envelope(ceph_message_header& env) { - this->env = env; - } + bufferlist& get_payload() { return payload; } + void set_payload(bufferlist& bl) { payload.claim(bl); } + void copy_payload(const bufferlist& bl) { payload = bl; } + void set_data(bufferlist &d) { data.claim(d); } + void copy_data(const bufferlist &d) { data = d; } + bufferlist& get_data() { return data; } + off_t get_data_len() { return data.length(); } void set_recv_stamp(utime_t t) { recv_stamp = t; } utime_t get_recv_stamp() { return recv_stamp; } - unsigned get_seq() { return env.seq; } - void set_seq(unsigned s) { env.seq = s; } - // ENVELOPE ---- // type int get_type() { return env.type; } void set_type(int t) { env.type = t; } - virtual char *get_type_name() = 0; + + unsigned get_seq() { return env.seq; } + void set_seq(unsigned s) { env.seq = s; } // source/dest entity_inst_t& get_dest_inst() { return *(entity_inst_t*)&env.dst; } @@ -178,21 +162,17 @@ public: entity_addr_t& get_source_addr() { return *(entity_addr_t*)&env.src.addr; } void set_source_addr(const entity_addr_t &i) { env.src.addr = *(ceph_entity_addr*)&i; } - // PAYLOAD ---- - void reset_payload() { - payload.clear(); - } - + // virtual bits virtual void decode_payload() = 0; virtual void encode_payload() = 0; - + virtual char *get_type_name() = 0; virtual void print(ostream& out) { out << get_type_name(); } }; -extern Message *decode_message(ceph_message_header &env, bufferlist& bl); +extern Message *decode_message(ceph_msg_header &env, bufferlist& front, bufferlist& data); inline ostream& operator<<(ostream& out, Message& m) { m.print(out); return out; diff --git a/trunk/ceph/msg/SimpleMessenger.cc b/trunk/ceph/msg/SimpleMessenger.cc index 2276671f0376c..59c1235658ea5 100644 --- a/trunk/ceph/msg/SimpleMessenger.cc +++ b/trunk/ceph/msg/SimpleMessenger.cc @@ -22,6 +22,8 @@ #include #include +#include + #include "config.h" #include "messages/MGenericMessage.h" @@ -1229,7 +1231,8 @@ void Rank::Pipe::writer() sent.push_back(m); // move to sent list lock.Unlock(); dout(20) << "writer sending " << m->get_seq() << " " << m << " " << *m << dendl; - if (m->empty_payload()) m->encode_payload(); + if (m->empty_payload()) + m->encode_payload(); int rc = write_message(m); lock.Lock(); @@ -1286,56 +1289,67 @@ Message *Rank::Pipe::read_message() // envelope //dout(10) << "receiver.read_message from sd " << sd << dendl; - ceph_message_header env; + ceph_msg_header env; if (tcp_read( sd, (char*)&env, sizeof(env) ) < 0) return 0; dout(20) << "reader got envelope type=" << env.type << " src " << env.src << " dst " << env.dst - << " nchunks=" << env.nchunks + << " front=" << env.front_len + << " data=" << env.data_len << " at " << env.data_off << dendl; - - // read chunk lens - __u32 chunklens[4]; - if (tcp_read(sd, (char*)chunklens, sizeof(__u32)*env.nchunks) < 0) - return 0; - // read chunks - bufferlist blist; - int32_t pos = 0; - list chunk_at; - for (unsigned i=0; i 0) { + bp = buffer::create_page_aligned(middle); + if (tcp_read( sd, bp.c_str(), middle ) < 0) + return 0; + data.push_back(bp); + left -= middle; + dout(20) << "reader got data page-aligned middle " << middle << dendl; + } - bufferptr bp; - if (size % 4096 == 0) { - dout(30) << "decoding page-aligned chunk of " << size << dendl; - bp = buffer::create_page_aligned(size); - } else { - bp = buffer::create(size); + if (left) { + bp = buffer::create(left); + if (tcp_read( sd, bp.c_str(), left ) < 0) + return 0; + data.push_back(bp); + dout(20) << "reader got data tail " << left << dendl; } - - if (tcp_read( sd, bp.c_str(), size ) < 0) - return 0; - - blist.push_back(bp); - - dout(30) << "reader got frag " - << i << " of " << env.nchunks << " len " << bp.length() << dendl; } // unmarshall message - size_t s = blist.length(); - Message *m = decode_message(env, blist); + Message *m = decode_message(env, front, data); - m->set_chunk_payload_at(chunk_at); - - dout(20) << "reader got " << s << " byte message from " + dout(20) << "reader got " << front.length() << " + " << data.length() << " byte message from " << m->get_source() << dendl; return m; @@ -1409,45 +1423,20 @@ int Rank::Pipe::write_ack(unsigned seq) int Rank::Pipe::write_message(Message *m) { // get envelope, buffers - ceph_message_header *env = &m->get_envelope(); + ceph_msg_header *env = &m->get_env(); + env->front_len = m->get_payload().length(); + env->data_len = m->get_data().length(); + bufferlist blist; blist.claim( m->get_payload() ); + blist.append( m->get_data() ); - // chunk out page aligned buffers? - __u32 chunklens[4]; - if (blist.length() == 0) - env->nchunks = 0; - else { - env->nchunks = 1 + m->get_chunk_payload_at().size(); // header + explicit chunk points - - int pos = 0; - int c = 0; - for (list::const_iterator pc = m->get_chunk_payload_at().begin(); - pc != m->get_chunk_payload_at().end(); - pc++) { - chunklens[c] = *pc - pos; - dout(20) << "chunk bound at " << *pc << ", chunklen[" << c << "] = " << chunklens[c] << dendl; - pos = *pc; - c++; - } - chunklens[c] = blist.length() - pos; - dout(20) << "tail chunklen[" << c << "] = " << chunklens[c] << dendl; - - if (!m->get_chunk_payload_at().empty()) - dout(20) << "chunk bounds at " << m->get_chunk_payload_at() - << " in " << *m << " len " << blist.length() - << dendl; - } - - dout(20) << "write_message " << m << " " << *m - << " to " << m->get_dest() - << " in " << env->nchunks - << dendl; + dout(20) << "write_message " << m << " " << *m << " to " << m->get_dest() << dendl; // set up msghdr and iovecs struct msghdr msg; memset(&msg, 0, sizeof(msg)); - struct iovec msgvec[3 + blist.buffers().size() + env->nchunks*2]; // conservative upper bound + struct iovec msgvec[3 + blist.buffers().size()]; // conservative upper bound msg.msg_iov = msgvec; int msglen = 0; @@ -1464,62 +1453,46 @@ int Rank::Pipe::write_message(Message *m) msglen += sizeof(*env); msg.msg_iovlen++; - // send chunk sizes - msgvec[msg.msg_iovlen].iov_base = &chunklens; - msgvec[msg.msg_iovlen].iov_len = sizeof(__u32) * env->nchunks; - msglen += sizeof(__u32) * env->nchunks; - msg.msg_iovlen++; - - // payload + // payload (front+data) list::const_iterator pb = blist.buffers().begin(); - list::const_iterator pc = m->get_chunk_payload_at().begin(); int b_off = 0; // carry-over buffer offset, if any int bl_pos = 0; // blist pos - int nchunks = env->nchunks; - - for (int curchunk=0; curchunk < nchunks; curchunk++) { - // start a chunk - int left = chunklens[curchunk]; - assert(left > 0); - dout(30) << "chunk " << curchunk << " pos " << bl_pos << "/" << blist.length() << " size " << left << dendl; - - // chunk contents - while (left > 0) { - int donow = MIN(left, (int)pb->length()-b_off); - assert(donow > 0); - dout(30) << " bl_pos " << bl_pos << " b_off " << b_off - << " leftinchunk " << left - << " buffer len " << pb->length() - << " writing " << donow - << dendl; - - if (msg.msg_iovlen >= IOV_MAX-1) { - if (do_sendmsg(sd, &msg, msglen)) - return -1; - - // and restart the iov - msg.msg_iov = msgvec; - msg.msg_iovlen = 0; - msglen = 0; - } - - msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off); - msgvec[msg.msg_iovlen].iov_len = donow; - msglen += donow; - msg.msg_iovlen++; - - left -= donow; - assert(left >= 0); - b_off += donow; - bl_pos += donow; - if (b_off != (int)pb->length()) - break; - pb++; - b_off = 0; + int left = blist.length(); + + while (left > 0) { + int donow = MIN(left, (int)pb->length()-b_off); + assert(donow > 0); + dout(30) << " bl_pos " << bl_pos << " b_off " << b_off + << " leftinchunk " << left + << " buffer len " << pb->length() + << " writing " << donow + << dendl; + + if (msg.msg_iovlen >= IOV_MAX-1) { + if (do_sendmsg(sd, &msg, msglen)) + return -1; + + // and restart the iov + msg.msg_iov = msgvec; + msg.msg_iovlen = 0; + msglen = 0; } - assert(left == 0); + + msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off); + msgvec[msg.msg_iovlen].iov_len = donow; + msglen += donow; + msg.msg_iovlen++; + + left -= donow; + assert(left >= 0); + b_off += donow; + bl_pos += donow; + if (b_off != (int)pb->length()) + break; + pb++; + b_off = 0; } - assert(pb == blist.buffers().end()); + assert(left == 0); // send if (do_sendmsg(sd, &msg, msglen)) -- 2.39.5