static int log_index_operation(cls_method_context_t hctx, cls_rgw_obj_key& obj_key, RGWModifyOp op,
string& tag, real_time& timestamp,
rgw_bucket_entry_ver& ver, RGWPendingState state, uint64_t index_ver,
- string& max_marker, uint16_t bilog_flags, string *owner, string *owner_display_name)
+ string& max_marker, uint16_t bilog_flags, string *owner, string *owner_display_name, rgw_zone_set *zones_trace)
{
bufferlist bl;
if (owner_display_name) {
entry.owner_display_name = *owner_display_name;
}
+ if (zones_trace) {
+ entry.zones_trace = std::move(*zones_trace);
+ }
string key;
bi_log_index_key(hctx, key, entry.id, index_ver);
if (op.log_op) {
rc = log_index_operation(hctx, op.key, op.op, op.tag, entry.meta.mtime,
- entry.ver, info.state, header.ver, header.max_marker, op.bilog_flags, NULL, NULL);
+ entry.ver, info.state, header.ver, header.max_marker, op.bilog_flags, NULL, NULL, &op.zones_trace);
if (rc < 0)
return rc;
}
if (cancel) {
if (op.log_op) {
rc = log_index_operation(hctx, op.key, op.op, op.tag, entry.meta.mtime, entry.ver,
- CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, op.bilog_flags, NULL, NULL);
+ CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, op.bilog_flags, NULL, NULL, &op.zones_trace);
if (rc < 0)
return rc;
}
if (op.log_op) {
rc = log_index_operation(hctx, op.key, op.op, op.tag, entry.meta.mtime, entry.ver,
- CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, op.bilog_flags, NULL, NULL);
+ CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, op.bilog_flags, NULL, NULL, &op.zones_trace);
if (rc < 0)
return rc;
}
if (op.log_op) {
rc = log_index_operation(hctx, remove_key, CLS_RGW_OP_DEL, op.tag, remove_entry.meta.mtime,
- remove_entry.ver, CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, op.bilog_flags, NULL, NULL);
+ remove_entry.ver, CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, op.bilog_flags, NULL, NULL, &op.zones_trace);
if (rc < 0)
continue;
}
ret = log_index_operation(hctx, op.key, operation, op.op_tag,
entry.meta.mtime, ver,
CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, op.bilog_flags | RGW_BILOG_FLAG_VERSIONED_OP,
- powner, powner_display_name);
+ powner, powner_display_name, &op.zones_trace);
if (ret < 0)
return ret;
}
ret = log_index_operation(hctx, op.key, CLS_RGW_OP_UNLINK_INSTANCE, op.op_tag,
mtime, ver,
CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker,
- op.bilog_flags | RGW_BILOG_FLAG_VERSIONED_OP, NULL, NULL);
+ op.bilog_flags | RGW_BILOG_FLAG_VERSIONED_OP, NULL, NULL, &op.zones_trace);
if (ret < 0)
return ret;
}
return ret;
if (log_op && cur_disk.exists) {
ret = log_index_operation(hctx, cur_disk.key, CLS_RGW_OP_DEL, cur_disk.tag, cur_disk.meta.mtime,
- cur_disk.ver, CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, 0, NULL, NULL);
+ cur_disk.ver, CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, 0, NULL, NULL, NULL);
if (ret < 0) {
CLS_LOG(0, "ERROR: %s(): failed to log operation ret=%d", __func__, ret);
return ret;
return ret;
if (log_op) {
ret = log_index_operation(hctx, cur_change.key, CLS_RGW_OP_ADD, cur_change.tag, cur_change.meta.mtime,
- cur_change.ver, CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, 0, NULL, NULL);
+ cur_change.ver, CLS_RGW_STATE_COMPLETE, header.ver, header.max_marker, 0, NULL, NULL, NULL);
if (ret < 0) {
CLS_LOG(0, "ERROR: %s(): failed to log operation ret=%d", __func__, ret);
return ret;
void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
const cls_rgw_obj_key& key, const string& locator, bool log_op,
- uint16_t bilog_flags)
+ uint16_t bilog_flags, rgw_zone_set& zones_trace)
{
struct rgw_cls_obj_prepare_op call;
call.op = op;
call.locator = locator;
call.log_op = log_op;
call.bilog_flags = bilog_flags;
+ call.zones_trace = zones_trace;
bufferlist in;
::encode(call, in);
o.exec(RGW_CLASS, RGW_BUCKET_PREPARE_OP, in);
const cls_rgw_obj_key& key,
rgw_bucket_dir_entry_meta& dir_meta,
list<cls_rgw_obj_key> *remove_objs, bool log_op,
- uint16_t bilog_flags)
+ uint16_t bilog_flags,
+ rgw_zone_set& zones_trace)
{
bufferlist in;
call.bilog_flags = bilog_flags;
if (remove_objs)
call.remove_objs = *remove_objs;
+ call.zones_trace = zones_trace;
::encode(call, in);
o.exec(RGW_CLASS, RGW_BUCKET_COMPLETE_OP, in);
}
int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_obj_key& key, bufferlist& olh_tag,
bool delete_marker, const string& op_tag, struct rgw_bucket_dir_entry_meta *meta,
- uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op)
+ uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace)
{
bufferlist in, out;
struct rgw_cls_link_olh_op call;
call.log_op = log_op;
call.unmod_since = unmod_since;
call.high_precision_time = high_precision_time;
+ call.zones_trace = zones_trace;
::encode(call, in);
int r = io_ctx.exec(oid, RGW_CLASS, RGW_BUCKET_LINK_OLH, in, out);
if (r < 0)
int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid,
const cls_rgw_obj_key& key, const string& op_tag,
- const string& olh_tag, uint64_t olh_epoch, bool log_op)
+ const string& olh_tag, uint64_t olh_epoch, bool log_op, rgw_zone_set& zones_trace)
{
bufferlist in, out;
struct rgw_cls_unlink_instance_op call;
call.olh_epoch = olh_epoch;
call.olh_tag = olh_tag;
call.log_op = log_op;
+ call.zones_trace = zones_trace;
::encode(call, in);
int r = io_ctx.exec(oid, RGW_CLASS, RGW_BUCKET_UNLINK_INSTANCE, in, out);
if (r < 0)
void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
const cls_rgw_obj_key& key, const string& locator, bool log_op,
- uint16_t bilog_op);
+ uint16_t bilog_op, rgw_zone_set& zones_trace);
void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
rgw_bucket_entry_ver& ver,
const cls_rgw_obj_key& key,
rgw_bucket_dir_entry_meta& dir_meta,
list<cls_rgw_obj_key> *remove_objs, bool log_op,
- uint16_t bilog_op);
+ uint16_t bilog_op, rgw_zone_set& zones_trace);
void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, list<string>& keep_attr_prefixes);
void cls_rgw_obj_store_pg_ver(librados::ObjectWriteOperation& o, const string& attr);
int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_obj_key& key, bufferlist& olh_tag,
bool delete_marker, const string& op_tag, struct rgw_bucket_dir_entry_meta *meta,
- uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op);
+ uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace);
int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_obj_key& key, const string& op_tag,
- const string& olh_tag, uint64_t olh_epoch, bool log_op);
+ const string& olh_tag, uint64_t olh_epoch, bool log_op, rgw_zone_set& zones_trace);
int cls_rgw_get_olh_log(librados::IoCtx& io_ctx, string& oid, librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker,
const string& olh_tag,
map<uint64_t, vector<struct rgw_bucket_olh_log_entry> > *log, bool *is_truncated);
f->dump_string("locator", locator);
f->dump_bool("log_op", log_op);
f->dump_int("bilog_flags", bilog_flags);
+ ::encode_json("zones_trace", zones_trace, f);
}
void rgw_cls_obj_complete_op::generate_test_instances(list<rgw_cls_obj_complete_op*>& o)
f->dump_string("tag", tag);
f->dump_bool("log_op", log_op);
f->dump_int("bilog_flags", bilog_flags);
+ ::encode_json("zones_trace", zones_trace, f);
}
void rgw_cls_link_olh_op::generate_test_instances(list<rgw_cls_link_olh_op*>& o)
utime_t ut(unmod_since);
::encode_json("unmod_since", ut, f);
::encode_json("high_precision_time", high_precision_time, f);
+ ::encode_json("zones_trace", zones_trace, f);
}
void rgw_cls_unlink_instance_op::generate_test_instances(list<rgw_cls_unlink_instance_op*>& o)
::encode_json("olh_epoch", olh_epoch, f);
::encode_json("log_op", log_op, f);
::encode_json("bilog_flags", (uint32_t)bilog_flags, f);
+ ::encode_json("zones_trace", zones_trace, f);
}
void rgw_cls_read_olh_log_op::generate_test_instances(list<rgw_cls_read_olh_log_op*>& o)
string locator;
bool log_op;
uint16_t bilog_flags;
+ rgw_zone_set zones_trace;
rgw_cls_obj_prepare_op() : op(CLS_RGW_OP_UNKNOWN), log_op(false), bilog_flags(0) {}
void encode(bufferlist &bl) const {
- ENCODE_START(6, 5, bl);
+ ENCODE_START(7, 5, bl);
uint8_t c = (uint8_t)op;
::encode(c, bl);
::encode(tag, bl);
::encode(log_op, bl);
::encode(key, bl);
::encode(bilog_flags, bl);
+ ::encode(zones_trace, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &bl) {
- DECODE_START_LEGACY_COMPAT_LEN(6, 3, 3, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(7, 3, 3, bl);
uint8_t c;
::decode(c, bl);
op = (RGWModifyOp)c;
if (struct_v >= 6) {
::decode(bilog_flags, bl);
}
+ if (struct_v >= 7) {
+ ::decode(zones_trace, bl);
+ }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
uint16_t bilog_flags;
list<cls_rgw_obj_key> remove_objs;
+ rgw_zone_set zones_trace;
rgw_cls_obj_complete_op() : op(CLS_RGW_OP_ADD), log_op(false), bilog_flags(0) {}
void encode(bufferlist &bl) const {
- ENCODE_START(8, 7, bl);
+ ENCODE_START(9, 7, bl);
uint8_t c = (uint8_t)op;
::encode(c, bl);
::encode(ver.epoch, bl);
::encode(log_op, bl);
::encode(key, bl);
::encode(bilog_flags, bl);
+ ::encode(zones_trace, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &bl) {
- DECODE_START_LEGACY_COMPAT_LEN(8, 3, 3, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(9, 3, 3, bl);
uint8_t c;
::decode(c, bl);
op = (RGWModifyOp)c;
if (struct_v >= 8) {
::decode(bilog_flags, bl);
}
+ if (struct_v >= 9) {
+ ::decode(zones_trace, bl);
+ }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
uint16_t bilog_flags;
real_time unmod_since; /* only create delete marker if newer then this */
bool high_precision_time;
+ rgw_zone_set zones_trace;
rgw_cls_link_olh_op() : delete_marker(false), olh_epoch(0), log_op(false), bilog_flags(0), high_precision_time(false) {}
void encode(bufferlist& bl) const {
- ENCODE_START(4, 1, bl);
+ ENCODE_START(5, 1, bl);
::encode(key, bl);
::encode(olh_tag, bl);
::encode(delete_marker, bl);
::encode(t, bl);
::encode(unmod_since, bl);
::encode(high_precision_time, bl);
+ ::encode(zones_trace, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
- DECODE_START(4, bl);
+ DECODE_START(5, bl);
::decode(key, bl);
::decode(olh_tag, bl);
::decode(delete_marker, bl);
unmod_since = ceph::real_clock::from_time_t(static_cast<time_t>(t));
}
if (struct_v >= 3) {
+ uint64_t t;
+ ::decode(t, bl);
::decode(unmod_since, bl);
}
if (struct_v >= 4) {
::decode(high_precision_time, bl);
}
+ if (struct_v >= 5) {
+ ::decode(zones_trace, bl);
+ }
DECODE_FINISH(bl);
}
bool log_op;
uint16_t bilog_flags;
string olh_tag;
+ rgw_zone_set zones_trace;
rgw_cls_unlink_instance_op() : olh_epoch(0), log_op(false), bilog_flags(0) {}
void encode(bufferlist& bl) const {
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
::encode(key, bl);
::encode(op_tag, bl);
::encode(olh_epoch, bl);
::encode(log_op, bl);
::encode(bilog_flags, bl);
::encode(olh_tag, bl);
+ ::encode(zones_trace, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
- DECODE_START(2, bl);
+ DECODE_START(3, bl);
::decode(key, bl);
::decode(op_tag, bl);
::decode(olh_epoch, bl);
if (struct_v >= 2) {
::decode(olh_tag, bl);
}
+ if (struct_v >= 3) {
+ ::decode(zones_trace, bl);
+ }
DECODE_FINISH(bl);
}
bilog_flags = (uint16_t)f;
JSONDecoder::decode_json("owner", owner, obj);
JSONDecoder::decode_json("owner_display_name", owner_display_name, obj);
+ JSONDecoder::decode_json("zones_trace", zones_trace, obj);
}
void rgw_bi_log_entry::dump(Formatter *f) const
f->dump_bool("versioned", (bilog_flags & RGW_BILOG_FLAG_VERSIONED_OP) != 0);
f->dump_string("owner", owner);
f->dump_string("owner_display_name", owner_display_name);
+ encode_json("zones_trace", zones_trace, f);
}
void rgw_bi_log_entry::generate_test_instances(list<rgw_bi_log_entry*>& ls)
class Formatter;
}
+using rgw_zone_set = std::set<std::string>;
+
enum RGWPendingState {
CLS_RGW_STATE_PENDING_MODIFY = 0,
CLS_RGW_STATE_COMPLETE = 1,
uint16_t bilog_flags;
string owner; /* only being set if it's a delete marker */
string owner_display_name; /* only being set if it's a delete marker */
+ rgw_zone_set zones_trace;
rgw_bi_log_entry() : op(CLS_RGW_OP_UNKNOWN), state(CLS_RGW_STATE_PENDING_MODIFY), index_ver(0), bilog_flags(0) {}
void encode(bufferlist &bl) const {
- ENCODE_START(3, 1, bl);
+ ENCODE_START(4, 1, bl);
::encode(id, bl);
::encode(object, bl);
::encode(timestamp, bl);
::encode(bilog_flags, bl);
::encode(owner, bl);
::encode(owner_display_name, bl);
+ ::encode(zones_trace, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &bl) {
- DECODE_START(2, bl);
+ DECODE_START(4, bl);
::decode(id, bl);
::decode(object, bl);
::decode(timestamp, bl);
::decode(owner, bl);
::decode(owner_display_name, bl);
}
+ if (struct_v >= 4) {
+ ::decode(zones_trace, bl);
+ }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
NULL, /* string *petag, */
NULL, /* struct rgw_err *err, */
NULL, /* void (*progress_cb)(off_t, void *), */
- NULL); /* void *progress_data*); */
+ NULL, /* void *progress_data*); */
+ zones_trace);
if (r < 0) {
ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
del_op.params.obj_owner.set_name(owner_display_name);
del_op.params.mtime = timestamp;
del_op.params.high_precision_time = true;
+ del_op.params.zones_trace = zones_trace;
ret = del_op.delete_obj();
if (ret < 0) {
real_time src_mtime;
bool copy_if_newer;
+ rgw_zone_set *zones_trace;
protected:
int _send_request() override;
RGWBucketInfo& _bucket_info,
const rgw_obj_key& _key,
uint64_t _versioned_epoch,
- bool _if_newer) : RGWAsyncRadosRequest(caller, cn), store(_store),
+ bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
source_zone(_source_zone),
bucket_info(_bucket_info),
key(_key),
versioned_epoch(_versioned_epoch),
- copy_if_newer(_if_newer) {}
+ copy_if_newer(_if_newer), zones_trace(_zones_trace) {}
};
class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
bool copy_if_newer;
RGWAsyncFetchRemoteObj *req;
+ rgw_zone_set *zones_trace;
public:
RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
RGWBucketInfo& _bucket_info,
const rgw_obj_key& _key,
uint64_t _versioned_epoch,
- bool _if_newer) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+ bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
async_rados(_async_rados), store(_store),
source_zone(_source_zone),
bucket_info(_bucket_info),
key(_key),
versioned_epoch(_versioned_epoch),
- copy_if_newer(_if_newer), req(NULL) {}
+ copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
~RGWFetchRemoteObjCR() override {
int send_request() override {
req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
- key, versioned_epoch, copy_if_newer);
+ key, versioned_epoch, copy_if_newer, zones_trace);
async_rados->queue(req);
return 0;
}
bool del_if_older;
ceph::real_time timestamp;
+ rgw_zone_set *zones_trace;
protected:
int _send_request() override;
uint64_t _versioned_epoch,
bool _delete_marker,
bool _if_older,
- real_time& _timestamp) : RGWAsyncRadosRequest(caller, cn), store(_store),
+ real_time& _timestamp,
+ rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
source_zone(_source_zone),
bucket_info(_bucket_info),
key(_key),
versioned(_versioned),
versioned_epoch(_versioned_epoch),
del_if_older(_if_older),
- timestamp(_timestamp) {
+ timestamp(_timestamp), zones_trace(_zones_trace) {
if (_delete_marker) {
marker_version_id = key.instance;
}
real_time timestamp;
RGWAsyncRemoveObj *req;
+
+ rgw_zone_set *zones_trace;
public:
RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
string *_owner,
string *_owner_display_name,
bool _delete_marker,
- real_time *_timestamp) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+ real_time *_timestamp,
+ rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
async_rados(_async_rados), store(_store),
source_zone(_source_zone),
bucket_info(_bucket_info),
key(_key),
versioned(_versioned),
versioned_epoch(_versioned_epoch),
- delete_marker(_delete_marker), req(NULL) {
+ delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) {
del_if_older = (_timestamp != NULL);
if (_timestamp) {
timestamp = *_timestamp;
int send_request() override {
req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
key, owner, owner_display_name, versioned, versioned_epoch,
- delete_marker, del_if_older, timestamp);
+ delete_marker, del_if_older, timestamp, zones_trace);
async_rados->queue(req);
return 0;
}
public:
RGWDefaultDataSyncModule() {}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override;
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override;
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
- rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override;
+ rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};
class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
return 0;
}
-RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch)
+RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
key, versioned_epoch,
- true);
+ true, zones_trace);
}
RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
- real_time& mtime, bool versioned, uint64_t versioned_epoch)
+ real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
bucket_info, key, versioned, versioned_epoch,
- NULL, NULL, false, &mtime);
+ NULL, NULL, false, &mtime, zones_trace);
}
RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
- rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch)
+ rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
bucket_info, key, versioned, versioned_epoch,
- &owner.id, &owner.display_name, true, &mtime);
+ &owner.id, &owner.display_name, true, &mtime, zones_trace);
}
class RGWDataSyncControlCR : public RGWBackoffControlCR
bool error_injection;
RGWDataSyncModule *data_sync_module;
+
+ rgw_zone_set zones_trace;
public:
RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
real_time& _timestamp,
const rgw_bucket_entry_owner& _owner,
RGWModifyOp _op, RGWPendingState _op_state,
- const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_sync_env->cct),
+ const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
bucket_info(_bucket_info), bs(bs),
key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
op_state(_op_state),
entry_marker(_entry_marker),
marker_tracker(_marker_tracker),
- sync_status(0) {
+ sync_status(0){
stringstream ss;
ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]";
set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
data_sync_module = sync_env->sync_module->get_data_handler();
+
+ zones_trace = _zones_trace;
+ zones_trace.insert(sync_env->store->get_zone().id);
}
int operate() override {
set_status("syncing obj");
ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
logger.log("fetch");
- call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch));
+ call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
} else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
set_status("removing obj");
if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
versioned = true;
}
logger.log("remove");
- call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch));
+ call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch, &zones_trace));
} else if (op == CLS_RGW_OP_LINK_OLH_DM) {
logger.log("creating delete marker");
set_status("creating delete marker");
ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
- call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch));
+ call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch, &zones_trace));
}
}
} while (marker_tracker->need_retry(key));
const string& status_oid;
RGWDataSyncDebugLogger logger;
+ rgw_zone_set zones_trace;
public:
RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
RGWBucketInfo *_bucket_info,
marker_tracker(sync_env, status_oid, full_marker),
status_oid(status_oid) {
logger.init(sync_env, "BucketFull", bs.get_key());
+ zones_trace.insert(sync_env->source_zone);
}
int operate() override;
ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
} else {
op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
-
using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
false, /* versioned, only matters for object removal */
entry->versioned_epoch, entry->mtime,
entry->owner, op, CLS_RGW_STATE_COMPLETE,
- entry->key, &marker_tracker),
+ entry->key, &marker_tracker, zones_trace),
false);
}
while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
RGWBucketIncSyncShardMarkerTrack marker_tracker;
bool updated_status{false};
const string& status_oid;
+ const string& zone_id;
string cur_id;
rgw_bucket_shard_inc_sync_marker& _inc_marker)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
bucket_info(_bucket_info), lease_cr(lease_cr), inc_marker(_inc_marker),
- marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid) {
+ marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid) , zone_id(_sync_env->store->get_zone().id){
set_description() << "bucket shard incremental sync bucket="
<< bucket_shard_str{bs};
set_status("init");
if (e.state != CLS_RGW_STATE_COMPLETE) {
continue;
}
+ if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
+ continue;
+ }
auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
if (squash_entry.first <= e.timestamp) {
squash_entry = make_pair<>(e.timestamp, e.op);
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
+ if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
+ set_status() << "redundant operation, skipping";
+ ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
+ <<bucket_shard_str{bs} <<"/"<<key<<": redundant operation" << dendl;
+ marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
+ continue;
+ }
if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
set_status() << "squashed operation, skipping";
ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
spawn(new SyncCR(sync_env, bucket_info, bs, key,
entry->is_versioned(), versioned_epoch,
entry->timestamp, owner, entry->op, entry->state,
- cur_id, &marker_tracker),
+ cur_id, &marker_tracker, entry->zones_trace),
false);
}
// }
map<string, bufferlist>& attrs,
real_time delete_at,
const char *if_match,
- const char *if_nomatch, const string *user_data)
+ const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace)
{
complete_writing_data();
head_obj_op.meta.mtime = mtime;
head_obj_op.meta.owner = s->owner.get_id();
head_obj_op.meta.delete_at = delete_at;
+ head_obj_op.meta.zones_trace = zones_trace;
int r = head_obj_op.write_meta(obj_len, accounted_size, attrs);
if (r < 0)
int RGWPutObjProcessor::complete(size_t accounted_size, const string& etag,
real_time *mtime, real_time set_mtime,
map<string, bufferlist>& attrs, real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data)
+ const char *if_match, const char *if_nomatch, const string *user_data,
+ rgw_zone_set *zones_trace)
{
- int r = do_complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, if_match, if_nomatch, user_data);
+ int r = do_complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, if_match, if_nomatch, user_data, zones_trace);
if (r < 0)
return r;
map<string, bufferlist>& attrs,
real_time delete_at,
const char *if_match,
- const char *if_nomatch, const string *user_data) {
+ const char *if_nomatch, const string *user_data,
+ rgw_zone_set *zones_trace) {
int r = complete_writing_data();
if (r < 0)
return r;
obj_op.meta.olh_epoch = olh_epoch;
obj_op.meta.delete_at = delete_at;
obj_op.meta.user_data = user_data;
+ obj_op.meta.zones_trace = zones_trace;
r = obj_op.write_meta(obj_len, accounted_size, attrs);
if (r < 0) {
state = NULL;
if (versioned_op) {
- r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, meta.olh_epoch, real_time(), false);
+ r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, meta.olh_epoch, real_time(), false, meta.zones_trace);
if (r < 0) {
return r;
}
RGWRados::Bucket bop(target->get_store(), bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, target->get_obj());
-
+ index_op.set_zones_trace(meta.zones_trace);
+
bool assume_noent = (meta.if_match == NULL && meta.if_nomatch == NULL);
int r;
if (assume_noent) {
}
int complete(const string& etag, real_time *mtime, real_time set_mtime,
- map<string, bufferlist>& attrs, real_time delete_at) {
- return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at);
+ map<string, bufferlist>& attrs, real_time delete_at, rgw_zone_set *zones_trace) {
+ return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at, NULL, NULL, NULL, zones_trace);
}
bool is_canceled() {
ceph::buffer::list *petag,
struct rgw_err *err,
void (*progress_cb)(off_t, void *),
- void *progress_data)
+ void *progress_data,
+ rgw_zone_set *zones_trace)
{
/* source is in a different zonegroup, copy from there */
#define MAX_COMPLETE_RETRY 100
for (i = 0; i < MAX_COMPLETE_RETRY; i++) {
- ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at);
+ ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at, zones_trace);
if (ret < 0) {
goto set_err_state;
}
meta.mtime = params.mtime;
}
- int r = store->set_olh(target->get_ctx(), target->get_bucket_info(), marker, true, &meta, params.olh_epoch, params.unmod_since, params.high_precision_time);
+ int r = store->set_olh(target->get_ctx(), target->get_bucket_info(), marker, true, &meta, params.olh_epoch, params.unmod_since, params.high_precision_time, params.zones_trace);
if (r < 0) {
return r;
}
return r;
}
result.delete_marker = dirent.is_delete_marker();
- r = store->unlink_obj_instance(target->get_ctx(), target->get_bucket_info(), obj, params.olh_epoch);
+ r = store->unlink_obj_instance(target->get_ctx(), target->get_bucket_info(), obj, params.olh_epoch, params.zones_trace);
if (r < 0) {
return r;
}
RGWRados::Bucket bop(store, bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj);
-
+
+ index_op.set_zones_trace(params.zones_trace);
index_op.set_bilog_flags(params.bilog_flags);
const rgw_obj& obj,
int versioning_status,
uint16_t bilog_flags,
- const real_time& expiration_time)
+ const real_time& expiration_time,
+ rgw_zone_set *zones_trace)
{
RGWRados::Object del_target(this, bucket_info, obj_ctx, obj);
RGWRados::Object::Delete del_op(&del_target);
del_op.params.versioning_status = versioning_status;
del_op.params.bilog_flags = bilog_flags;
del_op.params.expiration_time = expiration_time;
+ del_op.params.zones_trace = zones_trace;
return del_op.delete_obj();
}
}
}
- int r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags);
+ int r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace);
if (r < 0) {
return r;
}
ent.meta.owner_display_name = owner.get_display_name();
ent.meta.content_type = content_type;
- ret = store->cls_obj_complete_add(*bs, optag, poolid, epoch, ent, category, remove_objs, bilog_flags);
+ ret = store->cls_obj_complete_add(*bs, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
if (r < 0) {
return ret;
}
- ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags);
+ ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace);
int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
if (r < 0) {
return ret;
}
- ret = store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags);
+ ret = store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags, zones_trace);
/*
* need to update data log anyhow, so that whoever follows needs to update its internal markers
const string& op_tag,
struct rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch,
- real_time unmod_since, bool high_precision_time)
+ real_time unmod_since, bool high_precision_time, rgw_zone_set *_zones_trace)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(bucket_info, obj_instance, &ref);
ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
return ret;
}
+
+ rgw_zone_set zones_trace;
+ if (_zones_trace) {
+ zones_trace = *_zones_trace;
+ }
+ else {
+ zones_trace.insert(get_zone().id);
+ }
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
ret = cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
unmod_since, high_precision_time,
- get_zone().log_data);
+ get_zone().log_data, zones_trace);
if (ret < 0) {
return ret;
}
}
int RGWRados::bucket_index_unlink_instance(const RGWBucketInfo& bucket_info, const rgw_obj& obj_instance,
- const string& op_tag, const string& olh_tag, uint64_t olh_epoch)
+ const string& op_tag, const string& olh_tag, uint64_t olh_epoch, rgw_zone_set *_zones_trace)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(bucket_info, obj_instance, &ref);
ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
return ret;
}
+
+ rgw_zone_set zones_trace;
+ if (_zones_trace) {
+ zones_trace = *_zones_trace;
+ }
+ zones_trace.insert(get_zone().id);
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
- ret = cls_rgw_bucket_unlink_instance(bs.index_ctx, bs.bucket_obj, key, op_tag, olh_tag, olh_epoch, get_zone().log_data);
+ ret = cls_rgw_bucket_unlink_instance(bs.index_ctx, bs.bucket_obj, key, op_tag, olh_tag, olh_epoch, get_zone().log_data, zones_trace);
if (ret < 0) {
return ret;
}
int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
bufferlist& olh_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
- uint64_t *plast_ver)
+ uint64_t *plast_ver, rgw_zone_set* zones_trace)
{
if (log.empty()) {
return 0;
liter != remove_instances.end(); ++liter) {
cls_rgw_obj_key& key = *liter;
rgw_obj obj_instance(bucket, key);
- int ret = delete_obj(obj_ctx, bucket_info, obj_instance, 0, RGW_BILOG_FLAG_VERSIONED_OP);
+ int ret = delete_obj(obj_ctx, bucket_info, obj_instance, 0, RGW_BILOG_FLAG_VERSIONED_OP, ceph::real_time(), zones_trace);
if (ret < 0 && ret != -ENOENT) {
ldout(cct, 0) << "ERROR: delete_obj() returned " << ret << " obj_instance=" << obj_instance << dendl;
return ret;
/*
* read olh log and apply it
*/
-int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj)
+int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace)
{
map<uint64_t, vector<rgw_bucket_olh_log_entry> > log;
bool is_truncated;
if (ret < 0) {
return ret;
}
- ret = apply_olh_log(obj_ctx, *state, bucket_info, obj, state->olh_tag, log, &ver_marker);
+ ret = apply_olh_log(obj_ctx, *state, bucket_info, obj, state->olh_tag, log, &ver_marker, zones_trace);
if (ret < 0) {
return ret;
}
}
int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
- uint64_t olh_epoch, real_time unmod_since, bool high_precision_time)
+ uint64_t olh_epoch, real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace)
{
string op_tag;
int ret = 0;
int i;
-
+
#define MAX_ECANCELED_RETRY 100
for (i = 0; i < MAX_ECANCELED_RETRY; i++) {
if (ret == -ECANCELED) {
}
return ret;
}
- ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker, op_tag, meta, olh_epoch, unmod_since, high_precision_time);
+ ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker, op_tag, meta, olh_epoch, unmod_since, high_precision_time, zones_trace);
if (ret < 0) {
ldout(cct, 20) << "bucket_index_link_olh() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
if (ret == -ECANCELED) {
}
int RGWRados::unlink_obj_instance(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj,
- uint64_t olh_epoch)
+ uint64_t olh_epoch, rgw_zone_set *zones_trace)
{
string op_tag;
string olh_tag(state->olh_tag.c_str(), state->olh_tag.length());
- ret = bucket_index_unlink_instance(bucket_info, target_obj, op_tag, olh_tag, olh_epoch);
+ ret = bucket_index_unlink_instance(bucket_info, target_obj, op_tag, olh_tag, olh_epoch, zones_trace);
if (ret < 0) {
ldout(cct, 20) << "bucket_index_unlink_instance() target_obj=" << target_obj << " returned " << ret << dendl;
if (ret == -ECANCELED) {
return -EIO;
}
- ret = update_olh(obj_ctx, state, bucket_info, olh_obj);
+ ret = update_olh(obj_ctx, state, bucket_info, olh_obj, zones_trace);
if (ret == -ECANCELED) { /* already did what we needed, no need to retry, raced with another user */
return 0;
}
}
int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
- rgw_obj& obj, uint16_t bilog_flags)
+ rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *_zones_trace)
{
+ rgw_zone_set zones_trace;
+ if (_zones_trace) {
+ zones_trace = *_zones_trace;
+ }
+ else {
+ zones_trace.insert(get_zone().id);
+ }
+
ObjectWriteOperation o;
cls_rgw_obj_key key(obj.key.get_index_key_name(), obj.key.instance);
- cls_rgw_bucket_prepare_op(o, op, tag, key, obj.key.get_loc(), get_zone().log_data, bilog_flags);
+ cls_rgw_bucket_prepare_op(o, op, tag, key, obj.key.get_loc(), get_zone().log_data, bilog_flags, zones_trace);
return bs.index_ctx.operate(bs.bucket_obj, &o);
}
int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
int64_t pool, uint64_t epoch,
rgw_bucket_dir_entry& ent, RGWObjCategory category,
- list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags)
+ list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *_zones_trace)
{
list<cls_rgw_obj_key> *pro = NULL;
list<cls_rgw_obj_key> ro;
dir_meta = ent.meta;
dir_meta.category = category;
+ rgw_zone_set zones_trace;
+ if (_zones_trace) {
+ zones_trace = *_zones_trace;
+ }
+ else {
+ zones_trace.insert(get_zone().id);
+ }
+
rgw_bucket_entry_ver ver;
ver.pool = pool;
ver.epoch = epoch;
cls_rgw_obj_key key(ent.key.name, ent.key.instance);
cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, pro,
- get_zone().log_data, bilog_flags);
+ get_zone().log_data, bilog_flags, zones_trace);
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag,
int64_t pool, uint64_t epoch,
rgw_bucket_dir_entry& ent, RGWObjCategory category,
- list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags)
+ list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace)
{
- return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags);
+ return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
}
int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
rgw_obj& obj,
real_time& removed_mtime,
list<rgw_obj_index_key> *remove_objs,
- uint16_t bilog_flags)
+ uint16_t bilog_flags,
+ rgw_zone_set *zones_trace)
{
rgw_bucket_dir_entry ent;
ent.meta.mtime = removed_mtime;
obj.key.get_index_key(&ent.key);
- return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags);
+ return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags, zones_trace);
}
-int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags)
+int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace)
{
rgw_bucket_dir_entry ent;
obj.key.get_index_key(&ent.key);
- return cls_obj_complete_op(bs, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags);
+ return cls_obj_complete_op(bs, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags, zones_trace);
}
int RGWRados::cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_t timeout)
ceph::real_time delete_at;
bool canceled;
const string *user_data;
+ rgw_zone_set *zones_trace;
MetaParams() : mtime(NULL), rmattrs(NULL), data(NULL), manifest(NULL), ptag(NULL),
remove_objs(NULL), category(RGW_OBJ_CATEGORY_MAIN), flags(0),
- if_match(NULL), if_nomatch(NULL), olh_epoch(0), canceled(false), user_data(nullptr) {}
+ if_match(NULL), if_nomatch(NULL), olh_epoch(0), canceled(false), user_data(nullptr), zones_trace(nullptr) {}
} meta;
explicit Write(RGWRados::Object *_target) : target(_target) {}
ceph::real_time unmod_since;
ceph::real_time mtime; /* for setting delete marker mtime */
bool high_precision_time;
+ rgw_zone_set *zones_trace;
- DeleteParams() : versioning_status(0), olh_epoch(0), bilog_flags(0), remove_objs(NULL), high_precision_time(false) {}
+ DeleteParams() : versioning_status(0), olh_epoch(0), bilog_flags(0), remove_objs(NULL), high_precision_time(false), zones_trace(nullptr) {}
} params;
struct DeleteResult {
bool bs_initialized{false};
bool blind;
bool prepared{false};
+ rgw_zone_set *zones_trace{nullptr};
public:
UpdateIndex(RGWRados::Bucket *_target, const rgw_obj& _obj) : target(_target), obj(_obj),
void set_bilog_flags(uint16_t flags) {
bilog_flags = flags;
}
+
+ void set_zones_trace(rgw_zone_set *_zones_trace) {
+ zones_trace = _zones_trace;
+ }
int prepare(RGWModifyOp, const string *write_tag);
int complete(int64_t poolid, uint64_t epoch, uint64_t size,
ceph::buffer::list *petag,
struct rgw_err *err,
void (*progress_cb)(off_t, void *),
- void *progress_data);
+ void *progress_data,
+ rgw_zone_set *zones_trace= nullptr);
/**
* Copy an object.
* dest_obj: the object to copy into
const rgw_obj& src_obj,
int versioning_status,
uint16_t bilog_flags = 0,
- const ceph::real_time& expiration_time = ceph::real_time());
+ const ceph::real_time& expiration_time = ceph::real_time(),
+ rgw_zone_set *zones_trace = nullptr);
/** Delete a raw object.*/
int delete_raw_obj(const rgw_raw_obj& obj);
const rgw_obj& obj_instance, bool delete_marker,
const string& op_tag, struct rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch,
- ceph::real_time unmod_since, bool high_precision_time);
- int bucket_index_unlink_instance(const RGWBucketInfo& bucket_info, const rgw_obj& obj_instance, const string& op_tag, const string& olh_tag, uint64_t olh_epoch);
+ ceph::real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace = nullptr);
+ int bucket_index_unlink_instance(const RGWBucketInfo& bucket_info, const rgw_obj& obj_instance, const string& op_tag, const string& olh_tag, uint64_t olh_epoch, rgw_zone_set *zones_trace = nullptr);
int bucket_index_read_olh_log(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& obj_instance, uint64_t ver_marker,
map<uint64_t, vector<rgw_bucket_olh_log_entry> > *log, bool *is_truncated);
int bucket_index_trim_olh_log(const RGWBucketInfo& bucket_info, RGWObjState& obj_state, const rgw_obj& obj_instance, uint64_t ver);
int bucket_index_clear_olh(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& obj_instance);
int apply_olh_log(RGWObjectCtx& ctx, RGWObjState& obj_state, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
bufferlist& obj_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
- uint64_t *plast_ver);
- int update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj);
+ uint64_t *plast_ver, rgw_zone_set *zones_trace = nullptr);
+ int update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace = nullptr);
int set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
- uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time);
+ uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace = nullptr);
int unlink_obj_instance(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj,
- uint64_t olh_epoch);
+ uint64_t olh_epoch, rgw_zone_set *zones_trace = nullptr);
void check_pending_olh_entries(map<string, bufferlist>& pending_entries, map<string, bufferlist> *rm_pending_entries);
int remove_olh_pending_entries(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, map<string, bufferlist>& pending_attrs);
map<string, bufferlist> *pattrs, bool create_entry_point);
int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
- int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, rgw_obj& obj, uint16_t bilog_flags);
+ int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
- rgw_bucket_dir_entry& ent, RGWObjCategory category, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags);
+ rgw_bucket_dir_entry& ent, RGWObjCategory category, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, rgw_bucket_dir_entry& ent,
- RGWObjCategory category, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags);
+ RGWObjCategory category, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, rgw_obj& obj,
- ceph::real_time& removed_mtime, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags);
- int cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags);
+ ceph::real_time& removed_mtime, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
+ int cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
int cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_t timeout);
int cls_bucket_list(RGWBucketInfo& bucket_info, int shard_id, rgw_obj_index_key& start, const string& prefix,
uint32_t num_entries, bool list_versions, map<string, rgw_bucket_dir_entry>& m,
virtual int do_complete(size_t accounted_size, const string& etag,
ceph::real_time *mtime, ceph::real_time set_mtime,
map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data) = 0;
+ const char *if_match, const char *if_nomatch, const string *user_data,
+ rgw_zone_set* zones_trace = nullptr) = 0;
public:
RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL),
int complete(size_t accounted_size, const string& etag,
ceph::real_time *mtime, ceph::real_time set_mtime,
map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match = NULL, const char *if_nomatch = NULL, const string *user_data = nullptr);
+ const char *if_match = NULL, const char *if_nomatch = NULL, const string *user_data = nullptr,
+ rgw_zone_set *zones_trace = nullptr);
CephContext *ctx();
int do_complete(size_t accounted_size, const string& etag,
ceph::real_time *mtime, ceph::real_time set_mtime,
map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data) override;
+ const char *if_match, const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace) override;
int prepare_next_part(off_t ofs);
int complete_parts();
int do_complete(size_t accounted_size, const string& etag,
ceph::real_time *mtime, ceph::real_time set_mtime,
map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data) override;
+ const char *if_match, const char *if_nomatch, const string *user_data,
+ rgw_zone_set *zones_trace) override;
public:
bool immutable_head() { return true; }
RGWPutObjProcessor_Multipart(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, uint64_t _p, req_state *_s) :
RGWDataSyncModule() {}
virtual ~RGWDataSyncModule() {}
- virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) = 0;
+ virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
virtual RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
- bool versioned, uint64_t versioned_epoch) = 0;
+ bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
virtual RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
- rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) = 0;
+ rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
};
class RGWSyncModuleInstance {
delete conf.conn;
}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override {
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 0) << conf.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf);
}
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override {
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
/* versioned and versioned epoch params are useless in the elasticsearch backend case */
ldout(sync_env->cct, 0) << conf.id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
}
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
- rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override {
+ rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 0) << conf.id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
public:
RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override {
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWLogStatRemoteObjCR(sync_env, bucket_info, key);
}
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override {
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
- rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override {
+ rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
#include <string>
#include <vector>
#include <map>
+#include <set>
using namespace librados;
{
ObjectWriteOperation *op = mgr.write_op();
cls_rgw_obj_key key(obj, string());
- cls_rgw_bucket_prepare_op(*op, index_op, tag, key, loc, true, 0);
+ rgw_zone_set zones_trace;
+ cls_rgw_bucket_prepare_op(*op, index_op, tag, key, loc, true, 0, zones_trace);
ASSERT_EQ(0, ioctx.operate(oid, op));
}
ver.pool = ioctx.get_id();
ver.epoch = epoch;
meta.accounted_size = meta.size;
- cls_rgw_bucket_complete_op(*op, index_op, tag, ver, key, meta, NULL, true, 0);
+ rgw_zone_set zones_trace;
+ cls_rgw_bucket_complete_op(*op, index_op, tag, ver, key, meta, NULL, true, 0, zones_trace);
ASSERT_EQ(0, ioctx.operate(oid, op));
}