}
int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
- Attrs* delattrs, optional_yield y)
+ Attrs* delattrs, optional_yield y, uint32_t flags)
{
if (setattrs != NULL) {
/* Ensure setattrs and delattrs do not overlap */
ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed." << dendl;
}
- return next->set_obj_attrs(dpp, setattrs, delattrs, y);
+ return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
}
int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
const DoutPrefixProvider* dpp, optional_yield y) override;
virtual const std::string &get_name() const override { return next->get_name(); }
virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
- Attrs* delattrs, optional_yield y) override;
+ Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
DaosObject::~DaosObject() { close(nullptr); }
int DaosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
- Attrs* delattrs, optional_yield y) {
+ Attrs* delattrs, optional_yield y, uint32_t flags) {
ldpp_dout(dpp, 20) << "DEBUG: DaosObject::set_obj_attrs()" << dendl;
// TODO handle target_obj
// Get object's metadata (those stored in rgw_bucket_dir_entry)
bufferlist bl;
rmattr[attr_name] = bl;
- return set_obj_attrs(dpp, nullptr, &rmattr, y);
+ return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP);
}
bool DaosObject::is_expired() {
virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState** state,
optional_yield y, bool follow_olh = true) override;
virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
- Attrs* delattrs, optional_yield y) override;
+ Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
// return read_op.prepare(dpp);
// }
-int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y)
+int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags)
{
// TODO: implement
ldpp_dout(dpp, 20) <<__func__<< ": MotrObject::set_obj_attrs()" << dendl;
}
set_atomic();
state.attrset[attr_name] = attr_val;
- return set_obj_attrs(dpp, &state.attrset, nullptr, y);
+ return set_obj_attrs(dpp, &state.attrset, nullptr, y, rgw::sal::FLAG_LOG_OP);
}
int MotrObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y)
set_atomic();
rmattr[attr_name] = bl;
- return set_obj_attrs(dpp, nullptr, &rmattr, y);
+ return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP);
}
bool MotrObject::is_expired() {
virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
- virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
+ virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
}
int POSIXObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
- Attrs* delattrs, optional_yield y)
+ Attrs* delattrs, optional_yield y, uint32_t flags)
{
if (delattrs) {
for (auto& it : *delattrs) {
return ret;
}
- ret = dobj->set_obj_attrs(dpp, &get_attrs(), NULL, y);
+ ret = dobj->set_obj_attrs(dpp, &get_attrs(), NULL, y, rgw::sal::FLAG_LOG_OP);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: could not write attrs to dest object "
<< dobj->get_name() << dendl;
attrs[RGW_POSIX_ATTR_MPUPLOAD] = bl;
- return meta_obj->set_obj_attrs(dpp, &attrs, nullptr, y);
+ return meta_obj->set_obj_attrs(dpp, &attrs, nullptr, y, rgw::sal::FLAG_LOG_OP);
}
int POSIXMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct,
virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
- Attrs* delattrs, optional_yield y) override;
+ Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
}
}
+bool RGWBucketSyncPolicyHandler::bucket_exports_object(const std::string& obj_name, const RGWObjTags& tags) {
+ if (bucket_exports_data()) {
+ for (auto& entry : target_pipes.pipe_map) {
+ auto& filter = entry.second.params.source.filter;
+ if (filter.check_prefix(obj_name) && filter.check_tags(tags.get_tags())) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
{
if (!bucket) {
return target_hints;
}
+ bool bucket_exports_object(const std::string& obj_name, const RGWObjTags& tags);
bool bucket_exports_data() const;
bool bucket_imports_data() const;
};
return store->set_attrs(dpp, &obj_ctx, bucket_info, state.obj,
- add_attrs, nullptr, y, set_mtime);
+ add_attrs, nullptr, y, true, set_mtime);
}
static void try_resync_encrypted_multipart(const DoutPrefixProvider* dpp,
{
map<string, bufferlist> attrs;
attrs[name] = bl;
- return set_attrs(dpp, octx, bucket_info, obj, attrs, NULL, y);
+ return set_attrs(dpp, octx, bucket_info, obj, attrs, NULL, y, true);
}
int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBucketInfo& bucket_info, const rgw_obj& src_obj,
map<string, bufferlist>& attrs,
map<string, bufferlist>* rmattrs,
optional_yield y,
+ bool log_op,
ceph::real_time set_mtime /* = zero() */)
{
rgw_obj obj = src_obj;
string tag;
append_rand_alpha(cct, tag, tag, 32);
state->write_tag = tag;
- r = index_op.prepare(dpp, CLS_RGW_OP_ADD, &state->write_tag, y);
+ r = index_op.prepare(dpp, CLS_RGW_OP_ADD, &state->write_tag, y, log_op);
if (r < 0)
return r;
int64_t poolid = ioctx.get_id();
r = index_op.complete(dpp, poolid, epoch, state->size, state->accounted_size,
mtime, etag, content_type, storage_class, owner,
- RGWObjCategory::Main, nullptr, y);
+ RGWObjCategory::Main, nullptr, y, nullptr, false, log_op);
} else {
- int ret = index_op.cancel(dpp, nullptr, y);
+ int ret = index_op.cancel(dpp, nullptr, y, log_op);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl;
}
std::map<std::string, bufferlist>& attrs,
std::map<std::string, bufferlist>* rmattrs,
optional_yield y,
+ bool log_op,
ceph::real_time set_mtime = ceph::real_clock::zero());
int get_obj_state(const DoutPrefixProvider *dpp, RGWObjectCtx *rctx,
#include "services/svc_meta.h"
#include "services/svc_meta_be_sobj.h"
#include "services/svc_cls.h"
+#include "services/svc_bilog_rados.h"
+#include "services/svc_bi_rados.h"
#include "services/svc_zone.h"
#include "services/svc_tier_rados.h"
#include "services/svc_quota.h"
delete rados_ctx;
}
+bool RadosObject::is_sync_completed(const DoutPrefixProvider* dpp,
+ const ceph::real_time& obj_mtime)
+{
+ const auto& bucket_info = get_bucket()->get_info();
+ if (bucket_info.is_indexless()) {
+ ldpp_dout(dpp, 0) << "ERROR: Trying to check object replication status for object in an indexless bucket. obj=" << get_key() << dendl;
+ return false;
+ }
+
+ const auto& log_layout = bucket_info.layout.logs.front();
+ const uint32_t shard_count = num_shards(log_to_index_layout(log_layout));
+
+ std::string marker;
+ bool truncated;
+ list<rgw_bi_log_entry> entries;
+
+ const int shard_id = RGWSI_BucketIndex_RADOS::bucket_shard_index(get_key(), shard_count);
+
+ int ret = store->svc()->bilog_rados->log_list(dpp, bucket_info, log_layout, shard_id,
+ marker, 1, entries, &truncated);
+
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: Failed to retrieve bilog info for obj=" << get_key() << dendl;
+ return false;
+ }
+
+ if (entries.empty()) {
+ return true;
+ }
+
+ const rgw_bi_log_entry& earliest_marker = entries.front();
+ return earliest_marker.timestamp > obj_mtime;
+}
+
int RadosObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **pstate, optional_yield y, bool follow_olh)
{
int ret = store->getRados()->get_obj_state(dpp, rados_ctx, bucket->get_info(), get_obj(), pstate, &manifest, follow_olh, y);
return read_op.prepare(y, dpp);
}
-int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y)
+int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags)
{
Attrs empty;
return store->getRados()->set_attrs(dpp, rados_ctx,
get_obj(),
setattrs ? *setattrs : empty,
delattrs ? delattrs : nullptr,
- y);
+ y, flags & rgw::sal::FLAG_LOG_OP);
}
int RadosObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj)
state.obj = target;
set_atomic();
state.attrset[attr_name] = attr_val;
- r = set_obj_attrs(dpp, &state.attrset, nullptr, y);
+ r = set_obj_attrs(dpp, &state.attrset, nullptr, y, rgw::sal::FLAG_LOG_OP);
/* Restore target */
state.obj = save;
set_atomic();
rmattr[attr_name] = bl;
- return set_obj_attrs(dpp, nullptr, &rmattr, y);
+ return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP);
}
bool RadosObject::is_expired() {
set_atomic();
map<string, bufferlist> attrs;
attrs[RGW_ATTR_ACL] = bl;
- r = set_obj_attrs(dpp, &attrs, nullptr, y);
+ r = set_obj_attrs(dpp, &attrs, nullptr, y, rgw::sal::FLAG_LOG_OP);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: modify attr failed " << cpp_strerror(-r) << dendl;
return r;
StoreObject::set_compressed();
}
+ virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+ const ceph::real_time& obj_mtime) override;
virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
- virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
+ virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
#include "rgw_lua_data_filter.h"
#include "rgw_lua.h"
#include "rgw_iam_managed_policy.h"
+#include "rgw_bucket_sync.h"
#include "services/svc_zone.h"
#include "services/svc_quota.h"
}
}
+void handle_replication_status_header(
+ const DoutPrefixProvider *dpp,
+ rgw::sal::Attrs& attrs,
+ req_state* s,
+ const ceph::real_time &obj_mtime) {
+ auto attr_iter = attrs.find(RGW_ATTR_OBJ_REPLICATION_STATUS);
+ if (attr_iter != attrs.end() && attr_iter->second.to_str() == "PENDING") {
+ if (s->object->is_sync_completed(dpp, obj_mtime)) {
+ s->object->set_atomic();
+ rgw::sal::Attrs setattrs, rmattrs;
+ bufferlist bl;
+ bl.append("COMPLETED");
+ setattrs[RGW_ATTR_OBJ_REPLICATION_STATUS] = bl;
+ int ret = s->object->set_obj_attrs(dpp, &setattrs, &rmattrs, s->yield, 0);
+ if (ret == 0) {
+ ldpp_dout(dpp, 20) << *s->object << " has amz-replication-status header set to COMPLETED" << dendl;
+ }
+ }
+ }
+}
+
/*
* GET on CloudTiered objects is processed only when sent from the sync client.
* In all other cases, fail with `ERR_INVALID_OBJECT_STATE`.
}
#endif
+
op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
if (op_ret < 0) {
ldpp_dout(this, 0) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
filter = &*decompress;
}
+ handle_replication_status_header(this, attrs, s, lastmod);
+
attr_iter = attrs.find(RGW_ATTR_OBJ_REPLICATION_TRACE);
if (attr_iter != attrs.end()) {
try {
} catch (const buffer::error&) {}
}
+
if (get_type() == RGW_OP_GET_OBJ && get_data) {
op_ret = handle_cloudtier_obj(attrs, sync_cloudtiered);
if (op_ret < 0) {
}
}
+ RGWBucketSyncPolicyHandlerRef policy_handler;
+ op_ret = driver->get_sync_policy_handler(this, std::nullopt, s->bucket->get_key(), &policy_handler, s->yield);
+
+ if (op_ret < 0) {
+ ldpp_dout(this, 0) << "failed to read sync policy for bucket: " << s->bucket << dendl;
+ return;
+ }
+ if (policy_handler && policy_handler->bucket_exports_object(s->object->get_name(), *obj_tags)) {
+ bufferlist repl_bl;
+ repl_bl.append("PENDING");
+ emplace_attr(RGW_ATTR_OBJ_REPLICATION_STATUS, std::move(repl_bl));
+ }
+
if (slo_info) {
bufferlist manifest_bl;
encode(*slo_info, manifest_bl);
}
}
- op_ret = s->object->set_obj_attrs(this, &attrs, &rmattrs, s->yield);
+ op_ret = s->object->set_obj_attrs(this, &attrs, &rmattrs, s->yield, rgw::sal::FLAG_LOG_OP);
}
int RGWDeleteObj::handle_slo_manifest(bufferlist& bl, optional_yield y)
s->object->set_atomic();
- op_ret = s->object->set_obj_attrs(this, nullptr, &attrs, y);
+ op_ret = s->object->set_obj_attrs(this, nullptr, &attrs, y, rgw::sal::FLAG_LOG_OP);
if (op_ret < 0) {
ldpp_dout(this, 0) << "ERROR: failed to delete obj attrs, obj=" << s->object
<< " ret=" << op_ret << dendl;
if (!rgw::sal::Object::empty(s->object.get())) {
rgw::sal::Attrs a(attrs);
- op_ret = s->object->set_obj_attrs(this, &a, nullptr, y);
+ op_ret = s->object->set_obj_attrs(this, &a, nullptr, y, rgw::sal::FLAG_LOG_OP);
} else {
op_ret = s->bucket->merge_and_store_attrs(this, attrs, y);
}
virtual void set_compressed() = 0;
/** Check if this object is compressed */
virtual bool is_compressed() = 0;
+ /** Check if object is synced */
+ virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+ const ceph::real_time& obj_mtime) = 0;
/** Invalidate cached info about this object, except atomic, prefetch, and
* compressed */
virtual void invalidate() = 0;
virtual void set_obj_state(RGWObjState& _state) = 0;
/** Set attributes for this object from the backing store. Attrs can be set or
* deleted. @note the attribute APIs may be revisited in the future. */
- virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) = 0;
+ virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) = 0;
/** Get attributes for this object */
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) = 0;
/** Modify attributes for this object. */
return read_op.prepare(dpp);
}
- int DBObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y)
+ int DBObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags)
{
Attrs empty;
DB::Object op_target(store->getDB(),
}
set_atomic();
state.attrset[attr_name] = attr_val;
- return set_obj_attrs(dpp, &state.attrset, nullptr, y);
+ return set_obj_attrs(dpp, &state.attrset, nullptr, y, 0);
}
int DBObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y)
set_atomic();
rmattr[attr_name] = bl;
- return set_obj_attrs(dpp, nullptr, &rmattr, y);
+ return set_obj_attrs(dpp, nullptr, &rmattr, y, 0);
}
bool DBObject::is_expired() {
virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
- virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
+ virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
}
int FilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
- Attrs* delattrs, optional_yield y)
+ Attrs* delattrs, optional_yield y, uint32_t flags)
{
- return next->set_obj_attrs(dpp, setattrs, delattrs, y);
+ return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
}
int FilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
virtual bool is_prefetch_data() override { return next->is_prefetch_data(); }
virtual void set_compressed() override { return next->set_compressed(); }
virtual bool is_compressed() override { return next->is_compressed(); }
+ virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+ const ceph::real_time& obj_mtime) override { return next->is_sync_completed(dpp, obj_mtime); }
virtual void invalidate() override { return next->invalidate(); }
virtual bool empty() const override { return next->empty(); }
virtual const std::string &get_name() const override { return next->get_name(); }
optional_yield y, bool follow_olh = true) override;
virtual void set_obj_state(RGWObjState& _state) override { return next->set_obj_state(_state); }
virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
- Attrs* delattrs, optional_yield y) override;
+ Attrs* delattrs, optional_yield y, uint32_t flags) override;
virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
rgw_obj* target_obj = NULL) override;
virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
virtual bool is_prefetch_data() override { return state.prefetch_data; }
virtual void set_compressed() override { state.compressed = true; }
virtual bool is_compressed() override { return state.compressed; }
+ virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+ const ceph::real_time& obj_mtime) override { return false; }
virtual void invalidate() override {
rgw_obj obj = state.obj;
bool is_atomic = state.is_atomic;
}
}
+bool rgw_sync_pipe_filter::check_prefix(const std::string& obj_name) const
+{
+ if (prefix.has_value()) {
+ return boost::starts_with(obj_name, prefix.value());
+ }
+ return true;
+}
+
void rgw_sync_pipe_filter::set_tags(std::list<std::string>& tags_add,
std::list<std::string>& tags_rm)
{
bool check_tag(const std::string& k, const std::string& v) const;
bool check_tags(const std::vector<std::string>& tags) const;
bool check_tags(const RGWObjTags::tag_map_t& tags) const;
+ bool check_prefix(const std::string& obj_name) const;
};
WRITE_CLASS_ENCODER(rgw_sync_pipe_filter)
zone.iam_conn.get_role, RoleName=role_name)
log.info(f'success, zone: {zone.name} does not have role: {role_name}')
+def test_replication_status():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ zone = zonegroup_conns.rw_zones[0]
+
+ bucket = zone.conn.create_bucket(gen_bucket_name())
+ obj_name = "a"
+ k = new_key(zone, bucket.name, obj_name)
+ k.set_contents_from_string('foo')
+ zonegroup_meta_checkpoint(zonegroup)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ head_res = zone.head_object(bucket.name, obj_name)
+ log.info("checking if object has PENDING ReplicationStatus")
+ assert(head_res["ReplicationStatus"] == "PENDING")
+
+ bilog_autotrim(zone.zone)
+ zonegroup_data_checkpoint(zonegroup_conns)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ head_res = zone.head_object(bucket.name, obj_name)
+ log.info("checking if object has COMPLETED ReplicationStatus")
+ assert(head_res["ReplicationStatus"] == "COMPLETED")
+
+ log.info("checking that ReplicationStatus update did not write a bilog")
+ bilog = bilog_list(zone.zone, bucket.name)
+ assert(len(bilog) == 0)
@attr('data_sync_init')
def test_bucket_full_sync_after_data_sync_init():
return out['TopicConfigurations']
return []
+ def head_object(self, bucket_name, obj_name):
+ return self.s3_client.head_object(Bucket=bucket_name, Key=obj_name)
+
def get_conn(self, credentials):
return self.Conn(self, credentials)