From 2f4b8c0cf95068e7331d697265e1f93bccd024d1 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Thu, 30 Mar 2017 10:08:42 -0400 Subject: [PATCH] osd: new op for retrieving an extent checksum Signed-off-by: Jason Dillaman --- src/include/rados.h | 13 +++ src/osd/OSD.cc | 2 + src/osd/PrimaryLogPG.cc | 215 ++++++++++++++++++++++++++++++++++++++++ src/osd/PrimaryLogPG.h | 10 ++ src/tracing/osd.tp | 22 ++++ 5 files changed, 262 insertions(+) diff --git a/src/include/rados.h b/src/include/rados.h index cc4402ff0ed..4c5d379fdf0 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -196,6 +196,7 @@ extern const char *ceph_osd_state_name(int s); f(READ, __CEPH_OSD_OP(RD, DATA, 1), "read") \ f(STAT, __CEPH_OSD_OP(RD, DATA, 2), "stat") \ f(MAPEXT, __CEPH_OSD_OP(RD, DATA, 3), "mapext") \ + f(CHECKSUM, __CEPH_OSD_OP(RD, DATA, 31), "checksum") \ \ /* fancy read */ \ f(MASKTRUNC, __CEPH_OSD_OP(RD, DATA, 4), "masktrunc") \ @@ -464,6 +465,12 @@ enum { CEPH_OSD_WATCH_OP_PING = 7, }; +enum { + CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH32 = 0, + CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH64 = 1, + CEPH_OSD_CHECKSUM_OP_TYPE_CRC32C = 2 +}; + const char *ceph_osd_watch_op_name(int o); enum { @@ -568,6 +575,12 @@ struct ceph_osd_op { __le64 length; __le64 data_length; } __attribute__ ((packed)) writesame; + struct { + __le64 offset; + __le64 length; + __le32 chunk_size; + __u8 type; /* CEPH_OSD_CHECKSUM_OP_TYPE_* */ + } __attribute__ ((packed)) checksum; }; __le32 payload_len; } __attribute__ ((packed)); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index c8031826f4e..142263cb3b3 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -9131,6 +9131,7 @@ int OSD::init_op_flags(OpRequestRef& op) const pg_pool_t *base_pool = osdmap->get_pg_pool(pool->tier_of); if (base_pool && base_pool->require_rollback()) { if ((iter->op.op != CEPH_OSD_OP_READ) && + (iter->op.op != CEPH_OSD_OP_CHECKSUM) && (iter->op.op != CEPH_OSD_OP_STAT) && (iter->op.op != CEPH_OSD_OP_ISDIRTY) && (iter->op.op != CEPH_OSD_OP_UNDIRTY) && @@ -9240,6 +9241,7 @@ int OSD::init_op_flags(OpRequestRef& op) case CEPH_OSD_OP_READ: case CEPH_OSD_OP_SYNC_READ: case CEPH_OSD_OP_SPARSE_READ: + case CEPH_OSD_OP_CHECKSUM: case CEPH_OSD_OP_WRITEFULL: if (m->ops.size() == 1 && (iter->op.flags & CEPH_OSD_OP_FLAG_FADVISE_NOCACHE || diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 71846caabf8..7745607c8c0 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -2635,6 +2635,7 @@ void PrimaryLogPG::do_proxy_read(OpRequestRef op) case CEPH_OSD_OP_READ: case CEPH_OSD_OP_SYNC_READ: case CEPH_OSD_OP_SPARSE_READ: + case CEPH_OSD_OP_CHECKSUM: op.flags = (op.flags | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL) & ~(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_FADVISE_NOCACHE); } @@ -4204,6 +4205,204 @@ void PrimaryLogPG::maybe_create_new_object( } } +struct C_ChecksumRead : public Context { + PrimaryLogPG *primary_log_pg; + OSDOp &osd_op; + Checksummer::CSumType csum_type; + bufferlist init_value_bl; + ceph_le64 read_length; + bufferlist read_bl; + Context *fill_extent_ctx; + + C_ChecksumRead(PrimaryLogPG *primary_log_pg, OSDOp &osd_op, + Checksummer::CSumType csum_type, bufferlist &&init_value_bl, + boost::optional maybe_crc, uint64_t size, + OSDService *osd, hobject_t soid, __le32 flags) + : primary_log_pg(primary_log_pg), osd_op(osd_op), + csum_type(csum_type), init_value_bl(std::move(init_value_bl)), + fill_extent_ctx(new FillInVerifyExtent(&read_length, &osd_op.rval, + &read_bl, maybe_crc, size, + osd, soid, flags)) { + } + + void finish(int r) override { + fill_extent_ctx->complete(r); + + if (osd_op.rval >= 0) { + bufferlist::iterator init_value_bl_it = init_value_bl.begin(); + osd_op.rval = primary_log_pg->finish_checksum(osd_op, csum_type, + &init_value_bl_it, + read_bl); + } + } +}; + +int PrimaryLogPG::do_checksum(OpContext *ctx, OSDOp& osd_op, + bufferlist::iterator *bl_it, bool *async_read) +{ + dout(20) << __func__ << dendl; + + auto& op = osd_op.op; + if (op.checksum.chunk_size > 0) { + if (op.checksum.length == 0) { + dout(10) << __func__ << ": length required when chunk size provided" + << dendl; + return -EINVAL; + } + if (op.checksum.length % op.checksum.chunk_size != 0) { + dout(10) << __func__ << ": length not aligned to chunk size" << dendl; + return -EINVAL; + } + } + + auto& oi = ctx->new_obs.oi; + if (op.checksum.offset == 0 && op.checksum.length == 0) { + // zeroed offset+length implies checksum whole object + op.checksum.length = oi.size; + } else if (op.checksum.offset + op.checksum.length > oi.size) { + return -EOVERFLOW; + } + + Checksummer::CSumType csum_type; + switch (op.checksum.type) { + case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH32: + csum_type = Checksummer::CSUM_XXHASH32; + break; + case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH64: + csum_type = Checksummer::CSUM_XXHASH64; + break; + case CEPH_OSD_CHECKSUM_OP_TYPE_CRC32C: + csum_type = Checksummer::CSUM_CRC32C; + break; + default: + dout(10) << __func__ << ": unknown crc type (" + << static_cast(op.checksum.type) << ")" << dendl; + return -EINVAL; + } + + size_t csum_init_value_size = Checksummer::get_csum_init_value_size(csum_type); + if (bl_it->get_remaining() < csum_init_value_size) { + dout(10) << __func__ << ": init value not provided" << dendl; + return -EINVAL; + } + + bufferlist init_value_bl; + init_value_bl.substr_of(bl_it->get_bl(), bl_it->get_off(), + csum_init_value_size); + bl_it->advance(csum_init_value_size); + + if (pool.info.require_rollback() && op.checksum.length > 0) { + // If there is a data digest and it is possible we are reading + // entire object, pass the digest. + boost::optional maybe_crc; + if (oi.is_data_digest() && op.checksum.offset == 0 && + op.checksum.length >= oi.size) { + maybe_crc = oi.data_digest; + } + + // async read + auto& soid = oi.soid; + auto checksum_ctx = new C_ChecksumRead(this, osd_op, csum_type, + std::move(init_value_bl), maybe_crc, + oi.size, osd, soid, op.flags); + ctx->pending_async_reads.push_back({ + {op.checksum.offset, op.checksum.length, op.flags}, + {&checksum_ctx->read_bl, checksum_ctx}}); + + dout(10) << __func__ << ": async_read noted for " << soid << dendl; + *async_read = true; + return 0; + } + + // sync read + *async_read = false; + std::vector read_ops(1); + auto& read_op = read_ops[0]; + if (op.checksum.length > 0) { + read_op.op.op = CEPH_OSD_OP_READ; + read_op.op.flags = op.flags; + read_op.op.extent.offset = op.checksum.offset; + read_op.op.extent.length = op.checksum.length; + read_op.op.extent.truncate_size = 0; + read_op.op.extent.truncate_seq = 0; + + int r = do_osd_ops(ctx, read_ops); + if (r < 0) { + derr << __func__ << ": do_osd_ops failed: " << cpp_strerror(r) << dendl; + return r; + } + } + + bufferlist::iterator init_value_bl_it = init_value_bl.begin(); + return finish_checksum(osd_op, csum_type, &init_value_bl_it, + read_op.outdata); +} + +int PrimaryLogPG::finish_checksum(OSDOp& osd_op, + Checksummer::CSumType csum_type, + bufferlist::iterator *init_value_bl_it, + const bufferlist &read_bl) { + dout(20) << __func__ << dendl; + + auto& op = osd_op.op; + + if (op.checksum.length > 0 && read_bl.length() != op.checksum.length) { + derr << __func__ << ": bytes read " << read_bl.length() << " != " + << op.checksum.length << dendl; + return -EINVAL; + } + + size_t csum_chunk_size = (op.checksum.chunk_size != 0 ? + op.checksum.chunk_size : read_bl.length()); + uint32_t csum_count = (csum_chunk_size > 0 ? + read_bl.length() / csum_chunk_size : 0); + + bufferlist csum; + bufferptr csum_data; + if (csum_count > 0) { + size_t csum_value_size = Checksummer::get_csum_value_size(csum_type); + csum_data = buffer::create(csum_value_size * csum_count); + csum_data.zero(); + csum.append(csum_data); + + switch (csum_type) { + case Checksummer::CSUM_XXHASH32: + { + Checksummer::xxhash32::init_value_t init_value; + ::decode(init_value, *init_value_bl_it); + Checksummer::calculate( + init_value, csum_chunk_size, 0, read_bl.length(), read_bl, + &csum_data); + } + break; + case Checksummer::CSUM_XXHASH64: + { + Checksummer::xxhash64::init_value_t init_value; + ::decode(init_value, *init_value_bl_it); + Checksummer::calculate( + init_value, csum_chunk_size, 0, read_bl.length(), read_bl, + &csum_data); + } + break; + case Checksummer::CSUM_CRC32C: + { + Checksummer::crc32c::init_value_t init_value; + ::decode(init_value, *init_value_bl_it); + Checksummer::calculate( + init_value, csum_chunk_size, 0, read_bl.length(), read_bl, + &csum_data); + } + break; + default: + break; + } + } + + ::encode(csum_count, osd_op.outdata); + osd_op.outdata.claim_append(csum); + return 0; +} + int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) { int result = 0; @@ -4370,6 +4569,22 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) } break; + case CEPH_OSD_OP_CHECKSUM: + ++ctx->num_read; + { + tracepoint(osd, do_osd_op_pre_checksum, soid.oid.name.c_str(), + soid.snap.val, oi.size, oi.truncate_seq, op.checksum.type, + op.checksum.offset, op.checksum.length, + op.checksum.chunk_size); + + bool async_read; + result = do_checksum(ctx, osd_op, &bp, &async_read); + if (result == 0 && async_read) { + continue; + } + } + break; + /* map extents */ case CEPH_OSD_OP_MAPEXT: tracepoint(osd, do_osd_op_pre_mapext, soid.oid.name.c_str(), soid.snap.val, op.extent.offset, op.extent.length); diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index a7e812fe8da..8f56f060edf 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -23,6 +23,7 @@ #include "Watch.h" #include "TierAgentState.h" #include "messages/MOSDOpReply.h" +#include "common/Checksummer.h" #include "common/sharedptr_registry.hpp" #include "ReplicatedBackend.h" #include "PGTransaction.h" @@ -1301,6 +1302,15 @@ protected: int do_xattr_cmp_u64(int op, __u64 v1, bufferlist& xattr); int do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr); + // -- checksum -- + int do_checksum(OpContext *ctx, OSDOp& osd_op, bufferlist::iterator *bl_it, + bool *async_read); + int finish_checksum(OSDOp& osd_op, Checksummer::CSumType csum_type, + bufferlist::iterator *init_value_bl_it, + const bufferlist &read_bl); + + friend class C_ChecksumRead; + int do_writesame(OpContext *ctx, OSDOp& osd_op); bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata); diff --git a/src/tracing/osd.tp b/src/tracing/osd.tp index 36ffa7e85f7..3582ce63b8b 100644 --- a/src/tracing/osd.tp +++ b/src/tracing/osd.tp @@ -113,6 +113,28 @@ TRACEPOINT_EVENT(osd, do_osd_op_pre_read, ) ) +TRACEPOINT_EVENT(osd, do_osd_op_pre_checksum, + TP_ARGS( + const char*, oid, + uint64_t, snap, + uint64_t, osize, + uint32_t, oseq, + uint8_t, type, + uint64_t, offset, + uint64_t, length, + uint32_t, chunk_size), + TP_FIELDS( + ctf_string(oid, oid) + ctf_integer(uint64_t, snap, snap) + ctf_integer(uint64_t, osize, osize) + ctf_integer(uint32_t, oseq, oseq) + ctf_integer(uint8_t, type, type) + ctf_integer(uint64_t, offset, offset) + ctf_integer(uint64_t, length, length) + ctf_integer(uint32_t, chunk_size, chunk_size) + ) +) + TRACEPOINT_EVENT(osd, do_osd_op_pre_mapext, TP_ARGS( const char*, oid, -- 2.39.5