]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: aws sync, in_crf init abstraction
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Oct 2017 23:26:14 +0000 (16:26 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:38 +0000 (08:05 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h
src/rgw/rgw_sync_module_aws.cc

index 4ecebf8a7fb81928cd7eac84c3476ad0ddfddd0b..65ad60b9aabba38851cbe5dbfab462e394fcd823 100644 (file)
@@ -16,12 +16,23 @@ class RGWCRHTTPGetDataCB : public RGWGetDataCB {
   RGWCoroutine *cr;
   int64_t io_id;
   bufferlist data;
+  bufferlist extra_data;
 public:
   RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, int64_t _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
 
   int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
     {
       Mutex::Locker l(lock);
+
+      if (!has_all_extra_data()) {
+        off_t max = extra_data_len - extra_data.length();
+        if (max > bl_len) {
+          max = bl_len;
+        }
+        bl.splice(0, max, &extra_data);
+        bl_len -= max;
+      }
+
       if (bl_len == bl.length()) {
         data.claim_append(bl);
       } else {
@@ -47,9 +58,17 @@ public:
     data.splice(0, max, dest);
   }
 
+  bufferlist& get_extra_data() {
+    return extra_data;
+  }
+
   bool has_data() {
     return (data.length() > 0);
   }
+
+  bool has_all_extra_data() {
+    return (extra_data.length() == extra_data_len);
+  }
 };
 
 
@@ -106,6 +125,13 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool
         yield caller->io_block(0, req->get_io_id());
       }
       got_attrs = true;
+      if (need_extra_data() && !got_extra_data) {
+        if (!in_cb->has_all_extra_data()) {
+          continue;
+        }
+        extra_data.claim_append(in_cb->get_extra_data());
+        got_extra_data = true;
+      }
       *io_pending = false;
       in_cb->claim_data(out, max_size);
       if (!req->is_done()) {
@@ -243,12 +269,15 @@ int RGWStreamSpliceCR::operate() {
 TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
                            RGWHTTPStreamRWRequest *_in_req,
                            RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
-                                                               in_req(_in_req), out_req(_out_req) {}
+                                                               in_req(_in_req), out_req(_out_req) {
+    in_crf.reset(new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager));
+    in_crf->set_req(in_req);
+    out_crf.reset(new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager));
+    out_crf->set_req(out_req);
+}
+
 int TestSpliceCR::operate() {
   reenter(this) {
-    in_crf.reset(new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req));
-    out_crf.reset(new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req));
-
     yield call(new RGWStreamSpliceCR(cct, http_manager, in_crf, out_crf));
 
     if (retcode < 0) {
index 40374f651630efac1cb362545dbd0cb8289de9c0..1338682549e68b832ca35389fd25caf26417d59c 100644 (file)
@@ -336,27 +336,33 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
   RGWCoroutine *caller;
   RGWHTTPManager *http_manager;
 
-  RGWHTTPStreamRWRequest *req;
+  RGWHTTPStreamRWRequest *req{nullptr};
 
   RGWCRHTTPGetDataCB *in_cb{nullptr};
 
+  bufferlist extra_data;
+
   bool got_attrs{false};
+  bool got_extra_data{false};
 
 public:
   RGWStreamReadHTTPResourceCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
                                RGWCoroutine *_caller,
-                               RGWHTTPManager *_http_manager,
-                               RGWHTTPStreamRWRequest *_req) : env(_env),
+                               RGWHTTPManager *_http_manager) : env(_env),
                                                                caller(_caller),
-                                                               http_manager(_http_manager),
-                                                               req(_req) {}
+                                                               http_manager(_http_manager) {}
   virtual ~RGWStreamReadHTTPResourceCRF();
 
   int init() override;
   int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
   bool has_attrs() override;
   void get_attrs(std::map<string, string> *pattrs) override;
+  virtual bool need_extra_data() { return false; }
+
+  void set_req(RGWHTTPStreamRWRequest *r) {
+    req = r;
+  }
 };
 
 class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
@@ -365,23 +371,25 @@ protected:
   RGWCoroutine *caller;
   RGWHTTPManager *http_manager;
 
-  RGWHTTPStreamRWRequest *req;
+  RGWHTTPStreamRWRequest *req{nullptr};
 
 public:
   RGWStreamWriteHTTPResourceCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
                                RGWCoroutine *_caller,
-                               RGWHTTPManager *_http_manager,
-                               RGWHTTPStreamRWRequest *_req) : env(_env),
+                               RGWHTTPManager *_http_manager) : env(_env),
                                                                caller(_caller),
-                                                               http_manager(_http_manager),
-                                                               req(_req) {}
+                                                               http_manager(_http_manager) {}
   virtual ~RGWStreamWriteHTTPResourceCRF() {}
 
   int init() override;
   void send_ready(const std::map<string, string>& attrs) override;
   int write(bufferlist& data) override; /* reentrant */
   int drain_writes(bool *need_retry) override; /* reentrant */
+
+  void set_req(RGWHTTPStreamRWRequest *r) {
+    req = r;
+  }
 };
 
 class RGWStreamSpliceCR : public RGWCoroutine {
index 63cf07f9bb899dc0c13e769c184a5f26f2b6c4d2..d9723a1a14429f4f06b78029fa4c167de71b86c5 100644 (file)
@@ -37,16 +37,69 @@ struct AWSConfig {
   std::unique_ptr<RGWRESTConn> conn;
 };
 
+class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
+{
+  RGWDataSyncEnv *sync_env;
+  RGWRESTConn *conn;
+  rgw_obj src_obj;
+public:
+  RGWRESTStreamGetCRF(CephContext *_cct,
+                               RGWCoroutinesEnv *_env,
+                               RGWCoroutine *_caller,
+                               RGWDataSyncEnv *_sync_env,
+                               RGWRESTConn *_conn,
+                               rgw_obj& _src_obj) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager),
+                                                                                 sync_env(_sync_env), conn(_conn), src_obj(_src_obj) {
+  }
+
+  int init(RGWBucketInfo& bucket_info, rgw_obj_key& key) {
+    /* init input connection */
+    RGWRESTStreamRWRequest *in_req;
+    int ret = conn->get_obj(rgw_user(),  nullptr, src_obj,
+                            nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */,
+                            true /* prepend_metadata */, true /* get_op */, true /*rgwx_stat */,
+                            false /* sync_manifest */, true /* skip_descrypt */, false /* send */,
+                            nullptr /* cb */, &in_req);
+    if (ret < 0) {
+      ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
+      return ret;
+    }
+
+    set_req(in_req);
+
+    return 0;
+  }
+
+  bool need_extra_data() override {
+    return true;
+  }
+};
+
 class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
 {
-  RGWAccessKey access_key;
+  RGWDataSyncEnv *sync_env;
+  RGWRESTConn *conn;
+  rgw_obj dest_obj;
 public:
   RGWAWSStreamPutCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
                                RGWCoroutine *_caller,
-                               RGWHTTPManager *_http_manager,
-                               RGWAccessKey& _key,
-                               RGWRESTStreamS3PutObj *_req) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _http_manager, _req), access_key(_key) {}
+                               RGWDataSyncEnv *_sync_env,
+                               RGWRESTConn* _conn,
+                               rgw_obj& _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager),
+                                                     sync_env(_sync_env), conn(_conn), dest_obj(_dest_obj) {
+  }
+
+  int init() {
+    /* init output connection */
+    RGWRESTStreamS3PutObj *out_req{nullptr};
+
+    conn->put_obj_send_init(dest_obj, &out_req);
+
+    set_req(out_req);
+
+    return 0;
+  }
 
   void send_ready(const std::map<string, string>& attrs) override {
     RGWRESTStreamS3PutObj *r = (RGWRESTStreamS3PutObj *)req;
@@ -61,20 +114,19 @@ public:
     RGWAccessControlPolicy policy;
     ::encode(policy, new_attrs[RGW_ATTR_ACL]);
 
-    r->send_ready(access_key, new_attrs, false);
+    r->send_ready(conn->get_key(), new_attrs, false);
   }
 };
 
 // maybe use Fetch Remote Obj instead?
 class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
   const AWSConfig& conf;
+  RGWRESTConn *source_conn;
   bufferlist res;
   unordered_map <string, bool> bucket_created;
   string target_bucket_name;
   std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
   std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
-  RGWRESTStreamRWRequest *in_req{nullptr};
-  RGWRESTStreamS3PutObj *out_req{nullptr};
 
   string obj_path;
   int ret{0};
@@ -90,9 +142,7 @@ public:
   ~RGWAWSHandleRemoteObjCBCR(){
   }
 
-#if 0
-  int operate () override {
-
+  int operate() override {
     reenter(this) {
 
       ldout(sync_env->cct, 0) << "AWS: download begin: z=" << sync_env->source_zone
@@ -100,69 +150,12 @@ public:
                               << " mtime=" << mtime << " attrs=" << attrs
                               << dendl;
 
-      yield {
-        string obj_path = bucket_info.bucket.name + "/" + key.name;
-
-        // TODO-future: And we should do a part by part get and initiate mp on the aws side
-        call(new RGWReadRawRESTResourceCR(sync_env->cct,
-                                          sync_env->store->rest_master_conn,
-                                          sync_env->http_manager,
-                                          obj_path,
-                                          nullptr,
-                                          &res));
-
-      }
-      if (retcode < 0) {
-        return set_cr_error(retcode);
-      }
-
-      bucket_name=aws_bucket_name(bucket_info);
-      if (bucket_created.find(bucket_name) == bucket_created.end()){
-      //   // TODO: maybe do a head request for subsequent tries & make it configurable
-        yield {
-        //string bucket_name = aws_bucket_name(bucket_info);
-          ldout(sync_env->cct,0) << "AWS: creating bucket" << bucket_name << dendl;
-          bufferlist bl;
-          call(new RGWPutRawRESTResourceCR <int> (sync_env->cct, conf.conn.get(),
-                                                  sync_env->http_manager,
-                                                  bucket_name, nullptr, bl, nullptr));
-        }
-        if (retcode < 0) {
-          return set_cr_error(retcode);
-        }
-
-        bucket_created[bucket_name]=true;
-      }
-
-      yield {
-        string path=aws_object_name(bucket_info, key);
-        ldout(sync_env->cct,0) << "AWS: creating object at path" << path << dendl;
-        call(new RGWPutRawRESTResourceCR<int> (sync_env->cct, conf.conn.get(),
-                                                        sync_env->http_manager,
-                                                        path, nullptr,
-                                                        res, nullptr));
-      }
-      if (retcode < 0) {
-        return set_cr_error(retcode);
+      source_conn = sync_env->store->get_zone_conn_by_id(sync_env->source_zone);
+      if (!source_conn) {
+        ldout(sync_env->cct, 0) << "ERROR: cannot find http connection to zone " << sync_env->source_zone << dendl;
+        return set_cr_error(-EINVAL);
       }
 
-
-      return set_cr_done();
-    }
-
-    return 0;
-  }
-#endif
-
-  int operate() override {
-
-    reenter(this) {
-
-      ldout(sync_env->cct, 0) << "AWS: download begin: z=" << sync_env->source_zone
-                              << " b=" << bucket_info.bucket << " k=" << key << " size=" << size
-                              << " mtime=" << mtime << " attrs=" << attrs
-                              << dendl;
-
       obj_path = bucket_info.bucket.name + "/" + key.name;
 
       target_bucket_name = aws_bucket_name(bucket_info);
@@ -181,32 +174,28 @@ public:
         bucket_created[target_bucket_name] = true;
       }
 
-#warning FIXME conn
       {
-        /* init input connection */
-        rgw_obj source_obj(bucket_info.bucket, key);
-        ret = sync_env->store->rest_master_conn->get_obj(rgw_user(),  nullptr, source_obj,
-                                                         nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */,
-                                                         false /* prepend_metadata */, true /* get_op */, true /*rgwx_stat */,
-                                                         false /* sync_manifest */, true /* skip_descrypt */, false /* send */,
-                                                         nullptr /* cb */, &in_req);
+        rgw_obj src_obj(bucket_info.bucket, key);
+        /* init input */
+        in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+
+        ret = in_crf->init();
         if (ret < 0) {
           return set_cr_error(ret);
         }
 
-        /* init output connection */
+        /* init output */
         rgw_bucket target_bucket;
         target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for
-                                                    uri resolution */
-        rgw_obj target_obj(target_bucket, aws_object_name(bucket_info, key));
-        in_crf.reset(new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, sync_env->http_manager, in_req));
+                                                uri resolution */
+        rgw_obj dest_obj(target_bucket, aws_object_name(bucket_info, key));
 
-        map<string, bufferlist> attrs;
-        RGWAccessControlPolicy empty_policy;
-        ::encode(empty_policy, attrs[RGW_ATTR_ACL]);
-        conf.conn->put_obj_send_init(target_obj, &out_req);
-
-        out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env->http_manager, conf.conn->get_key(), out_req));
+        out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, conf.conn.get(),
+                                             dest_obj));
+        ret = out_crf->init();
+        if (ret < 0) {
+          return set_cr_error(ret);
+        }
       }
 
       yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));