]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
reworked message encoding/decoding header vs payload vs data payload, alignment prese...
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Nov 2007 21:30:33 +0000 (21:30 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Nov 2007 21:30:33 +0000 (21:30 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2075 29311d96-e01e-0410-9327-a35deaab8ce9

16 files changed:
trunk/ceph/client/Client.cc
trunk/ceph/include/ceph_fs.h
trunk/ceph/kernel/client.c
trunk/ceph/kernel/ktcp.c
trunk/ceph/kernel/ktcp.h
trunk/ceph/kernel/messenger.c
trunk/ceph/kernel/messenger.h
trunk/ceph/mds/Server.cc
trunk/ceph/messages/MClientRequest.h
trunk/ceph/messages/MLock.h
trunk/ceph/messages/MOSDOp.h
trunk/ceph/messages/MOSDOpReply.h
trunk/ceph/msg/FakeMessenger.cc
trunk/ceph/msg/Message.cc
trunk/ceph/msg/Message.h
trunk/ceph/msg/SimpleMessenger.cc

index 9a494f92ad06bb09d095edce33c39389394d15a9..2f82cb5f935c527a4409634c4fa0e2c788f2b229 100644 (file)
@@ -1720,7 +1720,7 @@ int Client::_mkdir(const char *path, mode_t mode)
 {
   MClientRequest *req = new MClientRequest(MDS_OP_MKDIR, messenger->get_myinst());
   req->set_path(path);
-  req->args.mkdir.mode = mode;
+  req->head.args.mkdir.mode = mode;
  
   // FIXME where does FUSE maintain user information
   req->set_caller_uid(getuid());
@@ -1886,7 +1886,7 @@ int Client::_do_lstat(const char *path, int mask, Inode **in)
     //req->set_caller_gid(fc->gid);
     
     req = new MClientRequest(MDS_OP_LSTAT, messenger->get_myinst());
-    req->args.stat.mask = mask;
+    req->head.args.stat.mask = mask;
     req->set_path(fpath);
 
     MClientReply *reply = make_request(req);
@@ -2022,7 +2022,7 @@ int Client::_chmod(const char *path, mode_t mode)
   dout(3) << "_chmod(" << path << ", 0" << oct << mode << dec << ")" << dendl;
   MClientRequest *req = new MClientRequest(MDS_OP_CHMOD, messenger->get_myinst());
   req->set_path(path); 
-  req->args.chmod.mode = mode;
+  req->head.args.chmod.mode = mode;
 
   // FIXME where does FUSE maintain user information
   req->set_caller_uid(getuid());
@@ -2056,8 +2056,8 @@ int Client::_chown(const char *path, uid_t uid, gid_t gid)
   dout(3) << "_chown(" << path << ", " << uid << ", " << gid << ")" << dendl;
   MClientRequest *req = new MClientRequest(MDS_OP_CHOWN, messenger->get_myinst());
   req->set_path(path); 
-  req->args.chown.uid = uid;
-  req->args.chown.gid = gid;
+  req->head.args.chown.uid = uid;
+  req->head.args.chown.gid = gid;
 
   // FIXME where does FUSE maintain user information
   req->set_caller_uid(getuid());
@@ -2094,8 +2094,8 @@ int Client::_utimes(const char *path, utime_t mtime, utime_t atime)
   dout(3) << "_utimes(" << path << ", " << mtime << ", " << atime << ")" << dendl;
   MClientRequest *req = new MClientRequest(MDS_OP_UTIME, messenger->get_myinst());
   req->set_path(path); 
-  req->args.utime.mtime = mtime.tv_ref();
-  req->args.utime.atime = atime.tv_ref();
+  req->head.args.utime.mtime = mtime.tv_ref();
+  req->head.args.utime.atime = atime.tv_ref();
 
   // FIXME where does FUSE maintain user information
   req->set_caller_uid(getuid());
@@ -2132,8 +2132,8 @@ int Client::_mknod(const char *path, mode_t mode, dev_t rdev)
 
   MClientRequest *req = new MClientRequest(MDS_OP_MKNOD, messenger->get_myinst());
   req->set_path(path); 
-  req->args.mknod.mode = mode;
-  req->args.mknod.rdev = rdev;
+  req->head.args.mknod.mode = mode;
+  req->head.args.mknod.rdev = rdev;
 
   // FIXME where does FUSE maintain user information
   req->set_caller_uid(getuid());
@@ -2289,7 +2289,7 @@ int Client::_readdir_get_frag(DirResult *dirp)
 
   MClientRequest *req = new MClientRequest(MDS_OP_READDIR, messenger->get_myinst());
   req->set_path(dirp->path); 
-  req->args.readdir.frag = fg;
+  req->head.args.readdir.frag = fg;
   
   // FIXME where does FUSE maintain user information
   req->set_caller_uid(getuid());
@@ -2516,8 +2516,8 @@ int Client::_open(const char *path, int flags, mode_t mode, Fh **fhp)
   // go
   MClientRequest *req = new MClientRequest(MDS_OP_OPEN, messenger->get_myinst());
   req->set_path(path); 
-  req->args.open.flags = flags;
-  req->args.open.mode = mode;
+  req->head.args.open.flags = flags;
+  req->head.args.open.mode = mode;
 
   int cmode = req->get_open_file_mode();
 
@@ -3073,7 +3073,7 @@ int Client::_truncate(const char *file, off_t length)
 {
   MClientRequest *req = new MClientRequest(MDS_OP_TRUNCATE, messenger->get_myinst());
   req->set_path(file); 
-  req->args.truncate.length = length;
+  req->head.args.truncate.length = length;
 
   // FIXME where does FUSE maintain user information
   req->set_caller_uid(getuid());
@@ -3103,8 +3103,8 @@ int Client::ftruncate(int fd, off_t length)
 int Client::_ftruncate(Fh *fh, off_t length) 
 {
   MClientRequest *req = new MClientRequest(MDS_OP_TRUNCATE, messenger->get_myinst());
-  req->args.truncate.ino = fh->inode->inode.ino;
-  req->args.truncate.length = length;
+  req->head.args.truncate.ino = fh->inode->inode.ino;
+  req->head.args.truncate.length = length;
 
   // FIXME where does FUSE maintain user information
   req->set_caller_uid(getuid());
index db8aaa1cbdb564571491cda994e07da9b3e985bb..e98f44950901fa4157c26541b1f878edaa605521 100644 (file)
@@ -206,14 +206,6 @@ struct ceph_entity_inst {
 /*
  * message header
  */
-struct ceph_message_header {  /* old one, just for now */
-       __u32 seq;    /* message seq# for this session */
-       __u32 type;   /* message type */
-       struct ceph_entity_inst src, dst;
-       __u32 nchunks;
-};
-
-/* new way */
 struct ceph_msg_header {
        __u32 seq;    /* message seq# for this session */
        __u32 type;   /* message type */
@@ -278,50 +270,52 @@ struct ceph_client_request_head {
        __u32 op;
        __u32 caller_uid, caller_gid;
        ceph_ino_t cwd_ino;
-};
 
-union ceph_client_request_args {
-       struct {
-               __u32 mask;
-       } stat;
-       struct {
-               ceph_ino_t ino;
-               __u32 mask;
-       } fstat;
-       struct {
-               ceph_frag_t frag;
-       } readdir;
-       struct {
-               struct ceph_timeval mtime;
-               struct ceph_timeval atime;
-       } utime;
-       struct {
-               __u32 mode;
-       } chmod; 
-       struct {
-               uid_t uid;
-               gid_t gid;
-       } chown;
-       struct {
-               __u32 mode;
-               __u32 rdev;
-       } mknod; 
-       struct {
-               __u32 mode;
-       } mkdir; 
-       struct {
-               __u32 flags;
-               __u32 mode;
-       } open;
-       struct {
-               ceph_ino_t ino;  // optional
-               __s64 length;
-       } truncate;
-       struct {
-               ceph_ino_t ino;
-       } fsync;
+       // fixed size arguments.  in a union.
+       union { 
+               struct {
+                       __u32 mask;
+               } stat;
+               struct {
+                       ceph_ino_t ino;
+                       __u32 mask;
+               } fstat;
+               struct {
+                       ceph_frag_t frag;
+               } readdir;
+               struct {
+                       struct ceph_timeval mtime;
+                       struct ceph_timeval atime;
+               } utime;
+               struct {
+                       __u32 mode;
+               } chmod; 
+               struct {
+                       uid_t uid;
+                       gid_t gid;
+               } chown;
+               struct {
+                       __u32 mode;
+                       __u32 rdev;
+               } mknod; 
+               struct {
+                       __u32 mode;
+               } mkdir; 
+               struct {
+                       __u32 flags;
+                       __u32 mode;
+               } open;
+               struct {
+                       ceph_ino_t ino;  // optional
+                       __s64 length;
+               } truncate;
+               struct {
+                       ceph_ino_t ino;
+               } fsync;
+       } args;
 };
 
+
 /* client reply */
 
 struct ceph_client_reply_head {
@@ -342,7 +336,6 @@ struct ceph_client_reply_inode {
        __u32 rdev;
        __u32 mask;
        char *symlink;
-       struct ceph_frag_tree dirfragdir;
 };
 /* followed by symlink string, then dirfragtree */
 
index 451ae1a804b867472f835068fa18e7bfd860bed6..c1a7fa7866107469d81d6da786f69e826055f47d 100644 (file)
@@ -10,6 +10,8 @@
 int ceph_debug = 10;
 
 
+void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg);
+
 
 /*
  * create a fresh client instance
@@ -17,6 +19,7 @@ int ceph_debug = 10;
 static struct ceph_client *create_client(struct ceph_mount_args *args)
 {
        struct ceph_client *cl;
+       int err;
 
        cl = kzalloc(sizeof(*cl), GFP_KERNEL);
        if (cl == NULL)
@@ -26,12 +29,25 @@ static struct ceph_client *create_client(struct ceph_mount_args *args)
        init_waitqueue_head(&cl->mount_wq);
        spin_lock_init(&cl->sb_lock);
 
+       /* messenger */
+       cl->msgr = ceph_messenger_create();
+       if (IS_ERR(cl->msgr)) {
+               err = PTR_ERR(cl->msgr);
+               goto fail;
+       }
+       cl->msgr->parent = cl;
+       cl->msgr->dispatch = (ceph_messenger_dispatch_t)ceph_dispatch;
+       
        cl->whoami = -1;
        ceph_monc_init(&cl->monc);
        ceph_mdsc_init(&cl->mdsc, cl);
        ceph_osdc_init(&cl->osdc);
 
        return cl;
+
+fail:
+       kfree(cl);
+       return ERR_PTR(err);
 }
 
 /*
@@ -175,7 +191,7 @@ void ceph_put_client(struct ceph_client *cl)
  *
  * should be fast and non-blocking, as it is called with locks held.
  */
-static void dispatch(struct ceph_client *client, struct ceph_msg *msg)
+void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg)
 {
        dout(5, "dispatch %p type %d\n", (void*)msg, msg->hdr.type);
 
index 2fbec52c1724734d7fadc01ebc44f17f3f2da455..9bf03272e140381a5e0cb81f8edb9d764358b4de 100644 (file)
@@ -28,17 +28,16 @@ struct socket * _kconnect(struct sockaddr *saddr)
        return(sd);
 }
 
-struct socket * _klisten(struct sockaddr *saddr)
+struct socket * _klisten(struct sockaddr_in *in_addr)
 {
        int ret;
        struct socket *sd = NULL;
-       struct sockaddr_in *in_addr = (struct sockaddr_in *)saddr;
 
 
        ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sd);
         if (ret < 0) {
                printk(KERN_INFO "sock_create_kern error: %d\n", ret);
-               return(NULL);
+               return ERR_PTR(ret);
        }
 
        /* no user specified address given so create, will allow arg to mount */
@@ -55,7 +54,7 @@ struct socket * _klisten(struct sockaddr *saddr)
        if (ret < 0) {
                printk("Failed to set SO_REUSEADDR: %d\n", ret);
        }  */
-       ret = sd->ops->bind(sd, saddr, sizeof(saddr));
+       ret = sd->ops->bind(sd, (struct sockaddr*)in_addr, sizeof(in_addr));
 /* TBD: probaby want to tune the backlog queue .. */
        ret = sd->ops->listen(sd, NUM_BACKUP);
        if (ret < 0) {
index a6f6d1f0a8b57570116ee8c5731cfd9d39a357e8..279a56f090d42f5387dad2e0a100d5654ab1d578 100644 (file)
@@ -3,7 +3,7 @@
 
 /* prototype definitions */
 struct socket * _kconnect(struct sockaddr *);
-struct socket * _klisten(struct sockaddr *);
+struct socket * _klisten(struct sockaddr_in *);
 struct socket *_kaccept(struct socket *);
 int _krecvmsg(struct socket *, void *, size_t , unsigned);
 int _ksendmsg(struct socket *, struct kvec *, size_t, size_t, unsigned);
index 8ffba6815fda8089726dab8d452febd428cfba84..8c43d1c060727a1bee5f3f94d920f00919da0536 100644 (file)
@@ -730,25 +730,37 @@ done:
  * create a new messenger instance, saddr is address specified from mount arg.
  * If null, will get created by _klisten()
  */
-struct ceph_messenger *ceph_create_messenger(struct sockaddr *saddr)
+struct ceph_messenger *ceph_messenger_create()
 {
         struct ceph_messenger *msgr;
+       struct sockaddr_in saddr;
 
         msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
         if (msgr == NULL) 
-               return NULL;
+               return ERR_PTR(-ENOMEM);
 
        spin_lock_init(&msgr->con_lock);
 
        /* create listening socket */
-       msgr->listen_sock = _klisten(saddr);
-       if (msgr->listen_sock == NULL) {
+       msgr->listen_sock = _klisten(&saddr);
+       if (IS_ERR(msgr->listen_sock)) {
+               int err = PTR_ERR(msgr->listen_sock);
                kfree(msgr);
-               return NULL;
+               return ERR_PTR(err);
        }
+
+       /* determine my ip:port */
+       msgr->inst.addr.ipaddr.sin_family = saddr.sin_family;
+       msgr->inst.addr.ipaddr.sin_port = saddr.sin_port;
+       msgr->inst.addr.ipaddr.sin_addr = saddr.sin_addr;
+
        /* TBD: setup callback for accept */
        INIT_WORK(&msgr->awork, try_accept);       /* setup work structure */
-        return msgr;
+
+       dout(1, "ceph_messenger_create listening on %x:%d\n", 
+            ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr), 
+            ntohl(msgr->inst.addr.ipaddr.sin_port));
+       return msgr;
 }
 
 
@@ -773,14 +785,14 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
                        return PTR_ERR(con);
                dout(5, "opening new connection to peer %x:%d\n",
                     ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), 
-                    msg->hdr.dst.addr.ipaddr.sin_port);
+                    ntohl(msg->hdr.dst.addr.ipaddr.sin_port));
                con->peer_addr = msg->hdr.dst.addr;
                con->state = CONNECTING;
                add_connection(msgr, con);
        } else {
                dout(5, "had connection to peer %x:%d\n",
-                    msg->hdr.dst.addr.ipaddr.sin_addr.s_addr,
-                    msg->hdr.dst.addr.ipaddr.sin_port);
+                    ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
+                    ntohl(msg->hdr.dst.addr.ipaddr.sin_port));
        }                    
        spin_unlock(&msgr->con_lock);
 
index 4a4b509f0e8ce3cd8e38747d56a1aa0ffe3cbee7..37f218370498e624a8e7e819c4ecdfdfebbf8b26 100644 (file)
@@ -106,6 +106,8 @@ struct ceph_connection {
 };
 
 
+extern struct ceph_messenger *ceph_messenger_create(void);
+
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off);
 static __inline__ void ceph_msg_get(struct ceph_msg *msg) {
        atomic_inc(&msg->nref);
index 5a2429d117f2e7db3e8630fbb09b3de8d4755db1..efb566c7cee297f1c009c80c2b54950084a8f449 100644 (file)
@@ -488,19 +488,19 @@ void Server::handle_client_request(MClientRequest *req)
   // some ops are on ino's
   switch (req->get_op()) {
   case MDS_OP_FSTAT:
-    ref = mdcache->get_inode(req->args.fstat.ino);
+    ref = mdcache->get_inode(req->head.args.fstat.ino);
     assert(ref);
     break;
     
   case MDS_OP_TRUNCATE:
-    if (!req->args.truncate.ino) 
+    if (!req->head.args.truncate.ino) 
       break;   // can be called w/ either fh OR path
-    ref = mdcache->get_inode(req->args.truncate.ino);
+    ref = mdcache->get_inode(req->head.args.truncate.ino);
     assert(ref);
     break;
     
   case MDS_OP_FSYNC:
-    ref = mdcache->get_inode(req->args.fsync.ino);   // fixme someday no ino needed?
+    ref = mdcache->get_inode(req->head.args.fsync.ino);   // fixme someday no ino needed?
     assert(ref);
     break;
   }
@@ -563,7 +563,7 @@ void Server::dispatch_client_request(MDRequest *mdr)
 
     // funky.
   case MDS_OP_OPEN:
-    if (req->args.open.flags & O_CREAT)
+    if (req->head.args.open.flags & O_CREAT)
       handle_client_openc(mdr);
     else 
       handle_client_open(mdr);
@@ -1331,7 +1331,7 @@ void Server::handle_client_stat(MDRequest *mdr)
   set<SimpleLock*> wrlocks = mdr->wrlocks;
   set<SimpleLock*> xlocks = mdr->xlocks;
   
-  int mask = req->args.stat.mask;
+  int mask = req->head.args.stat.mask;
   if (mask & STAT_MASK_LINK) rdlocks.insert(&ref->linklock);
   if (mask & STAT_MASK_AUTH) rdlocks.insert(&ref->authlock);
   if (ref->is_file() && 
@@ -1407,8 +1407,8 @@ void Server::handle_client_utime(MDRequest *mdr)
 
   // project update
   inode_t *pi = cur->project_inode();
-  pi->mtime = req->args.utime.mtime;
-  pi->atime = req->args.utime.atime;
+  pi->mtime = req->head.args.utime.mtime;
+  pi->atime = req->head.args.utime.atime;
   pi->version = cur->pre_dirty();
   pi->ctime = g_clock.real_now();
 
@@ -1448,7 +1448,7 @@ void Server::handle_client_chmod(MDRequest *mdr)
   inode_t *pi = cur->project_inode();
   pi->mode = 
     (pi->mode & ~04777) | 
-    (req->args.chmod.mode & 04777);
+    (req->head.args.chmod.mode & 04777);
   pi->version = cur->pre_dirty();
   pi->ctime = g_clock.real_now();
 
@@ -1486,8 +1486,8 @@ void Server::handle_client_chown(MDRequest *mdr)
 
   // project update
   inode_t *pi = cur->project_inode();
-  pi->uid = MAX(req->args.chown.uid, 0);
-  pi->gid = MAX(req->args.chown.gid, 0);
+  pi->uid = MAX(req->head.args.chown.uid, 0);
+  pi->gid = MAX(req->head.args.chown.gid, 0);
   pi->version = cur->pre_dirty();
   pi->ctime = g_clock.real_now();
   
@@ -1525,7 +1525,7 @@ void Server::handle_client_readdir(MDRequest *mdr)
   }
 
   // which frag?
-  frag_t fg = req->args.readdir.frag;
+  frag_t fg = req->head.args.readdir.frag;
 
   // does the frag exist?
   if (diri->dirfragtree[fg] != fg) {
@@ -1677,8 +1677,8 @@ void Server::handle_client_mknod(MDRequest *mdr)
   assert(newi);
 
   // it's a file.
-  newi->inode.rdev = req->args.mknod.rdev;
-  newi->inode.mode = req->args.mknod.mode;
+  newi->inode.rdev = req->head.args.mknod.rdev;
+  newi->inode.mode = req->head.args.mknod.mode;
   newi->inode.mode &= ~INODE_TYPE_MASK;
   newi->inode.mode |= INODE_MODE_FILE;
   newi->inode.version = dn->pre_dirty() - 1;
@@ -1713,7 +1713,7 @@ void Server::handle_client_mkdir(MDRequest *mdr)
   assert(newi);
 
   // it's a directory.
-  newi->inode.mode = req->args.mkdir.mode;
+  newi->inode.mode = req->head.args.mkdir.mode;
   newi->inode.mode &= ~INODE_TYPE_MASK;
   newi->inode.mode |= INODE_MODE_DIR;
   newi->inode.layout = g_OSD_MDDirLayout;
@@ -3624,7 +3624,7 @@ void Server::handle_client_truncate(MDRequest *mdr)
     return;
   
   // already small enough?
-  if (cur->inode.size <= req->args.truncate.length) {
+  if (cur->inode.size <= req->head.args.truncate.length) {
     reply_request(mdr, 0);
     return;
   }
@@ -3633,19 +3633,19 @@ void Server::handle_client_truncate(MDRequest *mdr)
   version_t pdv = cur->pre_dirty();
   utime_t ctime = g_clock.real_now();
   Context *fin = new C_MDS_truncate_logged(mds, mdr, cur, 
-                                          pdv, req->args.truncate.length, ctime);
+                                          pdv, req->head.args.truncate.length, ctime);
   
   // log + wait
   mdr->ls = mdlog->get_current_segment();
   EUpdate *le = new EUpdate(mdlog, "truncate");
   le->metablob.add_client_req(mdr->reqid);
   le->metablob.add_dir_context(cur->get_parent_dir());
-  le->metablob.add_inode_truncate(cur->ino(), req->args.truncate.length, cur->inode.size);
+  le->metablob.add_inode_truncate(cur->ino(), req->head.args.truncate.length, cur->inode.size);
   inode_t *pi = le->metablob.add_dentry(cur->parent, true);
   pi->mtime = ctime;
   pi->ctime = ctime;
   pi->version = pdv;
-  pi->size = req->args.truncate.length;
+  pi->size = req->head.args.truncate.length;
   
 
   mdlog->submit_entry(le, fin);
@@ -3659,7 +3659,7 @@ void Server::handle_client_open(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request;
 
-  int flags = req->args.open.flags;
+  int flags = req->head.args.open.flags;
   int cmode = req->get_open_file_mode();
   bool need_auth = ((cmode != FILE_MODE_R && cmode != FILE_MODE_LAZY) ||
                    (flags & O_TRUNC));
@@ -3913,13 +3913,13 @@ void Server::handle_client_openc(MDRequest *mdr)
 
   dout(7) << "open w/ O_CREAT on " << req->get_filepath() << dendl;
   
-  bool excl = (req->args.open.flags & O_EXCL);
+  bool excl = (req->head.args.open.flags & O_EXCL);
   CDentry *dn = rdlock_path_xlock_dentry(mdr, !excl, false);
   if (!dn) return;
 
   if (!dn->is_null()) {
     // it existed.  
-    if (req->args.open.flags & O_EXCL) {
+    if (req->head.args.open.flags & O_EXCL) {
       dout(10) << "O_EXCL, target exists, failing with -EEXIST" << dendl;
       reply_request(mdr, -EEXIST, dn->get_dir()->get_inode());
       return;
@@ -3938,7 +3938,7 @@ void Server::handle_client_openc(MDRequest *mdr)
   assert(in);
   
   // it's a file.
-  in->inode.mode = req->args.open.mode;
+  in->inode.mode = req->head.args.open.mode;
   in->inode.mode |= INODE_MODE_FILE;
   in->inode.version = dn->pre_dirty() - 1;
   
index d8784c981df2fc1dcc7f91591c9f949d9742be24..5336f8cb51ece0084e60c8ace6e310691a91b479 100644 (file)
 
 
 class MClientRequest : public Message {
-  struct ceph_client_request_head st;
+public:
+  struct ceph_client_request_head head;
 
   // path arguments
   filepath path;
   string sarg;
 
  public:
-  // fixed size arguments.  in a union.
-  union ceph_client_request_args args;
-
   // cons
   MClientRequest() : Message(CEPH_MSG_CLIENT_REQUEST) {}
   MClientRequest(int op, entity_inst_t ci) : Message(CEPH_MSG_CLIENT_REQUEST) {
-    memset(&st, 0, sizeof(st));
-    memset(&args, 0, sizeof(args));
-    this->st.op = op;
-    this->st.client_inst.name = ci.name.v;
-    this->st.client_inst.addr = ci.addr.v;
+    memset(&head, 0, sizeof(head));
+    this->head.op = op;
+    this->head.client_inst.name = ci.name.v;
+    this->head.client_inst.addr = ci.addr.v;
   }
 
   metareqid_t get_reqid() {
     // FIXME: for now, assume clients always have 1 incarnation
-    return metareqid_t(st.client_inst.name.num, st.tid); 
+    return metareqid_t(head.client_inst.name.num, head.tid); 
   }
 
   int get_open_file_mode() {
-    if (args.open.flags & O_LAZY) 
+    if (head.args.open.flags & O_LAZY) 
       return FILE_MODE_LAZY;
-    if (args.open.flags & O_WRONLY) 
+    if (head.args.open.flags & O_WRONLY) 
       return FILE_MODE_W;
-    if (args.open.flags & O_RDWR) 
+    if (head.args.open.flags & O_RDWR) 
       return FILE_MODE_RW;
-    if (args.open.flags & O_APPEND) 
+    if (head.args.open.flags & O_APPEND) 
       return FILE_MODE_W;
     return FILE_MODE_R;
   }
@@ -113,17 +110,17 @@ class MClientRequest : public Message {
     return get_open_file_mode() == FILE_MODE_R;
   }
   bool is_idempotent() {
-    if (st.op == MDS_OP_OPEN) 
+    if (head.op == MDS_OP_OPEN) 
       return open_file_mode_is_readonly();
-    return (st.op < 1000);
+    return (head.op < 1000);
   }
   bool auth_is_best() {
     if (!is_idempotent()) return true;
-    if (st.op == MDS_OP_READDIR) return true;
+    if (head.op == MDS_OP_READDIR) return true;
     return false;    
   }
   bool follow_trailing_symlink() {
-    switch (st.op) {
+    switch (head.op) {
     case MDS_OP_LSTAT:
     case MDS_OP_FSTAT:
     case MDS_OP_LINK:
@@ -155,58 +152,55 @@ class MClientRequest : public Message {
 
 
   // normal fields
-  void set_tid(tid_t t) { st.tid = t; }
-  void set_oldest_client_tid(tid_t t) { st.oldest_client_tid = t; }
-  void inc_num_fwd() { st.num_fwd++; }
-  void set_retry_attempt(int a) { st.retry_attempt = a; }
+  void set_tid(tid_t t) { head.tid = t; }
+  void set_oldest_client_tid(tid_t t) { head.oldest_client_tid = t; }
+  void inc_num_fwd() { head.num_fwd++; }
+  void set_retry_attempt(int a) { head.retry_attempt = a; }
   void set_path(string& p) { path.set_path(p); }
   void set_path(const char *p) { path.set_path(p); }
   void set_path(const filepath& fp) { path = fp; }
-  void set_caller_uid(int u) { st.caller_uid = u; }
-  void set_caller_gid(int g) { st.caller_gid = g; }
+  void set_caller_uid(int u) { head.caller_uid = u; }
+  void set_caller_gid(int g) { head.caller_gid = g; }
   void set_sarg(string& arg) { this->sarg = arg; }
   void set_sarg(const char *arg) { this->sarg = arg; }
   void set_mds_wants_replica_in_dirino(inodeno_t dirino) { 
-    st.mds_wants_replica_in_dirino = dirino; }
+    head.mds_wants_replica_in_dirino = dirino; }
   
   void set_client_inst(const entity_inst_t& i) { 
-    st.client_inst.name = i.name.v; 
-    st.client_inst.addr = i.addr.v; 
+    head.client_inst.name = i.name.v; 
+    head.client_inst.addr = i.addr.v; 
   }
   entity_inst_t get_client_inst() { 
-    return entity_inst_t(st.client_inst);
+    return entity_inst_t(head.client_inst);
   }
 
-  int get_client() { return st.client_inst.name.num; }
-  tid_t get_tid() { return st.tid; }
-  tid_t get_oldest_client_tid() { return st.oldest_client_tid; }
-  int get_num_fwd() { return st.num_fwd; }
-  int get_retry_attempt() { return st.retry_attempt; }
-  int get_op() { return st.op; }
-  int get_caller_uid() { return st.caller_uid; }
-  int get_caller_gid() { return st.caller_gid; }
-  //inodeno_t get_ino() { return st.ino; }
+  int get_client() { return head.client_inst.name.num; }
+  tid_t get_tid() { return head.tid; }
+  tid_t get_oldest_client_tid() { return head.oldest_client_tid; }
+  int get_num_fwd() { return head.num_fwd; }
+  int get_retry_attempt() { return head.retry_attempt; }
+  int get_op() { return head.op; }
+  int get_caller_uid() { return head.caller_uid; }
+  int get_caller_gid() { return head.caller_gid; }
+  //inodeno_t get_ino() { return head.ino; }
   const string& get_path() { return path.get_path(); }
   filepath& get_filepath() { return path; }
   string& get_sarg() { return sarg; }
   inodeno_t get_mds_wants_replica_in_dirino() { 
-    return st.mds_wants_replica_in_dirino; }
+    return head.mds_wants_replica_in_dirino; }
 
-  inodeno_t get_cwd_ino() { return st.cwd_ino ? st.cwd_ino:MDS_INO_ROOT; }
+  inodeno_t get_cwd_ino() { return head.cwd_ino ? head.cwd_ino:MDS_INO_ROOT; }
 
   void decode_payload() {
     int off = 0;
-    payload.copy(off, sizeof(st), (char*)&st);
-    off += sizeof(st);
-    payload.copy(off, sizeof(args), (char*)&args);
-    off += sizeof(args);
+    payload.copy(off, sizeof(head), (char*)&head);
+    off += sizeof(head);
     path._decode(payload, off);
     ::_decode(sarg, payload, off);
   }
 
   void encode_payload() {
-    payload.append((char*)&st, sizeof(st));
-    payload.append((char*)&args, sizeof(args));
+    payload.append((char*)&head, sizeof(head));
     path._encode(payload);
     ::_encode(sarg, payload);
   }
@@ -267,8 +261,8 @@ class MClientRequest : public Message {
       out << " " << get_path();
     if (get_sarg().length())
       out << " " << get_sarg();
-    if (st.retry_attempt)
-      out << " RETRY=" << st.retry_attempt;
+    if (head.retry_attempt)
+      out << " RETRY=" << head.retry_attempt;
     out << ")";
   }
 
index 95c3e5f325212d558c64f955d4f9e674bc678901..4bfd6444cc06e4995cb514303c5f57d2270029ae 100644 (file)
@@ -62,10 +62,10 @@ class MLock : public Message {
   char      lock_type;  // lock object type
   MDSCacheObjectInfo object_info;  
   
-  bufferlist data;  // and possibly some data
+  bufferlist lockdata;  // and possibly some data
 
  public:
-  bufferlist& get_data() { return data; }
+  bufferlist& get_data() { return lockdata; }
   int get_asker() { return asker; }
   int get_action() { return action; }
   metareqid_t get_reqid() { return reqid; }
@@ -88,7 +88,7 @@ class MLock : public Message {
     Message(MSG_MDS_LOCK),
     action(ac), asker(as), lock_type(lock->get_type()) {
     lock->get_parent()->set_object_info(object_info);
-    data.claim(bl);
+    lockdata.claim(bl);
   }
   virtual char *get_type_name() { return "ILock"; }
   void print(ostream& out) {
@@ -99,8 +99,8 @@ class MLock : public Message {
   }
   
   void set_reqid(metareqid_t ri) { reqid = ri; }
-  void set_data(const bufferlist& data) {
-    this->data = data;
+  void set_data(const bufferlist& lockdata) {
+    this->lockdata = lockdata;
   }
   
   void decode_payload() {
@@ -110,7 +110,7 @@ class MLock : public Message {
     ::_decode(reqid, payload, off);
     ::_decode(lock_type, payload, off);
     object_info._decode(payload, off);
-    ::_decode(data, payload, off);
+    ::_decode(lockdata, payload, off);
   }
   virtual void encode_payload() {
     ::_encode(asker, payload);
@@ -118,7 +118,7 @@ class MLock : public Message {
     ::_encode(reqid, payload);
     ::_encode(lock_type, payload);
     object_info._encode(payload);
-    ::_encode(data, payload);
+    ::_encode(lockdata, payload);
   }
 
 };
index 3ff3fe640229002874fcd4e71dbd5f95f00b5db2..d23ad745acf9034c6269ffd27a5044125b60b820 100644 (file)
@@ -118,7 +118,6 @@ private:
     osd_peer_stat_t peer_stat;
   } st;
 
-  bufferlist data;
   map<string,bufferptr> attrset;
 
 
@@ -178,13 +177,6 @@ public:
   void inc_shed_count() { st.shed_count++; }
   int get_shed_count() { return st.shed_count; }
   
-  void set_data(bufferlist &d) {
-    data.claim(d);
-  }
-  bufferlist& get_data() {
-    return data;
-  }
-  off_t get_data_len() { return data.length(); }
 
 
   MOSDOp(entity_inst_t asker, int inc, long tid,
@@ -221,44 +213,14 @@ public:
   // marshalling
   virtual void decode_payload() {
     int off = 0;
-    payload.copy(off, sizeof(st), (char*)&st);
-    off += sizeof(st);
+    ::_decode(st, payload, off);
     ::_decode(attrset, payload, off);
-    ::_decode(data, payload, off);
-  }
-
-  static void add_payload_chunk_breaks(int from, int off, int len,
-                                      list<int>& breaks) {
-    if (len > 0 &&
-       len & 4095 == 0 && 
-       off & 4095 == 0) {
-      // page-sized and aligned data?  easy.
-      breaks.push_back(from);
-    } else if (len > 8192) {
-      // there is at least 1 full page in there.  somewhere.
-      int p = 0;
-
-      // leading partial page?
-      if (off & 4095 != 0) 
-       p = 4096 - (off & 4095);  
-
-      // full page(s)
-      breaks.push_back(from + p);
-      p += (len - p) & (~4095);
-
-      // tail bit?
-      if (p != len)
-       breaks.push_back(from + p);
-    }
   }
 
   virtual void encode_payload() {
     ::_encode(st, payload);
     ::_encode(attrset, payload);
-    add_payload_chunk_breaks(payload.length() + 4, 
-                            st.offset, data.length(),
-                            chunk_payload_at);
-    ::_encode(data, payload);
+    env.data_off = st.offset;
   }
 
   virtual char *get_type_name() { return "osd_op"; }
index 7b3a5a4150363a427fda802ac21dd7a60ec015ce..0a39395565395aa4375b78e363c21fd93466cbbd 100644 (file)
@@ -30,7 +30,7 @@
  */
 
 class MOSDOpReply : public Message {
-  struct {
+  struct st_t {
     // req
     osdreqid_t reqid;
 
@@ -55,7 +55,6 @@ class MOSDOpReply : public Message {
     osd_peer_stat_t peer_stat;
   } st;
 
-  bufferlist data;
   map<string,bufferptr> attrset;
 
  public:
@@ -90,14 +89,6 @@ class MOSDOpReply : public Message {
   void set_peer_stat(const osd_peer_stat_t& stat) { st.peer_stat = stat; }
   const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; }
 
-  // data payload
-  void set_data(bufferlist &d) {
-    data.claim(d);
-  }
-  bufferlist& get_data() {
-    return data;
-  }
-
   // osdmap
   epoch_t get_map_epoch() { return st.map_epoch; }
 
@@ -126,19 +117,14 @@ public:
 
   // marshalling
   virtual void decode_payload() {
-    payload.copy(0, sizeof(st), (char*)&st);
-    payload.splice(0, sizeof(st));
     int off = 0;
+    ::_decode(st, payload, off);
     ::_decode(attrset, payload, off);
-    ::_decode(data, payload, off);
   }
   virtual void encode_payload() {
-    payload.append((char*)&st, sizeof(st));
+    ::_encode(st, payload);
     ::_encode(attrset, payload);
-    MOSDOp::add_payload_chunk_breaks(payload.length() + 4, 
-                                    st.offset, data.length(),
-                                    chunk_payload_at);
-    ::_encode(data, payload);
+    env.data_off = st.offset;
   }
 
   virtual char *get_type_name() { return "osd_op_reply"; }
index 590b3214eb3510ffac50184f0f3616cf4be54a50..7d984c23d2eaba24d97548f220ea08bcbbfb4667 100644 (file)
@@ -217,15 +217,17 @@ int fakemessenger_do_loop_2()
           // encode
           if (m->empty_payload()) 
             m->encode_payload();
-          ceph_message_header env = m->get_envelope();
-          bufferlist bl;
-          bl.claim( m->get_payload() );
+          ceph_msg_header env = m->get_env();
+          bufferlist front;
+          front.claim( m->get_payload() );
+         bufferlist data;
+         data.claim( m->get_data() );
           //bl.c_str();   // condense into 1 buffer
 
           delete m;
           
           // decode
-          m = decode_message(env, bl);
+          m = decode_message(env, front, data);
           assert(m);
         } 
         
index 872b4414cc273159e35cd0ee61e075e74fa92b39..6e5724a2dc6975adad0aefa239f138d20db04791 100644 (file)
@@ -60,7 +60,6 @@ using namespace std;
 #include "messages/MMDSResolve.h"
 #include "messages/MMDSResolveAck.h"
 #include "messages/MMDSCacheRejoin.h"
-//#include "messages/MMDSCacheRejoinAck.h"
 
 #include "messages/MDirUpdate.h"
 #include "messages/MDiscover.h"
@@ -105,7 +104,7 @@ using namespace std;
 
 
 Message *
-decode_message(ceph_message_header& env, bufferlist& payload)
+decode_message(ceph_msg_header& env, bufferlist& front, bufferlist& data)
 {
   // make message
   Message *m = 0;
@@ -149,15 +148,6 @@ decode_message(ceph_message_header& env, bufferlist& payload)
     m = new MMonMap;
     break;
 
-       /*
-  case MSG_FAILURE:
-    m = new MFailure();
-    break;
-  case MSG_FAILURE_ACK:
-    m = new MFailureAck();
-    break;
-       */
-
   case MSG_OSD_BOOT:
     m = new MOSDBoot();
     break;
@@ -360,11 +350,10 @@ decode_message(ceph_message_header& env, bufferlist& payload)
     assert(0);
   }
   
-  // env
-  m->set_envelope(env);
+  m->set_env(env);
+  m->set_payload(front);
+  m->set_data(data);
 
-  // decode
-  m->set_payload(payload);
   m->decode_payload();
 
   // done!
index 244c07a57fce08aeff01eb83efab78b70ffe54bb..01f492f0df62664b03c19394c02c70578dfbe227 100644 (file)
@@ -104,63 +104,47 @@ using std::list;
 
 
 class Message {
- private:
-  
- protected:
-  ceph_message_header  env;    // envelope
-  bufferlist      payload;        // payload
-  list<int> chunk_payload_at;
+protected:
+  ceph_msg_header  env;      // envelope
+  bufferlist       payload;  // "front" unaligned blob
+  bufferlist       data;     // data payload (page-alignment will be preserved where possible)
   
   utime_t recv_stamp;
 
   friend class Messenger;
-public:
 
- public:
-  Message() { 
-    env.nchunks = 0;
-  };
+public:
+  Message() { };
   Message(int t) {
-    env.nchunks = 0;
     env.type = t;
   }
-  virtual ~Message() {
-  }
+  virtual ~Message() { }
 
+  ceph_msg_header &get_env() { return env; }
+  void set_env(const ceph_msg_header &e) { env = e; }
 
   void clear_payload() { payload.clear(); }
   bool empty_payload() { return payload.length() == 0; }
-  bufferlist& get_payload() {
-    return payload;
-  }
-  void set_payload(bufferlist& bl) {
-    payload.claim(bl);
-  }
-  void copy_payload(const bufferlist& bl) {
-    payload = bl;
-  }
-  const list<int>& get_chunk_payload_at() const { return chunk_payload_at; }
-  void set_chunk_payload_at(list<int>& o) { chunk_payload_at.swap(o); }
-  ceph_message_header& get_envelope() {
-    return env;
-  }
-  void set_envelope(ceph_message_header& env) {
-    this->env = env;
-  }
+  bufferlist& get_payload() { return payload; }
+  void set_payload(bufferlist& bl) { payload.claim(bl); }
+  void copy_payload(const bufferlist& bl) { payload = bl; }
 
+  void set_data(bufferlist &d) { data.claim(d); }
+  void copy_data(const bufferlist &d) { data = d; }
+  bufferlist& get_data() { return data; }
+  off_t get_data_len() { return data.length(); }
 
   void set_recv_stamp(utime_t t) { recv_stamp = t; }
   utime_t get_recv_stamp() { return recv_stamp; }
 
-  unsigned get_seq() { return env.seq; }
-  void set_seq(unsigned s) { env.seq = s; }
-
   // ENVELOPE ----
 
   // type
   int get_type() { return env.type; }
   void set_type(int t) { env.type = t; }
-  virtual char *get_type_name() = 0;
+
+  unsigned get_seq() { return env.seq; }
+  void set_seq(unsigned s) { env.seq = s; }
 
   // source/dest
   entity_inst_t& get_dest_inst() { return *(entity_inst_t*)&env.dst; }
@@ -178,21 +162,17 @@ public:
   entity_addr_t& get_source_addr() { return *(entity_addr_t*)&env.src.addr; }
   void set_source_addr(const entity_addr_t &i) { env.src.addr = *(ceph_entity_addr*)&i; }
 
-  // PAYLOAD ----
-  void reset_payload() {
-    payload.clear();
-  }
-
+  // virtual bits
   virtual void decode_payload() = 0;
   virtual void encode_payload() = 0;
-
+  virtual char *get_type_name() = 0;
   virtual void print(ostream& out) {
     out << get_type_name();
   }
   
 };
 
-extern Message *decode_message(ceph_message_header &env, bufferlist& bl);
+extern Message *decode_message(ceph_msg_header &env, bufferlist& front, bufferlist& data);
 inline ostream& operator<<(ostream& out, Message& m) {
   m.print(out);
   return out;
index 2276671f0376c7d9cb93dd35c1c1ab4a89f92ba0..59c1235658ea595efe28a5964b39d19d3a0f2c2c 100644 (file)
@@ -22,6 +22,8 @@
 #include <sys/socket.h>
 #include <netinet/tcp.h>
 
+#include <asm/page.h>
+
 #include "config.h"
 
 #include "messages/MGenericMessage.h"
@@ -1229,7 +1231,8 @@ void Rank::Pipe::writer()
        sent.push_back(m); // move to sent list
        lock.Unlock();
         dout(20) << "writer sending " << m->get_seq() << " " << m << " " << *m << dendl;
-        if (m->empty_payload()) m->encode_payload();
+        if (m->empty_payload()) 
+         m->encode_payload();
        int rc = write_message(m);
        lock.Lock();
        
@@ -1286,56 +1289,67 @@ Message *Rank::Pipe::read_message()
   // envelope
   //dout(10) << "receiver.read_message from sd " << sd  << dendl;
   
-  ceph_message_header env; 
+  ceph_msg_header env; 
   if (tcp_read( sd, (char*)&env, sizeof(env) ) < 0)
     return 0;
   
   dout(20) << "reader got envelope type=" << env.type 
            << " src " << env.src << " dst " << env.dst
-           << " nchunks=" << env.nchunks
+           << " front=" << env.front_len 
+          << " data=" << env.data_len << " at " << env.data_off
            << dendl;
-
-  // read chunk lens
-  __u32 chunklens[4];
-  if (tcp_read(sd, (char*)chunklens, sizeof(__u32)*env.nchunks) < 0)
-    return 0;
   
-  // read chunks
-  bufferlist blist;
-  int32_t pos = 0;
-  list<int> chunk_at;
-  for (unsigned i=0; i<env.nchunks; i++) {
-    int32_t size = chunklens[i];
+  // read front
+  bufferlist front;
+  bufferptr bp;
+  if (env.front_len) {
+    bp = buffer::create(env.front_len);
+    if (tcp_read( sd, bp.c_str(), env.front_len ) < 0) 
+      return 0;
+    front.push_back(bp);
+    dout(20) << "reader got front " << front.length() << dendl;
+  }
 
-    dout(30) << "decode chunk " << i << "/" << env.nchunks << " size " << size << dendl;
+  // read data
+  bufferlist data;
+  if (env.data_len) {
+    int left = env.data_len;
+    if (env.data_off & PAGE_MASK) {
+      // head
+      int head = MIN(PAGE_SIZE - (env.data_off & PAGE_MASK),
+                    (unsigned)left);
+      bp = buffer::create(head);
+      if (tcp_read( sd, bp.c_str(), head ) < 0) 
+       return 0;
+      data.push_back(bp);
+      left -= head;
+      dout(20) << "reader got data head " << head << dendl;
+    }
 
-    if (pos) chunk_at.push_back(pos);
-    pos += size;
+    // middle
+    int middle = left & ~PAGE_MASK;
+    if (middle > 0) {
+      bp = buffer::create_page_aligned(middle);
+      if (tcp_read( sd, bp.c_str(), middle ) < 0) 
+       return 0;
+      data.push_back(bp);
+      left -= middle;
+      dout(20) << "reader got data page-aligned middle " << middle << dendl;
+    }
 
-    bufferptr bp;
-    if (size % 4096 == 0) {
-      dout(30) << "decoding page-aligned chunk of " << size << dendl;
-      bp = buffer::create_page_aligned(size);
-    } else {
-      bp = buffer::create(size);
+    if (left) {
+      bp = buffer::create(left);
+      if (tcp_read( sd, bp.c_str(), left ) < 0) 
+       return 0;
+      data.push_back(bp);
+      dout(20) << "reader got data tail " << left << dendl;
     }
-    
-    if (tcp_read( sd, bp.c_str(), size ) < 0) 
-      return 0;
-    
-    blist.push_back(bp);
-    
-    dout(30) << "reader got frag "
-            << i << " of " << env.nchunks << " len " << bp.length() << dendl;
   }
   
   // unmarshall message
-  size_t s = blist.length();
-  Message *m = decode_message(env, blist);
+  Message *m = decode_message(env, front, data);
 
-  m->set_chunk_payload_at(chunk_at);
-  
-  dout(20) << "reader got " << s << " byte message from " 
+  dout(20) << "reader got " << front.length() << " + " << data.length() << " byte message from " 
            << m->get_source() << dendl;
   
   return m;
@@ -1409,45 +1423,20 @@ int Rank::Pipe::write_ack(unsigned seq)
 int Rank::Pipe::write_message(Message *m)
 {
   // get envelope, buffers
-  ceph_message_header *env = &m->get_envelope();
+  ceph_msg_header *env = &m->get_env();
+  env->front_len = m->get_payload().length();
+  env->data_len = m->get_data().length();
+
   bufferlist blist;
   blist.claim( m->get_payload() );
+  blist.append( m->get_data() );
   
-  // chunk out page aligned buffers? 
-  __u32 chunklens[4];
-  if (blist.length() == 0) 
-    env->nchunks = 0; 
-  else {
-    env->nchunks = 1 + m->get_chunk_payload_at().size();  // header + explicit chunk points
-
-    int pos = 0;
-    int c = 0;
-    for (list<int>::const_iterator pc = m->get_chunk_payload_at().begin();
-        pc != m->get_chunk_payload_at().end();
-        pc++) {
-      chunklens[c] = *pc - pos;
-      dout(20) << "chunk bound at " << *pc << ", chunklen[" << c << "] = " << chunklens[c] << dendl;
-      pos = *pc;
-      c++;
-    }
-    chunklens[c] = blist.length() - pos;
-    dout(20) << "tail chunklen[" << c << "] = " << chunklens[c] << dendl;
-    
-    if (!m->get_chunk_payload_at().empty())
-      dout(20) << "chunk bounds at " << m->get_chunk_payload_at()
-             << " in " << *m << " len " << blist.length()
-             << dendl;
-  }
-
-  dout(20)  << "write_message " << m << " " << *m 
-            << " to " << m->get_dest()
-           << " in " << env->nchunks
-            << dendl;
+  dout(20)  << "write_message " << m << " " << *m << " to " << m->get_dest() << dendl;
   
   // set up msghdr and iovecs
   struct msghdr msg;
   memset(&msg, 0, sizeof(msg));
-  struct iovec msgvec[3 + blist.buffers().size() + env->nchunks*2];  // conservative upper bound
+  struct iovec msgvec[3 + blist.buffers().size()];  // conservative upper bound
   msg.msg_iov = msgvec;
   int msglen = 0;
   
@@ -1464,62 +1453,46 @@ int Rank::Pipe::write_message(Message *m)
   msglen += sizeof(*env);
   msg.msg_iovlen++;
 
-  // send chunk sizes
-  msgvec[msg.msg_iovlen].iov_base = &chunklens;
-  msgvec[msg.msg_iovlen].iov_len = sizeof(__u32) * env->nchunks;
-  msglen += sizeof(__u32) * env->nchunks;
-  msg.msg_iovlen++;
-  
-  // payload
+  // payload (front+data)
   list<bufferptr>::const_iterator pb = blist.buffers().begin();
-  list<int>::const_iterator pc = m->get_chunk_payload_at().begin();
   int b_off = 0;  // carry-over buffer offset, if any
   int bl_pos = 0; // blist pos
-  int nchunks = env->nchunks;
-
-  for (int curchunk=0; curchunk < nchunks; curchunk++) {
-    // start a chunk
-    int left = chunklens[curchunk];
-    assert(left > 0);
-    dout(30) << "chunk " << curchunk << " pos " << bl_pos << "/" << blist.length() << " size " << left << dendl;
-
-    // chunk contents
-    while (left > 0) {
-      int donow = MIN(left, (int)pb->length()-b_off);
-      assert(donow > 0);
-      dout(30) << " bl_pos " << bl_pos << " b_off " << b_off
-             << " leftinchunk " << left
-             << " buffer len " << pb->length()
-             << " writing " << donow 
-             << dendl;
-
-      if (msg.msg_iovlen >= IOV_MAX-1) {
-       if (do_sendmsg(sd, &msg, msglen)) 
-         return -1;    
-
-       // and restart the iov
-       msg.msg_iov = msgvec;
-       msg.msg_iovlen = 0;
-       msglen = 0;
-      }
-
-      msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
-      msgvec[msg.msg_iovlen].iov_len = donow;
-      msglen += donow;
-      msg.msg_iovlen++;
-
-      left -= donow;
-      assert(left >= 0);
-      b_off += donow;
-      bl_pos += donow;
-      if (b_off != (int)pb->length()) 
-       break;
-      pb++;
-      b_off = 0;
+  int left = blist.length();
+
+  while (left > 0) {
+    int donow = MIN(left, (int)pb->length()-b_off);
+    assert(donow > 0);
+    dout(30) << " bl_pos " << bl_pos << " b_off " << b_off
+            << " leftinchunk " << left
+            << " buffer len " << pb->length()
+            << " writing " << donow 
+            << dendl;
+    
+    if (msg.msg_iovlen >= IOV_MAX-1) {
+      if (do_sendmsg(sd, &msg, msglen)) 
+       return -1;      
+      
+      // and restart the iov
+      msg.msg_iov = msgvec;
+      msg.msg_iovlen = 0;
+      msglen = 0;
     }
-    assert(left == 0);
+    
+    msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
+    msgvec[msg.msg_iovlen].iov_len = donow;
+    msglen += donow;
+    msg.msg_iovlen++;
+    
+    left -= donow;
+    assert(left >= 0);
+    b_off += donow;
+    bl_pos += donow;
+    if (b_off != (int)pb->length()) 
+      break;
+    pb++;
+    b_off = 0;
   }
-  assert(pb == blist.buffers().end());
+  assert(left == 0);
   
   // send
   if (do_sendmsg(sd, &msg, msglen))