]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: implement RGWStreamIOReorderingEngine.
authorRadoslaw Zarzynski <rzarzynski@mirantis.com>
Tue, 2 Aug 2016 13:30:57 +0000 (15:30 +0200)
committerRadoslaw Zarzynski <rzarzynski@mirantis.com>
Fri, 21 Oct 2016 20:57:19 +0000 (22:57 +0200)
Signed-off-by: Radoslaw Zarzynski <rzarzynski@mirantis.com>
src/rgw/rgw_client_io_decoimpl.h

index 4a1dfe22c86cf3e5ded7ba38f35d6b5e6bccea88..c5434f398cc63b315b4be4fc5bd98f93c3d518f3 100644 (file)
@@ -6,6 +6,8 @@
 
 #include <type_traits>
 
+#include <boost/optional.hpp>
+
 #include "rgw_common.h"
 #include "rgw_client_io.h"
 
@@ -290,11 +292,114 @@ public:
   }
 };
 
-
 template <typename T>
 RGWStreamIOConLenControllingEngine<T> add_conlen_controlling(T&& t) {
   return RGWStreamIOConLenControllingEngine<T>(std::move(t));
 }
 
 
+/* Filter that rectifies the wrong behaviour of some clients of the RGWStreamIO
+ * interface. Should be removed after fixing those clients. */
+template <typename T>
+class RGWStreamIOReorderingEngine : public RGWDecoratedStreamIO<T> {
+protected:
+  enum class ReorderState {
+    RGW_EARLY_HEADERS,  /* Got headers sent before calling send_status. */
+    RGW_STATUS_SEEN,    /* Status has been seen. */
+    RGW_DATA            /* Header has been completed. */
+  } phase;
+
+  boost::optional<uint64_t> content_length;
+
+  ceph::bufferlist early_header_data;
+  ceph::bufferlist header_data;
+
+  int write_data(const char* const buf, const int len) override {
+    switch (phase) {
+    case ReorderState::RGW_EARLY_HEADERS:
+      early_header_data.append(buf, len);
+      return len;
+    case ReorderState::RGW_STATUS_SEEN:
+      header_data.append(buf, len);
+      return len;
+    case ReorderState::RGW_DATA:
+      return RGWDecoratedStreamIO<T>::write_data(buf, len);
+    }
+
+    return -EIO;
+  }
+
+public:
+  template <typename U>
+  RGWStreamIOReorderingEngine(U&& decoratee)
+    : RGWDecoratedStreamIO<T>(std::move(decoratee)),
+      phase(ReorderState::RGW_EARLY_HEADERS) {
+  }
+
+  int send_status(const int status, const char* const status_name) override {
+    phase = ReorderState::RGW_STATUS_SEEN;
+
+    return RGWDecoratedStreamIO<T>::send_status(status, status_name);
+  }
+
+  int send_content_length(const uint64_t len) override {
+    if (ReorderState::RGW_EARLY_HEADERS == phase) {
+      /* Oh great, someone tries to send content length before status. */
+      content_length = len;
+      return 0;
+    } else {
+      return RGWDecoratedStreamIO<T>::send_content_length(len);
+    }
+  }
+
+  int complete_header() override {
+    size_t sent = 0;
+
+    /* Change state in order to immediately send everything we get. */
+    phase = ReorderState::RGW_DATA;
+
+    /* Sent content length if necessary. */
+    if (content_length) {
+      ssize_t rc = RGWDecoratedStreamIO<T>::send_content_length(*content_length);
+      if (rc < 0) {
+        return rc;
+      } else {
+        sent += rc;
+      }
+    }
+
+    /* Header data in buffers are already counted. */
+    if (header_data.length()) {
+      ssize_t rc = RGWDecoratedStreamIO<T>::write_data(header_data.c_str(),
+                                                       header_data.length());
+      if (rc < 0) {
+        return rc;
+      } else {
+        sent += rc;
+      }
+
+      header_data.clear();
+    }
+
+    if (early_header_data.length()) {
+      ssize_t rc = RGWDecoratedStreamIO<T>::write_data(early_header_data.c_str(),
+                                                       early_header_data.length());
+      if (rc < 0) {
+        return rc;
+      } else {
+        sent += rc;
+      }
+
+      early_header_data.clear();
+    }
+
+    return sent + RGWDecoratedStreamIO<T>::complete_header();
+  }
+};
+
+template <typename T>
+RGWStreamIOReorderingEngine<T> add_reordering(T&& t) {
+  return RGWStreamIOReorderingEngine<T>(std::move(t));
+}
+
 #endif /* CEPH_RGW_CLIENT_IO_DECOIMPL_H */