}
}
-int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj)
+int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, const rgw_obj& obj)
{
map<string, bufferlist> attrset;
attrset.erase(RGW_ATTR_TAIL_TAG);
return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, NULL, mtime, attrset,
- 0, real_time(),
- (obj.key.instance.empty() ? NULL : &(obj.key.instance)),
- NULL);
+ 0, real_time(), NULL);
}
struct obj_time_weight {
}
}
+ if (version_id && !version_id->empty()) {
+ dest_obj.key.set_instance(*version_id);
+ } else if (dest_bucket_info.versioning_enabled()) {
+ gen_rand_obj_instance_name(&dest_obj);
+ }
+
if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
return copy_obj_data(obj_ctx, dest_bucket_info, read_op, obj_size - 1, dest_obj,
- mtime, real_time(), attrs, olh_epoch, delete_at,
- version_id, petag);
+ mtime, real_time(), attrs, olh_epoch, delete_at, petag);
}
RGWObjManifest::obj_iterator miter = astate->manifest.obj_begin();
return ret;
}
- if (version_id && !version_id->empty()) {
- dest_obj.key.set_instance(*version_id);
- } else if (dest_bucket_info.versioning_enabled()) {
- gen_rand_obj_instance_name(&dest_obj);
- }
-
bufferlist first_chunk;
bool copy_itself = (dest_obj == src_obj);
int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
RGWBucketInfo& dest_bucket_info,
RGWRados::Object::Read& read_op, off_t end,
- rgw_obj& dest_obj,
+ const rgw_obj& dest_obj,
real_time *mtime,
real_time set_mtime,
map<string, bufferlist>& attrs,
uint64_t olh_epoch,
real_time delete_at,
- string *version_id,
string *petag)
{
string tag;
append_rand_alpha(cct, tag, tag, 32);
- RGWPutObjProcessor_Atomic processor(obj_ctx,
- dest_bucket_info, dest_obj.bucket, dest_obj.key.name,
- cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
- if (version_id) {
- processor.set_version_id(*version_id);
- }
- processor.set_olh_epoch(olh_epoch);
- int ret = processor.prepare(this, NULL);
+ using namespace rgw::putobj;
+ AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
+ AtomicObjectProcessor processor(&aio, this, dest_bucket_info,
+ dest_bucket_info.owner, obj_ctx,
+ dest_obj, olh_epoch, tag);
+ int ret = processor.prepare();
if (ret < 0)
return ret;
}
uint64_t read_len = ret;
- bool again;
-
- do {
- void *handle;
- rgw_raw_obj obj;
-
- ret = processor.handle_data(bl, ofs, &handle, &obj, &again);
- if (ret < 0) {
- return ret;
- }
- ret = processor.throttle_data(handle, obj, read_len, false);
- if (ret < 0)
- return ret;
- } while (again);
+ ret = processor.process(std::move(bl), ofs);
+ if (ret < 0) {
+ return ret;
+ }
ofs += read_len;
} while (ofs <= end);
+ // flush
+ ret = processor.process({}, ofs);
+ if (ret < 0) {
+ return ret;
+ }
+
string etag;
auto iter = attrs.find(RGW_ATTR_ETAG);
if (iter != attrs.end()) {
accounted_size = compressed ? cs_info.orig_size : ofs;
}
- return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at);
+ return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at,
+ nullptr, nullptr, nullptr, nullptr, nullptr);
}
bool RGWRados::is_meta_master()
ATTRSMOD_MERGE = 2
};
- int rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj);
+ int rewrite_obj(RGWBucketInfo& dest_bucket_info, const rgw_obj& obj);
int stat_remote_obj(RGWObjectCtx& obj_ctx,
const rgw_user& user_id,
int copy_obj_data(RGWObjectCtx& obj_ctx,
RGWBucketInfo& dest_bucket_info,
RGWRados::Object::Read& read_op, off_t end,
- rgw_obj& dest_obj,
+ const rgw_obj& dest_obj,
ceph::real_time *mtime,
ceph::real_time set_mtime,
map<string, bufferlist>& attrs,
uint64_t olh_epoch,
ceph::real_time delete_at,
- string *version_id,
string *petag);
int check_bucket_empty(RGWBucketInfo& bucket_info);