From b947c7c73f1f8a60454b088caa6e9f09ff49ad13 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 9 Dec 2014 15:44:56 -0800 Subject: [PATCH] rgw: cleaup RGWRados::copy_obj() separate into multiple methods. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_op.cc | 1 + src/rgw/rgw_rados.cc | 323 ++++++++++++++++++++++++------------------- src/rgw/rgw_rados.h | 29 ++++ 3 files changed, 212 insertions(+), 141 deletions(-) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 3c04703eb6285..bc9a600ed2b19 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -2305,6 +2305,7 @@ int RGWCopyObj::init_common() const string& val = iter->second; attrbl.append(val.c_str(), val.size() + 1); } + return 0; } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 6586071a4c6ac..116147335f3d1 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2962,8 +2962,7 @@ done_cancel: * should treat it as a success */ if (meta.if_match == NULL && meta.if_nomatch == NULL) { - if ((r == -ECANCELED || r == -ENOENT) || - (!(flags & PUT_OBJ_EXCL) && r == -EEXIST)) { + if (r == -ECANCELED || r == -ENOENT || r == -EEXIST) { r = 0; } } else { @@ -3242,6 +3241,158 @@ int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj) return copy_obj_data(rctx, dest_bucket_info, read_op, end, obj, obj, max_chunk_size, NULL, mtime, attrset, RGW_OBJ_CATEGORY_MAIN, NULL, NULL, NULL); } +int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, + const string& user_id, + const string& client_id, + const string& op_id, + req_info *info, + const string& source_zone, + rgw_obj& dest_obj, + rgw_obj& src_obj, + RGWBucketInfo& dest_bucket_info, + RGWBucketInfo& src_bucket_info, + time_t *mtime, + const time_t *mod_ptr, + const time_t *unmod_ptr, + const char *if_match, + const char *if_nomatch, + bool replace_attrs, + map& attrs, + RGWObjCategory category, + string *ptag, + string *petag, + struct rgw_err *err, + void (*progress_cb)(off_t, void *), + void *progress_data) +{ + /* source is in a different region, copy from there */ + + RGWRESTStreamReadRequest *in_stream_req; + string tag; + map src_attrs; + append_rand_alpha(cct, tag, tag, 32); + + RGWPutObjProcessor_Atomic processor(obj_ctx, + dest_bucket_info.owner, dest_obj.bucket, dest_obj.get_object(), + cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled()); + int ret = processor.prepare(this, NULL); + if (ret < 0) + return ret; + + RGWRESTConn *conn; + if (source_zone.empty()) { + if (dest_bucket_info.region.empty()) { + /* source is in the master region */ + conn = rest_master_conn; + } else { + map::iterator iter = region_conn_map.find(src_bucket_info.region); + if (iter == region_conn_map.end()) { + ldout(cct, 0) << "could not find region connection to region: " << source_zone << dendl; + return -ENOENT; + } + conn = iter->second; + } + } else { + map::iterator iter = zone_conn_map.find(source_zone); + if (iter == zone_conn_map.end()) { + ldout(cct, 0) << "could not find zone connection to zone: " << source_zone << dendl; + return -ENOENT; + } + conn = iter->second; + } + + string obj_name = dest_obj.bucket.name + "/" + dest_obj.get_object(); + + RGWOpStateSingleOp opstate(this, client_id, op_id, obj_name); + + ret = opstate.set_state(RGWOpState::OPSTATE_IN_PROGRESS); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl; + return ret; + } + RGWRadosPutObj cb(&processor, &opstate, progress_cb, progress_data); + string etag; + map req_headers; + time_t set_mtime; + + ret = conn->get_obj(user_id, info, src_obj, true, &cb, &in_stream_req); + if (ret < 0) + goto set_err_state; + + ret = conn->complete_request(in_stream_req, etag, &set_mtime, req_headers); + if (ret < 0) + goto set_err_state; + + { /* opening scope so that we can do goto, sorry */ + bufferlist& extra_data_bl = processor.get_extra_data(); + if (extra_data_bl.length()) { + JSONParser jp; + if (!jp.parse(extra_data_bl.c_str(), extra_data_bl.length())) { + ldout(cct, 0) << "failed to parse response extra data. len=" << extra_data_bl.length() << " data=" << extra_data_bl.c_str() << dendl; + goto set_err_state; + } + + JSONDecoder::decode_json("attrs", src_attrs, &jp); + + src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout + } + } + + if (petag) { + map::iterator iter = src_attrs.find(RGW_ATTR_ETAG); + if (iter != src_attrs.end()) { + bufferlist& etagbl = iter->second; + *petag = string(etagbl.c_str(), etagbl.length()); + } + } + + set_copy_attrs(src_attrs, attrs, replace_attrs, !source_zone.empty()); + + ret = cb.complete(etag, mtime, set_mtime, src_attrs); + if (ret < 0) + goto set_err_state; + + ret = opstate.set_state(RGWOpState::OPSTATE_COMPLETE); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl; + } + + return 0; +set_err_state: + int r = opstate.set_state(RGWOpState::OPSTATE_ERROR); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to set opstate r=" << ret << dendl; + } + return ret; +} + + +int RGWRados::copy_obj_to_remote_dest(RGWObjState *astate, + map& src_attrs, + RGWRados::Object::Read& read_op, + const string& user_id, + rgw_obj& dest_obj, + time_t *mtime) +{ + string etag; + + RGWRESTStreamWriteRequest *out_stream_req; + + int ret = rest_master_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req); + if (ret < 0) + return ret; + + ret = read_op.iterate(0, astate->size - 1, out_stream_req->get_out_cb()); + if (ret < 0) + return ret; + + ret = rest_master_conn->complete_request(out_stream_req, etag, mtime); + if (ret < 0) + return ret; + + return 0; +} + /** * Copy an object. * dest_obj: the object to copy into @@ -3296,122 +3447,34 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx, ldout(cct, 5) << "Copy object " << src_obj.bucket << ":" << src_obj.get_object() << " => " << dest_obj.bucket << ":" << dest_obj.get_object() << dendl; + if (remote_src || !source_zone.empty()) { + return fetch_remote_obj(obj_ctx, user_id, client_id, op_id, info, source_zone, + dest_obj, src_obj, dest_bucket_info, src_bucket_info, mtime, mod_ptr, + unmod_ptr, if_match, if_nomatch, replace_attrs, attrs, + category, ptag, petag, err, progress_cb, progress_data); + } + map src_attrs; int64_t ofs = 0; int64_t end = -1; RGWRados::Object src_op_target(this, obj_ctx, src_obj); RGWRados::Object::Read read_op(&src_op_target); - if (!remote_src && source_zone.empty()) { - read_op.conds.mod_ptr = mod_ptr; - read_op.conds.unmod_ptr = unmod_ptr; - read_op.conds.if_match = if_match; - read_op.conds.if_nomatch = if_nomatch; - read_op.params.attrs = &src_attrs; - read_op.params.lastmod = &lastmod; - read_op.params.read_size = &total_len; - read_op.params.obj_size = &obj_size; - read_op.params.perr = err; - - ret = read_op.prepare(&ofs, &end); - if (ret < 0) - return ret; - } else { - /* source is in a different region, copy it there */ - - RGWRESTStreamReadRequest *in_stream_req; - string tag; - append_rand_alpha(cct, tag, tag, 32); - - RGWPutObjProcessor_Atomic processor(obj_ctx, - dest_bucket_info.owner, dest_obj.bucket, dest_obj.get_object(), - cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled()); - ret = processor.prepare(this, NULL); - if (ret < 0) - return ret; - - RGWRESTConn *conn; - if (source_zone.empty()) { - if (dest_bucket_info.region.empty()) { - /* source is in the master region */ - conn = rest_master_conn; - } else { - map::iterator iter = region_conn_map.find(src_bucket_info.region); - if (iter == region_conn_map.end()) { - ldout(cct, 0) << "could not find region connection to region: " << source_zone << dendl; - return -ENOENT; - } - conn = iter->second; - } - } else { - map::iterator iter = zone_conn_map.find(source_zone); - if (iter == zone_conn_map.end()) { - ldout(cct, 0) << "could not find zone connection to zone: " << source_zone << dendl; - return -ENOENT; - } - conn = iter->second; - } - - string obj_name = dest_obj.bucket.name + "/" + dest_obj.get_object(); - - RGWOpStateSingleOp opstate(this, client_id, op_id, obj_name); - - int ret = opstate.set_state(RGWOpState::OPSTATE_IN_PROGRESS); - if (ret < 0) { - ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl; - return ret; - } - RGWRadosPutObj cb(&processor, &opstate, progress_cb, progress_data); - string etag; - map req_headers; - time_t set_mtime; - - ret = conn->get_obj(user_id, info, src_obj, true, &cb, &in_stream_req); - if (ret < 0) - goto set_err_state; - - ret = conn->complete_request(in_stream_req, etag, &set_mtime, req_headers); - if (ret < 0) - goto set_err_state; - - if (petag) { - *petag = etag; - } - - { /* opening scope so that we can do goto, sorry */ - bufferlist& extra_data_bl = processor.get_extra_data(); - if (extra_data_bl.length()) { - JSONParser jp; - if (!jp.parse(extra_data_bl.c_str(), extra_data_bl.length())) { - ldout(cct, 0) << "failed to parse response extra data. len=" << extra_data_bl.length() << " data=" << extra_data_bl.c_str() << dendl; - goto set_err_state; - } - - JSONDecoder::decode_json("attrs", src_attrs, &jp); - - src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout - } - } - - set_copy_attrs(src_attrs, attrs, replace_attrs, !source_zone.empty()); - - ret = cb.complete(etag, mtime, set_mtime, src_attrs); - if (ret < 0) - goto set_err_state; - - ret = opstate.set_state(RGWOpState::OPSTATE_COMPLETE); - if (ret < 0) { - ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl; - } + read_op.conds.mod_ptr = mod_ptr; + read_op.conds.unmod_ptr = unmod_ptr; + read_op.conds.if_match = if_match; + read_op.conds.if_nomatch = if_nomatch; + read_op.params.attrs = &src_attrs; + read_op.params.lastmod = &lastmod; + read_op.params.read_size = &total_len; + read_op.params.obj_size = &obj_size; + read_op.params.perr = err; - return 0; -set_err_state: - int r = opstate.set_state(RGWOpState::OPSTATE_ERROR); - if (r < 0) { - ldout(cct, 0) << "ERROR: failed to set opstate r=" << ret << dendl; - } + ret = read_op.prepare(&ofs, &end); + if (ret < 0) { return ret; } + set_copy_attrs(src_attrs, attrs, replace_attrs, false); src_attrs.erase(RGW_ATTR_ID_TAG); @@ -3425,30 +3488,8 @@ set_err_state: if (remote_dest) { /* dest is in a different region, copy it there */ - - string etag; - - RGWRESTStreamWriteRequest *out_stream_req; - - int ret = rest_master_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req); - if (ret < 0) - return ret; - - ret = read_op.iterate(0, astate->size - 1, out_stream_req->get_out_cb()); - if (ret < 0) - return ret; - - ret = rest_master_conn->complete_request(out_stream_req, etag, mtime); - if (ret < 0) - return ret; - - if (petag) { - *petag = etag; - } - - return 0; + return copy_obj_to_remote_dest(astate, src_attrs, read_op, user_id, dest_obj, mtime); } - uint64_t max_chunk_size; ret = get_max_chunk_size(dest_obj.bucket, &max_chunk_size); @@ -3474,6 +3515,14 @@ set_err_state: } } + if (petag) { + map::iterator iter = src_attrs.find(RGW_ATTR_ETAG); + if (iter != src_attrs.end()) { + bufferlist& etagbl = iter->second; + *petag = string(etagbl.c_str(), etagbl.length()); + } + } + if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */ return copy_obj_data(obj_ctx, dest_bucket_info, read_op, end, dest_obj, src_obj, max_chunk_size, mtime, 0, src_attrs, category, ptag, petag, err); @@ -3558,14 +3607,6 @@ set_err_state: if (ret < 0) goto done_ret; - if (petag) { - map::iterator iter = src_attrs.find(RGW_ATTR_ETAG); - if (iter != src_attrs.end()) { - bufferlist& etagbl = iter->second; - *petag = string(etagbl.c_str(), etagbl.length()); - } - } - return 0; done_ret: @@ -3986,7 +4027,7 @@ int RGWRados::Object::Delete::delete_obj() ObjectWriteOperation op; - r = target->prepare_atomic_modification(op, false, NULL); + r = target->prepare_atomic_modification(op, false, NULL, NULL, NULL); if (r < 0) return r; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 7d670ecad4a4a..ea94fa7522520 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1577,6 +1577,35 @@ public: virtual bool aio_completed(void *handle); int rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj); + int fetch_remote_obj(RGWObjectCtx& obj_ctx, + const string& user_id, + const string& client_id, + const string& op_id, + req_info *info, + const string& source_zone, + rgw_obj& dest_obj, + rgw_obj& src_obj, + RGWBucketInfo& dest_bucket_info, + RGWBucketInfo& src_bucket_info, + time_t *mtime, + const time_t *mod_ptr, + const time_t *unmod_ptr, + const char *if_match, + const char *if_nomatch, + bool replace_attrs, + map& attrs, + RGWObjCategory category, + string *ptag, + string *petag, + struct rgw_err *err, + void (*progress_cb)(off_t, void *), + void *progress_data); + int copy_obj_to_remote_dest(RGWObjState *astate, + map& src_attrs, + RGWRados::Object::Read& read_op, + const string& user_id, + rgw_obj& dest_obj, + time_t *mtime); /** * Copy an object. * dest_obj: the object to copy into -- 2.39.5