int nroot = osdmap.crush.add_bucket(root);
// rules
+ // replication
for (int i=1; i<=ndom; i++) {
- osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot));
- osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 1));
- osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, 1, 0));
- osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
+ int r = CRUSH_REP_RULE(i);
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 1));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, 1, 0));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
+ }
+ // raid
+ for (int i=g_conf.osd_min_raid_width; i <= g_conf.osd_max_raid_width; i++) {
+ int r = CRUSH_RAID_RULE(i);
+ if (ndom >= i) {
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 1));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, 1, 0));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
+ } else {
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, nroot));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 0));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
+ }
}
// test
b->add_item(i, 1.0);
}
+ // rules
+ // replication
for (int i=1; i<=g_conf.osd_max_rep; i++) {
- osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root));
- osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0));
- osdmap.crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
+ int r = CRUSH_REP_RULE(i);
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
+ }
+ // raid
+ for (int i=g_conf.osd_min_raid_width; i <= g_conf.osd_max_raid_width; i++) {
+ int r = CRUSH_RAID_RULE(i);
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE_INDEP, i, 0));
+ osdmap.crush.rules[r].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
}
}
#include "osbdb/OSBDB.h"
#endif // USE_OSBDB
+
+#include "ReplicatedPG.h"
+#include "RAID4PG.h"
+
#include "Ager.h"
// -- don't need OSDMap --
- /*
- // host monitor
- case MSG_PING_ACK:
- case MSG_FAILURE_ACK:
- monitor->proc_message(m);
- break;
- */
-
// map and replication
case MSG_OSD_MAP:
handle_osd_map((MOSDMap*)m);
it++) {
pg_t pgid = *it;
- PG *pg = new PG(this, pgid);
+ PG *pg = 0;
+ if (pgid->is_rep())
+ new ReplicatedPG(this, pgid);
+ else if (pgid->is_raid())
+ new RAID4PG(this, pgid);
+ else
+ assert(0);
pg_map[pgid] = pg;
// read pg info
waiting_for_pg[pgid].push_back(op);
return;
}
-
- if (read) {
- // read. am i the (same) acker?
- if (//pg->get_acker() != whoami ||
- op->get_map_epoch() < pg->info.history.same_acker_since) {
- dout(7) << "acting acker is osd" << pg->get_acker()
- << " since " << pg->info.history.same_acker_since
- << ", dropping" << endl;
- assert(op->get_map_epoch() < osdmap->get_epoch());
- delete op;
- return;
- }
- } else {
- // write. am i the (same) primary?
- if (pg->get_primary() != whoami ||
- op->get_map_epoch() < pg->info.history.same_primary_since) {
- dout(7) << "acting primary is osd" << pg->get_primary()
- << " since " << pg->info.history.same_primary_since
- << ", dropping" << endl;
- assert(op->get_map_epoch() < osdmap->get_epoch());
- delete op;
- return;
- }
+
+ // pg must be same-ish...
+ if (read && !pg->same_for_read_since(op->get_map_epoch())) {
+ dout(7) << "handle_rep_op pg changed " << pg->info.history
+ << " after " << op->get_map_epoch()
+ << ", dropping" << endl;
+ assert(op->get_map_epoch() < osdmap->get_epoch());
+ delete op;
+ return;
+ }
+ if (!read && !pg->same_for_modify_since(op->get_map_epoch())) {
+ dout(7) << "handle_rep_op pg changed " << pg->info.history
+ << " after " << op->get_map_epoch()
+ << ", dropping" << endl;
+ assert(op->get_map_epoch() < osdmap->get_epoch());
+ delete op;
+ return;
}
- // must be active.
+ // pg must be active.
if (!pg->is_active()) {
// replay?
if (op->get_version().version > 0) {
}
// check osd map: same set, or primary+acker?
- if (g_conf.osd_rep == OSD_REP_CHAIN &&
- op->get_map_epoch() < pg->info.history.same_since) {
+ if (!pg->same_for_rep_modify_since(op->get_map_epoch())) {
dout(10) << "handle_rep_op pg changed " << pg->info.history
<< " after " << op->get_map_epoch()
<< ", dropping" << endl;
delete op;
return;
}
- if (g_conf.osd_rep != OSD_REP_CHAIN &&
- (op->get_map_epoch() < pg->info.history.same_primary_since ||
- op->get_map_epoch() < pg->info.history.same_acker_since)) {
- dout(10) << "handle_rep_op pg primary|acker changed " << pg->info.history
- << " after " << op->get_map_epoch()
- << ", dropping" << endl;
- delete op;
- return;
- }
assert(pg->get_role() >= 0);
dout(7) << "handle_rep_op " << op << " in " << *pg << endl;
int op_read(MOSDOp *op) = 0;
void op_modify(MOSDOp *op) = 0;
+ bool same_for_read_since(epoch_t e);
+ bool same_for_modify_since(epoch_t e);
+ bool same_for_rep_modify_since(epoch_t e);
+
+ bool is_missing_object(object_t oid);
+ void wait_for_missing_object(object_t oid, op);
+
};
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << g_clock.now() << " osd" << osd->whoami << " " << (osd->osdmap ? osd->osdmap->get_epoch():0) << " " << *this << " "
+
+bool ReplicatedPG::same_for_read_since(epoch_t e)
+{
+ return (e >= info.history.same_acker_since);
+}
+
+bool ReplicatedPG::same_for_modify_since(epoch_t e)
+{
+ return (get_primary() == whoami &&
+ e >= info.history.same_primary_since);
+}
+
+bool ReplicatedPG::same_for_rep_modify_since(epoch_t e)
+{
+ // check osd map: same set, or primary+acker?
+
+ if (g_conf.osd_rep == OSD_REP_CHAIN) {
+ return e >= info.history.same_since; // whole pg set same
+ } else {
+ // primary, splay
+ return (e >= info.history.same_primary_since &&|
+ e >= info.history.same_acker_since);
+ }
+}
+
+
+bool ReplicatedPG::is_missing_object(object_t oid)
+{
+ return missing.missing.count(oid);
+}
+
+
+
+void ReplicatedPG::wait_for_missing_object(object_t oid, op)
+{
+
+}
+
+
+
+
// ========================================================================
// READS
int op_read(MOSDOp *op);
void op_modify(MOSDOp *op);
+ bool same_for_read_since(epoch_t e);
+ bool same_for_modify_since(epoch_t e);
+ bool same_for_rep_modify_since(epoch_t e);
+
+ bool is_missing_object(object_t oid);
+ void wait_for_missing_object(object_t oid, op);
+
};