]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: implement the object creation in BulkUpload of Swift API.
authorRadoslaw Zarzynski <rzarzynski@mirantis.com>
Thu, 24 Nov 2016 13:20:33 +0000 (14:20 +0100)
committerRadoslaw Zarzynski <rzarzynski@mirantis.com>
Sun, 2 Apr 2017 16:35:22 +0000 (18:35 +0200)
Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
src/rgw/rgw_op.cc
src/rgw/rgw_op.h

index bd5f380623f09ac9b29e14b1d49b64b02bd4b5fa..f906689cfaa942d5d2c4bd8ab2f9bf9d4efa3c09 100644 (file)
@@ -5742,6 +5742,182 @@ int RGWBulkUploadOp::handle_dir(const boost::string_ref path)
   return op_ret;
 }
 
+
+bool RGWBulkUploadOp::handle_file_verify_permission(RGWBucketInfo& binfo,
+                                                    std::map<std::string, ceph::bufferlist>& battrs,
+                                                    ACLOwner& bucket_owner /* out */)
+{
+  RGWAccessControlPolicy bacl(store->ctx());
+  op_ret = read_bucket_policy(store, s, binfo, battrs, &bacl, binfo.bucket);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "bulk upload: cannot read_policy() for bucket"
+                      << dendl;
+    return false;
+  }
+
+  bucket_owner = bacl.get_owner();
+  return verify_bucket_permission(s, s->user_acl.get(), &bacl, RGW_PERM_WRITE);
+}
+
+int RGWBulkUploadOp::handle_file(const boost::string_ref path,
+                                 const size_t size,
+                                 AlignedStreamGetter& body)
+{
+
+  ldout(s->cct, 20) << "bulk upload: got file=" << path << ", size=" << size
+                    << dendl;
+
+  RGWPutObjDataProcessor *filter = nullptr;
+  boost::optional<RGWPutObj_Compress> compressor;
+
+  if (size > static_cast<const size_t>(s->cct->_conf->rgw_max_put_size)) {
+    op_ret = -ERR_TOO_LARGE;
+    return op_ret;
+  }
+
+  std::string bucket_name;
+  rgw_obj_key object;
+  std::tie(bucket_name, object) = *parse_path(path);
+
+  auto& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+  RGWBucketInfo binfo;
+  std::map<std::string, ceph::bufferlist> battrs;
+  ACLOwner bowner;
+  op_ret = store->get_bucket_info(obj_ctx, s->user->user_id.tenant,
+                                  bucket_name, binfo, nullptr, &battrs);
+  if (op_ret == -ENOENT) {
+    ldout(s->cct, 20) << "bulk upload: non existent directory=" << bucket_name
+                      << dendl;
+  } else if (op_ret < 0) {
+    return op_ret;
+  }
+
+  if (! handle_file_verify_permission(binfo, battrs, bowner)) {
+    ldout(s->cct, 20) << "bulk upload: object creation unauthorized" << dendl;
+    op_ret = -EACCES;
+    return op_ret;
+  }
+
+  op_ret = store->check_quota(bowner.get_id(), binfo.bucket,
+                              user_quota, bucket_quota, size);
+  if (op_ret < 0) {
+    return op_ret;
+  }
+
+  RGWPutObjProcessor_Atomic processor(obj_ctx,
+                                      binfo,
+                                      binfo.bucket,
+                                      object.name,
+                                      /* part size */
+                                      s->cct->_conf->rgw_obj_stripe_size,
+                                      s->req_id,
+                                      binfo.versioning_enabled());
+
+  /* No filters by default. */
+  filter = &processor;
+
+  op_ret = processor.prepare(store, nullptr);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "bulk upload: cannot prepare processor due to ret="
+                      << op_ret << dendl;
+    return op_ret;
+  }
+
+  const auto& compression_type = store->get_zone_params().get_compression_type(
+      binfo.placement_rule);
+  CompressorRef plugin;
+  if (compression_type != "none") {
+    plugin = Compressor::create(s->cct, compression_type);
+    if (! plugin) {
+      ldout(s->cct, 1) << "Cannot load plugin for rgw_compression_type "
+                       << compression_type << dendl;
+    } else {
+      compressor.emplace(s->cct, plugin, filter);
+      filter = &*compressor;
+    }
+  }
+
+  /* Upload file content. */
+  ssize_t len = 0;
+  size_t ofs = 0;
+  MD5 hash;
+  do {
+    ceph::bufferlist data;
+    len = body.get_at_most(s->cct->_conf->rgw_max_chunk_size, data);
+
+    ldout(s->cct, 20) << "bulk upload: body=" << data.c_str() << dendl;
+    if (len < 0) {
+      op_ret = len;
+      return op_ret;
+    } else if (len > 0) {
+      hash.Update((const byte *)data.c_str(), data.length());
+      op_ret = put_data_and_throttle(filter, data, ofs, false);
+      if (op_ret < 0) {
+        ldout(s->cct, 20) << "processor->thottle_data() returned ret="
+                         << op_ret << dendl;
+        return op_ret;
+      }
+
+      ofs += len;
+    }
+
+  } while (len > 0);
+
+  if (ofs != size) {
+    ldout(s->cct, 10) << "bulk upload: real file size different from declared"
+                      << dendl;
+    op_ret = -EINVAL;
+  }
+
+  op_ret = store->check_quota(bowner.get_id(), binfo.bucket,
+                             user_quota, bucket_quota, size);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "bulk upload: quota exceeded for path=" << path
+                      << dendl;
+    return op_ret;
+  }
+
+  char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
+  unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
+  hash.Final(m);
+  buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
+
+  /* Create metadata: ETAG. */
+  std::map<std::string, ceph::bufferlist> attrs;
+  std::string etag = calc_md5;
+  ceph::bufferlist etag_bl;
+  etag_bl.append(etag.c_str(), etag.size() + 1);
+  attrs.emplace(RGW_ATTR_ETAG, std::move(etag_bl));
+
+  /* Create metadata: ACLs. */
+  RGWAccessControlPolicy policy;
+  policy.create_default(s->user->user_id, s->user->display_name);
+  ceph::bufferlist aclbl;
+  policy.encode(aclbl);
+  attrs.emplace(RGW_ATTR_ACL, std::move(aclbl));
+
+  /* Create metadata: compression info. */
+  if (compressor && compressor->is_compressed()) {
+    ceph::bufferlist tmp;
+    RGWCompressionInfo cs_info;
+    cs_info.compression_type = plugin->get_type_name();
+    cs_info.orig_size = s->obj_size;
+    cs_info.blocks = std::move(compressor->get_compression_blocks());
+    ::encode(cs_info, tmp);
+    attrs.emplace(RGW_ATTR_COMPRESSION, std::move(tmp));
+  }
+
+  /* Complete the transaction. */
+  op_ret = processor.complete(size, etag, nullptr, ceph::real_time(), attrs,
+                              ceph::real_time() /* delete_at */);
+  if (op_ret < 0) {
+    ldout(s->cct, 20) << "bulk upload: processor::complete returned op_ret="
+                      << op_ret << dendl;
+  }
+
+  return op_ret;
+}
+
 void RGWBulkUploadOp::execute()
 {
   ceph::bufferlist buffer(64 * 1024);
@@ -5774,6 +5950,12 @@ void RGWBulkUploadOp::execute()
       switch (header->get_filetype()) {
         case rgw::tar::FileType::NORMAL_FILE: {
           ldout(s->cct, 2) << "bulk upload: handling regular file" << dendl;
+
+          auto body = AlignedStreamGetter(0, header->get_filesize(),
+                                          rgw::tar::BLOCK_SIZE, *stream);
+          op_ret = handle_file(header->get_filename(),
+                               header->get_filesize(),
+                               body);
           break;
         }
         case rgw::tar::FileType::DIRECTORY: {
index c671f85072f4445c844f22502aba7d63fb8afd8a..df865926454c71c62f0c0d8da357f8ddb44288eb 100644 (file)
@@ -398,6 +398,13 @@ protected:
   boost::optional<std::pair<std::string, rgw_obj_key>>
   parse_path(const boost::string_ref& path);
 
+  bool handle_file_verify_permission(RGWBucketInfo& binfo,
+                                     std::map<std::string, ceph::bufferlist>& battrs,
+                                     ACLOwner& bucket_owner /* out */);
+  int handle_file(boost::string_ref path,
+                  size_t size,
+                  AlignedStreamGetter& body);
+
   int handle_dir_verify_permission();
   int handle_dir(boost::string_ref path);
 public: