From 4c4cdf2183f53d616ba7b397fa9ef18c2edd303a Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Thu, 24 Nov 2016 11:34:01 +0100 Subject: [PATCH] rgw: implement the TAR extraction loop of Swift's BulkUpload. Signed-off-by: Radoslaw Zarzynski --- src/rgw/rgw_op.cc | 51 ++++++++++++++++++++++++++++++++++ src/rgw/rgw_rest_swift.cc | 58 ++++++++++++++++++++++++++++++++++++++- src/rgw/rgw_rest_swift.h | 8 +++++- 3 files changed, 115 insertions(+), 2 deletions(-) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index f0030f915d30e..9db697a5ac89a 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -31,6 +31,7 @@ #include "rgw_cors_s3.h" #include "rgw_rest_conn.h" #include "rgw_rest_s3.h" +#include "rgw_tar.h" #include "rgw_client_io.h" #include "rgw_compression.h" #include "rgw_role.h" @@ -5509,6 +5510,56 @@ void RGWBulkUploadOp::pre_exec() void RGWBulkUploadOp::execute() { + ceph::bufferlist buffer(64 * 1024); + + ldout(s->cct, 20) << "bulk upload: start" << dendl; + + /* Create an instance of stream-abstracting class. Having this indirection + * allows for easy introduction of decompressors like gzip and bzip2. */ + auto stream = create_stream(); + auto status = rgw::tar::StatusIndicator::create(); + do { + op_ret = stream->get_exactly(rgw::tar::BLOCK_SIZE, buffer); + if (op_ret < 0) { + ldout(s->cct, 2) << "bulk upload: cannot read header" << dendl; + return; + } + + /* We need to re-interpret the buffer as a TAR block. Exactly two blocks + * must be tracked to detect out end-of-archive. It occurs when both of + * them are empty (zeroed). Tracing this particular inter-block dependency + * is responsibility of the rgw::tar::StatusIndicator class. */ + boost::optional header; + std::tie(status, header) = rgw::tar::interpret_block(status, buffer); + + if (! status.empty() && header) { + /* This specific block isn't empty (entirely zeroed), so we can parse + * it as a TAR header and dispatch. At the moment we do support only + * regular files and directories. Everything else (symlinks, devices) + * will be ignored but won't cease the whole upload. */ + switch (header->get_filetype()) { + case rgw::tar::FileType::NORMAL_FILE: { + ldout(s->cct, 2) << "bulk upload: handling regular file" << dendl; + break; + } + case rgw::tar::FileType::DIRECTORY: { + ldout(s->cct, 2) << "bulk upload: handling regular directory" << dendl; + break; + } + default: { + /* Not recognized. Skip. */ + op_ret = 0; + break; + } + } + } else { + ldout(s->cct, 2) << "bulk upload: an empty block" << dendl; + op_ret = 0; + } + + buffer.clear(); + } while (! status.eof()); + return; } diff --git a/src/rgw/rgw_rest_swift.cc b/src/rgw/rgw_rest_swift.cc index da308a639d8c9..b0fa219b3b71f 100644 --- a/src/rgw/rgw_rest_swift.cc +++ b/src/rgw/rgw_rest_swift.cc @@ -1424,7 +1424,63 @@ void RGWBulkDelete_ObjStore_SWIFT::send_response() std::unique_ptr RGWBulkUploadOp_ObjStore_SWIFT::create_stream() { - return nullptr; + class SwiftStreamGetter : public StreamGetter { + const size_t conlen; + size_t curpos; + req_state* const s; + + public: + SwiftStreamGetter(req_state* const s) + : conlen(atoll(s->length)), + curpos(0), + s(s) { + } + + ssize_t get_at_most(size_t want, ceph::bufferlist& dst) override { + /* maximum requested by a caller */ + /* data provided by client */ + /* RadosGW's limit. */ + const size_t max_chunk_size = \ + static_cast(s->cct->_conf->rgw_max_chunk_size); + const size_t max_to_read = std::min({ want, conlen - curpos, max_chunk_size }); + + ldout(s->cct, 20) << "bulk_upload: get_at_most max_to_read=" + << max_to_read + << ", dst.c_str()=" << reinterpret_cast(dst.c_str()) << dendl; + + bufferptr bp(max_to_read); + const auto read_len = recv_body(s, bp.c_str(), max_to_read); + dst.append(bp, 0, read_len); + //const auto read_len = recv_body(s, dst.c_str(), max_to_read); + if (read_len < 0) { + return read_len; + } + + curpos += read_len; + return curpos > s->cct->_conf->rgw_max_put_size ? -ERR_TOO_LARGE + : read_len; + } + + ssize_t get_exactly(size_t want, ceph::bufferlist& dst) override { + ldout(s->cct, 20) << "bulk_upload: get_exactly want=" << want << dendl; + + /* FIXME: do this in a loop. */ + const auto ret = get_at_most(want, dst); + ldout(s->cct, 20) << "bulk_upload: get_exactly ret=" << ret << dendl; + if (ret < 0) { + return ret; + } else if (static_cast(ret) != want) { + return -EINVAL; + } else { + return want; + } + } + }; + + // FIXME: lack of conlen + ldout(s->cct, 20) << "bulk upload: create_stream for length=" + << s->length << dendl; + return std::unique_ptr(new SwiftStreamGetter(s)); } void RGWBulkUploadOp_ObjStore_SWIFT::send_response() diff --git a/src/rgw/rgw_rest_swift.h b/src/rgw/rgw_rest_swift.h index 70f0ed67517be..8478b1bad7b73 100644 --- a/src/rgw/rgw_rest_swift.h +++ b/src/rgw/rgw_rest_swift.h @@ -202,8 +202,14 @@ public: }; class RGWBulkUploadOp_ObjStore_SWIFT : public RGWBulkUploadOp_ObjStore { + size_t conlen; + size_t curpos; + public: - RGWBulkUploadOp_ObjStore_SWIFT() = default; + RGWBulkUploadOp_ObjStore_SWIFT() + : conlen(0), + curpos(0) { + } ~RGWBulkUploadOp_ObjStore_SWIFT() = default; std::unique_ptr create_stream() override; -- 2.39.5