cls_method_handle_t h_journal_set_active_set;
cls_method_handle_t h_journal_get_client;
cls_method_handle_t h_journal_client_register;
-cls_method_handle_t h_journal_client_update;
+cls_method_handle_t h_journal_client_update_data;
+cls_method_handle_t h_journal_client_update_state;
cls_method_handle_t h_journal_client_unregister;
cls_method_handle_t h_journal_client_commit;
cls_method_handle_t h_journal_client_list;
* Output:
* @returns 0 on success, negative error code on failure
*/
-int journal_client_update(cls_method_context_t hctx, bufferlist *in,
- bufferlist *out) {
+int journal_client_update_data(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
std::string id;
bufferlist data;
try {
return 0;
}
+/**
+ * Input:
+ * @param id (string) - unique client id
+ * @param state (uint8_t) - client state
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_client_update_state(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ std::string id;
+ cls::journal::ClientState state;
+ bufferlist data;
+ try {
+ bufferlist::iterator iter = in->begin();
+ ::decode(id, iter);
+ uint8_t state_raw;
+ ::decode(state_raw, iter);
+ state = static_cast<cls::journal::ClientState>(state_raw);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ std::string key(key_from_client_id(id));
+ cls::journal::Client client;
+ int r = read_key(hctx, key, &client);
+ if (r < 0) {
+ return r;
+ }
+
+ client.state = state;
+ r = write_key(hctx, key, client);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
/**
* Input:
* @param id (string) - unique client id
cls_register_cxx_method(h_class, "client_register",
CLS_METHOD_RD | CLS_METHOD_WR,
journal_client_register, &h_journal_client_register);
- cls_register_cxx_method(h_class, "client_update",
+ cls_register_cxx_method(h_class, "client_update_data",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_client_update_data,
+ &h_journal_client_update_data);
+ cls_register_cxx_method(h_class, "client_update_state",
CLS_METHOD_RD | CLS_METHOD_WR,
- journal_client_update, &h_journal_client_update);
+ journal_client_update_state,
+ &h_journal_client_update_state);
cls_register_cxx_method(h_class, "client_unregister",
CLS_METHOD_RD | CLS_METHOD_WR,
journal_client_unregister,
op->exec("journal", "client_register", bl);
}
-int client_update(librados::IoCtx &ioctx, const std::string &oid,
- const std::string &id, const bufferlist &data) {
+int client_update_data(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, const bufferlist &data) {
librados::ObjectWriteOperation op;
- client_update(&op, id, data);
+ client_update_data(&op, id, data);
return ioctx.operate(oid, &op);
}
-void client_update(librados::ObjectWriteOperation *op,
- const std::string &id, const bufferlist &data) {
+void client_update_data(librados::ObjectWriteOperation *op,
+ const std::string &id, const bufferlist &data) {
bufferlist bl;
::encode(id, bl);
::encode(data, bl);
- op->exec("journal", "client_update", bl);
+ op->exec("journal", "client_update_data", bl);
+}
+
+int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, cls::journal::ClientState state) {
+ bufferlist bl;
+ ::encode(id, bl);
+ ::encode(static_cast<uint8_t>(state), bl);
+
+ librados::ObjectWriteOperation op;
+ op.exec("journal", "client_update_state", bl);
+ return ioctx.operate(oid, &op);
}
int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
void client_register(librados::ObjectWriteOperation *op,
const std::string &id, const bufferlist &data);
-int client_update(librados::IoCtx &ioctx, const std::string &oid,
- const std::string &id, const bufferlist &data);
-void client_update(librados::ObjectWriteOperation *op,
- const std::string &id, const bufferlist &data);
+int client_update_data(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, const bufferlist &data);
+void client_update_data(librados::ObjectWriteOperation *op,
+ const std::string &id, const bufferlist &data);
+int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &id, cls::journal::ClientState state);
int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
const std::string &id);
// vim: ts=8 sw=2 smarttab
#include "cls/journal/cls_journal_types.h"
+#include "include/stringify.h"
#include "common/Formatter.h"
namespace cls {
::encode(id, bl);
::encode(data, bl);
::encode(commit_position, bl);
+ ::encode(static_cast<uint8_t>(state), bl);
ENCODE_FINISH(bl);
}
::decode(id, iter);
::decode(data, iter);
::decode(commit_position, iter);
+
+ uint8_t state_raw;
+ ::decode(state_raw, iter);
+ state = static_cast<ClientState>(state_raw);
DECODE_FINISH(iter);
}
f->open_object_section("commit_position");
commit_position.dump(f);
f->close_section();
+
+ f->dump_string("state", stringify(state));
}
void Client::generate_test_instances(std::list<Client *> &o) {
o.push_back(new Tag(123, 234, data));
}
+std::ostream &operator<<(std::ostream &os, const ClientState &state) {
+ switch (state) {
+ case CLIENT_STATE_CONNECTED:
+ os << "connected";
+ break;
+ case CLIENT_STATE_DISCONNECTED:
+ os << "disconnected";
+ break;
+ default:
+ os << "unknown (" << static_cast<uint32_t>(state) << ")";
+ break;
+ }
+ return os;
+}
+
std::ostream &operator<<(std::ostream &os,
const ObjectPosition &object_position) {
os << "["
std::ostream &operator<<(std::ostream &os, const Client &client) {
os << "[id=" << client.id << ", "
- << "commit_position=" << client.commit_position << "]";
+ << "commit_position=" << client.commit_position << ", "
+ << "state=" << client.state << "]";
return os;
}
static void generate_test_instances(std::list<ObjectSetPosition *> &o);
};
+enum ClientState {
+ CLIENT_STATE_CONNECTED = 0,
+ CLIENT_STATE_DISCONNECTED = 1
+};
+
struct Client {
std::string id;
bufferlist data;
ObjectSetPosition commit_position;
+ ClientState state;
- Client() {}
+ Client() : state(CLIENT_STATE_CONNECTED) {}
Client(const std::string& _id, const bufferlist &_data,
- const ObjectSetPosition &_commit_position = ObjectSetPosition())
- : id(_id), data(_data), commit_position(_commit_position) {}
+ const ObjectSetPosition &_commit_position = ObjectSetPosition(),
+ ClientState _state = CLIENT_STATE_CONNECTED)
+ : id(_id), data(_data), commit_position(_commit_position), state(_state) {}
inline bool operator==(const Client &rhs) const {
return (id == rhs.id &&
data.contents_equal(rhs.data) &&
- commit_position == rhs.commit_position);
+ commit_position == rhs.commit_position &&
+ state == rhs.state);
}
inline bool operator<(const Client &rhs) const {
return (id < rhs.id);
WRITE_CLASS_ENCODER(Client);
WRITE_CLASS_ENCODER(Tag);
+std::ostream &operator<<(std::ostream &os, const ClientState &state);
std::ostream &operator<<(std::ostream &os,
const ObjectPosition &object_position);
std::ostream &operator<<(std::ostream &os,
Context *on_finish) {
ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
librados::ObjectWriteOperation op;
- client::client_update(&op, m_client_id, data);
+ client::client_update_data(&op, m_client_id, data);
C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
ASSERT_EQ(-EEXIST, client::client_register(ioctx, oid, "id1", bufferlist()));
}
-TEST_F(TestClsJournal, ClientUpdate) {
+TEST_F(TestClsJournal, ClientUpdateData) {
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
std::string oid = get_temp_image_name();
- ASSERT_EQ(-ENOENT, client::client_update(ioctx, oid, "id1", bufferlist()));
+ ASSERT_EQ(-ENOENT, client::client_update_data(ioctx, oid, "id1",
+ bufferlist()));
ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", bufferlist()));
bufferlist data;
data.append(std::string('1', 128));
- ASSERT_EQ(0, client::client_update(ioctx, oid, "id1", data));
+ ASSERT_EQ(0, client::client_update_data(ioctx, oid, "id1", data));
Client client;
ASSERT_EQ(0, client::get_client(ioctx, oid, "id1", &client));
ASSERT_EQ(expected_client, client);
}
+TEST_F(TestClsJournal, ClientUpdateState) {
+ librados::IoCtx ioctx;
+ ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+ std::string oid = get_temp_image_name();
+
+ ASSERT_EQ(-ENOENT, client::client_update_state(ioctx, oid, "id1",
+ CLIENT_STATE_DISCONNECTED));
+
+ ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", bufferlist()));
+
+ bufferlist data;
+ data.append(std::string('1', 128));
+ ASSERT_EQ(0, client::client_update_state(ioctx, oid, "id1",
+ CLIENT_STATE_DISCONNECTED));
+
+ Client client;
+ ASSERT_EQ(0, client::get_client(ioctx, oid, "id1", &client));
+ Client expected_client;
+ expected_client.id = "id1";
+ expected_client.state = CLIENT_STATE_DISCONNECTED;
+ ASSERT_EQ(expected_client, client);
+}
+
TEST_F(TestClsJournal, ClientUnregister) {
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));