]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_journal: Select min commit position for new clients 9179/head
authorVenky Shankar <vshankar@redhat.com>
Thu, 19 May 2016 17:15:34 +0000 (22:45 +0530)
committerVenky Shankar <vshankar@redhat.com>
Fri, 20 May 2016 14:26:35 +0000 (19:56 +0530)
When a new client registers, its commit position is initialized
to the least commit position of all other clients.

Fixes: http://tracker.ceph.com/issues/15757
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/cls/journal/cls_journal.cc
src/cls/journal/cls_journal_client.cc
src/cls/journal/cls_journal_types.h

index d5cdfe3ececf7b30678d50038806580ad233c17b..2d36296f1885981ed6e2986619fd8b19f4a08d9a 100644 (file)
@@ -224,6 +224,83 @@ int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
   return 0;
 }
 
+int get_client_list_range(cls_method_context_t hctx,
+                          std::set<cls::journal::Client> *clients,
+                          std::string start_after, uint64_t max_return) {
+  std::string last_read;
+  if (!start_after.empty()) {
+    last_read = key_from_client_id(start_after);
+  }
+
+  std::map<std::string, bufferlist> vals;
+  int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
+                               max_return, &vals);
+  if (r < 0) {
+    CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  for (std::map<std::string, bufferlist>::iterator it = vals.begin();
+       it != vals.end(); ++it) {
+    try {
+      bufferlist::iterator iter = it->second.begin();
+
+      cls::journal::Client client;
+      ::decode(client, iter);
+      clients->insert(client);
+    } catch (const buffer::error &err) {
+      CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
+              err.what());
+      return -EIO;
+    }
+  }
+
+  return 0;
+}
+
+int find_min_commit_position(cls_method_context_t hctx,
+                             cls::journal::ObjectSetPosition *minset) {
+  int r;
+  bool valid = false;
+  std::string start_after = "";
+  uint64_t tag_tid = 0, entry_tid = 0;
+
+  while (true) {
+    std::set<cls::journal::Client> batch;
+
+    r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN);
+    if ((r < 0) || batch.empty()) {
+      break;
+    }
+
+    start_after = batch.rbegin()->id;
+
+    // update the (minimum) commit position from this batch of clients
+    for(std::set<cls::journal::Client>::iterator it = batch.begin();
+        it != batch.end(); ++it) {
+      cls::journal::ObjectSetPosition object_set_position = (*it).commit_position;
+      if (!object_set_position.object_positions.empty()) {
+        cls::journal::ObjectPosition first = object_set_position.object_positions.front();
+
+        // least tag_tid (or least entry_tid for matching tag_tid)
+        if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) {
+          tag_tid = first.tag_tid;
+          entry_tid = first.entry_tid;
+          *minset = cls::journal::ObjectSetPosition(object_set_position);
+          valid = true;
+        }
+      }
+    }
+
+    // got the last batch, we're done
+    if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) {
+      break;
+    }
+  }
+
+  return r;
+}
+
 } // anonymous namespace
 
 /**
@@ -565,7 +642,12 @@ int journal_client_register(cls_method_context_t hctx, bufferlist *in,
     return r;
   }
 
-  cls::journal::Client client(id, data);
+  cls::journal::ObjectSetPosition minset;
+  r = find_min_commit_position(hctx, &minset);
+  if (r < 0)
+    return r;
+
+  cls::journal::Client client(id, data, minset);
   r = write_key(hctx, key, client);
   if (r < 0) {
     return r;
@@ -760,34 +842,10 @@ int journal_client_list(cls_method_context_t hctx, bufferlist *in,
     return -EINVAL;
   }
 
-  std::string last_read;
-  if (!start_after.empty()) {
-    last_read = key_from_client_id(start_after);
-  }
-
-  std::map<std::string, bufferlist> vals;
-  int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
-                               max_return, &vals);
-  if (r < 0) {
-    CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
-    return r;
-  }
-
   std::set<cls::journal::Client> clients;
-  for (std::map<std::string, bufferlist>::iterator it = vals.begin();
-       it != vals.end(); ++it) {
-    try {
-      bufferlist::iterator iter = it->second.begin();
-
-      cls::journal::Client client;
-      ::decode(client, iter);
-      clients.insert(client);
-    } catch (const buffer::error &err) {
-      CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
-              err.what());
-      return -EIO;
-    }
-  }
+  int r = get_client_list_range(hctx, &clients, start_after, max_return);
+  if (r < 0)
+    return r;
 
   ::encode(clients, *out);
   return 0;
index 7fbfb517a69a91549013c1d29652d66cb8461005..50549dc4061db71fe92894423a606dccee7e361f 100644 (file)
@@ -14,8 +14,6 @@ namespace client {
 
 namespace {
 
-static const uint64_t JOURNAL_MAX_RETURN = 256;
-
 struct C_AioExec : public Context {
   librados::IoCtx ioctx;
   std::string oid;
index 0381aff93c6b01223f9da6dc8ea500b41a557fbb..4e1f2d7fd6e1013abd1cd3108b9e952cbf104cd6 100644 (file)
@@ -19,6 +19,8 @@ class Formatter;
 namespace cls {
 namespace journal {
 
+static const uint64_t JOURNAL_MAX_RETURN = 256;
+
 struct ObjectPosition {
   uint64_t object_number;
   uint64_t tag_tid;