]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: RGWPostObj is able now to handle multiple files in single form.
authorRadoslaw Zarzynski <rzarzynski@mirantis.com>
Wed, 14 Sep 2016 11:25:53 +0000 (13:25 +0200)
committerRadoslaw Zarzynski <rzarzynski@mirantis.com>
Mon, 24 Apr 2017 11:09:44 +0000 (13:09 +0200)
Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h

index 4ec0fb7688118ac5c31db3274fbdaf5902751a30..2f2efde1dd7366a76ac9d91ad53b95c403c997d3 100644 (file)
@@ -3268,16 +3268,10 @@ void RGWPostObj::pre_exec()
 void RGWPostObj::execute()
 {
   RGWPutObjDataProcessor *filter = nullptr;
-  std::unique_ptr<RGWPutObjDataProcessor> encrypt;
-  char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
-  unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
-  MD5 hash;
-  buffer::list bl, aclbl;
-  int len = 0;
   boost::optional<RGWPutObj_Compress> compressor;
   CompressorRef plugin;
 
-  // read in the data from the POST form
+  /* Read in the data from the POST form. */
   op_ret = get_params();
   if (op_ret < 0) {
     return;
@@ -3293,116 +3287,133 @@ void RGWPostObj::execute()
     return;
   }
 
-  op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
-                             user_quota, bucket_quota, s->content_length);
-  if (op_ret < 0) {
-    return;
-  }
-
-  RGWPutObjProcessor_Atomic processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
-                                      s->bucket_info,
-                                      s->bucket,
-                                      s->object.name,
-                                      /* part size */
-                                      s->cct->_conf->rgw_obj_stripe_size,
-                                      s->req_id,
-                                      s->bucket_info.versioning_enabled());
+  /* Start iteration over data fields. It's necessary as Swift's FormPost
+   * is capable to handle multiple files in single form. */
+  do {
+    std::unique_ptr<RGWPutObjDataProcessor> encrypt;
+    char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
+    unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
+    MD5 hash;
+    ceph::buffer::list bl, aclbl;
+    int len = 0;
+
+    op_ret = store->check_quota(s->bucket_owner.get_id(),
+                                s->bucket,
+                                user_quota,
+                                bucket_quota,
+                                s->content_length);
+    if (op_ret < 0) {
+      return;
+    }
 
-  // no filters by default
-  filter = &processor;
+    RGWPutObjProcessor_Atomic processor(*static_cast<RGWObjectCtx *>(s->obj_ctx),
+                                        s->bucket_info,
+                                        s->bucket,
+                                        get_current_filename(),
+                                        /* part size */
+                                        s->cct->_conf->rgw_obj_stripe_size,
+                                        s->req_id,
+                                        s->bucket_info.versioning_enabled());
+    /* No filters by default. */
+    filter = &processor;
 
-  op_ret = processor.prepare(store, nullptr);
-  if (op_ret < 0)
-    return;
+    op_ret = processor.prepare(store, nullptr);
+    if (op_ret < 0) {
+      return;
+    }
 
-  op_ret = get_encrypt_filter(&encrypt, filter);
-  if (op_ret < 0) {
-    return;
-  }
-  if (encrypt != nullptr) {
-    filter = encrypt.get();
-  } else {
-    const auto& compression_type = store->get_zone_params().get_compression_type(
-        s->bucket_info.placement_rule);
-    if (compression_type != "none") {
-      plugin = Compressor::create(s->cct, compression_type);
-      if (!plugin) {
-        ldout(s->cct, 1) << "Cannot load plugin for compression type "
-            << compression_type << dendl;
-      } else {
-        compressor.emplace(s->cct, plugin, filter);
-        filter = &*compressor;
+    op_ret = get_encrypt_filter(&encrypt, filter);
+    if (op_ret < 0) {
+      return;
+    }
+    if (encrypt != nullptr) {
+      filter = encrypt.get();
+    } else {
+      const auto& compression_type = store->get_zone_params().get_compression_type(
+          s->bucket_info.placement_rule);
+      if (compression_type != "none") {
+        plugin = Compressor::create(s->cct, compression_type);
+        if (!plugin) {
+          ldout(s->cct, 1) << "Cannot load plugin for compression type "
+                           << compression_type << dendl;
+        } else {
+          compressor.emplace(s->cct, plugin, filter);
+          filter = &*compressor;
+        }
       }
     }
-  }
 
-  while (data_pending) {
-     bufferlist data;
-     len = get_data(data);
+    while (data_pending) {
+      bufferlist data;
+      len = get_data(data);
 
-     if (len < 0) {
-       op_ret = len;
-       return;
-     }
+      if (len < 0) {
+        op_ret = len;
+        return;
+      }
 
-     if (!len)
-       break;
+      if (!len) {
+        break;
+      }
 
-     hash.Update((const byte *)data.c_str(), data.length());
-     op_ret = put_data_and_throttle(filter, data, ofs, false);
+      hash.Update((const byte *)data.c_str(), data.length());
+      op_ret = put_data_and_throttle(filter, data, ofs, false);
 
-     ofs += len;
+      ofs += len;
 
-     if (ofs > max_len) {
-       op_ret = -ERR_TOO_LARGE;
-       return;
-     }
-   }
-  {
-    bufferlist flush;
-    op_ret = put_data_and_throttle(filter, flush, ofs, false);
-  }
-  if (len < min_len) {
-    op_ret = -ERR_TOO_SMALL;
-    return;
-  }
+      if (ofs > max_len) {
+        op_ret = -ERR_TOO_LARGE;
+        return;
+      }
+    }
 
-  s->obj_size = ofs;
+    {
+      bufferlist flush;
+      op_ret = put_data_and_throttle(filter, flush, ofs, false);
+    }
 
-  op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
-                             user_quota, bucket_quota, s->obj_size);
-  if (op_ret < 0) {
-    return;
-  }
+    if (len < min_len) {
+      op_ret = -ERR_TOO_SMALL;
+      return;
+    }
 
-  hash.Final(m);
-  buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
+    s->obj_size = ofs;
 
-  etag = calc_md5;
-  bl.append(etag.c_str(), etag.size() + 1);
-  emplace_attr(RGW_ATTR_ETAG, std::move(bl));
+    op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
+                                user_quota, bucket_quota, s->obj_size);
+    if (op_ret < 0) {
+      return;
+    }
 
-  policy.encode(aclbl);
-  emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
+    hash.Final(m);
+    buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
 
-  if (content_type.size()) {
-    bufferlist ct_bl;
-    ct_bl.append(content_type.c_str(), content_type.size() + 1);
-    emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl));
-  }
+    etag = calc_md5;
+    bl.append(etag.c_str(), etag.size() + 1);
+    emplace_attr(RGW_ATTR_ETAG, std::move(bl));
 
-  if (compressor && compressor->is_compressed()) {
-    bufferlist tmp;
-    RGWCompressionInfo cs_info;
-    cs_info.compression_type = plugin->get_type_name();
-    cs_info.orig_size = s->obj_size;
-    cs_info.blocks = move(compressor->get_compression_blocks());
-    ::encode(cs_info, tmp);
-    emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
-  }
+    policy.encode(aclbl);
+    emplace_attr(RGW_ATTR_ACL, std::move(aclbl));
+
+    if (content_type.size()) {
+      bufferlist ct_bl;
+      ct_bl.append(content_type.c_str(), content_type.size() + 1);
+      emplace_attr(RGW_ATTR_CONTENT_TYPE, std::move(ct_bl));
+    }
+
+    if (compressor && compressor->is_compressed()) {
+      ceph::bufferlist tmp;
+      RGWCompressionInfo cs_info;
+      cs_info.compression_type = plugin->get_type_name();
+      cs_info.orig_size = s->obj_size;
+      cs_info.blocks = move(compressor->get_compression_blocks());
+      ::encode(cs_info, tmp);
+      emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
+    }
 
-  op_ret = processor.complete(s->obj_size, etag, NULL, real_time(), attrs,
-                              (delete_at ? *delete_at : real_time()));
+    op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(),
+                                attrs, (delete_at ? *delete_at : real_time()));
+  } while (is_next_file_to_upload());
 }
 
 
index 11c88a37fa22e6f94da78b3e043bee36d0b2b16e..060a4031c36312c461e8c24e3758560ba5f79964 100644 (file)
@@ -1020,6 +1020,11 @@ protected:
   map<string, bufferlist> attrs;
   boost::optional<ceph::real_time> delete_at;
 
+  /* Must be called after get_data() or the result is undefined. */
+  virtual std::string get_current_filename() const = 0;
+  virtual bool is_next_file_to_upload() {
+     return false;
+  }
 public:
   RGWPostObj() : min_len(0),
                  max_len(LLONG_MAX),
index 1f504b7b0ef4c60067751dad6cd34172ff1135f9..b790f254e4be85d36b62b788070a7923195a9e8b 100644 (file)
@@ -1768,6 +1768,11 @@ void RGWPostObj_ObjStore_S3::rebuild_key(string& key)
   key = new_key;
 }
 
+std::string RGWPostObj_ObjStore_S3::get_current_filename() const
+{
+  return s->object.name;
+}
+
 int RGWPostObj_ObjStore_S3::get_params()
 {
   // get the part boundary
index 291b04b9281ea90fa0d24f18affe1fa9cb827af7..5b1f8143c2d1aff0734ed4f14c2d8539e435ff87 100644 (file)
@@ -235,6 +235,8 @@ class RGWPostObj_ObjStore_S3 : public RGWPostObj_ObjStore {
 
   int get_policy();
   void rebuild_key(string& key);
+
+  std::string get_current_filename() const override;
 public:
   RGWPostObj_ObjStore_S3() {}
   ~RGWPostObj_ObjStore_S3() override {}