]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_replica_log: integrate with RGWRados
authorGreg Farnum <greg@inktank.com>
Thu, 20 Jun 2013 18:01:01 +0000 (11:01 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Thu, 20 Jun 2013 21:10:35 +0000 (14:10 -0700)
We introduce an implementation class RGWReplicaLogger, and two user
classes RGWReplicaObjectLogger (for the data/metadata logs) and
RGWReplicaBucketLogger (for the bucket logs).

Signed-off-by: Greg Farnum <greg@inktank.com>
src/Makefile.am
src/rgw/rgw_rados.h
src/rgw/rgw_replica_log.cc [new file with mode: 0644]
src/rgw/rgw_replica_log.h [new file with mode: 0644]

index 3155d764d97ebe8a2a181bf9f8dfd5577c01432e..245e5541a0bcf18dcb7c72efc9858344c91132c0 100644 (file)
@@ -401,14 +401,16 @@ librgw_a_SOURCES =  \
        rgw/rgw_cors.cc \
        rgw/rgw_cors_s3.cc \
         rgw/rgw_auth_s3.cc \
-       rgw/rgw_metadata.cc
+       rgw/rgw_metadata.cc \
+       rgw/rgw_replica_log.cc
 librgw_a_CFLAGS = ${CRYPTO_CFLAGS} ${AM_CFLAGS}
 librgw_a_CXXFLAGS = -Woverloaded-virtual ${AM_CXXFLAGS}
 noinst_LIBRARIES += librgw.a
 
 my_radosgw_ldadd = \
-       libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a libcls_statelog_client.a \
-       libcls_lock_client.a libcls_refcount_client.a libcls_version_client.a -lcurl -lexpat \
+       libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a \
+       libcls_statelog_client.a libcls_replica_log_client.a libcls_lock_client.a \
+       libcls_refcount_client.a libcls_version_client.a -lcurl -lexpat \
        $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS)
 
 radosgw_SOURCES = \
@@ -420,6 +422,7 @@ radosgw_SOURCES = \
         rgw/rgw_rest_user.cc \
         rgw/rgw_rest_bucket.cc \
         rgw/rgw_rest_metadata.cc \
+        rgw/rgw_replica_log.cc \
         rgw/rgw_rest_log.cc \
         rgw/rgw_http_client.cc \
         rgw/rgw_swift.cc \
@@ -2125,6 +2128,7 @@ noinst_HEADERS = \
        rgw/rgw_swift.h\
        rgw/rgw_swift_auth.h\
        rgw/rgw_rados.h\
+       rgw/rgw_replica_log.h \
        rgw/rgw_resolve.h\
        rgw/rgw_rest.h\
        rgw/rgw_rest_swift.h\
index f292f47cb0e0ec2a9613c02b27c8010316f0762c..70537ba5bef5f63d9df542ffa919655a287e9732 100644 (file)
@@ -563,6 +563,7 @@ struct RGWRegionMap {
 WRITE_CLASS_ENCODER(RGWRegionMap);
 
 class RGWDataChangesLog;
+class RGWReplicaLogger;
   
 class RGWStateLog {
   RGWRados *store;
@@ -653,6 +654,7 @@ class RGWRados
 {
   friend class RGWGC;
   friend class RGWStateLog;
+  friend class RGWReplicaLogger;
 
   /** Open the pool used as root for this gateway */
   int open_root_pool_ctx();
diff --git a/src/rgw/rgw_replica_log.cc b/src/rgw/rgw_replica_log.cc
new file mode 100644 (file)
index 0000000..9934902
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ * Copyright 2013 Inktank
+ */
+
+#include "rgw_replica_log.h"
+#include "cls/replica_log/cls_replica_log_client.h"
+#include "rgw_rados.h"
+
+RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) :
+    cct(_store->cct), store(_store) {}
+
+int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const string& pool)
+{
+  int r = store->rados->ioctx_create(pool.c_str(), ctx);
+  if (r < 0) {
+    lderr(cct) << "ERROR: could not open rados pool "
+              << pool << dendl;
+  }
+  return r;
+}
+
+int RGWReplicaLogger::update_bound(const string& oid, const string& pool,
+                                   const string& daemon_id,
+                                   const string& marker, const utime_t& time,
+                                   const list<pair<string, utime_t> > *entries)
+{
+  cls_replica_log_progress_marker progress;
+  cls_replica_log_prepare_marker(progress, daemon_id, marker, time,
+                                 entries);
+
+  librados::IoCtx ioctx;
+  int r = open_ioctx(ioctx, pool);
+  if (r < 0) {
+    return r;
+  }
+
+  librados::ObjectWriteOperation opw;
+  cls_replica_log_update_bound(opw, progress);
+  return ioctx.operate(oid, &opw);
+}
+
+int RGWReplicaLogger::delete_bound(const string& oid, const string& pool,
+                                   const string& daemon_id)
+{
+  librados::IoCtx ioctx;
+  int r = open_ioctx(ioctx, pool);
+  if (r < 0) {
+    return r;
+  }
+
+  librados::ObjectWriteOperation opw;
+  cls_replica_log_delete_bound(opw, daemon_id);
+  return ioctx.operate(oid, &opw);
+}
+
+int RGWReplicaLogger::get_bounds(const string& oid, const string& pool,
+                                 string& marker, utime_t& oldest_time,
+                                 list<cls_replica_log_progress_marker>& markers)
+{
+  librados::IoCtx ioctx;
+  int r = open_ioctx(ioctx, pool);
+  if (r < 0) {
+    return r;
+  }
+
+  return cls_replica_log_get_bounds(ioctx, oid, marker, oldest_time, markers);
+}
+
+void RGWReplicaLogger::get_bound_info(
+    const cls_replica_log_progress_marker& progress,
+    string& entity, string& marker,
+    utime_t time,
+    list<pair<string, utime_t> >& entries) {
+  cls_replica_log_extract_marker(progress, entity, marker, time, entries);
+}
+
+RGWReplicaObjectLogger::
+RGWReplicaObjectLogger(RGWRados *_store,
+                       const string& _pool,
+                       const string& _prefix) : RGWReplicaLogger(_store),
+                       pool(_pool), prefix(_prefix) {
+  if (pool.empty())
+    store->get_log_pool_name(pool);
+}
+
+int RGWReplicaObjectLogger::create_log_objects(int shards)
+{
+  librados::IoCtx ioctx;
+  int r = open_ioctx(ioctx, pool);
+  if (r < 0) {
+    return r;
+  }
+  for (int i = 0; i < shards; ++i) {
+    string oid;
+    get_shard_oid(i, oid);
+    r = ioctx.create(oid, false);
+    if (r < 0)
+      return r;
+  }
+  return r;
+}
diff --git a/src/rgw/rgw_replica_log.h b/src/rgw/rgw_replica_log.h
new file mode 100644 (file)
index 0000000..fd461c2
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef RGW_REPLICA_LOG_H_
+#define RGW_REPLICA_LOG_H_
+
+#include <string>
+#include "cls/replica_log/cls_replica_log_types.h"
+#include "include/types.h"
+#include "include/utime.h"
+#include "include/rados/librados.hpp"
+#include "rgw_common.h"
+
+class RGWRados;
+class CephContext;
+
+using namespace std;
+
+#define META_REPLICA_LOG_OBJ_PREFIX "meta.replicalog."
+#define DATA_REPLICA_LOG_OBJ_PREFIX "data.replicalog."
+
+class RGWReplicaLogger {
+protected:
+  CephContext *cct;
+  RGWRados *store;
+  int open_ioctx(librados::IoCtx& ctx, const string& pool);
+
+  RGWReplicaLogger(RGWRados *_store);
+
+  int update_bound(const string& oid, const string& pool,
+                   const string& daemon_id, const string& marker,
+                   const utime_t& time,
+                   const list<pair<string, utime_t> > *entries);
+  int delete_bound(const string& oid, const string& pool,
+                   const string& daemon_id);
+  int get_bounds(const string& oid, const string& pool,
+                 string& marker, utime_t& oldest_time,
+                 list<cls_replica_log_progress_marker>& markers);
+
+public:
+  static void get_bound_info(const cls_replica_log_progress_marker& progress,
+                             string& entity, string& marker,
+                             utime_t time,
+                             list<pair<string, utime_t> >& entries);
+};
+
+class RGWReplicaObjectLogger : private RGWReplicaLogger {
+  string pool;
+  string prefix;
+
+  void get_shard_oid(int id, string& oid) {
+    char buf[16];
+    snprintf(buf, sizeof(buf), "%d", id);
+    oid = prefix + buf;
+  }
+
+public:
+  RGWReplicaObjectLogger(RGWRados *_store,
+                const string& _pool,
+                const string& _prefix);
+
+  int create_log_objects(int shards);
+  int update_bound(int shard, const string& daemon_id, const string& marker,
+                   const utime_t& time,
+                   const list<pair<string, utime_t> > *entries) {
+    string oid;
+    get_shard_oid(shard, oid);
+    return RGWReplicaLogger::update_bound(oid, pool,
+                                          daemon_id, marker, time, entries);
+  }
+  int delete_bound(int shard, const string& daemon_id) {
+    string oid;
+    get_shard_oid(shard, oid);
+    return RGWReplicaLogger::delete_bound(oid, pool,
+                                          daemon_id);
+  }
+  int get_bounds(int shard, string& marker, utime_t& oldest_time,
+                 list<cls_replica_log_progress_marker>& markers) {
+    string oid;
+    get_shard_oid(shard, oid);
+    return RGWReplicaLogger::get_bounds(oid, pool,
+                                        marker, oldest_time, markers);
+  }
+};
+
+class RGWReplicaBucketLogger : private RGWReplicaLogger {
+public:
+  RGWReplicaBucketLogger(RGWRados *_store) :
+    RGWReplicaLogger(_store) {}
+  int update_bound(const rgw_bucket& bucket, const string& daemon_id,
+                   const string& marker, const utime_t& time,
+                   const list<pair<string, utime_t> > *entries) {
+    return RGWReplicaLogger::update_bound(bucket.name, bucket.index_pool,
+                                          daemon_id, marker, time, entries);
+  }
+  int delete_bound(const rgw_bucket& bucket, const string& daemon_id) {
+    return RGWReplicaLogger::delete_bound(bucket.name, bucket.index_pool,
+                                          daemon_id);
+  }
+  int get_bounds(const rgw_bucket& bucket, string& marker, utime_t& oldest_time,
+                 list<cls_replica_log_progress_marker>& markers) {
+    return RGWReplicaLogger::get_bounds(bucket.name, bucket.index_pool,
+                                        marker, oldest_time, markers);
+  }
+};
+
+#endif /* RGW_REPLICA_LOG_H_ */