+
code cleanup
- endian portability
- word size
- the split/merge plan:
-/ - fragset_t to describe bounds; we need to tolerate concurrent merge/splits
-
-/ - fragtree_t
-/ - get_leaves(fg, ls) needs to be smarter
-/ - force_to_leaf()
-/ - simplified/normalized form.
-
-/ - CDir is never request pinned
-/ - add a CInode sticky_dir flag to somehow pin all cdirs on the fly.
-/ - STICKY dir state and pin? make sure it's kept across import/export/fragment
-/ - pull _bound maps out of Migrator; they are redundant (trust the subtree map!)
-
-/ - handle_resolve needs to infer splits/merges
- - rejoin, too!
-
-/ - auth journals and applies update in the request update pipeline
-
-/ - dirfragtree is lazily consistent. no lock. bcast by primary when it updates.
-/ - bcast to dir replicas
-
-/ - inode auth will journal inode update separately/lazily
-/ - via subtree_merge_at
-
-
- hmm, should we move ESubtreeMap out of the journal?
that would avoid all the icky weirdness in shutdown, with periodic logging, etc.
- need to export stray crap to another mds..
- verify stray is empty on shutdown
-- dirfrag split/merge
- - client readdir for dirfrags
- consistency points/snapshots
- dentry versions vs dirfrags...
-- statfs?
- more testing of failures + thrashing.
- is export prep dir open deadlock properly fixed by forge_replica_dir()?
- failures during recovery stages (resolve, rejoin)... make sure rejoin still works!
-- dirfrag split
- - make sure we are freezing _before_ we fetch to complete the dirfrag, else
- we break commit()'s preconditions when it fetches an incomplete dir.
-
- detect and deal with client failure
- failure during reconnect vs clientmap. although probably the whole thing needs a larger overhaul...
- crush tools
-rados+ebofs
-- purge replicated writes from cache. (with exception of partial tail blocks.)
-
-rados paper todo?
-- better experiments
- - berkeleydb objectstore?
-- flush log only in response to subsequent read or write?
-- better behaving recovery
-- justify use of splay.
- - dynamic replication
-- snapshots
-
rados snapshots
- integrate revisions into ObjectCacher
- clean up oid.rev vs op.rev in osd+osdc
- fix bug in node rotation on insert (and reenable)
- fix NEAR_LAST_FWD (?)
-- rewrite btree code
+- awareness of underlying software/hardware raid in allocator so that we
+ write full stripes _only_.
+ - hmm, that's basically just a large block size.
+
+- rewrite the btree code!
- multithreaded
- eliminate nodepools
- allow btree sets
- allow arbitrary embedded data?
- allow arbitrary btrees
- allow root node(s?) to be embedded in onode, or wherever.
+ - keys and values can be uniform (fixed-size) or non-uniform.
+ - fixed size (if any) is a value in the btree struct.
+ - negative indicates bytes of length value? (1 -> 255bytes, 2 -> 65535 bytes, etc.?)
+ - non-uniform records preceded by length.
+ - keys sorted via a comparator defined in btree root.
+ - lexicographically, by default.
+
+- goal
+ - object btree key->value payload, not just a data blob payload.
+ - better threading behavior.
+ - with transactional goodness!
+
+- onode
+ - object attributes.. as a btree?
+ - blob stream
+ - map stream.
+ - allow blob values.
+ -
mds
- distributed client management
-- anchormgr
- - 2pc
- - independent journal?
- - distributed?
-- link count management
- - also 2pc
- chdir (directory opens!)
- rewrite logstream
- clean up
fuse_ll: false,
// --- objecter ---
- objecter_buffer_uncommitted: true,
+ objecter_buffer_uncommitted: true, // this must be true for proper failure handling
// --- journaler ---
journaler_allow_split_entries: true,
osd_mkfs: false,
osd_age: .8,
osd_age_time: 0,
- osd_heartbeat_interval: 5, // shut up while i'm debugging
+ osd_heartbeat_interval: 15, // shut up while i'm debugging
osd_replay_window: 5,
osd_max_pull: 2,
osd_pad_pg_log: false,
// funky.
case MDS_OP_OPEN:
- if ((req->args.open.flags & O_CREAT) &&
- !mdr->ref)
+ if (req->args.open.flags & O_CREAT)
handle_client_openc(mdr);
else
handle_client_open(mdr);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MPGSTATS_H
+#define __MPGSTATS_H
+
+#include "osd/osd_types.h"
+
+class MPGStats : public Message {  // message (type MSG_PGSTATS) whose payload is a map of per-PG stats
+public:
+ map<pg_t,pg_stat_t> pg_stat;  // one pg_stat_t per placement group id
+
+ MPGStats() : Message(MSG_PGSTATS) {}
+
+ char *get_type_name() { return "pg_stats"; }
+ void print(ostream& out) {
+ out << "pg_stats" << endl;
+ }
+
+ void encode_payload() {  // serialize pg_stat into the message payload
+ ::_encode(pg_stat, payload);
+ }
+ void decode_payload() {  // rebuild pg_stat from the payload, reading from offset 0
+ int off = 0;
+ ::_decode(pg_stat, payload, off);
+ }
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MSTATFS_H
+#define __MSTATFS_H
+
+#include <sys/statvfs.h> /* or <sys/statfs.h> */
+
+class MStatfs : public Message {  // message (type MSG_STATFS) carrying a struct statvfs
+public:
+ struct statvfs stfs;  // filesystem statistics carried in the payload
+
+ MStatfs() : Message(MSG_STATFS) {}
+
+ char *get_type_name() { return "statfs"; }
+ void print(ostream& out) {
+ out << "statfs" << endl;
+ }
+
+ void encode_payload() {  // NOTE(review): presumably a raw copy of the struct, whose layout is platform-specific -- confirm
+ ::_encode(stfs, payload);
+ }
+ void decode_payload() {  // rebuild stfs from the payload, reading from offset 0
+ int off = 0;
+ ::_decode(stfs, payload, off);
+ }
+};
+
+#endif
dout(10) << "encode_pending v " << pending_inc.version
<< ", next is " << pending_inc.next_client
<< endl;
-
assert(paxos->get_version() + 1 == pending_inc.version);
pending_inc._encode(bl);
}
#include "osd/osd_types.h"
class PGMap {  // versioned map of per-PG stats plus running aggregate totals
-
public:
+ // the map (this is the state that _encode/_decode persist)
+ version_t version;
+ hash_map<pg_t,pg_stat_t> pg_stat;
+
class Incremental {  // one versioned batch of pg_stat changes
+ public:
+ version_t version;  // version the map reaches after this batch is applied
+ map<pg_t,pg_stat_t> pg_stat_updates;  // replacement stats, keyed by pg
+ void _encode(bufferlist &bl) {
+ ::_encode(version, bl);
+ ::_encode(pg_stat_updates, bl);
+ }
+ void _decode(bufferlist& bl, int& off) {
+ ::_decode(version, bl, off);
+ ::_decode(pg_stat_updates, bl, off);
+ }
};
+ void apply_incremental(Incremental& inc) {  // advance by exactly one version, keeping the totals in sync
+ assert(inc.version == version+1);
+ version++;
+ for (map<pg_t,pg_stat_t>::iterator p = inc.pg_stat_updates.begin();
+ p != inc.pg_stat_updates.end();
+ ++p) {
+ if (pg_stat.count(p->first))
+ stat_sub(pg_stat[p->first]);  // remove the old entry's contribution before overwriting it
+ pg_stat[p->first] = p->second;
+ stat_add(p->second);
+ }
+ }
+
+ // aggregate stats (soft state): not encoded; rebuilt from pg_stat in _decode
+ hash_map<int,int> num_pg_by_state;
+ int64_t num_pg;
+ int64_t total_size;
+ int64_t total_num_blocks;
+
+ void stat_zero() {  // reset every aggregate
+ num_pg = 0;
+ num_pg_by_state.clear();
+ total_size = 0;
+ total_num_blocks = 0;
+ }
+ void stat_add(pg_stat_t &s) {  // add one pg's contribution to the aggregates
+ num_pg++;
+ num_pg_by_state[s.state]++;
+ total_size += s.size;
+ total_num_blocks += s.num_blocks;
+ }
+ void stat_sub(pg_stat_t &s) {  // exact inverse of stat_add
+ num_pg--;
+ num_pg_by_state[s.state]--;
+ total_size -= s.size;
+ total_num_blocks -= s.num_blocks;
+ }
+
+ PGMap() : version(0),
+ num_pg(0), total_size(0), total_num_blocks(0) {}
+ void _encode(bufferlist &bl) {  // writes version and pg_stat only
+ ::_encode(version, bl);
+ ::_encode(pg_stat, bl);
+ }
+ void _decode(bufferlist& bl, int& off) {  // reads version and pg_stat, then recomputes the aggregates
+ ::_decode(version, bl, off);
+ ::_decode(pg_stat, bl, off);
+ stat_zero();
+ for (hash_map<pg_t,pg_stat_t>::iterator p = pg_stat.begin();
+ p != pg_stat.end();
+ ++p)
+ stat_add(p->second);
+ }
};
#endif
#include "OSDMonitor.h"
#include "MonitorStore.h"
+#include "messages/MPGStats.h"
+#include "messages/MStatfs.h"
+
#include "common/Timer.h"
#include "config.h"
void PGMonitor::create_initial()  // only logs; no pg state is populated here
{
+ dout(1) << "create_initial -- creating initial map" << endl;
}
bool PGMonitor::update_from_paxos()  // bring pg_map up to paxos' version; returns false if an incremental cannot be read
{
+ version_t paxosv = paxos->get_version();
+ if (paxosv == pg_map.version) return true;  // already current
+ assert(paxosv >= pg_map.version);
+
+ if (pg_map.version == 0 && paxosv > 1 &&
+ mon->store->exists_bl_ss("pgmap","latest")) {
+ // starting up: load latest (the full map stored under "pgmap"/"latest" at the end of this function)
+ dout(7) << "update_from_paxos startup: loading latest full pgmap" << endl;
+ bufferlist bl;
+ mon->store->get_bl_ss(bl, "pgmap", "latest");
+ int off = 0;
+ pg_map._decode(bl, off);
+ }
+
+ // walk through incrementals, one version at a time
+ while (paxosv > pg_map.version) {
+ bufferlist bl;
+ bool success = paxos->read(pg_map.version+1, bl);
+ if (success) {
+ dout(7) << "update_from_paxos applying incremental " << pg_map.version+1 << endl;
+ PGMap::Incremental inc;
+ int off = 0;
+ inc._decode(bl, off);
+ pg_map.apply_incremental(inc);
+
+ } else {
+ dout(7) << "update_from_paxos couldn't read incremental " << pg_map.version+1 << endl;
+ return false;  // the "latest" copy below is not rewritten on this path
+ }
+ }
+
+ // save latest (full encoded map)
+ bufferlist bl;
+ pg_map._encode(bl);
+ mon->store->put_bl_ss(bl, "pgmap", "latest");
+
return true;
}
void PGMonitor::create_pending()  // start a fresh, empty incremental for the next map version
{
-
+ pending_inc = PGMap::Incremental();
+ pending_inc.version = pg_map.version + 1;
+ dout(10) << "create_pending v " << pending_inc.version << endl;
}
void PGMonitor::encode_pending(bufferlist &bl)  // serialize pending_inc into bl; asserts this monitor is the leader
{
-
+ assert(mon->is_leader());
+ dout(10) << "encode_pending v " << pending_inc.version << endl;
+ assert(paxos->get_version() + 1 == pending_inc.version);  // pending must be exactly the next paxos version
+ pending_inc._encode(bl);
}
bool PGMonitor::preprocess_query(Message *m)
{
- return true;
+ // Handle messages that need no map update.
+ // Returns true when the message was fully processed here; on that path this
+ // function owns m and must release it or hand it off. Returns false when the
+ // message carries new information and is left untouched for the caller
+ // (presumably routed on to prepare_update() -- confirm against PaxosService).
+ dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << endl;
+
+ switch (m->get_type()) {
+ case MSG_STATFS:
+ // handle_statfs() sends the message back as the reply, so it is not deleted here.
+ handle_statfs((MStatfs*)m);
+ return true;
+
+ case MSG_PGSTATS:
+ {
+ MPGStats *stats = (MPGStats*)m;
+ for (map<pg_t,pg_stat_t>::iterator p = stats->pg_stat.begin();
+ p != stats->pg_stat.end();
+ ++p) {
+ // any pg we have not seen, or have only an older report for, needs an update
+ if (pg_map.pg_stat.count(p->first) == 0 ||
+ pg_map.pg_stat[p->first].reported < p->second.reported)
+ return false;
+ }
+ dout(10) << " message contains no new pg stats" << endl;
+ // fully processed: release the message, as the default branch does, instead of leaking it
+ delete m;
+ return true;
+ }
+
+ default:
+ assert(0);
+ delete m;
+ return true;
+ }
}
bool PGMonitor::prepare_update(Message *m)  // dispatch an update-carrying message by type
{
+ dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << endl;
+ switch (m->get_type()) {
+ case MSG_PGSTATS:
+ return handle_pg_stats((MPGStats*)m);  // handle_pg_stats deletes the message
+
+ default:
+ assert(0);  // no other message type is expected here
+ delete m;
+ return false;
+ }
+}
+
+
+void PGMonitor::handle_statfs(MStatfs *statfs)  // fill in the request's stfs from pg_map and send the same message back
+{
+ dout(10) << "handle_statfs " << *statfs << " from " << statfs->get_source() << endl;
+
+ // fill out stfs; every field not assigned below stays zero after the memset
+ memset(&statfs->stfs, 0, sizeof(statfs->stfs));
+ statfs->stfs.f_blocks = pg_map.total_num_blocks;  // NOTE(review): f_bsize/f_frsize stay 0, so callers cannot turn this into bytes -- confirm intended
+ statfs->stfs.f_fsid = 0; // hmm.
+ statfs->stfs.f_flag = ST_NOATIME|ST_NODIRATIME; // for now.
+
+ // reply by reusing the incoming message object
+ mon->messenger->send_message(statfs, statfs->get_source_inst());
+}
+
+// Fold a pg stats report into both the pending incremental and the live map.
+// An entry is skipped when the live map or the pending incremental already
+// holds a report for that pg that is at least as new (by `reported`).
+// Deletes `stats` before returning; always returns true.
+bool PGMonitor::handle_pg_stats(MPGStats *stats)
+{
+ dout(10) << "handle_pg_stats " << *stats << " from " << stats->get_source() << endl;
+
+ for (map<pg_t,pg_stat_t>::iterator p = stats->pg_stat.begin();
+ p != stats->pg_stat.end();
+ ++p) {
+ // Key every lookup on the pg this entry describes. Previously pgid was left
+ // default-constructed, so all reports were compared against and stored
+ // under one bogus pg id.
+ pg_t pgid = p->first;
+ if ((pg_map.pg_stat.count(pgid) &&
+ pg_map.pg_stat[pgid].reported >= p->second.reported)) {
+ dout(15) << " had " << pgid << " from " << pg_map.pg_stat[pgid].reported << endl;
+ continue;
+ }
+ if (pending_inc.pg_stat_updates.count(pgid) &&
+ pending_inc.pg_stat_updates[pgid].reported >= p->second.reported) {
+ dout(15) << " had " << pgid << " from " << pending_inc.pg_stat_updates[pgid].reported
+ << " (pending)" << endl;
+ continue;
+ }
+
+ dout(15) << " got " << pgid << " reported at " << p->second.reported << endl;
+ pending_inc.pg_stat_updates[pgid] = p->second;
+
+ // we don't care about consistency; apply to live map.
+ // (subtract the old entry first so the aggregate totals stay correct)
+ if (pg_map.pg_stat.count(pgid))
+ pg_map.stat_sub(pg_map.pg_stat[pgid]);
+ pg_map.pg_stat[pgid] = p->second;
+ pg_map.stat_add(pg_map.pg_stat[pgid]);
+ }
+
+ delete stats;
return true;
}
#include "PGMap.h"
+class MPGStats;
+class MStatfs;
+
class PGMonitor : public PaxosService {
public:
-
private:
PGMap pg_map;
PGMap::Incremental pending_inc;
bool preprocess_query(Message *m); // true if processed.
bool prepare_update(Message *m);
-
+ void handle_statfs(MStatfs *statfs);
+ bool handle_pg_stats(MPGStats *stats);
+
public:
PGMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
#include "messages/MGenericMessage.h"
+#include "messages/MPGStats.h"
+#include "messages/MStatfs.h"
+
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MMonPaxos.h"
// -- with payload --
+ case MSG_PGSTATS:
+ m = new MPGStats;
+ break;
+ case MSG_STATFS:
+ m = new MStatfs;
+ break;
+
case MSG_MON_COMMAND:
m = new MMonCommand;
break;
#define MSG_CLOSE 0
+#define MSG_STATFS 1
+#define MSG_PGSTATS 2
+
#define MSG_PING 10
#define MSG_PING_ACK 11
char *osd_base_path = "./osddata";
char *ebofs_base_path = "./dev";
-object_t SUPERBLOCK_OBJECT(0,0);
+static const object_t SUPERBLOCK_OBJECT(0,0);
// <hack> force remount hack for performance testing FakeStore
}
+
+
+
/** ObjectLayout
*
* describes an object's placement and layout in the storage cluster.
+/** pg_stat
+ * aggregate stats for a single PG.
+ */
+struct pg_stat_t {
+ const static int STATE_UNKNOWN = 0;  // also the value the constructor assigns to `state`
+ const static int STATE_OK = 1;
+ const static int STATE_RECOVERING = 2;
+ const static int STATE_OFFLINE = 3;
+
+ eversion_t reported;  // compared by PGMonitor to decide whether a report is newer than what it holds
+
+ int32_t state;  // presumably one of the STATE_* constants above -- confirm
+ int64_t size; // in bytes
+ int64_t num_blocks; // in 4k blocks
+
+ pg_stat_t() : state(0), size(0), num_blocks(0) {}
+};
+
// -----------------------------------------