]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: AWSv4Completer dechunks data in the streaming mode.
authorRadoslaw Zarzynski <rzarzynski@mirantis.com>
Thu, 27 Apr 2017 19:48:02 +0000 (21:48 +0200)
committerRadoslaw Zarzynski <rzarzynski@mirantis.com>
Wed, 7 Jun 2017 10:43:18 +0000 (12:43 +0200)
Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
src/rgw/rgw_auth_s3.cc
src/rgw/rgw_auth_s3.h
src/rgw/rgw_op.cc

index 36906c16868784842cccb5492f9849520f3cb354..877a0297bbbaaa4e0ecddf5f770d690acbf91911 100644 (file)
@@ -811,11 +811,167 @@ std::string get_v4_signature(CephContext* const cct,
   return signature;
 }
 
+bool AWSv4Completer::ChunkMeta::is_new_chunk_in_stream(size_t stream_pos) const
+{
+  return stream_pos >= (data_offset_in_stream + data_length);
+}
+
+size_t AWSv4Completer::ChunkMeta::get_data_size(size_t stream_pos) const
+{
+  if (stream_pos > (data_offset_in_stream + data_length)) {
+    /* Data in parsing_buf. */
+    return data_length;
+  } else {
+    return data_offset_in_stream + data_length - stream_pos;
+  }
+}
+
+std::pair<AWSv4Completer::ChunkMeta, size_t /* consumed */>
+AWSv4Completer::ChunkMeta::create_next(ChunkMeta&& old,
+                                       const char* const metabuf,
+                                       const size_t metabuf_len)
+{
+  boost::string_ref metastr(metabuf, metabuf_len);
+
+  const size_t semicolon_pos = metastr.find(";");
+  if (semicolon_pos == boost::string_ref::npos) {
+    dout(20) << "AWSv4Completer cannot find the ';' separator" << dendl;
+    throw rgw::io::Exception(EINVAL, std::system_category());
+  }
+
+  char* data_field_end;
+  /* strtoull ignores the "\r\n" sequence after each non-first chunk. */
+  const size_t data_length = std::strtoull(metabuf, &data_field_end, 16);
+  if (data_length == 0 && data_field_end == metabuf) {
+    dout(20) << "AWSv4Completer: cannot parse the data size" << dendl;
+    throw rgw::io::Exception(EINVAL, std::system_category());
+  }
+
+  /* Parse the chunk_signature=... part. */
+  const auto signature_part = metastr.substr(semicolon_pos + 1);
+  const size_t eq_sign_pos = signature_part.find("=");
+  if (eq_sign_pos == boost::string_ref::npos) {
+    dout(20) << "AWSv4Completer: cannot find the '=' separator" << dendl;
+    throw rgw::io::Exception(EINVAL, std::system_category());
+  }
+
+  /* OK, we have at least the beginning of a signature. */
+  const size_t data_sep_pos = signature_part.find("\r\n");
+  if (data_sep_pos == boost::string_ref::npos) {
+    dout(20) << "AWSv4Completer: no new line at signature end" << dendl;
+    throw rgw::io::Exception(EINVAL, std::system_category());
+  }
+
+  const auto signature = \
+    signature_part.substr(eq_sign_pos + 1, data_sep_pos - 1 - eq_sign_pos);
+  if (signature.length() != SIG_SIZE) {
+    dout(20) << "AWSv4Completer: signature.length() != 64" << dendl;
+    throw rgw::io::Exception(EINVAL, std::system_category());
+  }
+
+  const size_t data_starts_in_stream = \
+    + semicolon_pos + strlen(";") + data_sep_pos  + strlen("\r\n")
+    + old.data_offset_in_stream + old.data_length;
+
+  dout(20) << "parsed new chunk; signature=" << signature
+           << ", data_length=" << data_length
+           << ", data_starts_in_stream=" << data_starts_in_stream
+           << dendl;
+
+  return std::make_pair(ChunkMeta(data_starts_in_stream,
+                                  data_length,
+                                  signature),
+                        semicolon_pos + 83);
+}
+
+size_t AWSv4Completer::recv_chunked(char* const buf, const size_t buf_max)
+{
+  /* Buffer stores only parsed stream. Raw values reflect the stream
+   * we're getting from a client. */
+  size_t buf_pos = 0;
+
+  if (chunk_meta.is_new_chunk_in_stream(stream_pos)) {
+    /* We don't have metadata for this range. This means a new chunk, so we
+     * need to parse a fresh portion of the stream. Let's start. */
+    dout(20) << "AWSv4Completer: -- parsing a new chunk" << dendl;
+
+    size_t to_extract = parsing_buf.capacity() - parsing_buf.size();
+    do {
+      const size_t orig_size = parsing_buf.size();
+      parsing_buf.resize(parsing_buf.size() + to_extract);
+      const size_t received = io_base_t::recv_body(parsing_buf.data() + orig_size,
+                                                   to_extract);
+      parsing_buf.resize(parsing_buf.size() - (to_extract - received));
+      if (received == 0) {
+        break;
+      }
+
+      stream_pos += received;
+      to_extract -= received;
+    } while (to_extract > 0);
+
+    size_t consumed;
+    std::tie(chunk_meta, consumed) = \
+      ChunkMeta::create_next(std::move(chunk_meta),
+                                 parsing_buf.data(), parsing_buf.size());
+
+    /* We can drop the bytes consumed during metadata parsing. The remainder
+     * can be chunk's data plus possibly beginning of next chunks' metadata. */
+    parsing_buf.erase(std::begin(parsing_buf),
+                      std::begin(parsing_buf) + consumed);
+
+    /* The validity of previous chunk can be verified only after getting meta-
+     * data of the next one. */
+    /* TODO(rzarzynski): implement chunk's signature verification. */
+  }
+
+  size_t to_extract = \
+    std::min(chunk_meta.get_data_size(stream_pos), buf_max);
+  
+  /* It's quite probable we have a couple of real data bytes stored together
+   * with meta-data in the parsing_buf. We need to extract them and move to
+   * the final buffer. This is a trade-off between frontend's read overhead
+   * and memcpy. */
+  if (to_extract > 0 && parsing_buf.size() > 0) {
+    const auto data_len = std::min(to_extract, parsing_buf.size());
+    const auto data_end_iter = std::begin(parsing_buf) + data_len;
+
+    std::copy(std::begin(parsing_buf), data_end_iter, buf);
+    parsing_buf.erase(std::begin(parsing_buf), data_end_iter);
+
+    to_extract -= data_len;
+    buf_pos += data_len;
+  }
+
+  /* Now we can do the bulk read directly from RestfulClient without any extra
+   * buffering. */
+  while (to_extract > 0) {
+    const size_t received = io_base_t::recv_body(buf + buf_pos, to_extract);
+
+    if (received == 0) {
+      break;
+    }
+
+    buf_pos += received;
+    stream_pos += received;
+    to_extract -= received;
+  }
+
+  dout(20) << "AWSv4Completer: filled=" << buf_pos << dendl;
+  return buf_pos;
+}
 
 size_t AWSv4Completer::recv_body(char* const buf, const size_t max)
 {
+  if (aws4_auth_streaming_mode) {
+    return recv_chunked(buf, max);
+  }
+
   const auto received = io_base_t::recv_body(buf, max);
-  calc_hash_sha256_update_stream(sha256_hash, buf, received);
+
+  if (sha256_hash) {
+    calc_hash_sha256_update_stream(sha256_hash, buf, received);
+  }
 
   return received;
 }
@@ -823,7 +979,7 @@ size_t AWSv4Completer::recv_body(char* const buf, const size_t max)
 void AWSv4Completer::modify_request_state(req_state* const s_rw)
 {
   /* TODO(rzarzynski): switch to the dedicated filter over RestfulClient. */
-  s_rw->aws4_auth_streaming_mode = aws4_auth_streaming_mode;
+  //s_rw->aws4_auth_streaming_mode = aws4_auth_streaming_mode;
 
   s_rw->aws4_auth = std::unique_ptr<rgw_aws4_auth>(new rgw_aws4_auth);
 
index f9945638b850951fdc290b9f61d266a1d534df0c..52086ae64779aef8f65ea1f5fd036ac438a8a7d2 100644 (file)
@@ -9,6 +9,8 @@
 #include <string>
 #include <tuple>
 
+#include <boost/container/static_vector.hpp>
+
 #include "rgw_common.h"
 #include "rgw_rest_s3.h"
 
@@ -128,14 +130,65 @@ public:
 };
 
 
+/* TODO(rzarzynski): decompose the AWSv4Completer into two separate completers:
+ * one for single chunk mode and second for streaming mode. */
 class AWSv4Completer : public rgw::auth::Completer,
                        public rgw::io::DecoratedRestfulClient<rgw::io::RestfulClient*>,
                        public std::enable_shared_from_this<AWSv4Completer> {
 private:
   using io_base_t = rgw::io::DecoratedRestfulClient<rgw::io::RestfulClient*>;
 
+  class ChunkMeta {
+    size_t data_offset_in_stream = 0;
+    size_t data_length = 0;
+    std::string signature;
+
+    ChunkMeta(const size_t data_starts_in_stream,
+              const size_t data_length,
+              const boost::string_ref signature)
+      : data_offset_in_stream(data_starts_in_stream),
+        data_length(data_length),
+        signature(signature.to_string()) {
+    }
+
+    ChunkMeta(const boost::string_ref signature)
+      : signature(signature.to_string()) {
+    }
+
+  public:
+    static constexpr size_t SIG_SIZE = 64;
+
+    /* Let's suppose the data length fields can't exceed uint64_t. */
+    static constexpr size_t META_MAX_SIZE = \
+      strlen("\r\nffffffffffffffff;chunk-signature=") + SIG_SIZE + strlen("\r\n");
+
+    /* The metadata size of for the last, empty chunk. */
+    static constexpr size_t META_MIN_SIZE = \
+      strlen("0;chunk-signature=") + SIG_SIZE + strlen("\r\n");
+
+    /* Detect whether a given stream_pos fits in boundaries of a chunk. */
+    bool is_new_chunk_in_stream(size_t stream_pos) const;
+
+    /* Get the remaining data size. */
+    size_t get_data_size(size_t stream_pos) const;
+
+    /* Factory: create an object representing metadata of first, initial chunk
+     * in a stream. */
+    static ChunkMeta create_first(const boost::string_ref seed_signature) {
+      return ChunkMeta(seed_signature);
+    }
+
+    /* Factory: parse a block of META_MAX_SIZE bytes and creates an object
+     * representing non-first chunk in a stream. As the process is sequential
+     * and depends on the previous chunk, caller must pass it. */
+    static std::pair<ChunkMeta, size_t>
+    create_next(ChunkMeta&& prev, const char* metabuf, size_t metabuf_len);
+  } chunk_meta;
+
+  boost::container::static_vector<char, ChunkMeta::META_MAX_SIZE> parsing_buf;
   SHA256* sha256_hash = nullptr;
 
+  size_t stream_pos = 0;
   const bool aws4_auth_needs_complete = true;
   const bool aws4_auth_streaming_mode = false;
 
@@ -160,6 +213,7 @@ private:
                  std::string seed_signature,
                  const signing_key_t& signing_key)
     : io_base_t(nullptr),
+      chunk_meta(ChunkMeta::create_first(seed_signature)),
       aws4_auth_needs_complete(false),
       aws4_auth_streaming_mode(true),
       date(std::move(date)),
@@ -175,6 +229,8 @@ private:
       s(s) {
   }
 
+  size_t recv_chunked(char* buf, size_t max);
+
 public:
   /* rgw::io::DecoratedRestfulClient. */
   size_t recv_body(char* buf, size_t max) override;
index 046dd6c07542abf9e77cc6a2b5df79fb49d0efaa..4a004755646c291c2ba80064b26c0fd934a993f8 100644 (file)
@@ -3278,12 +3278,14 @@ void RGWPutObj::execute()
     }
   }
 
+#if 0
   if (!chunked_upload &&
       ofs != s->content_length &&
       !s->aws4_auth_streaming_mode) {
     op_ret = -ERR_REQUEST_TIMEOUT;
     goto done;
   }
+#endif
   s->obj_size = ofs;
 
   perfcounter->inc(l_rgw_put_b, s->obj_size);