// --- READS ---
case CEPH_OSD_OP_READ:
+ ++ctx->num_read;
{
// read into a buffer
bufferlist bl;
/* map extents */
case CEPH_OSD_OP_MAPEXT:
+ ++ctx->num_read;
{
// read into a buffer
bufferlist bl;
/* map extents */
case CEPH_OSD_OP_SPARSE_READ:
+ ++ctx->num_read;
{
if (op.extent.truncate_seq) {
dout(0) << "sparse_read does not support truncation sequence " << dendl;
bufferlist outdata;
dout(10) << "call method " << cname << "." << mname << dendl;
+ int prev_rd = ctx->num_read;
+ int prev_wr = ctx->num_write;
result = method->exec((cls_method_context_t)&ctx, indata, outdata);
+
+ if (ctx->num_read > prev_rd && !(flags & CLS_METHOD_RD)) {
+ derr << "method " << cname << "." << mname << " tried to read object but is not marked RD" << dendl;
+ result = -EIO;
+ break;
+ }
+ if (ctx->num_write > prev_wr && !(flags & CLS_METHOD_WR)) {
+ derr << "method " << cname << "." << mname << " tried to update object but is not marked WR" << dendl;
+ result = -EIO;
+ break;
+ }
+
dout(10) << "method called response length=" << outdata.length() << dendl;
op.extent.length = outdata.length();
osd_op.outdata.claim_append(outdata);
break;
case CEPH_OSD_OP_STAT:
+ // note: stat does not require RD
{
if (obs.exists) {
::encode(oi.size, osd_op.outdata);
break;
case CEPH_OSD_OP_GETXATTR:
+ ++ctx->num_read;
{
string aname;
bp.copy(op.xattr.name_len, aname);
break;
case CEPH_OSD_OP_GETXATTRS:
+ ++ctx->num_read;
{
map<string,bufferptr> attrset;
result = osd->store->getattrs(coll, soid, attrset, true);
case CEPH_OSD_OP_CMPXATTR:
case CEPH_OSD_OP_SRC_CMPXATTR:
+ ++ctx->num_read;
{
string aname;
bp.copy(op.xattr.name_len, aname);
break;
case CEPH_OSD_OP_ASSERT_VER:
+ ++ctx->num_read;
{
uint64_t ver = op.watch.ver;
if (!ver)
}
case CEPH_OSD_OP_LIST_WATCHERS:
+ ++ctx->num_read;
{
obj_list_watch_response_t resp;
}
case CEPH_OSD_OP_LIST_SNAPS:
+ ++ctx->num_read;
{
obj_list_snap_response_t resp;
}
case CEPH_OSD_OP_ASSERT_SRC_VERSION:
+ ++ctx->num_read;
{
uint64_t ver = op.watch.ver;
if (!ver)
}
case CEPH_OSD_OP_NOTIFY:
+ ++ctx->num_read;
{
uint32_t ver;
uint32_t timeout;
break;
case CEPH_OSD_OP_NOTIFY_ACK:
+ ++ctx->num_read;
{
try {
uint64_t notify_id = 0;
// -- object data --
case CEPH_OSD_OP_WRITE:
+ ++ctx->num_write;
{ // write
__u32 seq = oi.truncate_seq;
if (seq && (seq > op.extent.truncate_seq) &&
break;
case CEPH_OSD_OP_WRITEFULL:
+ ++ctx->num_write;
{ // write full object
result = check_offset_and_length(op.extent.offset, op.extent.length);
if (result < 0)
break;
case CEPH_OSD_OP_ROLLBACK :
+ ++ctx->num_write;
result = _rollback_to(ctx, op);
break;
case CEPH_OSD_OP_ZERO:
+ ++ctx->num_write;
{ // zero
result = check_offset_and_length(op.extent.offset, op.extent.length);
if (result < 0)
}
break;
case CEPH_OSD_OP_CREATE:
+ ++ctx->num_write;
{
int flags = le32_to_cpu(op.flags);
if (obs.exists && (flags & CEPH_OSD_OP_FLAG_EXCL)) {
// falling through
case CEPH_OSD_OP_TRUNCATE:
+ ++ctx->num_write;
{
// truncate
if (!obs.exists) {
break;
case CEPH_OSD_OP_DELETE:
+ ++ctx->num_write;
if (ctx->obc->obs.oi.watchers.size()) {
// Cannot delete an object with watchers
result = -EBUSY;
break;
case CEPH_OSD_OP_CLONERANGE:
+ ++ctx->num_read;
+ ++ctx->num_write;
{
if (!obs.exists) {
t.touch(coll, obs.oi.soid);
break;
case CEPH_OSD_OP_WATCH:
+ ++ctx->num_write;
{
uint64_t cookie = op.watch.cookie;
bool do_watch = op.watch.flag & 1;
// -- object attrs --
case CEPH_OSD_OP_SETXATTR:
+ ++ctx->num_write;
{
if (op.xattr.value_len > g_conf->osd_max_attr_size) {
result = -EFBIG;
break;
case CEPH_OSD_OP_RMXATTR:
+ ++ctx->num_write;
{
string aname;
bp.copy(op.xattr.name_len, aname);
// -- fancy writers --
case CEPH_OSD_OP_APPEND:
+ ++ctx->num_write;
{
// just do it inline; this works because we are happy to execute
// fancy op on replicas as well.
// -- trivial map --
case CEPH_OSD_OP_TMAPGET:
+ ++ctx->num_read;
{
vector<OSDOp> nops(1);
OSDOp& newop = nops[0];
break;
case CEPH_OSD_OP_TMAPPUT:
+ ++ctx->num_write;
{
//_dout_lock.Lock();
//osd_op.data.hexdump(*_dout);
break;
case CEPH_OSD_OP_TMAPUP:
+ ++ctx->num_write;
result = do_tmapup(ctx, bp, osd_op);
break;
// OMAP Read ops
case CEPH_OSD_OP_OMAPGETKEYS:
+ ++ctx->num_read;
{
string start_after;
uint64_t max_return;
}
break;
case CEPH_OSD_OP_OMAPGETVALS:
+ ++ctx->num_read;
{
string start_after;
uint64_t max_return;
}
break;
case CEPH_OSD_OP_OMAPGETHEADER:
+ ++ctx->num_read;
{
if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
dout(20) << "CEPH_OSD_OP_OMAPGETHEADER: "
}
break;
case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
+ ++ctx->num_read;
{
set<string> keys_to_get;
try {
}
break;
case CEPH_OSD_OP_OMAP_CMP:
+ ++ctx->num_read;
{
if (!obs.exists) {
result = -ENOENT;
break;
// OMAP Write ops
case CEPH_OSD_OP_OMAPSETVALS:
+ ++ctx->num_write;
{
if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
_copy_up_tmap(ctx);
}
break;
case CEPH_OSD_OP_OMAPSETHEADER:
+ ++ctx->num_write;
{
if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
_copy_up_tmap(ctx);
}
break;
case CEPH_OSD_OP_OMAPCLEAR:
+ ++ctx->num_write;
{
if (!obs.exists) {
result = -ENOENT;
}
break;
case CEPH_OSD_OP_OMAPRMKEYS:
+ ++ctx->num_write;
{
if (!obs.exists) {
result = -ENOENT;
utime_t readable_stamp; // when applied on all replicas
ReplicatedPG *pg;
+ int num_read; ///< count read ops
+ int num_write; ///< count update ops
+
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
modify(false), user_modify(false),
bytes_written(0), bytes_read(0),
current_osd_subop_num(0),
- obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) {
+ obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg),
+ num_read(0),
+ num_write(0) {
if (_ssc) {
new_snapset = _ssc->snapset;
snapset = &_ssc->snapset;