return r;
}
-void RGWStateLog::oid_str(int shard, string& oid) {
- oid = RGW_STATELOG_OBJ_PREFIX + module_name + ".";
- char buf[16];
- snprintf(buf, sizeof(buf), "%d", shard);
- oid += buf;
-}
-
-int RGWStateLog::get_shard_num(const string& object) {
- uint32_t val = ceph_str_hash_linux(object.c_str(), object.length());
- return val % num_shards;
-}
-
-string RGWStateLog::get_oid(const string& object) {
- int shard = get_shard_num(object);
- string oid;
- oid_str(shard, oid);
- return oid;
-}
-
-int RGWStateLog::open_ioctx(librados::IoCtx& ioctx) {
- rgw_pool pool;
- store->get_log_pool(pool);
- int r = rgw_init_ioctx(store->get_rados_handle(), pool, ioctx);
- if (r < 0) {
- lderr(store->ctx()) << "ERROR: could not open rados pool" << dendl;
- return r;
- }
- return 0;
-}
-
-int RGWStateLog::store_entry(const string& client_id, const string& op_id, const string& object,
- uint32_t state, bufferlist *bl, uint32_t *check_state)
-{
- if (client_id.empty() ||
- op_id.empty() ||
- object.empty()) {
- ldout(store->ctx(), 0) << "client_id / op_id / object is empty" << dendl;
- }
-
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx);
- if (r < 0)
- return r;
-
- string oid = get_oid(object);
-
- librados::ObjectWriteOperation op;
- if (check_state) {
- cls_statelog_check_state(op, client_id, op_id, object, *check_state);
- }
- utime_t ts = ceph_clock_now();
- bufferlist nobl;
- cls_statelog_add(op, client_id, op_id, object, ts, state, (bl ? *bl : nobl));
- r = ioctx.operate(oid, &op);
- if (r < 0) {
- return r;
- }
-
- return 0;
-}
-
-int RGWStateLog::remove_entry(const string& client_id, const string& op_id, const string& object)
-{
- if (client_id.empty() ||
- op_id.empty() ||
- object.empty()) {
- ldout(store->ctx(), 0) << "client_id / op_id / object is empty" << dendl;
- }
-
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx);
- if (r < 0)
- return r;
-
- string oid = get_oid(object);
-
- librados::ObjectWriteOperation op;
- cls_statelog_remove_by_object(op, object, op_id);
- r = ioctx.operate(oid, &op);
- if (r < 0) {
- return r;
- }
-
- return 0;
-}
-
-void RGWStateLog::init_list_entries(const string& client_id, const string& op_id, const string& object,
- void **handle)
-{
- list_state *state = new list_state;
- state->client_id = client_id;
- state->op_id = op_id;
- state->object = object;
- if (object.empty()) {
- state->cur_shard = 0;
- state->max_shard = num_shards - 1;
- } else {
- state->cur_shard = state->max_shard = get_shard_num(object);
- }
- *handle = (void *)state;
-}
-
-int RGWStateLog::list_entries(void *handle, int max_entries,
- list<cls_statelog_entry>& entries,
- bool *done)
-{
- list_state *state = static_cast<list_state *>(handle);
-
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx);
- if (r < 0)
- return r;
-
- entries.clear();
-
- for (; state->cur_shard <= state->max_shard && max_entries > 0; ++state->cur_shard) {
- string oid;
- oid_str(state->cur_shard, oid);
-
- librados::ObjectReadOperation op;
- list<cls_statelog_entry> ents;
- bool truncated;
- cls_statelog_list(op, state->client_id, state->op_id, state->object, state->marker,
- max_entries, ents, &state->marker, &truncated);
- bufferlist ibl;
- r = ioctx.operate(oid, &op, &ibl);
- if (r == -ENOENT) {
- truncated = false;
- r = 0;
- }
- if (r < 0) {
- ldout(store->ctx(), 0) << "cls_statelog_list returned " << r << dendl;
- return r;
- }
-
- if (!truncated) {
- state->marker.clear();
- }
-
- max_entries -= ents.size();
-
- entries.splice(entries.end(), ents);
-
- if (truncated)
- break;
- }
-
- *done = (state->cur_shard > state->max_shard);
-
- return 0;
-}
-
-void RGWStateLog::finish_list_entries(void *handle)
-{
- list_state *state = static_cast<list_state *>(handle);
- delete state;
-}
-
-void RGWStateLog::dump_entry(const cls_statelog_entry& entry, Formatter *f)
-{
- f->open_object_section("statelog_entry");
- f->dump_string("client_id", entry.client_id);
- f->dump_string("op_id", entry.op_id);
- f->dump_string("object", entry.object);
- entry.timestamp.gmtime_nsec(f->dump_stream("timestamp"));
- if (!dump_entry_internal(entry, f)) {
- f->dump_int("state", entry.state);
- }
- f->close_section();
-}
-
-RGWOpState::RGWOpState(RGWRados *_store) : RGWStateLog(_store, _store->ctx()->_conf->rgw_num_zone_opstate_shards, string("obj_opstate"))
-{
-}
-
-bool RGWOpState::dump_entry_internal(const cls_statelog_entry& entry, Formatter *f)
-{
- string s;
- switch ((OpState)entry.state) {
- case OPSTATE_UNKNOWN:
- s = "unknown";
- break;
- case OPSTATE_IN_PROGRESS:
- s = "in-progress";
- break;
- case OPSTATE_COMPLETE:
- s = "complete";
- break;
- case OPSTATE_ERROR:
- s = "error";
- break;
- case OPSTATE_ABORT:
- s = "abort";
- break;
- case OPSTATE_CANCELLED:
- s = "cancelled";
- break;
- default:
- s = "invalid";
- }
- f->dump_string("state", s);
- return true;
-}
-
-int RGWOpState::state_from_str(const string& s, OpState *state)
-{
- if (s == "unknown") {
- *state = OPSTATE_UNKNOWN;
- } else if (s == "in-progress") {
- *state = OPSTATE_IN_PROGRESS;
- } else if (s == "complete") {
- *state = OPSTATE_COMPLETE;
- } else if (s == "error") {
- *state = OPSTATE_ERROR;
- } else if (s == "abort") {
- *state = OPSTATE_ABORT;
- } else if (s == "cancelled") {
- *state = OPSTATE_CANCELLED;
- } else {
- return -EINVAL;
- }
-
- return 0;
-}
-
-int RGWOpState::set_state(const string& client_id, const string& op_id, const string& object, OpState state)
-{
- uint32_t s = (uint32_t)state;
- return store_entry(client_id, op_id, object, s, NULL, NULL);
-}
-
-int RGWOpState::renew_state(const string& client_id, const string& op_id, const string& object, OpState state)
-{
- uint32_t s = (uint32_t)state;
- return store_entry(client_id, op_id, object, s, NULL, &s);
-}
-
-RGWOpStateSingleOp::RGWOpStateSingleOp(RGWRados *store, const string& cid, const string& oid,
- const string& obj) : os(store), client_id(cid), op_id(oid), object(obj)
-{
- cct = store->ctx();
- cur_state = RGWOpState::OPSTATE_UNKNOWN;
-}
-
-int RGWOpStateSingleOp::set_state(RGWOpState::OpState state) {
- last_update = real_clock::now();
- cur_state = state;
- return os.set_state(client_id, op_id, object, state);
-}
-
-int RGWOpStateSingleOp::renew_state() {
- real_time now = real_clock::now();
-
- int rate_limit_sec = cct->_conf->rgw_opstate_ratelimit_sec;
-
- if (rate_limit_sec && now - last_update < make_timespan(rate_limit_sec)) {
- return 0;
- }
-
- last_update = now;
- return os.renew_state(client_id, op_id, object, cur_state);
-}
-
uint64_t RGWRados::instance_id()
{
class RGWMetaSyncStatusManager;
class RGWDataSyncStatusManager;
class RGWCoroutinesManagerRegistry;
-
-class RGWStateLog {
- RGWRados *store;
- int num_shards;
- string module_name;
-
- void oid_str(int shard, string& oid);
- int get_shard_num(const string& object);
- string get_oid(const string& object);
- int open_ioctx(librados::IoCtx& ioctx);
-
- struct list_state {
- int cur_shard;
- int max_shard;
- string marker;
- string client_id;
- string op_id;
- string object;
-
- list_state() : cur_shard(0), max_shard(0) {}
- };
-
-protected:
- virtual bool dump_entry_internal(const cls_statelog_entry& entry, Formatter *f) {
- return false;
- }
-
-public:
- RGWStateLog(RGWRados *_store, int _num_shards, const string& _module_name) :
- store(_store), num_shards(_num_shards), module_name(_module_name) {}
- virtual ~RGWStateLog() {}
-
- int store_entry(const string& client_id, const string& op_id, const string& object,
- uint32_t state, bufferlist *bl, uint32_t *check_state);
-
- int remove_entry(const string& client_id, const string& op_id, const string& object);
-
- void init_list_entries(const string& client_id, const string& op_id, const string& object,
- void **handle);
-
- int list_entries(void *handle, int max_entries, list<cls_statelog_entry>& entries, bool *done);
-
- void finish_list_entries(void *handle);
-
- virtual void dump_entry(const cls_statelog_entry& entry, Formatter *f);
-};
-
-/*
- * state transitions:
- *
- * unknown -> in-progress -> complete
- * -> error
- *
- * user can try setting the 'abort' state, and it can only succeed if state is
- * in-progress.
- *
- * state renewal cannot switch state (stays in the same state)
- *
- * rgw can switch from in-progress to complete
- * rgw can switch from in-progress to error
- *
- * rgw can switch from abort to cancelled
- *
- */
-
-class RGWOpState : public RGWStateLog {
-protected:
- bool dump_entry_internal(const cls_statelog_entry& entry, Formatter *f) override;
-public:
-
- enum OpState {
- OPSTATE_UNKNOWN = 0,
- OPSTATE_IN_PROGRESS = 1,
- OPSTATE_COMPLETE = 2,
- OPSTATE_ERROR = 3,
- OPSTATE_ABORT = 4,
- OPSTATE_CANCELLED = 5,
- };
-
- explicit RGWOpState(RGWRados *_store);
-
- int state_from_str(const string& s, OpState *state);
- int set_state(const string& client_id, const string& op_id, const string& object, OpState state);
- int renew_state(const string& client_id, const string& op_id, const string& object, OpState state);
-};
-
-class RGWOpStateSingleOp
-{
- RGWOpState os;
- string client_id;
- string op_id;
- string object;
-
- CephContext *cct;
-
- RGWOpState::OpState cur_state;
- ceph::real_time last_update;
-
-public:
- RGWOpStateSingleOp(RGWRados *store, const string& cid, const string& oid, const string& obj);
-
- int set_state(RGWOpState::OpState state);
- int renew_state();
-};
class RGWGetBucketStats_CB : public RefCountedObject {
protected:
friend class RGWObjectExpirer;
friend class RGWMetaSyncProcessorThread;
friend class RGWDataSyncProcessorThread;
- friend class RGWStateLog;
friend class RGWReshard;
friend class RGWBucketReshard;
friend class BucketIndexLockGuard;