return 0;
}
+int get_client_list_range(cls_method_context_t hctx,
+ std::set<cls::journal::Client> *clients,
+ std::string start_after, uint64_t max_return) {
+ std::string last_read;
+ if (!start_after.empty()) {
+ last_read = key_from_client_id(start_after);
+ }
+
+ std::map<std::string, bufferlist> vals;
+ int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
+ max_return, &vals);
+ if (r < 0) {
+ CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
+ return r;
+ }
+
+ for (std::map<std::string, bufferlist>::iterator it = vals.begin();
+ it != vals.end(); ++it) {
+ try {
+ bufferlist::iterator iter = it->second.begin();
+
+ cls::journal::Client client;
+ ::decode(client, iter);
+ clients->insert(client);
+ } catch (const buffer::error &err) {
+ CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
+ err.what());
+ return -EIO;
+ }
+ }
+
+ return 0;
+}
+
+int find_min_commit_position(cls_method_context_t hctx,
+ cls::journal::ObjectSetPosition *minset) {
+ int r;
+ bool valid = false;
+ std::string start_after = "";
+ uint64_t tag_tid = 0, entry_tid = 0;
+
+ while (true) {
+ std::set<cls::journal::Client> batch;
+
+ r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN);
+ if ((r < 0) || batch.empty()) {
+ break;
+ }
+
+ start_after = batch.rbegin()->id;
+
+ // update the (minimum) commit position from this batch of clients
+ for(std::set<cls::journal::Client>::iterator it = batch.begin();
+ it != batch.end(); ++it) {
+ cls::journal::ObjectSetPosition object_set_position = (*it).commit_position;
+ if (!object_set_position.object_positions.empty()) {
+ cls::journal::ObjectPosition first = object_set_position.object_positions.front();
+
+ // least tag_tid (or least entry_tid for matching tag_tid)
+ if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) {
+ tag_tid = first.tag_tid;
+ entry_tid = first.entry_tid;
+ *minset = cls::journal::ObjectSetPosition(object_set_position);
+ valid = true;
+ }
+ }
+ }
+
+ // got the last batch, we're done
+ if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) {
+ break;
+ }
+ }
+
+ return r;
+}
+
} // anonymous namespace
/**
return r;
}
- cls::journal::Client client(id, data);
+ cls::journal::ObjectSetPosition minset;
+ r = find_min_commit_position(hctx, &minset);
+ if (r < 0)
+ return r;
+
+ cls::journal::Client client(id, data, minset);
r = write_key(hctx, key, client);
if (r < 0) {
return r;
return -EINVAL;
}
- std::string last_read;
- if (!start_after.empty()) {
- last_read = key_from_client_id(start_after);
- }
-
- std::map<std::string, bufferlist> vals;
- int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
- max_return, &vals);
- if (r < 0) {
- CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
- return r;
- }
-
std::set<cls::journal::Client> clients;
- for (std::map<std::string, bufferlist>::iterator it = vals.begin();
- it != vals.end(); ++it) {
- try {
- bufferlist::iterator iter = it->second.begin();
-
- cls::journal::Client client;
- ::decode(client, iter);
- clients.insert(client);
- } catch (const buffer::error &err) {
- CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
- err.what());
- return -EIO;
- }
- }
+ int r = get_client_list_range(hctx, &clients, start_after, max_return);
+ if (r < 0)
+ return r;
::encode(clients, *out);
return 0;