]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: implement the TAR extraction loop of Swift's BulkUpload.
authorRadoslaw Zarzynski <rzarzynski@mirantis.com>
Thu, 24 Nov 2016 10:34:01 +0000 (11:34 +0100)
committerRadoslaw Zarzynski <rzarzynski@mirantis.com>
Sat, 1 Apr 2017 13:46:26 +0000 (15:46 +0200)
Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
src/rgw/rgw_op.cc
src/rgw/rgw_rest_swift.cc
src/rgw/rgw_rest_swift.h

index f0030f915d30e1d983ae35a0526346ceb42c258f..9db697a5ac89ac4b603cb27dd602d6d113ff2ce5 100644 (file)
@@ -31,6 +31,7 @@
 #include "rgw_cors_s3.h"
 #include "rgw_rest_conn.h"
 #include "rgw_rest_s3.h"
+#include "rgw_tar.h"
 #include "rgw_client_io.h"
 #include "rgw_compression.h"
 #include "rgw_role.h"
@@ -5509,6 +5510,56 @@ void RGWBulkUploadOp::pre_exec()
 
 void RGWBulkUploadOp::execute()
 {
+  ceph::bufferlist buffer(64 * 1024);
+
+  ldout(s->cct, 20) << "bulk upload: start" << dendl;
+
+  /* Create an instance of stream-abstracting class. Having this indirection
+   * allows for easy introduction of decompressors like gzip and bzip2. */
+  auto stream = create_stream();
+  auto status = rgw::tar::StatusIndicator::create();
+  do {
+    op_ret = stream->get_exactly(rgw::tar::BLOCK_SIZE, buffer);
+    if (op_ret < 0) {
+      ldout(s->cct, 2) << "bulk upload: cannot read header" << dendl;
+      return;
+    }
+
+    /* We need to re-interpret the buffer as a TAR block. Exactly two blocks
+     * must be tracked to detect out end-of-archive. It occurs when both of
+     * them are empty (zeroed). Tracing this particular inter-block dependency
+     * is responsibility of the rgw::tar::StatusIndicator class. */
+    boost::optional<rgw::tar::HeaderView> header;
+    std::tie(status, header) = rgw::tar::interpret_block(status, buffer);
+
+    if (! status.empty() && header) {
+      /* This specific block isn't empty (entirely zeroed), so we can parse
+       * it as a TAR header and dispatch. At the moment we do support only
+       * regular files and directories. Everything else (symlinks, devices)
+       * will be ignored but won't cease the whole upload. */
+      switch (header->get_filetype()) {
+        case rgw::tar::FileType::NORMAL_FILE: {
+          ldout(s->cct, 2) << "bulk upload: handling regular file" << dendl;
+          break;
+        }
+        case rgw::tar::FileType::DIRECTORY: {
+          ldout(s->cct, 2) << "bulk upload: handling regular directory" << dendl;
+          break;
+        }
+        default: {
+          /* Not recognized. Skip. */
+          op_ret = 0;
+          break;
+        }
+      }
+    } else {
+      ldout(s->cct, 2) << "bulk upload: an empty block" << dendl;
+      op_ret = 0;
+    }
+
+    buffer.clear();
+  } while (! status.eof());
+
   return;
 }
 
index da308a639d8c940c76967ad8102a2548ec1b3de9..b0fa219b3b71fb5c1eb1da1c50e0f4535e5b992a 100644 (file)
@@ -1424,7 +1424,63 @@ void RGWBulkDelete_ObjStore_SWIFT::send_response()
 std::unique_ptr<RGWBulkUploadOp::StreamGetter>
 RGWBulkUploadOp_ObjStore_SWIFT::create_stream()
 {
-  return nullptr;
+  class SwiftStreamGetter : public StreamGetter {
+    const size_t conlen;
+    size_t curpos;
+    req_state* const s;
+
+  public:
+    SwiftStreamGetter(req_state* const s)
+      : conlen(atoll(s->length)),
+        curpos(0),
+        s(s) {
+    }
+
+    ssize_t get_at_most(size_t want, ceph::bufferlist& dst) override {
+      /* maximum requested by a caller */
+      /* data provided by client */
+      /* RadosGW's limit. */
+      const size_t max_chunk_size = \
+        static_cast<size_t>(s->cct->_conf->rgw_max_chunk_size);
+      const size_t max_to_read = std::min({ want, conlen - curpos, max_chunk_size });
+
+      ldout(s->cct, 20) << "bulk_upload: get_at_most max_to_read="
+                        << max_to_read
+                        << ", dst.c_str()=" << reinterpret_cast<intptr_t>(dst.c_str()) << dendl;
+
+      bufferptr bp(max_to_read);
+      const auto read_len = recv_body(s, bp.c_str(), max_to_read);
+      dst.append(bp, 0, read_len);
+      //const auto read_len = recv_body(s, dst.c_str(), max_to_read);
+      if (read_len < 0) {
+        return read_len;
+      }
+
+      curpos += read_len;
+      return curpos > s->cct->_conf->rgw_max_put_size ? -ERR_TOO_LARGE
+                                                      : read_len;
+    }
+
+    ssize_t get_exactly(size_t want, ceph::bufferlist& dst) override {
+      ldout(s->cct, 20) << "bulk_upload: get_exactly want=" << want << dendl;
+
+      /* FIXME: do this in a loop. */
+      const auto ret = get_at_most(want, dst);
+      ldout(s->cct, 20) << "bulk_upload: get_exactly ret=" << ret << dendl;
+      if (ret < 0) {
+        return ret;
+      } else if (static_cast<size_t>(ret) != want) {
+        return -EINVAL;
+      } else {
+        return want;
+      }
+    }
+  };
+
+  // FIXME: lack of conlen
+  ldout(s->cct, 20) << "bulk upload: create_stream for length="
+                    << s->length << dendl;
+  return std::unique_ptr<SwiftStreamGetter>(new SwiftStreamGetter(s));
 }
 
 void RGWBulkUploadOp_ObjStore_SWIFT::send_response()
index 70f0ed67517be61681b4d767269f24ca298d2bb2..8478b1bad7b73421b9fe0998108ce15840dff0ad 100644 (file)
@@ -202,8 +202,14 @@ public:
 };
 
 class RGWBulkUploadOp_ObjStore_SWIFT : public RGWBulkUploadOp_ObjStore {
+  size_t conlen;
+  size_t curpos;
+
 public:
-  RGWBulkUploadOp_ObjStore_SWIFT() = default;
+  RGWBulkUploadOp_ObjStore_SWIFT()
+    : conlen(0),
+      curpos(0) {
+  }
   ~RGWBulkUploadOp_ObjStore_SWIFT() = default;
 
   std::unique_ptr<StreamGetter> create_stream() override;