]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: generate read request if source rgw is remote
authorYehuda Sadeh <yehuda@inktank.com>
Thu, 13 Jun 2013 04:29:23 +0000 (21:29 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Thu, 13 Jun 2013 04:29:23 +0000 (21:29 -0700)
in a copy operation

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_common.cc
src/rgw/rgw_common.h
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h

index 1a1b974cbec701a85b9ef068d07e5544ea6a7b94..31bb200e61521924a6b8bd8e6f86a018d70ef451 100644 (file)
@@ -148,6 +148,7 @@ req_state::req_state(CephContext *_cct, struct RGWEnv *e) : cct(_cct), cio(NULL)
   length = NULL;
   copy_source = NULL;
   http_auth = NULL;
+  local_source = false;
 
   obj_ctx = NULL;
 }
index d3792060eafdf976cf22d6933b7092493dbe34cf..dfd128294323241d0542759b3a619c85dfdc0b25 100644 (file)
@@ -731,6 +731,7 @@ struct req_state {
    bool has_acl_header;
    const char *copy_source;
    const char *http_auth;
+   bool local_source; /* source is local */
 
    int prot_flags;
 
index c30987b8e22170339891d865996c65d6722c68a7..67fd3db98e0367510fcdfb49731b9bf640a69c4d 100644 (file)
@@ -275,8 +275,6 @@ static int rgw_build_policies(RGWRados *store, struct req_state *s, bool only_bu
 
   RGWBucketInfo bucket_info;
 
-  bool source_in_domain = false;
-
   if (s->copy_source) { /* check if copy source is within the current domain */
     const char *src = s->copy_source;
     if (*src == '/')
@@ -292,7 +290,7 @@ static int rgw_build_policies(RGWRados *store, struct req_state *s, bool only_bu
     ret = store->get_bucket_info(s->obj_ctx, copy_source_str, source_info, NULL);
     if (ret == 0) {
       string& region = source_info.region;
-      source_in_domain = (region.empty() && store->region.is_master) ||
+      s->local_source = (region.empty() && store->region.is_master) ||
                          (region == store->region.name);
     }
   }
@@ -322,7 +320,7 @@ static int rgw_build_policies(RGWRados *store, struct req_state *s, bool only_bu
       /* we now need to make sure that the operation actually requires copy source, that is
        * it's a copy operation
        */
-      if (!source_in_domain ||
+      if (!s->local_source ||
           (s->op != OP_PUT && s->op != OP_COPY) ||
           s->object_str.empty()) {
         return -ERR_PERMANENT_REDIRECT;
@@ -1750,15 +1748,30 @@ int RGWCopyObj::verify_permission()
   if (ret < 0)
     return ret;
 
-  /* get buckets info (source and dest) */
-
   ret = store->get_bucket_info(s->obj_ctx, src_bucket_name, src_bucket_info, NULL);
   if (ret < 0)
     return ret;
 
   src_bucket = src_bucket_info.bucket;
 
-  if (src_bucket_name.compare(dest_bucket_name) == 0) {
+  /* get buckets info (source and dest) */
+  if (s->local_source) {
+    rgw_obj src_obj(src_bucket, src_object);
+    store->set_atomic(s->obj_ctx, src_obj);
+    store->set_prefetch_data(s->obj_ctx, src_obj);
+
+    /* check source object permissions */
+    ret = read_policy(store, s, src_bucket_info, &src_policy, src_bucket, src_object);
+    if (ret < 0)
+      return ret;
+
+    if (!src_policy.verify_permission(s->user.user_id, s->perm_mask, RGW_PERM_READ))
+      return -EACCES;
+  }
+
+  RGWAccessControlPolicy dest_bucket_policy(s->cct);
+
+  if (src_bucket_name.compare(dest_bucket_name) == 0) { /* will only happen if s->local_source */
     dest_bucket_info = src_bucket_info;
   } else {
     ret = store->get_bucket_info(s->obj_ctx, dest_bucket_name, dest_bucket_info, NULL);
@@ -1768,23 +1781,9 @@ int RGWCopyObj::verify_permission()
 
   dest_bucket = dest_bucket_info.bucket;
 
-  rgw_obj src_obj(src_bucket, src_object);
-  store->set_atomic(s->obj_ctx, src_obj);
-  store->set_prefetch_data(s->obj_ctx, src_obj);
-
   rgw_obj dest_obj(dest_bucket, dest_object);
   store->set_atomic(s->obj_ctx, dest_obj);
 
-  /* check source object permissions */
-  ret = read_policy(store, s, src_bucket_info, &src_policy, src_bucket, src_object);
-  if (ret < 0)
-    return ret;
-
-  if (!src_policy.verify_permission(s->user.user_id, s->perm_mask, RGW_PERM_READ))
-    return -EACCES;
-
-  RGWAccessControlPolicy dest_bucket_policy(s->cct);
-
   /* check dest bucket permissions */
   ret = read_policy(store, s, dest_bucket_info, &dest_bucket_policy, dest_bucket, empty_str);
   if (ret < 0)
@@ -1844,21 +1843,7 @@ void RGWCopyObj::execute()
   src_obj.init(src_bucket, src_object);
   dst_obj.init(dest_bucket, dest_object);
   store->set_atomic(s->obj_ctx, src_obj);
-#if 0
 
-  if ((dest_bucket_info.region.empty() && !store->region.is_master) ||
-      (dest_bucket_info.region != store->region.name)) {
-
-    map<string, bufferlist> src_attrs;
-  
-    int ret = get_obj_attrs(store, s, src_obj, src_attrs
-                         uint64_t *obj_size, RGWObjVersionTracker *objv_tracker)
-
-    int ret = store->rest_conn->put_obj(s->user.user_id, dst_obj, 
-    if (ret < 0)
-      return ret;
-  }
-#endif
   store->set_atomic(s->obj_ctx, dst_obj);
 
   ret = store->copy_obj(s->obj_ctx,
@@ -1866,6 +1851,7 @@ void RGWCopyObj::execute()
                         dst_obj,
                         src_obj,
                         dest_bucket_info,
+                        src_bucket_info,
                         &mtime,
                         mod_ptr,
                         unmod_ptr,
index 05d9ceffc01d52bbeb55f1eee717ebbc99627e94..317a1b1a02941dfba87cd1645bbd12686879a0d6 100644 (file)
@@ -1899,6 +1899,7 @@ int RGWRados::copy_obj(void *ctx,
                rgw_obj& dest_obj,
                rgw_obj& src_obj,
                RGWBucketInfo& dest_bucket_info,
+               RGWBucketInfo& src_bucket_info,
                time_t *mtime,
                const time_t *mod_ptr,
                const time_t *unmod_ptr,
@@ -1916,9 +1917,23 @@ int RGWRados::copy_obj(void *ctx,
   rgw_obj shadow_obj = dest_obj;
   string shadow_oid;
 
+  bool remote_src;
+  bool remote_dest;
+
   append_rand_alpha(cct, dest_obj.object, shadow_oid, 32);
   shadow_obj.init_ns(dest_obj.bucket, shadow_oid, shadow_ns);
 
+  remote_dest = ((dest_bucket_info.region.empty() && !region.is_master) ||
+      (dest_bucket_info.region != region.name));
+
+  remote_src = ((src_bucket_info.region.empty() && !region.is_master) ||
+      (src_bucket_info.region != region.name));
+
+  if (remote_src && remote_dest) {
+    ldout(cct, 0) << "ERROR: can't copy object when both src and dest buckets are remote" << dendl;
+    return -EINVAL;
+  }
+
   ldout(cct, 5) << "Copy object " << src_obj.bucket << ":" << src_obj.object << " => " << dest_obj.bucket << ":" << dest_obj.object << dendl;
 
   void *handle = NULL;
@@ -1926,11 +1941,35 @@ int RGWRados::copy_obj(void *ctx,
   map<string, bufferlist> attrset;
   off_t ofs = 0;
   off_t end = -1;
-  ret = prepare_get_obj(ctx, src_obj, &ofs, &end, &attrset,
-                mod_ptr, unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &obj_size, NULL, &handle, err);
+  if (!remote_src) {
+    ret = prepare_get_obj(ctx, src_obj, &ofs, &end, &attrset,
+                  mod_ptr, unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &obj_size, NULL, &handle, err);
+    if (ret < 0)
+      return ret;
+  } else {
+    /* source is in a different region, copy it there */
 
-  if (ret < 0)
-    return ret;
+    map<string, bufferlist> src_attrs;
+
+    RGWRESTStreamReadRequest *in_stream_req;
+  
+    int ret = rest_conn->get_obj_init(user_id, src_obj, &in_stream_req);
+    if (ret < 0)
+      return ret;
+#if 0
+    ret = get_obj_iterate(ctx, &handle, src_obj, 0, astate->size - 1, out_stream_req->get_out_cb());
+    if (ret < 0)
+      return ret;
+#endif
+
+    string etag;
+
+    ret = rest_conn->complete_request(in_stream_req, etag, mtime);
+    if (ret < 0)
+      return ret;
+
+    return 0;
+  }
 
   if (replace_attrs) {
     if (!attrs[RGW_ATTR_ETAG].length())
@@ -1969,8 +2008,7 @@ int RGWRados::copy_obj(void *ctx,
   }
 
 
-  if ((dest_bucket_info.region.empty() && !region.is_master) ||
-      (dest_bucket_info.region != region.name)) {
+  if (remote_dest) {
     /* dest is in a different region, copy it there */
 
     map<string, bufferlist> src_attrs;
@@ -1992,9 +2030,7 @@ int RGWRados::copy_obj(void *ctx,
       return ret;
 
     return 0;
-  }
-      
-  if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
+  } else if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
     return copy_obj_data(ctx, handle, end, dest_obj, src_obj, mtime, attrset, category, ptag, err);
   }
 
index 771cde77e9576528e79c09a63f2c4cfec9c779bc..826f136569e5b39490543d626bfb01b73dc83ff4 100644 (file)
@@ -766,6 +766,7 @@ public:
                rgw_obj& dest_obj,
                rgw_obj& src_obj,
                RGWBucketInfo& dest_bucket_info,
+               RGWBucketInfo& src_bucket_info,
                time_t *mtime,
                const time_t *mod_ptr,
                const time_t *unmod_ptr,
index aa01b499f2cd91d6b32b74bd60874074b591a19e..863b7147c41eefb0007572ec7f24d7c2ef4f8d4d 100644 (file)
@@ -528,3 +528,75 @@ int RGWRESTStreamWriteRequest::complete(string& etag, time_t *mtime)
 
   return status;
 }
+
+int RGWRESTStreamReadRequest::get_obj_init(RGWAccessKey& key, rgw_obj& obj)
+{
+  string resource = obj.bucket.name + "/" + obj.object;
+  string new_url = url;
+  if (new_url[new_url.size() - 1] != '/')
+    new_url.append("/");
+
+  string date_str;
+  get_new_date_str(cct, date_str);
+
+  RGWEnv new_env;
+  req_info new_info(cct, &new_env);
+  
+  string params_str;
+  map<string, string>& args = new_info.args.get_params();
+  get_params_str(args, params_str);
+
+  new_url.append(resource + params_str);
+
+  new_env.set("HTTP_DATE", date_str.c_str());
+
+  new_info.method = "GET";
+
+  new_info.script_uri = "/";
+  new_info.script_uri.append(resource);
+  new_info.request_uri = new_info.script_uri;
+
+  int ret = sign_request(key, new_env, new_info);
+  if (ret < 0) {
+    ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
+    return ret;
+  }
+
+  map<string, string>& m = new_env.get_map();
+  map<string, string>::iterator iter;
+  for (iter = m.begin(); iter != m.end(); ++iter) {
+    headers.push_back(make_pair<string, string>(iter->first, iter->second));
+  }
+
+  // cb = new RGWRESTStreamInCB(this);
+
+
+  int r = process(new_info.method, new_url.c_str());
+  if (r < 0)
+    return r;
+
+  return 0;
+}
+
+int RGWRESTStreamReadRequest::complete(string& etag, time_t *mtime)
+{
+  set_str_from_headers(out_headers, "ETAG", etag);
+  if (mtime) {
+    string mtime_str;
+    set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
+    string err;
+    long t = strict_strtol(mtime_str.c_str(), 10, &err);
+    if (!err.empty()) {
+      ldout(cct, 0) << "ERROR: failed converting mtime (" << mtime_str << ") to int " << dendl;
+      return -EINVAL;
+    }
+    *mtime = (time_t)t;
+  }
+
+  return status;
+}
+
+int RGWRESTStreamReadRequest::send_data(void *ptr, size_t len) {
+  return 0;
+}
+
index a7cc571819fb6253d617bc2cb80800e5e39a00e6..a2c14b42cb5dee67d0c7592fb871b02b1b0e5028 100644 (file)
@@ -72,5 +72,22 @@ public:
   RGWGetDataCB *get_out_cb() { return cb; }
 };
 
+class RGWRESTStreamReadRequest : public RGWRESTSimpleRequest {
+  Mutex lock;
+  void *handle;
+  RGWGetDataCB *cb;
+public:
+  int send_data(void *ptr, size_t len);
+
+  RGWRESTStreamReadRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
+                list<pair<string, string> > *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params),
+                lock("RGWRESTStreamReadRequest"), handle(NULL), cb(NULL) {}
+  ~RGWRESTStreamReadRequest() {}
+  int get_obj_init(RGWAccessKey& key, rgw_obj& obj);
+  int complete(string& etag, time_t *mtime);
+
+  void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; }
+};
+
 #endif
 
index 71f97d861f5cc7ea8efc3796a68b261354fd463d..fc6f23b0350beedb0c1456cb1300e57971b012ab 100644 (file)
@@ -68,3 +68,27 @@ int RGWRegionConnection::complete_request(RGWRESTStreamWriteRequest *req, string
 
   return ret;
 }
+
+int RGWRegionConnection::get_obj_init(const string& uid, rgw_obj& obj, RGWRESTStreamReadRequest **req)
+{
+  string url;
+  int ret = get_url(url);
+  if (ret < 0)
+    return ret;
+
+  list<pair<string, string> > params;
+  params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid));
+  params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region));
+  *req = new RGWRESTStreamReadRequest(cct, url, NULL, &params);
+  return (*req)->get_obj_init(key, obj);
+}
+
+int RGWRegionConnection::complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime)
+{
+  int ret = req->complete(etag, mtime);
+  delete req;
+
+  return ret;
+}
+
+
index f9e35e4dedf36f7a6b7e1a6acf42426c55e1891b..f9527afe8a1abd41a8acd30f6ae1292506e7fe7c 100644 (file)
@@ -27,6 +27,9 @@ public:
   int put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size,
                    map<string, bufferlist>& attrs, RGWRESTStreamWriteRequest **req);
   int complete_request(RGWRESTStreamWriteRequest *req, string& etag, time_t *mtime);
+
+  int get_obj_init(const string& uid, rgw_obj& obj, RGWRESTStreamReadRequest **req);
+  int complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime);
 };
 
 #endif