bl(_bl), mtime(_mtime) {}
};
+class RGWAsyncWriteOmapKeys : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ rgw_obj obj;
+ map<string, bufferlist> entries;
+
+protected:
+ int _send_request() {
+ librados::IoCtx ioctx;
+ librados::Rados *rados = store->get_rados_handle();
+ int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl;
+ return r;
+ }
+
+ return ioctx.omap_set(obj.get_object(), entries);
+ }
+public:
+ RGWAsyncWriteOmapKeys(RGWAioCompletionNotifier *cn, RGWRados *_store, rgw_obj& _obj,
+ map<string, bufferlist>& _entries) : RGWAsyncRadosRequest(cn), store(_store),
+ obj(_obj), entries(_entries) {}
+};
+
class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
RGWRados *store;
rgw_obj obj;
}
};
+class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ map<string, bufferlist> entries;
+
+ rgw_bucket pool;
+ string oid;
+
+ RGWAsyncWriteOmapKeys *req;
+
+public:
+ RGWRadosSetOmapKeysCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ rgw_bucket& _pool, const string& _oid,
+ map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados),
+ store(_store),
+ entries(_entries),
+ pool(_pool), oid(_oid),
+ req(NULL) {
+ }
+
+ ~RGWRadosSetOmapKeysCR() {
+ delete req;
+ }
+
+ int send_request() {
+ rgw_obj obj = rgw_obj(pool, oid);
+ req = new RGWAsyncWriteOmapKeys(env->stack->create_completion_notifier(), store, obj, entries);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() {
+ return req->get_ret_status();
+ }
+};
+
class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
for (int i = 0; i < (int)status.num_shards; i++) {
rgw_meta_sync_marker marker;
spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
- RGWMetaSyncStatusManager::shard_obj_name(i), marker));
+ RGWMetaSyncStatusManager::shard_obj_name(i), marker), true);
}
}
yield { /* unlock */
map<uint32_t, rgw_meta_sync_marker>& markers = sync_status->sync_markers;
for (int i = 0; i < (int)data.num_shards; i++) {
spawn(new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
- RGWMetaSyncStatusManager::shard_obj_name(i), &markers[i]));
+ RGWMetaSyncStatusManager::shard_obj_name(i), &markers[i]), true);
}
return 0;
}
return complete_spawned();
}
+class RGWOmapAppend : public RGWCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ RGWCoroutinesEnv *env;
+
+ rgw_bucket pool;
+ string oid;
+
+ map<string, bufferlist> entries;
+public:
+
+ RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutinesEnv *_env, rgw_bucket& _pool, const string& _oid)
+ : RGWCoroutine(_store->ctx()), async_rados(_async_rados),
+ store(_store), env(_env), pool(_pool), oid(_oid) {}
+ int append(const string& entry) {
+ entries[entry] = bufferlist();
+#define OMAP_APPEND_MAX_ENTRIES 10
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+ if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+ int r = flush();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to flush entries" << dendl;
+ return r;
+ }
+ entries.clear();
+ }
+ return 0;
+ }
+
+ int flush() {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+ return env->stack->call(this);
+ }
+
+ int operate() {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+ reenter(this) {
+ yield {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+ call(new RGWRadosSetOmapKeysCR(async_rados, store, pool, oid, entries));
+ }
+ if (get_ret_status() < 0) {
+ return set_state(RGWCoroutine_Error);
+ }
+ return set_state(RGWCoroutine_Done);
+ }
+ return 0;
+ }
+};
+
+class RGWShardedOmapCRManager {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ RGWCoroutinesEnv *env;
+
+ int num_shards;
+
+ vector<RGWOmapAppend *> shards;
+public:
+ RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutinesEnv *_env, int _num_shards, rgw_bucket& pool, const string& oid_prefix)
+ : async_rados(_async_rados),
+ store(_store), env(_env), num_shards(_num_shards) {
+ shards.reserve(num_shards);
+ for (int i = 0; i < num_shards; ++i) {
+ char buf[oid_prefix.size() + 16];
+ snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
+ RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, env, pool, buf);
+ shards.push_back(shard);
+ }
+ }
+ int append(const string& entry) {
+ static int counter = 0;
+ return (shards[++counter % shards.size()]->append(entry));
+ }
+};
+
class RGWFetchAllMetaCR : public RGWCoroutine {
RGWRados *store;
RGWHTTPManager *http_manager;
+ RGWAsyncRadosProcessor *async_rados;
- int max_entries;
+ int num_shards;
int req_ret;
list<string>::iterator sections_iter;
list<string> result;
+ RGWShardedOmapCRManager *entries_index;
+
public:
- RGWFetchAllMetaCR(RGWRados *_store, RGWHTTPManager *_mgr) : RGWCoroutine(_store->ctx()), store(_store),
+ RGWFetchAllMetaCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, int _num_shards) : RGWCoroutine(_store->ctx()), store(_store),
http_manager(_mgr),
- req_ret(0) {}
+ async_rados(_async_rados),
+ num_shards(_num_shards),
+ req_ret(0), entries_index(NULL) {
+ }
int operate() {
RGWRESTConn *conn = store->rest_master_conn;
reenter(this) {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+ entries_index = new RGWShardedOmapCRManager(async_rados, store, env, num_shards,
+ store->get_zone_params().log_pool, "meta.full-sync.index");
yield {
call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), conn, http_manager,
"/admin/metadata", NULL, §ions));
}
for (list<string>::iterator iter = result.begin(); iter != result.end(); ++iter) {
ldout(store->ctx(), 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl;
+ string s = *sections_iter + ":" + *iter;
+ int r = entries_index->append(s);
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): failed to append entry into index" << dendl;
+ return set_state(RGWCoroutine_Error);
+ }
}
}
}
return run(new RGWInitSyncStatusCoroutine(async_rados, store, obj_ctx, num_shards));
}
-int RGWRemoteMetaLog::run_sync(rgw_meta_sync_status& sync_status)
+int RGWRemoteMetaLog::run_sync(int num_shards, rgw_meta_sync_status& sync_status)
{
RGWObjectCtx obj_ctx(store, NULL);
- return run(new RGWFetchAllMetaCR(store, &http_manager));
+ return run(new RGWFetchAllMetaCR(store, &http_manager, async_rados, num_shards));
}
int RGWCloneMetaLogCoroutine::operate()