int shard_id;
RGWDataChangesLogInfo *shard_info;
+ int tries{0};
+ int op_ret{0};
+
public:
RGWReadRemoteDataLogShardInfoCR(RGWDataSyncCtx *_sc,
int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sc->cct),
shard_info(_shard_info) {
}
- ~RGWReadRemoteDataLogShardInfoCR() override {
- if (http_op) {
- http_op->put();
- }
- }
-
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
- yield {
- char buf[16];
- snprintf(buf, sizeof(buf), "%d", shard_id);
- rgw_http_param_pair pairs[] = { { "type" , "data" },
- { "id", buf },
- { "info" , NULL },
- { NULL, NULL } };
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ ldpp_dout(dpp, 20) << "read remote datalog shard info. shard_id=" << shard_id << " retries=" << tries << dendl;
- string p = "/admin/log/";
+ yield {
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%d", shard_id);
+ rgw_http_param_pair pairs[] = { { "type" , "data" },
+ { "id", buf },
+ { "info" , NULL },
+ { NULL, NULL } };
- http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);
+ string p = "/admin/log/";
- init_new_io(http_op);
+ http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);
- int ret = http_op->aio_read(dpp);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
- log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
- return set_cr_error(ret);
+ init_new_io(http_op);
+
+ int ret = http_op->aio_read(dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
+ log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
+ http_op->put();
+ return set_cr_error(ret);
+ }
+
+ return io_block(0);
+ }
+ yield {
+ op_ret = http_op->wait(shard_info, null_yield);
+ http_op->put();
}
- return io_block(0);
- }
- yield {
- int ret = http_op->wait(shard_info, null_yield);
- if (ret < 0) {
- return set_cr_error(ret);
+ if (op_ret < 0) {
+ if (op_ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldpp_dout(dpp, 20) << "failed to fetch remote datalog shard info. retry. shard_id=" << shard_id << dendl;
+ continue;
+ } else {
+ return set_cr_error(op_ret);
+ }
}
return set_cr_done();
}
read_remote_data_log_response response;
std::optional<TOPNSPC::common::PerfGuard> timer;
+ int tries{0};
+ int op_ret{0};
+
public:
RGWReadRemoteDataLogShardCR(RGWDataSyncCtx *_sc, int _shard_id,
const std::string& marker, string *pnext_marker,
shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
entries(_entries), truncated(_truncated) {
}
- ~RGWReadRemoteDataLogShardCR() override {
- if (http_op) {
- http_op->put();
- }
- }
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
- yield {
- char buf[16];
- snprintf(buf, sizeof(buf), "%d", shard_id);
- rgw_http_param_pair pairs[] = { { "type" , "data" },
- { "id", buf },
- { "marker", marker.c_str() },
- { "extra-info", "true" },
- { NULL, NULL } };
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ ldpp_dout(dpp, 20) << "read remote datalog shard. shard_id=" << shard_id << " retries=" << tries << dendl;
- string p = "/admin/log/";
+ yield {
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%d", shard_id);
+ rgw_http_param_pair pairs[] = { { "type" , "data" },
+ { "id", buf },
+ { "marker", marker.c_str() },
+ { "extra-info", "true" },
+ { NULL, NULL } };
- http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);
+ string p = "/admin/log/";
- init_new_io(http_op);
+ http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);
+
+ init_new_io(http_op);
- if (sync_env->counters) {
- timer.emplace(sync_env->counters, sync_counters::l_poll);
- }
- int ret = http_op->aio_read(dpp);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
- log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
if (sync_env->counters) {
- sync_env->counters->inc(sync_counters::l_poll_err);
+ timer.emplace(sync_env->counters, sync_counters::l_poll);
+ }
+ int ret = http_op->aio_read(dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
+ log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
+ if (sync_env->counters) {
+ sync_env->counters->inc(sync_counters::l_poll_err);
+ }
+ http_op->put();
+ return set_cr_error(ret);
}
- return set_cr_error(ret);
+
+ return io_block(0);
+ }
+ yield {
+ timer.reset();
+ op_ret = http_op->wait(&response, null_yield);
+ http_op->put();
}
- return io_block(0);
- }
- yield {
- timer.reset();
- int ret = http_op->wait(&response, null_yield);
- if (ret < 0) {
- if (sync_env->counters && ret != -ENOENT) {
- sync_env->counters->inc(sync_counters::l_poll_err);
+ if (op_ret < 0) {
+ if (op_ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldpp_dout(dpp, 20) << "failed to read remote datalog shard. retry. shard_id=" << shard_id << dendl;
+ continue;
+ } else {
+ if (sync_env->counters && op_ret != -ENOENT) {
+ sync_env->counters->inc(sync_counters::l_poll_err);
+ }
+ return set_cr_error(op_ret);
}
- return set_cr_error(ret);
}
+
entries->clear();
entries->swap(response.entries);
*pnext_marker = response.marker;
}
class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
RGWRESTReadResource *http_op;
RGWListRemoteDataLogShardCR(RGWDataSyncCtx *sc, int _shard_id,
const string& _marker, uint32_t _max_entries,
rgw_datalog_shard_data *_result)
- : RGWSimpleCoroutine(sc->cct), sc(sc), sync_env(sc->env), http_op(NULL),
+ : RGWSimpleCoroutine(sc->cct, NUM_ENPOINT_IOERROR_RETRIES), sc(sc), sync_env(sc->env), http_op(NULL),
shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
int send_request(const DoutPrefixProvider *dpp) override {
int ret = http_op->wait(result, null_yield);
http_op->put();
if (ret < 0 && ret != -ENOENT) {
- ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
+ ldpp_dout(sync_env->dpp, 5) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
return ret;
}
return 0;
constexpr bool sync_manifest = true;
constexpr bool skip_decrypt = true;
constexpr bool sync_cloudtiered = true;
- int ret = conn->get_obj(dpp, user_id, info, src_obj, pmod, unmod_ptr,
- dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
- prepend_meta, get_op, rgwx_stat,
- sync_manifest, skip_decrypt, nullptr, sync_cloudtiered,
- true, &cb, &in_stream_req);
- if (ret < 0) {
- return ret;
- }
- ret = conn->complete_request(in_stream_req, nullptr, &set_mtime, psize,
- nullptr, pheaders, y);
- if (ret < 0) {
- return ret;
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ int ret = conn->get_obj(dpp, user_id, info, src_obj, pmod, unmod_ptr,
+ dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
+ prepend_meta, get_op, rgwx_stat,
+ sync_manifest, skip_decrypt, nullptr, sync_cloudtiered,
+ true, &cb, &in_stream_req);
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = conn->complete_request(in_stream_req, nullptr, &set_mtime, psize,
+ nullptr, pheaders, y);
+ if (ret < 0) {
+ if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldpp_dout(dpp, 20) << __func__ << "(): failed to fetch object from remote. retries=" << tries << dendl;
+ continue;
+ }
+ return ret;
+ }
+ break;
}
bufferlist& extra_data_bl = cb.get_extra_data();
static constexpr bool sync_manifest = true;
static constexpr bool skip_decrypt = true;
static constexpr bool sync_cloudtiered = true;
- ret = conn->get_obj(rctx.dpp, user_id, info, src_obj, pmod, unmod_ptr,
- dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
- prepend_meta, get_op, rgwx_stat,
- sync_manifest, skip_decrypt, &dst_zone_trace,
- sync_cloudtiered, true,
- &cb, &in_stream_req);
- if (ret < 0) {
- goto set_err_state;
- }
- ret = conn->complete_request(in_stream_req, &etag, &set_mtime,
- &accounted_size, nullptr, nullptr, rctx.y);
- if (ret < 0) {
- goto set_err_state;
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ ret = conn->get_obj(rctx.dpp, user_id, info, src_obj, pmod, unmod_ptr,
+ dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
+ prepend_meta, get_op, rgwx_stat,
+ sync_manifest, skip_decrypt, &dst_zone_trace,
+ sync_cloudtiered, true,
+ &cb, &in_stream_req);
+ if (ret < 0) {
+ goto set_err_state;
+ }
+
+ ret = conn->complete_request(in_stream_req, &etag, &set_mtime,
+ &accounted_size, nullptr, nullptr, rctx.y);
+ if (ret < 0) {
+ if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldpp_dout(rctx.dpp, 20) << __func__ << "(): failed to fetch object from remote. retries=" << tries << dendl;
+ continue;
+ }
+ goto set_err_state;
+ }
+ break;
}
ret = cb.flush();
if (ret < 0) {
auto rest_master_conn = svc.zone->get_master_conn();
- int ret = rest_master_conn->put_obj_async_init(dpp, user_id, dest_obj, src_attrs, &out_stream_req);
- if (ret < 0) {
- return ret;
- }
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ int ret = rest_master_conn->put_obj_async_init(dpp, user_id, dest_obj, src_attrs, &out_stream_req);
+ if (ret < 0) {
+ return ret;
+ }
- out_stream_req->set_send_length(astate->size);
+ out_stream_req->set_send_length(astate->size);
- ret = RGWHTTP::send(out_stream_req);
- if (ret < 0) {
- delete out_stream_req;
- return ret;
- }
+ ret = RGWHTTP::send(out_stream_req);
+ if (ret < 0) {
+ delete out_stream_req;
+ return ret;
+ }
- ret = read_op.iterate(dpp, 0, astate->size - 1, out_stream_req->get_out_cb(), y);
- if (ret < 0) {
- delete out_stream_req;
- return ret;
- }
+ ret = read_op.iterate(dpp, 0, astate->size - 1, out_stream_req->get_out_cb(), y);
+ if (ret < 0) {
+ delete out_stream_req;
+ return ret;
+ }
- ret = rest_master_conn->complete_request(out_stream_req, etag, mtime, y);
- if (ret < 0)
- return ret;
+ ret = rest_master_conn->complete_request(out_stream_req, etag, mtime, y);
+ if (ret < 0) {
+ if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldpp_dout(dpp, 20) << __func__ << "(): failed to put_obj_async_init. retries=" << tries << dendl;
+ continue;
+ }
+ return ret;
+ }
+ break;
+ }
return 0;
}
int shard_id;
RGWMetadataLogInfo *shard_info;
+ int tries{0};
+ int op_ret{0};
+
public:
RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
int _shard_id, RGWMetadataLogInfo *_shard_info)
auto store = env->store;
RGWRESTConn *conn = store->svc()->zone->get_master_conn();
reenter(this) {
- yield {
- char buf[16];
- snprintf(buf, sizeof(buf), "%d", shard_id);
- rgw_http_param_pair pairs[] = { { "type" , "metadata" },
- { "id", buf },
- { "period", period.c_str() },
- { "info" , NULL },
- { NULL, NULL } };
-
- string p = "/admin/log/";
-
- http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
- env->http_manager);
+ // Issue the remote read up to NUM_ENPOINT_IOERROR_RETRIES times: only -EIO
+ // from wait() triggers another attempt, any other error fails the coroutine.
+ // tries/op_ret are members rather than locals, presumably so their values
+ // persist across the yield points below.
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ ldpp_dout(dpp, 20) << "read remote metadata log shard info. shard_id=" << shard_id << " retries=" << tries << dendl;
- init_new_io(http_op);
+ yield {
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%d", shard_id);
+ rgw_http_param_pair pairs[] = { { "type" , "metadata" },
+ { "id", buf },
+ { "period", period.c_str() },
+ { "info" , NULL },
+ { NULL, NULL } };
+
+ string p = "/admin/log/";
+
+ http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
+ env->http_manager);
+
+ init_new_io(http_op);
+
+ int ret = http_op->aio_read(dpp);
+ if (ret < 0) {
+ ldpp_dout(env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
+ log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
+ http_op->put();
+ return set_cr_error(ret);
+ }
- int ret = http_op->aio_read(dpp);
- if (ret < 0) {
- ldpp_dout(env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
- log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
+ return io_block(0);
+ }
+ yield {
+ // Collect the result first; http_op is released via put() on success and failure alike.
+ op_ret = http_op->wait(shard_info, null_yield);
http_op->put();
- return set_cr_error(ret);
}
- return io_block(0);
- }
- yield {
- int ret = http_op->wait(shard_info, null_yield);
- http_op->put();
- if (ret < 0) {
- return set_cr_error(ret);
+ if (op_ret < 0) {
+ if (op_ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldpp_dout(dpp, 20) << "failed to read remote metadata log shard info. retry. shard_id=" << shard_id << dendl;
+ continue;
+ } else {
+ return set_cr_error(op_ret);
+ }
}
return set_cr_done();
}
}
class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+
RGWMetaSyncEnv *sync_env;
RGWRESTReadResource *http_op;
RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
int _shard_id, const string& _marker, uint32_t _max_entries,
rgw_mdlog_shard_data *_result)
- : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
+ : RGWSimpleCoroutine(env->store->ctx(), NUM_ENPOINT_IOERROR_RETRIES), sync_env(env), http_op(NULL),
period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
int send_request(const DoutPrefixProvider *dpp) override {
int ret = http_op->wait(result, null_yield);
http_op->put();
if (ret < 0 && ret != -ENOENT) {
- ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
+ ldpp_dout(sync_env->dpp, 5) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
return ret;
}
return 0;
RGWSyncTraceNodeRef tn;
+ int tries{0};
+ int op_ret{0};
+
public:
RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
const string& _section, const string& _key, bufferlist *_pbl,
http_op(NULL),
section(_section),
key(_key),
- pbl(_pbl) {
+ pbl(_pbl) {
tn = sync_env->sync_tracer->add_node(_tn_parent, "read_remote_meta",
section + ":" + key);
}
int operate(const DoutPrefixProvider *dpp) override {
RGWRESTConn *conn = sync_env->conn;
reenter(this) {
- yield {
- string key_encode;
- url_encode(key, key_encode);
- rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
- { NULL, NULL } };
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ ldpp_dout(dpp, 20) << "read remote metadata. retries=" << tries << dendl;
- string p = string("/admin/metadata/") + section + "/" + key_encode;
+ yield {
+ string key_encode;
+ url_encode(key, key_encode);
+ rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
+ { NULL, NULL } };
- http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
+ string p = string("/admin/metadata/") + section + "/" + key_encode;
- init_new_io(http_op);
+ http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
- int ret = http_op->aio_read(dpp);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
- log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
+ init_new_io(http_op);
+
+ int ret = http_op->aio_read(dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
+ log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
+ http_op->put();
+ return set_cr_error(ret);
+ }
+
+ return io_block(0);
+ }
+ yield {
+ op_ret = http_op->wait(pbl, null_yield);
http_op->put();
- return set_cr_error(ret);
}
- return io_block(0);
- }
- yield {
- int ret = http_op->wait(pbl, null_yield);
- http_op->put();
- if (ret < 0) {
- return set_cr_error(ret);
+ if (op_ret < 0) {
+ if (op_ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldpp_dout(dpp, 20) << "failed to read remote metadata. retry. section=" << section << " key=" << key << dendl;
+ continue;
+ } else {
+ return set_cr_error(op_ret);
+ }
}
return set_cr_done();
}
RGWMetadataLogInfo shard_info;
rgw_mdlog_shard_data data;
+ int tries{0};
+ int op_ret{0};
+
public:
RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
const std::string& period, int _id,
ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
return state_read_shard_status_complete();
}
- yield {
- ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
- return state_send_rest_request(dpp);
- }
- yield {
- ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
- return state_receive_rest_response();
+
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ yield {
+ ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
+ return state_send_rest_request(dpp);
+ }
+ yield {
+ ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
+ return state_receive_rest_response();
+ }
+
+ if (op_ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldout(cct, 20) << __func__ << ": request IO error. retries=" << tries << dendl;
+ continue;
+ } else if (op_ret < 0) {
+ return set_cr_error(op_ret);
+ }
+ break;
}
+
yield {
ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
return state_store_mdlog_entries();
int RGWCloneMetaLogCoroutine::state_receive_rest_response()
{
- int ret = http_op->wait(&data, null_yield);
- if (ret < 0) {
+ op_ret = http_op->wait(&data, null_yield);
+ if (op_ret < 0 && op_ret != -EIO) {
error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
- ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << ret << dendl;
+ ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << op_ret << dendl;
http_op->put();
http_op = NULL;
- return set_cr_error(ret);
+ return set_cr_error(op_ret);
}
http_op->put();
http_op = NULL;
+ if (op_ret == -EIO) {
+ return 0;
+ }
+
ldpp_dout(sync_env->dpp, 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
truncated = ((int)data.entries.size() == max_entries);
int ret = 0;
reenter(this) {
yield return state_init();
- yield return state_send_request(dpp);
- yield return state_request_complete();
+
+ for (tries = 0; tries < max_eio_retries; tries++) {
+ yield return state_send_request(dpp);
+ yield return state_request_complete();
+
+ if (op_ret == -EIO && tries < max_eio_retries - 1) {
+ ldout(cct, 20) << "request IO error. retries=" << tries << dendl;
+ continue;
+ } else if (op_ret < 0) {
+ call_cleanup();
+ return set_state(RGWCoroutine_Error, op_ret);
+ }
+ break;
+ }
+
yield return state_all_complete();
drain_all();
call_cleanup();
int RGWSimpleCoroutine::state_request_complete()
{
- int ret = request_complete();
- if (ret < 0) {
+ op_ret = request_complete(); // result is stored in the op_ret member, which the retry loop in operate() inspects
+ if (op_ret < 0 && op_ret != -EIO) { // -EIO is not turned into an error state here; 0 is returned for it below
call_cleanup();
- return set_state(RGWCoroutine_Error, ret);
+ return set_state(RGWCoroutine_Error, op_ret);
}
return 0;
}
class RGWSimpleCoroutine : public RGWCoroutine {
bool called_cleanup;
+ // Upper bound on the number of send/complete attempts operate() makes when a
+ // request finishes with -EIO. Always >= 1 (enforced by the constructor).
+ const int max_eio_retries;
+
+ // Attempt counter and result of the most recent request_complete() call;
+ // written by operate() / state_request_complete().
+ int tries{0};
+ int op_ret{0};
int operate(const DoutPrefixProvider *dpp) override;
void call_cleanup();
public:
- RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {}
+ // _max_eio_retries defaults to 1 (a single attempt, no retry). Values below 1
+ // are clamped to 1: the attempt loop in operate() runs max_eio_retries times,
+ // so a non-positive bound would skip sending the request altogether.
+ RGWSimpleCoroutine(CephContext *_cct, const int _max_eio_retries = 1)
+ : RGWCoroutine(_cct), called_cleanup(false),
+ max_eio_retries(_max_eio_retries > 0 ? _max_eio_retries : 1) {}
virtual ~RGWSimpleCoroutine() override;
virtual int init() { return 0; }
};
class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine {
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+
bufferlist *result;
protected:
RGWRESTConn *conn;
RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager, const std::string& _path,
rgw_http_param_pair *params, bufferlist *_result)
- : RGWSimpleCoroutine(_cct), result(_result), conn(_conn), http_manager(_http_manager),
+ : RGWSimpleCoroutine(_cct, NUM_ENPOINT_IOERROR_RETRIES), result(_result), conn(_conn), http_manager(_http_manager),
path(_path), params(make_param_list(params))
{}
RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager, const std::string& _path,
rgw_http_param_pair *params)
- : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
+ : RGWSimpleCoroutine(_cct, NUM_ENPOINT_IOERROR_RETRIES), conn(_conn), http_manager(_http_manager),
path(_path), params(make_param_list(params))
{}
RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager, const std::string& _path,
rgw_http_param_pair *params, param_vec_t &hdrs)
- : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
+ : RGWSimpleCoroutine(_cct, NUM_ENPOINT_IOERROR_RETRIES), conn(_conn), http_manager(_http_manager),
path(_path), params(make_param_list(params)),
extra_headers(hdrs)
{}
RGWHTTPManager *_http_manager, const std::string& _path,
rgw_http_param_pair *params,
std::map <std::string, std::string> *hdrs)
- : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
+ : RGWSimpleCoroutine(_cct, NUM_ENPOINT_IOERROR_RETRIES), conn(_conn), http_manager(_http_manager),
path(_path), params(make_param_list(params)),
extra_headers(make_param_list(hdrs))
{}
template <class T, class E = int>
class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine {
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+
protected:
RGWRESTConn *conn;
RGWHTTPManager *http_manager;
bufferlist& _input, T *_result,
bool _send_content_length,
E *_err_result = nullptr)
- : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
+ : RGWSimpleCoroutine(_cct, NUM_ENPOINT_IOERROR_RETRIES), conn(_conn), http_manager(_http_manager),
method(_method), path(_path), params(make_param_list(_params)),
headers(make_param_list(_attrs)), attrs(_attrs),
result(_result), err_result(_err_result),
const std::string& _method, const std::string& _path,
rgw_http_param_pair *_params, std::map<std::string, std::string> *_attrs,
T *_result, E *_err_result = nullptr)
- : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
+ : RGWSimpleCoroutine(_cct, NUM_ENPOINT_IOERROR_RETRIES), conn(_conn), http_manager(_http_manager),
method(_method), path(_path), params(make_param_list(_params)), headers(make_param_list(_attrs)), attrs(_attrs), result(_result),
err_result(_err_result) {}
};
class RGWDeleteRESTResourceCR : public RGWSimpleCoroutine {
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+
RGWRESTConn *conn;
RGWHTTPManager *http_manager;
std::string path;
RGWHTTPManager *_http_manager,
const std::string& _path,
rgw_http_param_pair *_params)
- : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
+ : RGWSimpleCoroutine(_cct, NUM_ENPOINT_IOERROR_RETRIES), conn(_conn), http_manager(_http_manager),
path(_path), params(make_param_list(_params))
{}
verify_ssl(cct->_conf->rgw_verify_ssl),
cct(cct),
method(_method),
+ url_orig(_url),
url(_url) {
init();
}
CephContext *cct;
std::string method;
+ std::string url_orig;
std::string url;
std::string protocol;
url = _url;
}
+ const std::string& get_url_orig() const { // read-only accessor; url_orig is initialized from the constructor's _url argument
+ return url_orig;
+ }
+
void set_method(const std::string& _method) {
method = _method;
}
api_name(_api_name),
host_style(_host_style)
{
+ endpoints_status.reserve(remote_endpoints.size());
+ std::for_each(remote_endpoints.begin(), remote_endpoints.end(),
+ [this](const auto& url) {
+ this->endpoints_status.emplace(url, ceph::real_clock::zero());
+ });
+
if (driver) {
key = driver->get_zone()->get_system_key();
self_zone_group = driver->get_zone()->get_zonegroup().get_id();
api_name(_api_name),
host_style(_host_style)
{
+ endpoints_status.reserve(remote_endpoints.size());
+ std::for_each(remote_endpoints.begin(), remote_endpoints.end(),
+ [this](const auto& url) {
+ this->endpoints_status.emplace(url, ceph::real_clock::zero());
+ });
}
RGWRESTConn::RGWRESTConn(RGWRESTConn&& other)
: cct(other.cct),
endpoints(std::move(other.endpoints)),
+ endpoints_status(std::move(other.endpoints_status)),
key(std::move(other.key)),
self_zone_group(std::move(other.self_zone_group)),
remote_id(std::move(other.remote_id)),
{
cct = other.cct;
endpoints = std::move(other.endpoints);
+ endpoints_status = std::move(other.endpoints_status);
key = std::move(other.key);
self_zone_group = std::move(other.self_zone_group);
remote_id = std::move(other.remote_id);
{
if (endpoints.empty()) {
ldout(cct, 0) << "ERROR: endpoints not configured for upstream zone" << dendl;
- return -EIO;
+ return -EINVAL;
}
- int i = ++counter;
- endpoint = endpoints[i % endpoints.size()];
+ // Step through the endpoints round-robin, examining at most endpoints.size()
+ // candidates, and stop at the first one that is not currently marked
+ // unconnectable. Returns -EINVAL when no candidate qualifies.
+ size_t num = 0;
+ while (num < endpoints.size()) {
+ int i = ++counter;
+ endpoint = endpoints[i % endpoints.size()];
+
+ // Look the status entry up once and reuse the iterator for the load/store
+ // below, instead of repeating the lookup through operator[].
+ const auto status = endpoints_status.find(endpoint);
+ if (status == endpoints_status.end()) {
+ ldout(cct, 1) << "ERROR: missing status for endpoint " << endpoint << dendl;
+ num++;
+ continue;
+ }
+
+ const auto upd_time = status->second.load();
+
+ // A zero timestamp means the endpoint is not marked unconnectable.
+ if (ceph::real_clock::is_zero(upd_time)) {
+ break;
+ }
+
+ auto diff = ceph::to_seconds<double>(ceph::real_clock::now() - upd_time);
+
+ ldout(cct, 20) << "endpoint url=" << endpoint
+ << " last endpoint status update time="
+ << ceph::real_clock::to_double(upd_time)
+ << " diff=" << diff << dendl;
+
+ // Once the mark is at least CONN_STATUS_EXPIRE_SECS old, clear it and use
+ // this endpoint again.
+ static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2;
+ if (diff >= CONN_STATUS_EXPIRE_SECS) {
+ status->second.store(ceph::real_clock::zero());
+ ldout(cct, 10) << "endpoint " << endpoint << " unconnectable status expired. mark it connectable" << dendl;
+ break;
+ }
+ num++;
+ }
+
+ if (num == endpoints.size()) {
+ ldout(cct, 5) << "ERROR: no valid endpoint" << dendl;
+ return -EINVAL;
+ }
+ ldout(cct, 20) << "get_url picked endpoint=" << endpoint << dendl;
return 0;
}
return endpoint;
}
+// Mark `endpoint` as unconnectable by stamping its endpoints_status entry with
+// the current time. An empty or unknown endpoint is logged and ignored.
+void RGWRESTConn::set_url_unconnectable(const std::string& endpoint)
+{
+ // One lookup serves both the validity check and the update; the entry is
+ // never reached through operator[], so no new key can be inserted here.
+ const auto status = endpoints_status.find(endpoint);
+ if (endpoint.empty() || status == endpoints_status.end()) {
+ ldout(cct, 0) << "ERROR: endpoint is not a valid or doesn't have status. endpoint="
+ << endpoint << dendl;
+ return;
+ }
+
+ status->second.store(ceph::real_clock::now());
+
+ ldout(cct, 10) << "set endpoint unconnectable. url=" << endpoint << dendl;
+}
+
void RGWRESTConn::populate_params(param_vec_t& params, const rgw_user *uid, const string& zonegroup)
{
populate_uid(params, uid);
int RGWRESTConn::forward(const DoutPrefixProvider *dpp, const rgw_user& uid, const req_info& info, obj_version *objv, size_t max_response, bufferlist *inbl, bufferlist *outbl, optional_yield y)
{
- string url;
- int ret = get_url(url);
- if (ret < 0)
- return ret;
- param_vec_t params;
- populate_params(params, &uid, self_zone_group);
- if (objv) {
- params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "tag", objv->tag));
- char buf[16];
- snprintf(buf, sizeof(buf), "%lld", (long long)objv->ver);
- params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "ver", buf));
- }
- RGWRESTSimpleRequest req(cct, info.method, url, NULL, &params, api_name);
- return req.forward_request(dpp, key, info, max_response, inbl, outbl, y);
+ // Forward the request to an endpoint chosen by get_url(). When the forward
+ // fails with -EIO the endpoint is marked unconnectable and a new endpoint is
+ // requested, for at most NUM_ENPOINT_IOERROR_RETRIES attempts in total.
+ // Returns get_url()'s error as-is, otherwise the result of the last
+ // forward_request() call.
+ int ret = 0;
+
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ string url;
+ ret = get_url(url);
+ if (ret < 0)
+ return ret;
+ param_vec_t params;
+ populate_params(params, &uid, self_zone_group);
+ if (objv) {
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "tag", objv->tag));
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%lld", (long long)objv->ver);
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "ver", buf));
+ }
+ RGWRESTSimpleRequest req(cct, info.method, url, NULL, &params, api_name);
+ ret = req.forward_request(dpp, key, info, max_response, inbl, outbl, y);
+ if (ret == -EIO) {
+ // Mark the endpoint even on the final attempt, so later callers skip it.
+ set_url_unconnectable(url);
+ if (tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldpp_dout(dpp, 20) << __func__ << "(): failed to forward request. retries=" << tries << dendl;
+ continue;
+ }
+ }
+ break;
+ }
+ return ret;
}
int RGWRESTConn::forward_iam_request(const DoutPrefixProvider *dpp, const req_info& info, obj_version *objv, size_t max_response, bufferlist *inbl, bufferlist *outbl, optional_yield y)
{
- string url;
- int ret = get_url(url);
- if (ret < 0)
- return ret;
- param_vec_t params;
- if (objv) {
- params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "tag", objv->tag));
- char buf[16];
- snprintf(buf, sizeof(buf), "%lld", (long long)objv->ver);
- params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "ver", buf));
- }
- std::string service = "iam";
- RGWRESTSimpleRequest req(cct, info.method, url, NULL, &params, api_name);
- // coverity[uninit_use_in_call:SUPPRESS]
- return req.forward_request(dpp, key, info, max_response, inbl, outbl, y, service);
+ // Forward an IAM request (service name "iam") to an endpoint chosen by
+ // get_url(). When the forward fails with -EIO the endpoint is marked
+ // unconnectable and a new endpoint is requested, for at most
+ // NUM_ENPOINT_IOERROR_RETRIES attempts in total. Returns get_url()'s error
+ // as-is, otherwise the result of the last forward_request() call.
+ int ret = 0;
+
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ string url;
+ ret = get_url(url);
+ if (ret < 0)
+ return ret;
+ param_vec_t params;
+ if (objv) {
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "tag", objv->tag));
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%lld", (long long)objv->ver);
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "ver", buf));
+ }
+ std::string service = "iam";
+ RGWRESTSimpleRequest req(cct, info.method, url, NULL, &params, api_name);
+ // coverity[uninit_use_in_call:SUPPRESS]
+ ret = req.forward_request(dpp, key, info, max_response, inbl, outbl, y, service);
+ if (ret == -EIO) {
+ // Mark the endpoint even on the final attempt, so later callers skip it.
+ set_url_unconnectable(url);
+ if (tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldpp_dout(dpp, 20) << __func__ << "(): failed to forward request. retries=" << tries << dendl;
+ continue;
+ }
+ }
+ break;
+ }
+ return ret;
}
int RGWRESTConn::put_obj_send_init(const rgw_obj& obj, const rgw_http_param_pair *extra_params, RGWRESTStreamS3PutObj **req)
real_time *mtime, optional_yield y)
{
int ret = req->complete_request(y, &etag, mtime);
+ if (ret == -EIO) {
+ ldout(cct, 5) << __func__ << ": complete_request() returned ret=" << ret << dendl;
+ set_url_unconnectable(req->get_url_orig());
+ }
+
delete req;
return ret;
optional_yield y)
{
int ret = req->complete_request(y, etag, mtime, psize, pattrs, pheaders);
+ if (ret == -EIO) {
+ ldout(cct, 5) << __func__ << ": complete_request() returned ret=" << ret << dendl;
+ set_url_unconnectable(req->get_url_orig());
+ }
delete req;
return ret;
RGWHTTPManager *mgr,
optional_yield y)
{
- string url;
- int ret = get_url(url);
- if (ret < 0)
- return ret;
+ int ret = 0;
- param_vec_t params;
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ string url;
+ ret = get_url(url);
+ if (ret < 0)
+ return ret;
- if (extra_params) {
- params.insert(params.end(), extra_params->begin(), extra_params->end());
- }
+ param_vec_t params;
- populate_params(params, nullptr, self_zone_group);
+ if (extra_params) {
+ params.insert(params.end(), extra_params->begin(), extra_params->end());
+ }
- RGWStreamIntoBufferlist cb(bl);
+ populate_params(params, nullptr, self_zone_group);
- RGWRESTStreamReadRequest req(cct, url, &cb, NULL, &params, api_name, host_style);
+ RGWStreamIntoBufferlist cb(bl);
- map<string, string> headers;
- if (extra_headers) {
- headers.insert(extra_headers->begin(), extra_headers->end());
- }
+ RGWRESTStreamReadRequest req(cct, url, &cb, NULL, &params, api_name, host_style);
- ret = req.send_request(dpp, &key, headers, resource, mgr, send_data);
- if (ret < 0) {
- ldpp_dout(dpp, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
- return ret;
+ map<string, string> headers;
+ if (extra_headers) {
+ headers.insert(extra_headers->begin(), extra_headers->end());
+ }
+
+ ret = req.send_request(dpp, &key, headers, resource, mgr, send_data);
+ if (ret < 0) {
+ ldpp_dout(dpp, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ ret = req.complete_request(y);
+ if (ret == -EIO) {
+ set_url_unconnectable(url);
+ if (tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldpp_dout(dpp, 20) << __func__ << "(): failed to get resource. retries=" << tries << dendl;
+ continue;
+ }
+ }
+ if (ret < 0) {
+ ldpp_dout(dpp, 5) << __func__ << ": complete_request() returned ret=" << ret << dendl;
+ }
+ break;
}
- return req.complete_request(y);
+ return ret;
}
int RGWRESTConn::send_resource(const DoutPrefixProvider *dpp, const std::string& method,
std::map<std::string, std::string> *extra_headers, bufferlist& bl,
bufferlist *send_data, RGWHTTPManager *mgr, optional_yield y)
{
- std::string url;
- int ret = get_url(url);
- if (ret < 0)
- return ret;
+ int ret = 0;
- param_vec_t params;
+ static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
+ for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
+ std::string url;
+ ret = get_url(url);
+ if (ret < 0)
+ return ret;
- if (extra_params) {
- params = make_param_list(extra_params);
- }
+ param_vec_t params;
- populate_params(params, nullptr, self_zone_group);
+ if (extra_params) {
+ params = make_param_list(extra_params);
+ }
- RGWStreamIntoBufferlist cb(bl);
+ populate_params(params, nullptr, self_zone_group);
- RGWRESTStreamSendRequest req(cct, method, url, &cb, NULL, ¶ms, api_name, host_style);
+ RGWStreamIntoBufferlist cb(bl);
- std::map<std::string, std::string> headers;
- if (extra_headers) {
- headers.insert(extra_headers->begin(), extra_headers->end());
- }
+ RGWRESTStreamSendRequest req(cct, method, url, &cb, NULL, ¶ms, api_name, host_style);
- ret = req.send_request(dpp, &key, headers, resource, mgr, send_data);
- if (ret < 0) {
- ldpp_dout(dpp, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
- return ret;
- }
+ std::map<std::string, std::string> headers;
+ if (extra_headers) {
+ headers.insert(extra_headers->begin(), extra_headers->end());
+ }
- ret = req.complete_request(y);
- if (ret < 0) {
- ldpp_dout(dpp, 5) << __func__ << ": complete_request() resource=" << resource << " returned ret=" << ret << dendl;
+ ret = req.send_request(dpp, &key, headers, resource, mgr, send_data);
+ if (ret < 0) {
+ ldpp_dout(dpp, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ ret = req.complete_request(y);
+ if (ret == -EIO) {
+ set_url_unconnectable(url);
+ if (tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
+ ldpp_dout(dpp, 20) << __func__ << "(): failed to send resource. retries=" << tries << dendl;
+ continue;
+ }
+ }
+ if (ret < 0) {
+ ldpp_dout(dpp, 5) << __func__ << ": complete_request() resource=" << resource << " returned ret=" << ret << dendl;
+ }
+ break;
}
return ret;
return ret;
}
- return req.complete_request(y);
+ ret = req.complete_request(y);
+ if (ret == -EIO) {
+ conn->set_url_unconnectable(req.get_url_orig());
+ ldpp_dout(dpp, 20) << __func__ << ": complete_request() returned ret=" << ret << dendl;
+ }
+
+ return ret;
}
int RGWRESTReadResource::aio_read(const DoutPrefixProvider *dpp)
return ret;
}
- return req.complete_request(y);
+ ret = req.complete_request(y);
+ if (ret == -EIO) {
+ conn->set_url_unconnectable(req.get_url_orig());
+ ldpp_dout(dpp, 20) << __func__ << ": complete_request() returned ret=" << ret << dendl;
+ }
+
+ return ret;
}
int RGWRESTSendResource::aio_send(const DoutPrefixProvider *dpp, bufferlist& outbl)
class RGWRESTConn
{
+ /* an endpoint cannot be connected to if its timestamp is not real_clock::zero */
+ using endpoint_status_map = std::unordered_map<std::string, std::atomic<ceph::real_time>>;
+
CephContext *cct;
std::vector<std::string> endpoints;
+ endpoint_status_map endpoints_status;
RGWAccessKey key;
std::string self_zone_group;
std::string remote_id;
int get_url(std::string& endpoint);
std::string get_url();
+ void set_url_unconnectable(const std::string& endpoint);
const std::string& get_self_zonegroup() {
return self_zone_group;
}
int wait(bufferlist *pbl, optional_yield y) {
int ret = req.wait(y);
if (ret < 0) {
+ if (ret == -EIO) {
+ conn->set_url_unconnectable(req.get_url_orig());
+ }
return ret;
}
{
int ret = req.wait(y);
if (ret < 0) {
+ if (ret == -EIO) {
+ conn->set_url_unconnectable(req.get_url_orig());
+ }
return ret;
}
int ret = req.wait(y);
*pbl = bl;
+ if (ret == -EIO) {
+ conn->set_url_unconnectable(req.get_url_orig());
+ }
+
if (ret < 0 && err_result ) {
ret = parse_decode_json(*err_result, bl);
}
int RGWRESTSendResource::wait(T *dest, optional_yield y, E *err_result)
{
int ret = req.wait(y);
+ if (ret == -EIO) {
+ conn->set_url_unconnectable(req.get_url_orig());
+ }
+
if (ret >= 0) {
ret = req.get_status();
}