]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
merged 1863:1933 from trunk into branches/sage/mds
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 12 Oct 2007 22:25:55 +0000 (22:25 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 12 Oct 2007 22:25:55 +0000 (22:25 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1934 29311d96-e01e-0410-9327-a35deaab8ce9

20 files changed:
branches/sage/mds/client/SyntheticClient.cc
branches/sage/mds/config.cc
branches/sage/mds/ebofs/BufferCache.cc
branches/sage/mds/ebofs/Onode.h
branches/sage/mds/fakefuse.cc
branches/sage/mds/fakesyn.cc
branches/sage/mds/include/ceph_inttypes.h [new file with mode: 0644]
branches/sage/mds/kernel/Makefile [new file with mode: 0644]
branches/sage/mds/kernel/ceph_fs.h [new file with mode: 0644]
branches/sage/mds/kernel/inode.c [new file with mode: 0644]
branches/sage/mds/kernel/mdsmap.h [new file with mode: 0644]
branches/sage/mds/kernel/monmap.h [new file with mode: 0644]
branches/sage/mds/mds/CInode.h
branches/sage/mds/mds/mdstypes.h
branches/sage/mds/msg/FakeMessenger.cc
branches/sage/mds/msg/Message.cc
branches/sage/mds/msg/Message.h
branches/sage/mds/msg/SimpleMessenger.cc
branches/sage/mds/msg/ceph_msg_types.h [new file with mode: 0644]
branches/sage/mds/msg/msg_types.h

index d742d3114bd012361271ad4ec678b62146fa7727..931ea790625bb81c5174c91f0d0b472ef3e8fe0e 100644 (file)
@@ -139,6 +139,8 @@ void parse_syn_options(vector<char*>& args)
         syn_iargs.push_back( atoi(args[++i]) );
         syn_iargs.push_back( atoi(args[++i]) );
         syn_iargs.push_back( atoi(args[++i]) );
+        syn_iargs.push_back( atoi(args[++i]) );
+        syn_iargs.push_back( atoi(args[++i]) );
 
       } else if (strcmp(args[i],"walk") == 0) {
         syn_modes.push_back( SYNCLIENT_MODE_FULLWALK );
index 16ba756825a6414f5c91536a6cc202b06d4b0f5b..f9dea43f893a13582372c7bbdaf5f91b153f87b7 100644 (file)
@@ -450,9 +450,9 @@ bool parse_ip_port(const char *s, entity_addr_t& a)
     s++; off++;
 
     if (count <= 3)
-      a.ipq[count] = val;
+      a.v.ipq[count] = val;
     else
-      a.port = val;
+      a.v.port = val;
     
     count++;
     if (count == 4 && *s != ':') break;
index 837c67f132e4342865e2849f54d452b10b5f0b84..b1c98455f827891545b2d2c292dc0abce261f7c6 100644 (file)
@@ -428,7 +428,19 @@ int ObjectCache::map_write(block_t start, block_t len,
                            map<block_t, BufferHead*>& hits,
                            version_t super_epoch)
 {
-  map<block_t, BufferHead*>::iterator p = data.lower_bound(start);
+  map<block_t, BufferHead*>::iterator p;
+
+  // hack speed up common cases
+  if (start == 0) {
+    p = data.begin();
+  } else if (start + len == on->object_blocks && len == 1 && !data.empty()) {
+    // append hack.
+    p = data.end();
+    p--;
+    if (p->first < start) p++;
+  } else {
+    p = data.lower_bound(start);  
+  }
 
   dout(10) << "map_write " << *on << " " << start << "~" << len << dendl;
   // p->first >= start
index 16cd3e17f9bc5ca268b7a1d6ff4dc3bbd6b312f6..1d79d317dd96a0acb426b9530ffeebadfbe3b54d 100644 (file)
@@ -67,7 +67,8 @@ public:
  public:
   Onode(object_t oid) : ref(0), object_id(oid), version(0),
                        readonly(false),
-                       object_size(0), object_blocks(0), oc(0),
+                       object_size(0), object_blocks(0),
+                       oc(0),
                        dirty(false), dangling(false), deleted(false) { 
     onode_loc.length = 0;
   }
@@ -239,7 +240,23 @@ public:
 
     //assert(start+len <= object_blocks);
 
-    map<block_t,Extent>::iterator p = extent_map.lower_bound(start);
+    map<block_t,Extent>::iterator p;
+    
+    // hack hack speed up common cases!
+    if (start == 0) {
+      p = extent_map.begin();
+    } else if (start+len == object_blocks && len == 1 && !extent_map.empty()) {
+      // append hack.
+      p = extent_map.end();
+      p--;
+      if (p->first < start) p++;
+      //while (p->first >= start) p--;
+      //p++;
+    } else {
+      // normal
+      p = extent_map.lower_bound(start);
+    }
+
     if (p != extent_map.begin() &&
         (p == extent_map.end() || p->first > start && p->first)) {
       p--;
index 2a78281b8064ce94f3f7072e0aec4087c004383c..b08d00d11a5d69f034b7b3a9191cbc8939cdf3d8 100644 (file)
@@ -88,9 +88,9 @@ int main(int argc, char **argv) {
 
   MonMap *monmap = new MonMap(g_conf.num_mon);
   entity_addr_t a;
-  a.nonce = getpid();
+  a.v.nonce = getpid();
   for (int i=0; i<g_conf.num_mon; i++) {
-    a.port = i;
+    a.v.port = i;
     monmap->mon_inst[i] = entity_inst_t(entity_name_t::MON(i), a);  // hack ; see FakeMessenger.cc
   }
 
index 2e9f53e53e9dfa44b261bc1a476642010bf29b7c..9d12f138379ae4b67cfe858d851035901b773628 100644 (file)
@@ -86,9 +86,9 @@ int main(int argc, char **argv)
 
   MonMap *monmap = new MonMap(g_conf.num_mon);
   entity_addr_t a;
-  a.nonce = getpid();
+  a.v.nonce = getpid();
   for (int i=0; i<g_conf.num_mon; i++) {
-    a.port = i;
+    a.v.port = i;
     monmap->mon_inst[i] = entity_inst_t(entity_name_t::MON(i), a);  // hack ; see FakeMessenger.cc
   }
   
diff --git a/branches/sage/mds/include/ceph_inttypes.h b/branches/sage/mds/include/ceph_inttypes.h
new file mode 100644 (file)
index 0000000..c31c76a
--- /dev/null
@@ -0,0 +1,8 @@
+#ifndef __CEPH_INTTYPES_H
+#define __CEPH_INTTYPES_H
+
+typedef uint32_t __u32;
+typedef uint16_t __u16;
+typedef uint8_t __u8;
+
+#endif
diff --git a/branches/sage/mds/kernel/Makefile b/branches/sage/mds/kernel/Makefile
new file mode 100644 (file)
index 0000000..2ad658b
--- /dev/null
@@ -0,0 +1,7 @@
+#
+# Makefile for CEPH filesystem.
+#
+
+obj-$(CONFIG_CEPH_FS) += ceph.o
+
+ceph-objs := inode.o
diff --git a/branches/sage/mds/kernel/ceph_fs.h b/branches/sage/mds/kernel/ceph_fs.h
new file mode 100644 (file)
index 0000000..d765194
--- /dev/null
@@ -0,0 +1,75 @@
+/* -*- mode:C++; tab-width:8; c-basic-offset:8; indent-tabs-mode:t -*- 
+ * vim: ts=8 sw=8 smarttab
+ */
+
+#ifndef _FS_CEPH_CEPH_H
+#define _FS_CEPH_CEPH_H
+
+/* #include <linux/ceph_fs.h> */
+
+#include "kmsg.h"
+
+/* do these later
+#include "osdmap.h"
+#include "mdsmap.h"
+#include "monmap.h"
+*/
+struct ceph_monmap;
+struct ceph_osdmap;
+struct ceph_mdsmap;
+
+
+
+/*
+ * state associated with an individual MDS<->client session
+ */
+struct ceph_mds_session {
+       __u64 s_push_seq;  
+       /* wait queue? */
+};
+
+
+/*
+ * CEPH file system in-core superblock info
+ */
+struct ceph_sb_info {
+       __u32  s_whoami;               /* client number */
+       struct ceph_kmsg   *s_kmsg;    /* messenger instance */
+
+       struct ceph_monmap *s_monmap;  /* monitor map */
+       struct ceph_mdsmap *s_mdsmap;  /* mds map */
+       struct ceph_osdmap *s_osdmap;  /* osd map */
+
+       /* mds sessions */
+       struct ceph_mds_session **s_mds_sessions;     /* sparse array; elements NULL if no session */
+       int                      s_max_mds_sessions;  /* size of s_mds_sessions array */
+
+       /* current requests */
+       /* ... */
+       __u64 last_tid;
+};
+
+/*
+ * CEPH file system in-core inode info
+ */
+struct ceph_inode_info {
+       unsigned long val;  /* inode from types.h is uint64_t */
+       struct inode vfs_inode;
+};
+
+static inline struct ceph_inode_info *CEPH_I(struct inode *inode)
+{
+       return list_entry(inode, struct ceph_inode_info, vfs_inode);
+}
+
+
+/* file.c */
+extern const struct inode_operations ceph_file_inops;
+extern const struct file_operations ceph_file_operations;
+extern const struct address_space_operations ceph_aops;
+
+/* dir.c */
+extern const struct inode_operations ceph_dir_inops;
+extern const struct file_operations ceph_dir_operations;
+
+#endif /* _FS_CEPH_CEPH_H */
diff --git a/branches/sage/mds/kernel/inode.c b/branches/sage/mds/kernel/inode.c
new file mode 100644 (file)
index 0000000..99fdaf8
--- /dev/null
@@ -0,0 +1,140 @@
+/* -*- mode:C++; tab-width:8; c-basic-offset:8; indent-tabs-mode:t -*- 
+ * vim: ts=8 sw=8 smarttab
+ */
+
+#include <linux/module.h>
+#include <linux/fs.h>
+#include <linux/smp_lock.h>
+#include <linux/slab.h>
+#include "ceph_fs.h"
+
+MODULE_AUTHOR("Patience Warnick <patience@newdream.net>");
+MODULE_DESCRIPTION("Ceph filesystem for Linux");
+MODULE_LICENSE("GPL");
+
+
+static void ceph_read_inode(struct inode * inode)
+{
+       return;
+}
+
+static int ceph_write_inode(struct inode * inode, int unused)
+{
+       lock_kernel();
+       unlock_kernel();
+       return 0;
+}
+
+static void ceph_delete_inode(struct inode * inode)
+{
+       return;
+}
+
+static void ceph_put_super(struct super_block *s)
+{
+       return;
+}
+
+static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
+{
+       return 0;
+}
+
+static void ceph_write_super(struct super_block *s)
+{
+       lock_kernel();
+       unlock_kernel();
+       return;
+}
+
+static struct kmem_cache *ceph_inode_cachep;
+
+static struct inode *ceph_alloc_inode(struct super_block *sb)
+{
+       struct ceph_inode_info *ci;
+       ci = kmem_cache_alloc(ceph_inode_cachep, GFP_KERNEL);
+       if (!ci)
+               return NULL;
+       return &ci->vfs_inode;
+}
+
+static void ceph_destroy_inode(struct inode *inode)
+{
+       kmem_cache_free(ceph_inode_cachep, CEPH_I(inode));
+}
+
+static void init_once(void *foo, struct kmem_cache *cachep, unsigned long flags)
+{
+       struct ceph_inode_info *ci = (struct ceph_inode_info *) foo;
+
+       if ((flags & (SLAB_CTOR_VERIFY|SLAB_CTOR_CONSTRUCTOR)) ==
+           SLAB_CTOR_CONSTRUCTOR)
+               inode_init_once(&ci->vfs_inode);
+}
+static int init_inodecache(void)
+{
+       ceph_inode_cachep = kmem_cache_create("ceph_inode_cache",
+                                            sizeof(struct ceph_inode_info),
+                                            0, (SLAB_RECLAIM_ACCOUNT|
+                                               SLAB_MEM_SPREAD),
+                                            init_once, NULL);
+       if (ceph_inode_cachep == NULL)
+               return -ENOMEM;
+       return 0;
+}
+
+static void destroy_inodecache(void)
+{
+       kmem_cache_destroy(ceph_inode_cachep);
+}
+
+static const struct super_operations ceph_sops = {
+       .alloc_inode    = ceph_alloc_inode,
+       .destroy_inode  = ceph_destroy_inode,
+       .read_inode     = ceph_read_inode,
+       .write_inode    = ceph_write_inode,
+       .delete_inode   = ceph_delete_inode,
+       .put_super      = ceph_put_super,
+       .write_super    = ceph_write_super,
+       .statfs         = ceph_statfs,
+};
+
+static int ceph_get_sb(struct file_system_type *fs_type,
+       int flags, const char *dev_name, void *data, struct vfsmount *mnt)
+{
+       printk(KERN_INFO "entered ceph_get_sb\n");
+        return 0;
+}
+
+static struct file_system_type ceph_fs_type = {
+       .owner          = THIS_MODULE,
+       .name           = "ceph",
+       .get_sb         = ceph_get_sb,
+       .kill_sb        = kill_block_super,
+/*     .fs_flags       =   */
+};
+
+static int __init init_ceph(void)
+{
+       int ret = 0;
+
+       printk(KERN_INFO "ceph init\n");
+       if (!(ret = init_inodecache())) {
+               if ((ret = register_filesystem(&ceph_fs_type))) {
+                       destroy_inodecache();
+               }
+        }
+       return ret;
+}
+
+static void __exit exit_ceph(void)
+{
+       printk(KERN_INFO "ceph exit\n");
+
+       unregister_filesystem(&ceph_fs_type);
+}
+
+
+module_init(init_ceph);
+module_exit(exit_ceph);
diff --git a/branches/sage/mds/kernel/mdsmap.h b/branches/sage/mds/kernel/mdsmap.h
new file mode 100644 (file)
index 0000000..4b3cb84
--- /dev/null
@@ -0,0 +1,46 @@
+/* -*- mode:C++; tab-width:8; c-basic-offset:8; indent-tabs-mode:t -*- 
+ * vim: ts=8 sw=8 smarttab
+ */
+
+#ifndef _FS_CEPH_MDSMAP_H
+#define _FS_CEPH_MDSMAP_H
+
+/* see mds/MDSMap.h */
+#define CEPH_MDS_STATE_DNE         0  /* down, never existed. */
+#define CEPH_MDS_STATE_STOPPED    -1  /* down, once existed, but no subtrees. empty log. */
+#define CEPH_MDS_STATE_FAILED      2  /* down, active subtrees needs to be recovered. */
+
+#define CEPH_MDS_STATE_BOOT       -3  /* up, boot announcement.  destiny unknown. */
+#define CEPH_MDS_STATE_STANDBY    -4  /* up, idle.  waiting for assignment by monitor. */
+#define CEPH_MDS_STATE_CREATING   -5  /* up, creating MDS instance (new journal, idalloc..). */
+#define CEPH_MDS_STATE_STARTING   -6  /* up, starting prior stopped MDS instance. */
+
+#define CEPH_MDS_STATE_REPLAY      7  /* up, starting prior failed instance. scanning journal. */
+#define CEPH_MDS_STATE_RESOLVE     8  /* up, disambiguating distributed operations (import, rename, etc.) */
+#define CEPH_MDS_STATE_RECONNECT   9  /* up, reconnect to clients */
+#define CEPH_MDS_STATE_REJOIN      10 /* up, replayed journal, rejoining distributed cache */
+#define CEPH_MDS_STATE_ACTIVE      11 /* up, active */
+#define CEPH_MDS_STATE_STOPPING    12 /* up, exporting metadata (-> standby or out) */
+
+/*
+ * mds map
+ *
+ * fields limited to those the client cares about
+ */
+struct ceph_mdsmap {
+  __u64 m_epoch;
+  __u64 m_same_in_set_since;
+  struct timeval m_created;
+  __u32 m_anchortable;
+  __u32 m_root;
+  struct ceph_entity_addr *m_addr;  /* array of addresses */
+  __u8 *m_state;                    /* array of states */
+  __u32 m_max_mds;  /* size of m_addr, m_state arrays */
+};
+
+extern int ceph_mdsmap_get_random_mds(ceph_mdsmap *m);
+extern int ceph_mdsmap_get_state(ceph_mdsmap *m, int w);
+extern struct ceph_entity_addr *ceph_mdsmap_get_addr(ceph_mdsmap *m, int w);
+extern int ceph_mdsmap_decode(ceph_mdsmap *m, iovec *v);
+
+#endif
diff --git a/branches/sage/mds/kernel/monmap.h b/branches/sage/mds/kernel/monmap.h
new file mode 100644 (file)
index 0000000..9f7e535
--- /dev/null
@@ -0,0 +1,21 @@
+/* -*- mode:C++; tab-width:8; c-basic-offset:8; indent-tabs-mode:t -*- 
+ * vim: ts=8 sw=8 smarttab
+ */
+
+#ifndef _FS_CEPH_MONMAP_H
+#define _FS_CEPH_MONMAP_H
+
+/*
+ * monitor map
+ */
+struct ceph_monmap {
+  __u64 m_epoch;
+  __u32 m_num_mon;
+  __u32 m_last_mon;
+  struct ceph_entity_inst m_mon_inst;
+};
+
+extern int ceph_monmap_pick_mon(ceph_monmap *m);
+extern int ceph_monmap_decode(ceph_monmap *m, iovec *v);
+
+#endif
index 990f20ffb278aa05f1a6c85d3ae4f23ebdbf7c63..8f453472a047771a4c7e32ce5889fc3ee28cd20d 100644 (file)
@@ -480,12 +480,16 @@ public:
   // -- reference counting --
   void bad_put(int by) {
     generic_dout(7) << " bad put " << *this << " by " << by << " " << pin_name(by) << " was " << ref << " (" << ref_set << ")" << dendl;
+#ifdef MDS_REF_SET
     assert(ref_set.count(by) == 1);
+#endif
     assert(ref > 0);
   }
   void bad_get(int by) {
     generic_dout(7) << " bad get " << *this << " by " << by << " " << pin_name(by) << " was " << ref << " (" << ref_set << ")" << dendl;
+#ifdef MDS_REF_SET
     assert(ref_set.count(by) == 0);
+#endif
   }
   void first_get();
   void last_put();
index 4bf90a048d338e01d611de9082cce4a372cc3774..a2f779757255e4264ea49e49046f8ca7cc2134b1 100644 (file)
@@ -19,6 +19,7 @@ using namespace std;
 #include "include/frag.h"
 #include "include/xlist.h"
 
+#define MDS_REF_SET    // define me for improved debug output, sanity checking
 
 #define MDS_PORT_MAIN     0
 #define MDS_PORT_SERVER   1
@@ -36,12 +37,12 @@ using namespace std;
 #define MDS_INO_ROOT              1
 #define MDS_INO_PGTABLE           2
 #define MDS_INO_ANCHORTABLE       3
-#define MDS_INO_PG                4       // this should match osd/osd_types.h PG_INO
-#define MDS_INO_LOG_OFFSET        0x100
-#define MDS_INO_IDS_OFFSET        0x200
-#define MDS_INO_CLIENTMAP_OFFSET  0x300
-#define MDS_INO_STRAY_OFFSET      0x400
-#define MDS_INO_BASE              0x1000
+#define MDS_INO_PG                4       // *** WARNING: this should match osd/osd_types.h PG_INO ***
+#define MDS_INO_LOG_OFFSET        (1*MAX_MDS)
+#define MDS_INO_IDS_OFFSET        (2*MAX_MDS)
+#define MDS_INO_CLIENTMAP_OFFSET  (3*MAX_MDS)
+#define MDS_INO_STRAY_OFFSET      (4*MAX_MDS)
+#define MDS_INO_BASE              (5*MAX_MDS)
 
 #define MDS_INO_STRAY(x) (MDS_INO_STRAY_OFFSET+((unsigned)x))
 #define MDS_INO_IS_STRAY(i) ((i) >= MDS_INO_STRAY_OFFSET && (i) < MDS_INO_STRAY_OFFSET+MAX_MDS)
@@ -461,26 +462,36 @@ class MDSCacheObject {
   // pins
 protected:
   int      ref;       // reference count
+#ifdef MDS_REF_SET
   multiset<int> ref_set;
+#endif
 
  public:
   int get_num_ref() { return ref; }
-  bool is_pinned_by(int by) { return ref_set.count(by); }
-  multiset<int>& get_ref_set() { return ref_set; }
   virtual const char *pin_name(int by) = 0;
+  //bool is_pinned_by(int by) { return ref_set.count(by); }
+  //multiset<int>& get_ref_set() { return ref_set; }
 
   virtual void last_put() {}
   virtual void bad_put(int by) {
+#ifdef MDS_REF_SET
     assert(ref_set.count(by) > 0);
+#endif
     assert(ref > 0);
   }
   void put(int by) {
+#ifdef MDS_REF_SET
     if (ref == 0 || ref_set.count(by) == 0) {
+#else
+    if (ref == 0) {
+#endif
       bad_put(by);
     } else {
       ref--;
+#ifdef MDS_REF_SET
       ref_set.erase(ref_set.find(by));
       assert(ref == (int)ref_set.size());
+#endif
       if (ref == 0)
        last_put();
     }
@@ -488,22 +499,29 @@ protected:
 
   virtual void first_get() {}
   virtual void bad_get(int by) {
+#ifdef MDS_REF_SET
     assert(by < 0 || ref_set.count(by) == 0);
+#endif
     assert(0);
   }
   void get(int by) {
+#ifdef MDS_REF_SET
     if (by >= 0 && ref_set.count(by)) {
       bad_get(by);
     } else {
+#endif
       if (ref == 0) 
        first_get();
       ref++;
+#ifdef MDS_REF_SET
       ref_set.insert(by);
       assert(ref == (int)ref_set.size());
     }
+#endif
   }
 
   void print_pin_set(ostream& out) {
+#ifdef MDS_REF_SET
     multiset<int>::iterator it = ref_set.begin();
     while (it != ref_set.end()) {
       out << " " << pin_name(*it);
@@ -516,6 +534,7 @@ protected:
       if (c > 1)
        out << "*" << c;
     }
+#endif
   }
 
 
index a7b2d60b015115e78377f8d9aa9020de430d9dac..ee80df3dc0626b1aaeaff06b960623ce14e1287a 100644 (file)
@@ -218,7 +218,7 @@ int fakemessenger_do_loop_2()
           // encode
           if (m->empty_payload()) 
             m->encode_payload();
-          msg_envelope_t env = m->get_envelope();
+          ceph_message_header env = m->get_envelope();
           bufferlist bl;
           bl.claim( m->get_payload() );
           //bl.c_str();   // condense into 1 buffer
@@ -273,9 +273,9 @@ FakeMessenger::FakeMessenger(entity_name_t me)  : Messenger(me)
   {
     // assign rank
     _myinst.name = me;
-    _myinst.addr.port = nranks++;
+    _myinst.addr.v.port = nranks++;
     //if (!me.is_mon())
-    _myinst.addr.nonce = getpid();
+    _myinst.addr.v.nonce = getpid();
 
     // add to directory
     directory[ _myinst.addr ] = this;
index 07867cf2b7d91519d2e4461d64f02b1b990ff89a..e3c7ce827ac614740652b387ea6765c9d1b190e0 100644 (file)
@@ -105,7 +105,7 @@ using namespace std;
 
 
 Message *
-decode_message(msg_envelope_t& env, bufferlist& payload)
+decode_message(ceph_message_header& env, bufferlist& payload)
 {
   // make message
   Message *m = 0;
index cb432136f0ebb2232d98aa40e26c86cc310a73b6..a0de9a24ddab7be7381ca39c41c6e1f673ba6c9b 100644 (file)
@@ -159,22 +159,11 @@ using std::list;
 // abstract Message class
 
 
-
-typedef struct {
-  int32_t type;
-  entity_inst_t src, dst;
-  int32_t source_port, dest_port;
-  int32_t nchunks;
-} msg_envelope_t;
-
-#define MSG_ENVELOPE_LEN  sizeof(msg_envelope_t)
-
-
 class Message {
  private:
   
  protected:
-  msg_envelope_t  env;    // envelope
+  ceph_message_header  env;    // envelope
   bufferlist      payload;        // payload
   list<int> chunk_payload_at;
   
@@ -209,10 +198,11 @@ public:
     payload = bl;
   }
   const list<int>& get_chunk_payload_at() const { return chunk_payload_at; }
-  msg_envelope_t& get_envelope() {
+  void set_chunk_payload_at(list<int>& o) { chunk_payload_at.swap(o); }
+  ceph_message_header& get_envelope() {
     return env;
   }
-  void set_envelope(msg_envelope_t& env) {
+  void set_envelope(ceph_message_header& env) {
     this->env = env;
   }
 
@@ -228,23 +218,23 @@ public:
   virtual char *get_type_name() = 0;
 
   // source/dest
-  entity_inst_t& get_dest_inst() { return env.dst; }
-  void set_dest_inst(entity_inst_t& inst) { env.dst = inst; }
+  entity_inst_t& get_dest_inst() { return *(entity_inst_t*)&env.dst; }
+  void set_dest_inst(entity_inst_t& inst) { env.dst = *(ceph_entity_inst*)&inst; }
 
-  entity_inst_t& get_source_inst() { return env.src; }
-  void set_source_inst(entity_inst_t& inst) { env.src = inst; }
+  entity_inst_t& get_source_inst() { return *(entity_inst_t*)&env.src; }
+  void set_source_inst(entity_inst_t& inst) { env.src = *(ceph_entity_inst*)&inst; }
 
-  entity_name_t& get_dest() { return env.dst.name; }
-  void set_dest(entity_name_t a, int p) { env.dst.name = a; env.dest_port = p; }
+  entity_name_t& get_dest() { return *(entity_name_t*)&env.dst.name; }
+  void set_dest(entity_name_t a, int p) { env.dst.name = *(ceph_entity_name*)&a; env.dest_port = p; }
   int get_dest_port() { return env.dest_port; }
   void set_dest_port(int p) { env.dest_port = p; }
   
-  entity_name_t& get_source() { return env.src.name; }
-  void set_source(entity_name_t a, int p) { env.src.name = a; env.source_port = p; }
+  entity_name_t& get_source() { return *(entity_name_t*)&env.src.name; }
+  void set_source(entity_name_t a, int p) { env.src.name = *(ceph_entity_name*)&a; env.source_port = p; }
   int get_source_port() { return env.source_port; }
 
-  entity_addr_t& get_source_addr() { return env.src.addr; }
-  void set_source_addr(const entity_addr_t &i) { env.src.addr = i; }
+  entity_addr_t& get_source_addr() { return *(entity_addr_t*)&env.src.addr; }
+  void set_source_addr(const entity_addr_t &i) { env.src.addr = *(ceph_entity_addr*)&i; }
 
   // PAYLOAD ----
   void reset_payload() {
@@ -260,7 +250,7 @@ public:
   
 };
 
-extern Message *decode_message(msg_envelope_t &env, bufferlist& bl);
+extern Message *decode_message(ceph_message_header &env, bufferlist& bl);
 inline ostream& operator<<(ostream& out, Message& m) {
   m.print(out);
   return out;
index f9ecd890f90bad67ecc327c088ec2936a6fc003b..7e29f033d83b58efef3a690072e93168d1a68b51 100644 (file)
@@ -107,7 +107,7 @@ int Rank::Accepter::start()
        dout(15) << ".ceph_hosts: host '" << host << "' -> '" << addr << "'" << dendl;
        if (host == hostname) {
          parse_ip_port(addr.c_str(), g_my_addr);
-         dout(0) << ".ceph_hosts: my addr is " << g_my_addr << dendl;
+         dout(1) << ".ceph_hosts: my addr is " << g_my_addr << dendl;
          break;
        }
       }
@@ -153,13 +153,13 @@ int Rank::Accepter::start()
           myhostname->h_addr_list[0], 
           myhostname->h_length);
     rank.my_addr.set_addr(listen_addr);
-    rank.my_addr.port = 0;  // see below
+    rank.my_addr.v.port = 0;  // see below
   }
-  if (rank.my_addr.port == 0) {
+  if (rank.my_addr.v.port == 0) {
     entity_addr_t tmp;
     tmp.set_addr(listen_addr);
-    rank.my_addr.port = tmp.port;
-    rank.my_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
+    rank.my_addr.v.port = tmp.v.port;
+    rank.my_addr.v.nonce = getpid(); // FIXME: pid might not be best choice here.
   }
 
   dout(1) << "accepter.start my_addr is " << rank.my_addr << dendl;
@@ -605,7 +605,7 @@ Message *Rank::Pipe::read_message()
   // envelope
   //dout(10) << "receiver.read_message from sd " << sd  << dendl;
   
-  msg_envelope_t env; 
+  ceph_message_header env; 
   if (!tcp_read( sd, (char*)&env, sizeof(env) )) {
     need_to_send_close = false;
     return 0;
@@ -618,7 +618,9 @@ Message *Rank::Pipe::read_message()
   
   // payload
   bufferlist blist;
-  for (int i=0; i<env.nchunks; i++) {
+  int32_t pos = 0;
+  list<int> chunk_at;
+  for (unsigned i=0; i<env.nchunks; i++) {
     int32_t size;
     if (!tcp_read( sd, (char*)&size, sizeof(size) )) {
       need_to_send_close = false;
@@ -627,6 +629,9 @@ Message *Rank::Pipe::read_message()
 
     dout(30) << "decode chunk " << i << "/" << env.nchunks << " size " << size << dendl;
 
+    if (pos) chunk_at.push_back(pos);
+    pos += size;
+
     bufferptr bp;
     if (size % 4096 == 0) {
       dout(30) << "decoding page-aligned chunk of " << size << dendl;
@@ -649,6 +654,8 @@ Message *Rank::Pipe::read_message()
   // unmarshall message
   size_t s = blist.length();
   Message *m = decode_message(env, blist);
+
+  m->set_chunk_payload_at(chunk_at);
   
   dout(20) << "pipe(" << peer_addr << ' ' << this << ").reader got " << s << " byte message from " 
            << m->get_source() << dendl;
@@ -708,7 +715,7 @@ int Rank::Pipe::do_sendmsg(Message *m, struct msghdr *msg, int len)
 int Rank::Pipe::write_message(Message *m)
 {
   // get envelope, buffers
-  msg_envelope_t *env = &m->get_envelope();
+  ceph_message_header *env = &m->get_envelope();
   bufferlist blist;
   blist.claim( m->get_payload() );
   
diff --git a/branches/sage/mds/msg/ceph_msg_types.h b/branches/sage/mds/msg/ceph_msg_types.h
new file mode 100644 (file)
index 0000000..559c972
--- /dev/null
@@ -0,0 +1,49 @@
+/* -*- mode:C++; tab-width:8; c-basic-offset:8; indent-tabs-mode:t -*- 
+ * vim: ts=8 sw=8 smarttab
+ */
+#ifndef __CEPH_MSG_TYPES_H
+#define __CEPH_MSG_TYPES_H
+
+/*
+ * entity_name
+ */
+struct ceph_entity_name {
+       __u32 type;
+       __u32 num;
+};
+
+#define CEPH_ENTITY_TYPE_MON    1
+#define CEPH_ENTITY_TYPE_MDS    2
+#define CEPH_ENTITY_TYPE_OSD    3
+#define CEPH_ENTITY_TYPE_CLIENT 4
+#define CEPH_ENTITY_TYPE_ADMIN  5
+
+
+/*
+ * entity_addr
+ * ipv4 only for now
+ */
+struct ceph_entity_addr {
+       __u8  ipq[4];
+       __u32 port;
+       __u32 nonce;
+};
+
+
+struct ceph_entity_inst {
+       struct ceph_entity_name name;
+       struct ceph_entity_addr addr;
+};
+
+
+/*
+ * message header
+ */
+struct ceph_message_header {
+       __u32 type;
+       struct ceph_entity_inst src, dst;
+       __u32 source_port, dest_port;
+       __u32 nchunks;
+};
+
+#endif
index 648d6a4f5ee92744b922da7b930a78edbce744d1..652525729cdfcedfd9901d789ab3571a907c9526 100644 (file)
 #ifndef __MSG_TYPES_H
 #define __MSG_TYPES_H
 
+// raw C structs
+#include "include/ceph_inttypes.h"
+#include "ceph_msg_types.h"
+
 #include "include/types.h"
 #include "include/blobhash.h"
 #include "tcp.h"
 
-// new typed msg_addr_t way!
 class entity_name_t {
-  int32_t _type;
-  int32_t _num;
+  struct ceph_entity_name v;
 
 public:
-  static const int TYPE_MON = 1;
-  static const int TYPE_MDS = 2;
-  static const int TYPE_OSD = 3;
-  static const int TYPE_CLIENT = 4;
-  static const int TYPE_ADMIN = 5;
+  static const int TYPE_MON = CEPH_ENTITY_TYPE_MON;
+  static const int TYPE_MDS = CEPH_ENTITY_TYPE_MDS;
+  static const int TYPE_OSD = CEPH_ENTITY_TYPE_OSD;
+  static const int TYPE_CLIENT = CEPH_ENTITY_TYPE_CLIENT;
+  static const int TYPE_ADMIN = CEPH_ENTITY_TYPE_ADMIN;
 
   static const int NEW = -1;
 
   // cons
-  entity_name_t() : _type(0), _num(0) {}
-  entity_name_t(int t, int n=NEW) : _type(t), _num(n) {}
+  entity_name_t() { v.type = v.num = 0; }
+  entity_name_t(int t, int n=NEW) { v.type = t; v.num = n; }
 
   // static cons
   static entity_name_t MON(int i=NEW) { return entity_name_t(TYPE_MON, i); }
@@ -44,8 +46,8 @@ public:
   static entity_name_t CLIENT(int i=NEW) { return entity_name_t(TYPE_CLIENT, i); }
   static entity_name_t ADMIN(int i=NEW) { return entity_name_t(TYPE_ADMIN, i); }
   
-  int num() const { return _num; }
-  int type() const { return _type; }
+  int num() const { return v.num; }
+  int type() const { return v.type; }
   const char *type_str() const {
     switch (type()) {
     case TYPE_MDS: return "mds"; 
@@ -80,6 +82,9 @@ inline std::ostream& operator<<(std::ostream& out, const entity_name_t& addr) {
   else
     return out << addr.type_str() << addr.num();
 }
+inline std::ostream& operator<<(std::ostream& out, const ceph_entity_name& addr) {
+  return out << *(const entity_name_t*)&addr;
+}
 
 namespace __gnu_cxx {
   template<> struct hash< entity_name_t >
@@ -105,35 +110,34 @@ namespace __gnu_cxx {
  * ipv4 for now.
  */
 struct entity_addr_t {
-  uint8_t  ipq[4];
-  uint32_t port;
-  uint32_t nonce;  // bind time, or pid, or something unique!
+  struct ceph_entity_addr v;
   uint32_t _pad;
 
-  entity_addr_t() : port(0), nonce(0), _pad(0) {
-    ipq[0] = ipq[1] = ipq[2] = ipq[3] = 0;
+  entity_addr_t() : _pad(0) { 
+    v.port = v.nonce = 0; 
+    v.ipq[0] = v.ipq[1] = v.ipq[2] = v.ipq[3] = 0;
   }
 
   void set_addr(tcpaddr_t a) {
-    memcpy((char*)ipq, (char*)&a.sin_addr.s_addr, 4);
-    port = ntohs(a.sin_port);
+    memcpy((char*)v.ipq, (char*)&a.sin_addr.s_addr, 4);
+    v.port = ntohs(a.sin_port);
   }
   void make_addr(tcpaddr_t& a) const {
     memset(&a, 0, sizeof(a));
     a.sin_family = AF_INET;
-    memcpy((char*)&a.sin_addr.s_addr, (char*)ipq, 4);
-    a.sin_port = htons(port);
+    memcpy((char*)&a.sin_addr.s_addr, (char*)v.ipq, 4);
+    a.sin_port = htons(v.port);
   }
 };
 
 inline ostream& operator<<(ostream& out, const entity_addr_t &addr)
 {
-  return out << (int)addr.ipq[0]
-            << '.' << (int)addr.ipq[1]
-            << '.' << (int)addr.ipq[2]
-            << '.' << (int)addr.ipq[3]
-            << ':' << addr.port
-            << '.' << addr.nonce;
+  return out << (int)addr.v.ipq[0]
+            << '.' << (int)addr.v.ipq[1]
+            << '.' << (int)addr.v.ipq[2]
+            << '.' << (int)addr.v.ipq[3]
+            << ':' << addr.v.port
+            << '.' << addr.v.nonce;
 }
 
 inline bool operator==(const entity_addr_t& a, const entity_addr_t& b) { return memcmp(&a, &b, sizeof(a)) == 0; }
@@ -188,6 +192,11 @@ inline ostream& operator<<(ostream& out, const entity_inst_t &i)
 {
   return out << i.name << " " << i.addr;
 }
+inline ostream& operator<<(ostream& out, const ceph_entity_inst &i)
+{
+  return out << *(const entity_inst_t*)&i;
+}
+
 
 
 #endif