NULL, /* void (*progress_cb)(off_t, void *), */
NULL, /* void *progress_data*); */
dpp,
+ filter.get(),
&zones_trace,
&bytes_transferred);
real_time src_mtime;
bool copy_if_newer;
+ std::shared_ptr<RGWFetchObjFilter> filter;
rgw_zone_set zones_trace;
PerfCounters* counters;
const DoutPrefixProvider *dpp;
const rgw_obj_key& _key,
const std::optional<rgw_obj_key>& _dest_key,
std::optional<uint64_t> _versioned_epoch,
- bool _if_newer, rgw_zone_set *_zones_trace,
+ bool _if_newer,
+ std::shared_ptr<RGWFetchObjFilter> _filter,
+ rgw_zone_set *_zones_trace,
PerfCounters* counters, const DoutPrefixProvider *dpp)
: RGWAsyncRadosRequest(caller, cn), store(_store),
source_zone(_source_zone),
key(_key),
dest_key(_dest_key),
versioned_epoch(_versioned_epoch),
- copy_if_newer(_if_newer), counters(counters),
+ copy_if_newer(_if_newer),
+ filter(_filter),
+ counters(counters),
dpp(dpp)
{
if (_zones_trace) {
bool copy_if_newer;
+ std::shared_ptr<RGWFetchObjFilter> filter;
+
RGWAsyncFetchRemoteObj *req;
rgw_zone_set *zones_trace;
PerfCounters* counters;
const rgw_obj_key& _key,
const std::optional<rgw_obj_key>& _dest_key,
std::optional<uint64_t> _versioned_epoch,
- bool _if_newer, rgw_zone_set *_zones_trace,
+ bool _if_newer,
+ std::shared_ptr<RGWFetchObjFilter> _filter,
+ rgw_zone_set *_zones_trace,
PerfCounters* counters, const DoutPrefixProvider *dpp)
: RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
async_rados(_async_rados), store(_store),
key(_key),
dest_key(_dest_key),
versioned_epoch(_versioned_epoch),
- copy_if_newer(_if_newer), req(NULL),
+ copy_if_newer(_if_newer),
+ filter(_filter),
+ req(NULL),
zones_trace(_zones_trace), counters(counters), dpp(dpp) {}
int send_request() override {
req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
source_zone, src_bucket, dest_placement_rule, dest_bucket_info,
- key, dest_key, versioned_epoch, copy_if_newer,
+ key, dest_key, versioned_epoch, copy_if_newer, filter,
zones_trace, counters, dpp);
async_rados->queue(req);
return 0;
return 0;
}
+class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
+ rgw_bucket_sync_pipe sync_pipe;
+
+public:
+ RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe) : sync_pipe(_sync_pipe) {
+ }
+
+ int filter(CephContext *cct,
+ const rgw_obj_key& source_key,
+ const RGWBucketInfo& dest_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement_rule,
+ const map<string, bufferlist>& obj_attrs,
+ const rgw_placement_rule **prule) override;
+};
+
+int RGWFetchObjFilter_Sync::filter(CephContext *cct,
+ const rgw_obj_key& source_key,
+ const RGWBucketInfo& dest_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement_rule,
+ const map<string, bufferlist>& obj_attrs,
+ const rgw_placement_rule **prule)
+{
+ rgw_sync_pipe_params params;
+
+ RGWObjTags obj_tags;
+
+ auto iter = obj_attrs.find(RGW_ATTR_TAGS);
+ if (iter != obj_attrs.end()) {
+ try{
+ auto it = iter->second.cbegin();
+ obj_tags.decode(it);
+ } catch (buffer::error &err) {
+ ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+ }
+ }
+
+ if (!sync_pipe.info.handler.find_obj_params(source_key,
+ obj_tags.get_tags(),
+ ¶ms)) {
+ return -ERR_PRECONDITION_FAILED;
+ }
+
+ if (!dest_placement_rule &&
+ params.dest.storage_class) {
+ dest_rule.storage_class = *params.dest.storage_class;
+ dest_rule.inherit_from(dest_bucket_info.placement_rule);
+ dest_placement_rule = dest_rule;
+ *prule = &dest_rule;
+ }
+
+ return RGWFetchObjFilter_Default::filter(cct,
+ source_key,
+ dest_bucket_info,
+ dest_placement_rule,
+ obj_attrs,
+ prule);
+}
+
+
RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
auto sync_env = sc->env;
- return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.info.source_bs.bucket,
+
+ auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
+
+ return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
+ sync_pipe.info.source_bs.bucket,
std::nullopt, sync_pipe.dest_bucket_info,
key, std::nullopt, versioned_epoch,
- true, zones_trace, sync_env->counters, sync_env->dpp);
+ true,
+ std::static_pointer_cast<RGWFetchObjFilter>(filter),
+ zones_trace, sync_env->counters, sync_env->dpp);
}
RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
}
}
+ auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
+
return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
key, dest_key, versioned_epoch,
- true, zones_trace, nullptr, sync_env->dpp);
+ true,
+ std::static_pointer_cast<RGWFetchObjFilter>(filter),
+ zones_trace, nullptr, sync_env->dpp);
}
RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
}
}
+ void set_tail_placement(const rgw_placement_rule& tpr) {
+ tail_placement_rule = tpr;
+ }
void set_tail_placement(const rgw_placement_rule&& tpr) {
tail_placement_rule = tpr;
}
return 0;
}
+int RGWFetchObjFilter_Default::filter(CephContext *cct,
+ const rgw_obj_key& source_key,
+ const RGWBucketInfo& dest_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement_rule,
+ const map<string, bufferlist>& obj_attrs,
+ const rgw_placement_rule **prule)
+{
+ const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr);
+ if (!ptail_rule) {
+ auto iter = obj_attrs.find(RGW_ATTR_STORAGE_CLASS);
+ if (iter != obj_attrs.end()) {
+ dest_rule.storage_class = iter->second.to_str();
+ dest_rule.inherit_from(dest_bucket_info.placement_rule);
+ ptail_rule = &dest_rule;
+ } else {
+ ptail_rule = &dest_bucket_info.placement_rule;
+ }
+ }
+ *prule = ptail_rule;
+ return 0;
+}
+
int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
const rgw_user& user_id,
req_info *info,
void (*progress_cb)(off_t, void *),
void *progress_data,
const DoutPrefixProvider *dpp,
+ RGWFetchObjFilter *filter,
rgw_zone_set *zones_trace,
std::optional<uint64_t>* bytes_transferred)
{
rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
- const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr);
- AtomicObjectProcessor processor(&aio, this->store, dest_bucket_info, ptail_rule, user_id,
+ AtomicObjectProcessor processor(&aio, this->store, dest_bucket_info, nullptr, user_id,
obj_ctx, dest_obj, olh_epoch, tag, dpp, null_yield);
RGWRESTConn *conn;
auto& zone_conn_map = svc.zone->get_zone_conn_map();
conn = iter->second;
}
- string obj_name = dest_obj.bucket.name + "/" + dest_obj.get_oid();
-
boost::optional<RGWPutObj_Compress> compressor;
CompressorRef plugin;
- rgw_placement_rule dest_rule;
+ RGWFetchObjFilter_Default source_filter;
+ if (!filter) {
+ filter = &source_filter;
+ }
+
RGWRadosPutObj cb(cct, plugin, compressor, &processor, progress_cb, progress_data,
[&](const map<string, bufferlist>& obj_attrs) {
- if (!ptail_rule) {
- auto iter = obj_attrs.find(RGW_ATTR_STORAGE_CLASS);
- if (iter != obj_attrs.end()) {
- dest_rule.storage_class = iter->second.to_str();
- dest_rule.inherit_from(dest_bucket_info.placement_rule);
- processor.set_tail_placement(std::move(dest_rule));
- ptail_rule = &dest_rule;
- } else {
- ptail_rule = &dest_bucket_info.placement_rule;
- }
+ const rgw_placement_rule *ptail_rule;
+ int ret = filter->filter(cct,
+ src_obj.key,
+ dest_bucket_info,
+ dest_placement_rule,
+ obj_attrs,
+ &ptail_rule);
+ if (ret < 0) {
+ ldout(cct, 5) << "Aborting fetch: source object filter returned ret=" << ret << dendl;
+ return ret;
}
+
+ processor.set_tail_placement(*ptail_rule);
+
const auto& compression_type = svc.zone->get_zone_params().get_compression_type(*ptail_rule);
if (compression_type != "none") {
plugin = Compressor::create(cct, compression_type);
}
}
- int ret = processor.prepare(null_yield);
+ ret = processor.prepare(null_yield);
if (ret < 0) {
return ret;
}
dest_placement, src_mtime, mtime, mod_ptr,
unmod_ptr, high_precision_time,
if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
- olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, dpp);
+ olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, dpp,
+ nullptr /* filter */);
}
map<string, bufferlist> src_attrs;
}
};
+class RGWFetchObjFilter {
+public:
+ virtual ~RGWFetchObjFilter() {}
+
+ virtual int filter(CephContext *cct,
+ const rgw_obj_key& source_key,
+ const RGWBucketInfo& dest_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement_rule,
+ const map<string, bufferlist>& obj_attrs,
+ const rgw_placement_rule **prule) = 0;
+};
+
+class RGWFetchObjFilter_Default : public RGWFetchObjFilter {
+protected:
+ rgw_placement_rule dest_rule;
+public:
+ RGWFetchObjFilter_Default() {}
+
+ int filter(CephContext *cct,
+ const rgw_obj_key& source_key,
+ const RGWBucketInfo& dest_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement_rule,
+ const map<string, bufferlist>& obj_attrs,
+ const rgw_placement_rule **prule) override;
+};
+
class RGWObjectCtx {
rgw::sal::RGWRadosStore *store;
ceph::shared_mutex lock = ceph::make_shared_mutex("RGWObjectCtx");
void (*progress_cb)(off_t, void *),
void *progress_data,
const DoutPrefixProvider *dpp,
+ RGWFetchObjFilter *filter,
rgw_zone_set *zones_trace= nullptr,
std::optional<uint64_t>* bytes_transferred = 0);
/**
return true;
}
- for (auto& t : tags) {
- if (t == s) {
+ auto iter = tags.find(rgw_sync_pipe_filter_tag(s));
+ return (iter != tags.end());
+}
+
+bool rgw_sync_pipe_filter::check_tag(const string& k, const string& v) const
+{
+ if (tags.empty()) { /* tag filter wasn't defined */
+ return true;
+ }
+
+ auto iter = tags.find(rgw_sync_pipe_filter_tag(k, v));
+ return (iter != tags.end());
+}
+
+bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& _tags) const
+{
+ if (tags.empty()) {
+ return true;
+ }
+
+ for (auto& t : _tags) {
+ if (check_tag(t)) {
return true;
}
}
return false;
}
-bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& tags) const
+bool rgw_sync_pipe_filter::check_tags(const RGWObjTags::tag_map_t& _tags) const
{
- for (auto& t : tags) {
- if (check_tag(t)) {
+ if (tags.empty()) {
+ return true;
+ }
+
+ for (auto& item : _tags) {
+ if (check_tag(item.first, item.second)) {
return true;
}
}
string key;
string value;
+ rgw_sync_pipe_filter_tag() {}
+ rgw_sync_pipe_filter_tag(const string& s) {
+ from_str(s);
+ }
+ rgw_sync_pipe_filter_tag(const string& _key,
+ const string& _value) : key(_key),
+ value(_value) {}
+
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
encode(key, bl);
bool is_subset_of(const rgw_sync_pipe_filter& f) const;
bool check_tag(const string& s) const;
+ bool check_tag(const string& k, const string& v) const;
bool check_tags(const std::vector<string>& tags) const;
+ bool check_tags(const RGWObjTags::tag_map_t& tags) const;
};
WRITE_CLASS_ENCODER(rgw_sync_pipe_filter)