]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: aws sync, add hooks for decoding/encoding rest obj
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 12 Oct 2017 00:03:44 +0000 (17:03 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:38 +0000 (08:05 -0700)
Object sync is now functional. Create abstraction that will later help
with different cloud providers.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_sync_module_aws.cc

index 65ad60b9aabba38851cbe5dbfab462e394fcd823..dfe22fa451a2a155a6b8eb83443c24b42032877c 100644 (file)
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
+class TestSpliceCR : public RGWCoroutine {
+  CephContext *cct;
+  RGWHTTPManager *http_manager;
+  RGWHTTPStreamRWRequest *in_req{nullptr};
+  RGWHTTPStreamRWRequest *out_req{nullptr};
+  std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
+  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+public:
+  TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+               RGWHTTPStreamRWRequest *_in_req,
+               RGWHTTPStreamRWRequest *_out_req);
+
+  int operate();
+};
+
 class RGWCRHTTPGetDataCB : public RGWGetDataCB {
   Mutex lock;
   RGWCoroutinesEnv *env;
@@ -93,7 +108,7 @@ int RGWStreamReadHTTPResourceCRF::init()
   return 0;
 }
 
-int RGWStreamWriteHTTPResourceCRF::init()
+int RGWStreamWriteHTTPResourceCRF::send()
 {
   env->stack->init_new_io(req);
 
@@ -116,6 +131,17 @@ void RGWStreamReadHTTPResourceCRF::get_attrs(std::map<string, string> *attrs)
   *attrs = req->get_out_headers();
 }
 
+int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) {
+  /* basic generic implementation */
+  for (auto header : headers) {
+    const string& val = header.second;
+
+    rest_obj.attrs[header.first] = val;
+  }
+
+  return 0;
+}
+
 int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
 {
     reenter(&read_state) {
@@ -130,10 +156,22 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool
           continue;
         }
         extra_data.claim_append(in_cb->get_extra_data());
+        map<string, string> attrs = req->get_out_headers();
+        int ret = decode_rest_obj(attrs, extra_data, &rest_obj);
+        if (ret < 0) {
+          ldout(cct, 0) << "ERROR: " << __func__ << " decode_rest_obj() returned ret=" << ret << dendl;
+          return ret;
+        }
         got_extra_data = true;
       }
       *io_pending = false;
       in_cb->claim_data(out, max_size);
+      if (out->length() == 0) {
+        /* this may happen if we just read the prepended extra_data and didn't have any data
+         * after. In that case, retry reading, so that caller doesn't assume it's EOF.
+         */
+        continue;
+      }
       if (!req->is_done()) {
         yield;
       }
@@ -142,14 +180,11 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool
   return 0;
 }
 
-void RGWStreamWriteHTTPResourceCRF::send_ready(const map<string, string>& attrs)
+void RGWStreamWriteHTTPResourceCRF::send_ready(const rgw_rest_obj& rest_obj)
 {
-  for (auto h : attrs) {
-    if (h.first == "CONTENT_LENGTH") {
-      req->set_send_length(atoi(h.second.c_str()));
-    } else {
-      req->append_header(h.first, h.second);
-    }
+  req->set_send_length(rest_obj.content_len);
+  for (auto h : rest_obj.attrs) {
+    req->append_header(h.first, h.second);
   }
 }
 
@@ -225,13 +260,15 @@ int RGWStreamSpliceCR::operate() {
       }
 
       if (!sent_attrs) {
-        map<string, string> attrs;
-        in_crf->get_attrs(&attrs);
-        out_crf->send_ready(attrs);
         int ret = out_crf->init();
         if (ret < 0) {
           return set_cr_error(ret);
         }
+        out_crf->send_ready(in_crf->get_rest_obj());
+        ret = out_crf->send();
+        if (ret < 0) {
+          return set_cr_error(ret);
+        }
         sent_attrs = true;
       }
 
index 1338682549e68b832ca35389fd25caf26417d59c..512082b5e0951cf54faeee557a21b4b4ce4f845c 100644 (file)
@@ -7,7 +7,16 @@
 #include "rgw_coroutine.h"
 #include "rgw_rest_conn.h"
 
-class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine{
+
+struct rgw_rest_obj {
+  rgw_obj_key key;
+  uint64_t content_len;
+  std::map<string, string> attrs;
+  std::map<string, string> custom_attrs;
+  RGWAccessControlPolicy acls;
+};
+
+class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine {
   bufferlist *result;
  protected:
   RGWRESTConn *conn;
@@ -315,6 +324,7 @@ protected:
 public:
   virtual int init() = 0;
   virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
+  virtual int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) = 0;
   virtual bool has_attrs() = 0;
   virtual void get_attrs(std::map<string, string> *attrs) = 0;
 };
@@ -326,12 +336,14 @@ protected:
 
 public:
   virtual int init() = 0;
-  virtual void send_ready(const std::map<string, string>& attrs) = 0;
+  virtual void send_ready(const rgw_rest_obj& rest_obj) = 0;
+  virtual int send() = 0;
   virtual int write(bufferlist& data) = 0; /* reentrant */
   virtual int drain_writes(bool *need_retry) = 0; /* reentrant */
 };
 
 class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
+  CephContext *cct;
   RGWCoroutinesEnv *env;
   RGWCoroutine *caller;
   RGWHTTPManager *http_manager;
@@ -345,24 +357,33 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
   bool got_attrs{false};
   bool got_extra_data{false};
 
+protected:
+  rgw_rest_obj rest_obj;
+
 public:
   RGWStreamReadHTTPResourceCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
                                RGWCoroutine *_caller,
-                               RGWHTTPManager *_http_manager) : env(_env),
-                                                               caller(_caller),
-                                                               http_manager(_http_manager) {}
+                               RGWHTTPManager *_http_manager) : cct(_cct),
+                                                                env(_env),
+                                                                caller(_caller),
+                                                                http_manager(_http_manager) {}
   virtual ~RGWStreamReadHTTPResourceCRF();
 
   int init() override;
   int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
+  int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info);
   bool has_attrs() override;
-  void get_attrs(std::map<string, string> *pattrs) override;
+  void get_attrs(std::map<string, string> *attrs);
   virtual bool need_extra_data() { return false; }
 
   void set_req(RGWHTTPStreamRWRequest *r) {
     req = r;
   }
+
+  rgw_rest_obj& get_rest_obj() {
+    return rest_obj;
+  }
 };
 
 class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
@@ -382,8 +403,11 @@ public:
                                                                http_manager(_http_manager) {}
   virtual ~RGWStreamWriteHTTPResourceCRF() {}
 
-  int init() override;
-  void send_ready(const std::map<string, string>& attrs) override;
+  int init() override {
+    return 0;
+  }
+  void send_ready(const rgw_rest_obj& rest_obj) override;
+  int send() override;
   int write(bufferlist& data) override; /* reentrant */
   int drain_writes(bool *need_retry) override; /* reentrant */
 
@@ -412,19 +436,4 @@ public:
   int operate();
 };
 
-class TestSpliceCR : public RGWCoroutine {
-  CephContext *cct;
-  RGWHTTPManager *http_manager;
-  RGWHTTPStreamRWRequest *in_req{nullptr};
-  RGWHTTPStreamRWRequest *out_req{nullptr};
-  std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
-  std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
-public:
-  TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
-               RGWHTTPStreamRWRequest *_in_req,
-               RGWHTTPStreamRWRequest *_out_req);
-
-  int operate();
-};
-
 #endif
index eefcb438061ef07eb235efec56d91123a7891c12..749fe4371957c1a9958529c0be29f2b0a1db25a6 100644 (file)
@@ -541,18 +541,32 @@ static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *
   return 0;
 }
 
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
+static void send_prepare_convert(const rgw_obj& obj, string *resource)
 {
   string urlsafe_bucket, urlsafe_object;
   url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
   url_encode(obj.key.name, urlsafe_object);
-  string resource = urlsafe_bucket + "/" + urlsafe_object;
+  *resource = urlsafe_bucket + "/" + urlsafe_object;
+}
+
+int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
+{
+  string resource;
+  send_prepare_convert(obj, &resource);
 
   return send_request(&key, extra_headers, resource, mgr);
 }
 
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
-                                         RGWHTTPManager *mgr, bufferlist *send_data)
+int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj)
+{
+  string resource;
+  send_prepare_convert(obj, &resource);
+
+  return send_prepare(&key, extra_headers, resource);
+}
+
+int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+                                         bufferlist *send_data)
 {
   string new_url = url;
   if (new_url[new_url.size() - 1] != '/')
@@ -609,7 +623,6 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>&
     headers.emplace_back(kv);
   }
 
-  bool send_data_hint = false;
   if (send_data) {
     set_outbl(*send_data);
     send_data_hint = true;
@@ -618,6 +631,23 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>&
   method = new_info.method;
   url = new_url;
 
+  return 0;
+}
+
+int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+                                         RGWHTTPManager *mgr, bufferlist *send_data)
+{
+  int ret = send_prepare(key, extra_headers, resource, send_data);
+  if (ret < 0) {
+    return ret;
+  }
+
+  return send(mgr);
+}
+
+
+int RGWRESTStreamRWRequest::send(RGWHTTPManager *mgr)
+{
   if (!mgr) {
     return RGWHTTP::send(this);
   }
index 585af1d0c99b2db03da1a44a12fa033948c44540..e990715c84e4222faa0d36667d7356ab684c61c1 100644 (file)
@@ -110,13 +110,20 @@ public:
 };
 
 class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest {
+  bool send_data_hint{false};
 public:
   RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
                param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) {
   }
   virtual ~RGWRESTStreamRWRequest() override {}
+
+  int send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, bufferlist *send_data = nullptr /* optional input data */);
+  int send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj);
+  int send(RGWHTTPManager *mgr);
+
   int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr);
   int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */);
+
   int complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs);
 };
 
index 7ac889ed6706e8255cf0ceca55ee552ffd9d561b..70ee7f0bccf038e786bfc189db6b3af5fab62b25 100644 (file)
@@ -199,7 +199,7 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw
   param_vec_t params;
   populate_params(params, &uid, self_zone_group);
   if (prepend_metadata) {
-    params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", self_zone_group));
+    params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", "true"));
   }
   if (rgwx_stat) {
     params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "stat", "true"));
@@ -244,18 +244,24 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw
     set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER");
   }
 
+  int r = (*req)->send_prepare(key, extra_headers, obj);
+  if (r < 0) {
+    goto done_err;
+  }
+  
   if (!send) {
     return 0;
   }
 
-  int r = (*req)->send_request(key, extra_headers, obj, nullptr);
+  r = (*req)->send(nullptr);
   if (r < 0) {
-    delete *req;
-    *req = nullptr;
-    return r;
+    goto done_err;
   }
-  
   return 0;
+done_err:
+  delete *req;
+  *req = nullptr;
+  return r;
 }
 
 int RGWRESTConn::complete_request(RGWRESTStreamRWRequest *req, string& etag, real_time *mtime,
index d9723a1a14429f4f06b78029fa4c167de71b86c5..07631aa1f6f8eedda88d9a7c653dce2e64474a25 100644 (file)
@@ -52,12 +52,12 @@ public:
                                                                                  sync_env(_sync_env), conn(_conn), src_obj(_src_obj) {
   }
 
-  int init(RGWBucketInfo& bucket_info, rgw_obj_key& key) {
+  int init() override {
     /* init input connection */
     RGWRESTStreamRWRequest *in_req;
     int ret = conn->get_obj(rgw_user(),  nullptr, src_obj,
                             nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */,
-                            true /* prepend_metadata */, true /* get_op */, true /*rgwx_stat */,
+                            true /* prepend_metadata */, true /* get_op */, false /*rgwx_stat */,
                             false /* sync_manifest */, true /* skip_descrypt */, false /* send */,
                             nullptr /* cb */, &in_req);
     if (ret < 0) {
@@ -67,7 +67,23 @@ public:
 
     set_req(in_req);
 
+    return RGWStreamReadHTTPResourceCRF::init();
+  }
+
+  int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) override {
+    for (auto header : headers) {
+      const string& val = header.second;
+      if (header.first == "RGWX_OBJECT_SIZE") {
+        rest_obj.content_len = atoi(val.c_str());
+      } else {
+        rest_obj.attrs[header.first] = val;
+      }
+    }
+
+    ldout(sync_env->cct, 20) << __func << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
+
     return 0;
+
   }
 
   bool need_extra_data() override {
@@ -98,22 +114,25 @@ public:
 
     set_req(out_req);
 
-    return 0;
+    return RGWStreamWriteHTTPResourceCRF::init();
   }
 
-  void send_ready(const std::map<string, string>& attrs) override {
+  void send_ready(const rgw_rest_obj& rest_obj) override {
     RGWRESTStreamS3PutObj *r = (RGWRESTStreamS3PutObj *)req;
 
+    /* here we need to convert rest_obj.attrs to cloud specific representation */
+
     map<string, bufferlist> new_attrs;
 
-    for (auto attr : attrs) {
-      const string& val = attr.second;
-      new_attrs[attr.first].append(bufferptr(val.c_str(), val.size() - 1));
+    for (auto attr : rest_obj.attrs) {
+      new_attrs[attr.first].append(attr.second);
     }
 
     RGWAccessControlPolicy policy;
     ::encode(policy, new_attrs[RGW_ATTR_ACL]);
 
+    r->set_send_length(rest_obj.content_len);
+
     r->send_ready(conn->get_key(), new_attrs, false);
   }
 };
@@ -127,7 +146,7 @@ class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
   string target_bucket_name;
   std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
   std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
-
+  rgw_rest_obj rest_obj;
   string obj_path;
   int ret{0};