]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
some mon.pg bits
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 7 Aug 2007 04:38:07 +0000 (04:38 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 7 Aug 2007 04:38:07 +0000 (04:38 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1597 29311d96-e01e-0410-9327-a35deaab8ce9

13 files changed:
branches/sage/mds/TODO
branches/sage/mds/config.cc
branches/sage/mds/mds/Server.cc
branches/sage/mds/messages/MPGStats.h [new file with mode: 0644]
branches/sage/mds/messages/MStatfs.h [new file with mode: 0644]
branches/sage/mds/mon/ClientMonitor.cc
branches/sage/mds/mon/PGMap.h
branches/sage/mds/mon/PGMonitor.cc
branches/sage/mds/mon/PGMonitor.h
branches/sage/mds/msg/Message.cc
branches/sage/mds/msg/Message.h
branches/sage/mds/osd/OSD.cc
branches/sage/mds/osd/osd_types.h

index fdbb2703ff1606241b4486cec01e75b3e95adfa5..ee687ddb7cc96d80a54166c7f389d4cf51ac6c69 100644 (file)
@@ -26,6 +26,7 @@ some smallish projects:
 
 
 
+
 code cleanup
 - endian portability
 - word size
@@ -56,30 +57,6 @@ sage mds
 
 - the split/merge plan:
 
-/  - fragset_t to describe bounds; we need to tolerate concurrent merge/splits
-
-/  - fragtree_t
-/    - get_leaves(fg, ls) needs to be smarter
-/    - force_to_leaf()
-/    - simplified/normalized form.
-
-/  - CDir is never request pinned
-/    - add a CInode sticky_dir flag to somehow pin all cdirs on the fly.  
-/    - STICKY dir state and pin?  make sure it's kept across import/export/fragment
-/  - pull _bound maps out of Migrator; they are redundant (trust the subtree map!)
-
-/  - handle_resolve needs to infer splits/merges
-  - rejoin, too!
-
-/  - auth journals and applies update in the request update pipeline
-
-/  - dirfragtree is lazily consistent.  no lock.  bcast by primary when it updates.  
-/  - bcast to dir replicas
-
-/  - inode auth will journal inode update separately/lazily
-/    - via subtree_merge_at
-
-
 - hmm, should we move ESubtreeMap out of the journal?  
   that would avoid all the icky weirdness in shutdown, with periodic logging, etc.
 
@@ -92,20 +69,13 @@ sage mds
   - need to export stray crap to another mds..
 - verify stray is empty on shutdown
 
-- dirfrag split/merge
-  - client readdir for dirfrags
 - consistency points/snapshots
   - dentry versions vs dirfrags...
-- statfs?
 
 - more testing of failures + thrashing.
   - is export prep dir open deadlock properly fixed by forge_replica_dir()?
 - failures during recovery stages (resolve, rejoin)... make sure rejoin still works!
 
-- dirfrag split
-  - make sure we are freezing _before_ we fetch to complete the dirfrag, else 
-    we break commit()'s preconditions when it fetches an incomplete dir.
-
 - detect and deal with client failure
   - failure during reconnect vs clientmap.  although probalby the whole thing needs a larger overhaul...
 
@@ -154,18 +124,6 @@ crush
 - crush tools
 
 
-rados+ebofs
-- purge replicated writes from cache.  (with exception of partial tail blocks.)
-
-rados paper todo?
-- better experiments
-  - berkeleydb objectstore?
-- flush log only in response to subsequent read or write?
-- better behaving recovery
-- justify use of splay.
-  - dynamic replication
-- snapshots
-
 rados snapshots
 - integrate revisions into ObjectCacher
 - clean up oid.rev vs op.rev in osd+osdc
@@ -237,14 +195,36 @@ ebofs
 - fix bug in node rotation on insert (and reenable)
 - fix NEAR_LAST_FWD (?)
 
-- rewrite btree code
+- awareness of underlying software/hardware raid in allocator so that we
+  write full stripes _only_.
+  - hmm, that's basically just a large block size.
+
+- rewrite the btree code!
   - multithreaded
   - eliminate nodepools
   - allow btree sets
   - allow arbitrary embedded data?
   - allow arbitrary btrees
   - allow root node(s?) to be embedded in onode, or whereever.
+  - keys and values can be uniform (fixed-size) or non-uniform.  
+    - fixed size (if any) is a value in the btree struct.  
+      - negative indicates bytes of length value?  (1 -> 255bytes, 2 -> 65535 bytes, etc.?)
+    - non-uniform records preceeded by length.  
+    - keys sorted via a comparator defined in btree root.  
+      - lexicographically, by default.
+
+- goal
+  - object btree key->value payload, not just a data blob payload.
+  - better threading behavior.
+    - with transactional goodness!
+
+- onode
+  - object attributes.. as a btree?
+  - blob stream
+  - map stream.
+    - allow blob values.
 
+  - 
 
 
 
@@ -259,12 +239,6 @@ crush
 
 mds
 - distributed client management
-- anchormgr
-  - 2pc
-  - independent journal?
-  - distributed?
-- link count management
-  - also 2pc
 - chdir (directory opens!)
 - rewrite logstream
   - clean up
index 95f85e6e40ecd238300166332181fca426af56b4..8dbc6928837d373ec96128d5ffe44b1a4b36af27 100644 (file)
@@ -162,7 +162,7 @@ md_config_t g_conf = {
   fuse_ll: false,
   
   // --- objecter ---
-  objecter_buffer_uncommitted: true,
+  objecter_buffer_uncommitted: true,  // this must be true for proper failure handling
 
   // --- journaler ---
   journaler_allow_split_entries: true,
@@ -245,7 +245,7 @@ md_config_t g_conf = {
   osd_mkfs: false,
   osd_age: .8,
   osd_age_time: 0,
-  osd_heartbeat_interval: 5,   // shut up while i'm debugging
+  osd_heartbeat_interval: 15,   // shut up while i'm debugging
   osd_replay_window: 5,
   osd_max_pull: 2,
   osd_pad_pg_log: false,
index 7d23668ac703866461c51aa64415568743bd7f4b..0e2fc79d33822be7b38be01f08b11f4ebe008099 100644 (file)
@@ -508,8 +508,7 @@ void Server::dispatch_client_request(MDRequest *mdr)
 
     // funky.
   case MDS_OP_OPEN:
-    if ((req->args.open.flags & O_CREAT) &&
-       !mdr->ref) 
+    if (req->args.open.flags & O_CREAT)
       handle_client_openc(mdr);
     else 
       handle_client_open(mdr);
diff --git a/branches/sage/mds/messages/MPGStats.h b/branches/sage/mds/messages/MPGStats.h
new file mode 100644 (file)
index 0000000..838ab54
--- /dev/null
@@ -0,0 +1,41 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MPGSTATS_H
+#define __MPGSTATS_H
+
+#include "osd/osd_types.h"
+
+class MPGStats : public Message {
+public:
+  map<pg_t,pg_stat_t> pg_stat;
+  
+  MPGStats() : Message(MSG_PGSTATS) {}
+
+  char *get_type_name() { return "pg_stats"; }
+  void print(ostream& out) {
+    out << "pg_stats" << endl;
+  }
+
+  void encode_payload() {
+    ::_encode(pg_stat, payload);
+  }
+  void decode_payload() {
+    int off = 0;
+    ::_decode(pg_stat, payload, off);
+  }
+};
+
+#endif
diff --git a/branches/sage/mds/messages/MStatfs.h b/branches/sage/mds/messages/MStatfs.h
new file mode 100644 (file)
index 0000000..2274707
--- /dev/null
@@ -0,0 +1,41 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MSTATFS_H
+#define __MSTATFS_H
+
+#include <sys/statvfs.h>    /* or <sys/statfs.h> */
+
+class MStatfs : public Message {
+public:
+  struct statvfs stfs;
+
+  MStatfs() : Message(MSG_STATFS) {}
+
+  char *get_type_name() { return "statfs"; }
+  void print(ostream& out) {
+    out << "statfs" << endl;
+  }
+
+  void encode_payload() {
+    ::_encode(stfs, payload);
+  }
+  void decode_payload() {
+    int off = 0;
+    ::_decode(stfs, payload, off);
+  }
+};
+
+#endif
index 7b3a8917c1e162c2e45d3876a2292b2dfe668296..018cbcadc6bf9e60c2ffccdc540d873120628903 100644 (file)
@@ -102,7 +102,6 @@ void ClientMonitor::encode_pending(bufferlist &bl)
   dout(10) << "encode_pending v " << pending_inc.version 
           << ", next is " << pending_inc.next_client
           << endl;
-  
   assert(paxos->get_version() + 1 == pending_inc.version);
   pending_inc._encode(bl);
 }
index dc6b500111df0480144942aa9a0768cb8b875090..38b6db494255df766fcb1c56537c701f0ce6d088 100644 (file)
 #include "osd/osd_types.h"
 
 class PGMap {
-  
 public:
+  // the map
+  version_t version;
+  hash_map<pg_t,pg_stat_t> pg_stat;
+
   class Incremental {
+  public:
+    version_t version;
+    map<pg_t,pg_stat_t> pg_stat_updates;
 
+    void _encode(bufferlist &bl) {
+      ::_encode(version, bl);
+      ::_encode(pg_stat_updates, bl);
+    }
+    void _decode(bufferlist& bl, int& off) {
+      ::_decode(version, bl, off);
+      ::_decode(pg_stat_updates, bl, off);
+    }
   };
 
+  void apply_incremental(Incremental& inc) {
+    assert(inc.version == version+1);
+    version++;
+    for (map<pg_t,pg_stat_t>::iterator p = inc.pg_stat_updates.begin();
+        p != inc.pg_stat_updates.end();
+        ++p) {
+      if (pg_stat.count(p->first))
+       stat_sub(pg_stat[p->first]);
+      pg_stat[p->first] = p->second;
+      stat_add(p->second);
+    }
+  }
+
+  // aggregate stats (soft state)
+  hash_map<int,int> num_pg_by_state;
+  int64_t num_pg;
+  int64_t total_size;
+  int64_t total_num_blocks;
+  
+  void stat_zero() {
+    num_pg = 0;
+    num_pg_by_state.clear();
+    total_size = 0;
+    total_num_blocks = 0;
+  }
+  void stat_add(pg_stat_t &s) {
+    num_pg++;
+    num_pg_by_state[s.state]++;
+    total_size += s.size;
+    total_num_blocks += s.num_blocks;
+  }
+  void stat_sub(pg_stat_t &s) {
+    num_pg--;
+    num_pg_by_state[s.state]--;
+    total_size -= s.size;
+    total_num_blocks -= s.num_blocks;
+  }
+
+  PGMap() : version(0), 
+           num_pg(0), total_size(0), total_num_blocks(0) {}
 
+  void _encode(bufferlist &bl) {
+    ::_encode(version, bl);
+    ::_encode(pg_stat, bl);
+  }
+  void _decode(bufferlist& bl, int& off) {
+    ::_decode(version, bl, off);
+    ::_decode(pg_stat, bl, off);
+    stat_zero();
+    for (hash_map<pg_t,pg_stat_t>::iterator p = pg_stat.begin();
+        p != pg_stat.end();
+        ++p)
+      stat_add(p->second);
+  }
 };
 
 #endif
index 8280b87df3e9d1989ab59effcfcb05822fb1e4a7..68a75f5f5ee0ff81a0ebe64eb8e8dfdb8554c6c6 100644 (file)
@@ -19,6 +19,9 @@
 #include "OSDMonitor.h"
 #include "MonitorStore.h"
 
+#include "messages/MPGStats.h"
+#include "messages/MStatfs.h"
+
 #include "common/Timer.h"
 
 #include "config.h"
 
 void PGMonitor::create_initial()
 {
+  dout(1) << "create_initial -- creating initial map" << endl;
 }
 
 bool PGMonitor::update_from_paxos()
 {
+  version_t paxosv = paxos->get_version();
+  if (paxosv == pg_map.version) return true;
+  assert(paxosv >= pg_map.version);
+
+  if (pg_map.version == 0 && paxosv > 1 &&
+      mon->store->exists_bl_ss("pgmap","latest")) {
+    // starting up: load latest
+    dout(7) << "update_from_paxos startup: loading latest full pgmap" << endl;
+    bufferlist bl;
+    mon->store->get_bl_ss(bl, "pgmap", "latest");
+    int off = 0;
+    pg_map._decode(bl, off);
+  } 
+
+  // walk through incrementals
+  while (paxosv > pg_map.version) {
+    bufferlist bl;
+    bool success = paxos->read(pg_map.version+1, bl);
+    if (success) {
+      dout(7) << "update_from_paxos  applying incremental " << pg_map.version+1 << endl;
+      PGMap::Incremental inc;
+      int off = 0;
+      inc._decode(bl, off);
+      pg_map.apply_incremental(inc);
+      
+    } else {
+      dout(7) << "update_from_paxos  couldn't read incremental " << pg_map.version+1 << endl;
+      return false;
+    }
+  }
+
+  // save latest
+  bufferlist bl;
+  pg_map._encode(bl);
+  mon->store->put_bl_ss(bl, "pgmap", "latest");
+
   return true;
 }
 
 void PGMonitor::create_pending()
 {
-
+  pending_inc = PGMap::Incremental();
+  pending_inc.version = pg_map.version + 1;
+  dout(10) << "create_pending v " << pending_inc.version << endl;
 }
 
 void PGMonitor::encode_pending(bufferlist &bl)
 {
-
+  assert(mon->is_leader());
+  dout(10) << "encode_pending v " << pending_inc.version << endl;
+  assert(paxos->get_version() + 1 == pending_inc.version);
+  pending_inc._encode(bl);
 }
 
 bool PGMonitor::preprocess_query(Message *m)
 {
-  return true;
+  dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << endl;
+
+  switch (m->get_type()) {
+  case MSG_STATFS:
+    handle_statfs((MStatfs*)m);
+    return true;
+    
+  case MSG_PGSTATS:
+    {
+      MPGStats *stats = (MPGStats*)m;
+      for (map<pg_t,pg_stat_t>::iterator p = stats->pg_stat.begin();
+          p != stats->pg_stat.end();
+          p++) {
+       if (pg_map.pg_stat.count(p->first) == 0 ||
+           pg_map.pg_stat[p->first].reported < p->second.reported)
+         return false;
+      }
+      dout(10) << " message contains no new pg stats" << endl;
+      return true;
+    }
+
+  default:
+    assert(0);
+    delete m;
+    return true;
+  }
 }
 
 bool PGMonitor::prepare_update(Message *m)
 {
+  dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << endl;
+  switch (m->get_type()) {
+  case MSG_PGSTATS:
+    return handle_pg_stats((MPGStats*)m);
+
+  default:
+    assert(0);
+    delete m;
+    return false;
+  }
+}
+
+
+void PGMonitor::handle_statfs(MStatfs *statfs)
+{
+  dout(10) << "handle_statfs " << *statfs << " from " << statfs->get_source() << endl;
+
+  // fill out stfs
+  memset(&statfs->stfs, 0, sizeof(statfs->stfs));
+  statfs->stfs.f_blocks = pg_map.total_num_blocks;
+  statfs->stfs.f_fsid = 0; // hmm.
+  statfs->stfs.f_flag = ST_NOATIME|ST_NODIRATIME;  // for now.
+
+  // reply
+  mon->messenger->send_message(statfs, statfs->get_source_inst());
+}
+
+bool PGMonitor::handle_pg_stats(MPGStats *stats) 
+{
+  dout(10) << "handle_pg_stats " << *stats << " from " << stats->get_source() << endl;
+  
+  for (map<pg_t,pg_stat_t>::iterator p = stats->pg_stat.begin();
+       p != stats->pg_stat.end();
+       p++) {
+    pg_t pgid;
+    if ((pg_map.pg_stat.count(pgid) && 
+        pg_map.pg_stat[pgid].reported >= p->second.reported)) {
+      dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported << endl;
+      continue;
+    }
+    if (pending_inc.pg_stat_updates.count(pgid) && 
+       pending_inc.pg_stat_updates[pgid].reported >= p->second.reported) {
+      dout(15) << " had " << pgid << " from " << pending_inc.pg_stat_updates[pgid].reported
+              << " (pending)" << endl;
+      continue;
+    }
+
+    dout(15) << " got " << pgid << " reported at " << p->second.reported << endl;
+    pending_inc.pg_stat_updates[pgid] = p->second;
+
+    // we don't care about consistency; apply to live map.
+    if (pg_map.pg_stat.count(pgid))
+      pg_map.stat_sub(pg_map.pg_stat[pgid]);
+    pg_map.pg_stat[pgid] = p->second;
+    pg_map.stat_add(pg_map.pg_stat[pgid]);
+  }
+  
+  delete stats;
   return true;
 }
index 917d6e272a756147850b96a968c255c46823d05e..e243d0851430d65abee12b2a0fa976a172220e8c 100644 (file)
@@ -25,10 +25,12 @@ using namespace std;
 
 #include "PGMap.h"
 
+class MPGStats;
+class MStatfs;
+
 class PGMonitor : public PaxosService {
 public:
 
-
 private:
   PGMap pg_map;
   PGMap::Incremental pending_inc;
@@ -41,7 +43,9 @@ private:
   bool preprocess_query(Message *m);  // true if processed.
   bool prepare_update(Message *m);
 
-  
+  void handle_statfs(MStatfs *statfs);
+  bool handle_pg_stats(MPGStats *stats);
+
  public:
   PGMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
   
index 62b13f2ce264d99c5418d1ec17448bf1801e98b9..fa63838a2d51fb793e1b891a1264e88843411d47 100644 (file)
@@ -11,6 +11,9 @@ using namespace std;
 
 #include "messages/MGenericMessage.h"
 
+#include "messages/MPGStats.h"
+#include "messages/MStatfs.h"
+
 #include "messages/MMonCommand.h"
 #include "messages/MMonCommandAck.h"
 #include "messages/MMonPaxos.h"
@@ -106,6 +109,13 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
 
     // -- with payload --
 
+  case MSG_PGSTATS:
+    m = new MPGStats;
+    break;
+  case MSG_STATFS:
+    m = new MStatfs;
+    break;
+
   case MSG_MON_COMMAND:
     m = new MMonCommand;
     break;
index 6adaaa34f03d4b782bf522aebe55f171d7757ad1..f593f1aac2cf3418cb317f7e5ef41ec53b75ffc2 100644 (file)
@@ -17,6 +17,9 @@
  
 #define MSG_CLOSE 0
 
+#define MSG_STATFS      1
+#define MSG_PGSTATS     2
+
 #define MSG_PING        10
 #define MSG_PING_ACK    11
 
index 5b5bcf562e9c5df111e32e85e62d38a51329c3e6..6285c0b340203d118e303af4eda276ccacb91288 100644 (file)
@@ -74,7 +74,7 @@
 char *osd_base_path = "./osddata";
 char *ebofs_base_path = "./dev";
 
-object_t SUPERBLOCK_OBJECT(0,0);
+static const object_t SUPERBLOCK_OBJECT(0,0);
 
 
 // <hack> force remount hack for performance testing FakeStore
index 1210c0089f3e5b2c2a1a59f7dec22cc7c3b92a27..da36465391644ed5e7952ad890fa81161301479a 100644 (file)
@@ -162,6 +162,9 @@ namespace __gnu_cxx {
 }
 
 
+
+
+
 /** ObjectLayout
  *
  * describes an object's placement and layout in the storage cluster.  
@@ -218,6 +221,24 @@ inline ostream& operator<<(ostream& out, const eversion_t e) {
 
 
 
+/** pg_stat
+ * aggregate stats for a single PG.
+ */
+struct pg_stat_t {
+  const static int STATE_UNKNOWN =    0;
+  const static int STATE_OK =         1;
+  const static int STATE_RECOVERING = 2;
+  const static int STATE_OFFLINE =    3;
+  
+  eversion_t reported;
+  
+  int32_t state;
+  int64_t size;         // in bytes
+  int64_t num_blocks;   // in 4k blocks
+  
+  pg_stat_t() : state(0), size(0), num_blocks(0) {}
+};
+
 
 
 // -----------------------------------------