:command:`opstate rm`
Remove entry (use client_id, op_id, object).
-:command:`replicalog get`
- Get replica metadata log entry.
-
-:command:`replicalog update`
- Update replica metadata log entry.
-
-:command:`replicalog rm`
- Remove replica metadata log entry.
-
:command:`orphans find`
Init and run search for leaked rados objects
.. option:: --shard-id=<shard-id>
- Optional for mdlog list, data sync status. Required for ``mdlog trim``,
- ``replica mdlog get/delete``, ``replica datalog get/delete``.
+ Optional for mdlog list, data sync status. Required for ``mdlog trim``.
.. option:: --max-entries=<entries>
Specify a state for the opstate set command.
-.. option:: --replica-log-type <logtypestr>
-
- Replica log type (metadata, data, bucket), required for replica log
- operations.
-
.. option:: --categories=<list>
Comma separated list of categories, used in usage show.
conf:
osd:
osd_class_load_list: "cephfs hello journal lock log numops rbd refcount
- replica_log rgw sdk statelog timeindex user version"
+ rgw sdk statelog timeindex user version"
osd_class_default_list: "cephfs hello journal lock log numops rbd refcount
- replica_log rgw sdk statelog timeindex user version"
+ rgw sdk statelog timeindex user version"
tasks:
- workunit:
clients:
conf:
osd:
osd_class_load_list: "cephfs hello journal lock log numops rbd refcount
- replica_log rgw sdk statelog timeindex user version"
+ rgw sdk statelog timeindex user version"
osd_class_default_list: "cephfs hello journal lock log numops rbd refcount
- replica_log rgw sdk statelog timeindex user version"
+ rgw sdk statelog timeindex user version"
tasks:
- workunit:
clients:
conf:
osd:
osd_class_load_list: "cephfs hello journal lock log numops rbd refcount
- replica_log rgw sdk statelog timeindex user version"
+ rgw sdk statelog timeindex user version"
osd_class_default_list: "cephfs hello journal lock log numops rbd refcount
- replica_log rgw sdk statelog timeindex user version"
+ rgw sdk statelog timeindex user version"
tasks:
- install:
- ceph:
mon warn on osd down out interval zero: false
osd:
osd_class_load_list: "cephfs hello journal lock log numops rbd refcount
- replica_log rgw sdk statelog timeindex user version"
+ rgw sdk statelog timeindex user version"
osd_class_default_list: "cephfs hello journal lock log numops rbd refcount
- replica_log rgw sdk statelog timeindex user version"
+ rgw sdk statelog timeindex user version"
fs: xfs
cls_log_client
cls_statelog_client
cls_version_client
- cls_replica_log_client
cls_user_client
cls_journal_client
cls_timeindex_client
list(APPEND cls_embedded_srcs ${cls_timeindex_srcs} ${cls_timeindex_client_srcs})
-# cls_replica_log
-set(cls_replica_log_srcs replica_log/cls_replica_log.cc)
-add_library(cls_replica_log SHARED ${cls_replica_log_srcs})
-set_target_properties(cls_replica_log PROPERTIES
- VERSION "1.0.0"
- SOVERSION "1"
- INSTALL_RPATH ""
- CXX_VISIBILITY_PRESET hidden)
-install(TARGETS cls_replica_log DESTINATION ${cls_dir})
-
-set(cls_replica_log_client_srcs
- replica_log/cls_replica_log_types.cc
- replica_log/cls_replica_log_ops.cc
- replica_log/cls_replica_log_client.cc)
-add_library(cls_replica_log_client STATIC ${cls_replica_log_client_srcs})
-
-list(APPEND cls_embedded_srcs ${cls_replica_log_srcs} ${cls_replica_log_client_srcs})
-
# cls_user
set(cls_user_srcs user/cls_user.cc)
add_library(cls_user SHARED ${cls_user_srcs})
+++ /dev/null
-/*
- * Ceph - scalable distributed file system
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- * Copyright Inktank 2013
- */
-
-#include "objclass/objclass.h"
-
-#include "cls_replica_log_types.h"
-#include "cls_replica_log_ops.h"
-
-CLS_VER(1, 0)
-CLS_NAME(replica_log)
-
-static const string replica_log_prefix = "rl_";
-static const string replica_log_bounds = replica_log_prefix + "bounds";
-
-static int get_bounds(cls_method_context_t hctx, cls_replica_log_bound& bound)
-{
- bufferlist bounds_bl;
- int rc = cls_cxx_map_get_val(hctx, replica_log_bounds, &bounds_bl);
- if (rc < 0) {
- return rc;
- }
-
- try {
- bufferlist::iterator bounds_bl_i = bounds_bl.begin();
- decode(bound, bounds_bl_i);
- } catch (buffer::error& err) {
- bound = cls_replica_log_bound();
- CLS_LOG(0, "ERROR: get_bounds(): failed to decode on-disk bounds object");
- return -EIO;
- }
-
- return 0;
-}
-
-static int write_bounds(cls_method_context_t hctx,
- const cls_replica_log_bound& bound)
-{
- bufferlist bounds_bl;
- encode(bound, bounds_bl);
- return cls_cxx_map_set_val(hctx, replica_log_bounds, &bounds_bl);
-}
-
-static int cls_replica_log_set(cls_method_context_t hctx,
- bufferlist *in, bufferlist *out)
-{
- bufferlist::iterator in_iter = in->begin();
-
- cls_replica_log_set_marker_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: cls_replica_log_set(): failed to decode op");
- return -EINVAL;
- }
-
- cls_replica_log_bound bound;
- int rc = get_bounds(hctx, bound);
- if (rc < 0 && rc != -ENOENT) {
- return rc;
- }
-
- rc = bound.update_marker(op.marker);
- if (rc < 0) {
- return rc;
- }
-
- return write_bounds(hctx, bound);
-}
-
-static int cls_replica_log_delete(cls_method_context_t hctx,
- bufferlist *in, bufferlist *out)
-{
- bufferlist::iterator in_iter = in->begin();
-
- cls_replica_log_delete_marker_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: cls_replica_log_delete(): failed to decode op");
- return -EINVAL;
- }
-
- cls_replica_log_bound bound;
- int rc = get_bounds(hctx, bound);
- if (rc < 0 && rc != -ENOENT) {
- return rc;
- }
-
- rc = bound.delete_marker(op.entity_id);
- if (rc < 0) {
- return rc;
- }
-
- return write_bounds(hctx, bound);
-}
-
-static int cls_replica_log_get(cls_method_context_t hctx,
- bufferlist *in, bufferlist *out)
-{
- bufferlist::iterator in_iter = in->begin();
-
- cls_replica_log_get_bounds_op op;
- try {
- decode(op, in_iter);
- } catch (buffer::error& err) {
- CLS_LOG(0, "ERROR: cls_replica_log_get(): failed to decode op");
- return -EINVAL;
- }
-
- cls_replica_log_bound bound;
- int rc = get_bounds(hctx, bound);
- if (rc < 0) {
- return rc;
- }
-
- cls_replica_log_get_bounds_ret ret;
- ret.oldest_time = bound.get_oldest_time();
- ret.position_marker = bound.get_lowest_marker_bound();
- bound.get_markers(ret.markers);
-
- encode(ret, *out);
- return 0;
-}
-
-CLS_INIT(replica_log)
-{
- CLS_LOG(1, "Loaded replica log class!");
-
- cls_handle_t h_class;
- cls_method_handle_t h_replica_log_set;
- cls_method_handle_t h_replica_log_delete;
- cls_method_handle_t h_replica_log_get;
-
- cls_register("replica_log", &h_class);
-
- cls_register_cxx_method(h_class, "set", CLS_METHOD_RD | CLS_METHOD_WR,
- cls_replica_log_set, &h_replica_log_set);
- cls_register_cxx_method(h_class, "get", CLS_METHOD_RD,
- cls_replica_log_get, &h_replica_log_get);
- cls_register_cxx_method(h_class, "delete", CLS_METHOD_RD | CLS_METHOD_WR,
- cls_replica_log_delete, &h_replica_log_delete);
-}
+++ /dev/null
-/*
- * Ceph - scalable distributed file system
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- */
-
-#include <errno.h>
-
-#include "cls/replica_log/cls_replica_log_client.h"
-#include "include/rados/librados.hpp"
-
-using namespace librados;
-
-void cls_replica_log_prepare_marker(cls_replica_log_progress_marker& progress,
- const string& entity, const string& marker,
- const utime_t& time,
- const list<pair<string, utime_t> > *entries)
-{
- progress.entity_id = entity;
- progress.position_marker = marker;
- progress.position_time = time;
- if (entries) {
- list<pair<string, utime_t> >::const_iterator i;
- for (i = entries->begin(); i != entries->end(); ++i) {
- cls_replica_log_item_marker item(i->first, i->second);
- progress.items.push_back(item);
- }
- }
-}
-
-void cls_replica_log_extract_marker(const cls_replica_log_progress_marker& progress,
- string& entity, string& marker,
- utime_t& time,
- list<pair<string, utime_t> >& entries)
-{
- entity = progress.entity_id;
- marker = progress.position_marker;
- time = progress.position_time;
- list<cls_replica_log_item_marker>::const_iterator i;
- for (i = progress.items.begin(); i != progress.items.end(); ++i) {
- entries.push_back(make_pair(i->item_name, i->item_timestamp));
- }
-}
-
-void cls_replica_log_update_bound(librados::ObjectWriteOperation& o,
- const cls_replica_log_progress_marker& progress)
-{
- cls_replica_log_set_marker_op op(progress);
- bufferlist in;
- encode(op, in);
- o.exec("replica_log", "set", in);
-}
-
-void cls_replica_log_delete_bound(librados::ObjectWriteOperation& o,
- const string& entity)
-{
- cls_replica_log_delete_marker_op op(entity);
- bufferlist in;
- encode(op, in);
- o.exec("replica_log", "delete", in);
-}
-
-int cls_replica_log_get_bounds(librados::IoCtx& io_ctx, const string& oid,
- string& position_marker,
- utime_t& oldest_time,
- list<cls_replica_log_progress_marker>& markers)
-{
- bufferlist in;
- bufferlist out;
- cls_replica_log_get_bounds_op op;
- encode(op, in);
- int r = io_ctx.exec(oid, "replica_log", "get", in, out);
- if (r < 0)
- return r;
-
- cls_replica_log_get_bounds_ret ret;
- try {
- bufferlist::iterator i = out.begin();
- decode(ret, i);
- } catch (buffer::error& err) {
- return -EIO;
- }
-
- position_marker = ret.position_marker;
- oldest_time = ret.oldest_time;
- markers = ret.markers;
-
- return 0;
-}
+++ /dev/null
-/*
- * Ceph - scalable distributed file system
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- * Copyright 2013 Inktank
- */
-
-#ifndef CLS_REPLICA_LOG_CLIENT_H_
-#define CLS_REPLICA_LOG_CLIENT_H_
-
-#include "cls_replica_log_ops.h"
-
-namespace librados {
- class ObjectWriteOperation;
- class IoCtx;
-}
-
-/**
- * Prepare a progress marker object to send out.
- *
- * @param progress The marker object to prepare
- * @param entity The ID of the entity setting the progress
- * @param marker The marker key the entity has gotten to
- * @param time The timestamp associated with the marker
- * param entries A list of in-progress entries prior to the marker
- */
-void cls_replica_log_prepare_marker(cls_replica_log_progress_marker& progress,
- const string& entity, const string& marker,
- const utime_t& time,
- const list<pair<string, utime_t> > *entries);
-
-/**
- * Extract a progress marker object into its components.
- *
- * @param progress The marker object to extract data from
- * @param entity [out] The ID of the entity the progress is associated with
- * @param marker [out] The marker key the entity has gotten to
- * @param time [out] The timestamp associated with the marker
- * @param entries [out] List of in-progress entries prior to the marker
- */
-void cls_replica_log_extract_marker(const cls_replica_log_progress_marker& progress,
- string& entity, string& marker,
- utime_t& time,
- list<pair<string, utime_t> >& entries);
-
-/**
- * Add a progress marker update to a write op. The op will return 0 on
- * success, -EEXIST if the marker conflicts with an existing one, or
- * -EINVAL if the marker is in conflict (ie, before) the daemon's existing
- * marker.
- *
- * @param op The op to add the update to
- * @param progress The progress marker to send
- */
-void cls_replica_log_update_bound(librados::ObjectWriteOperation& op,
- const cls_replica_log_progress_marker& progress);
-
-/**
- * Remove an entity's progress marker from the replica log. The op will return
- * 0 on success, -ENOENT if the entity does not exist on the replica log, or
- * -ENOTEMPTY if the items list on the marker is not empty.
- *
- * @param op The op to add the delete to
- * @param entity The entity whose progress should be removed
- */
-void cls_replica_log_delete_bound(librados::ObjectWriteOperation& op,
- const string& entity);
-
-/**
- * Read the bounds on a replica log.
- *
- * @param io_ctx The IoCtx to use for the read
- * @param oid The oid to direct the read to
- * @param position_marker [out] The lowest marker key that has been reached
- * @param oldest_time [out] Timestamp corresponding to the position marker or
- * oldest in-progress item.
- * @param markers [out] List of progress markers for individual daemons
- */
-int cls_replica_log_get_bounds(librados::IoCtx& io_ctx, const string& oid,
- string& position_marker,
- utime_t& oldest_time,
- list<cls_replica_log_progress_marker>& markers);
-
-#endif /* CLS_REPLICA_LOG_CLIENT_H_ */
+++ /dev/null
-/*
- * Ceph - scalable distributed file system
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "cls_replica_log_ops.h"
-#include "common/Formatter.h"
-#include "common/ceph_json.h"
-
-void cls_replica_log_delete_marker_op::dump(Formatter *f) const
-{
- f->dump_string("entity_id", entity_id);
-}
-
-void cls_replica_log_delete_marker_op::
-generate_test_instances(std::list<cls_replica_log_delete_marker_op*>& ls)
-{
- ls.push_back(new cls_replica_log_delete_marker_op);
- ls.push_back(new cls_replica_log_delete_marker_op);
- ls.back()->entity_id = "test_entity_1";
-}
-
-void cls_replica_log_set_marker_op::dump(Formatter *f) const
-{
- encode_json("marker", marker, f);
-}
-
-void cls_replica_log_set_marker_op::
-generate_test_instances(std::list<cls_replica_log_set_marker_op*>& ls)
-{
- std::list<cls_replica_log_progress_marker*> samples;
- cls_replica_log_progress_marker::generate_test_instances(samples);
- std::list<cls_replica_log_progress_marker*>::iterator i;
- for (i = samples.begin(); i != samples.end(); ++i) {
- ls.push_back(new cls_replica_log_set_marker_op(*(*i)));
- }
-}
-
-void cls_replica_log_get_bounds_op::dump(Formatter *f) const
-{
- f->dump_string("contents", "empty");
-}
-
-void cls_replica_log_get_bounds_op::
-generate_test_instances(std::list<cls_replica_log_get_bounds_op*>& ls)
-{
- ls.push_back(new cls_replica_log_get_bounds_op);
-}
-
-void cls_replica_log_get_bounds_ret::dump(Formatter *f) const
-{
- f->dump_string("position_marker", position_marker);
- oldest_time.gmtime(f->dump_stream("oldest_time"));
- encode_json("entity_markers", markers, f);
-}
-
-void cls_replica_log_get_bounds_ret::
-generate_test_instances(std::list<cls_replica_log_get_bounds_ret*>& ls)
-{
- std::list<cls_replica_log_progress_marker*> samples;
- cls_replica_log_progress_marker::generate_test_instances(samples);
- std::list<cls_replica_log_progress_marker> samples_whole;
- std::list<cls_replica_log_progress_marker*>::iterator i;
- int count = 0;
- for (i = samples.begin(); i != samples.end(); ++i) {
- ls.push_back(new cls_replica_log_get_bounds_ret());
- ls.back()->markers.push_back(*(*i));
- ls.back()->oldest_time.set_from_double(1000*count);
- ls.back()->position_marker = ls.back()->markers.front().position_marker;
- samples_whole.push_back(*(*i));
- }
- ls.push_back(new cls_replica_log_get_bounds_ret());
- ls.back()->markers = samples_whole;
- ls.back()->oldest_time = samples_whole.back().position_time;
- ls.back()->position_marker = samples_whole.back().position_marker;
-}
+++ /dev/null
-/*
- * Ceph - scalable distributed file system
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- */
-
-#ifndef CLS_REPLICA_LOG_OPS_H_
-#define CLS_REPLICA_LOG_OPS_H_
-
-#include "cls_replica_log_types.h"
-
-struct cls_replica_log_delete_marker_op {
- string entity_id;
- cls_replica_log_delete_marker_op() {}
- explicit cls_replica_log_delete_marker_op(const string& id) : entity_id(id) {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(entity_id, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- decode(entity_id, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
- static void generate_test_instances(std::list<cls_replica_log_delete_marker_op*>& ls);
-
-};
-WRITE_CLASS_ENCODER(cls_replica_log_delete_marker_op)
-
-struct cls_replica_log_set_marker_op {
- cls_replica_log_progress_marker marker;
- cls_replica_log_set_marker_op() {}
- explicit cls_replica_log_set_marker_op(const cls_replica_log_progress_marker& m) :
- marker(m) {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(marker, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- decode(marker, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
- static void generate_test_instances(std::list<cls_replica_log_set_marker_op*>& ls);
-};
-WRITE_CLASS_ENCODER(cls_replica_log_set_marker_op)
-
-struct cls_replica_log_get_bounds_op {
- cls_replica_log_get_bounds_op() {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
- static void generate_test_instances(std::list<cls_replica_log_get_bounds_op*>& ls);
-};
-WRITE_CLASS_ENCODER(cls_replica_log_get_bounds_op)
-
-struct cls_replica_log_get_bounds_ret {
- string position_marker; // oldest log listing position on the master
- utime_t oldest_time; // oldest timestamp associated with position or an item
- std::list<cls_replica_log_progress_marker> markers;
-
- cls_replica_log_get_bounds_ret() {}
- cls_replica_log_get_bounds_ret(const string& pos_marker,
- const utime_t& time,
- const std::list<cls_replica_log_progress_marker>& m) :
- position_marker(pos_marker), oldest_time(time), markers(m)
- {}
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(position_marker, bl);
- encode(oldest_time, bl);
- encode(markers, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- decode(position_marker, bl);
- decode(oldest_time, bl);
- decode(markers, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
- static void generate_test_instances(std::list<cls_replica_log_get_bounds_ret*>& ls);
-};
-WRITE_CLASS_ENCODER(cls_replica_log_get_bounds_ret)
-
-#endif /* CLS_REPLICA_LOG_OPS_H_ */
+++ /dev/null
-/*
- * Ceph - scalable distributed file system
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "cls_replica_log_types.h"
-
-#include "common/Formatter.h"
-#include "common/ceph_json.h"
-
-void cls_replica_log_item_marker::dump(Formatter *f) const
-{
- f->dump_string("name", item_name);
- f->dump_stream("timestamp") << item_timestamp;
-}
-
-void cls_replica_log_item_marker::decode_json(JSONObj *obj)
-{
- JSONDecoder::decode_json("name", item_name, obj);
- JSONDecoder::decode_json("timestamp", item_timestamp, obj);
-}
-
-void cls_replica_log_item_marker::
-generate_test_instances(std::list<cls_replica_log_item_marker*>& ls)
-{
- ls.push_back(new cls_replica_log_item_marker);
- ls.back()->item_name = "test_item_1";
- ls.back()->item_timestamp.set_from_double(0);
- ls.push_back(new cls_replica_log_item_marker);
- ls.back()->item_name = "test_item_2";
- ls.back()->item_timestamp.set_from_double(20);
-}
-
-void cls_replica_log_progress_marker::dump(Formatter *f) const
-{
- encode_json("entity", entity_id, f);
- encode_json("position_marker", position_marker, f);
- encode_json("position_time", position_time, f);
- encode_json("items_in_progress", items, f);
-}
-
-void cls_replica_log_progress_marker::decode_json(JSONObj *obj)
-{
- JSONDecoder::decode_json("entity", entity_id, obj);
- JSONDecoder::decode_json("position_marker", position_marker, obj);
- JSONDecoder::decode_json("position_time", position_time, obj);
- JSONDecoder::decode_json("items_in_progress", items, obj);
-}
-
-void cls_replica_log_progress_marker::
-generate_test_instances(std::list<cls_replica_log_progress_marker*>& ls)
-{
- ls.push_back(new cls_replica_log_progress_marker);
- ls.push_back(new cls_replica_log_progress_marker);
- ls.back()->entity_id = "entity1";
- ls.back()->position_marker = "pos1";
- ls.back()->position_time.set_from_double(20);
-
- std::list<cls_replica_log_item_marker*> test_items;
- cls_replica_log_item_marker::generate_test_instances(test_items);
- std::list<cls_replica_log_item_marker*>::iterator i = test_items.begin();
- for ( ; i != test_items.end(); ++i) {
- ls.back()->items.push_back(*(*i));
- }
-}
-
-void cls_replica_log_bound::dump(Formatter *f) const
-{
- encode_json("position_marker", position_marker, f);
- encode_json("position_time", position_time, f);
- encode_json("marker_exists", marker_exists, f);
- if (marker_exists) {
- encode_json("marker", marker, f); //progress marker
- }
-}
-
-void cls_replica_log_bound::decode_json(JSONObj *obj)
-{
- JSONDecoder::decode_json("position_marker", position_marker, obj);
- JSONDecoder::decode_json("position_time", position_time, obj);
- JSONDecoder::decode_json("marker_exists", marker_exists, obj);
- if (marker_exists) {
- JSONDecoder::decode_json("marker", marker, obj); //progress marker
- }
-}
-
-void cls_replica_log_bound::
-generate_test_instances(std::list<cls_replica_log_bound*>& ls)
-{
- ls.push_back(new cls_replica_log_bound);
- std::list<cls_replica_log_progress_marker*> marker_objects;
- cls_replica_log_progress_marker::generate_test_instances(marker_objects);
- std::list<cls_replica_log_progress_marker*>::iterator i =
- marker_objects.begin();
- ls.back()->update_marker(*(*i));
- ls.push_back(new cls_replica_log_bound);
- ++i;
- ls.back()->update_marker(*(*i));
-}
+++ /dev/null
-/*
- * Ceph - scalable distributed file system
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- * Copyright 2013 Inktank
- */
-
-#ifndef CLS_REPLICA_LOG_TYPES_H_
-#define CLS_REPLICA_LOG_TYPES_H_
-
-#include "include/utime.h"
-#include "include/encoding.h"
-#include "include/types.h"
-#include <errno.h>
-
-class JSONObj;
-
-struct cls_replica_log_item_marker {
- string item_name; // the name of the item we're marking
- utime_t item_timestamp; // the time stamp at which the item was outdated
-
- cls_replica_log_item_marker() {}
- cls_replica_log_item_marker(const string& name, const utime_t& time) :
- item_name(name), item_timestamp(time) {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(item_name, bl);
- encode(item_timestamp, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- decode(item_name, bl);
- decode(item_timestamp, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
- static void generate_test_instances(std::list<cls_replica_log_item_marker*>& ls);
-};
-WRITE_CLASS_ENCODER(cls_replica_log_item_marker)
-
-struct cls_replica_log_progress_marker {
- string entity_id; // the name of the entity setting the progress marker
- string position_marker; // represents a log listing position on the master
- utime_t position_time; // the timestamp associated with the position marker
- std::list<cls_replica_log_item_marker> items; /* any items not caught up
- to the position marker*/
-
- cls_replica_log_progress_marker() {}
- cls_replica_log_progress_marker(const string& entity, const string& marker,
- const utime_t& time ) :
- entity_id(entity), position_marker(marker),
- position_time(time) {}
- cls_replica_log_progress_marker(const string& entity, const string& marker,
- const utime_t& time,
- const std::list<cls_replica_log_item_marker>& b) :
- entity_id(entity), position_marker(marker),
- position_time(time),
- items(b) {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(entity_id, bl);
- encode(position_marker, bl);
- encode(position_time, bl);
- encode(items, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- decode(entity_id, bl);
- decode(position_marker, bl);
- decode(position_time, bl);
- decode(items, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
- static void generate_test_instances(std::list<cls_replica_log_progress_marker*>& ls);
-};
-WRITE_CLASS_ENCODER(cls_replica_log_progress_marker)
-
-class cls_replica_log_bound {
- /**
- * Right now, we are lazy and only support a single marker at a time. In the
- * future, we might support more than one, so the interface is designed to
- * let that work.
- */
- string position_marker; // represents a log listing position on the master
- utime_t position_time; // the timestamp associated with the position marker
- bool marker_exists; // has the marker been set?
- cls_replica_log_progress_marker marker; // the status of the current locker
-
-public:
- cls_replica_log_bound() : marker_exists(false) {}
-
- int update_marker(const cls_replica_log_progress_marker& new_mark) {
- // only one marker at a time right now
- if (marker_exists && (marker.entity_id != new_mark.entity_id)) {
- return -EEXIST;
- }
- // can't go backwards with our one marker!
- if (marker_exists && (marker.position_time > new_mark.position_time)) {
- return -EINVAL;
- }
-
- marker = new_mark;
- position_marker = new_mark.position_marker;
- position_time = new_mark.position_time;
- marker_exists = true;
- // hey look, updating is idempotent; did you notice that?
- return 0;
- }
-
- int delete_marker(const string& entity_id) {
- if (marker_exists) {
- // ENOENT if our marker doesn't match the passed ID
- if (marker.entity_id != entity_id) {
- return -ENOENT;
- }
- // you can't delete it if there are unclean entries
- if (!marker.items.empty()) {
- return -ENOTEMPTY;
- }
- }
-
- marker_exists = false;
- marker = cls_replica_log_progress_marker();
- // hey look, deletion is idempotent! Hurray.
- return 0;
- }
-
- std::string get_lowest_marker_bound() {
- return position_marker;
- }
-
- utime_t get_lowest_time_bound() {
- return position_time;
- }
-
- utime_t get_oldest_time() {
- utime_t oldest = position_time;
- list<cls_replica_log_item_marker>::const_iterator i;
- for ( i = marker.items.begin(); i != marker.items.end(); ++i) {
- if (i->item_timestamp < oldest)
- oldest = i->item_timestamp;
- }
- return oldest;
- }
-
- void get_markers(list<cls_replica_log_progress_marker>& ls) {
- if (marker_exists) {
- ls.push_back(marker);
- }
- }
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(position_marker, bl);
- encode(position_time, bl);
- encode(marker_exists, bl);
- if (marker_exists) {
- encode(marker, bl);
- }
- ENCODE_FINISH(bl);
- }
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- decode(position_marker, bl);
- decode(position_time, bl);
- decode(marker_exists, bl);
- if (marker_exists) {
- decode(marker, bl);
- }
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
- static void generate_test_instances(std::list<cls_replica_log_bound*>& ls);
-};
-WRITE_CLASS_ENCODER(cls_replica_log_bound)
-
-#endif /* CLS_REPLICA_LOG_TYPES_H_ */
OPTION(rgw_data_log_changes_size, OPT_INT) // number of in-memory entries to hold for data changes log
OPTION(rgw_data_log_num_shards, OPT_INT) // number of objects to keep data changes log on
OPTION(rgw_data_log_obj_prefix, OPT_STR) //
-OPTION(rgw_replica_log_obj_prefix, OPT_STR) //
OPTION(rgw_bucket_quota_ttl, OPT_INT) // time for cached bucket stats to be cached within rgw instance
OPTION(rgw_bucket_quota_soft_threshold, OPT_DOUBLE) // threshold from which we don't rely on cached info for quota decisions
.set_description(""),
Option("osd_class_load_list", Option::TYPE_STR, Option::LEVEL_ADVANCED)
- .set_default("cephfs hello journal lock log numops " "otp rbd refcount replica_log rgw statelog timeindex user version")
+ .set_default("cephfs hello journal lock log numops " "otp rbd refcount rgw statelog timeindex user version")
.set_description(""),
Option("osd_class_default_list", Option::TYPE_STR, Option::LEVEL_ADVANCED)
- .set_default("cephfs hello journal lock log numops " "otp rbd refcount replica_log rgw statelog timeindex user version")
+ .set_default("cephfs hello journal lock log numops " "otp rbd refcount rgw statelog timeindex user version")
.set_description(""),
Option("osd_check_for_log_corruption", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default("data_log")
.set_description(""),
- Option("rgw_replica_log_obj_prefix", Option::TYPE_STR, Option::LEVEL_DEV)
- .set_default("replica_log")
- .set_description(""),
-
Option("rgw_bucket_quota_ttl", Option::TYPE_INT, Option::LEVEL_ADVANCED)
.set_default(600)
.set_description("Bucket quota stats cache TTL")
CLS_INIT(numops);
CLS_INIT(rbd);
CLS_INIT(refcount);
-CLS_INIT(replica_log);
CLS_INIT(rgw);
CLS_INIT(statelog);
CLS_INIT(timeindex);
#endif
class_handler->add_embedded_class("refcount");
refcount_cls_init();
- class_handler->add_embedded_class("replica_log");
- replica_log_cls_init();
#ifdef WITH_RADOSGW
class_handler->add_embedded_class("rgw");
rgw_cls_init();
rgw_process.cc
rgw_quota.cc
rgw_rados.cc
- rgw_replica_log.cc
rgw_request.cc
rgw_resolve.cc
rgw_rest_bucket.cc
rgw_rest_metadata.cc
rgw_rest_opstate.cc
rgw_rest_realm.cc
- rgw_rest_replica_log.cc
rgw_rest_role.cc
rgw_rest_s3.cc
rgw_rest_swift.cc
target_link_libraries(rgw_a librados cls_otp_client cls_lock_client cls_rgw_client cls_refcount_client
cls_log_client cls_statelog_client cls_timeindex_client cls_version_client
- cls_replica_log_client cls_user_client ceph-common common_utf8 global
+ cls_user_client ceph-common common_utf8 global
${CURL_LIBRARIES}
${EXPAT_LIBRARIES}
${OPENLDAP_LIBRARIES} ${CRYPTO_LIBS}
target_link_libraries(radosgw radosgw_a librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
cls_log_client cls_statelog_client cls_timeindex_client
- cls_version_client cls_replica_log_client cls_user_client
+ cls_version_client cls_user_client
global ${FCGI_LIBRARY} ${LIB_RESOLV}
${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES}
${ALLOC_LIBS})
# radosgw depends on cls libraries at runtime, but not as link dependencies
add_dependencies(radosgw cls_rgw cls_lock cls_refcount
cls_log cls_statelog cls_timeindex
- cls_version cls_replica_log cls_user)
+ cls_version cls_user)
install(TARGETS radosgw DESTINATION bin)
set(radosgw_admin_srcs
target_link_libraries(radosgw-admin rgw_a librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
cls_log_client cls_statelog_client cls_timeindex_client
- cls_version_client cls_replica_log_client cls_user_client
+ cls_version_client cls_user_client
global ${FCGI_LIBRARY} ${LIB_RESOLV}
${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${SSL_LIBRARIES} ${BLKID_LIBRARIES})
install(TARGETS radosgw-admin DESTINATION bin)
target_link_libraries(radosgw-es rgw_a librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
cls_log_client cls_statelog_client cls_timeindex_client
- cls_version_client cls_replica_log_client cls_user_client
+ cls_version_client cls_user_client
global ${FCGI_LIBRARY} ${LIB_RESOLV}
${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${SSL_LIBRARIES} ${BLKID_LIBRARIES})
install(TARGETS radosgw-es DESTINATION bin)
target_link_libraries(radosgw-object-expirer rgw_a librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
cls_log_client cls_statelog_client cls_timeindex_client
- cls_version_client cls_replica_log_client cls_user_client
+ cls_version_client cls_user_client
global ${FCGI_LIBRARY} ${LIB_RESOLV}
${CURL_LIBRARIES} ${EXPAT_LIBRARIES})
install(TARGETS radosgw-object-expirer DESTINATION bin)
cls_statelog_client
cls_timeindex_client
cls_version_client
- cls_replica_log_client
cls_user_client
global
${LIB_RESOLV}
#include "rgw_log.h"
#include "rgw_formats.h"
#include "rgw_usage.h"
-#include "rgw_replica_log.h"
#include "rgw_orphan.h"
#include "rgw_sync.h"
#include "rgw_sync_log_trim.h"
cout << " opstate set set state on an entry (use client_id, op_id, object, state)\n";
cout << " opstate renew renew state on an entry (use client_id, op_id, object)\n";
cout << " opstate rm remove entry (use client_id, op_id, object)\n";
- cout << " replicalog get get replica metadata log entry\n";
- cout << " replicalog update update replica metadata log entry\n";
- cout << " replicalog rm remove replica metadata log entry\n";
cout << " orphans find init and run search for leaked rados objects (use job-id, pool)\n";
cout << " orphans finish clean up search for leaked rados objects\n";
cout << " orphans list-jobs list the current job-ids for orphans search\n";
cout << " data sync status\n";
cout << " required for: \n";
cout << " mdlog trim\n";
- cout << " replica mdlog get/delete\n";
- cout << " replica datalog get/delete\n";
cout << " --max-entries=<entries> max entries for listing operations\n";
cout << " --metadata-key=<key> key to retrieve metadata from with metadata get\n";
cout << " --remote=<remote> zone or zonegroup id of remote gateway\n";
cout << " in one of the numeric field\n";
cout << " --infile=<file> specify a file to read in when setting data\n";
cout << " --state=<state> specify a state for the opstate set command\n";
- cout << " --replica-log-type=<logtypestr>\n";
- cout << " replica log type (metadata, data, bucket), required for\n";
- cout << " replica log operations\n";
cout << " --categories=<list> comma separated list of categories, used in usage show\n";
cout << " --caps=<caps> list of caps (e.g., \"usage=read, write; user=read\")\n";
cout << " --yes-i-really-mean-it required for certain operations\n";
OPT_OPSTATE_SET,
OPT_OPSTATE_RENEW,
OPT_OPSTATE_RM,
- OPT_REPLICALOG_GET,
- OPT_REPLICALOG_UPDATE,
- OPT_REPLICALOG_DELETE,
OPT_REALM_CREATE,
OPT_REALM_DELETE,
OPT_REALM_GET,
strcmp(cmd, "pools") == 0 ||
strcmp(cmd, "quota") == 0 ||
strcmp(cmd, "realm") == 0 ||
- strcmp(cmd, "replicalog") == 0 ||
strcmp(cmd, "role") == 0 ||
strcmp(cmd, "role-policy") == 0 ||
strcmp(cmd, "subuser") == 0 ||
return OPT_OPSTATE_RENEW;
if (strcmp(cmd, "rm") == 0)
return OPT_OPSTATE_RM;
- } else if (strcmp(prev_cmd, "replicalog") == 0) {
- if (strcmp(cmd, "get") == 0)
- return OPT_REPLICALOG_GET;
- if (strcmp(cmd, "update") == 0)
- return OPT_REPLICALOG_UPDATE;
- if (strcmp(cmd, "delete") == 0)
- return OPT_REPLICALOG_DELETE;
} else if (strcmp(prev_cmd, "sync") == 0) {
if (strcmp(cmd, "status") == 0)
return OPT_SYNC_STATUS;
return -EINVAL;
}
-enum ReplicaLogType {
- ReplicaLog_Invalid = 0,
- ReplicaLog_Metadata,
- ReplicaLog_Data,
- ReplicaLog_Bucket,
-};
-
-ReplicaLogType get_replicalog_type(const string& name) {
- if (name == "md" || name == "meta" || name == "metadata")
- return ReplicaLog_Metadata;
- if (name == "data")
- return ReplicaLog_Data;
- if (name == "bucket")
- return ReplicaLog_Bucket;
-
- return ReplicaLog_Invalid;
-}
-
BIIndexType get_bi_index_type(const string& type_str) {
if (type_str == "plain")
return PlainIdx;
bool system_specified = false;
int shard_id = -1;
bool specified_shard_id = false;
- string daemon_id;
- bool specified_daemon_id = false;
string client_id;
string op_id;
string state_str;
- string replica_log_type_str;
- ReplicaLogType replica_log_type = ReplicaLog_Invalid;
string op_mask_str;
string quota_scope;
string object_version;
return EINVAL;
}
specified_shard_id = true;
- } else if (ceph_argparse_witharg(args, i, &val, "--daemon-id", (char*)NULL)) {
- daemon_id = val;
- specified_daemon_id = true;
} else if (ceph_argparse_witharg(args, i, &val, "--access", (char*)NULL)) {
access = val;
perm_mask = rgw_str_to_perm(access.c_str());
end_marker = val;
} else if (ceph_argparse_witharg(args, i, &val, "--quota-scope", (char*)NULL)) {
quota_scope = val;
- } else if (ceph_argparse_witharg(args, i, &val, "--replica-log-type", (char*)NULL)) {
- replica_log_type_str = val;
- replica_log_type = get_replicalog_type(replica_log_type_str);
- if (replica_log_type == ReplicaLog_Invalid) {
- cerr << "ERROR: invalid replica log type" << std::endl;
- return EINVAL;
- }
} else if (ceph_argparse_witharg(args, i, &val, "--index-type", (char*)NULL)) {
string index_type_str = val;
bi_index_type = get_bi_index_type(index_type_str);
OPT_DATALOG_LIST,
OPT_DATALOG_STATUS,
OPT_OPSTATE_LIST,
- OPT_REPLICALOG_GET,
OPT_REALM_GET,
OPT_REALM_GET_DEFAULT,
OPT_REALM_LIST,
}
}
- if (opt_cmd == OPT_REPLICALOG_GET || opt_cmd == OPT_REPLICALOG_UPDATE ||
- opt_cmd == OPT_REPLICALOG_DELETE) {
- if (replica_log_type_str.empty()) {
- cerr << "ERROR: need to specify --replica-log-type=<metadata | data | bucket>" << std::endl;
- return EINVAL;
- }
- }
-
- if (opt_cmd == OPT_REPLICALOG_GET) {
- RGWReplicaBounds bounds;
- if (replica_log_type == ReplicaLog_Metadata) {
- if (!specified_shard_id) {
- cerr << "ERROR: shard-id must be specified for get operation" << std::endl;
- return EINVAL;
- }
-
- RGWReplicaObjectLogger logger(store, pool, META_REPLICA_LOG_OBJ_PREFIX);
- int ret = logger.get_bounds(shard_id, bounds);
- if (ret < 0)
- return -ret;
- } else if (replica_log_type == ReplicaLog_Data) {
- if (!specified_shard_id) {
- cerr << "ERROR: shard-id must be specified for get operation" << std::endl;
- return EINVAL;
- }
- RGWReplicaObjectLogger logger(store, pool, DATA_REPLICA_LOG_OBJ_PREFIX);
- int ret = logger.get_bounds(shard_id, bounds);
- if (ret < 0)
- return -ret;
- } else if (replica_log_type == ReplicaLog_Bucket) {
- if (bucket_name.empty()) {
- cerr << "ERROR: bucket not specified" << std::endl;
- return EINVAL;
- }
- RGWBucketInfo bucket_info;
- int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
- if (ret < 0) {
- cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
-
- RGWReplicaBucketLogger logger(store);
- ret = logger.get_bounds(bucket, shard_id, bounds);
- if (ret < 0)
- return -ret;
- } else { // shouldn't get here
- ceph_abort();
- }
- encode_json("bounds", bounds, formatter);
- formatter->flush(cout);
- cout << std::endl;
- }
-
- if (opt_cmd == OPT_REPLICALOG_DELETE) {
- if (replica_log_type == ReplicaLog_Metadata) {
- if (!specified_shard_id) {
- cerr << "ERROR: shard-id must be specified for delete operation" << std::endl;
- return EINVAL;
- }
- if (!specified_daemon_id) {
- cerr << "ERROR: daemon-id must be specified for delete operation" << std::endl;
- return EINVAL;
- }
- RGWReplicaObjectLogger logger(store, pool, META_REPLICA_LOG_OBJ_PREFIX);
- int ret = logger.delete_bound(shard_id, daemon_id, false);
- if (ret < 0)
- return -ret;
- } else if (replica_log_type == ReplicaLog_Data) {
- if (!specified_shard_id) {
- cerr << "ERROR: shard-id must be specified for delete operation" << std::endl;
- return EINVAL;
- }
- if (!specified_daemon_id) {
- cerr << "ERROR: daemon-id must be specified for delete operation" << std::endl;
- return EINVAL;
- }
- RGWReplicaObjectLogger logger(store, pool, DATA_REPLICA_LOG_OBJ_PREFIX);
- int ret = logger.delete_bound(shard_id, daemon_id, false);
- if (ret < 0)
- return -ret;
- } else if (replica_log_type == ReplicaLog_Bucket) {
- if (bucket_name.empty()) {
- cerr << "ERROR: bucket not specified" << std::endl;
- return EINVAL;
- }
- RGWBucketInfo bucket_info;
- int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
- if (ret < 0) {
- cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
-
- RGWReplicaBucketLogger logger(store);
- ret = logger.delete_bound(bucket, shard_id, daemon_id, false);
- if (ret < 0)
- return -ret;
- }
- }
-
- if (opt_cmd == OPT_REPLICALOG_UPDATE) {
- if (marker.empty()) {
- cerr << "ERROR: marker was not specified" <<std::endl;
- return EINVAL;
- }
- utime_t time = ceph_clock_now();
- if (!date.empty()) {
- ret = parse_date_str(date, time);
- if (ret < 0) {
- cerr << "ERROR: failed to parse start date" << std::endl;
- return EINVAL;
- }
- }
- list<RGWReplicaItemMarker> entries;
- int ret = read_decode_json(infile, entries);
- if (ret < 0) {
- cerr << "ERROR: failed to decode entries" << std::endl;
- return EINVAL;
- }
- RGWReplicaBounds bounds;
- if (replica_log_type == ReplicaLog_Metadata) {
- if (!specified_shard_id) {
- cerr << "ERROR: shard-id must be specified for get operation" << std::endl;
- return EINVAL;
- }
-
- RGWReplicaObjectLogger logger(store, pool, META_REPLICA_LOG_OBJ_PREFIX);
- int ret = logger.update_bound(shard_id, daemon_id, marker, time, &entries);
- if (ret < 0) {
- cerr << "ERROR: failed to update bounds: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- } else if (replica_log_type == ReplicaLog_Data) {
- if (!specified_shard_id) {
- cerr << "ERROR: shard-id must be specified for get operation" << std::endl;
- return EINVAL;
- }
- RGWReplicaObjectLogger logger(store, pool, DATA_REPLICA_LOG_OBJ_PREFIX);
- int ret = logger.update_bound(shard_id, daemon_id, marker, time, &entries);
- if (ret < 0) {
- cerr << "ERROR: failed to update bounds: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- } else if (replica_log_type == ReplicaLog_Bucket) {
- if (bucket_name.empty()) {
- cerr << "ERROR: bucket not specified" << std::endl;
- return EINVAL;
- }
- RGWBucketInfo bucket_info;
- int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
- if (ret < 0) {
- cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
-
- RGWReplicaBucketLogger logger(store);
- ret = logger.update_bound(bucket, shard_id, daemon_id, marker, time, &entries);
- if (ret < 0) {
- cerr << "ERROR: failed to update bounds: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- }
- }
-
bool quota_op = (opt_cmd == OPT_QUOTA_SET || opt_cmd == OPT_QUOTA_ENABLE || opt_cmd == OPT_QUOTA_DISABLE);
if (quota_op) {
#include "rgw_rest_metadata.h"
#include "rgw_rest_log.h"
#include "rgw_rest_opstate.h"
-#include "rgw_replica_log.h"
-#include "rgw_rest_replica_log.h"
#include "rgw_rest_config.h"
#include "rgw_rest_realm.h"
#include "rgw_swift_auth.h"
admin_resource->register_resource("metadata", new RGWRESTMgr_Metadata);
admin_resource->register_resource("log", new RGWRESTMgr_Log);
admin_resource->register_resource("opstate", new RGWRESTMgr_Opstate);
- admin_resource->register_resource("replica_log", new RGWRESTMgr_ReplicaLog);
admin_resource->register_resource("config", new RGWRESTMgr_Config);
admin_resource->register_resource("realm", new RGWRESTMgr_Realm);
rest.register_resource(g_conf->rgw_admin_entry, admin_resource);
#include "rgw_log.h"
#include "rgw_formats.h"
#include "rgw_usage.h"
-#include "rgw_replica_log.h"
#include "rgw_object_expirer_core.h"
#define dout_subsys ceph_subsys_rgw
#include "rgw_log.h"
#include "rgw_formats.h"
#include "rgw_usage.h"
-#include "rgw_replica_log.h"
#include "rgw_object_expirer_core.h"
#include "cls/lock/cls_lock_client.h"
#include "rgw_log.h"
#include "rgw_formats.h"
#include "rgw_usage.h"
-#include "rgw_replica_log.h"
class RGWObjectExpirer {
protected:
class RGWDataChangesLog;
class RGWMetaSyncStatusManager;
class RGWDataSyncStatusManager;
-class RGWReplicaLogger;
class RGWCoroutinesManagerRegistry;
class RGWStateLog {
friend class RGWMetaSyncProcessorThread;
friend class RGWDataSyncProcessorThread;
friend class RGWStateLog;
- friend class RGWReplicaLogger;
friend class RGWReshard;
friend class RGWBucketReshard;
friend class BucketIndexLockGuard;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- * Copyright 2013 Inktank
- */
-
-#include "common/ceph_json.h"
-
-#include "rgw_replica_log.h"
-#include "cls/replica_log/cls_replica_log_client.h"
-#include "cls/rgw/cls_rgw_client.h"
-#include "rgw_rados.h"
-
-#define dout_subsys ceph_subsys_rgw
-
-void RGWReplicaBounds::dump(Formatter *f) const
-{
- encode_json("marker", marker, f);
- encode_json("oldest_time", oldest_time, f);
- encode_json("markers", markers, f);
-}
-
-void RGWReplicaBounds::decode_json(JSONObj *obj) {
- JSONDecoder::decode_json("marker", marker, obj);
- JSONDecoder::decode_json("oldest_time", oldest_time, obj);
- JSONDecoder::decode_json("markers", markers, obj);
-}
-
-RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) :
- cct(_store->cct), store(_store) {}
-
-int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const rgw_pool& pool)
-{
- int r = rgw_init_ioctx(store->get_rados_handle(), pool, ctx, true);
- if (r < 0) {
- lderr(cct) << "ERROR: could not open rados pool " << pool << dendl;
- }
- return r;
-}
-
-int RGWReplicaLogger::update_bound(const string& oid, const rgw_pool& pool,
- const string& daemon_id,
- const string& marker, const utime_t& time,
- const list<RGWReplicaItemMarker> *entries,
- bool need_to_exist)
-{
- cls_replica_log_progress_marker progress;
- progress.entity_id = daemon_id;
- progress.position_marker = marker;
- progress.position_time = time;
- progress.items = *entries;
-
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx, pool);
- if (r < 0) {
- return r;
- }
-
- librados::ObjectWriteOperation opw;
- if (need_to_exist) {
- opw.assert_exists();
- }
- cls_replica_log_update_bound(opw, progress);
- return ioctx.operate(oid, &opw);
-}
-
-int RGWReplicaLogger::write_bounds(const string& oid, const rgw_pool& pool,
- RGWReplicaBounds& bounds)
-{
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx, pool);
- if (r < 0) {
- return r;
- }
-
- librados::ObjectWriteOperation opw;
- list<RGWReplicaProgressMarker>::iterator iter = bounds.markers.begin();
- for (; iter != bounds.markers.end(); ++iter) {
- RGWReplicaProgressMarker& progress = *iter;
- cls_replica_log_update_bound(opw, progress);
- }
-
- r = ioctx.operate(oid, &opw);
- if (r < 0) {
- return r;
- }
-
- return 0;
-}
-
-int RGWReplicaLogger::delete_bound(const string& oid, const rgw_pool& pool,
- const string& daemon_id, bool purge_all,
- bool need_to_exist)
-{
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx, pool);
- if (r < 0) {
- return r;
- }
-
- librados::ObjectWriteOperation opw;
- if (need_to_exist) {
- opw.assert_exists();
- }
- if (purge_all) {
- opw.remove();
- } else {
- cls_replica_log_delete_bound(opw, daemon_id);
- }
- return ioctx.operate(oid, &opw);
-}
-
-int RGWReplicaLogger::get_bounds(const string& oid, const rgw_pool& pool,
- RGWReplicaBounds& bounds)
-{
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx, pool);
- if (r < 0) {
- return r;
- }
-
- return cls_replica_log_get_bounds(ioctx, oid, bounds.marker, bounds.oldest_time, bounds.markers);
-}
-
-RGWReplicaObjectLogger::
-RGWReplicaObjectLogger(RGWRados *_store,
- const rgw_pool& _pool,
- const string& _prefix) : RGWReplicaLogger(_store),
- pool(_pool), prefix(_prefix) {
- if (pool.empty())
- store->get_log_pool(pool);
-}
-
-int RGWReplicaObjectLogger::create_log_objects(int shards)
-{
- librados::IoCtx ioctx;
- int r = open_ioctx(ioctx, pool);
- if (r < 0) {
- return r;
- }
- for (int i = 0; i < shards; ++i) {
- string oid;
- get_shard_oid(i, oid);
- r = ioctx.create(oid, false);
- if (r < 0)
- return r;
- }
- return r;
-}
-
-RGWReplicaBucketLogger::RGWReplicaBucketLogger(RGWRados *_store) :
- RGWReplicaLogger(_store)
-{
- store->get_log_pool(pool);
- prefix = _store->ctx()->_conf->rgw_replica_log_obj_prefix;
- prefix.append(".");
-}
-
-string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id, bool index_by_instance)
-{
- string s;
-
- if (index_by_instance) {
- s = prefix + bucket.name + ":" + bucket.bucket_id;
- } else {
- s = prefix + bucket.name;
- }
-
- if (shard_id >= 0) {
- char buf[16];
- snprintf(buf, sizeof(buf), ".%d", shard_id);
- s += buf;
- }
- return s;
-}
-
-int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
- const string& marker, const utime_t& time,
- const list<RGWReplicaItemMarker> *entries)
-{
- if (shard_id >= 0 ||
- !BucketIndexShardsManager::is_shards_marker(marker)) {
- return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id, true), pool,
- daemon_id, marker, time, entries,
- false);
- }
-
- BucketIndexShardsManager sm;
- int ret = sm.from_string(marker, shard_id);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: could not parse shards marker: " << marker << dendl;
- return ret;
- }
-
- map<int, string>& vals = sm.get();
-
- ret = 0;
-
- map<int, string>::iterator iter;
- for (iter = vals.begin(); iter != vals.end(); ++iter) {
- ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
- int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
- daemon_id, iter->second, time, entries,
- true /* need to exist */);
-
- if (r == -ENOENT) {
- RGWReplicaBounds bounds;
- r = convert_old_bounds(bucket, -1, bounds);
- if (r < 0 && r != -ENOENT) {
- return r;
- }
- r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
- daemon_id, marker, time, entries, false);
- }
- if (r < 0) {
- ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
- ret = r;
- }
- }
-
- return ret;
-}
-
-int RGWReplicaBucketLogger::delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool purge_all)
-{
- int r = RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, true /* need to exist */);
- if (r != -ENOENT) {
- return r;
- }
- /*
- * can only get here if need_to_exist == true,
- * entry is not found, let's convert old entry if exists
- */
- RGWReplicaBounds bounds;
- r = convert_old_bounds(bucket, shard_id, bounds);
- if (r < 0 && r != -ENOENT) {
- return r;
- }
- return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, true), pool, daemon_id, purge_all, false);
-}
-
-int RGWReplicaBucketLogger::get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
- int r = RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds);
- if (r != -ENOENT) {
- return r;
- }
-
- r = convert_old_bounds(bucket, shard_id, bounds);
- if (r < 0) {
- return r;
- }
-
- return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, true), pool, bounds);
-}
-
-int RGWReplicaBucketLogger::convert_old_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
- string old_key = obj_name(bucket, shard_id, false);
- string new_key = obj_name(bucket, shard_id, true);
-
- /* couldn't find when indexed by instance, retry with old key by bucket name only */
- int r = RGWReplicaLogger::get_bounds(old_key, pool, bounds);
- if (r < 0) {
- return r;
- }
- /* convert to new keys */
- r = RGWReplicaLogger::write_bounds(new_key, pool, bounds);
- if (r < 0) {
- return r;
- }
-
- string daemon_id;
- r = RGWReplicaLogger::delete_bound(old_key, pool, daemon_id, true, false); /* purge all */
- if (r < 0) {
- return r;
- }
- return 0;
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- */
-
-#ifndef RGW_REPLICA_LOG_H_
-#define RGW_REPLICA_LOG_H_
-
-#include <string>
-#include "cls/replica_log/cls_replica_log_types.h"
-#include "include/types.h"
-#include "include/utime.h"
-#include "include/rados/librados.hpp"
-#include "rgw_common.h"
-
-class RGWRados;
-class CephContext;
-
-#define META_REPLICA_LOG_OBJ_PREFIX "meta.replicalog."
-#define DATA_REPLICA_LOG_OBJ_PREFIX "data.replicalog."
-
-typedef cls_replica_log_item_marker RGWReplicaItemMarker;
-typedef cls_replica_log_progress_marker RGWReplicaProgressMarker;
-
-struct RGWReplicaBounds {
- string marker;
- utime_t oldest_time;
- list<RGWReplicaProgressMarker> markers;
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-
-class RGWReplicaLogger {
-protected:
- CephContext *cct;
- RGWRados *store;
- int open_ioctx(librados::IoCtx& ctx, const rgw_pool& pool);
-
- explicit RGWReplicaLogger(RGWRados *_store);
-
- int update_bound(const string& oid, const rgw_pool& pool,
- const string& daemon_id, const string& marker,
- const utime_t& time,
- const list<RGWReplicaItemMarker> *entries,
- bool need_to_exist);
- int write_bounds(const string& oid, const rgw_pool& pool,
- RGWReplicaBounds& bounds);
- int delete_bound(const string& oid, const rgw_pool& pool,
- const string& daemon_id, bool purge_all,
- bool need_to_exist);
- int get_bounds(const string& oid, const rgw_pool& pool,
- RGWReplicaBounds& bounds);
-};
-
-class RGWReplicaObjectLogger : private RGWReplicaLogger {
- rgw_pool pool;
- string prefix;
-
- void get_shard_oid(int id, string& oid) {
- char buf[16];
- snprintf(buf, sizeof(buf), "%d", id);
- oid = prefix + buf;
- }
-
-public:
- RGWReplicaObjectLogger(RGWRados *_store,
- const rgw_pool& _pool,
- const string& _prefix);
-
- int create_log_objects(int shards);
- int update_bound(int shard, const string& daemon_id, const string& marker,
- const utime_t& time,
- const list<RGWReplicaItemMarker> *entries) {
- string oid;
- get_shard_oid(shard, oid);
- return RGWReplicaLogger::update_bound(oid, pool,
- daemon_id, marker, time, entries, false);
- }
- int delete_bound(int shard, const string& daemon_id, bool purge_all) {
- string oid;
- get_shard_oid(shard, oid);
- return RGWReplicaLogger::delete_bound(oid, pool,
- daemon_id, purge_all, false);
- }
- int get_bounds(int shard, RGWReplicaBounds& bounds) {
- string oid;
- get_shard_oid(shard, oid);
- return RGWReplicaLogger::get_bounds(oid, pool, bounds);
- }
-};
-
-class RGWReplicaBucketLogger : private RGWReplicaLogger {
- rgw_pool pool;
- string prefix;
-
- string obj_name(const rgw_bucket& bucket, int shard_id, bool index_by_instance);
-
-public:
- explicit RGWReplicaBucketLogger(RGWRados *_store);
- int update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
- const string& marker, const utime_t& time,
- const list<RGWReplicaItemMarker> *entries);
- int delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool purge_all);
- int get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds);
- int convert_old_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds);
-};
-
-#endif /* RGW_REPLICA_LOG_H_ */
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-#include "common/ceph_json.h"
-#include "common/strtol.h"
-#include "rgw_rest.h"
-#include "rgw_op.h"
-#include "rgw_rest_s3.h"
-#include "rgw_replica_log.h"
-#include "rgw_metadata.h"
-#include "rgw_bucket.h"
-#include "rgw_rest_replica_log.h"
-#include "rgw_client_io.h"
-#include "common/errno.h"
-#include "include/assert.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rgw
-#define REPLICA_INPUT_MAX_LEN (512*1024)
-
-static int parse_to_utime(string& in, utime_t& out) {
- uint64_t sec = 0;
- uint64_t nsec = 0;
- int ret = utime_t::parse_date(in.c_str(), &sec, &nsec);
- if (ret < 0)
- return ret;
-
- out = utime_t(sec, nsec);
- return 0;
-}
-
-void RGWOp_OBJLog_SetBounds::execute() {
- string id_str = s->info.args.get("id"),
- marker = s->info.args.get("marker"),
- time = s->info.args.get("time"),
- daemon_id = s->info.args.get("daemon_id");
-
- if (id_str.empty() ||
- marker.empty() ||
- time.empty() ||
- daemon_id.empty()) {
- dout(5) << "Error - invalid parameter list" << dendl;
- http_ret = -EINVAL;
- return;
- }
-
- int shard;
- string err;
- utime_t ut;
-
- shard = (int)strict_strtol(id_str.c_str(), 10, &err);
- if (!err.empty()) {
- dout(5) << "Error parsing id parameter - " << id_str << ", err " << err << dendl;
- http_ret = -EINVAL;
- return;
- }
-
- if (parse_to_utime(time, ut) < 0) {
- http_ret = -EINVAL;
- return;
- }
-
- string pool;
- RGWReplicaObjectLogger rl(store, pool, prefix);
- bufferlist bl;
- list<RGWReplicaItemMarker> markers;
-
- if ((http_ret = rgw_rest_get_json_input(store->ctx(), s, markers, REPLICA_INPUT_MAX_LEN, NULL)) < 0) {
- dout(5) << "Error - retrieving input data - " << http_ret << dendl;
- return;
- }
-
- http_ret = rl.update_bound(shard, daemon_id, marker, ut, &markers);
-}
-
-void RGWOp_OBJLog_GetBounds::execute() {
- string id = s->info.args.get("id");
-
- if (id.empty()) {
- dout(5) << " Error - invalid parameter list" << dendl;
- http_ret = -EINVAL;
- return;
- }
-
- int shard;
- string err;
-
- shard = (int)strict_strtol(id.c_str(), 10, &err);
- if (!err.empty()) {
- dout(5) << "Error parsing id parameter - " << id << ", err " << err << dendl;
- http_ret = -EINVAL;
- return;
- }
-
- string pool;
- RGWReplicaObjectLogger rl(store, pool, prefix);
- http_ret = rl.get_bounds(shard, bounds);
-}
-
-void RGWOp_OBJLog_GetBounds::send_response() {
- set_req_state_err(s, http_ret);
- dump_errno(s);
- end_header(s);
-
- if (http_ret < 0)
- return;
-
- encode_json("bounds", bounds, s->formatter);
- flusher.flush();
-}
-
-void RGWOp_OBJLog_DeleteBounds::execute() {
- string id = s->info.args.get("id"),
- daemon_id = s->info.args.get("daemon_id");
- bool purge_all;
-
- s->info.args.get_bool("purge-all", &purge_all, false);
-
- if (id.empty() ||
- (!purge_all && daemon_id.empty())) {
- dout(5) << "Error - invalid parameter list" << dendl;
- http_ret = -EINVAL;
- return;
- }
-
- int shard;
- string err;
-
- shard = (int)strict_strtol(id.c_str(), 10, &err);
- if (!err.empty()) {
- dout(5) << "Error parsing id parameter - " << id << ", err " << err << dendl;
- http_ret = -EINVAL;
- }
-
- string pool;
- RGWReplicaObjectLogger rl(store, pool, prefix);
- http_ret = rl.delete_bound(shard, daemon_id, purge_all);
-}
-
-static int bucket_instance_to_bucket(RGWRados *store, const string& bucket_instance, rgw_bucket& bucket) {
- RGWBucketInfo bucket_info;
- real_time mtime;
-
- RGWObjectCtx obj_ctx(store);
- int r = store->get_bucket_instance_info(obj_ctx, bucket_instance, bucket_info, &mtime, NULL);
- if (r < 0) {
- dout(5) << "could not get bucket instance info for bucket=" << bucket_instance << ": " << cpp_strerror(r) << dendl;
- if (r == -ENOENT)
- return r;
- return -EINVAL;
- }
-
- bucket = bucket_info.bucket;
- return 0;
-}
-
-void RGWOp_BILog_SetBounds::execute() {
- string bucket_instance = s->info.args.get("bucket-instance"),
- marker = s->info.args.get("marker"),
- time = s->info.args.get("time"),
- daemon_id = s->info.args.get("daemon_id");
-
- if (marker.empty() ||
- time.empty() ||
- daemon_id.empty()) {
- dout(5) << "Error - invalid parameter list" << dendl;
- http_ret = -EINVAL;
- return;
- }
-
- utime_t ut;
-
- if (parse_to_utime(time, ut) < 0) {
- http_ret = -EINVAL;
- return;
- }
-
- int shard_id;
- http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
- if (http_ret < 0) {
- dout(5) << "failed to parse bucket instance" << dendl;
- return;
- }
-
- rgw_bucket bucket;
-
- if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0) {
- return;
- }
-
- RGWReplicaBucketLogger rl(store);
- bufferlist bl;
- list<RGWReplicaItemMarker> markers;
-
- if ((http_ret = rgw_rest_get_json_input(store->ctx(), s, markers, REPLICA_INPUT_MAX_LEN, NULL)) < 0) {
- dout(5) << "Error - retrieving input data - " << http_ret << dendl;
- return;
- }
-
- http_ret = rl.update_bound(bucket, shard_id, daemon_id, marker, ut, &markers);
-}
-
-void RGWOp_BILog_GetBounds::execute() {
- string bucket_instance = s->info.args.get("bucket-instance");
- rgw_bucket bucket;
-
- int shard_id;
-
- http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
- if (http_ret < 0) {
- dout(5) << "failed to parse bucket instance" << dendl;
- return;
- }
-
- if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0)
- return;
-
- RGWReplicaBucketLogger rl(store);
- http_ret = rl.get_bounds(bucket, shard_id, bounds);
-}
-
-void RGWOp_BILog_GetBounds::send_response() {
- set_req_state_err(s, http_ret);
- dump_errno(s);
- end_header(s);
-
- if (http_ret < 0)
- return;
-
- encode_json("bounds", bounds, s->formatter);
- flusher.flush();
-}
-
-void RGWOp_BILog_DeleteBounds::execute() {
- string bucket_instance = s->info.args.get("bucket-instance");
- string daemon_id = s->info.args.get("daemon_id");
- bool purge_all;
-
- s->info.args.get_bool("purge-all", &purge_all, false);
-
- if (daemon_id.empty() && !purge_all) {
- dout(5) << "Error - invalid parameter list" << dendl;
- http_ret = -EINVAL;
- return;
- }
-
- int shard_id;
- http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
- if (http_ret < 0) {
- dout(5) << "failed to parse bucket instance" << dendl;
- return;
- }
-
- rgw_bucket bucket;
-
- if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0) {
- return;
- }
-
- RGWReplicaBucketLogger rl(store);
- http_ret = rl.delete_bound(bucket, shard_id, daemon_id, purge_all);
-}
-
-RGWOp *RGWHandler_ReplicaLog::op_get() {
- bool exists;
- string type = s->info.args.get("type", &exists);
-
- if (!exists) {
- return NULL;
- }
-
- if (type.compare("metadata") == 0) {
- return new RGWOp_OBJLog_GetBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog");
- } else if (type.compare("bucket-index") == 0) {
- return new RGWOp_BILog_GetBounds;
- } else if (type.compare("data") == 0) {
- return new RGWOp_OBJLog_GetBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog");
- }
- return NULL;
-}
-
-RGWOp *RGWHandler_ReplicaLog::op_delete() {
- bool exists;
- string type = s->info.args.get("type", &exists);
-
- if (!exists) {
- return NULL;
- }
-
- if (type.compare("metadata") == 0)
- return new RGWOp_OBJLog_DeleteBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog");
- else if (type.compare("bucket-index") == 0)
- return new RGWOp_BILog_DeleteBounds;
- else if (type.compare("data") == 0)
- return new RGWOp_OBJLog_DeleteBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog");
-
- return NULL;
-}
-
-RGWOp *RGWHandler_ReplicaLog::op_post() {
- bool exists;
- string type = s->info.args.get("type", &exists);
-
- if (!exists) {
- return NULL;
- }
-
- if (type.compare("metadata") == 0) {
- return new RGWOp_OBJLog_SetBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog");
- } else if (type.compare("bucket-index") == 0) {
- return new RGWOp_BILog_SetBounds;
- } else if (type.compare("data") == 0) {
- return new RGWOp_OBJLog_SetBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog");
- }
- return NULL;
-}
-
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef RGW_REST_REPLICA_LOG_H
-#define RGW_REST_REPLICA_LOG_H
-
-class RGWOp_OBJLog_GetBounds : public RGWRESTOp {
- string prefix;
- string obj_type;
- RGWReplicaBounds bounds;
-
-public:
- RGWOp_OBJLog_GetBounds(const char *_prefix, const char *type)
- : prefix(_prefix), obj_type(type){}
- ~RGWOp_OBJLog_GetBounds() override {}
-
- int check_caps(RGWUserCaps& caps) override {
- return caps.check_cap(obj_type.c_str(), RGW_CAP_READ);
- }
- int verify_permission() override {
- return check_caps(s->user->caps);
- }
- void execute() override;
- void send_response() override;
- const string name() override {
- string s = "replica";
- s.append(obj_type);
- s.append("_getbounds");
- return s;
- }
-};
-
-class RGWOp_OBJLog_SetBounds : public RGWRESTOp {
- string prefix;
- string obj_type;
-public:
- RGWOp_OBJLog_SetBounds(const char *_prefix, const char *type)
- : prefix(_prefix), obj_type(type){}
- ~RGWOp_OBJLog_SetBounds() override {}
-
- int check_caps(RGWUserCaps& caps) override {
- return caps.check_cap(obj_type.c_str(), RGW_CAP_WRITE);
- }
- void execute() override;
- const string name() override {
- string s = "replica";
- s.append(obj_type);
- s.append("_updatebounds");
- return s;
- }
-};
-
-class RGWOp_OBJLog_DeleteBounds : public RGWRESTOp {
- string prefix;
- string obj_type;
-public:
- RGWOp_OBJLog_DeleteBounds(const char *_prefix, const char *type)
- : prefix(_prefix), obj_type(type){}
- ~RGWOp_OBJLog_DeleteBounds() override {}
-
- int check_caps(RGWUserCaps& caps) override {
- return caps.check_cap(obj_type.c_str(), RGW_CAP_WRITE);
- }
- void execute() override;
- const string name() override {
- string s = "replica";
- s.append(obj_type);
- s.append("_deletebound");
- return s;
- }
-};
-
-class RGWOp_BILog_GetBounds : public RGWRESTOp {
- RGWReplicaBounds bounds;
-public:
- RGWOp_BILog_GetBounds() {}
- ~RGWOp_BILog_GetBounds() override {}
-
- int check_caps(RGWUserCaps& caps) override {
- return caps.check_cap("bilog", RGW_CAP_READ);
- }
- int verify_permission() override {
- return check_caps(s->user->caps);
- }
- void execute() override;
- void send_response() override;
- const string name() override {
- return "replicabilog_getbounds";
- }
-};
-
-class RGWOp_BILog_SetBounds : public RGWRESTOp {
-public:
- RGWOp_BILog_SetBounds() {}
- ~RGWOp_BILog_SetBounds() override {}
-
- int check_caps(RGWUserCaps& caps) override {
- return caps.check_cap("bilog", RGW_CAP_WRITE);
- }
- void execute() override;
- const string name() override {
- return "replicabilog_updatebounds";
- }
-};
-
-class RGWOp_BILog_DeleteBounds : public RGWRESTOp {
-public:
- RGWOp_BILog_DeleteBounds() {}
- ~RGWOp_BILog_DeleteBounds() override {}
-
- int check_caps(RGWUserCaps& caps) override {
- return caps.check_cap("bilog", RGW_CAP_WRITE);
- }
- void execute() override;
- const string name() override {
- return "replicabilog_deletebound";
- }
-};
-
-class RGWHandler_ReplicaLog : public RGWHandler_Auth_S3 {
-protected:
- RGWOp *op_get() override;
- RGWOp *op_delete() override;
- RGWOp *op_post() override;
-
- int read_permissions(RGWOp*) override {
- return 0;
- }
-public:
- using RGWHandler_Auth_S3::RGWHandler_Auth_S3;
- ~RGWHandler_ReplicaLog() override = default;
-};
-
-class RGWRESTMgr_ReplicaLog : public RGWRESTMgr {
-public:
- RGWRESTMgr_ReplicaLog() = default;
- ~RGWRESTMgr_ReplicaLog() override = default;
-
- RGWHandler_REST* get_handler(struct req_state*,
- const rgw::auth::StrategyRegistry& auth_registry,
- const std::string&) override {
- return new RGWHandler_ReplicaLog(auth_registry);
- }
-};
-
-#endif /*!RGW_REST_REPLICA_LOG_H*/
add_subdirectory(cls_rbd)
endif(WITH_RBD)
add_subdirectory(cls_refcount)
-add_subdirectory(cls_replica_log)
add_subdirectory(cls_rgw)
add_subdirectory(cls_statelog)
add_subdirectory(cls_version)
opstate set set state on an entry (use client_id, op_id, object, state)
opstate renew renew state on an entry (use client_id, op_id, object)
opstate rm remove entry (use client_id, op_id, object)
- replicalog get get replica metadata log entry
- replicalog update update replica metadata log entry
- replicalog rm remove replica metadata log entry
orphans find init and run search for leaked rados objects (use job-id, pool)
orphans finish clean up search for leaked rados objects
orphans list-jobs list the current job-ids for orphans search
data sync status
required for:
mdlog trim
- replica mdlog get/delete
- replica datalog get/delete
--max-entries=<entries> max entries for listing operations
--metadata-key=<key> key to retrieve metadata from with metadata get
--remote=<remote> zone or zonegroup id of remote gateway
in one of the numeric field
--infile=<file> specify a file to read in when setting data
--state=<state> specify a state for the opstate set command
- --replica-log-type=<logtypestr>
- replica log type (metadata, data, bucket), required for
- replica log operations
--categories=<list> comma separated list of categories, used in usage show
--caps=<caps> list of caps (e.g., "usage=read, write; user=read")
--yes-i-really-mean-it required for certain operations
+++ /dev/null
-# ceph_test_cls_replica_log
-add_executable(ceph_test_cls_replica_log
- test_cls_replica_log.cc
- )
-set_target_properties(ceph_test_cls_replica_log PROPERTIES COMPILE_FLAGS
- ${UNITTEST_CXX_FLAGS})
-target_link_libraries(ceph_test_cls_replica_log
- librados
- cls_replica_log_client
- global
- ${UNITTEST_LIBS}
- ${BLKID_LIBRARIES}
- ${CMAKE_DL_LIBS}
- ${CRYPTO_LIBS}
- ${EXTRALIBS}
- radostest
- )
-
+++ /dev/null
-/*
- * Ceph - scalable distributed file system
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- * Copyright 2013 Inktank
- */
-
-#include "gtest/gtest.h"
-#include "test/librados/test.h"
-
-#include "cls/replica_log/cls_replica_log_client.h"
-#include "cls/replica_log/cls_replica_log_types.h"
-
-class cls_replica_log_Test : public ::testing::Test {
-public:
- librados::Rados rados;
- librados::IoCtx ioctx;
- string pool_name;
- string oid;
- string entity;
- string marker;
- utime_t time;
- list<pair<string, utime_t> > entries;
- cls_replica_log_progress_marker progress;
-
- void SetUp() override {
- pool_name = get_temp_pool_name();
- ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
- ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
- oid = "obj";
- ASSERT_EQ(0, ioctx.create(oid, true));
- }
-
- void add_marker() {
- entity = "tester_entity";
- marker = "tester_marker1";
- time.set_from_double(10);
- entries.push_back(make_pair("tester_obj1", time));
- time.set_from_double(20);
- cls_replica_log_prepare_marker(progress, entity, marker, time, &entries);
- librados::ObjectWriteOperation opw;
- cls_replica_log_update_bound(opw, progress);
- ASSERT_EQ(0, ioctx.operate(oid, &opw));
- }
-};
-
-TEST_F(cls_replica_log_Test, test_set_get_marker)
-{
- add_marker();
-
- string reply_position_marker;
- utime_t reply_time;
- list<cls_replica_log_progress_marker> return_progress_list;
- ASSERT_EQ(0, cls_replica_log_get_bounds(ioctx, oid, reply_position_marker,
- reply_time, return_progress_list));
-
- ASSERT_EQ(reply_position_marker, marker);
- ASSERT_EQ((double)10, (double)reply_time);
- string response_entity;
- string response_marker;
- utime_t response_time;
- list<pair<string, utime_t> > response_item_list;
-
- cls_replica_log_extract_marker(return_progress_list.front(),
- response_entity, response_marker,
- response_time, response_item_list);
- ASSERT_EQ(response_entity, entity);
- ASSERT_EQ(response_marker, marker);
- ASSERT_EQ(response_time, time);
- ASSERT_EQ((unsigned)1, response_item_list.size());
- ASSERT_EQ("tester_obj1", response_item_list.front().first);
-}
-
-TEST_F(cls_replica_log_Test, test_bad_update)
-{
- add_marker();
-
- time.set_from_double(15);
- cls_replica_log_progress_marker bad_marker;
- cls_replica_log_prepare_marker(bad_marker, entity, marker, time, &entries);
- librados::ObjectWriteOperation badw;
- cls_replica_log_update_bound(badw, bad_marker);
- ASSERT_EQ(-EINVAL, ioctx.operate(oid, &badw));
-}
-
-TEST_F(cls_replica_log_Test, test_bad_delete)
-{
- add_marker();
-
- librados::ObjectWriteOperation badd;
- cls_replica_log_delete_bound(badd, entity);
- ASSERT_EQ(-ENOTEMPTY, ioctx.operate(oid, &badd));
-}
-
-TEST_F(cls_replica_log_Test, test_good_delete)
-{
- add_marker();
-
- librados::ObjectWriteOperation opc;
- progress.items.clear();
- cls_replica_log_update_bound(opc, progress);
- ASSERT_EQ(0, ioctx.operate(oid, &opc));
- librados::ObjectWriteOperation opd;
- cls_replica_log_delete_bound(opd, entity);
- ASSERT_EQ(0, ioctx.operate(oid, &opd));
-
- string reply_position_marker;
- utime_t reply_time;
- list<cls_replica_log_progress_marker> return_progress_list;
- ASSERT_EQ(0, cls_replica_log_get_bounds(ioctx, oid, reply_position_marker,
- reply_time, return_progress_list));
- ASSERT_EQ((unsigned)0, return_progress_list.size());
-}
-
-TEST_F(cls_replica_log_Test, test_bad_get)
-{
- string reply_position_marker;
- utime_t reply_time;
- list<cls_replica_log_progress_marker> return_progress_list;
- ASSERT_EQ(-ENOENT,
- cls_replica_log_get_bounds(ioctx, oid, reply_position_marker,
- reply_time, return_progress_list));
-}
-
-TEST_F(cls_replica_log_Test, test_double_delete)
-{
- add_marker();
-
- librados::ObjectWriteOperation opc;
- progress.items.clear();
- cls_replica_log_update_bound(opc, progress);
- ASSERT_EQ(0, ioctx.operate(oid, &opc));
- librados::ObjectWriteOperation opd;
- cls_replica_log_delete_bound(opd, entity);
- ASSERT_EQ(0, ioctx.operate(oid, &opd));
-
- librados::ObjectWriteOperation opd2;
- cls_replica_log_delete_bound(opd2, entity);
- ASSERT_EQ(0, ioctx.operate(oid, &opd2));
-
- string reply_position_marker;
- utime_t reply_time;
- list<cls_replica_log_progress_marker> return_progress_list;
- ASSERT_EQ(0, cls_replica_log_get_bounds(ioctx, oid, reply_position_marker,
- reply_time, return_progress_list));
- ASSERT_EQ((unsigned)0, return_progress_list.size());
-
-}
TYPE(cls_lock_assert_op)
TYPE(cls_lock_set_cookie_op)
-#include "cls/replica_log/cls_replica_log_types.h"
-TYPE(cls_replica_log_item_marker)
-TYPE(cls_replica_log_progress_marker)
-TYPE(cls_replica_log_bound)
-#include "cls/replica_log/cls_replica_log_ops.h"
-TYPE(cls_replica_log_delete_marker_op)
-TYPE(cls_replica_log_set_marker_op)
-TYPE(cls_replica_log_get_bounds_op)
-TYPE(cls_replica_log_get_bounds_ret)
-
#include "cls/refcount/cls_refcount_ops.h"
TYPE(cls_refcount_get_op)
TYPE(cls_refcount_put_op)
cls_statelog_client
cls_timeindex_client
cls_version_client
- cls_replica_log_client
cls_user_client
librados
global
cls_log_client
cls_statelog_client
cls_version_client
- cls_replica_log_client
cls_user_client
librados
global
cls_log_client
cls_statelog_client
cls_version_client
- cls_replica_log_client
cls_user_client
librados
global
cls_log_client
cls_statelog_client
cls_version_client
- cls_replica_log_client
cls_user_client
librados
global