]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: aws sync: multipart upload complete
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 21 Oct 2017 00:43:32 +0000 (17:43 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:39 +0000 (08:05 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_sync_module_aws.cc

index cf6f178b6e0430fe71300df016924abf16b6974e..8ecfd2dec4b8dba2059d436aef174a057a8706dd 100644 (file)
@@ -42,7 +42,10 @@ public:
       }
     }
 
-    env->manager->io_complete(cr, io_id);
+#define GET_DATA_WINDOW_SIZE 1 * 1024 * 1024
+    if (bl.length() >= GET_DATA_WINDOW_SIZE) {
+      env->manager->io_complete(cr, io_id);
+    }
     return 0;
   }
 
@@ -202,6 +205,10 @@ int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry)
       yield caller->io_block(0, req->get_io_id());
       *need_retry = !req->is_done();
     }
+
+#warning need to lock in_req->headers
+    handle_headers(req->get_out_headers());
+
     return req->get_req_retcode();
   }
   return 0;
@@ -267,6 +274,7 @@ int RGWStreamSpliceCR::operate() {
       total_read += bl.length();
 
       yield {
+        ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl;
         ret = out_crf->write(bl);
         if (ret < 0)  {
           return set_cr_error(ret);
@@ -277,8 +285,6 @@ int RGWStreamSpliceCR::operate() {
         ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl;
         return set_cr_error(ret);
       }
-
-      ldout(cct, 20) << "wrote " << bl.length() << " bytes" << dendl;
     } while (true);
 
     do {
index ee2b944bb97e137b3dc6c77f29fe2c68ae11ad75..72767ebbf5c8e873015eba114e99e5cba129abf0 100644 (file)
@@ -442,6 +442,8 @@ public:
   int write(bufferlist& data) override; /* reentrant */
   int drain_writes(bool *need_retry) override; /* reentrant */
 
+  virtual void handle_headers(const std::map<string, string>& headers) {}
+
   void set_req(RGWHTTPStreamRWRequest *r) {
     req = r;
   }
index 0723baf853feef1c9f118fcc1907a38e77474440..aa9639534b555c6bd45c7c758a3ff029e19b2a91 100644 (file)
@@ -427,6 +427,11 @@ void RGWRESTStreamS3PutObj::send_init(rgw_obj& obj)
   map<string, string>& args = new_info.args.get_params();
   get_params_str(args, params_str);
 
+  /* merge params with extra args so that we can sign correctly */
+  for (param_vec_t::iterator iter = params.begin(); iter != params.end(); ++iter) {
+    new_info.args.append(iter->first, iter->second);
+  }
+
   new_url.append(resource + params_str);
 
   new_env.set("HTTP_DATE", date_str.c_str());
index 94ce689f99d25d57b864c040eb4d724218346450..9413566f10261fb0eb6fee4e7127164c75b41060 100644 (file)
@@ -1,3 +1,5 @@
+#include "common/errno.h"
+
 #include "rgw_common.h"
 #include "rgw_coroutine.h"
 #include "rgw_sync_module.h"
@@ -46,6 +48,8 @@ class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
   RGWRESTConn *conn;
   rgw_obj src_obj;
   RGWRESTConn::get_obj_params req_params;
+
+  string etag;
 public:
   RGWRESTStreamGetCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
@@ -86,6 +90,8 @@ public:
       const string& val = header.second;
       if (header.first == "RGWX_OBJECT_SIZE") {
         rest_obj.content_len = atoi(val.c_str());
+      } else if (header.first == "ETAG") {
+        etag = val;
       } else {
         rest_obj.attrs[header.first] = val;
       }
@@ -107,6 +113,7 @@ class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
   RGWDataSyncEnv *sync_env;
   RGWRESTConn *conn;
   rgw_obj dest_obj;
+  string etag;
 public:
   RGWAWSStreamPutCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
@@ -155,6 +162,22 @@ public:
 
     r->send_ready(conn->get_key(), new_attrs, false);
   }
+
+  void handle_headers(const map<string, string>& headers) {
+    for (auto h : headers) {
+      if (h.first == "ETAG") {
+        etag = h.second;
+      }
+    }
+  }
+
+  bool get_etag(string *petag) {
+    if (etag.empty()) {
+      return false;
+    }
+    *petag = etag;
+    return true;
+  }
 };
 
 
@@ -201,6 +224,13 @@ public:
   }
 };
 
+struct multipart_part_info {
+  int part_num;
+  uint64_t ofs;
+  uint64_t size;
+  string etag;
+};
+
 class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   RGWRESTConn *source_conn;
@@ -209,13 +239,14 @@ class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
   rgw_obj dest_obj;
 
   string upload_id;
-  uint64_t ofs;
-  uint64_t size;
-  int part_num;
+
+  multipart_part_info part_info;
 
   std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
   std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
 
+  string *petag;
+
 public:
   RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncEnv *_sync_env,
                                 RGWRESTConn *_source_conn,
@@ -223,36 +254,40 @@ public:
                                 RGWRESTConn *_dest_conn,
                                 const rgw_obj& _dest_obj,
                                 const string& _upload_id,
-                                uint64_t _ofs,
-                                uint64_t _size,
-                                int _part_num) : RGWCoroutine(_sync_env->cct),
+                                const multipart_part_info& _part_info,
+                                string *_petag) : RGWCoroutine(_sync_env->cct),
                                                    sync_env(_sync_env),
                                                    source_conn(_source_conn),
                                                    dest_conn(_dest_conn),
                                                    src_obj(_src_obj),
                                                    dest_obj(_dest_obj),
                                                    upload_id(_upload_id),
-                                                   ofs(_ofs), size(_size),
-                                                   part_num(_part_num) {}
+                                                   part_info(_part_info),
+                                                   petag(_petag) {}
 
   int operate() {
     reenter(this) {
       /* init input */
       in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
 
-      in_crf->set_range(ofs, size);
+      in_crf->set_range(part_info.ofs, part_info.size);
 
       /* init output */
       out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
                                            dest_obj));
 
-      out_crf->set_multipart(upload_id, part_num, size);
+      out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
 
       yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
 
+      if (!((RGWAWSStreamPutCRF *)out_crf.get())->get_etag(petag)) {
+        ldout(sync_env->cct, 0) << "ERROR: failed to get etag from PUT request" << dendl;
+        return set_cr_error(-EIO);
+      }
+
       return set_cr_done();
     }
 
@@ -389,6 +424,115 @@ public:
   }
 };
 
+class RGWAWSCompleteMultipartCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRESTConn *dest_conn;
+  rgw_obj dest_obj;
+
+  bufferlist out_bl;
+
+  string upload_id;
+
+  struct CompleteMultipartReq {
+    map<int, multipart_part_info> parts;
+
+    CompleteMultipartReq(const map<int, multipart_part_info>& _parts) : parts(_parts) {}
+
+    void dump_xml(Formatter *f) const {
+      for (auto p : parts) {
+        f->open_object_section("Part");
+        encode_xml("PartNumber", p.first, f);
+        encode_xml("ETag", p.second.etag, f);
+        f->close_section();
+      };
+    }
+  } req_enc;
+
+  struct CompleteMultipartResult {
+    string location;
+    string bucket;
+    string key;
+    string etag;
+
+    void decode_xml(XMLObj *obj) {
+      RGWXMLDecoder::decode_xml("Location", bucket, obj);
+      RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+      RGWXMLDecoder::decode_xml("Key", key, obj);
+      RGWXMLDecoder::decode_xml("ETag", etag, obj);
+    }
+  } result;
+
+public:
+  RGWAWSCompleteMultipartCR(RGWDataSyncEnv *_sync_env,
+                        RGWRESTConn *_dest_conn,
+                        const rgw_obj& _dest_obj,
+                        string _upload_id,
+                        const map<int, multipart_part_info>& _parts) : RGWCoroutine(_sync_env->cct),
+                                                   sync_env(_sync_env),
+                                                   dest_conn(_dest_conn),
+                                                   dest_obj(_dest_obj),
+                                                   upload_id(_upload_id),
+                                                   req_enc(_parts) {}
+
+  int operate() {
+    reenter(this) {
+
+      yield {
+        rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
+        stringstream ss;
+        XMLFormatter formatter;
+
+        encode_xml("CompleteMultipartUpload", req_enc, &formatter);
+
+        formatter.flush(ss);
+
+        bufferlist bl;
+        bl.append(ss.str());
+
+        call(new RGWPostRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn, sync_env->http_manager,
+                                                 obj_to_aws_path(dest_obj), params, bl, &out_bl));
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
+        return set_cr_error(retcode);
+      }
+      {
+        /*
+         * If one of the following fails we cannot abort upload, as we cannot
+         * extract the upload id. If one of these fail it's very likely that that's
+         * the least of our problem.
+         */
+        RGWXMLDecoder::XMLParser parser;
+        if (!parser.init()) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
+          return set_cr_error(-EIO);
+        }
+
+        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(sync_env->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+          return set_cr_error(-EIO);
+        }
+
+        try {
+          RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
+        } catch (RGWXMLDecoder::err& err) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(sync_env->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+          return set_cr_error(-EIO);
+        }
+      }
+
+      ldout(sync_env->cct, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
+
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
+
 class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   RGWRESTConn *source_conn;
@@ -404,16 +548,15 @@ class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
   uint32_t part_size;
   int num_parts;
 
-  struct part_info {
-    int part_num;
-    uint64_t ofs;
-    uint64_t size;
-    string etag;
-  };
-
   int cur_part{0};
   uint64_t cur_ofs{0};
 
+  map<int, multipart_part_info> parts;
+
+  multipart_part_info *pcur_part_info{nullptr};
+
+  int ret_err{0};
+
 public:
   RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env,
                                 RGWRESTConn *_source_conn,
@@ -445,20 +588,46 @@ public:
 
       for (cur_part = 1; cur_part <= num_parts; ++cur_part) {
         yield {
-          part_info cur_part_info;
-          cur_part_info.part_num = cur_part + 1;
+          multipart_part_info& cur_part_info = parts[cur_part];
+          cur_part_info.part_num = cur_part;
           cur_part_info.ofs = cur_ofs;
           cur_part_info.size = std::min((uint64_t)part_size, obj_size - cur_ofs);
 
+          pcur_part_info = &cur_part_info;
+
           cur_ofs += part_size;
 
           call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env,
                                                              source_conn, src_obj,
                                                              dest_conn, dest_obj,
                                                              upload_id,
-                                                             cur_part_info.ofs, cur_part_info.size,
-                                                             cur_part_info.part_num));
+                                                             cur_part_info,
+                                                             &cur_part_info.etag));
+        }
+
+        if (retcode < 0) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << upload_id << " part number " << cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+          ret_err = retcode;
+          yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
+          if (retcode < 0) {
+            ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
+          }
+          return set_cr_error(ret_err);
+        }
+
+        ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << cur_part << " etag=" << pcur_part_info->etag << dendl;
+
+      }
+
+      yield call(new RGWAWSCompleteMultipartCR(sync_env, dest_conn, dest_obj, upload_id, parts));
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+        ret_err = retcode;
+        yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
+        if (retcode < 0) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
         }
+        return set_cr_error(ret_err);
       }
 
       return set_cr_done();