RGWMetadataLogData log_data;
void decode_json(JSONObj *obj);
+
+ bool convert_from(cls_log_entry& le) {
+ id = le.id;
+ section = le.section;
+ name = le.name;
+ timestamp = le.timestamp;
+ try {
+ bufferlist::iterator iter = le.data.begin();
+ ::decode(log_data, iter);
+ } catch (buffer::error& err) {
+ return false;
+ }
+ return true;
+ }
};
struct rgw_mdlog_shard_data {
}
};
+class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ string raw_key;
+protected:
+ int _send_request() {
+ int ret = store->meta_mgr->remove(raw_key);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
+ return ret;
+ }
+ return 0;
+ }
+public:
+ RGWAsyncMetaRemoveEntry(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ const string& _raw_key) : RGWAsyncRadosRequest(cn), store(_store),
+ raw_key(_raw_key) {}
+};
+
+
+class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
+ RGWMetaSyncEnv *sync_env;
+ string raw_key;
+
+ RGWAsyncMetaRemoveEntry *req;
+
+public:
+ RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
+ const string& _raw_key) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
+ raw_key(_raw_key), req(NULL) {
+ }
+
+ ~RGWMetaRemoveEntryCR() {
+ delete req;
+ }
+
+ int send_request() {
+ req = new RGWAsyncMetaRemoveEntry(stack->create_completion_notifier(),
+ sync_env->store, raw_key);
+ sync_env->async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() {
+ int r = req->get_ret_status();
+ if (r == -ENOENT) {
+ r = 0;
+ }
+ return r;
+ }
+};
+
#define META_SYNC_UPDATE_MARKER_WINDOW 10
class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
int RGWMetaSyncSingleEntryCR::operate() {
reenter(this) {
#define NUM_TRANSIENT_ERROR_RETRIES 10
+
+ if (op_status != MDLOG_STATUS_COMPLETE) {
+ ldout(sync_env->cct, 20) << "skipping pending operation" << dendl;
+ yield call(marker_tracker->finish(entry_marker));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
yield {
pos = raw_key.find(':');
}
retcode = 0;
- if (sync_status != -ENOENT) {
- for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
- yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
- if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
- ldout(sync_env->cct, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
- continue;
- }
- break;
+ for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
+ if (sync_status != -ENOENT) {
+ yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
+ } else {
+ yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
+ }
+ if ((retcode == -EAGAIN || retcode == -ECANCELED) && (tries < NUM_TRANSIENT_ERROR_RETRIES - 1)) {
+ ldout(sync_env->cct, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
+ continue;
}
+ break;
}
sync_status = retcode;
string mdlog_marker;
string raw_key;
+ rgw_mdlog_entry mdlog_entry;
Mutex inc_lock;
Cond inc_cond;
} else {
// fetch remote and write locally
yield {
- RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false);
+ RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, MDLOG_STATUS_COMPLETE, marker_tracker), false);
stack->get();
stack_to_pos[stack] = iter->first;
marker = max_marker;
yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
+ if (!mdlog_entry.convert_from(*log_iter)) {
+ ldout(sync_env->cct, 0) << __func__ << ":" << __LINE__ << ": ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry" << dendl;
+ continue;
+ }
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp)) {
ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
} else {
raw_key = log_iter->section + ":" + log_iter->name;
yield {
- RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
+ RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker), false);
assert(stack);
stack->get();