]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: enforce RD, WR flags for class methods
authorSage Weil <sage@inktank.com>
Thu, 15 Aug 2013 23:19:21 +0000 (16:19 -0700)
committerSage Weil <sage@inktank.com>
Fri, 16 Aug 2013 00:21:29 +0000 (17:21 -0700)
Class methods are marked with RD and WR to help the OSD decide when we need
to flush objects or require certain permissions.  Ensure that methods do
not step outside their advertised capabilities by keeping a counter of rd
and wr ops we perform in do_osd_ops() and making sure that class methods,
and any ops the indirectly call, do not break the rules.

Signed-off-by: Sage Weil <sage@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 25f807342a3a4ba0f6ccd49ff8d53d283e2dea4f..628cf891dc8a7128863d3382b64b1d3a65c82aac 100644 (file)
@@ -2129,6 +2129,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       // --- READS ---
 
     case CEPH_OSD_OP_READ:
+      ++ctx->num_read;
       {
        // read into a buffer
        bufferlist bl;
@@ -2173,6 +2174,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     /* map extents */
     case CEPH_OSD_OP_MAPEXT:
+      ++ctx->num_read;
       {
        // read into a buffer
        bufferlist bl;
@@ -2188,6 +2190,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     /* map extents */
     case CEPH_OSD_OP_SPARSE_READ:
+      ++ctx->num_read;
       {
         if (op.extent.truncate_seq) {
           dout(0) << "sparse_read does not support truncation sequence " << dendl;
@@ -2299,7 +2302,21 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
        bufferlist outdata;
        dout(10) << "call method " << cname << "." << mname << dendl;
+       int prev_rd = ctx->num_read;
+       int prev_wr = ctx->num_write;
        result = method->exec((cls_method_context_t)&ctx, indata, outdata);
+
+       if (ctx->num_read > prev_rd && !(flags & CLS_METHOD_RD)) {
+         derr << "method " << cname << "." << mname << " tried to read object but is not marked RD" << dendl;
+         result = -EIO;
+         break;
+       }
+       if (ctx->num_write > prev_wr && !(flags & CLS_METHOD_WR)) {
+         derr << "method " << cname << "." << mname << " tried to update object but is not marked WR" << dendl;
+         result = -EIO;
+         break;
+       }
+
        dout(10) << "method called response length=" << outdata.length() << dendl;
        op.extent.length = outdata.length();
        osd_op.outdata.claim_append(outdata);
@@ -2310,6 +2327,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_STAT:
+      // note: stat does not require RD
       {
        if (obs.exists) {
          ::encode(oi.size, osd_op.outdata);
@@ -2325,6 +2343,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_GETXATTR:
+      ++ctx->num_read;
       {
        string aname;
        bp.copy(op.xattr.name_len, aname);
@@ -2341,6 +2360,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
    case CEPH_OSD_OP_GETXATTRS:
+      ++ctx->num_read;
       {
        map<string,bufferptr> attrset;
         result = osd->store->getattrs(coll, soid, attrset, true);
@@ -2362,6 +2382,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       
     case CEPH_OSD_OP_CMPXATTR:
     case CEPH_OSD_OP_SRC_CMPXATTR:
+      ++ctx->num_read;
       {
        string aname;
        bp.copy(op.xattr.name_len, aname);
@@ -2427,6 +2448,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_ASSERT_VER:
+      ++ctx->num_read;
       {
        uint64_t ver = op.watch.ver;
        if (!ver)
@@ -2439,6 +2461,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
 
     case CEPH_OSD_OP_LIST_WATCHERS:
+      ++ctx->num_read;
       {
         obj_list_watch_response_t resp;
 
@@ -2464,6 +2487,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
 
     case CEPH_OSD_OP_LIST_SNAPS:
+      ++ctx->num_read;
       {
         obj_list_snap_response_t resp;
 
@@ -2541,6 +2565,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
 
     case CEPH_OSD_OP_ASSERT_SRC_VERSION:
+      ++ctx->num_read;
       {
        uint64_t ver = op.watch.ver;
        if (!ver)
@@ -2553,6 +2578,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
 
    case CEPH_OSD_OP_NOTIFY:
+      ++ctx->num_read;
       {
        uint32_t ver;
        uint32_t timeout;
@@ -2577,6 +2603,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_NOTIFY_ACK:
+      ++ctx->num_read;
       {
        try {
          uint64_t notify_id = 0;
@@ -2601,6 +2628,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       // -- object data --
 
     case CEPH_OSD_OP_WRITE:
+      ++ctx->num_write;
       { // write
         __u32 seq = oi.truncate_seq;
         if (seq && (seq > op.extent.truncate_seq) &&
@@ -2646,6 +2674,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
       
     case CEPH_OSD_OP_WRITEFULL:
+      ++ctx->num_write;
       { // write full object
        result = check_offset_and_length(op.extent.offset, op.extent.length);
        if (result < 0)
@@ -2674,10 +2703,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_ROLLBACK :
+      ++ctx->num_write;
       result = _rollback_to(ctx, op);
       break;
 
     case CEPH_OSD_OP_ZERO:
+      ++ctx->num_write;
       { // zero
        result = check_offset_and_length(op.extent.offset, op.extent.length);
        if (result < 0)
@@ -2695,6 +2726,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
     case CEPH_OSD_OP_CREATE:
+      ++ctx->num_write;
       {
         int flags = le32_to_cpu(op.flags);
        if (obs.exists && (flags & CEPH_OSD_OP_FLAG_EXCL)) {
@@ -2733,6 +2765,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       // falling through
 
     case CEPH_OSD_OP_TRUNCATE:
+      ++ctx->num_write;
       {
        // truncate
        if (!obs.exists) {
@@ -2775,6 +2808,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
     
     case CEPH_OSD_OP_DELETE:
+      ++ctx->num_write;
       if (ctx->obc->obs.oi.watchers.size()) {
        // Cannot delete an object with watchers
        result = -EBUSY;
@@ -2784,6 +2818,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_CLONERANGE:
+      ++ctx->num_read;
+      ++ctx->num_write;
       {
        if (!obs.exists) {
          t.touch(coll, obs.oi.soid);
@@ -2808,6 +2844,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
       
     case CEPH_OSD_OP_WATCH:
+      ++ctx->num_write;
       {
         uint64_t cookie = op.watch.cookie;
        bool do_watch = op.watch.flag & 1;
@@ -2853,6 +2890,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       // -- object attrs --
       
     case CEPH_OSD_OP_SETXATTR:
+      ++ctx->num_write;
       {
        if (op.xattr.value_len > g_conf->osd_max_attr_size) {
          result = -EFBIG;
@@ -2874,6 +2912,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_RMXATTR:
+      ++ctx->num_write;
       {
        string aname;
        bp.copy(op.xattr.name_len, aname);
@@ -2886,6 +2925,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
       // -- fancy writers --
     case CEPH_OSD_OP_APPEND:
+      ++ctx->num_write;
       {
        // just do it inline; this works because we are happy to execute
        // fancy op on replicas as well.
@@ -2908,6 +2948,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
       // -- trivial map --
     case CEPH_OSD_OP_TMAPGET:
+      ++ctx->num_read;
       {
        vector<OSDOp> nops(1);
        OSDOp& newop = nops[0];
@@ -2920,6 +2961,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_TMAPPUT:
+      ++ctx->num_write;
       {
        //_dout_lock.Lock();
        //osd_op.data.hexdump(*_dout);
@@ -2978,11 +3020,13 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_TMAPUP:
+      ++ctx->num_write;
       result = do_tmapup(ctx, bp, osd_op);
       break;
 
       // OMAP Read ops
     case CEPH_OSD_OP_OMAPGETKEYS:
+      ++ctx->num_read;
       {
        string start_after;
        uint64_t max_return;
@@ -3037,6 +3081,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
     case CEPH_OSD_OP_OMAPGETVALS:
+      ++ctx->num_read;
       {
        string start_after;
        uint64_t max_return;
@@ -3100,6 +3145,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
     case CEPH_OSD_OP_OMAPGETHEADER:
+      ++ctx->num_read;
       {
        if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
          dout(20) << "CEPH_OSD_OP_OMAPGETHEADER: "
@@ -3120,6 +3166,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
     case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
+      ++ctx->num_read;
       {
        set<string> keys_to_get;
        try {
@@ -3159,6 +3206,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
     case CEPH_OSD_OP_OMAP_CMP:
+      ++ctx->num_read;
       {
        if (!obs.exists) {
          result = -ENOENT;
@@ -3224,6 +3272,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
       // OMAP Write ops
     case CEPH_OSD_OP_OMAPSETVALS:
+      ++ctx->num_write;
       {
        if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
          _copy_up_tmap(ctx);
@@ -3252,6 +3301,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
     case CEPH_OSD_OP_OMAPSETHEADER:
+      ++ctx->num_write;
       {
        if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
          _copy_up_tmap(ctx);
@@ -3266,6 +3316,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
     case CEPH_OSD_OP_OMAPCLEAR:
+      ++ctx->num_write;
       {
        if (!obs.exists) {
          result = -ENOENT;
@@ -3280,6 +3331,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
     case CEPH_OSD_OP_OMAPRMKEYS:
+      ++ctx->num_write;
       {
        if (!obs.exists) {
          result = -ENOENT;
index 4525ce13f55ace218e56fa1defd22ddec4a0d212..db19e42233071fec991994717866b513031d4f6c 100644 (file)
@@ -310,6 +310,9 @@ public:
     utime_t readable_stamp;  // when applied on all replicas
     ReplicatedPG *pg;
 
+    int num_read;    ///< count read ops
+    int num_write;   ///< count update ops
+
     OpContext(const OpContext& other);
     const OpContext& operator=(const OpContext& other);
 
@@ -321,7 +324,9 @@ public:
       modify(false), user_modify(false),
       bytes_written(0), bytes_read(0),
       current_osd_subop_num(0),
-      obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) { 
+      obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg),
+      num_read(0),
+      num_write(0) {
       if (_ssc) {
        new_snapset = _ssc->snapset;
        snapset = &_ssc->snapset;