return 0;
}
-#if 0
+class RGWBucketSyncSourcesManager {
+public:
+ static string sync_sources_oid(const rgw_bucket bucket) {
+ return bucket_sync_sources_oid_prefix + "." + bucket.to_str();
+ }
+
+ static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) {
+ return rgw_raw_obj(zone_svc->get_zone_params().log_pool, status_oid(bucket)),
+ }
+};
+
+struct rgw_bucket_sync_source_local_info {
+ string id;
+ string type;
+ string zone;
+ rgw_bucket bucket;
+ /* FIXME: config */
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ encode(type, bl);
+ encode(zone, bl);
+ encode(bucket, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(id, bl);
+ decode(type, bl);
+ decode(zone, bl);
+ decode(bucket, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(ceph::Formatter *f) const {
+ encode_json("id", id, f);
+ encode_json("type", type, f);
+ encode_json("zone", zone, f);
+ encode_json("bucket", bucket, f);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_bucket_sync_source_local_info)
+
+struct rgw_bucket_sync_sources_local_info {
+ map<string, rgw_bucket_sync_source_local_info> sources; /* id -> source */
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(sources, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(sources, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(ceph::Formatter *f) const {
+ encode_json("sources", type, f);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
+
+class RGWReadBucketSourcesInfoCR : public RGWCoroutine {
+ rgw_bucket bucket;
+
+ rgw_raw_obj sources_obj;
+
+ RGWSyncTraceNodeRef tn;
+
+public:
+ RGWReadBucketSourcesInfoCR(RGWSI_Zone *_zone_svc,
+ const rgw_bucket& _bucket,
+ const RGWSyncTraceNodeRef& _tn_parent)
+ : RGWCoroutine(_zone_svc->cct),
+ bucket(_bucket),
+ source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)),
+ tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources",
+ SSTR(bucket))) {
+ }
+
+ int operate() override;
+};
+
+int RGWReadBucketSourcesInfoCR::operate()
+{
+ reenter(this) {
+ yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
+ sync_env->svc.sysobj,
+ RGWBucketSyncSourcesManager::sync_sources_obj(bucket),
+ &pinfo));
+ return set_cr_done();
+ }
+}
+
class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_bucket bucket;
rgw_sync_source source;
- RGWMetaSyncEnv meta_sync_env;
- const std::string status_oid;
+ rgw_raw_obj sources_obj;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
RGWSyncTraceNodeRef tn;
public:
- RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, const rgw_bucket& bucket, const RGWSyncTraceNodeRef& _tn_parent)
- : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bucket(_bucket),
- status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, bucket)),
- tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_source",
- SSTR(bucket_shard_str{bs}))) {
+ RGWRunBucketSourcesSyncCR(RGWSI_Zone *_zone_svc,
+ const rgw_bucket& _bucket,
+ const RGWSyncTraceNodeRef& _tn_parent)
+ : RGWCoroutine(_sc->cct),
+ sc(_sc), sync_env(_sc->env),
+ bucket(_bucket),
+ source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)),
+ tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
+ SSTR(bucket))) {
}
~RGWRunBucketSourcesSyncCR() override {
if (lease_cr) {
set_status("acquiring sync lock");
auto store = sync_env->store;
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
- rgw_raw_obj(store->svc->zone->get_zone_params().log_pool, status_oid),
+ RGWBucketSyncSourcesManager::sync_sources_obj(bucket),
"sync_lock",
cct->_conf->rgw_sync_lease_period,
this));
}
tn->log(10, "took lease");
- yield call(new RGWReadBucketSourcesInfoCR(sc, bs.bucket, &info));
+ yield call(new RGWReadBucketSourcesInfoCR(sc, bucket, &info));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
lease_cr->go_down();
return 0;
}
-#endif
int RGWRunBucketSyncCoroutine::operate()
{