]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync env cleanup
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 25 Jan 2016 23:58:36 +0000 (15:58 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 21:03:32 +0000 (13:03 -0800)
pass RGWDataSyncEnv container object instead of passing around multiple
params. (incomplete)

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index 2507d31aeb3c70a1604f670d7387dc9d1f425e7d..887f174f890a676ab81e093f5e0b4faa3878d75d 100644 (file)
@@ -57,23 +57,20 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
 };
 
 class RGWReadDataSyncStatusCoroutine : public RGWSimpleRadosReadCR<rgw_data_sync_info> {
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRados *store;
-  RGWObjectCtx& obj_ctx;
+  RGWDataSyncEnv *sync_env;
 
-  string source_zone;
+  RGWObjectCtx& obj_ctx;
 
   rgw_data_sync_status *sync_status;
 
 public:
-  RGWReadDataSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
-                     RGWObjectCtx& _obj_ctx, const string& _source_zone,
-                     rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_async_rados, _store, _obj_ctx,
-                                                                           _store->get_zone_params().log_pool,
-                                                                           RGWDataSyncStatusManager::sync_status_oid(_source_zone),
+  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, RGWObjectCtx& _obj_ctx,
+                     rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_sync_env->async_rados, _sync_env->store, _obj_ctx,
+                                                                           _sync_env->store->get_zone_params().log_pool,
+                                                                           RGWDataSyncStatusManager::sync_status_oid(_sync_env->source_zone),
                                                                            &_status->sync_info),
-                                                                            async_rados(_async_rados), store(_store),
-                                                                            obj_ctx(_obj_ctx), source_zone(_source_zone),
+                                                                            sync_env(_sync_env),
+                                                                            obj_ctx(_obj_ctx),
                                                                            sync_status(_status) {}
 
   int handle_data(rgw_data_sync_info& data);
@@ -86,18 +83,16 @@ int RGWReadDataSyncStatusCoroutine::handle_data(rgw_data_sync_info& data)
   }
 
   map<uint32_t, rgw_data_sync_marker>& markers = sync_status->sync_markers;
+  RGWRados *store = sync_env->store;
   for (int i = 0; i < (int)data.num_shards; i++) {
-    spawn(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
-                                                    RGWDataSyncStatusManager::shard_obj_name(source_zone, i), &markers[i]), true);
+    spawn(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+                                                    RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i), &markers[i]), true);
   }
   return 0;
 }
 
 class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
-  RGWRados *store;
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRESTConn *conn;
+  RGWDataSyncEnv *sync_env;
 
   RGWRESTReadResource *http_op;
 
@@ -105,11 +100,9 @@ class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
   RGWDataChangesLogInfo *shard_info;
 
 public:
-  RGWReadRemoteDataLogShardInfoCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, RGWRESTConn *_conn,
-                                                      int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_store->ctx()), store(_store),
-                                                      http_manager(_mgr),
-                                                     async_rados(_async_rados),
-                                                      conn(_conn),
+  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
+                                                      int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sync_env->cct),
+                                                      sync_env(_sync_env),
                                                       http_op(NULL),
                                                       shard_id(_shard_id),
                                                       shard_info(_shard_info) {
@@ -133,13 +126,13 @@ public:
 
         string p = "/admin/log/";
 
-        http_op = new RGWRESTReadResource(conn, p, pairs, NULL, http_manager);
+        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
 
         http_op->set_user_info((void *)stack);
 
         int ret = http_op->aio_read();
         if (ret < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
+          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
           log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
           return set_cr_error(ret);
         }
@@ -173,10 +166,7 @@ struct read_remote_data_log_response {
 };
 
 class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
-  RGWRados *store;
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRESTConn *conn;
+  RGWDataSyncEnv *sync_env;
 
   RGWRESTReadResource *http_op;
 
@@ -188,11 +178,9 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
   read_remote_data_log_response response;
 
 public:
-  RGWReadRemoteDataLogShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, RGWRESTConn *_conn,
-                              int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_store->ctx()), store(_store),
-                                                      http_manager(_mgr),
-                                                     async_rados(_async_rados),
-                                                      conn(_conn),
+  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
+                              int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_sync_env->cct),
+                                                      sync_env(_sync_env),
                                                       http_op(NULL),
                                                       shard_id(_shard_id),
                                                       pmarker(_pmarker),
@@ -216,13 +204,13 @@ public:
 
         string p = "/admin/log/";
 
-        http_op = new RGWRESTReadResource(conn, p, pairs, NULL, http_manager);
+        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
 
         http_op->set_user_info((void *)stack);
 
         int ret = http_op->aio_read();
         if (ret < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
+          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
           log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
           return set_cr_error(ret);
         }
@@ -246,11 +234,10 @@ public:
 };
 
 class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
-  RGWAsyncRadosProcessor *async_rados;
+  RGWDataSyncEnv *sync_env;
+
   RGWRados *store;
-  RGWHTTPManager *http_manager;
   RGWObjectCtx& obj_ctx;
-  string source_zone;
 
   string sync_status_oid;
 
@@ -259,10 +246,10 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
   rgw_data_sync_info status;
   map<int, RGWDataChangesLogInfo> shards_info;
 public:
-  RGWInitDataSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWHTTPManager *_http_mgr,
-                     RGWObjectCtx& _obj_ctx, const string& _source_zone, uint32_t _num_shards) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
-                                                http_manager(_http_mgr),
-                                                obj_ctx(_obj_ctx), source_zone(_source_zone) {
+  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
+                     RGWObjectCtx& _obj_ctx, uint32_t _num_shards) : RGWCoroutine(_sync_env->cct),
+                                                sync_env(_sync_env), store(sync_env->store),
+                                                obj_ctx(_obj_ctx) {
     lock_name = "sync_lock";
     status.num_shards = _num_shards;
 
@@ -272,7 +259,7 @@ public:
     gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
     string cookie = buf;
 
-    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(source_zone);
+    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
   }
 
   int operate() {
@@ -280,7 +267,7 @@ public:
     reenter(this) {
       yield {
        uint32_t lock_duration = 30;
-       call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+       call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
                                     lock_name, cookie, lock_duration));
        if (retcode < 0) {
          ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
@@ -288,12 +275,12 @@ public:
        }
       }
       yield {
-        call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+        call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                 sync_status_oid, status));
       }
       yield { /* take lock again, we just recreated the object */
        uint32_t lock_duration = 30;
-       call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+       call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
                                     lock_name, cookie, lock_duration));
        if (retcode < 0) {
          ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
@@ -302,13 +289,13 @@ public:
       }
       /* fetch current position in logs */
       yield {
-        RGWRESTConn *conn = store->get_zone_conn_by_id(source_zone);
+        RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
         if (!conn) {
-          ldout(cct, 0) << "ERROR: connection to zone " << source_zone << " does not exist!" << dendl;
+          ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
           return set_cr_error(-EIO);
         }
         for (int i = 0; i < (int)status.num_shards; i++) {
-          spawn(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, conn, i, &shards_info[i]), true);
+          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
       }
       while (collect(&ret)) {
@@ -323,17 +310,17 @@ public:
           RGWDataChangesLogInfo& info = shards_info[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
-          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
-                                                         RGWDataSyncStatusManager::shard_obj_name(source_zone, i), marker), true);
+          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
+                                                         RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i), marker), true);
         }
       }
       yield {
        status.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
-        call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+        call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                 sync_status_oid, status));
       }
       yield { /* unlock */
-       call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+       call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
                                     lock_name, cookie));
       }
       while (collect(&ret)) {
@@ -353,7 +340,7 @@ int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
   rgw_http_param_pair pairs[] = { { "type", "data" },
                                   { NULL, NULL } };
 
-  int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
+  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
     return ret;
@@ -370,8 +357,7 @@ int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn)
     return 0;
   }
 
-  source_zone = _source_zone;
-  conn = _conn;
+  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, NULL /* error_logger */, _source_zone);
 
   int ret = http_manager.set_threaded();
   if (ret < 0) {
@@ -400,7 +386,7 @@ int RGWRemoteDataLog::get_shard_info(int shard_id)
                                   { NULL, NULL } };
 
   RGWDataChangesLogInfo info;
-  int ret = conn->get_json_resource("/admin/log", pairs, info);
+  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, info);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
     return ret;
@@ -414,7 +400,7 @@ int RGWRemoteDataLog::get_shard_info(int shard_id)
 int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
 {
   RGWObjectCtx obj_ctx(store, NULL);
-  int r = run(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, sync_status));
+  int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, obj_ctx, sync_status));
   if (r == -ENOENT) {
     r = 0;
   }
@@ -424,7 +410,7 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
 int RGWRemoteDataLog::init_sync_status(int num_shards)
 {
   RGWObjectCtx obj_ctx(store, NULL);
-  return run(new RGWInitDataSyncStatusCoroutine(async_rados, store, &http_manager, obj_ctx, source_zone, num_shards));
+  return run(new RGWInitDataSyncStatusCoroutine(&sync_env, obj_ctx, num_shards));
 }
 
 static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
@@ -451,12 +437,10 @@ struct bucket_instance_meta_info {
 };
 
 class RGWListBucketIndexesCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+
   RGWRados *store;
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
 
-  RGWRESTConn *conn;
-  string source_zone;
   rgw_data_sync_status *sync_status;
   int num_shards;
 
@@ -478,16 +462,11 @@ class RGWListBucketIndexesCR : public RGWCoroutine {
   bool failed;
 
 public:
-  RGWListBucketIndexesCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                         RGWRESTConn *_conn,
-                         const string& _source_zone,
-                         rgw_data_sync_status *_sync_status) : RGWCoroutine(_store->ctx()), store(_store),
-                                                      http_manager(_mgr),
-                                                     async_rados(_async_rados),
-                                                     conn(_conn), source_zone(_source_zone),
-                                                      sync_status(_sync_status),
+  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
+                         rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+                                                      store(sync_env->store), sync_status(_sync_status),
                                                      req_ret(0), entries_index(NULL), i(0), failed(false) {
-    oid_prefix = datalog_sync_full_sync_index_prefix + "." + source_zone; 
+    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone; 
     path = "/admin/metadata/bucket.instance";
     num_shards = sync_status->sync_info.num_shards;
   }
@@ -497,21 +476,21 @@ public:
 
   int operate() {
     reenter(this) {
-      entries_index = new RGWShardedOmapCRManager(async_rados, store, this, num_shards,
+      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->get_zone_params().log_pool,
                                                   oid_prefix);
       yield {
         string entrypoint = string("/admin/metadata/bucket.instance");
 #warning need a better scaling solution here, requires streaming output
-        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), conn, http_manager,
+        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                       entrypoint, NULL, &result));
       }
       if (get_ret_status() < 0) {
-        ldout(store->ctx(), 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
+        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
         return set_state(RGWCoroutine_Error);
       }
       for (iter = result.begin(); iter != result.end(); ++iter) {
-        ldout(store->ctx(), 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
+        ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
 
         key = *iter;
 
@@ -519,7 +498,7 @@ public:
           rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                           { NULL, NULL } };
 
-          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, http_manager, path, pairs, &meta_info));
+          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
         }
 
         num_shards = meta_info.data.get_bucket_info().num_shards;
@@ -545,8 +524,8 @@ public:
           int shard_id = (int)iter->first;
           rgw_data_sync_marker& marker = iter->second;
           marker.total_entries = entries_index->get_total_entries(shard_id);
-          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
-                                                                RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id), marker), true);
+          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
+                                                                RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id), marker), true);
         }
       }
       int ret;
@@ -565,8 +544,7 @@ public:
 #define DATA_SYNC_UPDATE_MARKER_WINDOW 1
 
 class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
-  RGWRados *store;
-  RGWAsyncRadosProcessor *async_rados;
+  RGWDataSyncEnv *sync_env;
 
   string marker_oid;
   rgw_data_sync_marker sync_marker;
@@ -585,11 +563,10 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, strin
   }
 
 public:
-  RGWDataSyncShardMarkerTrack(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                          const string& _marker_oid,
                          const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
-                                                                store(_store),
-                                                                async_rados(_async_rados),
+                                                                sync_env(_sync_env),
                                                                 marker_oid(_marker_oid),
                                                                 sync_marker(_marker) {}
 
@@ -597,8 +574,10 @@ public:
     sync_marker.marker = new_marker;
     sync_marker.pos = index_pos;
 
-    ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
-    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+    RGWRados *store = sync_env->store;
+
+    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                 marker_oid, sync_marker);
   }
 
@@ -621,26 +600,18 @@ public:
 };
 
 class RGWRunBucketSyncCoroutine : public RGWCoroutine {
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRESTConn *conn;
-  RGWRados *store;
-  string source_zone;
+  RGWDataSyncEnv *sync_env;
   string bucket_name;
   string bucket_id;
   RGWBucketInfo bucket_info;
   int shard_id;
   rgw_bucket_shard_sync_info sync_status;
-  RGWMetaSyncEnv sync_env;
+  RGWMetaSyncEnv meta_sync_env;
 
 public:
-  RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                            RGWRESTConn *_conn, RGWRados *_store,
-                            const string& _source_zone,
-                            const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()),
-                                                                            http_manager(_mgr), async_rados(_async_rados), conn(_conn),
-                                                                            store(_store),
-                                                                           source_zone(_source_zone),
+  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env,
+                            const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_sync_env->cct),
+                                                                            sync_env(_sync_env),
                                                                             bucket_name(_bucket_name),
                                                                            bucket_id(_bucket_id), shard_id(_shard_id) {}
 
@@ -669,12 +640,7 @@ static int parse_bucket_shard(CephContext *cct, const string& raw_key, string *b
 }
 
 class RGWDataSyncSingleEntryCR : public RGWCoroutine {
-  RGWRados *store;
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-
-  RGWRESTConn *conn;
-  string source_zone;
+  RGWDataSyncEnv *sync_env;
 
   string raw_key;
   string entry_marker;
@@ -689,16 +655,13 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   RGWDataSyncShardMarkerTrack *marker_tracker;
 
 public:
-  RGWDataSyncSingleEntryCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                           RGWRESTConn *_conn, const string& _source_zone,
-                          const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
-                                                      http_manager(_mgr),
-                                                     async_rados(_async_rados),
-                                                      conn(_conn), source_zone(_source_zone),
+  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
+                          const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct),
+                                                      sync_env(_sync_env),
                                                      raw_key(_raw_key), entry_marker(_entry_marker),
                                                       sync_status(0),
                                                       marker_tracker(_marker_tracker) {
-    set_description() << "data sync single entry (source_zone=" << source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
+    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
   }
 
   int operate() {
@@ -706,12 +669,12 @@ public:
       do {
         yield {
           int shard_id;
-          int ret = parse_bucket_shard(store->ctx(), raw_key, &bucket_name, &bucket_instance, &shard_id);
+          int ret = parse_bucket_shard(sync_env->cct, raw_key, &bucket_name, &bucket_instance, &shard_id);
           if (ret < 0) {
             return set_cr_error(-EIO);
           }
           marker_tracker->reset_need_retry(raw_key);
-          call(new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, source_zone, bucket_name, bucket_instance, shard_id));
+          call(new RGWRunBucketSyncCoroutine(sync_env, bucket_name, bucket_instance, shard_id));
         }
       } while (marker_tracker->need_retry(raw_key));
 
@@ -736,14 +699,10 @@ public:
 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
 
 class RGWDataSyncShardCR : public RGWCoroutine {
-  RGWRados *store;
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRESTConn *conn;
+  RGWDataSyncEnv *sync_env;
 
   rgw_bucket pool;
 
-  string source_zone;
   uint32_t shard_id;
   rgw_data_sync_marker sync_marker;
 
@@ -784,21 +743,18 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   RGWContinuousLeaseCR *lease_cr;
   string status_oid;
 public:
-  RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                     RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
-                    uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
-                                                      http_manager(_mgr),
-                                                     async_rados(_async_rados),
-                                                      conn(_conn),
+  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
+                     rgw_bucket& _pool,
+                    uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
+                                                      sync_env(_sync_env),
                                                      pool(_pool),
-                                                      source_zone(_source_zone),
                                                      shard_id(_shard_id),
                                                      sync_marker(_marker),
                                                       marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
                                                       total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
                                                       lease_cr(NULL) {
-    set_description() << "data sync shard source_zone=" << source_zone << " shard_id=" << shard_id;
-    status_oid = RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id);
+    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
+    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
   }
 
   ~RGWDataSyncShardCR() {
@@ -852,7 +808,8 @@ public:
       lease_cr->abort();
       lease_cr->put();
     }
-    lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, status_oid,
+    RGWRados *store = sync_env->store;
+    lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
                                         lock_name, lock_duration, this);
     lease_cr->get();
     spawn(lease_cr, false);
@@ -872,28 +829,26 @@ public:
         set_sleeping(true);
         yield;
       }
-      oid = full_data_sync_index_shard_oid(source_zone, shard_id);
-      set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
-                                                         status_oid,
-                                                         sync_marker));
+      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
+      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
       total_entries = sync_marker.pos;
       do {
-        yield call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
+        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, pool, oid, sync_marker.marker, &entries, max_entries));
         if (retcode < 0) {
-          ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
+          ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
           lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
         }
         iter = entries.begin();
         for (; iter != entries.end(); ++iter) {
-          ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
+          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
           total_entries++;
           if (!marker_tracker->start(iter->first, total_entries, utime_t())) {
-            ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
+            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
           } else {
             // fetch remote and write locally
-            yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false);
+            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false);
             if (retcode < 0) {
               lease_cr->go_down();
               drain_all();
@@ -912,11 +867,12 @@ public:
         sync_marker.state = rgw_data_sync_marker::IncrementalSync;
         sync_marker.marker = sync_marker.next_step_marker;
         sync_marker.next_step_marker.clear();
-        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+        RGWRados *store = sync_env->store;
+        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                                              status_oid, sync_marker));
       }
       if (retcode < 0) {
-        ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+        ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
         lease_cr->go_down();
         return set_cr_error(retcode);
       }
@@ -937,9 +893,7 @@ public:
         yield;
       }
       set_status("lease acquired");
-      set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
-                                                         status_oid,
-                                                         sync_marker));
+      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
       do {
         current_modified.clear();
         inc_lock.Lock();
@@ -949,40 +903,40 @@ public:
         /* process out of band updates */
         for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
           yield {
-            ldout(store->ctx(), 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
-            spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, *modified_iter, string(), marker_tracker), false);
+            ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
+            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker), false);
           }
         }
 
-        yield call(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, conn, shard_id, &shard_info));
+        yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
         if (retcode < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
+          ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
           lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
         }
         datalog_marker = shard_info.marker;
 #define INCREMENTAL_MAX_ENTRIES 100
-       ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker > sync_marker.marker) {
           spawned_keys.clear();
-          yield call(new RGWReadRemoteDataLogShardCR(store, http_manager, async_rados, conn, shard_id, &sync_marker.marker, &log_entries, &truncated));
+          yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
           for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
-            ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
+            ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
             if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
-              ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
+              ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
               marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
               continue;
             }
             if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
-              ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
+              ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
             } else {
               /*
                * don't spawn the same key more than once. We can do that as long as we don't yield
                */
               if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
                 spawned_keys.insert(log_iter->entry.key);
-                spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
+                spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
                 if (retcode < 0) {
                   lease_cr->go_down();
                   drain_all();
@@ -997,7 +951,7 @@ public:
             int ret;
             while (collect(&ret)) {
               if (ret < 0) {
-                ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl;
+                ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
                 /* we should have reported this error */
 #warning deal with error
               }
@@ -1005,7 +959,7 @@ public:
             }
           }
        }
-       ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker == sync_marker.marker) {
 #define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
@@ -1017,32 +971,24 @@ public:
 };
 
 class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
-  RGWRados *store;
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRESTConn *conn;
+  RGWDataSyncEnv *sync_env;
 
   rgw_bucket pool;
 
-  string source_zone;
   uint32_t shard_id;
   rgw_data_sync_marker sync_marker;
 
 public:
-  RGWDataSyncShardControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                     RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
-                    uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_store->ctx()), store(_store),
-                                                      http_manager(_mgr),
-                                                     async_rados(_async_rados),
-                                                      conn(_conn),
+  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_bucket& _pool,
+                    uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_sync_env->cct),
+                                                      sync_env(_sync_env),
                                                      pool(_pool),
-                                                      source_zone(_source_zone),
                                                      shard_id(_shard_id),
                                                      sync_marker(_marker) {
   }
 
   RGWCoroutine *alloc_cr() {
-    return new RGWDataSyncShardCR(store, http_manager, async_rados, conn, pool, source_zone, shard_id, sync_marker, backoff_ptr());
+    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
   }
 
   void append_modified_shards(set<string>& keys) {
@@ -1058,11 +1004,7 @@ public:
 };
 
 class RGWDataSyncCR : public RGWCoroutine {
-  RGWRados *store;
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRESTConn *conn;
-  string source_zone;
+  RGWDataSyncEnv *sync_env;
   uint32_t num_shards;
 
   RGWObjectCtx obj_ctx;
@@ -1077,14 +1019,10 @@ class RGWDataSyncCR : public RGWCoroutine {
   bool *reset_backoff;
 
 public:
-  RGWDataSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                RGWRESTConn *_conn, const string& _source_zone, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
-                                                      http_manager(_mgr),
-                                                     async_rados(_async_rados),
-                                                      conn(_conn),
-                                                      source_zone(_source_zone),
+  RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
+                                                      sync_env(_sync_env),
                                                       num_shards(_num_shards),
-                                                      obj_ctx(store),
+                                                      obj_ctx(sync_env->store),
                                                       marker_tracker(NULL),
                                                       shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
                                                       reset_backoff(_reset_backoff) {
@@ -1094,21 +1032,21 @@ public:
     reenter(this) {
 
       /* read sync status */
-      yield call(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status));
+      yield call(new RGWReadDataSyncStatusCoroutine(sync_env, obj_ctx, &sync_status));
 
       if (retcode == -ENOENT) {
         sync_status.sync_info.num_shards = num_shards;
       } else if (retcode < 0 && retcode != -ENOENT) {
-        ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
+        ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
         return set_cr_error(retcode);
       }
 
       /* state: init status */
       if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
-        ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
-        yield call(new RGWInitDataSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, sync_status.sync_info.num_shards));
+        ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
+        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, obj_ctx, sync_status.sync_info.num_shards));
         if (retcode < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
+          ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
           return set_cr_error(retcode);
         }
         sync_status.sync_info.num_shards = num_shards;
@@ -1117,7 +1055,7 @@ public:
         yield call(set_sync_info_cr());
 
         if (retcode < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+          ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
           return set_cr_error(retcode);
         }
 
@@ -1126,14 +1064,14 @@ public:
 
       if  ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
         /* state: building full sync maps */
-        ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
-        yield call(new RGWListBucketIndexesCR(store, http_manager, async_rados, conn, source_zone, &sync_status));
+        ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
+        yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
         sync_status.sync_info.state = rgw_data_sync_info::StateSync;
 
         /* update new state */
         yield call(set_sync_info_cr());
         if (retcode < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+          ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
           return set_cr_error(retcode);
         }
 
@@ -1145,8 +1083,7 @@ public:
           case rgw_data_sync_info::StateSync:
             for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
                  iter != sync_status.sync_markers.end(); ++iter) {
-              RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(store, http_manager, async_rados,
-                                                        conn, store->get_zone_params().log_pool, source_zone,
+              RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
                                                         iter->first, iter->second);
               shard_crs_lock.Lock();
               shard_crs[iter->first] = cr;
@@ -1162,8 +1099,9 @@ public:
   }
 
   RGWCoroutine *set_sync_info_cr() {
-    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
-                                                         RGWDataSyncStatusManager::sync_status_oid(source_zone),
+    RGWRados *store = sync_env->store;
+    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
+                                                         RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone),
                                                          sync_status.sync_info);
   }
 
@@ -1180,24 +1118,16 @@ public:
 
 class RGWDataSyncControlCR : public RGWBackoffControlCR
 {
-  RGWRados *store;
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRESTConn *conn;
-  string source_zone;
+  RGWDataSyncEnv *sync_env;
   uint32_t num_shards;
 
 public:
-  RGWDataSyncControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                RGWRESTConn *_conn, const string& _source_zone, uint32_t _num_shards) : RGWBackoffControlCR(_store->ctx()), store(_store),
-                                                      http_manager(_mgr),
-                                                     async_rados(_async_rados),
-                                                      conn(_conn),
-                                                      source_zone(_source_zone), num_shards(_num_shards) {
+  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct),
+                                                      sync_env(_sync_env), num_shards(_num_shards) {
   }
 
   RGWCoroutine *alloc_cr() {
-    return new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone, num_shards, backoff_ptr());
+    return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
   }
 
   void wakeup(int shard_id, set<string>& keys) {
@@ -1222,14 +1152,14 @@ int RGWRemoteDataLog::run_sync(int num_shards, rgw_data_sync_status& sync_status
 {
   RGWObjectCtx obj_ctx(store, NULL);
 
-  int r = run(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status));
+  int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, obj_ctx, &sync_status));
   if (r < 0 && r != -ENOENT) {
-    ldout(store->ctx(), 0) << "ERROR: failed to read sync status from source_zone=" << source_zone << " r=" << r << dendl;
+    ldout(store->ctx(), 0) << "ERROR: failed to read sync status from source_zone=" << sync_env.source_zone << " r=" << r << dendl;
     return r;
   }
   
   lock.get_write();
-  data_sync_cr = new RGWDataSyncControlCR(store, &http_manager, async_rados, conn, source_zone, num_shards);
+  data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
   lock.unlock();
   r = run(data_sync_cr);
   if (r < 0) {
@@ -1309,6 +1239,8 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, con
   bucket_id = _bucket_id;
   shard_id = _shard_id;
 
+  sync_env.init(store->ctx(), store, conn, async_rados, http_manager, error_logger, source_zone);
+
   return 0;
 }
 
@@ -1325,11 +1257,7 @@ struct bucket_index_marker_info {
 };
 
 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
-  RGWRados *store;
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-
-  RGWRESTConn *conn;
+  RGWDataSyncEnv *sync_env;
 
   string bucket_name;
   string bucket_id;
@@ -1340,13 +1268,9 @@ class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
   bucket_index_marker_info *info;
 
 public:
-  RGWReadRemoteBucketIndexLogInfoCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                                  RGWRESTConn *_conn,
+  RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
                                   const string& _bucket_name, const string& _bucket_id, int _shard_id,
-                                  bucket_index_marker_info *_info) : RGWCoroutine(_store->ctx()), store(_store),
-                                                      http_manager(_mgr),
-                                                     async_rados(_async_rados),
-                                                      conn(_conn),
+                                  bucket_index_marker_info *_info) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
                                                       bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id),
                                                       info(_info) {
     instance_key = bucket_name + ":" + bucket_id;
@@ -1366,7 +1290,7 @@ public:
                                        { NULL, NULL } };
 
         string p = "/admin/log/";
-        call(new RGWReadRESTResourceCR<bucket_index_marker_info>(store->ctx(), conn, http_manager, p, pairs, info));
+        call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
       }
       if (retcode < 0) {
         return set_cr_error(retcode);
@@ -1377,26 +1301,10 @@ public:
   }
 };
 
-class RGWReadBucketShardSyncStatusCR : public RGWSimpleRadosReadCR<rgw_bucket_shard_sync_info> {
-  map<string, bufferlist> attrs;
-public:
-  RGWReadBucketShardSyncStatusCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
-                     RGWObjectCtx& obj_ctx, const string& source_zone,
-                      const string& bucket_name, const string bucket_id, int shard_id,
-                     rgw_bucket_shard_sync_info *status) : RGWSimpleRadosReadCR(async_rados, store, obj_ctx,
-                                                                           store->get_zone_params().log_pool,
-                                                                           RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id),
-                                                                            status) {}
-
-};
-
-
 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
-  RGWAsyncRadosProcessor *async_rados;
+  RGWDataSyncEnv *sync_env;
   RGWRados *store;
-  RGWHTTPManager *http_manager;
-  string source_zone;
-  RGWRESTConn *conn;
+
   string bucket_name;
   string bucket_id;
   int shard_id;
@@ -1409,12 +1317,10 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
 
   bucket_index_marker_info info;
 public:
-  RGWInitBucketShardSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWHTTPManager *_http_mgr,
-                     const string& _source_zone, RGWRESTConn *_conn,
-                      const string& _bucket_name, const string& _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
-                                                                                             http_manager(_http_mgr),
-                                                                                             source_zone(_source_zone), conn(_conn),
+  RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
+                      const string& _bucket_name, const string& _bucket_id, int _shard_id) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
                                                                                              bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id) {
+    store = sync_env->store;
     lock_name = "sync_lock";
 
 #define COOKIE_LEN 16
@@ -1423,25 +1329,25 @@ public:
     gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
     string cookie = buf;
 
-    sync_status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id);
+    sync_status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
   }
 
   int operate() {
     reenter(this) {
       yield {
        uint32_t lock_duration = 30;
-       call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+       call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
                                     lock_name, cookie, lock_duration));
        if (retcode < 0) {
          ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
          return set_cr_error(retcode);
        }
       }
-      yield call(new RGWSimpleRadosWriteCR<rgw_bucket_shard_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+      yield call(new RGWSimpleRadosWriteCR<rgw_bucket_shard_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                 sync_status_oid, status));
       yield { /* take lock again, we just recreated the object */
        uint32_t lock_duration = 30;
-       call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+       call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
                                     lock_name, cookie, lock_duration));
        if (retcode < 0) {
          ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
@@ -1449,7 +1355,7 @@ public:
        }
       }
       /* fetch current position in logs */
-      yield call(new RGWReadRemoteBucketIndexLogInfoCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, &info));
+      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bucket_name, bucket_id, shard_id, &info));
       if (retcode < 0 && retcode != -ENOENT) {
         ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
         return set_cr_error(retcode);
@@ -1459,11 +1365,11 @@ public:
         status.inc_marker.position = info.max_marker;
         map<string, bufferlist> attrs;
         status.encode_all_attrs(attrs);
-        call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
+        call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                             sync_status_oid, attrs));
       }
       yield { /* unlock */
-       call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+       call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
                                     lock_name, cookie));
       }
       return set_cr_done();
@@ -1474,8 +1380,8 @@ public:
 
 RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
 {
-  return new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, source_zone,
-                                                   conn, bucket_name, bucket_id, shard_id);
+  return new RGWInitBucketShardSyncStatusCoroutine(&sync_env,
+                                                   bucket_name, bucket_id, shard_id);
 }
 
 template <class T>
@@ -1525,22 +1431,19 @@ void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attr
 }
 
 class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRados *store;
+  RGWDataSyncEnv *sync_env;
   RGWObjectCtx obj_ctx;
   string oid;
   rgw_bucket_shard_sync_info *status;
 
   map<string, bufferlist> attrs;
 public:
-  RGWReadBucketSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
-                     const string& _source_zone,
+  RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                       const string& _bucket_name, const string _bucket_id, int _shard_id,
-                     rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_store->ctx()),
-                                                            async_rados(_async_rados),
-                                                            store(_store),
-                                                            obj_ctx(_store),
-                                                            oid(RGWBucketSyncStatusManager::status_oid(_source_zone, _bucket_name, _bucket_id, _shard_id)),
+                     rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_sync_env->cct),
+                                                            sync_env(_sync_env),
+                                                            obj_ctx(sync_env->store),
+                                                            oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, _bucket_name, _bucket_id, _shard_id)),
                                                             status(_status) {}
   int operate();
 };
@@ -1548,8 +1451,8 @@ public:
 int RGWReadBucketSyncStatusCoroutine::operate()
 {
   reenter(this) {
-    yield call(new RGWSimpleRadosReadAttrsCR(async_rados, store, obj_ctx,
-                                                   store->get_zone_params().log_pool,
+    yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store, obj_ctx,
+                                                   sync_env->store->get_zone_params().log_pool,
                                                    oid,
                                                    &attrs));
     if (retcode == -ENOENT) {
@@ -1557,18 +1460,17 @@ int RGWReadBucketSyncStatusCoroutine::operate()
       return set_cr_done();
     }
     if (retcode < 0) {
-      ldout(store->ctx(), 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
+      ldout(sync_env->cct, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
       return set_cr_error(retcode);
     }
-    status->decode_from_attrs(store->ctx(), attrs);
+    status->decode_from_attrs(sync_env->cct, attrs);
     return set_cr_done();
   }
   return 0;
 }
 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
 {
-  return new RGWReadBucketSyncStatusCoroutine(async_rados, store, source_zone,
-                                              bucket_name, bucket_id, shard_id, sync_status);
+  return new RGWReadBucketSyncStatusCoroutine(&sync_env, bucket_name, bucket_id, shard_id, sync_status);
 }
 
 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
@@ -1652,11 +1554,7 @@ struct bucket_list_result {
 };
 
 class RGWListBucketShardCR: public RGWCoroutine {
-  RGWRados *store;
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-
-  RGWRESTConn *conn;
+  RGWDataSyncEnv *sync_env;
 
   string bucket_name;
   string bucket_id;
@@ -1668,14 +1566,10 @@ class RGWListBucketShardCR: public RGWCoroutine {
   bucket_list_result *result;
 
 public:
-  RGWListBucketShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                                  RGWRESTConn *_conn,
+  RGWListBucketShardCR(RGWDataSyncEnv *_sync_env,
                                   const string& _bucket_name, const string& _bucket_id, int _shard_id,
                                   rgw_obj_key& _marker_position,
-                                  bucket_list_result *_result) : RGWCoroutine(_store->ctx()), store(_store),
-                                                      http_manager(_mgr),
-                                                     async_rados(_async_rados),
-                                                      conn(_conn),
+                                  bucket_list_result *_result) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
                                                       bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id),
                                                       marker_position(_marker_position),
                                                       result(_result) {
@@ -1699,7 +1593,7 @@ public:
                                        { NULL, NULL } };
 
         string p = string("/") + bucket_name;
-        call(new RGWReadRESTResourceCR<bucket_list_result>(store->ctx(), conn, http_manager, p, pairs, result));
+        call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
       }
       if (retcode < 0) {
         return set_cr_error(retcode);
@@ -1711,11 +1605,7 @@ public:
 };
 
 class RGWListBucketIndexLogCR: public RGWCoroutine {
-  RGWRados *store;
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-
-  RGWRESTConn *conn;
+  RGWDataSyncEnv *sync_env;
 
   string bucket_name;
   string bucket_id;
@@ -1727,14 +1617,10 @@ class RGWListBucketIndexLogCR: public RGWCoroutine {
   list<rgw_bi_log_entry> *result;
 
 public:
-  RGWListBucketIndexLogCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                                  RGWRESTConn *_conn,
-                                  const string& _bucket_name, const string& _bucket_id, int _shard_id,
-                                  string& _marker,
-                                  list<rgw_bi_log_entry> *_result) : RGWCoroutine(_store->ctx()), store(_store),
-                                                      http_manager(_mgr),
-                                                     async_rados(_async_rados),
-                                                      conn(_conn),
+  RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env,
+                          const string& _bucket_name, const string& _bucket_id, int _shard_id,
+                          string& _marker,
+                          list<rgw_bi_log_entry> *_result) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
                                                       bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id),
                                                       marker(_marker),
                                                       result(_result) {
@@ -1755,7 +1641,7 @@ public:
                                        { "type", "bucket-index" },
                                        { NULL, NULL } };
 
-        call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(store->ctx(), conn, http_manager, "/admin/log", pairs, result));
+        call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
       }
       if (retcode < 0) {
         return set_cr_error(retcode);
@@ -1769,18 +1655,16 @@ public:
 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
 
 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
-  RGWRados *store;
-  RGWAsyncRadosProcessor *async_rados;
+  RGWDataSyncEnv *sync_env;
 
   string marker_oid;
   rgw_bucket_shard_full_sync_marker sync_marker;
 
 public:
-  RGWBucketFullSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
+  RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                          const string& _marker_oid,
                          const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
-                                                                store(_store),
-                                                                async_rados(_async_rados),
+                                                                sync_env(_sync_env),
                                                                 marker_oid(_marker_oid),
                                                                 sync_marker(_marker) {}
 
@@ -1791,15 +1675,16 @@ public:
     map<string, bufferlist> attrs;
     sync_marker.encode_attr(attrs);
 
-    ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
-    return new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
+    RGWRados *store = sync_env->store;
+
+    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+    return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                 marker_oid, attrs);
   }
 };
 
 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
-  RGWRados *store;
-  RGWAsyncRadosProcessor *async_rados;
+  RGWDataSyncEnv *sync_env;
 
   string marker_oid;
   rgw_bucket_shard_inc_sync_marker sync_marker;
@@ -1818,11 +1703,10 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string,
   }
 
 public:
-  RGWBucketIncSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
+  RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                          const string& _marker_oid,
                          const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
-                                                                store(_store),
-                                                                async_rados(_async_rados),
+                                                                sync_env(_sync_env),
                                                                 marker_oid(_marker_oid),
                                                                 sync_marker(_marker) {}
 
@@ -1832,8 +1716,10 @@ public:
     map<string, bufferlist> attrs;
     sync_marker.encode_attr(attrs);
 
-    ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
-    return new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
+    RGWRados *store = sync_env->store;
+
+    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+    return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                 marker_oid, attrs);
   }
 
@@ -1868,10 +1754,8 @@ public:
 
 template <class T, class K>
 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
-  RGWRados *store;
-  RGWAsyncRadosProcessor *async_rados;
+  RGWDataSyncEnv *sync_env;
 
-  string source_zone;
   RGWBucketInfo *bucket_info;
   int shard_id;
 
@@ -1890,15 +1774,14 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
 
 
 public:
-  RGWBucketSyncSingleEntryCR(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
-                             const string& _source_zone, RGWBucketInfo *_bucket_info, int _shard_id,
+  RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
+                             RGWBucketInfo *_bucket_info, int _shard_id,
                              const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
                              utime_t& _timestamp,
                              const bucket_entry_owner& _owner,
                              RGWModifyOp _op, RGWPendingState _op_state,
-                            const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
-                                                     async_rados(_async_rados),
-                                                     source_zone(_source_zone),
+                            const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_sync_env->cct),
+                                                     sync_env(_sync_env),
                                                       bucket_info(_bucket_info), shard_id(_shard_id),
                                                       key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
                                                       owner(_owner),
@@ -1907,8 +1790,8 @@ public:
                                                       entry_marker(_entry_marker),
                                                       marker_tracker(_marker_tracker),
                                                       sync_status(0) {
-    set_description() << "bucket sync single entry (source_zone=" << source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
-    ldout(store->ctx(), 20) << "bucket sync single entry (source_zone=" << source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
+    set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
+    ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
     set_status("init");
   }
 
@@ -1925,13 +1808,13 @@ public:
               op == CLS_RGW_OP_LINK_OLH) {
             if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
               set_status("skipping entry");
-              ldout(store->ctx(), 10) << "bucket skipping sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
+              ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
               goto done;
 
             }
             set_status("syncing obj");
-            ldout(store->ctx(), 5) << "bucket sync: sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
-            call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
+            ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
+            call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info,
                                          key, versioned_epoch,
                                          true));
           } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
@@ -1939,18 +1822,18 @@ public:
             if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
               versioned = true;
             }
-            call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned, versioned_epoch, NULL, NULL, false, &timestamp));
+            call(new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info, key, versioned, versioned_epoch, NULL, NULL, false, &timestamp));
           } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
             set_status("creating delete marker");
-            ldout(store->ctx(), 10) << "creating delete marker: obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
-            call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned, versioned_epoch, &owner.id, &owner.display_name, true, &timestamp));
+            ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
+            call(new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info, key, versioned, versioned_epoch, &owner.id, &owner.display_name, true, &timestamp));
           }
         }
       } while (marker_tracker->need_retry(key));
       if (retcode < 0 && retcode != -ENOENT) {
         set_status() << "failed to sync obj; retcode=" << retcode;
         rgw_bucket& bucket = bucket_info->bucket;
-        ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl;
+        ldout(sync_env->cct, 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl;
         sync_status = retcode;
       }
 done:
@@ -1972,11 +1855,7 @@ done:
 #define BUCKET_SYNC_SPAWN_WINDOW 20
 
 class RGWBucketShardFullSyncCR : public RGWCoroutine {
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRESTConn *conn;
-  RGWRados *store;
-  string source_zone;
+  RGWDataSyncEnv *sync_env;
   string bucket_name;
   string bucket_id;
   int shard_id;
@@ -1996,14 +1875,10 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
 
   string status_oid;
 public:
-  RGWBucketShardFullSyncCR(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                           RGWRESTConn *_conn, RGWRados *_store,
-                           const string& _source_zone,
+  RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env,
                            const string& _bucket_name, const string _bucket_id, int _shard_id,
-                           RGWBucketInfo *_bucket_info,  rgw_bucket_shard_full_sync_marker& _full_marker) : RGWCoroutine(_store->ctx()),
-                                                                            http_manager(_mgr), async_rados(_async_rados), conn(_conn),
-                                                                            store(_store),
-                                                                           source_zone(_source_zone),
+                           RGWBucketInfo *_bucket_info,  rgw_bucket_shard_full_sync_marker& _full_marker) : RGWCoroutine(_sync_env->cct),
+                                                                           sync_env(_sync_env),
                                                                             bucket_name(_bucket_name),
                                                                            bucket_id(_bucket_id), shard_id(_shard_id),
                                                                             bucket_info(_bucket_info),
@@ -2011,7 +1886,7 @@ public:
                                                                             spawn_window(BUCKET_SYNC_SPAWN_WINDOW), entry(NULL),
                                                                             op(CLS_RGW_OP_ADD),
                                                                             total_entries(0), lease_cr(NULL) {
-    status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id);
+    status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
   }
 
   ~RGWBucketShardFullSyncCR() {
@@ -2032,7 +1907,8 @@ int RGWBucketShardFullSyncCR::operate()
       set_status("acquiring sync lock");
       uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
       string lock_name = "sync_lock";
-      lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, status_oid,
+      RGWRados *store = sync_env->store;
+      lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
                                           lock_name, lock_duration, this);
       lease_cr->get();
       spawn(lease_cr, false);
@@ -2048,16 +1924,14 @@ int RGWBucketShardFullSyncCR::operate()
     }
     set_status("lock acquired");
     list_marker = full_marker.position;
-    marker_tracker = new RGWBucketFullSyncShardMarkerTrack(store, async_rados, 
-                                                           status_oid,
-                                                           full_marker);
+    marker_tracker = new RGWBucketFullSyncShardMarkerTrack(sync_env, status_oid, full_marker);
 
     total_entries = full_marker.count;
     do {
       set_status("listing remote bucket");
-      ldout(store->ctx(), 20) << __func__ << "(): listing bucket for full sync" << dendl;
-      yield call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
-                                      list_marker, &list_result));
+      ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
+      yield call(new RGWListBucketShardCR(sync_env, bucket_name, bucket_id, shard_id,
+                                          list_marker, &list_result));
       if (retcode < 0 && retcode != -ENOENT) {
         set_status("failed bucket listing, going down");
         yield lease_cr->go_down();
@@ -2066,17 +1940,17 @@ int RGWBucketShardFullSyncCR::operate()
       }
       entries_iter = list_result.entries.begin();
       for (; entries_iter != list_result.entries.end(); ++entries_iter) {
-        ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl;
+        ldout(sync_env->cct, 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl;
         entry = &(*entries_iter);
         total_entries++;
         list_marker = entries_iter->key;
         if (!marker_tracker->start(entry->key, total_entries, utime_t())) {
-          ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
+          ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
         } else {
           op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
 
           yield {
-            spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
+            spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>(sync_env, bucket_info, shard_id,
                                                                            entry->key,
                                                                            false, /* versioned, only matters for object removal */
                                                                            entry->versioned_epoch, entry->mtime,
@@ -2087,7 +1961,7 @@ int RGWBucketShardFullSyncCR::operate()
           yield wait_for_child();
           while (collect(&ret)) {
             if (ret < 0) {
-              ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl;
+              ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
               /* we should have reported this error */
 #warning deal with error
             }
@@ -2104,14 +1978,15 @@ int RGWBucketShardFullSyncCR::operate()
       sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
       map<string, bufferlist> attrs;
       sync_status.encode_state_attr(attrs);
-      string oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id);
-      call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
+      string oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
+      RGWRados *store = sync_env->store;
+      call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                           oid, attrs));
     }
     yield lease_cr->go_down();
     drain_all();
     if (retcode < 0) {
-      ldout(store->ctx(), 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id
+      ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id
         << " retcode=" << retcode << dendl;
       return set_cr_error(retcode);
     }
@@ -2121,11 +1996,7 @@ int RGWBucketShardFullSyncCR::operate()
 }
 
 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRESTConn *conn;
-  RGWRados *store;
-  string source_zone;
+  RGWDataSyncEnv *sync_env;
   string bucket_name;
   string bucket_id;
   int shard_id;
@@ -2148,21 +2019,17 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
 
 
 public:
-  RGWBucketShardIncrementalSyncCR(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                           RGWRESTConn *_conn, RGWRados *_store,
-                           const string& _source_zone,
+  RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
                            const string& _bucket_name, const string _bucket_id, int _shard_id,
-                           RGWBucketInfo *_bucket_info, rgw_bucket_shard_inc_sync_marker& _inc_marker) : RGWCoroutine(_store->ctx()),
-                                                                            http_manager(_mgr), async_rados(_async_rados), conn(_conn),
-                                                                            store(_store),
-                                                                           source_zone(_source_zone),
+                           RGWBucketInfo *_bucket_info, rgw_bucket_shard_inc_sync_marker& _inc_marker) : RGWCoroutine(_sync_env->cct),
+                                                                            sync_env(_sync_env),
                                                                             bucket_name(_bucket_name),
                                                                            bucket_id(_bucket_id), shard_id(_shard_id),
                                                                             bucket_info(_bucket_info),
                                                                             inc_marker(_inc_marker), entry(NULL), marker_tracker(NULL),
                                                                             spawn_window(BUCKET_SYNC_SPAWN_WINDOW), updated_status(false),
                                                                             lease_cr(NULL) {
-    status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id);
+    status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
     set_description() << "bucket shard incremental sync bucket=" << _bucket_name << ":" << _bucket_id << ":" << _shard_id;
     set_status("init");
   }
@@ -2185,7 +2052,8 @@ int RGWBucketShardIncrementalSyncCR::operate()
       set_status("acquiring sync lock");
       uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
       string lock_name = "sync_lock";
-      lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, status_oid,
+      RGWRados *store = sync_env->store;
+      lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
                                           lock_name, lock_duration, this);
       lease_cr->get();
       spawn(lease_cr, false);
@@ -2199,13 +2067,13 @@ int RGWBucketShardIncrementalSyncCR::operate()
       set_sleeping(true);
       yield;
     }
-    marker_tracker = new RGWBucketIncSyncShardMarkerTrack(store, async_rados, 
+    marker_tracker = new RGWBucketIncSyncShardMarkerTrack(sync_env,
                                                           status_oid,
                                                           inc_marker);
     do {
-      ldout(store->ctx(), 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
+      ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
       set_status() << "listing bilog; position=" << inc_marker.position;
-      yield call(new RGWListBucketIndexLogCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
+      yield call(new RGWListBucketIndexLogCR(sync_env, bucket_name, bucket_id, shard_id,
                                          inc_marker.position, &list_result));
       if (retcode < 0 && retcode != -ENOENT) {
         /* wait for all operations to complete */
@@ -2220,15 +2088,15 @@ int RGWBucketShardIncrementalSyncCR::operate()
 
         if (!rgw_obj::parse_raw_oid(entries_iter->object, &name, &instance, &ns)) {
           set_status() << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry";
-          ldout(store->ctx(), 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl;
+          ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl;
           continue;
         }
 
-        ldout(store->ctx(), 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl;
+        ldout(sync_env->cct, 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl;
 
         if (!ns.empty()) {
           set_status() << "skipping entry in namespace: " << entries_iter->object;
-          ldout(store->ctx(), 20) << "skipping entry in namespace: " << entries_iter->object << dendl;
+          ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entries_iter->object << dendl;
           continue;
         }
 
@@ -2236,42 +2104,42 @@ int RGWBucketShardIncrementalSyncCR::operate()
         set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op;
         if (entry->op == CLS_RGW_OP_CANCEL) {
           set_status() << "canceled operation, skipping";
-          ldout(store->ctx(), 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
+          ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
           continue;
         }
         if (entry->state != CLS_RGW_STATE_COMPLETE) {
           set_status() << "non-complete operation, skipping";
-          ldout(store->ctx(), 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl;
+          ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl;
           continue;
         }
-        ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
+        ldout(sync_env->cct, 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
         updated_status = false;
         while (!marker_tracker->can_do_op(key, entry->op)) {
           if (!updated_status) {
             set_status() << "can't do op, conflicting inflight operation";
             updated_status = true;
           }
-          ldout(store->ctx(), 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
+          ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
           yield wait_for_child();
           
         }
         if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) {
           set_status() << "can't do op, sync already in progress for object";
-          ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
+          ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
           marker_tracker->try_update_high_marker(entry->id, 0, entries_iter->timestamp);
           continue;
         }
         // yield {
           set_status() << "start object sync";
           if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) {
-            ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl;
+            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl;
           } else {
             uint64_t versioned_epoch = 0;
             bucket_entry_owner owner(entry->owner, entry->owner_display_name);
             if (entry->ver.pool < 0) {
               versioned_epoch = entry->ver.epoch;
             }
-            spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
+            spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(sync_env, bucket_info, shard_id,
                                                          key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op,
                                                          entry->state, entry->id, marker_tracker), false);
           }
@@ -2281,7 +2149,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
           yield wait_for_child();
           while (collect(&ret)) {
             if (ret < 0) {
-              ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl;
+              ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
               /* we should have reported this error */
 #warning deal with error
             }
@@ -2302,74 +2170,71 @@ int RGWBucketShardIncrementalSyncCR::operate()
 int RGWRunBucketSyncCoroutine::operate()
 {
   reenter(this) {
-    yield call(new RGWReadBucketSyncStatusCoroutine(async_rados, store, source_zone, bucket_name, bucket_id, shard_id, &sync_status));
+    yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bucket_name, bucket_id, shard_id, &sync_status));
     if (retcode < 0 && retcode != -ENOENT) {
-      ldout(store->ctx(), 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl;
+      ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl;
       return set_cr_error(retcode);
     }
 
-    ldout(store->ctx(), 20) << __func__ << "(): sync status for bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << ": " << sync_status.state << dendl;
+    ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << ": " << sync_status.state << dendl;
 
-    yield call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info));
+    yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket_name, bucket_id, &bucket_info));
     if (retcode == -ENOENT) {
       /* bucket instance info has not been synced in yet, fetch it now */
       yield {
-        ldout(store->ctx(), 10) << "no local info for bucket " << bucket_name << ":" << bucket_id << ": fetching metadata" << dendl;
+        ldout(sync_env->cct, 10) << "no local info for bucket " << bucket_name << ":" << bucket_id << ": fetching metadata" << dendl;
         string raw_key = string("bucket.instance:") + bucket_name + ":" + bucket_id;
 
-        sync_env.init(cct, store, store->rest_master_conn, async_rados, http_manager);
+        meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
 
-        call(new RGWMetaSyncSingleEntryCR(&sync_env, raw_key,
+        call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
                                           string() /* no marker */,
                                           MDLOG_STATUS_COMPLETE,
                                           NULL /* no marker tracker */));
       }
       if (retcode < 0) {
-        ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket instance info for " << bucket_name << ":" << bucket_id << dendl;
+        ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_name << ":" << bucket_id << dendl;
         return set_cr_error(retcode);
       }
 
-      yield call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info));
+      yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket_name, bucket_id, &bucket_info));
     }
     if (retcode < 0) {
-      ldout(store->ctx(), 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl;
+      ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl;
       return set_cr_error(retcode);
     }
 
     yield {
       if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
-        call(new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, source_zone,
-                                                       conn, bucket_name, bucket_id, shard_id));
+        call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bucket_name, bucket_id, shard_id));
         sync_status.state = rgw_bucket_shard_sync_info::StateFullSync;
       }
     }
 
     if (retcode < 0) {
-      ldout(store->ctx(), 0) << "ERROR: init sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
+      ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
       return set_cr_error(retcode);
     }
     yield {
       if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
-        call(new RGWBucketShardFullSyncCR(http_manager, async_rados, conn, store,
-                                          source_zone, bucket_name, bucket_id, shard_id,
+        call(new RGWBucketShardFullSyncCR(sync_env, bucket_name, bucket_id, shard_id,
                                           &bucket_info, sync_status.full_marker));
         sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
       }
     }
     if (retcode < 0) {
-      ldout(store->ctx(), 0) << "ERROR: full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
+      ldout(sync_env->cct, 0) << "ERROR: full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
       return set_cr_error(retcode);
     }
 
     yield {
       if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
-        call(new RGWBucketShardIncrementalSyncCR(http_manager, async_rados, conn, store,
-                                                 source_zone, bucket_name, bucket_id, shard_id,
+        call(new RGWBucketShardIncrementalSyncCR(sync_env, bucket_name, bucket_id, shard_id,
                                                  &bucket_info, sync_status.inc_marker));
       }
     }
     if (retcode < 0) {
-      ldout(store->ctx(), 0) << "ERROR: incremental sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
+      ldout(sync_env->cct, 0) << "ERROR: incremental sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
       return set_cr_error(retcode);
     }
 
@@ -2381,7 +2246,7 @@ int RGWRunBucketSyncCoroutine::operate()
 
 RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
 {
-  return new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, source_zone, bucket_name, bucket_id, shard_id);
+  return new RGWRunBucketSyncCoroutine(&sync_env, bucket_name, bucket_id, shard_id);
 }
 
 int RGWBucketSyncStatusManager::init()
index e5a59dbf365d3831f17fa4f7a812034bba54d620..c3778d23e02cf6e5bb3b4909c88a329be9da9816 100644 (file)
@@ -141,14 +141,39 @@ class RGWAsyncRadosProcessor;
 class RGWDataSyncStatusManager;
 class RGWDataSyncControlCR;
 
-class RGWRemoteDataLog : public RGWCoroutinesManager {
+struct RGWDataSyncEnv {
+  CephContext *cct;
   RGWRados *store;
   RGWRESTConn *conn;
-  string source_zone;
   RGWAsyncRadosProcessor *async_rados;
+  RGWHTTPManager *http_manager;
+  RGWSyncErrorLogger *error_logger;
+  string source_zone;
 
+  RGWDataSyncEnv() : cct(NULL), store(NULL), conn(NULL), async_rados(NULL), http_manager(NULL), error_logger(NULL) {}
+
+  void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
+            RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
+            RGWSyncErrorLogger *_error_logger, const string& _source_zone) {
+    cct = _cct;
+    store = _store;
+    conn = _conn;
+    async_rados = _async_rados;
+    http_manager = _http_manager;
+    error_logger = _error_logger;
+    source_zone = _source_zone;
+  }
+
+  string shard_obj_name(int shard_id);
+  string status_oid();
+};
+
+class RGWRemoteDataLog : public RGWCoroutinesManager {
+  RGWRados *store;
+  RGWAsyncRadosProcessor *async_rados;
   RGWHTTPManager http_manager;
-  RGWDataSyncStatusManager *status_manager;
+
+  RGWDataSyncEnv sync_env;
 
   RWLock lock;
   RGWDataSyncControlCR *data_sync_cr;
@@ -156,12 +181,11 @@ class RGWRemoteDataLog : public RGWCoroutinesManager {
   bool initialized;
 
 public:
-  RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
-                   RGWDataSyncStatusManager *_sm)
+  RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
     : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
-      store(_store), conn(NULL), async_rados(async_rados),
+      store(_store), async_rados(async_rados),
       http_manager(store->ctx(), &completion_mgr),
-      status_manager(_sm), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
+      lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
       initialized(false) {}
 
   int init(const string& _source_zone, RGWRESTConn *_conn);
@@ -198,7 +222,7 @@ public:
   RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
                            const string& _source_zone)
     : store(_store), source_zone(_source_zone), conn(NULL),
-      source_log(store, async_rados, this), num_shards(0) {}
+      source_log(store, async_rados), num_shards(0) {}
   int init();
 
   rgw_data_sync_status& get_sync_status() { return sync_status; }
@@ -347,6 +371,9 @@ class RGWRemoteBucketLog : public RGWCoroutinesManager {
   RGWBucketSyncStatusManager *status_manager;
   RGWAsyncRadosProcessor *async_rados;
   RGWHTTPManager *http_manager;
+  RGWSyncErrorLogger *error_logger;
+
+  RGWDataSyncEnv sync_env;
 
   RGWBucketSyncCR *sync_cr;
 
@@ -354,7 +381,7 @@ public:
   RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm,
                      RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
                                        conn(NULL), shard_id(0),
-                                       status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager),
+                                       status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager), error_logger(NULL),
                                        sync_cr(NULL) {}
 
   int init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name, const string& _bucket_id, int _shard_id);