case CEPH_OSD_OP_SYNC_READ:
case CEPH_OSD_OP_SPARSE_READ:
case CEPH_OSD_OP_CHECKSUM:
+ case CEPH_OSD_OP_CMPEXT:
op.flags = (op.flags | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL) &
~(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_FADVISE_NOCACHE);
}
}
}
-int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
-{
- ceph_osd_op& op = osd_op.op;
- vector<OSDOp> read_ops(1);
- OSDOp& read_op = read_ops[0];
- int result = 0;
-
- read_op.op.op = CEPH_OSD_OP_SYNC_READ;
- read_op.op.extent.offset = op.extent.offset;
- read_op.op.extent.length = op.extent.length;
- read_op.op.extent.truncate_seq = op.extent.truncate_seq;
- read_op.op.extent.truncate_size = op.extent.truncate_size;
-
- result = do_osd_ops(ctx, read_ops);
- if (result < 0) {
- derr << "do_extent_cmp do_osd_ops failed " << result << dendl;
- return result;
- }
-
- for (uint64_t p = 0; p < osd_op.indata.length(); p++) {
- if (p >= read_op.outdata.length()) {
- if (osd_op.indata[p] != 0)
- return (-MAX_ERRNO - p);
- } else {
- if (read_op.outdata[p] != osd_op.indata[p]) {
- return (-MAX_ERRNO - p);
- }
- }
- }
-
- return result;
-}
-
int PrimaryLogPG::do_writesame(OpContext *ctx, OSDOp& osd_op)
{
ceph_osd_op& op = osd_op.op;
return 0;
}
+struct C_ExtentCmpRead : public Context {
+ PrimaryLogPG *primary_log_pg;
+ OSDOp &osd_op;
+ ceph_le64 read_length;
+ bufferlist read_bl;
+ Context *fill_extent_ctx;
+
+ C_ExtentCmpRead(PrimaryLogPG *primary_log_pg, OSDOp &osd_op,
+ boost::optional<uint32_t> maybe_crc, uint64_t size,
+ OSDService *osd, hobject_t soid, __le32 flags)
+ : primary_log_pg(primary_log_pg), osd_op(osd_op),
+ fill_extent_ctx(new FillInVerifyExtent(&read_length, &osd_op.rval,
+ &read_bl, maybe_crc, size,
+ osd, soid, flags)) {
+ }
+
+ void finish(int r) override {
+ fill_extent_ctx->complete(r);
+
+ if (osd_op.rval >= 0) {
+ osd_op.rval = primary_log_pg->finish_extent_cmp(osd_op, read_bl);
+ }
+ }
+};
+
+int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
+{
+ dout(20) << __func__ << dendl;
+ ceph_osd_op& op = osd_op.op;
+
+ if (pool.info.require_rollback()) {
+ // If there is a data digest and it is possible we are reading
+ // entire object, pass the digest.
+ auto& oi = ctx->new_obs.oi;
+ boost::optional<uint32_t> maybe_crc;
+ if (oi.is_data_digest() && op.checksum.offset == 0 &&
+ op.checksum.length >= oi.size) {
+ maybe_crc = oi.data_digest;
+ }
+
+ // async read
+ auto& soid = oi.soid;
+ auto extent_cmp_ctx = new C_ExtentCmpRead(this, osd_op, maybe_crc, oi.size,
+ osd, soid, op.flags);
+ ctx->pending_async_reads.push_back({
+ {op.extent.offset, op.extent.length, op.flags},
+ {&extent_cmp_ctx->read_bl, extent_cmp_ctx}});
+
+ dout(10) << __func__ << ": async_read noted for " << soid << dendl;
+
+ osd_op.op_finisher = new ReadFinisher(osd_op);
+ return -EINPROGRESS;
+ }
+
+ // sync read
+ vector<OSDOp> read_ops(1);
+ OSDOp& read_op = read_ops[0];
+
+ read_op.op.op = CEPH_OSD_OP_SYNC_READ;
+ read_op.op.extent.offset = op.extent.offset;
+ read_op.op.extent.length = op.extent.length;
+ read_op.op.extent.truncate_seq = op.extent.truncate_seq;
+ read_op.op.extent.truncate_size = op.extent.truncate_size;
+
+ int result = do_osd_ops(ctx, read_ops);
+ if (result < 0) {
+ derr << "do_extent_cmp do_osd_ops failed " << result << dendl;
+ return result;
+ }
+ return finish_extent_cmp(osd_op, read_op.outdata);
+}
+
+int PrimaryLogPG::finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl)
+{
+ for (uint64_t idx = 0; idx < osd_op.indata.length(); ++idx) {
+ char read_byte = (idx < read_bl.length() ? read_bl[idx] : 0);
+ if (osd_op.indata[idx] != read_byte) {
+ return (-MAX_ERRNO - idx);
+ }
+ }
+
+ return 0;
+}
+
int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op) {
dout(20) << __func__ << dendl;
auto& op = osd_op.op;
case CEPH_OSD_OP_CMPEXT:
++ctx->num_read;
- tracepoint(osd, do_osd_op_pre_extent_cmp, soid.oid.name.c_str(), soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
- result = do_extent_cmp(ctx, osd_op);
+ tracepoint(osd, do_osd_op_pre_extent_cmp, soid.oid.name.c_str(),
+ soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+ op.extent.length, op.extent.truncate_size,
+ op.extent.truncate_seq);
+
+ if (osd_op.op_finisher == nullptr) {
+ result = do_extent_cmp(ctx, osd_op);
+ } else {
+ result = osd_op.op_finisher->execute();
+ }
break;
case CEPH_OSD_OP_SYNC_READ:
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
#include "include/encoding.h"
+#include "include/err.h"
#include "include/scope_guard.h"
#include "test/librados/test.h"
#include "test/librados/TestCase.h"
}
}
}
+
+TEST_F(LibRadosIoPP, CmpExtPP) {
+ bufferlist bl;
+ bl.append("ceph");
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ ASSERT_EQ(0, ioctx.operate("foo", &write1));
+
+ bufferlist new_bl;
+ new_bl.append("CEPH");
+ ObjectWriteOperation write2;
+ write2.cmpext(0, bl, nullptr);
+ write2.write(0, new_bl);
+ ASSERT_EQ(0, ioctx.operate("foo", &write2));
+
+ ObjectReadOperation read;
+ read.read(0, bl.length(), NULL, NULL);
+ ASSERT_EQ(0, ioctx.operate("foo", &read, &bl));
+ ASSERT_EQ(0, memcmp(bl.c_str(), "CEPH", 4));
+}
+
+TEST_F(LibRadosIoPP, CmpExtMismatchPP) {
+ bufferlist bl;
+ bl.append("ceph");
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ ASSERT_EQ(0, ioctx.operate("foo", &write1));
+
+ bufferlist new_bl;
+ new_bl.append("CEPH");
+ ObjectWriteOperation write2;
+ write2.cmpext(0, new_bl, nullptr);
+ write2.write(0, new_bl);
+ ASSERT_EQ(-MAX_ERRNO, ioctx.operate("foo", &write2));
+
+ ObjectReadOperation read;
+ read.read(0, bl.length(), NULL, NULL);
+ ASSERT_EQ(0, ioctx.operate("foo", &read, &bl));
+ ASSERT_EQ(0, memcmp(bl.c_str(), "ceph", 4));
+}
+
+TEST_F(LibRadosIoECPP, CmpExtPP) {
+ bufferlist bl;
+ bl.append("ceph");
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ ASSERT_EQ(0, ioctx.operate("foo", &write1));
+
+ bufferlist new_bl;
+ new_bl.append("CEPH");
+ ObjectWriteOperation write2;
+ write2.cmpext(0, bl, nullptr);
+ write2.write_full(new_bl);
+ ASSERT_EQ(0, ioctx.operate("foo", &write2));
+
+ ObjectReadOperation read;
+ read.read(0, bl.length(), NULL, NULL);
+ ASSERT_EQ(0, ioctx.operate("foo", &read, &bl));
+ ASSERT_EQ(0, memcmp(bl.c_str(), "CEPH", 4));
+}
+
+TEST_F(LibRadosIoECPP, CmpExtMismatchPP) {
+ bufferlist bl;
+ bl.append("ceph");
+ ObjectWriteOperation write1;
+ write1.write(0, bl);
+ ASSERT_EQ(0, ioctx.operate("foo", &write1));
+
+ bufferlist new_bl;
+ new_bl.append("CEPH");
+ ObjectWriteOperation write2;
+ write2.cmpext(0, new_bl, nullptr);
+ write2.write_full(new_bl);
+ ASSERT_EQ(-MAX_ERRNO, ioctx.operate("foo", &write2));
+
+ ObjectReadOperation read;
+ read.read(0, bl.length(), NULL, NULL);
+ ASSERT_EQ(0, ioctx.operate("foo", &read, &bl));
+ ASSERT_EQ(0, memcmp(bl.c_str(), "ceph", 4));
+}