case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_SYNC_READ:
case CEPH_OSD_OP_SPARSE_READ:
+ case CEPH_OSD_OP_CHECKSUM:
op.flags = (op.flags | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL) &
~(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_FADVISE_NOCACHE);
}
}
}
+struct C_ChecksumRead : public Context {
+ PrimaryLogPG *primary_log_pg;
+ OSDOp &osd_op;
+ Checksummer::CSumType csum_type;
+ bufferlist init_value_bl;
+ ceph_le64 read_length;
+ bufferlist read_bl;
+ Context *fill_extent_ctx;
+
+ C_ChecksumRead(PrimaryLogPG *primary_log_pg, OSDOp &osd_op,
+ Checksummer::CSumType csum_type, bufferlist &&init_value_bl,
+ boost::optional<uint32_t> maybe_crc, uint64_t size,
+ OSDService *osd, hobject_t soid, __le32 flags)
+ : primary_log_pg(primary_log_pg), osd_op(osd_op),
+ csum_type(csum_type), init_value_bl(std::move(init_value_bl)),
+ fill_extent_ctx(new FillInVerifyExtent(&read_length, &osd_op.rval,
+ &read_bl, maybe_crc, size,
+ osd, soid, flags)) {
+ }
+
+ void finish(int r) override {
+ fill_extent_ctx->complete(r);
+
+ if (osd_op.rval >= 0) {
+ bufferlist::iterator init_value_bl_it = init_value_bl.begin();
+ osd_op.rval = primary_log_pg->finish_checksum(osd_op, csum_type,
+ &init_value_bl_it,
+ read_bl);
+ }
+ }
+};
+
+int PrimaryLogPG::do_checksum(OpContext *ctx, OSDOp& osd_op,
+ bufferlist::iterator *bl_it, bool *async_read)
+{
+ dout(20) << __func__ << dendl;
+
+ auto& op = osd_op.op;
+ if (op.checksum.chunk_size > 0) {
+ if (op.checksum.length == 0) {
+ dout(10) << __func__ << ": length required when chunk size provided"
+ << dendl;
+ return -EINVAL;
+ }
+ if (op.checksum.length % op.checksum.chunk_size != 0) {
+ dout(10) << __func__ << ": length not aligned to chunk size" << dendl;
+ return -EINVAL;
+ }
+ }
+
+ auto& oi = ctx->new_obs.oi;
+ if (op.checksum.offset == 0 && op.checksum.length == 0) {
+ // zeroed offset+length implies checksum whole object
+ op.checksum.length = oi.size;
+ } else if (op.checksum.offset + op.checksum.length > oi.size) {
+ return -EOVERFLOW;
+ }
+
+ Checksummer::CSumType csum_type;
+ switch (op.checksum.type) {
+ case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH32:
+ csum_type = Checksummer::CSUM_XXHASH32;
+ break;
+ case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH64:
+ csum_type = Checksummer::CSUM_XXHASH64;
+ break;
+ case CEPH_OSD_CHECKSUM_OP_TYPE_CRC32C:
+ csum_type = Checksummer::CSUM_CRC32C;
+ break;
+ default:
+ dout(10) << __func__ << ": unknown crc type ("
+ << static_cast<uint32_t>(op.checksum.type) << ")" << dendl;
+ return -EINVAL;
+ }
+
+ size_t csum_init_value_size = Checksummer::get_csum_init_value_size(csum_type);
+ if (bl_it->get_remaining() < csum_init_value_size) {
+ dout(10) << __func__ << ": init value not provided" << dendl;
+ return -EINVAL;
+ }
+
+ bufferlist init_value_bl;
+ init_value_bl.substr_of(bl_it->get_bl(), bl_it->get_off(),
+ csum_init_value_size);
+ bl_it->advance(csum_init_value_size);
+
+ if (pool.info.require_rollback() && op.checksum.length > 0) {
+ // If there is a data digest and it is possible we are reading
+ // entire object, pass the digest.
+ boost::optional<uint32_t> maybe_crc;
+ if (oi.is_data_digest() && op.checksum.offset == 0 &&
+ op.checksum.length >= oi.size) {
+ maybe_crc = oi.data_digest;
+ }
+
+ // async read
+ auto& soid = oi.soid;
+ auto checksum_ctx = new C_ChecksumRead(this, osd_op, csum_type,
+ std::move(init_value_bl), maybe_crc,
+ oi.size, osd, soid, op.flags);
+ ctx->pending_async_reads.push_back({
+ {op.checksum.offset, op.checksum.length, op.flags},
+ {&checksum_ctx->read_bl, checksum_ctx}});
+
+ dout(10) << __func__ << ": async_read noted for " << soid << dendl;
+ *async_read = true;
+ return 0;
+ }
+
+ // sync read
+ *async_read = false;
+ std::vector<OSDOp> read_ops(1);
+ auto& read_op = read_ops[0];
+ if (op.checksum.length > 0) {
+ read_op.op.op = CEPH_OSD_OP_READ;
+ read_op.op.flags = op.flags;
+ read_op.op.extent.offset = op.checksum.offset;
+ read_op.op.extent.length = op.checksum.length;
+ read_op.op.extent.truncate_size = 0;
+ read_op.op.extent.truncate_seq = 0;
+
+ int r = do_osd_ops(ctx, read_ops);
+ if (r < 0) {
+ derr << __func__ << ": do_osd_ops failed: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ }
+
+ bufferlist::iterator init_value_bl_it = init_value_bl.begin();
+ return finish_checksum(osd_op, csum_type, &init_value_bl_it,
+ read_op.outdata);
+}
+
+int PrimaryLogPG::finish_checksum(OSDOp& osd_op,
+ Checksummer::CSumType csum_type,
+ bufferlist::iterator *init_value_bl_it,
+ const bufferlist &read_bl) {
+ dout(20) << __func__ << dendl;
+
+ auto& op = osd_op.op;
+
+ if (op.checksum.length > 0 && read_bl.length() != op.checksum.length) {
+ derr << __func__ << ": bytes read " << read_bl.length() << " != "
+ << op.checksum.length << dendl;
+ return -EINVAL;
+ }
+
+ size_t csum_chunk_size = (op.checksum.chunk_size != 0 ?
+ op.checksum.chunk_size : read_bl.length());
+ uint32_t csum_count = (csum_chunk_size > 0 ?
+ read_bl.length() / csum_chunk_size : 0);
+
+ bufferlist csum;
+ bufferptr csum_data;
+ if (csum_count > 0) {
+ size_t csum_value_size = Checksummer::get_csum_value_size(csum_type);
+ csum_data = buffer::create(csum_value_size * csum_count);
+ csum_data.zero();
+ csum.append(csum_data);
+
+ switch (csum_type) {
+ case Checksummer::CSUM_XXHASH32:
+ {
+ Checksummer::xxhash32::init_value_t init_value;
+ ::decode(init_value, *init_value_bl_it);
+ Checksummer::calculate<Checksummer::xxhash32>(
+ init_value, csum_chunk_size, 0, read_bl.length(), read_bl,
+ &csum_data);
+ }
+ break;
+ case Checksummer::CSUM_XXHASH64:
+ {
+ Checksummer::xxhash64::init_value_t init_value;
+ ::decode(init_value, *init_value_bl_it);
+ Checksummer::calculate<Checksummer::xxhash64>(
+ init_value, csum_chunk_size, 0, read_bl.length(), read_bl,
+ &csum_data);
+ }
+ break;
+ case Checksummer::CSUM_CRC32C:
+ {
+ Checksummer::crc32c::init_value_t init_value;
+ ::decode(init_value, *init_value_bl_it);
+ Checksummer::calculate<Checksummer::crc32c>(
+ init_value, csum_chunk_size, 0, read_bl.length(), read_bl,
+ &csum_data);
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ ::encode(csum_count, osd_op.outdata);
+ osd_op.outdata.claim_append(csum);
+ return 0;
+}
+
int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
{
int result = 0;
}
break;
+ case CEPH_OSD_OP_CHECKSUM:
+ ++ctx->num_read;
+ {
+ tracepoint(osd, do_osd_op_pre_checksum, soid.oid.name.c_str(),
+ soid.snap.val, oi.size, oi.truncate_seq, op.checksum.type,
+ op.checksum.offset, op.checksum.length,
+ op.checksum.chunk_size);
+
+ bool async_read;
+ result = do_checksum(ctx, osd_op, &bp, &async_read);
+ if (result == 0 && async_read) {
+ continue;
+ }
+ }
+ break;
+
/* map extents */
case CEPH_OSD_OP_MAPEXT:
tracepoint(osd, do_osd_op_pre_mapext, soid.oid.name.c_str(), soid.snap.val, op.extent.offset, op.extent.length);