NULL, /* string *petag */
NULL, /* void (*progress_cb)(off_t, void *) */
NULL, /* void *progress_data */
+ NULL, /* rgw::sal::DataProcessorFactory *dp_factory */
dpp,
y,
no_trace);
nullptr, /* string *petag */
nullptr, /* void (*progress_cb)(off_t, void *) */
nullptr, /* void *progress_data */
+ nullptr, /* rgw::sal::DataProcessorFactory *dp_factory */
dpp,
y,
no_trace);
return copy_obj_data(octx, owner, dest_bucket_info,
dest_bucket_info.placement_rule,
read_op, obj_size - 1, obj, NULL, mtime,
- attrset, 0, real_time(), NULL, dpp, y);
+ attrset, 0, real_time(), NULL, NULL, dpp, y);
}
string *petag,
void (*progress_cb)(off_t, void *),
void *progress_data,
+ rgw::sal::DataProcessorFactory *dp_factory,
const DoutPrefixProvider *dpp,
optional_yield y,
jspan_context& trace)
if (ret < 0) {
return ret;
}
- if (src_attrs.count(RGW_ATTR_CRYPT_MODE)) {
- // Current implementation does not follow S3 spec and even
- // may result in data corruption silently when copying
- // multipart objects across pools. So reject COPY operations
- //on encrypted objects before it is fully functional.
- ldpp_dout(dpp, 0) << "ERROR: copy op for encrypted object " << src_obj
- << " has not been implemented." << dendl;
- return -ERR_NOT_IMPLEMENTED;
- }
src_attrs[RGW_ATTR_ACL] = attrs[RGW_ATTR_ACL];
src_attrs.erase(RGW_ATTR_DELETE_AT);
src_attrs.erase(RGW_ATTR_OLH_VER);
}
+ src_attrs.erase(RGW_ATTR_STORAGE_CLASS);
+
src_attrs.erase(RGW_ATTR_OBJ_REPLICATION_TRACE);
src_attrs.erase(RGW_ATTR_OBJ_REPLICATION_TIMESTAMP);
src_attrs.erase(RGW_ATTR_OBJ_REPLICATION_STATUS);
+ // drop encryption attributes
+ // will be generated by copy_obj_data() if encryption is requested
+ src_attrs.erase(RGW_ATTR_CRYPT_KEYSEL);
+ src_attrs.erase(RGW_ATTR_CRYPT_CONTEXT);
+ src_attrs.erase(RGW_ATTR_CRYPT_MODE);
+ src_attrs.erase(RGW_ATTR_CRYPT_KEYID);
+ src_attrs.erase(RGW_ATTR_CRYPT_KEYMD5);
+ src_attrs.erase(RGW_ATTR_CRYPT_DATAKEY);
+
set_copy_attrs(src_attrs, attrs, attrs_mod);
attrs.erase(RGW_ATTR_ID_TAG);
attrs.erase(RGW_ATTR_PG_VER);
}
}
+ if (dp_factory && dp_factory->need_copy_data()) {
+ copy_data = true;
+ }
+
if (petag) {
const auto iter = attrs.find(RGW_ATTR_ETAG);
if (iter != attrs.end()) {
if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
attrs.erase(RGW_ATTR_TAIL_TAG);
return copy_obj_data(dest_obj_ctx, owner, dest_bucket_info, dest_placement, read_op, obj_size - 1, dest_obj,
- mtime, real_time(), attrs, olh_epoch, delete_at, petag, dpp, y);
+ mtime, real_time(), attrs, olh_epoch, delete_at, petag, dp_factory, dpp, y);
}
/* This has been in for 2 years, so we can safely assume amanifest is not NULL */
uint64_t olh_epoch,
real_time delete_at,
string *petag,
+ rgw::sal::DataProcessorFactory *dp_factory,
const DoutPrefixProvider *dpp,
optional_yield y,
bool log_op)
auto aio = rgw::make_throttle(cct->_conf->rgw_put_obj_min_window_size, y);
using namespace rgw::putobj;
jspan_context no_trace{false, false};
- AtomicObjectProcessor processor(aio.get(), this, dest_bucket_info,
- &dest_placement, owner,
- obj_ctx, dest_obj, olh_epoch, tag, dpp, y, no_trace);
- int ret = processor.prepare(y);
+ AtomicObjectProcessor aoproc(aio.get(), this, dest_bucket_info,
+ &dest_placement, owner,
+ obj_ctx, dest_obj, olh_epoch, tag, dpp, y, no_trace);
+
+ DataProcessorFilter aoproc_filter(&aoproc);
+ RGWGetObj_Filter *filter = &aoproc_filter;
+ if (dp_factory) {
+ int ret = dp_factory->set_writer(&aoproc, attrs, dpp, y);
+ if (ret < 0) {
+ return ret;
+ }
+
+ filter = dp_factory->get_filter();
+ }
+
+ int ret = aoproc.prepare(y);
if (ret < 0)
return ret;
off_t ofs = 0;
- do {
- bufferlist bl;
- ret = read_op.read(ofs, end, bl, y, dpp);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "ERROR: fail to read object data, ret = " << ret << dendl;
- return ret;
- }
-
- uint64_t read_len = ret;
- ret = processor.process(std::move(bl), ofs);
- if (ret < 0) {
- return ret;
- }
+ ret = read_op.iterate(dpp, ofs, end, filter, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to read object data ret=" << ret << dendl;
+ return ret;
+ }
- ofs += read_len;
- } while (ofs <= end);
+ ofs = end;
// flush
- ret = processor.process({}, ofs);
+ ret = filter->flush();
if (ret < 0) {
return ret;
}
}
const req_context rctx{dpp, y, nullptr};
- return processor.complete(accounted_size, etag, mtime, set_mtime, attrs,
+ return aoproc.complete(accounted_size, etag, mtime, set_mtime, attrs,
rgw::cksum::no_cksum, delete_at,
nullptr, nullptr, nullptr, nullptr, nullptr, rctx,
log_op ? rgw::sal::FLAG_LOG_OP : 0);
olh_epoch,
real_time(),
nullptr /* petag */,
+ nullptr, /* dp_factory */
dpp,
y,
log_op);