From 844fafc135a42fc7fcbfefd73af4d6bb325f786c Mon Sep 17 00:00:00 2001
From: Casey Bodley
Date: Fri, 22 Jan 2021 18:28:50 -0500
Subject: [PATCH] rgw: add generations to error repo with binary format

adds a backward-compatible binary encoding for error repo keys that can
contain a generation number along with the bucket and shard

Signed-off-by: Casey Bodley
---
 src/rgw/rgw_basic_types.cc     |  16 +++++-
 src/rgw/rgw_basic_types.h      |   6 +-
 src/rgw/rgw_data_sync.cc       |  64 +++++++++++++--------
 src/rgw/rgw_data_sync.h        |   8 ++-
 src/rgw/rgw_sync_error_repo.cc | 102 +++++++++++++++++++++++++++------
 src/rgw/rgw_sync_error_repo.h  |  46 +++++++++------
 6 files changed, 181 insertions(+), 61 deletions(-)

diff --git a/src/rgw/rgw_basic_types.cc b/src/rgw/rgw_basic_types.cc
index f306fa10ac9ea..178b00bf9f83d 100644
--- a/src/rgw/rgw_basic_types.cc
+++ b/src/rgw/rgw_basic_types.cc
@@ -84,10 +84,10 @@ void rgw_bucket::generate_test_instances(list<rgw_bucket*>& o)
 }
 
 std::string rgw_bucket_shard::get_key(char tenant_delim, char id_delim,
-                                      char shard_delim) const
+                                      char shard_delim, size_t reserve) const
 {
   static constexpr size_t shard_len{12}; // ":4294967295\0"
-  auto key = bucket.get_key(tenant_delim, id_delim, shard_len);
+  auto key = bucket.get_key(tenant_delim, id_delim, reserve + shard_len);
   if (shard_id >= 0 && shard_delim) {
     key.append(1, shard_delim);
     key.append(std::to_string(shard_id));
@@ -95,6 +95,18 @@ std::string rgw_bucket_shard::get_key(char tenant_delim, char id_delim,
   return key;
 }
 
+void encode(const rgw_bucket_shard& b, bufferlist& bl, uint64_t f)
+{
+  encode(b.bucket, bl, f);
+  encode(b.shard_id, bl, f);
+}
+
+void decode(rgw_bucket_shard& b, bufferlist::const_iterator& bl)
+{
+  decode(b.bucket, bl);
+  decode(b.shard_id, bl);
+}
+
 void encode_json_impl(const char *name, const rgw_zone_id& zid, Formatter *f)
 {
   encode_json(name, zid.id, f);
diff --git a/src/rgw/rgw_basic_types.h b/src/rgw/rgw_basic_types.h
index ab128686940ec..afbcf17c82803 100644
--- a/src/rgw/rgw_basic_types.h
+++ b/src/rgw/rgw_basic_types.h
@@ -435,7 +435,8 @@ struct rgw_bucket_shard {
   rgw_bucket_shard(const rgw_bucket& _b, int _sid) : bucket(_b), shard_id(_sid) {}
 
   std::string get_key(char tenant_delim = '/', char id_delim = ':',
-                      char shard_delim = ':') const;
+                      char shard_delim = ':',
+                      size_t reserve = 0) const;
 
   bool operator<(const rgw_bucket_shard& b) const {
     if (bucket < b.bucket) {
@@ -453,6 +454,9 @@ struct rgw_bucket_shard {
   }
 };
 
+void encode(const rgw_bucket_shard& b, bufferlist& bl, uint64_t f=0);
+void decode(rgw_bucket_shard& b, bufferlist::const_iterator& bl);
+
 inline std::ostream& operator<<(std::ostream& out, const rgw_bucket_shard& bs) {
   if (bs.shard_id <= 0) {
     return out << bs.bucket;
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 401c574da1625..990c7a4f643f9 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -70,6 +70,17 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entries", entries, obj);
 };
 
+// print a bucket shard with [gen]
+std::string to_string(const rgw_bucket_shard& bs, std::optional<uint64_t> gen)
+{
+  constexpr auto digits10 = std::numeric_limits<uint64_t>::digits10;
+  constexpr auto reserve = 2 + digits10; // [value]
+  auto str = bs.get_key('/', ':', ':', reserve);
+  str.append(1, '[');
+  str.append(std::to_string(gen.value_or(0)));
+  str.append(1, ']');
+  return str;
+}
 
 class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
   static constexpr int MAX_CONCURRENT_SHARDS = 16;
@@ -1279,7 +1290,7 @@ public:
       marker_tracker(_marker_tracker), error_repo(error_repo),
       lease_cr(std::move(lease_cr)) {
     set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
-    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
+    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", to_string(obligation.bs, obligation.gen));
  }
 
  int operate(const DoutPrefixProvider *dpp) override {
@@ -1334,14 +1345,15 @@ public:
         // this was added when 'tenant/' was added to datalog entries, because
         // preexisting tenant buckets could never sync and would stay in the
         // error_repo forever
-        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->key));
+        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->bs));
         sync_status = 0;
       }
       if (sync_status < 0) {
         // write actual sync failures for 'radosgw-admin sync error list'
         if (sync_status != -EBUSY && sync_status != -EAGAIN) {
-          yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data", complete->key,
+          yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data",
+                                                          to_string(complete->bs, complete->gen),
                                                           -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
           if (retcode < 0) {
             tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
           }
@@ -1349,15 +1361,17 @@ public:
         }
         if (complete->timestamp != ceph::real_time{}) {
           tn->log(10, SSTR("writing " << *complete << " to error repo for retry"));
-          yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo,
-                                             complete->key, complete->timestamp));
+          yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                               rgw::error_repo::encode_key(complete->bs, complete->gen),
+                                               complete->timestamp));
           if (retcode < 0) {
             tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
           }
         }
       } else if (complete->retry) {
-        yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
-                                            complete->key, complete->timestamp));
+        yield call(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
+                                              rgw::error_repo::encode_key(complete->bs, complete->gen),
+                                              complete->timestamp));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
               << error_repo << " retcode=" << retcode));
@@ -1445,11 +1459,11 @@ class RGWDataSyncShardCR : public RGWCoroutine {
                             &bs.bucket, &bs.shard_id);
   }
   RGWCoroutine* sync_single_entry(const rgw_bucket_shard& src,
-                                  const std::string& key,
+                                  std::optional<uint64_t> gen,
                                   const std::string& marker,
                                   ceph::real_time timestamp, bool retry) {
     auto state = bucket_shard_cache->get(src);
-    auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry};
+    auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry};
     return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
                                         &*marker_tracker, error_repo,
                                         lease_cr.get(), tn);
@@ -1575,11 +1589,11 @@ public:
             tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
           } else {
             // fetch remote and write locally
-            yield_spawn_window(sync_single_entry(source_bs, iter->first, iter->first,
-                                                 entry_timestamp, false),
-                               cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
-          }
-          sync_marker.marker = iter->first;
+            yield_spawn_window(sync_single_entry(source_bs, std::nullopt, iter->first,
+                                                 entry_timestamp, false),
+                               cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
+          }
+          sync_marker.marker = iter->first;
         }
       } while (omapvals->more);
       omapvals.reset();
@@ -1657,7 +1671,7 @@ public:
           continue;
         }
         tn->log(20, SSTR("received async update notification: " << *modified_iter));
-        spawn(sync_single_entry(source_bs, *modified_iter, string(),
+        spawn(sync_single_entry(source_bs, std::nullopt, string(),
                                 ceph::real_time{}, false), false);
       }
 
@@ -1671,17 +1685,21 @@ public:
         iter = error_entries.begin();
         for (; iter != error_entries.end(); ++iter) {
           error_marker = iter->first;
-          entry_timestamp = rgw_error_repo_decode_value(iter->second);
-          retcode = parse_bucket_key(error_marker, source_bs);
+          entry_timestamp = rgw::error_repo::decode_value(iter->second);
+          std::optional<uint64_t> gen;
+          retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
+          if (retcode == -EINVAL) {
+            // backward compatibility for string keys that don't encode a gen
+            retcode = parse_bucket_key(error_marker, source_bs);
+          }
           if (retcode < 0) {
             tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
-            spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
-                                           error_marker, entry_timestamp), false);
+            spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
+                                             error_marker, entry_timestamp), false);
             continue;
           }
-          tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp));
-          spawn(sync_single_entry(source_bs, error_marker, "",
-                                  entry_timestamp, true), false);
+          tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
+          spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false);
         }
         if (!omapvals->more) {
           error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
@@ -1715,7 +1733,7 @@ public:
           if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
           } else {
-            yield_spawn_window(sync_single_entry(source_bs, log_iter->entry.key, log_iter->log_id,
+            yield_spawn_window(sync_single_entry(source_bs, std::nullopt, log_iter->log_id,
                                                  log_iter->log_timestamp, false),
                                cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
           }
diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h
index 0b66b5c64dfc7..75293b88ba915 100644
--- a/src/rgw/rgw_data_sync.h
+++ b/src/rgw/rgw_data_sync.h
@@ -21,14 +21,18 @@
 
 // represents an obligation to sync an entry up a given time
 struct rgw_data_sync_obligation {
-  std::string key;
+  rgw_bucket_shard bs;
+  std::optional<uint64_t> gen;
   std::string marker;
   ceph::real_time timestamp;
   bool retry = false;
 };
 
 inline std::ostream& operator<<(std::ostream& out, const rgw_data_sync_obligation& o) {
-  out << "key=" << o.key;
+  out << "key=" << o.bs;
+  if (o.gen) {
+    out << '[' << *o.gen << ']';
+  }
   if (!o.marker.empty()) {
     out << " marker=" << o.marker;
   }
diff --git a/src/rgw/rgw_sync_error_repo.cc b/src/rgw/rgw_sync_error_repo.cc
index 1f332276d0d6b..75f552de93134 100644
--- a/src/rgw/rgw_sync_error_repo.cc
+++ b/src/rgw/rgw_sync_error_repo.cc
@@ -18,7 +18,73 @@
 #include "services/svc_rados.h"
 #include "cls/cmpomap/client.h"
 
-ceph::real_time rgw_error_repo_decode_value(const bufferlist& bl)
+namespace rgw::error_repo {
+
+// prefix for the binary encoding of keys. this particular value is not
+// valid as the first byte of a utf8 code point, so we use this to
+// differentiate the binary encoding from existing string keys for
+// backward-compatibility
+constexpr uint8_t binary_key_prefix = 0x80;
+
+struct key_type {
+  rgw_bucket_shard bs;
+  std::optional<uint64_t> gen;
+};
+
+void encode(const key_type& k, bufferlist& bl, uint64_t f=0)
+{
+  ENCODE_START(1, 1, bl);
+  encode(k.bs, bl);
+  encode(k.gen, bl);
+  ENCODE_FINISH(bl);
+}
+
+void decode(key_type& k, bufferlist::const_iterator& bl)
+{
+  DECODE_START(1, bl);
+  decode(k.bs, bl);
+  decode(k.gen, bl);
+  DECODE_FINISH(bl);
+}
+
+std::string encode_key(const rgw_bucket_shard& bs,
+                       std::optional<uint64_t> gen)
+{
+  using ceph::encode;
+  const auto key = key_type{bs, gen};
+  bufferlist bl;
+  encode(binary_key_prefix, bl);
+  encode(key, bl);
+  return bl.to_str();
+}
+
+int decode_key(std::string encoded,
+               rgw_bucket_shard& bs,
+               std::optional<uint64_t>& gen)
+{
+  using ceph::decode;
+  key_type key;
+  const auto bl = bufferlist::static_from_string(encoded);
+  auto p = bl.cbegin();
+  try {
+    uint8_t prefix;
+    decode(prefix, p);
+    if (prefix != binary_key_prefix) {
+      return -EINVAL;
+    }
+    decode(key, p);
+  } catch (const buffer::error&) {
+    return -EIO;
+  }
+  if (!p.end()) {
+    return -EIO; // buffer contained unexpected bytes
+  }
+  bs = std::move(key.bs);
+  gen = key.gen;
+  return 0;
+}
+
+ceph::real_time decode_value(const bufferlist& bl)
 {
   uint64_t value;
   try {
@@ -30,9 +96,9 @@ ceph::real_time rgw_error_repo_decode_value(const bufferlist& bl)
   return ceph::real_clock::zero() + ceph::timespan(value);
 }
 
-int rgw_error_repo_write(librados::ObjectWriteOperation& op,
-                         const std::string& key,
-                         ceph::real_time timestamp)
+int write(librados::ObjectWriteOperation& op,
+          const std::string& key,
+          ceph::real_time timestamp)
 {
   // overwrite the existing timestamp if value is greater
   const uint64_t value = timestamp.time_since_epoch().count();
@@ -41,9 +107,9 @@ int rgw_error_repo_write(librados::ObjectWriteOperation& op,
   return cmp_set_vals(op, Mode::U64, Op::GT, {{key, u64_buffer(value)}}, zero);
 }
 
-int rgw_error_repo_remove(librados::ObjectWriteOperation& op,
-                          const std::string& key,
-                          ceph::real_time timestamp)
+int remove(librados::ObjectWriteOperation& op,
+           const std::string& key,
+           ceph::real_time timestamp)
 {
   // remove the omap key if value >= existing
   const uint64_t value = timestamp.time_since_epoch().count();
@@ -67,7 +133,7 @@ class RGWErrorRepoWriteCR : public RGWSimpleCoroutine {
   int send_request(const DoutPrefixProvider *dpp) override {
     librados::ObjectWriteOperation op;
-    int r = rgw_error_repo_write(op, key, timestamp);
+    int r = write(op, key, timestamp);
     if (r < 0) {
       return r;
     }
@@ -85,10 +151,10 @@ class RGWErrorRepoWriteCR : public RGWSimpleCoroutine {
   }
 };
 
-RGWCoroutine* rgw_error_repo_write_cr(RGWSI_RADOS* rados,
-                                      const rgw_raw_obj& obj,
-                                      const std::string& key,
-                                      ceph::real_time timestamp)
+RGWCoroutine* write_cr(RGWSI_RADOS* rados,
+                       const rgw_raw_obj& obj,
+                       const std::string& key,
+                       ceph::real_time timestamp)
 {
   return new RGWErrorRepoWriteCR(rados, obj, key, timestamp);
 }
@@ -110,7 +176,7 @@ class RGWErrorRepoRemoveCR : public RGWSimpleCoroutine {
   int send_request(const DoutPrefixProvider *dpp) override {
     librados::ObjectWriteOperation op;
-    int r = rgw_error_repo_remove(op, key, timestamp);
+    int r = remove(op, key, timestamp);
     if (r < 0) {
       return r;
     }
@@ -128,10 +194,12 @@ class RGWErrorRepoRemoveCR : public RGWSimpleCoroutine {
   }
 };
 
-RGWCoroutine* rgw_error_repo_remove_cr(RGWSI_RADOS* rados,
-                                       const rgw_raw_obj& obj,
-                                       const std::string& key,
-                                       ceph::real_time timestamp)
+RGWCoroutine* remove_cr(RGWSI_RADOS* rados,
+                        const rgw_raw_obj& obj,
+                        const std::string& key,
+                        ceph::real_time timestamp)
 {
   return new RGWErrorRepoRemoveCR(rados, obj, key, timestamp);
 }
+
+} // namespace rgw::error_repo
diff --git a/src/rgw/rgw_sync_error_repo.h b/src/rgw/rgw_sync_error_repo.h
index 58b3b183eea5f..60525d281f0fb 100644
--- a/src/rgw/rgw_sync_error_repo.h
+++ b/src/rgw/rgw_sync_error_repo.h
@@ -14,6 +14,7 @@
 
 #pragma once
 
+#include <optional>
 #include "include/rados/librados_fwd.hpp"
 #include "include/buffer_fwd.h"
 #include "common/ceph_time.h"
@@ -21,25 +22,38 @@
 class RGWSI_RADOS;
 class RGWCoroutine;
 struct rgw_raw_obj;
+struct rgw_bucket_shard;
+
+namespace rgw::error_repo {
+
+// binary-encode a bucket/shard/gen and return it as a string
+std::string encode_key(const rgw_bucket_shard& bs,
+                       std::optional<uint64_t> gen);
+
+// try to decode a key. returns -EINVAL if not in binary format
+int decode_key(std::string encoded,
+               rgw_bucket_shard& bs,
+               std::optional<uint64_t>& gen);
 
 // decode a timestamp as a uint64_t for CMPXATTR_MODE_U64
-ceph::real_time rgw_error_repo_decode_value(const ceph::bufferlist& bl);
+ceph::real_time decode_value(const ceph::bufferlist& bl);
 
 // write an omap key iff the given timestamp is newer
-int rgw_error_repo_write(librados::ObjectWriteOperation& op,
-                         const std::string& key,
-                         ceph::real_time timestamp);
-RGWCoroutine* rgw_error_repo_write_cr(RGWSI_RADOS* rados,
-                                      const rgw_raw_obj& obj,
-                                      const std::string& key,
-                                      ceph::real_time timestamp);
+int write(librados::ObjectWriteOperation& op,
+          const std::string& key,
+          ceph::real_time timestamp);
+RGWCoroutine* write_cr(RGWSI_RADOS* rados,
+                       const rgw_raw_obj& obj,
+                       const std::string& key,
+                       ceph::real_time timestamp);
 
 // remove an omap key iff there isn't a newer timestamp
-int rgw_error_repo_remove(librados::ObjectWriteOperation& op,
-                          const std::string& key,
-                          ceph::real_time timestamp);
-RGWCoroutine* rgw_error_repo_remove_cr(RGWSI_RADOS* rados,
-                                       const rgw_raw_obj& obj,
-                                       const std::string& key,
-                                       ceph::real_time timestamp);
-
+int remove(librados::ObjectWriteOperation& op,
+           const std::string& key,
+           ceph::real_time timestamp);
+RGWCoroutine* remove_cr(RGWSI_RADOS* rados,
+                        const rgw_raw_obj& obj,
+                        const std::string& key,
+                        ceph::real_time timestamp);
+
+} // namespace rgw::error_repo
-- 
2.39.5
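
Illustration only, not part of the patch: a minimal sketch of how the new key
helpers are expected to behave, assuming the definitions added above are in
scope. The bucket name, shard id and generation values below are made up for
the example.

  #include "rgw_basic_types.h"
  #include "rgw_sync_error_repo.h"

  void error_repo_key_example()
  {
    rgw_bucket_shard bs;
    bs.bucket.name = "mybucket";
    bs.shard_id = 7;

    // new-style key: binary, starting with 0x80, so it cannot collide with
    // the old utf8 "tenant/bucket:instance:shard" string keys
    const std::string key = rgw::error_repo::encode_key(bs, 3);

    rgw_bucket_shard out;
    std::optional<uint64_t> gen;
    int r = rgw::error_repo::decode_key(key, out, gen);
    // expected: r == 0, out matches bs, *gen == 3

    // old-style string key: decode_key() returns -EINVAL, and the caller
    // falls back to parse_bucket_key(), as RGWDataSyncShardCR does above
    r = rgw::error_repo::decode_key("mybucket:7", out, gen);
    // expected: r == -EINVAL
  }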