}
std::string rgw_bucket_shard::get_key(char tenant_delim, char id_delim,
- char shard_delim) const
+ char shard_delim, size_t reserve) const
{
static constexpr size_t shard_len{12}; // ":4294967295\0"
- auto key = bucket.get_key(tenant_delim, id_delim, shard_len);
+ auto key = bucket.get_key(tenant_delim, id_delim, reserve + shard_len);
if (shard_id >= 0 && shard_delim) {
key.append(1, shard_delim);
key.append(std::to_string(shard_id));
return key;
}
+void encode(const rgw_bucket_shard& b, bufferlist& bl, uint64_t f) // appends b.bucket then b.shard_id to bl, passing f to both encoders
+{
+ encode(b.bucket, bl, f); // bucket goes first; decode(rgw_bucket_shard&, ...) reads the fields in this same order
+ encode(b.shard_id, bl, f);
+}
+
+void decode(rgw_bucket_shard& b, bufferlist::const_iterator& bl) // reads b.bucket then b.shard_id from the iterator
+{
+ decode(b.bucket, bl); // field order must mirror encode(const rgw_bucket_shard&, ...)
+ decode(b.shard_id, bl);
+}
+
void encode_json_impl(const char *name, const rgw_zone_id& zid, Formatter *f)
{
encode_json(name, zid.id, f);
rgw_bucket_shard(const rgw_bucket& _b, int _sid) : bucket(_b), shard_id(_sid) {}
std::string get_key(char tenant_delim = '/', char id_delim = ':',
- char shard_delim = ':') const;
+ char shard_delim = ':',
+ size_t reserve = 0) const;
bool operator<(const rgw_bucket_shard& b) const {
if (bucket < b.bucket) {
}
};
+void encode(const rgw_bucket_shard& b, bufferlist& bl, uint64_t f=0);
+void decode(rgw_bucket_shard& b, bufferlist::const_iterator& bl);
+
inline std::ostream& operator<<(std::ostream& out, const rgw_bucket_shard& bs) {
if (bs.shard_id <= 0) {
return out << bs.bucket;
JSONDecoder::decode_json("entries", entries, obj);
};
+// Format a bucket shard as its key (get_key with '/', ':' and ':' as the
+// delimiters) followed by the gen in square brackets: "<key>[<gen>]".
+// An unset gen is printed as 0.
+std::string to_string(const rgw_bucket_shard& bs, std::optional<uint64_t> gen)
+{
+  // digits10 only counts the decimal digits that always round-trip (19 for
+  // uint64_t); the largest uint64_t value prints with one digit more, so
+  // reserve for digits10 + 1 to avoid a reallocation on large gens
+  constexpr auto max_digits = std::numeric_limits<uint64_t>::digits10 + 1;
+  constexpr auto reserve = 2 + max_digits; // [value]
+  auto str = bs.get_key('/', ':', ':', reserve);
+  str.append(1, '[');
+  str.append(std::to_string(gen.value_or(0)));
+  str.append(1, ']');
+  return str;
+}
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
static constexpr int MAX_CONCURRENT_SHARDS = 16;
marker_tracker(_marker_tracker), error_repo(error_repo),
lease_cr(std::move(lease_cr)) {
set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
- tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
+ tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", to_string(obligation.bs, obligation.gen));
}
int operate() override {
// this was added when 'tenant/' was added to datalog entries, because
// preexisting tenant buckets could never sync and would stay in the
// error_repo forever
- tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->key));
+ tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->bs));
sync_status = 0;
}
if (sync_status < 0) {
// write actual sync failures for 'radosgw-admin sync error list'
if (sync_status != -EBUSY && sync_status != -EAGAIN) {
- yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", complete->key,
+ yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data",
+ to_string(complete->bs, complete->gen),
-sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
}
if (complete->timestamp != ceph::real_time{}) {
tn->log(10, SSTR("writing " << *complete << " to error repo for retry"));
- yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo,
- complete->key, complete->timestamp));
+ yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+ rgw::error_repo::encode_key(complete->bs, complete->gen),
+ complete->timestamp));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
}
}
} else if (complete->retry) {
- yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
- complete->key, complete->timestamp));
+ yield call(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
+ rgw::error_repo::encode_key(complete->bs, complete->gen),
+ complete->timestamp));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
<< error_repo << " retcode=" << retcode));
&bs.bucket, &bs.shard_id);
}
RGWCoroutine* sync_single_entry(const rgw_bucket_shard& src,
- const std::string& key,
+ std::optional<uint64_t> gen,
const std::string& marker,
ceph::real_time timestamp, bool retry) {
auto state = bucket_shard_cache->get(src);
- auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry};
+ auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry};
return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
&*marker_tracker, error_repo,
lease_cr.get(), tn);
tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
} else {
// fetch remote and write locally
- spawn(sync_single_entry(source_bs, iter->first, iter->first,
+ spawn(sync_single_entry(source_bs, std::nullopt, iter->first,
entry_timestamp, false), false);
}
sync_marker.marker = iter->first;
continue;
}
tn->log(20, SSTR("received async update notification: " << *modified_iter));
- spawn(sync_single_entry(source_bs, *modified_iter, string(),
+ spawn(sync_single_entry(source_bs, std::nullopt, string(),
ceph::real_time{}, false), false);
}
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
error_marker = iter->first;
- entry_timestamp = rgw_error_repo_decode_value(iter->second);
- retcode = parse_bucket_key(error_marker, source_bs);
+ entry_timestamp = rgw::error_repo::decode_value(iter->second);
+ std::optional<uint64_t> gen;
+ retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
+ if (retcode == -EINVAL) {
+ // backward compatibility for string keys that don't encode a gen
+ retcode = parse_bucket_key(error_marker, source_bs);
+ }
if (retcode < 0) {
tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
- spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
- error_marker, entry_timestamp), false);
+ spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
+ error_marker, entry_timestamp), false);
continue;
}
- tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp));
- spawn(sync_single_entry(source_bs, error_marker, "",
- entry_timestamp, true), false);
+ tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
+ spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false);
}
if (!omapvals->more) {
error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
} else {
- spawn(sync_single_entry(source_bs, log_iter->entry.key, log_iter->log_id,
+ spawn(sync_single_entry(source_bs, std::nullopt, log_iter->log_id,
log_iter->log_timestamp, false), false);
}
// represents an obligation to sync an entry up a given time
struct rgw_data_sync_obligation {
- std::string key;
+ rgw_bucket_shard bs; // bucket shard this obligation refers to; takes the place of the removed string key
+ std::optional<uint64_t> gen; // optional gen; operator<< appends it as [gen] only when it is set
std::string marker;
ceph::real_time timestamp;
bool retry = false;
};
inline std::ostream& operator<<(std::ostream& out, const rgw_data_sync_obligation& o) {
- out << "key=" << o.key;
+ out << "key=" << o.bs;
+ if (o.gen) {
+ out << '[' << *o.gen << ']';
+ }
if (!o.marker.empty()) {
out << " marker=" << o.marker;
}
#include "services/svc_rados.h"
#include "cls/cmpomap/client.h"
-ceph::real_time rgw_error_repo_decode_value(const bufferlist& bl)
+namespace rgw::error_repo {
+
+// prefix for the binary encoding of keys. this particular value is not
+// valid as the first byte of a utf8 code point, so we use this to
+// differentiate the binary encoding from existing string keys for
+// backward-compatibility
+constexpr uint8_t binary_key_prefix = 0x80;
+
+struct key_type { // bucket shard plus optional gen; encode_key()/decode_key() serialize this after the binary prefix
+ rgw_bucket_shard bs;
+ std::optional<uint64_t> gen; // may be unset
+};
+
+void encode(const key_type& k, bufferlist& bl, uint64_t f=0) // appends k.bs then k.gen to bl; f is not passed on to the field encoders
+{
+ ENCODE_START(1, 1, bl); // paired with DECODE_START(1, ...) in decode(key_type&, ...)
+ encode(k.bs, bl);
+ encode(k.gen, bl);
+ ENCODE_FINISH(bl);
+}
+
+void decode(key_type& k, bufferlist::const_iterator& bl) // reads k.bs then k.gen, the order encode(const key_type&, ...) writes them
+{
+ DECODE_START(1, bl); // paired with ENCODE_START(1, 1, ...) on the encode side
+ decode(k.bs, bl);
+ decode(k.gen, bl);
+ DECODE_FINISH(bl);
+}
+
+std::string encode_key(const rgw_bucket_shard& bs,
+ std::optional<uint64_t> gen)
+{
+ using ceph::encode;
+ const auto key = key_type{bs, gen};
+ bufferlist bl;
+ encode(binary_key_prefix, bl);
+ encode(key, bl);
+ return bl.to_str();
+}
+
+// Parse a key produced by encode_key() into its bucket shard and gen.
+// Returns 0 on success and fills in bs/gen. Returns -EINVAL when the first
+// byte is not binary_key_prefix (i.e. the key is not in the binary format),
+// and -EIO when decoding throws buffer::error or bytes are left over after
+// the key. bs and gen are only written on success.
+int decode_key(std::string encoded,
+               rgw_bucket_shard& bs,
+               std::optional<uint64_t>& gen)
+{
+  using ceph::decode;
+  const auto input = bufferlist::static_from_string(encoded);
+  auto cursor = input.cbegin();
+  key_type decoded;
+  try {
+    uint8_t lead;
+    decode(lead, cursor);
+    if (lead != binary_key_prefix) {
+      return -EINVAL;
+    }
+    decode(decoded, cursor);
+  } catch (const buffer::error&) {
+    return -EIO;
+  }
+  if (!cursor.end()) {
+    return -EIO; // buffer contained unexpected bytes
+  }
+  gen = decoded.gen;
+  bs = std::move(decoded.bs);
+  return 0;
+}
+
+ceph::real_time decode_value(const bufferlist& bl)
{
uint64_t value;
try {
return ceph::real_clock::zero() + ceph::timespan(value);
}
-int rgw_error_repo_write(librados::ObjectWriteOperation& op,
- const std::string& key,
- ceph::real_time timestamp)
+int write(librados::ObjectWriteOperation& op,
+ const std::string& key,
+ ceph::real_time timestamp)
{
// overwrite the existing timestamp if value is greater
const uint64_t value = timestamp.time_since_epoch().count();
return cmp_set_vals(op, Mode::U64, Op::GT, {{key, u64_buffer(value)}}, zero);
}
-int rgw_error_repo_remove(librados::ObjectWriteOperation& op,
- const std::string& key,
- ceph::real_time timestamp)
+int remove(librados::ObjectWriteOperation& op,
+ const std::string& key,
+ ceph::real_time timestamp)
{
// remove the omap key if value >= existing
const uint64_t value = timestamp.time_since_epoch().count();
int send_request() override {
librados::ObjectWriteOperation op;
- int r = rgw_error_repo_write(op, key, timestamp);
+ int r = write(op, key, timestamp);
if (r < 0) {
return r;
}
}
};
-RGWCoroutine* rgw_error_repo_write_cr(RGWSI_RADOS* rados,
- const rgw_raw_obj& obj,
- const std::string& key,
- ceph::real_time timestamp)
+RGWCoroutine* write_cr(RGWSI_RADOS* rados, // returns a newly allocated RGWErrorRepoWriteCR built from these arguments
+ const rgw_raw_obj& obj,
+ const std::string& key,
+ ceph::real_time timestamp)
{
return new RGWErrorRepoWriteCR(rados, obj, key, timestamp);
}
int send_request() override {
librados::ObjectWriteOperation op;
- int r = rgw_error_repo_remove(op, key, timestamp);
+ int r = remove(op, key, timestamp);
if (r < 0) {
return r;
}
}
};
-RGWCoroutine* rgw_error_repo_remove_cr(RGWSI_RADOS* rados,
- const rgw_raw_obj& obj,
- const std::string& key,
- ceph::real_time timestamp)
+RGWCoroutine* remove_cr(RGWSI_RADOS* rados, // returns a newly allocated RGWErrorRepoRemoveCR built from these arguments
+ const rgw_raw_obj& obj,
+ const std::string& key,
+ ceph::real_time timestamp)
{
return new RGWErrorRepoRemoveCR(rados, obj, key, timestamp);
}
+
+} // namespace rgw::error_repo
#pragma once
+#include <optional>
#include "include/rados/librados_fwd.hpp"
#include "include/buffer_fwd.h"
#include "common/ceph_time.h"
class RGWSI_RADOS;
class RGWCoroutine;
struct rgw_raw_obj;
+struct rgw_bucket_shard;
+
+namespace rgw::error_repo {
+
+// binary-encode a bucket/shard/gen and return it as a string
+std::string encode_key(const rgw_bucket_shard& bs,
+ std::optional<uint64_t> gen);
+
+// try to decode a key. returns -EINVAL if not in binary format
+int decode_key(std::string encoded,
+ rgw_bucket_shard& bs,
+ std::optional<uint64_t>& gen);
// decode a timestamp as a uint64_t for CMPXATTR_MODE_U64
-ceph::real_time rgw_error_repo_decode_value(const ceph::bufferlist& bl);
+ceph::real_time decode_value(const ceph::bufferlist& bl);
// write an omap key iff the given timestamp is newer
-int rgw_error_repo_write(librados::ObjectWriteOperation& op,
- const std::string& key,
- ceph::real_time timestamp);
-RGWCoroutine* rgw_error_repo_write_cr(RGWSI_RADOS* rados,
- const rgw_raw_obj& obj,
- const std::string& key,
- ceph::real_time timestamp);
+int write(librados::ObjectWriteOperation& op,
+ const std::string& key,
+ ceph::real_time timestamp);
+RGWCoroutine* write_cr(RGWSI_RADOS* rados,
+ const rgw_raw_obj& obj,
+ const std::string& key,
+ ceph::real_time timestamp);
// remove an omap key iff there isn't a newer timestamp
-int rgw_error_repo_remove(librados::ObjectWriteOperation& op,
- const std::string& key,
- ceph::real_time timestamp);
-RGWCoroutine* rgw_error_repo_remove_cr(RGWSI_RADOS* rados,
- const rgw_raw_obj& obj,
- const std::string& key,
- ceph::real_time timestamp);
-
+int remove(librados::ObjectWriteOperation& op,
+ const std::string& key,
+ ceph::real_time timestamp);
+RGWCoroutine* remove_cr(RGWSI_RADOS* rados,
+ const rgw_raw_obj& obj,
+ const std::string& key,
+ ceph::real_time timestamp);
+
+} // namespace rgw::error_repo