From 359850b9f605b7cb2577d6ad1be86cdeacb0a35b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 15 Aug 2013 16:19:21 -0700 Subject: [PATCH] osd: enforce RD, WR flags for class methods Class methods are marked with RD and WR to help the OSD decide when we need to flush objects or require certain permissions. Ensure that methods do not step outside their advertised capabilities by keeping a counter of rd and wr ops we perform in do_osd_ops() and making sure that class methods, and any ops the indirectly call, do not break the rules. Signed-off-by: Sage Weil --- src/osd/ReplicatedPG.cc | 52 +++++++++++++++++++++++++++++++++++++++++ src/osd/ReplicatedPG.h | 7 +++++- 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 25f807342a3a4..628cf891dc8a7 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -2129,6 +2129,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) // --- READS --- case CEPH_OSD_OP_READ: + ++ctx->num_read; { // read into a buffer bufferlist bl; @@ -2173,6 +2174,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) /* map extents */ case CEPH_OSD_OP_MAPEXT: + ++ctx->num_read; { // read into a buffer bufferlist bl; @@ -2188,6 +2190,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) /* map extents */ case CEPH_OSD_OP_SPARSE_READ: + ++ctx->num_read; { if (op.extent.truncate_seq) { dout(0) << "sparse_read does not support truncation sequence " << dendl; @@ -2299,7 +2302,21 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) bufferlist outdata; dout(10) << "call method " << cname << "." << mname << dendl; + int prev_rd = ctx->num_read; + int prev_wr = ctx->num_write; result = method->exec((cls_method_context_t)&ctx, indata, outdata); + + if (ctx->num_read > prev_rd && !(flags & CLS_METHOD_RD)) { + derr << "method " << cname << "." << mname << " tried to read object but is not marked RD" << dendl; + result = -EIO; + break; + } + if (ctx->num_write > prev_wr && !(flags & CLS_METHOD_WR)) { + derr << "method " << cname << "." << mname << " tried to update object but is not marked WR" << dendl; + result = -EIO; + break; + } + dout(10) << "method called response length=" << outdata.length() << dendl; op.extent.length = outdata.length(); osd_op.outdata.claim_append(outdata); @@ -2310,6 +2327,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_STAT: + // note: stat does not require RD { if (obs.exists) { ::encode(oi.size, osd_op.outdata); @@ -2325,6 +2343,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_GETXATTR: + ++ctx->num_read; { string aname; bp.copy(op.xattr.name_len, aname); @@ -2341,6 +2360,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_GETXATTRS: + ++ctx->num_read; { map attrset; result = osd->store->getattrs(coll, soid, attrset, true); @@ -2362,6 +2382,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) case CEPH_OSD_OP_CMPXATTR: case CEPH_OSD_OP_SRC_CMPXATTR: + ++ctx->num_read; { string aname; bp.copy(op.xattr.name_len, aname); @@ -2427,6 +2448,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_ASSERT_VER: + ++ctx->num_read; { uint64_t ver = op.watch.ver; if (!ver) @@ -2439,6 +2461,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } case CEPH_OSD_OP_LIST_WATCHERS: + ++ctx->num_read; { obj_list_watch_response_t resp; @@ -2464,6 +2487,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } case CEPH_OSD_OP_LIST_SNAPS: + ++ctx->num_read; { obj_list_snap_response_t resp; @@ -2541,6 +2565,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } case CEPH_OSD_OP_ASSERT_SRC_VERSION: + ++ctx->num_read; { uint64_t ver = op.watch.ver; if (!ver) @@ -2553,6 +2578,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } case CEPH_OSD_OP_NOTIFY: + ++ctx->num_read; { uint32_t ver; uint32_t timeout; @@ -2577,6 +2603,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_NOTIFY_ACK: + ++ctx->num_read; { try { uint64_t notify_id = 0; @@ -2601,6 +2628,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) // -- object data -- case CEPH_OSD_OP_WRITE: + ++ctx->num_write; { // write __u32 seq = oi.truncate_seq; if (seq && (seq > op.extent.truncate_seq) && @@ -2646,6 +2674,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_WRITEFULL: + ++ctx->num_write; { // write full object result = check_offset_and_length(op.extent.offset, op.extent.length); if (result < 0) @@ -2674,10 +2703,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_ROLLBACK : + ++ctx->num_write; result = _rollback_to(ctx, op); break; case CEPH_OSD_OP_ZERO: + ++ctx->num_write; { // zero result = check_offset_and_length(op.extent.offset, op.extent.length); if (result < 0) @@ -2695,6 +2726,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } break; case CEPH_OSD_OP_CREATE: + ++ctx->num_write; { int flags = le32_to_cpu(op.flags); if (obs.exists && (flags & CEPH_OSD_OP_FLAG_EXCL)) { @@ -2733,6 +2765,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) // falling through case CEPH_OSD_OP_TRUNCATE: + ++ctx->num_write; { // truncate if (!obs.exists) { @@ -2775,6 +2808,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_DELETE: + ++ctx->num_write; if (ctx->obc->obs.oi.watchers.size()) { // Cannot delete an object with watchers result = -EBUSY; @@ -2784,6 +2818,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_CLONERANGE: + ++ctx->num_read; + ++ctx->num_write; { if (!obs.exists) { t.touch(coll, obs.oi.soid); @@ -2808,6 +2844,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_WATCH: + ++ctx->num_write; { uint64_t cookie = op.watch.cookie; bool do_watch = op.watch.flag & 1; @@ -2853,6 +2890,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) // -- object attrs -- case CEPH_OSD_OP_SETXATTR: + ++ctx->num_write; { if (op.xattr.value_len > g_conf->osd_max_attr_size) { result = -EFBIG; @@ -2874,6 +2912,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_RMXATTR: + ++ctx->num_write; { string aname; bp.copy(op.xattr.name_len, aname); @@ -2886,6 +2925,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) // -- fancy writers -- case CEPH_OSD_OP_APPEND: + ++ctx->num_write; { // just do it inline; this works because we are happy to execute // fancy op on replicas as well. @@ -2908,6 +2948,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) // -- trivial map -- case CEPH_OSD_OP_TMAPGET: + ++ctx->num_read; { vector nops(1); OSDOp& newop = nops[0]; @@ -2920,6 +2961,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_TMAPPUT: + ++ctx->num_write; { //_dout_lock.Lock(); //osd_op.data.hexdump(*_dout); @@ -2978,11 +3020,13 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; case CEPH_OSD_OP_TMAPUP: + ++ctx->num_write; result = do_tmapup(ctx, bp, osd_op); break; // OMAP Read ops case CEPH_OSD_OP_OMAPGETKEYS: + ++ctx->num_read; { string start_after; uint64_t max_return; @@ -3037,6 +3081,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } break; case CEPH_OSD_OP_OMAPGETVALS: + ++ctx->num_read; { string start_after; uint64_t max_return; @@ -3100,6 +3145,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } break; case CEPH_OSD_OP_OMAPGETHEADER: + ++ctx->num_read; { if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) { dout(20) << "CEPH_OSD_OP_OMAPGETHEADER: " @@ -3120,6 +3166,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } break; case CEPH_OSD_OP_OMAPGETVALSBYKEYS: + ++ctx->num_read; { set keys_to_get; try { @@ -3159,6 +3206,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } break; case CEPH_OSD_OP_OMAP_CMP: + ++ctx->num_read; { if (!obs.exists) { result = -ENOENT; @@ -3224,6 +3272,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) break; // OMAP Write ops case CEPH_OSD_OP_OMAPSETVALS: + ++ctx->num_write; { if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) { _copy_up_tmap(ctx); @@ -3252,6 +3301,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } break; case CEPH_OSD_OP_OMAPSETHEADER: + ++ctx->num_write; { if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) { _copy_up_tmap(ctx); @@ -3266,6 +3316,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } break; case CEPH_OSD_OP_OMAPCLEAR: + ++ctx->num_write; { if (!obs.exists) { result = -ENOENT; @@ -3280,6 +3331,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) } break; case CEPH_OSD_OP_OMAPRMKEYS: + ++ctx->num_write; { if (!obs.exists) { result = -ENOENT; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 4525ce13f55ac..db19e42233071 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -310,6 +310,9 @@ public: utime_t readable_stamp; // when applied on all replicas ReplicatedPG *pg; + int num_read; ///< count read ops + int num_write; ///< count update ops + OpContext(const OpContext& other); const OpContext& operator=(const OpContext& other); @@ -321,7 +324,9 @@ public: modify(false), user_modify(false), bytes_written(0), bytes_read(0), current_osd_subop_num(0), - obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) { + obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg), + num_read(0), + num_write(0) { if (_ssc) { new_snapset = _ssc->snapset; snapset = &_ssc->snapset; -- 2.39.5