From: Casey Bodley Date: Tue, 31 Mar 2020 13:22:54 +0000 (-0400) Subject: rgw: add error repo primitives with timestamp comparisons X-Git-Tag: wip-pdonnell-testing-20200918.022351~1523^2~21 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=59df17f1d9a9f178845eaa924d6c3ce6664879b4;p=ceph-ci.git rgw: add error repo primitives with timestamp comparisons the sync error repo stores omap keys for each datalog entry that needs a retry. this adds a new primitive for that, that also stores a timestamp with each omap key, and will only allow overwrites/removals when a newer timestamp is provided Signed-off-by: Casey Bodley --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 7cbd18da3bb..4af33be7d3d 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -85,6 +85,7 @@ set(librgw_common_srcs rgw_sync.cc rgw_data_sync.cc rgw_sync_counters.cc + rgw_sync_error_repo.cc rgw_sync_module.cc rgw_sync_module_aws.cc rgw_sync_module_es.cc @@ -232,7 +233,7 @@ endif() target_link_libraries(rgw_a PRIVATE librados cls_otp_client cls_lock_client cls_rgw_client cls_refcount_client - cls_log_client cls_timeindex_client cls_version_client + cls_log_client cls_timeindex_client cls_version_client cls_cmpomap_client cls_user_client cls_rgw_gc_client ceph-common common_utf8 global ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} diff --git a/src/rgw/rgw_sync_error_repo.cc b/src/rgw/rgw_sync_error_repo.cc new file mode 100644 index 00000000000..e952ce91230 --- /dev/null +++ b/src/rgw/rgw_sync_error_repo.cc @@ -0,0 +1,137 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "rgw_sync_error_repo.h" +#include "rgw_coroutine.h" +#include "rgw_sal.h" +#include "services/svc_rados.h" +#include "cls/cmpomap/client.h" + +ceph::real_time rgw_error_repo_decode_value(const bufferlist& bl) +{ + uint64_t value; + try { + using ceph::decode; + decode(value, bl); + } catch (const buffer::error&) { + value = 0; // empty buffer = 0 + } + return ceph::real_clock::zero() + ceph::timespan(value); +} + +int rgw_error_repo_write(librados::ObjectWriteOperation& op, + const std::string& key, + ceph::real_time timestamp) +{ + // overwrite the existing timestamp if value is greater + const uint64_t value = timestamp.time_since_epoch().count(); + using namespace cls::cmpomap; + const bufferlist zero = u64_buffer(0); // compare against 0 for missing keys + return cmp_set_vals(op, Mode::U64, Op::GT, {{key, u64_buffer(value)}}, zero); +} + +int rgw_error_repo_remove(librados::ObjectWriteOperation& op, + const std::string& key, + ceph::real_time timestamp) +{ + // remove the omap key if value >= existing + const uint64_t value = timestamp.time_since_epoch().count(); + using namespace cls::cmpomap; + return cmp_rm_keys(op, Mode::U64, Op::GTE, {{key, u64_buffer(value)}}); +} + +class RGWErrorRepoWriteCR : public RGWSimpleCoroutine { + RGWSI_RADOS::Obj obj; + std::string key; + ceph::real_time timestamp; + + boost::intrusive_ptr cn; + public: + RGWErrorRepoWriteCR(RGWSI_RADOS* rados, const rgw_raw_obj& raw_obj, + const std::string& key, ceph::real_time timestamp) + : RGWSimpleCoroutine(rados->ctx()), + obj(rados->obj(raw_obj)), + key(key), timestamp(timestamp) + {} + + int send_request() override { + librados::ObjectWriteOperation op; + int r = rgw_error_repo_write(op, key, timestamp); + if (r < 0) { + return r; + } + r = obj.open(); + if (r < 0) { + return r; + } + + cn = stack->create_completion_notifier(); + return obj.aio_operate(cn->completion(), &op); + } + + int request_complete() override { + return cn->completion()->get_return_value(); + } +}; + +RGWCoroutine* rgw_error_repo_write_cr(RGWSI_RADOS* rados, + const rgw_raw_obj& obj, + const std::string& key, + ceph::real_time timestamp) +{ + return new RGWErrorRepoWriteCR(rados, obj, key, timestamp); +} + + +class RGWErrorRepoRemoveCR : public RGWSimpleCoroutine { + RGWSI_RADOS::Obj obj; + std::string key; + ceph::real_time timestamp; + + boost::intrusive_ptr cn; + public: + RGWErrorRepoRemoveCR(RGWSI_RADOS* rados, const rgw_raw_obj& raw_obj, + const std::string& key, ceph::real_time timestamp) + : RGWSimpleCoroutine(rados->ctx()), + obj(rados->obj(raw_obj)), + key(key), timestamp(timestamp) + {} + + int send_request() override { + librados::ObjectWriteOperation op; + int r = rgw_error_repo_remove(op, key, timestamp); + if (r < 0) { + return r; + } + r = obj.open(); + if (r < 0) { + return r; + } + + cn = stack->create_completion_notifier(); + return obj.aio_operate(cn->completion(), &op); + } + + int request_complete() override { + return cn->completion()->get_return_value(); + } +}; + +RGWCoroutine* rgw_error_repo_remove_cr(RGWSI_RADOS* rados, + const rgw_raw_obj& obj, + const std::string& key, + ceph::real_time timestamp) +{ + return new RGWErrorRepoRemoveCR(rados, obj, key, timestamp); +} diff --git a/src/rgw/rgw_sync_error_repo.h b/src/rgw/rgw_sync_error_repo.h new file mode 100644 index 00000000000..f5169c0b280 --- /dev/null +++ b/src/rgw/rgw_sync_error_repo.h @@ -0,0 +1,45 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#pragma once + +#include "include/rados/librados_fwd.hpp" +#include "include/buffer_fwd.h" +#include "common/ceph_time.h" + +class RGWSI_RADOS; +class RGWCoroutine; +struct rgw_raw_obj; + +// decode a timestamp as a uint64_t for CMPXATTR_MODE_U64 +ceph::real_time rgw_error_repo_decode_value(const bufferlist& bl); + +// write an omap key iff the given timestamp is newer +int rgw_error_repo_write(librados::ObjectWriteOperation& op, + const std::string& key, + ceph::real_time timestamp); +RGWCoroutine* rgw_error_repo_write_cr(RGWSI_RADOS* rados, + const rgw_raw_obj& obj, + const std::string& key, + ceph::real_time timestamp); + +// remove an omap key iff there isn't a newer timestamp +int rgw_error_repo_remove(librados::ObjectWriteOperation& op, + const std::string& key, + ceph::real_time timestamp); +RGWCoroutine* rgw_error_repo_remove_cr(RGWSI_RADOS* rados, + const rgw_raw_obj& obj, + const std::string& key, + ceph::real_time timestamp); +