From f9e2c9718b3aab350f39e13473ad5a831a351171 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 20 Dec 2007 12:27:07 -0800 Subject: [PATCH] rados: refactored to use MOSDSubOp; ripped out splay, chain support for now --- src/messages/MOSDOpReply.h | 5 - src/messages/MOSDSubOp.h | 13 +- src/messages/MOSDSubOpReply.h | 8 +- src/msg/Message.cc | 4 +- src/msg/Message.h | 1 + src/osd/OSD.cc | 62 ++- src/osd/OSD.h | 3 +- src/osd/PG.cc | 3 + src/osd/PG.h | 8 +- src/osd/RAID4PG.cc | 9 +- src/osd/RAID4PG.h | 5 +- src/osd/ReplicatedPG.cc | 849 ++++++++++++++-------------------- src/osd/ReplicatedPG.h | 36 +- 13 files changed, 451 insertions(+), 555 deletions(-) diff --git a/src/messages/MOSDOpReply.h b/src/messages/MOSDOpReply.h index 0a39395565395..be22f44b26665 100644 --- a/src/messages/MOSDOpReply.h +++ b/src/messages/MOSDOpReply.h @@ -51,8 +51,6 @@ class MOSDOpReply : public Message { eversion_t pg_complete_thru; epoch_t map_epoch; - - osd_peer_stat_t peer_stat; } st; map attrset; @@ -86,9 +84,6 @@ class MOSDOpReply : public Message { void set_op(int op) { st.op = op; } void set_rep_tid(tid_t t) { st.rep_tid = t; } - void set_peer_stat(const osd_peer_stat_t& stat) { st.peer_stat = stat; } - const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; } - // osdmap epoch_t get_map_epoch() { return st.map_epoch; } diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h index 9f4e77bb8c8e0..37a70a120c554 100644 --- a/src/messages/MOSDSubOp.h +++ b/src/messages/MOSDSubOp.h @@ -79,20 +79,19 @@ public: const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; } MOSDSubOp(osdreqid_t r, pg_t p, pobject_t po, int o, off_t of, off_t le, - epoch_t mape, tid_t rtid, eversion_t v, evertsion_t pgtt) : - Message(CEPH_MSG_OSD_SUBOP) { + epoch_t mape, tid_t rtid, eversion_t v) : + Message(MSG_OSD_SUBOP) { memset(&st, 0, sizeof(st)); st.reqid = r; st.pgid = p; st.poid = po; - st.o = op; - st.of = offset; - st.le = length; + st.op = o; + st.offset = of; + st.length = le; st.map_epoch = mape; st.rep_tid = rtid; st.version = v; - st.pg_trim_to = pgtt; } MOSDSubOp() {} @@ -112,7 +111,7 @@ public: virtual char *get_type_name() { return "osd_sub_op"; } void print(ostream& out) { out << "osd_sub_op(" << st.reqid - << " " << get_opname(st.op) + << " " << MOSDOp::get_opname(st.op) << " " << st.poid << " v" << st.version; if (st.length) out << " " << st.offset << "~" << st.length; diff --git a/src/messages/MOSDSubOpReply.h b/src/messages/MOSDSubOpReply.h index c65f9f5614953..056e2dd4656a4 100644 --- a/src/messages/MOSDSubOpReply.h +++ b/src/messages/MOSDSubOpReply.h @@ -34,6 +34,7 @@ class MOSDSubOpReply : public Message { epoch_t map_epoch; // subop metadata + osdreqid_t reqid; pg_t pgid; tid_t rep_tid; int32_t op; @@ -75,8 +76,9 @@ class MOSDSubOpReply : public Message { public: MOSDSubOpReply(MOSDSubOp *req, int result, epoch_t e, bool commit) : - Message(CEPH_MSG_OSD_OPREPLY) { + Message(MSG_OSD_SUBOPREPLY) { st.map_epoch = e; + st.reqid = req->get_reqid(); st.pgid = req->get_pg(); st.rep_tid = req->get_rep_tid(); st.op = req->get_op(); @@ -104,9 +106,9 @@ public: virtual char *get_type_name() { return "osd_op_reply"; } void print(ostream& out) { - out << "osd_op_reply(" << st.reqid + out << "osd_sub_op_reply(" << st.reqid << " " << MOSDOp::get_opname(st.op) - << " " << st.oid; + << " " << st.poid; if (st.length) out << " " << st.offset << "~" << st.length; if (st.op >= 10) { if (st.commit) diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 2ee7e0faa9e0e..ed4ebb8487e41 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -175,10 +175,10 @@ decode_message(ceph_msg_header& env, bufferlist& front, bufferlist& data) case CEPH_MSG_OSD_OPREPLY: m = new MOSDOpReply(); break; - case CEPH_MSG_OSD_SUBOP: + case MSG_OSD_SUBOP: m = new MOSDSubOp(); break; - case CEPH_MSG_OSD_SUBOPREPLY: + case MSG_OSD_SUBOPREPLY: m = new MOSDSubOpReply(); break; diff --git a/src/msg/Message.h b/src/msg/Message.h index 6f0b3f1b72d3d..e3b8612b10e34 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -34,6 +34,7 @@ #define MSG_OSD_OUT 74 #define MSG_OSD_SUBOP 75 +#define MSG_OSD_SUBOPREPLY 76 #define MSG_OSD_PG_NOTIFY 80 #define MSG_OSD_PG_QUERY 81 diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index e3344618da7b9..eeb8dc5c5f456 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -44,6 +44,8 @@ #include "messages/MOSDFailure.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" +#include "messages/MOSDSubOp.h" +#include "messages/MOSDSubOpReply.h" #include "messages/MOSDBoot.h" #include "messages/MOSDIn.h" #include "messages/MOSDOut.h" @@ -965,7 +967,6 @@ void OSD::dispatch(Message *m) switch (m->get_type()) { case MSG_OSD_PING: - // take note. handle_osd_ping((MOSDPing*)m); break; @@ -985,13 +986,17 @@ void OSD::dispatch(Message *m) handle_pg_activate_set((MOSDPGActivateSet*)m); break; + // client ops case CEPH_MSG_OSD_OP: handle_op((MOSDOp*)m); break; // for replication etc. - case CEPH_MSG_OSD_OPREPLY: - handle_op_reply((MOSDOpReply*)m); + case MSG_OSD_SUBOP: + handle_sub_op((MOSDSubOp*)m); + break; + case MSG_OSD_SUBOPREPLY: + handle_sub_op_reply((MOSDSubOpReply*)m); break; @@ -2301,8 +2306,10 @@ void OSD::handle_op(MOSDOp *op) // do it now. if (op->get_type() == CEPH_MSG_OSD_OP) pg->do_op((MOSDOp*)op); - else if (op->get_type() == CEPH_MSG_OSD_OPREPLY) - pg->do_op_reply((MOSDOpReply*)op); + else if (op->get_type() == MSG_OSD_SUBOP) + pg->do_sub_op((MOSDSubOp*)op); + else if (op->get_type() == MSG_OSD_SUBOPREPLY) + pg->do_sub_op_reply((MOSDSubOpReply*)op); else assert(0); } else { @@ -2314,7 +2321,42 @@ void OSD::handle_op(MOSDOp *op) } -void OSD::handle_op_reply(MOSDOpReply *op) +void OSD::handle_sub_op(MOSDSubOp *op) +{ + dout(10) << "handle_sub_op " << *op << " epoch " << op->get_map_epoch() << dendl; + if (op->get_map_epoch() < boot_epoch) { + dout(3) << "replica op from before boot" << dendl; + delete op; + return; + } + + // must be a rep op. + assert(op->get_source().is_osd()); + + // make sure we have the pg + const pg_t pgid = op->get_pg(); + + // require same or newer map + if (!require_same_or_newer_map(op, op->get_map_epoch())) return; + + // share our map with sender, if they're old + _share_map_incoming(op->get_source_inst(), op->get_map_epoch()); + + if (!_have_pg(pgid)) { + // hmm. + delete op; + return; + } + + PG *pg = _lookup_lock_pg(pgid); + if (g_conf.osd_maxthreads < 1) { + pg->do_sub_op(op); // do it now + } else { + enqueue_op(pg, op); // queue for worker threads + } + pg->unlock(); +} +void OSD::handle_sub_op_reply(MOSDSubOpReply *op) { if (op->get_map_epoch() < boot_epoch) { dout(3) << "replica op reply from before boot" << dendl; @@ -2342,7 +2384,7 @@ void OSD::handle_op_reply(MOSDOpReply *op) PG *pg = _lookup_lock_pg(pgid); if (g_conf.osd_maxthreads < 1) { - pg->do_op_reply(op); // do it now + pg->do_sub_op_reply(op); // do it now } else { enqueue_op(pg, op); // queue for worker threads } @@ -2397,8 +2439,10 @@ void OSD::dequeue_op(PG *pg) // do it if (op->get_type() == CEPH_MSG_OSD_OP) pg->do_op((MOSDOp*)op); // do it now - else if (op->get_type() == CEPH_MSG_OSD_OPREPLY) - pg->do_op_reply((MOSDOpReply*)op); + else if (op->get_type() == MSG_OSD_SUBOP) + pg->do_sub_op((MOSDSubOp*)op); + else if (op->get_type() == MSG_OSD_SUBOPREPLY) + pg->do_sub_op_reply((MOSDSubOpReply*)op); else assert(0); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index f49f22e1aef95..73e78a95ff520 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -362,7 +362,8 @@ private: void handle_osd_ping(class MOSDPing *m); void handle_op(class MOSDOp *m); - void handle_op_reply(class MOSDOpReply *m); + void handle_sub_op(class MOSDSubOp *m); + void handle_sub_op_reply(class MOSDSubOpReply *m); void force_remount(); }; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index dbdf9fff0c897..25a64a8ba101b 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1289,3 +1289,6 @@ bool PG::pick_object_rev(object_t& oid) + + + diff --git a/src/osd/PG.h b/src/osd/PG.h index 16978922f9a06..b41f8d061de12 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -38,6 +38,8 @@ using namespace __gnu_cxx; class OSD; class MOSDOp; class MOSDOpReply; +class MOSDSubOp; +class MOSDSubOpReply; class MOSDPGActivateSet; /** PG - Replica Placement Group @@ -493,7 +495,6 @@ protected: map objects_pulling; // which objects are currently being pulled - // stats off_t stat_size; off_t stat_num_blocks; @@ -645,14 +646,15 @@ public: // abstract bits virtual bool preprocess_op(MOSDOp *op, utime_t now) { return false; } virtual void do_op(MOSDOp *op) = 0; - virtual void do_op_reply(MOSDOpReply *op) = 0; + virtual void do_sub_op(MOSDSubOp *op) = 0; + virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0; virtual bool same_for_read_since(epoch_t e) = 0; virtual bool same_for_modify_since(epoch_t e) = 0; virtual bool same_for_rep_modify_since(epoch_t e) = 0; virtual bool is_missing_object(object_t oid) = 0; - virtual void wait_for_missing_object(object_t oid, MOSDOp *op) = 0; + virtual void wait_for_missing_object(object_t oid, Message *op) = 0; virtual void on_osd_failure(int osd) = 0; virtual void on_acker_change() = 0; diff --git a/src/osd/RAID4PG.cc b/src/osd/RAID4PG.cc index 62eb6d05501cd..740aa1d5585ab 100644 --- a/src/osd/RAID4PG.cc +++ b/src/osd/RAID4PG.cc @@ -70,7 +70,12 @@ void RAID4PG::do_op(MOSDOp *op) } -void RAID4PG::do_op_reply(MOSDOpReply *reply) +void RAID4PG::do_sub_op(MOSDSubOp *op) +{ + +} + +void RAID4PG::do_sub_op_reply(MOSDSubOpReply *reply) { } @@ -104,7 +109,7 @@ bool RAID4PG::is_missing_object(object_t oid) return false; } -void RAID4PG::wait_for_missing_object(object_t oid, MOSDOp *op) +void RAID4PG::wait_for_missing_object(object_t oid, Message *op) { //assert(0); } diff --git a/src/osd/RAID4PG.h b/src/osd/RAID4PG.h index 0469f867f7e95..010371a25fd92 100644 --- a/src/osd/RAID4PG.h +++ b/src/osd/RAID4PG.h @@ -52,14 +52,15 @@ public: bool preprocess_op(MOSDOp *op, utime_t now); void do_op(MOSDOp *op); - void do_op_reply(MOSDOpReply *r); + void do_sub_op(MOSDSubOp *op); + void do_sub_op_reply(MOSDSubOpReply *r); bool same_for_read_since(epoch_t e); bool same_for_modify_since(epoch_t e); bool same_for_rep_modify_since(epoch_t e); bool is_missing_object(object_t oid); - void wait_for_missing_object(object_t oid, MOSDOp *op); + void wait_for_missing_object(object_t oid, Message *op); void on_osd_failure(int o); void on_acker_change(); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 78d63a2a8486d..e84b30f93ea71 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -19,6 +19,8 @@ #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" +#include "messages/MOSDSubOp.h" +#include "messages/MOSDSubOpReply.h" #include "messages/MOSDPGNotify.h" #include "messages/MOSDPGRemove.h" @@ -54,14 +56,8 @@ bool ReplicatedPG::same_for_modify_since(epoch_t e) bool ReplicatedPG::same_for_rep_modify_since(epoch_t e) { // check osd map: same set, or primary+acker? - - if (g_conf.osd_rep == OSD_REP_CHAIN) { - return e >= info.history.same_since; // whole pg set same - } else { - // primary, splay - return (e >= info.history.same_primary_since && - e >= info.history.same_acker_since); - } + return (e >= info.history.same_primary_since && + e >= info.history.same_acker_since); } // ==================== @@ -73,7 +69,7 @@ bool ReplicatedPG::is_missing_object(object_t oid) } -void ReplicatedPG::wait_for_missing_object(object_t oid, MOSDOp *op) +void ReplicatedPG::wait_for_missing_object(object_t oid, Message *m) { assert(is_missing_object(oid)); @@ -93,11 +89,11 @@ void ReplicatedPG::wait_for_missing_object(object_t oid, MOSDOp *op) << dendl; pull(oid); } - waiting_for_missing_object[oid].push_back(op); + waiting_for_missing_object[oid].push_back(m); } - +// ========================================================== /** preprocess_op - preprocess an op (before it gets queued). * fasttrack read @@ -364,12 +360,43 @@ void ReplicatedPG::do_op(MOSDOp *op) op_read(op); break; + // writes + case CEPH_OSD_OP_WRNOOP: + case CEPH_OSD_OP_WRITE: + case CEPH_OSD_OP_ZERO: + case CEPH_OSD_OP_DELETE: + case CEPH_OSD_OP_TRUNCATE: + case CEPH_OSD_OP_WRLOCK: + case CEPH_OSD_OP_WRUNLOCK: + case CEPH_OSD_OP_RDLOCK: + case CEPH_OSD_OP_RDUNLOCK: + case CEPH_OSD_OP_UPLOCK: + case CEPH_OSD_OP_DNLOCK: + case CEPH_OSD_OP_BALANCEREADS: + case CEPH_OSD_OP_UNBALANCEREADS: + op_modify(op); + break; + + default: + assert(0); + } +} + + +void ReplicatedPG::do_sub_op(MOSDSubOp *op) +{ + dout(15) << "do_sub_op " << *op << dendl; + + osd->logger->inc("subop"); + + switch (op->get_op()) { + // rep stuff case CEPH_OSD_OP_PULL: - op_pull(op); + sub_op_pull(op); break; case CEPH_OSD_OP_PUSH: - op_push(op); + sub_op_push(op); break; // writes @@ -386,48 +413,27 @@ void ReplicatedPG::do_op(MOSDOp *op) case CEPH_OSD_OP_DNLOCK: case CEPH_OSD_OP_BALANCEREADS: case CEPH_OSD_OP_UNBALANCEREADS: - if (op->get_source().is_osd()) { - op_rep_modify(op); - } else { - // go go gadget pg - op_modify(op); - } + sub_op_modify(op); break; default: assert(0); } + } -void ReplicatedPG::do_op_reply(MOSDOpReply *r) +void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r) { if (r->get_op() == CEPH_OSD_OP_PUSH) { // continue peer recovery - op_push_reply(r); + sub_op_push_reply(r); } else { - // must be replication. - tid_t rep_tid = r->get_rep_tid(); - int fromosd = r->get_source().num(); - - osd->take_peer_stat(fromosd, r->get_peer_stat()); - - if (rep_gather.count(rep_tid)) { - // oh, good. - repop_ack(rep_gather[rep_tid], - r->get_result(), r->get_commit(), - fromosd, - r->get_pg_complete_thru()); - delete r; - } else { - // early ack. - waiting_for_repop[rep_tid].push_back(r); - } + sub_op_modify_reply(r); } } - // ======================================================================== // READS @@ -561,30 +567,28 @@ void ReplicatedPG::op_read(MOSDOp *op) // MODIFY void ReplicatedPG::prepare_log_transaction(ObjectStore::Transaction& t, - MOSDOp *op, eversion_t& version, + osdreqid_t reqid, pobject_t poid, int op, eversion_t version, objectrev_t crev, objectrev_t rev, eversion_t trim_to) { - const object_t oid = op->get_oid(); - // clone entry? if (crev && rev && rev > crev) { eversion_t cv = version; cv.version--; - Log::Entry cloneentry(PG::Log::Entry::CLONE, oid, cv, op->get_reqid()); + Log::Entry cloneentry(PG::Log::Entry::CLONE, poid.oid, cv, reqid); log.add(cloneentry); - dout(10) << "prepare_log_transaction " << op->get_op() + dout(10) << "prepare_log_transaction " << op << " " << cloneentry << dendl; } // actual op int opcode = Log::Entry::MODIFY; - if (op->get_op() == CEPH_OSD_OP_DELETE) opcode = Log::Entry::DELETE; - Log::Entry logentry(opcode, oid, version, op->get_reqid()); + if (op == CEPH_OSD_OP_DELETE) opcode = Log::Entry::DELETE; + Log::Entry logentry(opcode, poid.oid, version, reqid); - dout(10) << "prepare_log_transaction " << op->get_op() + dout(10) << "prepare_log_transaction " << op << " " << logentry << dendl; @@ -602,24 +606,22 @@ void ReplicatedPG::prepare_log_transaction(ObjectStore::Transaction& t, /** prepare_op_transaction * apply an op to the store wrapped in a transaction. */ -void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, - MOSDOp *op, eversion_t& version, - objectrev_t crev, objectrev_t rev) +void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, const osdreqid_t& reqid, + pg_t pgid, int op, pobject_t poid, + off_t offset, off_t length, bufferlist& bl, + eversion_t& version, objectrev_t crev, objectrev_t rev) { - const object_t oid = op->get_oid(); - const pg_t pgid = op->get_pg(); - bool did_clone = false; - dout(10) << "prepare_op_transaction " << MOSDOp::get_opname( op->get_op() ) - << " " << oid + dout(10) << "prepare_op_transaction " << MOSDOp::get_opname( op ) + << " " << poid << " v " << version << " crev " << crev << " rev " << rev << dendl; // WRNOOP does nothing. - if (op->get_op() == CEPH_OSD_OP_WRNOOP) + if (op == CEPH_OSD_OP_WRNOOP) return; // raise last_complete? @@ -635,45 +637,46 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, // clone? if (crev && rev && rev > crev) { - object_t noid = oid; - noid.rev = rev; - dout(10) << "prepare_op_transaction cloning " << oid << " crev " << crev << " to " << noid << dendl; - t.clone(oid, noid); + assert(0); + pobject_t noid = poid; // FIXME **** + noid.oid.rev = rev; + dout(10) << "prepare_op_transaction cloning " << poid << " crev " << crev << " to " << noid << dendl; + t.clone(poid, noid); did_clone = true; } // apply the op - switch (op->get_op()) { + switch (op) { // -- locking -- case CEPH_OSD_OP_WRLOCK: { // lock object - t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t)); + t.setattr(poid, "wrlock", &reqid.name, sizeof(entity_name_t)); } break; case CEPH_OSD_OP_WRUNLOCK: { // unlock objects - t.rmattr(oid, "wrlock"); + t.rmattr(poid, "wrlock"); } break; case CEPH_OSD_OP_MININCLOCK: { - uint32_t mininc = op->get_length(); - t.setattr(oid, "mininclock", &mininc, sizeof(mininc)); + uint32_t mininc = length; + t.setattr(poid, "mininclock", &mininc, sizeof(mininc)); } break; case CEPH_OSD_OP_BALANCEREADS: { bool bal = true; - t.setattr(oid, "balance-reads", &bal, sizeof(bal)); + t.setattr(poid, "balance-reads", &bal, sizeof(bal)); } break; case CEPH_OSD_OP_UNBALANCEREADS: { - t.rmattr(oid, "balance-reads"); + t.rmattr(poid, "balance-reads"); } break; @@ -682,12 +685,10 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, case CEPH_OSD_OP_WRITE: { // write - assert(op->get_data().length() == op->get_length()); - bufferlist bl; - bl.claim( op->get_data() ); // give buffers to store; we keep *op in memory for a long time! - - //if (oid < 100000000000000ULL) // hack hack-- don't write client data - t.write( oid, op->get_offset(), op->get_length(), bl ); + assert(bl.length() == length); + bufferlist nbl; + nbl.claim(bl); // give buffers to store; we keep *op in memory for a long time! + t.write(poid, offset, length, nbl); } break; @@ -695,38 +696,28 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, { // zero, remove, or truncate? struct stat st; - int r = osd->store->stat(oid, &st); + int r = osd->store->stat(poid, &st); if (r >= 0) { - if (op->get_length() == 0 || - op->get_offset() + (off_t)op->get_length() >= (off_t)st.st_size) { - if (op->get_offset()) - t.truncate(oid, op->get_length() + op->get_offset()); - else - t.remove(oid); - } else { - // zero. the dumb way. FIXME. - bufferptr bp(op->get_length()); - bp.zero(); - bufferlist bl; - bl.push_back(bp); - t.write(oid, op->get_offset(), op->get_length(), bl); - } + if (offset == 0 && offset + length >= (off_t)st.st_size) + t.remove(poid); + else + t.zero(poid, offset, length); } else { // noop? - dout(10) << "apply_transaction zero on " << oid << ", but dne? stat returns " << r << dendl; + dout(10) << "apply_transaction zero on " << poid << ", but dne? stat returns " << r << dendl; } } break; case CEPH_OSD_OP_TRUNCATE: { // truncate - t.truncate(oid, op->get_length() ); + t.truncate(poid, length); } break; case CEPH_OSD_OP_DELETE: { // delete - t.remove(oid); + t.remove(poid); } break; @@ -735,20 +726,20 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, } // object collection, version - if (op->get_op() == CEPH_OSD_OP_DELETE) { + if (op == CEPH_OSD_OP_DELETE) { // remove object from c - t.collection_remove(pgid, oid); + t.collection_remove(pgid, poid); } else { // add object to c - t.collection_add(pgid, oid); + t.collection_add(pgid, poid); // object version - t.setattr(oid, "version", &version, sizeof(version)); + t.setattr(poid, "version", &version, sizeof(version)); // set object crev if (crev == 0 || // new object did_clone) // we cloned - t.setattr(oid, "crev", &rev, sizeof(rev)); + t.setattr(poid, "crev", &rev, sizeof(rev)); } } @@ -899,42 +890,30 @@ void ReplicatedPG::put_rep_gather(RepGather *repop) } -void ReplicatedPG::issue_repop(MOSDOp *op, int dest, utime_t now) +void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now) { - object_t oid = op->get_oid(); - - dout(7) << " issue_repop rep_tid " << op->get_rep_tid() - << " o " << oid + pobject_t poid = repop->op->get_oid(); + dout(7) << " issue_repop rep_tid " << repop->rep_tid + << " o " << poid << " to osd" << dest << dendl; // forward the write/update/whatever - MOSDOp *wr = new MOSDOp(op->get_client_inst(), op->get_client_inc(), op->get_reqid().tid, - oid, - ObjectLayout(info.pgid), - osd->osdmap->get_epoch(), - op->get_op()); - wr->get_data() = op->get_data(); // _copy_ bufferlist - wr->set_length(op->get_length()); - wr->set_offset(op->get_offset()); - wr->set_version(op->get_version()); - - wr->set_rep_tid(op->get_rep_tid()); + MOSDSubOp *wr = new MOSDSubOp(repop->op->get_reqid(), info.pgid, poid, + repop->op->get_op(), + repop->op->get_offset(), repop->op->get_length(), + osd->osdmap->get_epoch(), + repop->rep_tid, repop->new_version); + wr->get_data() = repop->op->get_data(); // _copy_ bufferlist wr->set_pg_trim_to(peers_complete_thru); - wr->set_peer_stat(osd->get_my_stat_for(now, dest)); - osd->messenger->send_message(wr, osd->osdmap->get_inst(dest)); } -ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op) +ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv) { dout(10) << "new_rep_gather rep_tid " << op->get_rep_tid() << " on " << *op << dendl; - int whoami = osd->get_nodeid(); - - RepGather *repop = new RepGather(op, op->get_rep_tid(), - op->get_version(), - info.last_complete); + RepGather *repop = new RepGather(op, rep_tid, nv, info.last_complete); // osds. commits all come to me. for (unsigned i=0; iwaitfor_commit.insert(osd); } - // acks vary: - if (g_conf.osd_rep == OSD_REP_CHAIN) { - // chain rep. - // there's my local ack... - repop->osds.insert(whoami); - repop->waitfor_ack.insert(whoami); - repop->waitfor_commit.insert(whoami); - - // also, the previous guy will ack to me - int myrank = osd->osdmap->calc_pg_rank(whoami, acting); - if (myrank > 0) { - int osd = acting[ myrank-1 ]; - repop->osds.insert(osd); - repop->waitfor_ack.insert(osd); - repop->waitfor_commit.insert(osd); - } - } else { - // primary, splay. all osds ack to me. - for (unsigned i=0; iwaitfor_ack.insert(osd); - } + // primary. all osds ack to me. + for (unsigned i=0; iwaitfor_ack.insert(osd); } - + repop->start = g_clock.now(); - rep_gather[ repop->rep_tid ] = repop; + rep_gather[repop->rep_tid] = repop; // anyone waiting? (acks that got here before the op did) if (waiting_for_repop.count(repop->rep_tid)) { @@ -1102,7 +1063,7 @@ objectrev_t ReplicatedPG::assign_version(MOSDOp *op) class C_OSD_RepModifyCommit : public Context { public: ReplicatedPG *pg; - MOSDOp *op; + MOSDSubOp *op; int destosd; eversion_t pg_last_complete; @@ -1112,7 +1073,7 @@ public: bool acked; bool waiting; - C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDOp *oo, int dosd, eversion_t lc) : + C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDSubOp *oo, int dosd, eversion_t lc) : pg(p), op(oo), destosd(dosd), pg_last_complete(lc), acked(false), waiting(false) { pg->get(); // we're copying the pointer. @@ -1128,7 +1089,7 @@ public: lock.Unlock(); pg->lock(); - pg->op_rep_modify_commit(op, destosd, pg_last_complete); + pg->sub_op_modify_commit(op, destosd, pg_last_complete); pg->put_unlock(); } void ack() { @@ -1224,95 +1185,50 @@ void ReplicatedPG::op_modify(MOSDOp *op) utime_t now = g_clock.now(); // issue replica writes - RepGather *repop = 0; - bool alone = (acting.size() == 1); tid_t rep_tid = osd->get_tid(); - op->set_rep_tid(rep_tid); - - if (g_conf.osd_rep == OSD_REP_CHAIN && !alone) { - // chain rep. send to #2 only. - int next = acting[1]; - if (acting.size() > 2) - next = acting[2]; - issue_repop(op, next, now); - } - else if (g_conf.osd_rep == OSD_REP_SPLAY && !alone) { - // splay rep. send to rest. - for (unsigned i=1; i=1; --i) - issue_repop(op, acting[i], now); - } else { - // primary rep, or alone. - repop = new_rep_gather(op); + RepGather *repop = new_rep_gather(op, rep_tid, nv); + for (unsigned i=1; iget_op() != CEPH_OSD_OP_WRNOOP) { + // log and update later. + pobject_t poid = oid; + prepare_log_transaction(repop->t, op->get_reqid(), poid, op->get_op(), nv, + crev, op->get_rev(), peers_complete_thru); + prepare_op_transaction(repop->t, op->get_reqid(), + info.pgid, op->get_op(), poid, + op->get_offset(), op->get_length(), op->get_data(), + nv, crev, op->get_rev()); } - - if (repop) { - // we are acker. - if (op->get_op() != CEPH_OSD_OP_WRNOOP) { - // log and update later. - prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), peers_complete_thru); - prepare_op_transaction(repop->t, op, nv, crev, op->get_rev()); - } - - // (logical) local ack. - // (if alone, this will apply the update.) - get_rep_gather(repop); - { - assert(repop->waitfor_ack.count(whoami)); - repop->waitfor_ack.erase(whoami); - } - put_rep_gather(repop); - - } else { - // not acker. - // chain or splay. apply. - ObjectStore::Transaction t; - prepare_log_transaction(t, op, nv, crev, op->get_rev(), peers_complete_thru); - prepare_op_transaction(t, op, nv, crev, op->get_rev()); - - C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, get_acker(), - info.last_complete); - unsigned r = osd->store->apply_transaction(t, oncommit); - if (r != 0 && // no errors - r != 2) { // or error on collection_add - derr(0) << "error applying transaction: r = " << r << dendl; - assert(r == 0); - } - - // lets evict the data from our cache to maintain a total large cache size - if (g_conf.osd_exclusive_caching) - osd->store->trim_from_cache(op->get_oid(), op->get_offset(), op->get_length()); - - oncommit->ack(); + + // (logical) local ack. + // (if alone, this will apply the update.) + get_rep_gather(repop); + { + assert(repop->waitfor_ack.count(whoami)); + repop->waitfor_ack.erase(whoami); } - + put_rep_gather(repop); } -// replicated - - - +// sub op modify -void ReplicatedPG::op_rep_modify(MOSDOp *op) +void ReplicatedPG::sub_op_modify(MOSDSubOp *op) { - object_t oid = op->get_oid(); + pobject_t poid = op->get_poid(); eversion_t nv = op->get_version(); const char *opname = MOSDOp::get_opname(op->get_op()); // check crev objectrev_t crev = 0; - osd->store->getattr(oid, "crev", (char*)&crev, sizeof(crev)); + osd->store->getattr(poid, "crev", (char*)&crev, sizeof(crev)); - dout(10) << "op_rep_modify " << opname - << " " << oid + dout(10) << "sub_op_modify " << opname + << " " << poid << " v " << nv << " " << op->get_offset() << "~" << op->get_length() << dendl; @@ -1322,113 +1238,52 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op) osd->take_peer_stat(fromosd, op->get_peer_stat()); // we better not be missing this. - assert(!missing.is_missing(oid)); + assert(!missing.is_missing(poid.oid)); // prepare our transaction ObjectStore::Transaction t; - // am i acker? - RepGather *repop = 0; + // do op int ackerosd = acting[0]; - - if ((g_conf.osd_rep == OSD_REP_CHAIN || g_conf.osd_rep == OSD_REP_SPLAY)) { - ackerosd = get_acker(); - - if (is_acker()) { - // i am tail acker. - if (rep_gather.count(op->get_rep_tid())) { - repop = rep_gather[ op->get_rep_tid() ]; - } else { - repop = new_rep_gather(op); - } - - // infer ack from source - get_rep_gather(repop); - { - //assert(repop->waitfor_ack.count(fromosd)); // no, we may come thru here twice. - repop->waitfor_ack.erase(fromosd); - } - put_rep_gather(repop); - - // prepare dest socket - //messenger->prepare_send_message(op->get_client()); - } - - // chain? forward? - if (g_conf.osd_rep == OSD_REP_CHAIN && !is_acker()) { - // chain rep, not at the tail yet. - int myrank = osd->osdmap->calc_pg_rank(osd->get_nodeid(), acting); - int next = myrank+1; - if (next == (int)acting.size()) - next = 1; - issue_repop(op, acting[next], g_clock.now()); - } - } - - // do op? - C_OSD_RepModifyCommit *oncommit = 0; - osd->logger->inc("r_wr"); osd->logger->inc("r_wrb", op->get_length()); - if (repop) { - // acker. we'll apply later. - if (op->get_op() != CEPH_OSD_OP_WRNOOP) { - prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), op->get_pg_trim_to()); - prepare_op_transaction(repop->t, op, nv, crev, op->get_rev()); - } - } else { - // middle|replica. - if (op->get_op() != CEPH_OSD_OP_WRNOOP) { - prepare_log_transaction(t, op, nv, crev, op->get_rev(), op->get_pg_trim_to()); - prepare_op_transaction(t, op, nv, crev, op->get_rev()); - } - - oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete); - - // apply log update. and possibly update itself. - unsigned tr = osd->store->apply_transaction(t, oncommit); - if (tr != 0 && // no errors - tr != 2) { // or error on collection_add - derr(0) << "error applying transaction: r = " << tr << dendl; - assert(tr == 0); - } + if (op->get_op() != CEPH_OSD_OP_WRNOOP) { + prepare_log_transaction(t, op->get_reqid(), op->get_poid(), op->get_op(), op->get_version(), + crev, 0, op->get_pg_trim_to()); + prepare_op_transaction(t, op->get_reqid(), + info.pgid, op->get_op(), poid, + op->get_offset(), op->get_length(), op->get_data(), + nv, crev, 0); } - // ack? - if (repop) { - // (logical) local ack. this may induce the actual update. - get_rep_gather(repop); - { - assert(repop->waitfor_ack.count(osd->get_nodeid())); - repop->waitfor_ack.erase(osd->get_nodeid()); - } - put_rep_gather(repop); - } - else { - // send ack to acker? - if (g_conf.osd_rep != OSD_REP_CHAIN) { - MOSDOpReply *ack = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), false); - ack->set_peer_stat(osd->get_my_stat_for(g_clock.now(), ackerosd)); - osd->messenger->send_message(ack, osd->osdmap->get_inst(ackerosd)); - } - - // ack myself. - assert(oncommit); - oncommit->ack(); + C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete); + + // apply log update. and possibly update itself. + unsigned tr = osd->store->apply_transaction(t, oncommit); + if (tr != 0 && // no errors + tr != 2) { // or error on collection_add + derr(0) << "error applying transaction: r = " << tr << dendl; + assert(tr == 0); } - + + // send ack to acker + MOSDSubOpReply *ack = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), false); + ack->set_peer_stat(osd->get_my_stat_for(g_clock.now(), ackerosd)); + osd->messenger->send_message(ack, osd->osdmap->get_inst(ackerosd)); + + // ack myself. + oncommit->ack(); } - -void ReplicatedPG::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete) +void ReplicatedPG::sub_op_modify_commit(MOSDSubOp *op, int ackerosd, eversion_t last_complete) { // send commit. dout(10) << "rep_modify_commit on op " << *op << ", sending commit to osd" << ackerosd << dendl; if (osd->osdmap->is_up(ackerosd)) { - MOSDOpReply *commit = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); + MOSDSubOpReply *commit = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), true); commit->set_pg_complete_thru(last_complete); commit->set_peer_stat(osd->get_my_stat_for(g_clock.now(), ackerosd)); osd->messenger->send_message(commit, osd->osdmap->get_inst(ackerosd)); @@ -1436,6 +1291,27 @@ void ReplicatedPG::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t las } } +void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r) +{ + // must be replication. + tid_t rep_tid = r->get_rep_tid(); + int fromosd = r->get_source().num(); + + osd->take_peer_stat(fromosd, r->get_peer_stat()); + + if (rep_gather.count(rep_tid)) { + // oh, good. + repop_ack(rep_gather[rep_tid], + r->get_result(), r->get_commit(), + fromosd, + r->get_pg_complete_thru()); + delete r; + } else { + // early ack. + waiting_for_repop[rep_tid].push_back(r); + } +} + @@ -1449,36 +1325,35 @@ void ReplicatedPG::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t las /** pull - request object from a peer */ -void ReplicatedPG::pull(object_t oid) +void ReplicatedPG::pull(pobject_t poid) { - assert(missing.loc.count(oid)); - eversion_t v = missing.missing[oid]; - int fromosd = missing.loc[oid]; + assert(missing.loc.count(poid.oid)); + eversion_t v = missing.missing[poid.oid]; + int fromosd = missing.loc[poid.oid]; - dout(7) << "pull " << oid + dout(7) << "pull " << poid << " v " << v << " from osd" << fromosd << dendl; // send op + osdreqid_t rid; tid_t tid = osd->get_tid(); - MOSDOp *op = new MOSDOp(osd->messenger->get_myinst(), 0, tid, - oid, info.pgid, - osd->osdmap->get_epoch(), - CEPH_OSD_OP_PULL); - op->set_version(v); - osd->messenger->send_message(op, osd->osdmap->get_inst(fromosd)); + MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PULL, + 0, 0, + osd->osdmap->get_epoch(), tid, v); + osd->messenger->send_message(subop, osd->osdmap->get_inst(fromosd)); // take note - assert(objects_pulling.count(oid) == 0); + assert(objects_pulling.count(poid.oid) == 0); num_pulling++; - objects_pulling[oid] = v; + objects_pulling[poid.oid] = v; } /** push - send object to a peer */ -void ReplicatedPG::push(object_t oid, int peer) +void ReplicatedPG::push(pobject_t poid, int peer) { // read data+attrs bufferlist bl; @@ -1487,15 +1362,15 @@ void ReplicatedPG::push(object_t oid, int peer) map attrset; ObjectStore::Transaction t; - t.read(oid, 0, 0, &bl); - t.getattr(oid, "version", &v, &vlen); - t.getattrs(oid, attrset); + t.read(poid, 0, 0, &bl); + t.getattr(poid, "version", &v, &vlen); + t.getattrs(poid, attrset); unsigned tr = osd->store->apply_transaction(t); assert(tr == 0); // !!! // ok - dout(7) << "push " << oid << " v " << v + dout(7) << "push " << poid << " v " << v << " size " << bl.length() << " to osd" << peer << dendl; @@ -1504,36 +1379,59 @@ void ReplicatedPG::push(object_t oid, int peer) osd->logger->inc("r_pushb", bl.length()); // send - MOSDOp *op = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(), - oid, info.pgid, osd->osdmap->get_epoch(), - CEPH_OSD_OP_PUSH); - op->set_offset(0); - op->set_length(bl.length()); - op->set_data(bl); // note: claims bl, set length above here! - op->set_version(v); - op->set_attrset(attrset); - - osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); + osdreqid_t rid; // useless? + MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PUSH, 0, bl.length(), + osd->osdmap->get_epoch(), osd->get_tid(), v); + subop->set_data(bl); // note: claims bl, set length above here! + subop->set_attrset(attrset); + osd->messenger->send_message(subop, osd->osdmap->get_inst(peer)); if (is_primary()) { - peer_missing[peer].got(oid); - pushing[oid].insert(peer); + peer_missing[peer].got(poid.oid); + pushing[poid.oid].insert(peer); } } +void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply) +{ + dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl; + + int peer = reply->get_source().num(); + pobject_t poid = reply->get_poid(); + + if (pushing.count(poid.oid) && + pushing[poid.oid].count(peer)) { + pushing[poid.oid].erase(peer); + + if (peer_missing.count(peer) == 0 || + peer_missing[peer].num_missing() == 0) + uptodate_set.insert(peer); + + if (pushing[poid.oid].empty()) { + dout(10) << "pushed " << poid << " to all replicas" << dendl; + do_peer_recovery(); + } else { + dout(10) << "pushed " << poid << ", still waiting for push ack from " + << pushing[poid.oid] << dendl; + } + } else { + dout(10) << "huh, i wasn't pushing " << poid << dendl; + } + delete reply; +} /** op_pull * process request to pull an entire object. * NOTE: called from opqueue. */ -void ReplicatedPG::op_pull(MOSDOp *op) +void ReplicatedPG::sub_op_pull(MOSDSubOp *op) { - const object_t oid = op->get_oid(); + const pobject_t poid = op->get_poid(); const eversion_t v = op->get_version(); int from = op->get_source().num(); - dout(7) << "op_pull " << oid << " v " << op->get_version() + dout(7) << "op_pull " << poid << " v " << op->get_version() << " from " << op->get_source() << dendl; @@ -1542,46 +1440,46 @@ void ReplicatedPG::op_pull(MOSDOp *op) // primary assert(peer_missing.count(from)); // we had better know this, from the peering process. - if (!peer_missing[from].is_missing(oid)) { + if (!peer_missing[from].is_missing(poid.oid)) { dout(7) << "op_pull replica isn't actually missing it, we must have already pushed to them" << dendl; delete op; return; } // do we have it yet? - if (is_missing_object(oid)) { - wait_for_missing_object(oid, op); + if (is_missing_object(poid.oid)) { + wait_for_missing_object(poid.oid, op); return; } } else { // non-primary - if (missing.is_missing(oid)) { - dout(7) << "op_pull not primary, and missing " << oid << ", ignoring" << dendl; + if (missing.is_missing(poid.oid)) { + dout(7) << "op_pull not primary, and missing " << poid << ", ignoring" << dendl; delete op; return; } } // push it back! - push(oid, op->get_source().num()); + push(poid, op->get_source().num()); } /** op_push * NOTE: called from opqueue. */ -void ReplicatedPG::op_push(MOSDOp *op) +void ReplicatedPG::sub_op_push(MOSDSubOp *op) { - object_t oid = op->get_oid(); + pobject_t poid = op->get_poid(); eversion_t v = op->get_version(); - if (!is_missing_object(oid)) { - dout(7) << "op_push not missing " << oid << dendl; + if (!is_missing_object(poid.oid)) { + dout(7) << "sub_op_push not missing " << poid << dendl; return; } dout(7) << "op_push " - << oid + << poid << " v " << v << " size " << op->get_length() << " " << op->get_data().length() << dendl; @@ -1590,16 +1488,16 @@ void ReplicatedPG::op_push(MOSDOp *op) // write object and add it to the PG ObjectStore::Transaction t; - t.remove(oid); // in case old version exists - t.write(oid, 0, op->get_length(), op->get_data()); - t.setattrs(oid, op->get_attrset()); - t.collection_add(info.pgid, oid); + t.remove(poid); // in case old version exists + t.write(poid, 0, op->get_length(), op->get_data()); + t.setattrs(poid, op->get_attrset()); + t.collection_add(info.pgid, poid); // close out pull op? num_pulling--; - if (objects_pulling.count(oid)) - objects_pulling.erase(oid); - missing.got(oid, v); + if (objects_pulling.count(poid.oid)) + objects_pulling.erase(poid.oid); + missing.got(poid.oid, v); // raise last_complete? @@ -1625,15 +1523,15 @@ void ReplicatedPG::op_push(MOSDOp *op) for (unsigned i=1; itake_waiters(waiting_for_missing_object[oid]); - waiting_for_missing_object.erase(oid); + if (waiting_for_missing_object.count(poid.oid)) { + osd->take_waiters(waiting_for_missing_object[poid.oid]); + waiting_for_missing_object.erase(poid.oid); } if (is_primary()) { @@ -1641,7 +1539,7 @@ void ReplicatedPG::op_push(MOSDOp *op) do_recovery(); } else { // ack if i'm a replica and being pushed to. - MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); + MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), false); osd->messenger->send_message(reply, op->get_source_inst()); } @@ -1650,8 +1548,9 @@ void ReplicatedPG::op_push(MOSDOp *op) - - +/* + * pg status change notification + */ void ReplicatedPG::on_osd_failure(int o) { @@ -1672,7 +1571,6 @@ void ReplicatedPG::on_osd_failure(int o) repop_ack(*p, -1, true, o); } - void ReplicatedPG::on_acker_change() { dout(10) << "on_acker_change" << dendl; @@ -1682,43 +1580,15 @@ void ReplicatedPG::on_change() { dout(10) << "on_change" << dendl; - if (g_conf.osd_rep == OSD_REP_PRIMARY || - g_conf.osd_rep == OSD_REP_SPLAY) { - // apply all local repops - // (pg is inactive; we will repeer) - for (hash_map::iterator p = rep_gather.begin(); - p != rep_gather.end(); - p++) - if (!p->second->applied) - apply_repop(p->second); - } - else if (g_conf.osd_rep == OSD_REP_CHAIN) { - // apply all local repops - // (pg is inactive; we will repeer) - // note: because we hose rep_gather, clients must resubmit ops on ANY pg membership change. - for (hash_map::iterator p = rep_gather.begin(); - p != rep_gather.end(); - p++) { - if (!p->second->applied) - apply_repop(p->second); - delete p->second->op; - delete p->second; - } - rep_gather.clear(); - - // and discard repop waiters (chain/splay artifact) - for (hash_map >::iterator p = waiting_for_repop.begin(); - p != waiting_for_repop.end(); - p++) - for (list::iterator pm = p->second.begin(); - pm != p->second.end(); - pm++) - delete *pm; - waiting_for_repop.clear(); - } + // apply all local repops + // (pg is inactive; we will repeer) + for (hash_map::iterator p = rep_gather.begin(); + p != rep_gather.end(); + p++) + if (!p->second->applied) + apply_repop(p->second); } - void ReplicatedPG::on_role_change() { dout(10) << "on_role_change" << dendl; @@ -1737,81 +1607,6 @@ void ReplicatedPG::on_role_change() - - -/** clean_up_local - * remove any objects that we're storing but shouldn't. - * as determined by log. - */ -void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t) -{ - dout(10) << "clean_up_local" << dendl; - - assert(info.last_update >= log.bottom); // otherwise we need some help! - - if (log.backlog) { - - // FIXME: sloppy pobject vs object conversions abound! *** - - // be thorough. - list ls; - osd->store->collection_list(info.pgid, ls); - set s; - - for (list::iterator i = ls.begin(); - i != ls.end(); - i++) - s.insert(i->oid); - - set did; - for (list::reverse_iterator p = log.log.rbegin(); - p != log.log.rend(); - p++) { - if (did.count(p->oid)) continue; - did.insert(p->oid); - - if (p->is_delete()) { - if (s.count(p->oid)) { - dout(10) << " deleting " << p->oid - << " when " << p->version << dendl; - t.remove(p->oid); - } - s.erase(p->oid); - } else { - // just leave old objects.. they're missing or whatever - s.erase(p->oid); - } - } - - for (set::iterator i = s.begin(); - i != s.end(); - i++) { - dout(10) << " deleting stray " << *i << dendl; - t.remove(*i); - } - - } else { - // just scan the log. - set did; - for (list::reverse_iterator p = log.log.rbegin(); - p != log.log.rend(); - p++) { - if (did.count(p->oid)) continue; - did.insert(p->oid); - - if (p->is_delete()) { - dout(10) << " deleting " << p->oid - << " when " << p->version << dendl; - t.remove(p->oid); - } else { - // keep old(+missing) objects, just for kicks. - } - } - } -} - - - void ReplicatedPG::cancel_recovery() { // forget about where missing items are, or anything we're pulling @@ -1943,34 +1738,6 @@ void ReplicatedPG::do_peer_recovery() finish_recovery(); } -void ReplicatedPG::op_push_reply(MOSDOpReply *reply) -{ - dout(10) << "op_push_reply from " << reply->get_source() << " " << *reply << dendl; - - int peer = reply->get_source().num(); - object_t oid = reply->get_oid(); - - if (pushing.count(oid) && - pushing[oid].count(peer)) { - pushing[oid].erase(peer); - - if (peer_missing.count(peer) == 0 || - peer_missing[peer].num_missing() == 0) - uptodate_set.insert(peer); - - if (pushing[oid].empty()) { - dout(10) << "pushed " << oid << " to all replicas" << dendl; - do_peer_recovery(); - } else { - dout(10) << "pushed " << oid << ", still waiting for push ack from " - << pushing[oid] << dendl; - } - } else { - dout(10) << "huh, i wasn't pushing " << oid << dendl; - } - delete reply; -} - void ReplicatedPG::purge_strays() { dout(10) << "purge_strays " << stray_set << dendl; @@ -1988,3 +1755,75 @@ void ReplicatedPG::purge_strays() stray_set.clear(); } + + +/** clean_up_local + * remove any objects that we're storing but shouldn't. + * as determined by log. + */ +void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t) +{ + dout(10) << "clean_up_local" << dendl; + + assert(info.last_update >= log.bottom); // otherwise we need some help! + + if (log.backlog) { + + // FIXME: sloppy pobject vs object conversions abound! *** + + // be thorough. + list ls; + osd->store->collection_list(info.pgid, ls); + set s; + + for (list::iterator i = ls.begin(); + i != ls.end(); + i++) + s.insert(i->oid); + + set did; + for (list::reverse_iterator p = log.log.rbegin(); + p != log.log.rend(); + p++) { + if (did.count(p->oid)) continue; + did.insert(p->oid); + + if (p->is_delete()) { + if (s.count(p->oid)) { + dout(10) << " deleting " << p->oid + << " when " << p->version << dendl; + t.remove(p->oid); + } + s.erase(p->oid); + } else { + // just leave old objects.. they're missing or whatever + s.erase(p->oid); + } + } + + for (set::iterator i = s.begin(); + i != s.end(); + i++) { + dout(10) << " deleting stray " << *i << dendl; + t.remove(*i); + } + + } else { + // just scan the log. + set did; + for (list::reverse_iterator p = log.log.rbegin(); + p != log.log.rend(); + p++) { + if (did.count(p->oid)) continue; + did.insert(p->oid); + + if (p->is_delete()) { + dout(10) << " deleting " << p->oid + << " when " << p->version << dendl; + t.remove(p->oid); + } else { + // keep old(+missing) objects, just for kicks. + } + } + } +} diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 21753debe0922..16aa7ea4a0915 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -18,7 +18,8 @@ #include "PG.h" #include "messages/MOSDOp.h" - +class MOSDSubOp; +class MOSDSubOpReply; class ReplicatedPG : public PG { public: @@ -80,8 +81,8 @@ protected: void get_rep_gather(RepGather*); void apply_repop(RepGather *repop); void put_rep_gather(RepGather*); - void issue_repop(MOSDOp *op, int osd, utime_t now); - RepGather *new_rep_gather(MOSDOp *op); + void issue_repop(RepGather *repop, int dest, utime_t now); + RepGather *new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv); void repop_ack(RepGather *repop, int result, bool commit, int fromosd, eversion_t pg_complete_thru=eversion_t(0,0)); @@ -90,21 +91,22 @@ protected: int num_pulling; map > pushing; - void push(object_t oid, int dest); - void pull(object_t oid); + void push(pobject_t oid, int dest); + void pull(pobject_t oid); // modify objectrev_t assign_version(MOSDOp *op); void op_modify_commit(tid_t rep_tid, eversion_t pg_complete_thru); - void op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete); + void sub_op_modify_commit(MOSDSubOp *op, int ackerosd, eversion_t last_complete); void prepare_log_transaction(ObjectStore::Transaction& t, - MOSDOp *op, eversion_t& version, + osdreqid_t reqid, pobject_t poid, int op, eversion_t version, objectrev_t crev, objectrev_t rev, eversion_t trim_to); - void prepare_op_transaction(ObjectStore::Transaction& t, - MOSDOp *op, eversion_t& version, - objectrev_t crev, objectrev_t rev); + void prepare_op_transaction(ObjectStore::Transaction& t, const osdreqid_t& reqid, + pg_t pgid, int op, pobject_t poid, + off_t offset, off_t length, bufferlist& bl, + eversion_t& version, objectrev_t crev, objectrev_t rev); friend class C_OSD_ModifyCommit; friend class C_OSD_RepModifyCommit; @@ -122,11 +124,12 @@ protected: void op_read(MOSDOp *op); void op_modify(MOSDOp *op); - void op_rep_modify(MOSDOp *op); - void op_push(MOSDOp *op); - void op_pull(MOSDOp *op); - void op_push_reply(MOSDOpReply *reply); + void sub_op_modify(MOSDSubOp *op); + void sub_op_modify_reply(MOSDSubOpReply *reply); + void sub_op_push(MOSDSubOp *op); + void sub_op_push_reply(MOSDSubOpReply *reply); + void sub_op_pull(MOSDSubOp *op); public: @@ -138,14 +141,15 @@ public: bool preprocess_op(MOSDOp *op, utime_t now); void do_op(MOSDOp *op); - void do_op_reply(MOSDOpReply *r); + void do_sub_op(MOSDSubOp *op); + void do_sub_op_reply(MOSDSubOpReply *op); bool same_for_read_since(epoch_t e); bool same_for_modify_since(epoch_t e); bool same_for_rep_modify_since(epoch_t e); bool is_missing_object(object_t oid); - void wait_for_missing_object(object_t oid, MOSDOp *op); + void wait_for_missing_object(object_t oid, Message *op); void on_osd_failure(int o); void on_acker_change(); -- 2.39.5