From d5c6af16dbf44c8d1171a63263b946fc8c276aea Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 24 Nov 2008 16:35:43 -0800 Subject: [PATCH] osd: infrastructure for ack vs nvram vs disk osd_op ack types NVRAM ack not generated, yet. The completion callbacks from the store need some work first. --- src/include/ceph_fs.h | 10 +++---- src/kernel/osd_client.c | 4 +-- src/messages/MOSDOp.h | 10 +++++-- src/messages/MOSDOpReply.h | 15 ++++++---- src/osd/OSD.cc | 2 +- src/osd/PG.h | 1 - src/osd/ReplicatedPG.cc | 56 +++++++++++++++++++++++--------------- src/osd/ReplicatedPG.h | 31 ++++++++++++++------- src/osdc/Objecter.cc | 16 +++++------ 9 files changed, 87 insertions(+), 58 deletions(-) diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 22e114471375f..0e87d3270061d 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -1162,11 +1162,11 @@ static inline const char *ceph_osd_op_name(int op) */ enum { CEPH_OSD_OP_ACK = 1, /* want (or is) "ack" ack */ - CEPH_OSD_OP_SAFE = 2, /* want (or is) "safe" ack */ - CEPH_OSD_OP_RETRY = 4, /* resend attempt */ - CEPH_OSD_OP_INCLOCK_FAIL = 8, /* fail on inclock collision */ - CEPH_OSD_OP_MODIFY = 16, /* op is/was a mutation */ - CEPH_OSD_OP_ACKNVRAM = 32, /* ACK when stable in NVRAM, not RAM */ + CEPH_OSD_OP_ONNVRAM = 2, /* want (or is) "onnvram" ack */ + CEPH_OSD_OP_ONDISK = 4, /* want (or is) "ondisk" ack */ + CEPH_OSD_OP_RETRY = 8, /* resend attempt */ + CEPH_OSD_OP_INCLOCK_FAIL = 16, /* fail on inclock collision */ + CEPH_OSD_OP_MODIFY = 32, /* op is/was a mutation */ CEPH_OSD_OP_ORDERSNAP = 64, /* EOLDSNAP if snapc is out of order */ CEPH_OSD_OP_PEERSTAT = 128, /* msg includes osd_peer_stat */ CEPH_OSD_OP_BALANCE_READS = 256, diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index c25c9c7e19441..35b1259565586 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -1067,7 +1067,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK) flags |= CEPH_OSD_OP_ACK; else - flags |= CEPH_OSD_OP_SAFE; + flags |= CEPH_OSD_OP_ONDISK; reqhead->flags = cpu_to_le32(flags); len = le64_to_cpu(op->length); @@ -1109,7 +1109,7 @@ int ceph_osdc_writepages_start(struct ceph_osd_client *osdc, if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK) flags |= CEPH_OSD_OP_ACK; else - flags |= CEPH_OSD_OP_SAFE; + flags |= CEPH_OSD_OP_ONDISK; reqhead->flags = cpu_to_le32(flags); op->length = cpu_to_le64(len); diff --git a/src/messages/MOSDOp.h b/src/messages/MOSDOp.h index 5fe4291f26072..090a8cdf36dd7 100644 --- a/src/messages/MOSDOp.h +++ b/src/messages/MOSDOp.h @@ -129,12 +129,16 @@ public: // flags int get_flags() const { return head.flags; } + bool wants_ack() const { return get_flags() & CEPH_OSD_OP_ACK; } - bool wants_commit() const { return get_flags() & CEPH_OSD_OP_SAFE; } - bool is_retry_attempt() const { return get_flags() & CEPH_OSD_OP_RETRY; } + bool wants_ondisk() const { return get_flags() & CEPH_OSD_OP_ONDISK; } + bool wants_onnvram() const { return get_flags() & CEPH_OSD_OP_ONNVRAM; } void set_want_ack(bool b) { head.flags = get_flags() | CEPH_OSD_OP_ACK; } - void set_want_commit(bool b) { head.flags = get_flags() | CEPH_OSD_OP_SAFE; } + void set_want_onnvram(bool b) { head.flags = get_flags() | CEPH_OSD_OP_ONNVRAM; } + void set_want_ondisk(bool b) { head.flags = get_flags() | CEPH_OSD_OP_ONDISK; } + + bool is_retry_attempt() const { return get_flags() & CEPH_OSD_OP_RETRY; } void set_retry_attempt(bool a) { if (a) head.flags = head.flags | CEPH_OSD_OP_RETRY; diff --git a/src/messages/MOSDOpReply.h b/src/messages/MOSDOpReply.h index 246a71b904efb..491d4ee731d65 100644 --- a/src/messages/MOSDOpReply.h +++ b/src/messages/MOSDOpReply.h @@ -38,7 +38,9 @@ class MOSDOpReply : public Message { object_t get_oid() { return head.oid; } pg_t get_pg() { return pg_t(head.layout.ol_pgid); } int get_flags() { return head.flags; } - bool is_safe() { return get_flags() & CEPH_OSD_OP_SAFE; } + + bool is_ondisk() { return get_flags() & CEPH_OSD_OP_ONDISK; } + bool is_onnvram() { return get_flags() & CEPH_OSD_OP_ONNVRAM; } __s32 get_result() { return head.result; } eversion_t get_version() { return head.reassert_version; } @@ -53,15 +55,14 @@ class MOSDOpReply : public Message { public: - MOSDOpReply(MOSDOp *req, __s32 result, epoch_t e, bool commit) : + MOSDOpReply(MOSDOp *req, __s32 result, epoch_t e, int acktype) : Message(CEPH_MSG_OSD_OPREPLY) { memset(&head, 0, sizeof(head)); head.tid = req->head.tid; ops = req->ops; head.result = result; head.flags = - (req->head.flags & ~(CEPH_OSD_OP_SAFE|CEPH_OSD_OP_ACK)) | - (commit ? CEPH_OSD_OP_SAFE:CEPH_OSD_OP_ACK); + (req->head.flags & ~(CEPH_OSD_OP_ONDISK|CEPH_OSD_OP_ONNVRAM|CEPH_OSD_OP_ACK)) | acktype; head.oid = req->head.oid; head.layout = req->head.layout; head.osdmap_epoch = e; @@ -88,8 +89,10 @@ public: out << "osd_op_reply(" << get_tid() << " " << head.oid << " " << ops; if (is_modify()) { - if (is_safe()) - out << " commit"; + if (is_ondisk()) + out << " ondisk"; + else if (is_onnvram()) + out << " onnvram"; else out << " ack"; } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index c93d0ca1e2915..cfe3a0555b6d2 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -3026,7 +3026,7 @@ void OSD::defer_recovery(PG *pg) void OSD::reply_op_error(MOSDOp *op, int err) { - MOSDOpReply *reply = new MOSDOpReply(op, err, osdmap->get_epoch(), true); + MOSDOpReply *reply = new MOSDOpReply(op, err, osdmap->get_epoch(), CEPH_OSD_OP_ACK); messenger->send_message(reply, op->get_orig_source_inst()); delete op; } diff --git a/src/osd/PG.h b/src/osd/PG.h index c068cbafd1568..4fb43a85239fe 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -38,7 +38,6 @@ using namespace __gnu_cxx; class OSD; class MOSDOp; -class MOSDOpReply; class MOSDSubOp; class MOSDSubOpReply; class MOSDPGInfo; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index e8e03022fc215..cdfba10bfc87f 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -765,7 +765,7 @@ void ReplicatedPG::op_read(MOSDOp *op) done: // reply - MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); + MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ACK); reply->set_data(data); reply->get_header().data_off = data_off; reply->set_result(result); @@ -1182,7 +1182,7 @@ public: void finish(int r) { pg->lock(); if (!pg->is_deleted()) - pg->op_modify_commit(rep_tid, pg_last_complete); + pg->op_modify_ondisk(rep_tid, pg_last_complete); pg->put_unlock(); } }; @@ -1252,14 +1252,25 @@ void ReplicatedPG::put_rep_gather(RepGather *repop) { dout(10) << "put_repop " << *repop << dendl; - // commit? - if (repop->can_send_commit()) { - if (repop->op->wants_commit()) { + // disk? + if (repop->can_send_disk()) { + if (repop->op->wants_ondisk()) { // send commit. - MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true); + MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ONDISK); dout(10) << "put_repop sending commit on " << *repop << " " << reply << dendl; osd->messenger->send_message(reply, repop->op->get_orig_source_inst()); - repop->sent_commit = true; + repop->sent_disk = true; + } + } + + // nvram? + else if (repop->can_send_nvram()) { + if (repop->op->wants_onnvram()) { + // send commit. + MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ONNVRAM); + dout(10) << "put_repop sending onnvram on " << *repop << " " << reply << dendl; + osd->messenger->send_message(reply, repop->op->get_orig_source_inst()); + repop->sent_nvram = true; } } @@ -1271,7 +1282,7 @@ void ReplicatedPG::put_rep_gather(RepGather *repop) if (repop->op->wants_ack()) { // send ack - MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false); + MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ACK); dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl; osd->messenger->send_message(reply, repop->op->get_orig_source_inst()); repop->sent_ack = true; @@ -1346,7 +1357,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op, tid_t rep_tid, for (unsigned i=0; iosds.insert(osd); - repop->waitfor_commit.insert(osd); + repop->waitfor_disk.insert(osd); } // primary. all osds ack to me. @@ -1382,9 +1393,9 @@ void ReplicatedPG::repop_ack(RepGather *repop, get_rep_gather(repop); { if (commit) { - // commit - assert(repop->waitfor_commit.count(fromosd)); - repop->waitfor_commit.erase(fromosd); + // disk + assert(repop->waitfor_disk.count(fromosd)); + repop->waitfor_disk.erase(fromosd); repop->waitfor_ack.erase(fromosd); repop->pg_complete_thru[fromosd] = pg_complete_thru; } else { @@ -1420,22 +1431,23 @@ void ReplicatedPG::repop_ack(RepGather *repop, /** op_modify_commit * transaction commit on the acker. */ -void ReplicatedPG::op_modify_commit(tid_t rep_tid, eversion_t pg_complete_thru) +void ReplicatedPG::op_modify_ondisk(tid_t rep_tid, eversion_t pg_complete_thru) { if (rep_gather.count(rep_tid)) { RepGather *repop = rep_gather[rep_tid]; - dout(10) << "op_modify_commit " << *repop->op << dendl; + dout(10) << "op_modify_ondisk " << *repop->op << dendl; get_rep_gather(repop); { - assert(repop->waitfor_commit.count(osd->get_nodeid())); - repop->waitfor_commit.erase(osd->get_nodeid()); + assert(repop->waitfor_disk.count(osd->get_nodeid())); + repop->waitfor_nvram.erase(osd->get_nodeid()); + repop->waitfor_disk.erase(osd->get_nodeid()); repop->pg_complete_thru[osd->get_nodeid()] = pg_complete_thru; } put_rep_gather(repop); - dout(10) << "op_modify_commit done on " << repop << dendl; + dout(10) << "op_modify_ondisk done on " << repop << dendl; } else { - dout(10) << "op_modify_commit rep_tid " << rep_tid << " dne" << dendl; + dout(10) << "op_modify_ondisk rep_tid " << rep_tid << " dne" << dendl; } } @@ -1472,7 +1484,7 @@ public: lock.Unlock(); pg->lock(); - pg->sub_op_modify_commit(op, destosd, pg_last_complete); + pg->sub_op_modify_ondisk(op, destosd, pg_last_complete); pg->put_unlock(); } void ack() { @@ -1720,7 +1732,7 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op) oncommit->ack(); } -void ReplicatedPG::sub_op_modify_commit(MOSDSubOp *op, int ackerosd, eversion_t last_complete) +void ReplicatedPG::sub_op_modify_ondisk(MOSDSubOp *op, int ackerosd, eversion_t last_complete) { // send commit. dout(10) << "rep_modify_commit on op " << *op @@ -2371,7 +2383,7 @@ void ReplicatedPG::on_osd_failure(int o) p++; dout(-1) << "checking repop tid " << repop->rep_tid << dendl; if (repop->waitfor_ack.count(o) || - repop->waitfor_commit.count(o)) + repop->waitfor_disk.count(o)) repop_ack(repop, -1, true, o); } @@ -2420,7 +2432,7 @@ void ReplicatedPG::on_change() p++; dout(-1) << "checking repop tid " << repop->rep_tid << dendl; set all; - set_union(repop->waitfor_commit.begin(), repop->waitfor_commit.end(), + set_union(repop->waitfor_disk.begin(), repop->waitfor_disk.end(), repop->waitfor_ack.begin(), repop->waitfor_ack.end(), inserter(all, all.begin())); for (set::iterator q = all.begin(); q != all.end(); q++) { diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index c9aae374f5b6e..f385743a4bb31 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -36,11 +36,12 @@ public: pg_stat_t stats; set waitfor_ack; - set waitfor_commit; + set waitfor_nvram; + set waitfor_disk; utime_t start; - bool sent_ack, sent_commit; + bool sent_ack, sent_nvram, sent_disk; set osds; eversion_t old_version, at_version; @@ -55,19 +56,28 @@ public: SnapSet& ss, SnapContext& sc) : op(o), rep_tid(rt), applied(false), - sent_ack(false), sent_commit(false), + sent_ack(false), sent_nvram(false), sent_disk(false), at_version(av), snapset(ss), snapc(sc), pg_local_last_complete(lc) { } bool can_send_ack() { - return !sent_ack && !sent_commit && waitfor_ack.empty(); + return + !sent_ack && !sent_nvram && !sent_disk && + waitfor_ack.empty(); } - bool can_send_commit() { - return !sent_commit && waitfor_ack.empty() && waitfor_commit.empty(); + bool can_send_nvram() { + return + !sent_nvram && !sent_disk && + waitfor_ack.empty() && waitfor_disk.empty(); + } + bool can_send_disk() { + return + !sent_disk && + waitfor_ack.empty() && waitfor_nvram.empty() && waitfor_disk.empty(); } bool can_delete() { - return waitfor_ack.empty() && waitfor_commit.empty(); + return waitfor_ack.empty() && waitfor_nvram.empty() && waitfor_disk.empty(); } }; @@ -112,8 +122,8 @@ protected: // modify - void op_modify_commit(tid_t rep_tid, eversion_t pg_complete_thru); - void sub_op_modify_commit(MOSDSubOp *op, int ackerosd, eversion_t last_complete); + void op_modify_ondisk(tid_t rep_tid, eversion_t pg_complete_thru); + void sub_op_modify_ondisk(MOSDSubOp *op, int ackerosd, eversion_t last_complete); void _make_clone(ObjectStore::Transaction& t, pobject_t head, pobject_t coid, @@ -192,7 +202,8 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) { out << "repgather(" << &repop << " rep_tid=" << repop.rep_tid << " wfack=" << repop.waitfor_ack - << " wfcommit=" << repop.waitfor_commit; + << " wfnvram=" << repop.waitfor_nvram + << " wfdisk=" << repop.waitfor_disk; out << " pct=" << repop.pg_complete_thru; out << " op=" << *(repop.op); out << " repop=" << &repop; diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 71a8a597e4f11..e13668f2a76ac 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -466,7 +466,7 @@ tid_t Objecter::modify_submit(ModifyOp *wr) dout(20) << " note: not requesting ack" << dendl; } if (wr->oncommit) { - flags |= CEPH_OSD_OP_SAFE; + flags |= CEPH_OSD_OP_ONDISK; ++num_uncommitted; } else { dout(20) << " note: not requesting commit" << dendl; @@ -516,17 +516,17 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) tid_t tid = m->get_tid(); if (op_modify.count(tid) == 0) { - dout(7) << "handle_osd_modify_reply " << tid - << (m->is_safe() ? " commit":" ack") - << " ... stray" << dendl; + dout(7) << "handle_osd_modify_reply " << tid + << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack")) + << " ... stray" << dendl; delete m; return; } - dout(7) << "handle_osd_modify_reply " << tid - << (m->is_safe() ? " commit":" ack") - << " v " << m->get_version() << " in " << m->get_pg() - << dendl; + dout(7) << "handle_osd_modify_reply " << tid + << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack")) + << " v " << m->get_version() << " in " << m->get_pg() + << dendl; ModifyOp *wr = op_modify[ tid ]; Context *onack = 0; -- 2.39.5