string source_zone;
RGWBucketInfo bucket_info;
+ std::optional<rgw_placement_rule> dest_placement_rule;
rgw_obj_key key;
std::optional<uint64_t> versioned_epoch;
RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
const string& _source_zone,
RGWBucketInfo& _bucket_info,
+ std::optional<rgw_placement_rule> _dest_placement_rule,
const rgw_obj_key& _key,
std::optional<uint64_t> _versioned_epoch,
bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
source_zone(_source_zone),
bucket_info(_bucket_info),
+ dest_placement_rule(_dest_placement_rule),
key(_key),
versioned_epoch(_versioned_epoch),
copy_if_newer(_if_newer)
string source_zone;
RGWBucketInfo bucket_info;
+ std::optional<rgw_placement_rule> dest_placement_rule;
rgw_obj_key key;
std::optional<uint64_t> versioned_epoch;
RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
const string& _source_zone,
RGWBucketInfo& _bucket_info,
+ std::optional<rgw_placement_rule> _dest_placement_rule,
const rgw_obj_key& _key,
std::optional<uint64_t> _versioned_epoch,
bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
async_rados(_async_rados), store(_store),
source_zone(_source_zone),
bucket_info(_bucket_info),
+ dest_placement_rule(_dest_placement_rule),
key(_key),
versioned_epoch(_versioned_epoch),
copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
}
int send_request() override {
- req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
+ req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
+ source_zone, bucket_info, dest_placement_rule,
key, versioned_epoch, copy_if_newer, zones_trace);
async_rados->queue(req);
return 0;
void (*progress_cb)(off_t, void *);
void *progress_data;
bufferlist extra_data_bl;
- uint64_t extra_data_left;
- uint64_t data_len;
+ uint64_t extra_data_left{0};
+ bool need_to_process_attrs{false};
+ uint64_t data_len{0};
map<string, bufferlist> src_attrs;
uint64_t ofs{0};
uint64_t lofs{0}; /* logical ofs */
+ std::function<int(const map<string, bufferlist>&)> attrs_handler;
public:
RGWRadosPutObj(CephContext* cct,
CompressorRef& plugin,
boost::optional<RGWPutObj_Compress>& compressor,
rgw::putobj::ObjectProcessor *p,
void (*_progress_cb)(off_t, void *),
- void *_progress_data) :
+ void *_progress_data,
+ std::function<int(const map<string, bufferlist>&)> _attrs_handler) :
cct(cct),
filter(p),
compressor(compressor),
processor(p),
progress_cb(_progress_cb),
progress_data(_progress_data),
- extra_data_left(0),
- data_len(0) {}
+ attrs_handler(_attrs_handler) {}
int process_attrs(void) {
if (extra_data_bl.length()) {
src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout
}
+ int ret = attrs_handler(src_attrs);
+ if (ret < 0) {
+ return ret;
+ }
+
if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
//do not compress if object is encrypted
compressor = boost::in_place(cct, plugin, filter);
buffering = boost::in_place(&*compressor, buffer_size);
filter = &*buffering;
}
+
return 0;
}
if (bl.length() == 0) {
return 0;
}
+ } else if (need_to_process_attrs) {
+ /* need to call process_attrs() even if we don't get any attrs,
+ * need it to call attrs_handler(). At the moment this
+ * will never happenas all callers will have extra_data_len > 0, but need
+ * to have it for sake of completeness.
+ */
+ int res = process_attrs();
+ if (res < 0) {
+ return res;
+ }
+ need_to_process_attrs = false;
}
ceph_assert(uint64_t(ofs) >= extra_data_len);
void set_extra_data_len(uint64_t len) override {
extra_data_left = len;
+ if (len == 0) {
+ need_to_process_attrs = true;
+ }
RGWHTTPStreamRWRequest::ReceiveCB::set_extra_data_len(len);
}
const rgw_obj& src_obj,
RGWBucketInfo& dest_bucket_info,
RGWBucketInfo& src_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement_rule,
real_time *src_mtime,
real_time *mtime,
const real_time *mod_ptr,
append_rand_alpha(cct, tag, tag, 32);
obj_time_weight set_mtime_weight;
set_mtime_weight.high_precision = high_precision_time;
+ int ret;
rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
- rgw_placement_rule *ptail_rule{nullptr};
-#warning FIXME ptail_rule
+ const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr);
AtomicObjectProcessor processor(&aio, this, dest_bucket_info, ptail_rule, user_id,
obj_ctx, dest_obj, olh_epoch, tag);
- int ret = processor.prepare();
- if (ret < 0) {
- return ret;
- }
-
RGWRESTConn *conn;
auto& zone_conn_map = svc.zone->get_zone_conn_map();
auto& zonegroup_conn_map = svc.zone->get_zonegroup_conn_map();
}
}
- RGWRadosPutObj cb(cct, plugin, compressor, &processor, progress_cb, progress_data);
+ RGWRadosPutObj cb(cct, plugin, compressor, &processor, progress_cb, progress_data,
+ [&](const map<string, bufferlist>& obj_attrs) {
+ if (!ptail_rule) {
+ auto iter = obj_attrs.find(RGW_ATTR_STORAGE_CLASS);
+ if (iter != obj_attrs.end()) {
+ rgw_placement_rule dest_rule;
+ dest_rule.storage_class = iter->second.to_str();
+ dest_rule.inherit_from(dest_bucket_info.placement_rule);
+ processor.set_tail_placement(std::move(dest_rule));
+ }
+ }
+
+ int ret = processor.prepare();
+ if (ret < 0) {
+ return ret;
+ }
+ return 0;
+ });
string etag;
real_time set_mtime;
if (remote_src || !source_zone.empty()) {
return fetch_remote_obj(obj_ctx, user_id, info, source_zone,
- dest_obj, src_obj, dest_bucket_info, src_bucket_info, src_mtime, mtime, mod_ptr,
+ dest_obj, src_obj, dest_bucket_info, src_bucket_info,
+ dest_placement, src_mtime, mtime, mod_ptr,
unmod_ptr, high_precision_time,
if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
olh_epoch, delete_at, ptag, petag, progress_cb, progress_data);