return 0;
}
- virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState** state,
- optional_yield y, bool follow_olh = true) override;
+ virtual int load_obj_state(const DoutPrefixProvider *dpp, optional_yield y,
+ bool follow_olh = true) override;
virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
- Attrs* delattrs, optional_yield y) override;
+ Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
const DoutPrefixProvider* dpp, optional_yield y) override;
virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
- virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
+ virtual int load_obj_state(const DoutPrefixProvider* dpp, optional_yield y, bool follow_olh = true) override;
- virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
+ virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
const DoutPrefixProvider* dpp, optional_yield y) override;
virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
- virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
+ virtual int load_obj_state(const DoutPrefixProvider* dpp, optional_yield y, bool follow_olh = true) override;
virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
- Attrs* delattrs, optional_yield y) override;
+ Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
}
}
- bool RGWBucketSyncPolicyHandler::bucket_exports_object(const std::string& obj_name, const RGWObjTags& tags) {
++bool RGWBucketSyncPolicyHandler::bucket_exports_object(const std::string& obj_name, const RGWObjTags* tags) const {
+ if (bucket_exports_data()) {
+ for (auto& entry : target_pipes.pipe_map) {
+ auto& filter = entry.second.params.source.filter;
- if (filter.check_prefix(obj_name) && filter.check_tags(tags.get_tags())) {
++ if (filter.check_prefix(obj_name) &&
++ (tags == nullptr || filter.check_tags(tags->get_tags()))) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
{
if (!bucket) {
return target_hints;
}
- bool bucket_exports_object(const std::string& obj_name, const RGWObjTags& tags);
++ bool bucket_exports_object(const std::string& obj_name, const RGWObjTags* tags) const;
bool bucket_exports_data() const;
bool bucket_imports_data() const;
delete rados_ctx;
}
- int RadosObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **pstate, optional_yield y, bool follow_olh)
+bool RadosObject::is_sync_completed(const DoutPrefixProvider* dpp,
+ const ceph::real_time& obj_mtime)
+{
+ const auto& bucket_info = get_bucket()->get_info();
+ if (bucket_info.is_indexless()) {
+ ldpp_dout(dpp, 0) << "ERROR: Trying to check object replication status for object in an indexless bucket. obj=" << get_key() << dendl;
+ return false;
+ }
+
+ const auto& log_layout = bucket_info.layout.logs.front();
+ const uint32_t shard_count = num_shards(log_to_index_layout(log_layout));
+
+ std::string marker;
+ bool truncated;
+ list<rgw_bi_log_entry> entries;
+
+ const int shard_id = RGWSI_BucketIndex_RADOS::bucket_shard_index(get_key(), shard_count);
+
+ int ret = store->svc()->bilog_rados->log_list(dpp, bucket_info, log_layout, shard_id,
+ marker, 1, entries, &truncated);
+
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: Failed to retrieve bilog info for obj=" << get_key() << dendl;
+ return false;
+ }
+
+ if (entries.empty()) {
+ return true;
+ }
+
+ const rgw_bi_log_entry& earliest_marker = entries.front();
+ return earliest_marker.timestamp > obj_mtime;
+}
+
+ int RadosObject::load_obj_state(const DoutPrefixProvider* dpp, optional_yield y, bool follow_olh)
{
- int ret = store->getRados()->get_obj_state(dpp, rados_ctx, bucket->get_info(), get_obj(), pstate, &manifest, follow_olh, y);
+ RGWObjState *pstate{nullptr};
+
+ int ret = store->getRados()->get_obj_state(dpp, rados_ctx, bucket->get_info(), get_obj(), &pstate, &manifest, follow_olh, y);
if (ret < 0) {
return ret;
}
return read_op.prepare(y, dpp);
}
-int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y)
+int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags)
{
Attrs empty;
- const auto mtime = state.mtime + std::chrono::nanoseconds(1);
++ const bool log_op = flags & rgw::sal::FLAG_LOG_OP;
+ // make a tiny adjustment to the existing mtime so that fetch_remote_obj()
+ // won't return ERR_NOT_MODIFIED when syncing the modified object
++ const auto mtime = log_op ? state.mtime + std::chrono::nanoseconds(1) : state.mtime;
return store->getRados()->set_attrs(dpp, rados_ctx,
bucket->get_info(),
get_obj(),
setattrs ? *setattrs : empty,
delattrs ? delattrs : nullptr,
- y, flags & rgw::sal::FLAG_LOG_OP);
- y, mtime);
++ y, log_op, mtime);
}
int RadosObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj)
StoreObject::set_compressed();
}
- virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
++
+ virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+ const ceph::real_time& obj_mtime) override;
- virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
+ /* For rgw_admin.cc */
+ RGWObjState& get_state() { return state; }
+ virtual int load_obj_state(const DoutPrefixProvider* dpp, optional_yield y, bool follow_olh = true) override;
+ virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
}
}
- setattrs[RGW_ATTR_OBJ_REPLICATION_STATUS] = bl;
+void handle_replication_status_header(
+ const DoutPrefixProvider *dpp,
+ rgw::sal::Attrs& attrs,
+ req_state* s,
+ const ceph::real_time &obj_mtime) {
+ auto attr_iter = attrs.find(RGW_ATTR_OBJ_REPLICATION_STATUS);
+ if (attr_iter != attrs.end() && attr_iter->second.to_str() == "PENDING") {
+ if (s->object->is_sync_completed(dpp, obj_mtime)) {
+ s->object->set_atomic();
+ rgw::sal::Attrs setattrs, rmattrs;
+ bufferlist bl;
+ bl.append("COMPLETED");
++ setattrs[RGW_ATTR_OBJ_REPLICATION_STATUS] = std::move(bl);
+ int ret = s->object->set_obj_attrs(dpp, &setattrs, &rmattrs, s->yield, 0);
+ if (ret == 0) {
+ ldpp_dout(dpp, 20) << *s->object << " has amz-replication-status header set to COMPLETED" << dendl;
+ }
+ }
+ }
+}
+
/*
* GET on CloudTiered objects is processed only when sent from the sync client.
* In all other cases, fail with `ERR_INVALID_OBJECT_STATE`.
}
}
- if (policy_handler && policy_handler->bucket_exports_object(s->object->get_name(), *obj_tags)) {
+ RGWBucketSyncPolicyHandlerRef policy_handler;
+ op_ret = driver->get_sync_policy_handler(this, std::nullopt, s->bucket->get_key(), &policy_handler, s->yield);
+
+ if (op_ret < 0) {
+ ldpp_dout(this, 0) << "failed to read sync policy for bucket: " << s->bucket << dendl;
+ return;
+ }
++ if (policy_handler && policy_handler->bucket_exports_object(s->object->get_name(), obj_tags.get())) {
+ bufferlist repl_bl;
+ repl_bl.append("PENDING");
+ emplace_attr(RGW_ATTR_OBJ_REPLICATION_STATUS, std::move(repl_bl));
+ }
+
if (slo_info) {
bufferlist manifest_bl;
encode(*slo_info, manifest_bl);
/** Get the name of this object */
virtual const std::string &get_name() const = 0;
- /** Get the object state for this object. Will be removed in the future */
- virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) = 0;
- /** Set the object state for this object */
- virtual void set_obj_state(RGWObjState& _state) = 0;
+ /** Load the object state for this object. */
+ virtual int load_obj_state(const DoutPrefixProvider* dpp, optional_yield y, bool follow_olh = true) = 0;
/** Set attributes for this object from the backing store. Attrs can be set or
* deleted. @note the attribute APIs may be revisited in the future. */
- virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) = 0;
+ virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) = 0;
/** Get attributes for this object */
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) = 0;
/** Modify attributes for this object. */
}
set_atomic();
state.attrset[attr_name] = attr_val;
- return set_obj_attrs(dpp, &state.attrset, nullptr, y, 0);
- return set_obj_attrs(dpp, &state.attrset, nullptr, y);
++ return set_obj_attrs(dpp, &state.attrset, nullptr, y, rgw::sal::FLAG_LOG_OP);
}
int DBObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y)
set_atomic();
rmattr[attr_name] = bl;
- return set_obj_attrs(dpp, nullptr, &rmattr, y, 0);
- return set_obj_attrs(dpp, nullptr, &rmattr, y);
++ return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP);
}
bool DBObject::is_expired() {
virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
- virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
+ virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
+ virtual int load_obj_state(const DoutPrefixProvider* dpp, optional_yield y, bool follow_olh = true) override;
- virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
virtual bool empty() const override { return next->empty(); }
virtual const std::string &get_name() const override { return next->get_name(); }
- virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state,
- optional_yield y, bool follow_olh = true) override;
- virtual void set_obj_state(RGWObjState& _state) override { return next->set_obj_state(_state); }
+ virtual int load_obj_state(const DoutPrefixProvider *dpp, optional_yield y,
+ bool follow_olh = true) override;
virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
- Attrs* delattrs, optional_yield y) override;
+ Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
zone.iam_conn.get_role, RoleName=role_name)
log.info(f'success, zone: {zone.name} does not have role: {role_name}')
++
+def test_replication_status():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ zone = zonegroup_conns.rw_zones[0]
+
+ bucket = zone.conn.create_bucket(gen_bucket_name())
+ obj_name = "a"
+ k = new_key(zone, bucket.name, obj_name)
+ k.set_contents_from_string('foo')
+ zonegroup_meta_checkpoint(zonegroup)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ head_res = zone.head_object(bucket.name, obj_name)
+ log.info("checking if object has PENDING ReplicationStatus")
+ assert(head_res["ReplicationStatus"] == "PENDING")
+
+ bilog_autotrim(zone.zone)
+ zonegroup_data_checkpoint(zonegroup_conns)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ head_res = zone.head_object(bucket.name, obj_name)
+ log.info("checking if object has COMPLETED ReplicationStatus")
+ assert(head_res["ReplicationStatus"] == "COMPLETED")
+
+ log.info("checking that ReplicationStatus update did not write a bilog")
+ bilog = bilog_list(zone.zone, bucket.name)
+ assert(len(bilog) == 0)
+
+ def test_object_acl():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+ secondary = zonegroup_conns.rw_zones[1]
+
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+
+ # upload a dummy object and wait for sync.
+ k = new_key(primary, bucket, 'dummy')
+ k.set_contents_from_string('foo')
+ zonegroup_meta_checkpoint(zonegroup)
+ zonegroup_data_checkpoint(zonegroup_conns)
+
+ #check object on secondary before setacl
+ bucket2 = get_bucket(secondary, bucket.name)
+ before_set_acl = bucket2.get_acl(k)
+ assert(len(before_set_acl.acl.grants) == 1)
+
+ #set object acl on primary and wait for sync.
+ bucket.set_canned_acl('public-read', key_name=k)
+ log.debug('set acl=%s', bucket.name)
+ zonegroup_data_checkpoint(zonegroup_conns)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ #check object secondary after setacl
+ bucket2 = get_bucket(secondary, bucket.name)
+ after_set_acl = bucket2.get_acl(k)
+ assert(len(after_set_acl.acl.grants) == 2) # read grant added on AllUsers
+
++
+ @attr('fails_with_rgw')
@attr('data_sync_init')
def test_bucket_full_sync_after_data_sync_init():
zonegroup = realm.master_zonegroup()