--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ * Copyright 2013 Inktank
+ */
+
+#include "rgw_replica_log.h"
+#include "cls/replica_log/cls_replica_log_client.h"
+#include "rgw_rados.h"
+
+RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) :
+ cct(_store->cct), store(_store) {}
+
+int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const string& pool)
+{
+ int r = store->rados->ioctx_create(pool.c_str(), ctx);
+ if (r < 0) {
+ lderr(cct) << "ERROR: could not open rados pool "
+ << pool << dendl;
+ }
+ return r;
+}
+
+int RGWReplicaLogger::update_bound(const string& oid, const string& pool,
+ const string& daemon_id,
+ const string& marker, const utime_t& time,
+ const list<pair<string, utime_t> > *entries)
+{
+ cls_replica_log_progress_marker progress;
+ cls_replica_log_prepare_marker(progress, daemon_id, marker, time,
+ entries);
+
+ librados::IoCtx ioctx;
+ int r = open_ioctx(ioctx, pool);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation opw;
+ cls_replica_log_update_bound(opw, progress);
+ return ioctx.operate(oid, &opw);
+}
+
+int RGWReplicaLogger::delete_bound(const string& oid, const string& pool,
+ const string& daemon_id)
+{
+ librados::IoCtx ioctx;
+ int r = open_ioctx(ioctx, pool);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation opw;
+ cls_replica_log_delete_bound(opw, daemon_id);
+ return ioctx.operate(oid, &opw);
+}
+
+int RGWReplicaLogger::get_bounds(const string& oid, const string& pool,
+ string& marker, utime_t& oldest_time,
+ list<cls_replica_log_progress_marker>& markers)
+{
+ librados::IoCtx ioctx;
+ int r = open_ioctx(ioctx, pool);
+ if (r < 0) {
+ return r;
+ }
+
+ return cls_replica_log_get_bounds(ioctx, oid, marker, oldest_time, markers);
+}
+
+void RGWReplicaLogger::get_bound_info(
+ const cls_replica_log_progress_marker& progress,
+ string& entity, string& marker,
+ utime_t time,
+ list<pair<string, utime_t> >& entries) {
+ cls_replica_log_extract_marker(progress, entity, marker, time, entries);
+}
+
+RGWReplicaObjectLogger::
+RGWReplicaObjectLogger(RGWRados *_store,
+ const string& _pool,
+ const string& _prefix) : RGWReplicaLogger(_store),
+ pool(_pool), prefix(_prefix) {
+ if (pool.empty())
+ store->get_log_pool_name(pool);
+}
+
+int RGWReplicaObjectLogger::create_log_objects(int shards)
+{
+ librados::IoCtx ioctx;
+ int r = open_ioctx(ioctx, pool);
+ if (r < 0) {
+ return r;
+ }
+ for (int i = 0; i < shards; ++i) {
+ string oid;
+ get_shard_oid(i, oid);
+ r = ioctx.create(oid, false);
+ if (r < 0)
+ return r;
+ }
+ return r;
+}
--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef RGW_REPLICA_LOG_H_
+#define RGW_REPLICA_LOG_H_
+
+#include <string>
+#include "cls/replica_log/cls_replica_log_types.h"
+#include "include/types.h"
+#include "include/utime.h"
+#include "include/rados/librados.hpp"
+#include "rgw_common.h"
+
+class RGWRados;
+class CephContext;
+
+using namespace std;
+
+#define META_REPLICA_LOG_OBJ_PREFIX "meta.replicalog."
+#define DATA_REPLICA_LOG_OBJ_PREFIX "data.replicalog."
+
+class RGWReplicaLogger {
+protected:
+ CephContext *cct;
+ RGWRados *store;
+ int open_ioctx(librados::IoCtx& ctx, const string& pool);
+
+ RGWReplicaLogger(RGWRados *_store);
+
+ int update_bound(const string& oid, const string& pool,
+ const string& daemon_id, const string& marker,
+ const utime_t& time,
+ const list<pair<string, utime_t> > *entries);
+ int delete_bound(const string& oid, const string& pool,
+ const string& daemon_id);
+ int get_bounds(const string& oid, const string& pool,
+ string& marker, utime_t& oldest_time,
+ list<cls_replica_log_progress_marker>& markers);
+
+public:
+ static void get_bound_info(const cls_replica_log_progress_marker& progress,
+ string& entity, string& marker,
+ utime_t time,
+ list<pair<string, utime_t> >& entries);
+};
+
+class RGWReplicaObjectLogger : private RGWReplicaLogger {
+ string pool;
+ string prefix;
+
+ void get_shard_oid(int id, string& oid) {
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%d", id);
+ oid = prefix + buf;
+ }
+
+public:
+ RGWReplicaObjectLogger(RGWRados *_store,
+ const string& _pool,
+ const string& _prefix);
+
+ int create_log_objects(int shards);
+ int update_bound(int shard, const string& daemon_id, const string& marker,
+ const utime_t& time,
+ const list<pair<string, utime_t> > *entries) {
+ string oid;
+ get_shard_oid(shard, oid);
+ return RGWReplicaLogger::update_bound(oid, pool,
+ daemon_id, marker, time, entries);
+ }
+ int delete_bound(int shard, const string& daemon_id) {
+ string oid;
+ get_shard_oid(shard, oid);
+ return RGWReplicaLogger::delete_bound(oid, pool,
+ daemon_id);
+ }
+ int get_bounds(int shard, string& marker, utime_t& oldest_time,
+ list<cls_replica_log_progress_marker>& markers) {
+ string oid;
+ get_shard_oid(shard, oid);
+ return RGWReplicaLogger::get_bounds(oid, pool,
+ marker, oldest_time, markers);
+ }
+};
+
+class RGWReplicaBucketLogger : private RGWReplicaLogger {
+public:
+ RGWReplicaBucketLogger(RGWRados *_store) :
+ RGWReplicaLogger(_store) {}
+ int update_bound(const rgw_bucket& bucket, const string& daemon_id,
+ const string& marker, const utime_t& time,
+ const list<pair<string, utime_t> > *entries) {
+ return RGWReplicaLogger::update_bound(bucket.name, bucket.index_pool,
+ daemon_id, marker, time, entries);
+ }
+ int delete_bound(const rgw_bucket& bucket, const string& daemon_id) {
+ return RGWReplicaLogger::delete_bound(bucket.name, bucket.index_pool,
+ daemon_id);
+ }
+ int get_bounds(const rgw_bucket& bucket, string& marker, utime_t& oldest_time,
+ list<cls_replica_log_progress_marker>& markers) {
+ return RGWReplicaLogger::get_bounds(bucket.name, bucket.index_pool,
+ marker, oldest_time, markers);
+ }
+};
+
+#endif /* RGW_REPLICA_LOG_H_ */