From: Jacek J. Lakis Date: Thu, 6 Aug 2015 10:14:17 +0000 (+0200) Subject: MOSDOp::decode : Splitting message decoding, new version X-Git-Tag: v10.0.0~128^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=818d790f7d424520bc96c651571f2c86e94caf1e;p=ceph.git MOSDOp::decode : Splitting message decoding, new version Signed-off-by: Jacek J. Lakis --- diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h old mode 100644 new mode 100755 index 4857b0a8eb1..96f53f1172b --- a/src/include/ceph_features.h +++ b/src/include/ceph_features.h @@ -69,6 +69,7 @@ #define CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3 (1ULL<<53) #define CEPH_FEATURE_OSD_HITSET_GMT (1ULL<<54) #define CEPH_FEATURE_HAMMER_0_94_4 (1ULL<<55) +#define CEPH_FEATURE_NEW_OSDOP_ENCODING (1ULL<<56) /* New, v7 encoding */ #define CEPH_FEATURE_RESERVED2 (1ULL<<61) /* slow down, we are almost out... */ #define CEPH_FEATURE_RESERVED (1ULL<<62) /* DO NOT USE THIS ... last bit! */ @@ -131,6 +132,7 @@ static inline unsigned long long ceph_sanitize_features(unsigned long long f) { CEPH_FEATURE_OSD_HBMSGS | \ CEPH_FEATURE_MDSENC | \ CEPH_FEATURE_OSDHASHPSPOOL | \ + CEPH_FEATURE_NEW_OSDOP_ENCODING | \ CEPH_FEATURE_MON_SINGLE_PAXOS | \ CEPH_FEATURE_OSD_SNAPMAPPER | \ CEPH_FEATURE_MON_SCRUB | \ diff --git a/src/messages/MOSDOp.h b/src/messages/MOSDOp.h old mode 100644 new mode 100755 index 5d4ab62af37..345afa129ae --- a/src/messages/MOSDOp.h +++ b/src/messages/MOSDOp.h @@ -32,7 +32,7 @@ class OSD; class MOSDOp : public Message { - static const int HEAD_VERSION = 6; + static const int HEAD_VERSION = 7; static const int COMPAT_VERSION = 3; private: @@ -46,6 +46,11 @@ private: object_t oid; object_locator_t oloc; pg_t pgid; + bufferlist::iterator p; + // Decoding flags. Decoding is only needed for messages caught by pipe reader. 
+ bool partial_decode_needed; + bool final_decode_needed; + // public: vector ops; private: @@ -61,12 +66,8 @@ private: public: friend class MOSDOpReply; - // read - const snapid_t& get_snapid() { return snapid; } + ceph_tid_t get_client_tid() { return header.tid; } void set_snapid(const snapid_t& s) { snapid = s; } - // writ - const snapid_t& get_snap_seq() const { return snap_seq; } - const vector &get_snaps() const { return snaps; } void set_snaps(const vector& i) { snaps = i; } @@ -75,7 +76,13 @@ public: reqid = rid; } + // Fields decoded in partial decoding + const pg_t& get_pg() const { assert(!partial_decode_needed); return pgid; } + epoch_t get_map_epoch() { assert(!partial_decode_needed); return osdmap_epoch; } + int get_flags() const { assert(!partial_decode_needed); return flags; } + const eversion_t& get_version() { assert(!partial_decode_needed); return reassert_version; } osd_reqid_t get_reqid() const { + assert(!partial_decode_needed); if (reqid != osd_reqid_t()) return reqid; else @@ -83,32 +90,40 @@ public: client_inc, header.tid); } - int get_client_inc() { return client_inc; } - ceph_tid_t get_client_tid() { return header.tid; } - - object_t& get_oid() { return oid; } - const pg_t& get_pg() const { return pgid; } - - const object_locator_t& get_object_locator() const { - return oloc; + // Fields decoded in final decoding + int get_client_inc() { assert(!final_decode_needed); return client_inc; } + utime_t get_mtime() { assert(!final_decode_needed); return mtime; } + const object_locator_t& get_object_locator() const { assert(!final_decode_needed); return oloc; } + object_t& get_oid() { assert(!final_decode_needed); return oid; } + const snapid_t& get_snapid() { assert(!final_decode_needed); return snapid; } + const snapid_t& get_snap_seq() const { assert(!final_decode_needed); return snap_seq; } + const vector &get_snaps() const { assert(!final_decode_needed); return snaps; } + /** + * get retry attempt + * + * 0 is the first attempt. 
+ * + * @return retry attempt, or -1 if we don't know + */ + int get_retry_attempt() const { + return retry_attempt; + } + uint64_t get_features() const { + if (features) + return features; + return get_connection()->get_features(); } - - epoch_t get_map_epoch() { return osdmap_epoch; } - - const eversion_t& get_version() { return reassert_version; } - - utime_t get_mtime() { return mtime; } MOSDOp() - : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION) { } + : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION), partial_decode_needed(true), final_decode_needed(true) { } MOSDOp(int inc, long tid, object_t& _oid, object_locator_t& _oloc, pg_t& _pgid, epoch_t _osdmap_epoch, int _flags, uint64_t feat) : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION), client_inc(inc), osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1), - oid(_oid), oloc(_oloc), pgid(_pgid), + oid(_oid), oloc(_oloc), pgid(_pgid), partial_decode_needed(false), final_decode_needed(false), features(feat) { set_tid(tid); } @@ -154,16 +169,7 @@ public: add_simple_op(CEPH_OSD_OP_STAT, 0, 0); } - uint64_t get_features() const { - if (features) - return features; - return get_connection()->get_features(); - } - - // flags - int get_flags() const { return flags; } bool has_flag(__u32 flag) { return flags & flag; }; - bool wants_ack() const { return flags & CEPH_OSD_FLAG_ACK; } bool wants_ondisk() const { return flags & CEPH_OSD_FLAG_ONDISK; } bool wants_onnvram() const { return flags & CEPH_OSD_FLAG_ONNVRAM; } @@ -181,17 +187,6 @@ public: retry_attempt = a; } - /** - * get retry attempt - * - * 0 is the first attempt. 
- * - * @return retry attempt, or -1 if we don't know - */ - int get_retry_attempt() const { - return retry_attempt; - } - // marshalling virtual void encode_payload(uint64_t features) { @@ -248,22 +243,22 @@ struct ceph_osd_request_head { ::encode_nohead(oid.name, payload); ::encode_nohead(snaps, payload); - } else { - header.version = HEAD_VERSION; + } else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) { + header.version = 6; ::encode(client_inc, payload); ::encode(osdmap_epoch, payload); ::encode(flags, payload); ::encode(mtime, payload); ::encode(reassert_version, payload); - ::encode(oloc, payload); ::encode(pgid, payload); + ::encode(oid, payload); __u16 num_ops = ops.size(); ::encode(num_ops, payload); for (unsigned i = 0; i < ops.size(); i++) - ::encode(ops[i].op, payload); + ::encode(ops[i].op, payload); ::encode(snapid, payload); ::encode(snap_seq, payload); @@ -272,11 +267,36 @@ struct ceph_osd_request_head { ::encode(retry_attempt, payload); ::encode(features, payload); ::encode(reqid, payload); + } else { + // new, reordered, v7 message encoding + header.version = HEAD_VERSION; + ::encode(pgid, payload); + ::encode(osdmap_epoch, payload); + ::encode(flags, payload); + ::encode(reassert_version, payload); + ::encode(reqid, payload); + ::encode(client_inc, payload); + ::encode(mtime, payload); + ::encode(oloc, payload); + ::encode(oid, payload); + + __u16 num_ops = ops.size(); + ::encode(num_ops, payload); + for (unsigned i = 0; i < ops.size(); i++) + ::encode(ops[i].op, payload); + + ::encode(snapid, payload); + ::encode(snap_seq, payload); + ::encode(snaps, payload); + + ::encode(retry_attempt, payload); + ::encode(features, payload); } } virtual void decode_payload() { - bufferlist::iterator p = payload.begin(); + assert(partial_decode_needed && final_decode_needed); + p = payload.begin(); if (header.version < 2) { // old decode @@ -320,8 +340,11 @@ struct ceph_osd_request_head { retry_attempt = -1; features = 0; reqid = osd_reqid_t(); - } 
else { - // new decode + OSDOp::split_osd_op_vector_in_data(ops, data); + // In old versions, final decoding is done in first step + final_decode_needed = false; + + } else if (header.version < 7) { ::decode(client_inc, p); ::decode(osdmap_epoch, p); ::decode(flags, p); @@ -338,6 +361,26 @@ struct ceph_osd_request_head { ::decode(pgid, p); } + } else { + // new, v7 decode, split into partial and final + ::decode(pgid, p); + ::decode(osdmap_epoch, p); + ::decode(flags, p); + ::decode(reassert_version, p); + ::decode(reqid, p); + } + + partial_decode_needed = false; + + } + + void finish_decode() { + assert(!partial_decode_needed); // partial decoding required + + if (!final_decode_needed) + return; //Message is already final decoded + + if (header.version < 7) { ::decode(oid, p); //::decode(ops, p); @@ -345,19 +388,19 @@ struct ceph_osd_request_head { ::decode(num_ops, p); ops.resize(num_ops); for (unsigned i = 0; i < num_ops; i++) - ::decode(ops[i].op, p); + ::decode(ops[i].op, p); ::decode(snapid, p); ::decode(snap_seq, p); ::decode(snaps, p); if (header.version >= 4) - ::decode(retry_attempt, p); + ::decode(retry_attempt, p); else - retry_attempt = -1; + retry_attempt = -1; if (header.version >= 5) - ::decode(features, p); + ::decode(features, p); else features = 0; @@ -365,9 +408,34 @@ struct ceph_osd_request_head { ::decode(reqid, p); else reqid = osd_reqid_t(); + + OSDOp::split_osd_op_vector_in_data(ops, data); + + } else { // final decoding for reordered v7 + ::decode(client_inc, p); + ::decode(mtime, p); + ::decode(oloc, p); + ::decode(oid, p); + + __u16 num_ops; + ::decode(num_ops, p); + ops.resize(num_ops); + for (unsigned i = 0; i < num_ops; i++) + ::decode(ops[i].op, p); + + ::decode(snapid, p); + ::decode(snap_seq, p); + ::decode(snaps, p); + + ::decode(retry_attempt, p); + + ::decode(features, p); + + OSDOp::split_osd_op_vector_in_data(ops, data); + } - OSDOp::split_osd_op_vector_in_data(ops, data); + final_decode_needed = false; } void 
clear_buffers() { @@ -376,7 +444,8 @@ struct ceph_osd_request_head { const char *get_type_name() const { return "osd_op"; } void print(ostream& out) const { - out << "osd_op(" << get_reqid(); + if (!partial_decode_needed) + out << "osd_op(" << get_reqid(); out << " "; if (!oloc.nspace.empty()) out << oloc.nspace << "/"; @@ -401,9 +470,10 @@ struct ceph_osd_request_head { out << " RETRY=" << get_retry_attempt(); if (reassert_version != eversion_t()) out << " reassert_version=" << reassert_version; - if (get_snap_seq()) + if (!final_decode_needed) out << " snapc " << get_snap_seq() << "=" << snaps; - out << " " << ceph_osd_flag_string(get_flags()); + if (!partial_decode_needed) + out << " " << ceph_osd_flag_string(get_flags()); out << " e" << osdmap_epoch; out << ")"; } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 535b96c9707..0a4fc1fbd97 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -8090,9 +8090,6 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap) return; } - // we don't need encoded payload anymore - m->clear_payload(); - // set up a map send if the Op gets blocked for some reason send_map_on_destruct share_map(this, m, osdmap, m->get_map_epoch()); Session *client_session = diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 79cd54a1c36..cd409b18f66 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -4952,6 +4952,7 @@ ostream& operator<<(ostream& out, const PG& pg) bool PG::can_discard_op(OpRequestRef& op) { MOSDOp *m = static_cast(op->get_req()); + if (OSD::op_is_discardable(m)) { dout(20) << " discard " << *m << dendl; return true; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 0ca3d34cd06..7eb12d2e6e9 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1345,10 +1345,6 @@ void ReplicatedPG::do_request( OpRequestRef& op, ThreadPool::TPHandle &handle) { - if (!op_has_sufficient_caps(op)) { - osd->reply_op_error(op, -EPERM); - return; - } assert(!op_must_wait_for_map(get_osdmap()->get_epoch(), op)); if 
(can_discard_request(op)) { return; @@ -1483,6 +1479,9 @@ void ReplicatedPG::do_op(OpRequestRef& op) MOSDOp *m = static_cast(op->get_req()); assert(m->get_type() == CEPH_MSG_OSD_OP); + m->finish_decode(); + m->clear_payload(); + if (op->rmw_flags == 0) { int r = osd->osd->init_op_flags(op); if (r) { @@ -1499,6 +1498,11 @@ void ReplicatedPG::do_op(OpRequestRef& op) return do_pg_op(op); } + if (!op_has_sufficient_caps(op)) { + osd->reply_op_error(op, -EPERM); + return; + } + // object name too long? unsigned max_name_len = MIN(g_conf->osd_max_object_name_len, osd->osd->store->get_max_object_name_length());