#include "rgw_cors_s3.h"
#include "rgw_rest_conn.h"
#include "rgw_rest_s3.h"
+#include "rgw_tar.h"
#include "rgw_client_io.h"
#include "rgw_compression.h"
#include "rgw_role.h"
void RGWBulkUploadOp::execute()
{
+ ceph::bufferlist buffer(64 * 1024);
+
+ ldout(s->cct, 20) << "bulk upload: start" << dendl;
+
+ /* Create an instance of stream-abstracting class. Having this indirection
+ * allows for easy introduction of decompressors like gzip and bzip2. */
+ auto stream = create_stream();
+ auto status = rgw::tar::StatusIndicator::create();
+ do {
+ op_ret = stream->get_exactly(rgw::tar::BLOCK_SIZE, buffer);
+ if (op_ret < 0) {
+ ldout(s->cct, 2) << "bulk upload: cannot read header" << dendl;
+ return;
+ }
+
+ /* We need to re-interpret the buffer as a TAR block. Exactly two blocks
+ * must be tracked to detect out end-of-archive. It occurs when both of
+ * them are empty (zeroed). Tracing this particular inter-block dependency
+ * is responsibility of the rgw::tar::StatusIndicator class. */
+ boost::optional<rgw::tar::HeaderView> header;
+ std::tie(status, header) = rgw::tar::interpret_block(status, buffer);
+
+ if (! status.empty() && header) {
+ /* This specific block isn't empty (entirely zeroed), so we can parse
+ * it as a TAR header and dispatch. At the moment we do support only
+ * regular files and directories. Everything else (symlinks, devices)
+ * will be ignored but won't cease the whole upload. */
+ switch (header->get_filetype()) {
+ case rgw::tar::FileType::NORMAL_FILE: {
+ ldout(s->cct, 2) << "bulk upload: handling regular file" << dendl;
+ break;
+ }
+ case rgw::tar::FileType::DIRECTORY: {
+ ldout(s->cct, 2) << "bulk upload: handling regular directory" << dendl;
+ break;
+ }
+ default: {
+ /* Not recognized. Skip. */
+ op_ret = 0;
+ break;
+ }
+ }
+ } else {
+ ldout(s->cct, 2) << "bulk upload: an empty block" << dendl;
+ op_ret = 0;
+ }
+
+ buffer.clear();
+ } while (! status.eof());
+
return;
}
std::unique_ptr<RGWBulkUploadOp::StreamGetter>
RGWBulkUploadOp_ObjStore_SWIFT::create_stream()
{
- return nullptr;
+ class SwiftStreamGetter : public StreamGetter {
+ const size_t conlen;
+ size_t curpos;
+ req_state* const s;
+
+ public:
+ SwiftStreamGetter(req_state* const s)
+ : conlen(atoll(s->length)),
+ curpos(0),
+ s(s) {
+ }
+
+ ssize_t get_at_most(size_t want, ceph::bufferlist& dst) override {
+ /* maximum requested by a caller */
+ /* data provided by client */
+ /* RadosGW's limit. */
+ const size_t max_chunk_size = \
+ static_cast<size_t>(s->cct->_conf->rgw_max_chunk_size);
+ const size_t max_to_read = std::min({ want, conlen - curpos, max_chunk_size });
+
+ ldout(s->cct, 20) << "bulk_upload: get_at_most max_to_read="
+ << max_to_read
+ << ", dst.c_str()=" << reinterpret_cast<intptr_t>(dst.c_str()) << dendl;
+
+ bufferptr bp(max_to_read);
+ const auto read_len = recv_body(s, bp.c_str(), max_to_read);
+ dst.append(bp, 0, read_len);
+ //const auto read_len = recv_body(s, dst.c_str(), max_to_read);
+ if (read_len < 0) {
+ return read_len;
+ }
+
+ curpos += read_len;
+ return curpos > s->cct->_conf->rgw_max_put_size ? -ERR_TOO_LARGE
+ : read_len;
+ }
+
+ ssize_t get_exactly(size_t want, ceph::bufferlist& dst) override {
+ ldout(s->cct, 20) << "bulk_upload: get_exactly want=" << want << dendl;
+
+ /* FIXME: do this in a loop. */
+ const auto ret = get_at_most(want, dst);
+ ldout(s->cct, 20) << "bulk_upload: get_exactly ret=" << ret << dendl;
+ if (ret < 0) {
+ return ret;
+ } else if (static_cast<size_t>(ret) != want) {
+ return -EINVAL;
+ } else {
+ return want;
+ }
+ }
+ };
+
+ // FIXME: lack of conlen
+ ldout(s->cct, 20) << "bulk upload: create_stream for length="
+ << s->length << dendl;
+ return std::unique_ptr<SwiftStreamGetter>(new SwiftStreamGetter(s));
}
void RGWBulkUploadOp_ObjStore_SWIFT::send_response()
};
class RGWBulkUploadOp_ObjStore_SWIFT : public RGWBulkUploadOp_ObjStore {
+ size_t conlen;
+ size_t curpos;
+
public:
- RGWBulkUploadOp_ObjStore_SWIFT() = default;
+ RGWBulkUploadOp_ObjStore_SWIFT()
+ : conlen(0),
+ curpos(0) {
+ }
~RGWBulkUploadOp_ObjStore_SWIFT() = default;
std::unique_ptr<StreamGetter> create_stream() override;