From: Radoslaw Zarzynski Date: Thu, 27 Apr 2017 19:48:02 +0000 (+0200) Subject: rgw: AWSv4Completer dechunks data in the streaming mode. X-Git-Tag: v12.1.0~155^2~28 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=def8f6412a58c9da137b3c252b35e135833722e5;p=ceph.git rgw: AWSv4Completer dechunks data in the streaming mode. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/rgw/rgw_auth_s3.cc b/src/rgw/rgw_auth_s3.cc index 36906c168687..877a0297bbba 100644 --- a/src/rgw/rgw_auth_s3.cc +++ b/src/rgw/rgw_auth_s3.cc @@ -811,11 +811,167 @@ std::string get_v4_signature(CephContext* const cct, return signature; } +bool AWSv4Completer::ChunkMeta::is_new_chunk_in_stream(size_t stream_pos) const +{ + return stream_pos >= (data_offset_in_stream + data_length); +} + +size_t AWSv4Completer::ChunkMeta::get_data_size(size_t stream_pos) const +{ + if (stream_pos > (data_offset_in_stream + data_length)) { + /* Data in parsing_buf. */ + return data_length; + } else { + return data_offset_in_stream + data_length - stream_pos; + } +} + +std::pair +AWSv4Completer::ChunkMeta::create_next(ChunkMeta&& old, + const char* const metabuf, + const size_t metabuf_len) +{ + boost::string_ref metastr(metabuf, metabuf_len); + + const size_t semicolon_pos = metastr.find(";"); + if (semicolon_pos == boost::string_ref::npos) { + dout(20) << "AWSv4Completer cannot find the ';' separator" << dendl; + throw rgw::io::Exception(EINVAL, std::system_category()); + } + + char* data_field_end; + /* strtoull ignores the "\r\n" sequence after each non-first chunk. */ + const size_t data_length = std::strtoull(metabuf, &data_field_end, 16); + if (data_length == 0 && data_field_end == metabuf) { + dout(20) << "AWSv4Completer: cannot parse the data size" << dendl; + throw rgw::io::Exception(EINVAL, std::system_category()); + } + + /* Parse the chunk_signature=... part. */ + const auto signature_part = metastr.substr(semicolon_pos + 1); + const size_t eq_sign_pos = signature_part.find("="); + if (eq_sign_pos == boost::string_ref::npos) { + dout(20) << "AWSv4Completer: cannot find the '=' separator" << dendl; + throw rgw::io::Exception(EINVAL, std::system_category()); + } + + /* OK, we have at least the beginning of a signature. */ + const size_t data_sep_pos = signature_part.find("\r\n"); + if (data_sep_pos == boost::string_ref::npos) { + dout(20) << "AWSv4Completer: no new line at signature end" << dendl; + throw rgw::io::Exception(EINVAL, std::system_category()); + } + + const auto signature = \ + signature_part.substr(eq_sign_pos + 1, data_sep_pos - 1 - eq_sign_pos); + if (signature.length() != SIG_SIZE) { + dout(20) << "AWSv4Completer: signature.length() != 64" << dendl; + throw rgw::io::Exception(EINVAL, std::system_category()); + } + + const size_t data_starts_in_stream = \ + + semicolon_pos + strlen(";") + data_sep_pos + strlen("\r\n") + + old.data_offset_in_stream + old.data_length; + + dout(20) << "parsed new chunk; signature=" << signature + << ", data_length=" << data_length + << ", data_starts_in_stream=" << data_starts_in_stream + << dendl; + + return std::make_pair(ChunkMeta(data_starts_in_stream, + data_length, + signature), + semicolon_pos + 83); +} + +size_t AWSv4Completer::recv_chunked(char* const buf, const size_t buf_max) +{ + /* Buffer stores only parsed stream. Raw values reflect the stream + * we're getting from a client. */ + size_t buf_pos = 0; + + if (chunk_meta.is_new_chunk_in_stream(stream_pos)) { + /* We don't have metadata for this range. This means a new chunk, so we + * need to parse a fresh portion of the stream. Let's start. */ + dout(20) << "AWSv4Completer: -- parsing a new chunk" << dendl; + + size_t to_extract = parsing_buf.capacity() - parsing_buf.size(); + do { + const size_t orig_size = parsing_buf.size(); + parsing_buf.resize(parsing_buf.size() + to_extract); + const size_t received = io_base_t::recv_body(parsing_buf.data() + orig_size, + to_extract); + parsing_buf.resize(parsing_buf.size() - (to_extract - received)); + if (received == 0) { + break; + } + + stream_pos += received; + to_extract -= received; + } while (to_extract > 0); + + size_t consumed; + std::tie(chunk_meta, consumed) = \ + ChunkMeta::create_next(std::move(chunk_meta), + parsing_buf.data(), parsing_buf.size()); + + /* We can drop the bytes consumed during metadata parsing. The remainder + * can be chunk's data plus possibly beginning of next chunks' metadata. */ + parsing_buf.erase(std::begin(parsing_buf), + std::begin(parsing_buf) + consumed); + + /* The validity of previous chunk can be verified only after getting meta- + * data of the next one. */ + /* TODO(rzarzynski): implement chunk's signature verification. */ + } + + size_t to_extract = \ + std::min(chunk_meta.get_data_size(stream_pos), buf_max); + + /* It's quite probable we have a couple of real data bytes stored together + * with meta-data in the parsing_buf. We need to extract them and move to + * the final buffer. This is a trade-off between frontend's read overhead + * and memcpy. */ + if (to_extract > 0 && parsing_buf.size() > 0) { + const auto data_len = std::min(to_extract, parsing_buf.size()); + const auto data_end_iter = std::begin(parsing_buf) + data_len; + + std::copy(std::begin(parsing_buf), data_end_iter, buf); + parsing_buf.erase(std::begin(parsing_buf), data_end_iter); + + to_extract -= data_len; + buf_pos += data_len; + } + + /* Now we can do the bulk read directly from RestfulClient without any extra + * buffering. */ + while (to_extract > 0) { + const size_t received = io_base_t::recv_body(buf + buf_pos, to_extract); + + if (received == 0) { + break; + } + + buf_pos += received; + stream_pos += received; + to_extract -= received; + } + + dout(20) << "AWSv4Completer: filled=" << buf_pos << dendl; + return buf_pos; +} size_t AWSv4Completer::recv_body(char* const buf, const size_t max) { + if (aws4_auth_streaming_mode) { + return recv_chunked(buf, max); + } + const auto received = io_base_t::recv_body(buf, max); - calc_hash_sha256_update_stream(sha256_hash, buf, received); + + if (sha256_hash) { + calc_hash_sha256_update_stream(sha256_hash, buf, received); + } return received; } @@ -823,7 +979,7 @@ size_t AWSv4Completer::recv_body(char* const buf, const size_t max) void AWSv4Completer::modify_request_state(req_state* const s_rw) { /* TODO(rzarzynski): switch to the dedicated filter over RestfulClient. */ - s_rw->aws4_auth_streaming_mode = aws4_auth_streaming_mode; + //s_rw->aws4_auth_streaming_mode = aws4_auth_streaming_mode; s_rw->aws4_auth = std::unique_ptr(new rgw_aws4_auth); diff --git a/src/rgw/rgw_auth_s3.h b/src/rgw/rgw_auth_s3.h index f9945638b850..52086ae64779 100644 --- a/src/rgw/rgw_auth_s3.h +++ b/src/rgw/rgw_auth_s3.h @@ -9,6 +9,8 @@ #include #include +#include + #include "rgw_common.h" #include "rgw_rest_s3.h" @@ -128,14 +130,65 @@ public: }; +/* TODO(rzarzynski): decompose the AWSv4Completer into two separate completers: + * one for single chunk mode and second for streaming mode. */ class AWSv4Completer : public rgw::auth::Completer, public rgw::io::DecoratedRestfulClient, public std::enable_shared_from_this { private: using io_base_t = rgw::io::DecoratedRestfulClient; + class ChunkMeta { + size_t data_offset_in_stream = 0; + size_t data_length = 0; + std::string signature; + + ChunkMeta(const size_t data_starts_in_stream, + const size_t data_length, + const boost::string_ref signature) + : data_offset_in_stream(data_starts_in_stream), + data_length(data_length), + signature(signature.to_string()) { + } + + ChunkMeta(const boost::string_ref signature) + : signature(signature.to_string()) { + } + + public: + static constexpr size_t SIG_SIZE = 64; + + /* Let's suppose the data length fields can't exceed uint64_t. */ + static constexpr size_t META_MAX_SIZE = \ + strlen("\r\nffffffffffffffff;chunk-signature=") + SIG_SIZE + strlen("\r\n"); + + /* The metadata size of for the last, empty chunk. */ + static constexpr size_t META_MIN_SIZE = \ + strlen("0;chunk-signature=") + SIG_SIZE + strlen("\r\n"); + + /* Detect whether a given stream_pos fits in boundaries of a chunk. */ + bool is_new_chunk_in_stream(size_t stream_pos) const; + + /* Get the remaining data size. */ + size_t get_data_size(size_t stream_pos) const; + + /* Factory: create an object representing metadata of first, initial chunk + * in a stream. */ + static ChunkMeta create_first(const boost::string_ref seed_signature) { + return ChunkMeta(seed_signature); + } + + /* Factory: parse a block of META_MAX_SIZE bytes and creates an object + * representing non-first chunk in a stream. As the process is sequential + * and depends on the previous chunk, caller must pass it. */ + static std::pair + create_next(ChunkMeta&& prev, const char* metabuf, size_t metabuf_len); + } chunk_meta; + + boost::container::static_vector parsing_buf; SHA256* sha256_hash = nullptr; + size_t stream_pos = 0; const bool aws4_auth_needs_complete = true; const bool aws4_auth_streaming_mode = false; @@ -160,6 +213,7 @@ private: std::string seed_signature, const signing_key_t& signing_key) : io_base_t(nullptr), + chunk_meta(ChunkMeta::create_first(seed_signature)), aws4_auth_needs_complete(false), aws4_auth_streaming_mode(true), date(std::move(date)), @@ -175,6 +229,8 @@ private: s(s) { } + size_t recv_chunked(char* buf, size_t max); + public: /* rgw::io::DecoratedRestfulClient. */ size_t recv_body(char* buf, size_t max) override; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 046dd6c07542..4a004755646c 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -3278,12 +3278,14 @@ void RGWPutObj::execute() } } +#if 0 if (!chunked_upload && ofs != s->content_length && !s->aws4_auth_streaming_mode) { op_ret = -ERR_REQUEST_TIMEOUT; goto done; } +#endif s->obj_size = ofs; perfcounter->inc(l_rgw_put_b, s->obj_size);