datalog_sync_status_oid, sync_info));
}
+static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
+{
+ char buf[datalog_sync_full_sync_index_prefix.size() + 16];
+ snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
+ return string(buf);
+}
+
+struct bucket_instance_meta_info {
+ string key;
+ obj_version ver;
+ time_t mtime;
+ RGWBucketInstanceMetadataObject data;
+
+ bucket_instance_meta_info() : mtime(0) {}
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("key", key, obj);
+ JSONDecoder::decode_json("ver", ver, obj);
+ JSONDecoder::decode_json("mtime", mtime, obj);
+ JSONDecoder::decode_json("data", data, obj);
+ }
+};
+
class RGWListBucketIndexesCR : public RGWCoroutine {
RGWRados *store;
RGWHTTPManager *http_manager;
int req_ret;
list<string> result;
+ list<string>::iterator iter;
RGWShardedOmapCRManager *entries_index;
string oid_prefix;
+ string path;
+ bucket_instance_meta_info meta_info;
+ string key;
+ string s;
+ int i;
+
public:
RGWListBucketIndexesCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
RGWRESTConn *_conn,
http_manager(_mgr),
async_rados(_async_rados),
conn(_conn), source_zone(_source_zone), num_shards(_num_shards),
- req_ret(0), entries_index(NULL) {
+ req_ret(0), entries_index(NULL), i(0) {
oid_prefix = datalog_sync_full_sync_index_prefix + "." + source_zone;
+ path = "/admin/metadata/bucket.instance";
}
~RGWListBucketIndexesCR() {
delete entries_index;
call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), conn, http_manager,
entrypoint, NULL, &result));
}
- yield {
- if (get_ret_status() < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
- return set_state(RGWCoroutine_Error);
+ if (get_ret_status() < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
+ return set_state(RGWCoroutine_Error);
+ }
+ for (iter = result.begin(); iter != result.end(); ++iter) {
+ ldout(store->ctx(), 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
+
+ key = *iter;
+
+ yield {
+ rgw_http_param_pair pairs[] = { { "key", key.c_str() },
+ { NULL, NULL } };
+
+ int ret = call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, http_manager, path, pairs, &meta_info));
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
+ return ret;
+ }
}
- for (list<string>::iterator iter = result.begin(); iter != result.end(); ++iter) {
- ldout(store->ctx(), 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
- entries_index->append(*iter);
+
+ num_shards = meta_info.data.get_bucket_info().num_shards;
#warning error handling of shards
+ if (num_shards > 0) {
+ for (i = 0; i < num_shards; i++) {
+ char buf[16];
+ snprintf(buf, sizeof(buf), ":%d", i);
+ s = key + buf;
+ yield entries_index->append(s);
+ }
+ } else {
+ yield entries_index->append(key);
}
}
+
yield entries_index->finish();
int ret;
while (collect(&ret)) {
}
};
+#define DATA_SYNC_UPDATE_MARKER_WINDOW 10
+
+class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
+ RGWRados *store;
+ RGWAsyncRadosProcessor *async_rados;
+
+ string marker_oid;
+ rgw_data_sync_marker sync_marker;
+
+
+public:
+ RGWDataSyncShardMarkerTrack(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ const string& _marker_oid,
+ const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
+ store(_store),
+ async_rados(_async_rados),
+ marker_oid(_marker_oid),
+ sync_marker(_marker) {}
+
+ RGWCoroutine *store_marker(const string& new_marker) {
+ sync_marker.marker = new_marker;
+
+ ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+ return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+ marker_oid, sync_marker);
+ }
+};
+
+class RGWRunBucketSyncCoroutine : public RGWCoroutine {
+ RGWHTTPManager *http_manager;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRESTConn *conn;
+ RGWRados *store;
+ RGWObjectCtx& obj_ctx;
+ string source_zone;
+ string bucket_name;
+ string bucket_id;
+ RGWBucketInfo bucket_info;
+ int shard_id;
+ rgw_bucket_shard_sync_info sync_status;
+
+public:
+ RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWRESTConn *_conn, RGWRados *_store,
+ RGWObjectCtx& _obj_ctx, const string& _source_zone,
+ const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()),
+ http_manager(_mgr), async_rados(_async_rados), conn(_conn),
+ store(_store),
+ obj_ctx(_obj_ctx), source_zone(_source_zone),
+ bucket_name(_bucket_name),
+ bucket_id(_bucket_id), shard_id(_shard_id) {}
+
+ int operate();
+};
+
+
+class RGWDataSyncSingleEntryCR : public RGWCoroutine {
+ RGWRados *store;
+ RGWHTTPManager *http_manager;
+ RGWAsyncRadosProcessor *async_rados;
+
+ RGWRESTConn *conn;
+ string source_zone;
+
+ string raw_key;
+ string entry_marker;
+
+ RGWObjectCtx obj_ctx;
+
+ ssize_t pos;
+ string bucket_name;
+ string bucket_instance;
+
+ int sync_status;
+
+ bufferlist md_bl;
+
+ RGWDataSyncShardMarkerTrack *marker_tracker;
+
+public:
+ RGWDataSyncSingleEntryCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWRESTConn *_conn, const string& _source_zone,
+ const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
+ http_manager(_mgr),
+ async_rados(_async_rados),
+ conn(_conn), source_zone(_source_zone),
+ raw_key(_raw_key), entry_marker(_entry_marker),
+ obj_ctx(_store),
+ pos(0), sync_status(0),
+ marker_tracker(_marker_tracker) {
+ }
+
+ int operate() {
+ reenter(this) {
+ yield {
+ pos = raw_key.find(':');
+ bucket_name = raw_key.substr(0, pos);
+ bucket_instance = raw_key.substr(pos + 1);
+ pos = bucket_instance.find(':');
+ int shard_id = -1;
+ if (pos >= 0) {
+ string err;
+ string s = bucket_instance.substr(pos + 1);
+ shard_id = strict_strtol(s.c_str(), 10, &err);
+ if (!err.empty()) {
+ ldout(store->ctx(), 0) << "ERROR: failed to parse bucket instance key: " << bucket_instance << dendl;
+ return set_state(RGWCoroutine_Error, -EIO);
+ }
+
+ bucket_instance = bucket_instance.substr(0, pos - 1);
+ }
+ int ret = call(new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, obj_ctx, source_zone, bucket_name, bucket_instance, shard_id));
+ if (ret < 0) {
+#warning failed syncing bucket, need to log
+ return set_state(RGWCoroutine_Error, sync_status);
+ }
+ }
+
+ sync_status = retcode;
+#warning what do do in case of error
+ yield {
+ /* update marker */
+ int ret = call(marker_tracker->finish(entry_marker));
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl;
+ return set_state(RGWCoroutine_Error, sync_status);
+ }
+ }
+ if (sync_status == 0) {
+ sync_status = retcode;
+ }
+ if (sync_status < 0) {
+ return set_state(RGWCoroutine_Error, retcode);
+ }
+ return set_state(RGWCoroutine_Done, 0);
+ }
+ return 0;
+ }
+};
+
+class RGWDataSyncShardCR : public RGWCoroutine {
+ RGWRados *store;
+ RGWHTTPManager *http_manager;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRESTConn *conn;
+
+ rgw_bucket pool;
+
+ string source_zone;
+ uint32_t shard_id;
+ rgw_data_sync_marker sync_marker;
+
+ map<string, bufferlist> entries;
+ map<string, bufferlist>::iterator iter;
+
+ string oid;
+
+ RGWDataSyncShardMarkerTrack *marker_tracker;
+
+ list<cls_log_entry> log_entries;
+ list<cls_log_entry>::iterator log_iter;
+ bool truncated;
+
+ string mdlog_marker;
+ string raw_key;
+
+ Mutex inc_lock;
+ Cond inc_cond;
+
+ boost::asio::coroutine incremental_cr;
+ boost::asio::coroutine full_cr;
+
+
+public:
+ RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
+ uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWCoroutine(_store->ctx()), store(_store),
+ http_manager(_mgr),
+ async_rados(_async_rados),
+ conn(_conn),
+ pool(_pool),
+ source_zone(_source_zone),
+ shard_id(_shard_id),
+ sync_marker(_marker),
+ marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock") {
+ }
+
+ ~RGWDataSyncShardCR() {
+ delete marker_tracker;
+ }
+
+ void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
+ delete marker_tracker;
+ marker_tracker = mt;
+ }
+
+ int operate() {
+ while (true) {
+ switch (sync_marker.state) {
+ case rgw_data_sync_marker::FullSync:
+ return full_sync();
+ case rgw_data_sync_marker::IncrementalSync:
+ return incremental_sync();
+ break;
+ default:
+ return set_state(RGWCoroutine_Error, -EIO);
+ }
+ }
+ return 0;
+ }
+
+ int full_sync() {
+ int ret;
+
+#define OMAP_GET_MAX_ENTRIES 100
+ int max_entries = OMAP_GET_MAX_ENTRIES;
+ reenter(&full_cr) {
+ oid = full_data_sync_index_shard_oid(source_zone, shard_id);
+ set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
+ RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id),
+ sync_marker));
+ do {
+ yield return call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
+ return set_state(RGWCoroutine_Error, retcode);
+ }
+ iter = entries.begin();
+ for (; iter != entries.end(); ++iter) {
+ ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
+ marker_tracker->start(iter->first);
+ // fetch remote and write locally
+ yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false);
+ if (retcode < 0) {
+ return set_state(RGWCoroutine_Error, retcode);
+ }
+ sync_marker.marker = iter->first;
+ }
+ } while ((int)entries.size() == max_entries);
+
+ drain_all();
+
+ yield {
+ /* update marker to reflect we're done with full sync */
+ sync_marker.state = rgw_data_sync_marker::IncrementalSync;
+ sync_marker.marker = sync_marker.next_step_marker;
+ sync_marker.next_step_marker.clear();
+ call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+ RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id), sync_marker));
+ }
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+ return set_state(RGWCoroutine_Error, retcode);
+ }
+ }
+ return 0;
+ }
+
+
+ int incremental_sync() {
+#if 0
+ reenter(&incremental_cr) {
+ mdlog_marker = sync_marker.marker;
+ set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
+ RGWDataSyncStatusManager::shard_obj_name(shard_id),
+ sync_marker));
+ do {
+#define INCREMENTAL_MAX_ENTRIES 100
+ ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ if (mdlog_marker <= sync_marker.marker) {
+ /* we're at the tip, try to bring more entries */
+ ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
+ yield call(new RGWCloneMetaLogCoroutine(store, http_manager, shard_id, mdlog_marker, &mdlog_marker));
+ }
+ ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ if (mdlog_marker > sync_marker.marker) {
+ yield call(new RGWReadMDLogEntriesCR(async_rados, store, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
+ for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
+ ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
+ marker_tracker->start(log_iter->id);
+ raw_key = log_iter->section + ":" + log_iter->name;
+ yield spawn(new RGWMetaSyncSingleEntryCR(store, http_manager, async_rados, raw_key, log_iter->id, marker_tracker), false);
+ if (retcode < 0) {
+ return set_state(RGWCoroutine_Error, retcode);
+ }
+ }
+ }
+ ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ if (mdlog_marker == sync_marker.marker) {
+#define INCREMENTAL_INTERVAL 20
+ yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
+ }
+ } while (true);
+ }
+ /* TODO */
+ return 0;
+#endif
+ }
+};
+
int RGWRemoteDataLog::run_sync(int num_shards, rgw_data_sync_status& sync_status)
{
RGWObjectCtx obj_ctx(store, NULL);
return 0;
}
-struct bucket_instance_meta_info {
- string key;
- obj_version ver;
- time_t mtime;
- RGWBucketInstanceMetadataObject data;
-
- bucket_instance_meta_info() : mtime(0) {}
-
- void decode_json(JSONObj *obj) {
- JSONDecoder::decode_json("key", key, obj);
- JSONDecoder::decode_json("ver", ver, obj);
- JSONDecoder::decode_json("mtime", mtime, obj);
- JSONDecoder::decode_json("data", data, obj);
- }
-};
-
struct bucket_index_marker_info {
string bucket_ver;
string master_ver;
return 0;
}
-class RGWRunBucketSyncCoroutine : public RGWCoroutine {
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
- RGWRESTConn *conn;
- RGWRados *store;
- RGWObjectCtx& obj_ctx;
- string source_zone;
- string bucket_name;
- string bucket_id;
- RGWBucketInfo bucket_info;
- int shard_id;
- rgw_bucket_shard_sync_info sync_status;
-
-public:
- RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, RGWRados *_store,
- RGWObjectCtx& _obj_ctx, const string& _source_zone,
- const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()),
- http_manager(_mgr), async_rados(_async_rados), conn(_conn),
- store(_store),
- obj_ctx(_obj_ctx), source_zone(_source_zone),
- bucket_name(_bucket_name),
- bucket_id(_bucket_id), shard_id(_shard_id) {}
-
- int operate();
-};
-
-
int RGWRunBucketSyncCoroutine::operate()
{
reenter(this) {