length = NULL;
copy_source = NULL;
http_auth = NULL;
+ local_source = false;
obj_ctx = NULL;
}
bool has_acl_header;
const char *copy_source;
const char *http_auth;
+ bool local_source; /* source is local */
int prot_flags;
RGWBucketInfo bucket_info;
- bool source_in_domain = false;
-
if (s->copy_source) { /* check if copy source is within the current domain */
const char *src = s->copy_source;
if (*src == '/')
ret = store->get_bucket_info(s->obj_ctx, copy_source_str, source_info, NULL);
if (ret == 0) {
string& region = source_info.region;
- source_in_domain = (region.empty() && store->region.is_master) ||
+ s->local_source = (region.empty() && store->region.is_master) ||
(region == store->region.name);
}
}
/* we now need to make sure that the operation actually requires copy source, that is
* it's a copy operation
*/
- if (!source_in_domain ||
+ if (!s->local_source ||
(s->op != OP_PUT && s->op != OP_COPY) ||
s->object_str.empty()) {
return -ERR_PERMANENT_REDIRECT;
if (ret < 0)
return ret;
- /* get buckets info (source and dest) */
-
ret = store->get_bucket_info(s->obj_ctx, src_bucket_name, src_bucket_info, NULL);
if (ret < 0)
return ret;
src_bucket = src_bucket_info.bucket;
- if (src_bucket_name.compare(dest_bucket_name) == 0) {
+ /* get buckets info (source and dest) */
+ if (s->local_source) {
+ rgw_obj src_obj(src_bucket, src_object);
+ store->set_atomic(s->obj_ctx, src_obj);
+ store->set_prefetch_data(s->obj_ctx, src_obj);
+
+ /* check source object permissions */
+ ret = read_policy(store, s, src_bucket_info, &src_policy, src_bucket, src_object);
+ if (ret < 0)
+ return ret;
+
+ if (!src_policy.verify_permission(s->user.user_id, s->perm_mask, RGW_PERM_READ))
+ return -EACCES;
+ }
+
+ RGWAccessControlPolicy dest_bucket_policy(s->cct);
+
+ if (src_bucket_name.compare(dest_bucket_name) == 0) { /* will only happen if s->local_source */
dest_bucket_info = src_bucket_info;
} else {
ret = store->get_bucket_info(s->obj_ctx, dest_bucket_name, dest_bucket_info, NULL);
dest_bucket = dest_bucket_info.bucket;
- rgw_obj src_obj(src_bucket, src_object);
- store->set_atomic(s->obj_ctx, src_obj);
- store->set_prefetch_data(s->obj_ctx, src_obj);
-
rgw_obj dest_obj(dest_bucket, dest_object);
store->set_atomic(s->obj_ctx, dest_obj);
- /* check source object permissions */
- ret = read_policy(store, s, src_bucket_info, &src_policy, src_bucket, src_object);
- if (ret < 0)
- return ret;
-
- if (!src_policy.verify_permission(s->user.user_id, s->perm_mask, RGW_PERM_READ))
- return -EACCES;
-
- RGWAccessControlPolicy dest_bucket_policy(s->cct);
-
/* check dest bucket permissions */
ret = read_policy(store, s, dest_bucket_info, &dest_bucket_policy, dest_bucket, empty_str);
if (ret < 0)
src_obj.init(src_bucket, src_object);
dst_obj.init(dest_bucket, dest_object);
store->set_atomic(s->obj_ctx, src_obj);
-#if 0
- if ((dest_bucket_info.region.empty() && !store->region.is_master) ||
- (dest_bucket_info.region != store->region.name)) {
-
- map<string, bufferlist> src_attrs;
-
- int ret = get_obj_attrs(store, s, src_obj, src_attrs
- uint64_t *obj_size, RGWObjVersionTracker *objv_tracker)
-
- int ret = store->rest_conn->put_obj(s->user.user_id, dst_obj,
- if (ret < 0)
- return ret;
- }
-#endif
store->set_atomic(s->obj_ctx, dst_obj);
ret = store->copy_obj(s->obj_ctx,
dst_obj,
src_obj,
dest_bucket_info,
+ src_bucket_info,
&mtime,
mod_ptr,
unmod_ptr,
rgw_obj& dest_obj,
rgw_obj& src_obj,
RGWBucketInfo& dest_bucket_info,
+ RGWBucketInfo& src_bucket_info,
time_t *mtime,
const time_t *mod_ptr,
const time_t *unmod_ptr,
rgw_obj shadow_obj = dest_obj;
string shadow_oid;
+ bool remote_src;
+ bool remote_dest;
+
append_rand_alpha(cct, dest_obj.object, shadow_oid, 32);
shadow_obj.init_ns(dest_obj.bucket, shadow_oid, shadow_ns);
+ remote_dest = ((dest_bucket_info.region.empty() && !region.is_master) ||
+ (dest_bucket_info.region != region.name));
+
+ remote_src = ((src_bucket_info.region.empty() && !region.is_master) ||
+ (src_bucket_info.region != region.name));
+
+ if (remote_src && remote_dest) {
+ ldout(cct, 0) << "ERROR: can't copy object when both src and dest buckets are remote" << dendl;
+ return -EINVAL;
+ }
+
ldout(cct, 5) << "Copy object " << src_obj.bucket << ":" << src_obj.object << " => " << dest_obj.bucket << ":" << dest_obj.object << dendl;
void *handle = NULL;
map<string, bufferlist> attrset;
off_t ofs = 0;
off_t end = -1;
- ret = prepare_get_obj(ctx, src_obj, &ofs, &end, &attrset,
- mod_ptr, unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &obj_size, NULL, &handle, err);
+ if (!remote_src) {
+ ret = prepare_get_obj(ctx, src_obj, &ofs, &end, &attrset,
+ mod_ptr, unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &obj_size, NULL, &handle, err);
+ if (ret < 0)
+ return ret;
+ } else {
+ /* source is in a different region, copy it there */
- if (ret < 0)
- return ret;
+ map<string, bufferlist> src_attrs;
+
+ RGWRESTStreamReadRequest *in_stream_req;
+
+ int ret = rest_conn->get_obj_init(user_id, src_obj, &in_stream_req);
+ if (ret < 0)
+ return ret;
+#if 0
+ ret = get_obj_iterate(ctx, &handle, src_obj, 0, astate->size - 1, out_stream_req->get_out_cb());
+ if (ret < 0)
+ return ret;
+#endif
+
+ string etag;
+
+ ret = rest_conn->complete_request(in_stream_req, etag, mtime);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+ }
if (replace_attrs) {
if (!attrs[RGW_ATTR_ETAG].length())
}
- if ((dest_bucket_info.region.empty() && !region.is_master) ||
- (dest_bucket_info.region != region.name)) {
+ if (remote_dest) {
/* dest is in a different region, copy it there */
map<string, bufferlist> src_attrs;
return ret;
return 0;
- }
-
- if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
+ } else if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
return copy_obj_data(ctx, handle, end, dest_obj, src_obj, mtime, attrset, category, ptag, err);
}
rgw_obj& dest_obj,
rgw_obj& src_obj,
RGWBucketInfo& dest_bucket_info,
+ RGWBucketInfo& src_bucket_info,
time_t *mtime,
const time_t *mod_ptr,
const time_t *unmod_ptr,
return status;
}
+
+int RGWRESTStreamReadRequest::get_obj_init(RGWAccessKey& key, rgw_obj& obj)
+{
+ string resource = obj.bucket.name + "/" + obj.object;
+ string new_url = url;
+ if (new_url[new_url.size() - 1] != '/')
+ new_url.append("/");
+
+ string date_str;
+ get_new_date_str(cct, date_str);
+
+ RGWEnv new_env;
+ req_info new_info(cct, &new_env);
+
+ string params_str;
+ map<string, string>& args = new_info.args.get_params();
+ get_params_str(args, params_str);
+
+ new_url.append(resource + params_str);
+
+ new_env.set("HTTP_DATE", date_str.c_str());
+
+ new_info.method = "GET";
+
+ new_info.script_uri = "/";
+ new_info.script_uri.append(resource);
+ new_info.request_uri = new_info.script_uri;
+
+ int ret = sign_request(key, new_env, new_info);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to sign request" << dendl;
+ return ret;
+ }
+
+ map<string, string>& m = new_env.get_map();
+ map<string, string>::iterator iter;
+ for (iter = m.begin(); iter != m.end(); ++iter) {
+ headers.push_back(make_pair<string, string>(iter->first, iter->second));
+ }
+
+ // cb = new RGWRESTStreamInCB(this);
+
+
+ int r = process(new_info.method, new_url.c_str());
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
+int RGWRESTStreamReadRequest::complete(string& etag, time_t *mtime)
+{
+ set_str_from_headers(out_headers, "ETAG", etag);
+ if (mtime) {
+ string mtime_str;
+ set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
+ string err;
+ long t = strict_strtol(mtime_str.c_str(), 10, &err);
+ if (!err.empty()) {
+ ldout(cct, 0) << "ERROR: failed converting mtime (" << mtime_str << ") to int " << dendl;
+ return -EINVAL;
+ }
+ *mtime = (time_t)t;
+ }
+
+ return status;
+}
+
+int RGWRESTStreamReadRequest::send_data(void *ptr, size_t len) {
+ return 0;
+}
+
RGWGetDataCB *get_out_cb() { return cb; }
};
+class RGWRESTStreamReadRequest : public RGWRESTSimpleRequest {
+ Mutex lock;
+ void *handle;
+ RGWGetDataCB *cb;
+public:
+ int send_data(void *ptr, size_t len);
+
+ RGWRESTStreamReadRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
+ list<pair<string, string> > *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params),
+ lock("RGWRESTStreamReadRequest"), handle(NULL), cb(NULL) {}
+ ~RGWRESTStreamReadRequest() {}
+ int get_obj_init(RGWAccessKey& key, rgw_obj& obj);
+ int complete(string& etag, time_t *mtime);
+
+ void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; }
+};
+
#endif
return ret;
}
+
+int RGWRegionConnection::get_obj_init(const string& uid, rgw_obj& obj, RGWRESTStreamReadRequest **req)
+{
+ string url;
+ int ret = get_url(url);
+ if (ret < 0)
+ return ret;
+
+ list<pair<string, string> > params;
+ params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid));
+ params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region));
+ *req = new RGWRESTStreamReadRequest(cct, url, NULL, ¶ms);
+ return (*req)->get_obj_init(key, obj);
+}
+
+int RGWRegionConnection::complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime)
+{
+ int ret = req->complete(etag, mtime);
+ delete req;
+
+ return ret;
+}
+
+
int put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size,
map<string, bufferlist>& attrs, RGWRESTStreamWriteRequest **req);
int complete_request(RGWRESTStreamWriteRequest *req, string& etag, time_t *mtime);
+
+ int get_obj_init(const string& uid, rgw_obj& obj, RGWRESTStreamReadRequest **req);
+ int complete_request(RGWRESTStreamReadRequest *req, string& etag, time_t *mtime);
};
#endif