#include "rgw_cr_rados.h"
#include "rgw_sync_counters.h"
#include "rgw_bucket.h"
+#include "rgw_datalog_notify.h"
+#include "rgw_cr_rest.h"
+#include "rgw_rest_conn.h"
+#include "rgw_rados.h"
#include "services/svc_zone.h"
#include "services/svc_zone_utils.h"
#include "cls/rgw/cls_rgw_client.h"
#include <boost/asio/yield.hpp>
+#include <boost/container/flat_set.hpp>
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
return r;
}
+
+
+int RGWDataPostNotifyCR::operate(const DoutPrefixProvider* dpp)
+{
+ reenter(this) {
+ using PostNotify2 = RGWPostRESTResourceCR<bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>>, int>;
+ yield {
+ rgw_http_param_pair pairs[] = { { "type", "data" },
+ { "notify2", NULL },
+ { "source-zone", source_zone },
+ { NULL, NULL } };
+ call(new PostNotify2(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, nullptr));
+ }
+ if (retcode == -ERR_METHOD_NOT_ALLOWED) {
+ using PostNotify1 = RGWPostRESTResourceCR<rgw_data_notify_v1_encoder, int>;
+ yield {
+ rgw_http_param_pair pairs[] = { { "type", "data" },
+ { "notify", NULL },
+ { "source-zone", source_zone },
+ { NULL, NULL } };
+ auto encoder = rgw_data_notify_v1_encoder{shards};
+ call(new PostNotify1(store->ctx(), conn, &http_manager, "/admin/log", pairs, encoder, nullptr));
+ }
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+}
#define dout_subsys ceph_subsys_rgw
+struct rgw_http_param_pair;
+class RGWRESTConn;
+
class RGWAsyncRadosRequest : public RefCountedObject {
RGWCoroutine *caller;
RGWAioCompletionNotifier *notifier;
int request_complete() override;
};
+class RGWDataPostNotifyCR : public RGWCoroutine {
+ RGWRados *store;
+ RGWHTTPManager& http_manager;
+ bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& shards;
+ const char *source_zone;
+ RGWRESTConn *conn;
+
+public:
+ RGWDataPostNotifyCR(RGWRados *_store, RGWHTTPManager& _http_manager, bc::flat_map<int,
+ bc::flat_set<rgw_data_notify_entry> >& _shards, const char *_zone, RGWRESTConn *_conn)
+ : RGWCoroutine(_store->ctx()), store(_store), http_manager(_http_manager),
+ shards(_shards), source_zone(_zone), conn(_conn) {}
+
+ int operate(const DoutPrefixProvider* dpp) override;
+};
+
#endif
boost::asio::coroutine full_cr;
- set<string> modified_shards;
- set<string> current_modified;
+ bc::flat_set<rgw_data_notify_entry> modified_shards;
+ bc::flat_set<rgw_data_notify_entry> current_modified;
- set<string>::iterator modified_iter;
+ bc::flat_set<rgw_data_notify_entry>::iterator modified_iter;
uint64_t total_entries = 0;
static constexpr int spawn_window = BUCKET_SHARD_SYNC_SPAWN_WINDOW;
}
}
- void append_modified_shards(set<string>& keys) {
+ void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& entries) {
std::lock_guard l{inc_lock};
- modified_shards.insert(keys.begin(), keys.end());
+ modified_shards.insert(entries.begin(), entries.end());
}
int operate(const DoutPrefixProvider *dpp) override {
}
/* process out of band updates */
for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
- retcode = parse_bucket_key(*modified_iter, source_bs);
+ retcode = parse_bucket_key(modified_iter->key, source_bs);
if (retcode < 0) {
- tn->log(1, SSTR("failed to parse bucket shard: " << *modified_iter));
+ tn->log(1, SSTR("failed to parse bucket shard: " << modified_iter->key));
continue;
}
- tn->log(20, SSTR("received async update notification: " << *modified_iter));
- spawn(sync_single_entry(source_bs, std::nullopt, string(),
+ tn->log(20, SSTR("received async update notification: " << modified_iter->key));
+ spawn(sync_single_entry(source_bs, modified_iter->gen, string(),
ceph::real_time{}, false), false);
}
&sync_marker);
}
- void append_modified_shards(set<string>& keys) {
+ void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& keys) {
std::lock_guard l{cr_lock()};
RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
sync_status.sync_info);
}
- void wakeup(int shard_id, set<string>& keys) {
+ void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
std::lock_guard l{shard_crs_lock};
map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
if (iter == shard_crs.end()) {
return;
}
- iter->second->append_modified_shards(keys);
+ iter->second->append_modified_shards(entries);
iter->second->wakeup();
}
};
return new RGWDataSyncCR(sc, num_shards, tn, backoff_ptr());
}
- void wakeup(int shard_id, set<string>& keys) {
+ void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
ceph::mutex& m = cr_lock();
m.lock();
m.unlock();
if (cr) {
- tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
- cr->wakeup(shard_id, keys);
+ cr->wakeup(shard_id, entries);
}
cr->put();
}
};
-void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
+void RGWRemoteDataLog::wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
std::shared_lock rl{lock};
if (!data_sync_cr) {
return;
}
- data_sync_cr->wakeup(shard_id, keys);
+ data_sync_cr->wakeup(shard_id, entries);
}
int RGWRemoteDataLog::run_sync(const DoutPrefixProvider *dpp, int num_shards)
int init_sync_status(const DoutPrefixProvider *dpp, int num_shards);
int run_sync(const DoutPrefixProvider *dpp, int num_shards);
- void wakeup(int shard_id, std::set<std::string>& keys);
+ void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries);
};
class RGWDataSyncStatusManager : public DoutPrefixProvider {
int run(const DoutPrefixProvider *dpp) { return source_log.run_sync(dpp, num_shards); }
- void wakeup(int shard_id, std::set<std::string>& keys) { return source_log.wakeup(shard_id, keys); }
+ void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) { return source_log.wakeup(shard_id, entries); }
+
void stop() {
source_log.finish();
}
JSONDecoder::decode_json("entry", entry, obj);
}
+void rgw_data_notify_entry::dump(Formatter *f) const
+{
+ encode_json("key", key, f);
+ encode_json("gen", gen, f);
+}
+
+void rgw_data_notify_entry::decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("key", key, obj);
+ JSONDecoder::decode_json("gen", gen, obj);
+}
+
class RGWDataChangesOmap final : public RGWDataChangesBE {
using centries = std::list<cls_log_entry>;
std::vector<std::string> oids;
rgw_bucket_shard bs(bucket, shard_id);
int index = choose_oid(bs);
- mark_modified(index, bs);
+
+ mark_modified(index, bs, gen.gen);
std::unique_lock l(lock);
renew_cond.notify_all();
}
-void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
+void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen)
{
auto key = bs.get_key();
{
std::shared_lock rl{modified_lock}; // read lock to check for existence
auto shard = modified_shards.find(shard_id);
- if (shard != modified_shards.end() && shard->second.count(key)) {
+ if (shard != modified_shards.end() && shard->second.count({key, gen})) {
return;
}
}
std::unique_lock wl{modified_lock}; // write lock for insertion
- modified_shards[shard_id].insert(key);
+ modified_shards[shard_id].insert(rgw_data_notify_entry{key, gen});
}
std::string RGWDataChangesLog::max_marker() const {
#include <vector>
#include <boost/container/flat_map.hpp>
+#include <boost/container/flat_set.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
rgw_data_notify_entry& operator=(const rgw_data_notify_entry&) = default;
- bool operator<(const rgw_data_notify_entry& d) const {
+ bool operator <(const rgw_data_notify_entry& d) const {
if (key < d.key) {
return true;
}
}
return gen < d.gen;
}
+ friend std::ostream& operator <<(std::ostream& m,
+ const rgw_data_notify_entry& e) {
+ return m << "[key: " << e.key << ", gen: " << e.gen << "]";
+ }
};
class RGWDataChangesBE;
ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
ceph::shared_mutex modified_lock =
ceph::make_shared_mutex("RGWDataChangesLog::modified_lock");
- bc::flat_map<int, bc::flat_set<std::string>> modified_shards;
+ bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>> modified_shards;
std::atomic<bool> down_flag = { false };
std::vector<rgw_data_change_log_entry>& entries,
LogMarker& marker, bool* ptruncated);
- void mark_modified(int shard_id, const rgw_bucket_shard& bs);
+ void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen);
auto read_clear_modified() {
std::unique_lock wl{modified_lock};
decltype(modified_shards) modified;
return -EACCES;
case 404:
return -ENOENT;
+ case 405:
+ return -ERR_METHOD_NOT_ALLOWED;
case 409:
return -ENOTEMPTY;
case 503:
#include "rgw_etag_verifier.h"
#include "rgw_worker.h"
#include "rgw_notify.h"
+#include "rgw_http_errors.h"
#undef fork // fails to compile RGWPeriod::fork() below
#include "rgw_data_sync.h"
#include "rgw_realm_watcher.h"
#include "rgw_reshard.h"
+#include "rgw_cr_rados.h"
#include "services/svc_zone.h"
#include "services/svc_zone_utils.h"
}
int notify_all(const DoutPrefixProvider *dpp, map<rgw_zone_id, RGWRESTConn *>& conn_map,
- bc::flat_map<int, bc::flat_set<string> >& shards) {
- rgw_http_param_pair pairs[] = { { "type", "data" },
- { "notify", NULL },
- { "source-zone", store->svc.zone->get_zone_params().get_id().c_str() },
- { NULL, NULL } };
+ bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& shards) {
list<RGWCoroutinesStack *> stacks;
+ const char *source_zone = store->svc.zone->get_zone_params().get_id().c_str();
for (auto iter = conn_map.begin(); iter != conn_map.end(); ++iter) {
RGWRESTConn *conn = iter->second;
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this);
- stack->call(new RGWPostRESTResourceCR<bc::flat_map<int, bc::flat_set<string> >, int>(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, NULL));
-
+ stack->call(new RGWDataPostNotifyCR(store, http_manager, shards, source_zone, conn));
stacks.push_back(stack);
}
+
return run(dpp, stacks);
}
};
class RGWDataNotifier : public RGWRadosThread {
RGWDataNotifierManager notify_mgr;
+ bc::flat_set<rgw_data_notify_entry> entry;
uint64_t interval_msec() override {
return cct->_conf.get_val<int64_t>("rgw_data_notify_interval_msec");
return 0;
}
- for (const auto& [shard_id, keys] : shards) {
- ldpp_dout(dpp, 20) << __func__ << "(): notifying datalog change, shard_id="
- << shard_id << ": " << keys << dendl;
+ for (const auto& [shard_id, entries] : shards) {
+ bc::flat_set<rgw_data_notify_entry>::iterator it;
+ for (const auto& entry : entries) {
+ ldpp_dout(dpp, 20) << __func__ << "(): notifying datalog change, shard_id="
+ << shard_id << ":" << entry.gen << ":" << entry.key << dendl;
+ }
}
notify_mgr.notify_all(dpp, store->svc.zone->get_zone_data_notify_to_map(), shards);
sync(_store, async_rados, source_zone->id, counters.get()),
initialized(false) {}
- void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
- for (map<int, set<string> >::iterator iter = shard_ids.begin(); iter != shard_ids.end(); ++iter) {
+ void wakeup_sync_shards(bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& entries) {
+ for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
sync.wakeup(iter->first, iter->second);
}
}
+
RGWDataSyncStatusManager* get_manager() { return &sync; }
int init(const DoutPrefixProvider *dpp) override {
}
}
-void RGWRados::wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, map<int, set<string> >& shard_ids)
+void RGWRados::wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& entries)
{
- ldpp_dout(dpp, 20) << __func__ << ": source_zone=" << source_zone << ", shard_ids=" << shard_ids << dendl;
+ ldpp_dout(dpp, 20) << __func__ << ": source_zone=" << source_zone << ", entries=" << entries << dendl;
+ for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
+ ldpp_dout(dpp, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
+ bc::flat_set<rgw_data_notify_entry>& entries = iter->second;
+ for (const auto& [key, gen] : entries) {
+ ldpp_dout(dpp, 20) << __func__ << ": source_zone=" << source_zone << ", key=" << key
+ << ", gen=" << gen << dendl;
+ }
+ }
+
std::lock_guard l{data_sync_thread_lock};
auto iter = data_sync_processor_threads.find(source_zone);
if (iter == data_sync_processor_threads.end()) {
RGWDataSyncProcessorThread *thread = iter->second;
ceph_assert(thread);
- thread->wakeup_sync_shards(shard_ids);
+ thread->wakeup_sync_shards(entries);
}
RGWMetaSyncStatusManager* RGWRados::get_meta_sync_manager()
#include <functional>
#include <boost/container/flat_map.hpp>
+#include <boost/container/flat_set.hpp>
#include "include/rados/librados.hpp"
#include "include/Context.h"
int delete_bucket(RGWBucketInfo& bucket_info, RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider *dpp, bool check_empty = true);
void wakeup_meta_sync_shards(std::set<int>& shard_ids);
- void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
- const rgw_zone_id& source_zone,
- std::map<int, std::set<std::string> >& shard_ids);
+
+ void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& entries);
RGWMetaSyncStatusManager* get_meta_sync_manager();
RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone);
#include "rgw_common.h"
#include "rgw_zone.h"
#include "rgw_mdlog.h"
+#include "rgw_datalog_notify.h"
#include "services/svc_zone.h"
#include "services/svc_mdlog.h"
return;
}
- map<int, set<string> > updated_shards;
+ bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>> updated_shards;
+ try {
+ auto decoder = rgw_data_notify_v1_decoder{updated_shards};
+ decode_json_obj(decoder, &p);
+ } catch (JSONDecoder::err& err) {
+ ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ if (store->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
+ for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
+ ldpp_dout(this, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
+ bc::flat_set<rgw_data_notify_entry>& entries = iter->second;
+ for (const auto& [key, gen] : entries) {
+ ldpp_dout(this, 20) << __func__ << "(): modified key=" << key
+ << " of gen=" << gen << dendl;
+ }
+ }
+ }
+
+ store->wakeup_data_sync_shards(this, source_zone, updated_shards);
+
+ op_ret = 0;
+}
+
+void RGWOp_DATALog_Notify2::execute(optional_yield y) {
+ string source_zone = s->info.args.get("source-zone");
+#define LARGE_ENOUGH_BUF (128 * 1024)
+
+ int r = 0;
+ bufferlist data;
+ std::tie(r, data) = rgw_rest_read_all_input(s, LARGE_ENOUGH_BUF);
+ if (r < 0) {
+ op_ret = r;
+ return;
+ }
+
+ char* buf = data.c_str();
+ ldout(s->cct, 20) << __func__ << "(): read data: " << buf << dendl;
+
+ JSONParser p;
+ r = p.parse(buf, data.length());
+ if (r < 0) {
+ ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
+ op_ret = r;
+ return;
+ }
+
+ bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> > updated_shards;
try {
decode_json_obj(updated_shards, &p);
} catch (JSONDecoder::err& err) {
}
if (store->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
- for (map<int, set<string> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
+ for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter =
+ updated_shards.begin(); iter != updated_shards.end(); ++iter) {
ldpp_dout(this, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
- set<string>& keys = iter->second;
- for (set<string>::iterator kiter = keys.begin(); kiter != keys.end(); ++kiter) {
- ldpp_dout(this, 20) << __func__ << "(): modified key=" << *kiter << dendl;
+ bc::flat_set<rgw_data_notify_entry>& entries = iter->second;
+ for (const auto& [key, gen] : entries) {
+ ldpp_dout(this, 20) << __func__ << "(): modified key=" << key <<
+ " of generation=" << gen << dendl;
}
}
}
else if (s->info.args.exists("notify"))
return new RGWOp_MDLog_Notify;
} else if (type.compare("data") == 0) {
- if (s->info.args.exists("notify"))
+ if (s->info.args.exists("notify")) {
return new RGWOp_DATALog_Notify;
+ } else if (s->info.args.exists("notify2")) {
+ return new RGWOp_DATALog_Notify2;
+ }
}
return NULL;
}
RGWOpType get_type() override { return RGW_OP_SYNC_DATALOG_NOTIFY; }
};
+class RGWOp_DATALog_Notify2 : public RGWRESTOp {
+ rgw_data_notify_entry data_notify;
+public:
+ RGWOp_DATALog_Notify2() {}
+ ~RGWOp_DATALog_Notify2() override {}
+
+ int check_caps(const RGWUserCaps& caps) override {
+ return caps.check_cap("datalog", RGW_CAP_WRITE);
+ }
+ void execute(optional_yield y) override;
+ const char* name() const override {
+ return "datalog_notify2";
+ }
+};
+
class RGWOp_DATALog_Delete : public RGWRESTOp {
public:
RGWOp_DATALog_Delete() {}
#include "rgw_user.h"
#include "rgw_notify_event_type.h"
+#include "rgw_datalog_notify.h"
class RGWGetDataCB;
struct RGWObjState;
class RGWObjManifest;
struct RGWZoneGroup;
struct RGWZoneParams;
-struct RGWRealm;
+class RGWRealm;
struct RGWCtl;
struct rgw_user_bucket;
class RGWUsageBatch;
optional_yield y) = 0;
virtual RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone) = 0;
virtual void wakeup_meta_sync_shards(std::set<int>& shard_ids) = 0;
- virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, std::map<int, std::set<std::string> >& shard_ids) = 0;
+ virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
+ const rgw_zone_id& source_zone,
+ boost::container::flat_map<
+ int,
+ boost::container::flat_set<rgw_data_notify_entry>>& shard_ids) = 0;
virtual int clear_usage(const DoutPrefixProvider *dpp) = 0;
virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
uint32_t max_entries, bool* is_truncated,
RGWBucketSyncPolicyHandlerRef *phandler,
optional_yield y) override;
virtual RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone) override;
- virtual void wakeup_meta_sync_shards(set<int>& shard_ids) override { return; }
- virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, map<int, set<string> >& shard_ids) override { return; }
+ virtual void wakeup_meta_sync_shards(std::set<int>& shard_ids) override { return; }
+ virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
+ const rgw_zone_id& source_zone,
+ boost::container::flat_map<
+ int,
+ boost::container::flat_set<rgw_data_notify_entry>>& shard_ids) override { return; }
virtual int clear_usage(const DoutPrefixProvider *dpp) override { return 0; }
virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
uint32_t max_entries, bool *is_truncated,
optional_yield y) override;
virtual RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone) override;
virtual void wakeup_meta_sync_shards(std::set<int>& shard_ids) override { rados->wakeup_meta_sync_shards(shard_ids); }
- virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, std::map<int, std::set<std::string> >& shard_ids) override { rados->wakeup_data_sync_shards(dpp, source_zone, shard_ids); }
+ virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
+ const rgw_zone_id& source_zone,
+ bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& shard_ids) override {
+ rados->wakeup_data_sync_shards(dpp, source_zone, shard_ids);
+ }
virtual int clear_usage(const DoutPrefixProvider *dpp) override { return rados->clear_usage(dpp); }
virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
uint32_t max_entries, bool* is_truncated,
{
// overwrite the existing timestamp if value is greater
const uint64_t value = timestamp.time_since_epoch().count();
- using namespace cls::cmpomap;
+ using namespace ::cls::cmpomap;
const bufferlist zero = u64_buffer(0); // compare against 0 for missing keys
return cmp_set_vals(op, Mode::U64, Op::GT, {{key, u64_buffer(value)}}, zero);
}
{
// remove the omap key if value >= existing
const uint64_t value = timestamp.time_since_epoch().count();
- using namespace cls::cmpomap;
+ using namespace ::cls::cmpomap;
return cmp_rm_keys(op, Mode::U64, Op::GTE, {{key, u64_buffer(value)}});
}