]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add rgw::putobj::HeadObjectProcessor
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:11:57 +0000 (15:11 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 15 Oct 2018 21:06:26 +0000 (17:06 -0400)
a striped object processor with special handling for the first chunk of
data

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_putobj_processor.cc [new file with mode: 0644]
src/rgw/rgw_putobj_processor.h

diff --git a/src/rgw/rgw_putobj_processor.cc b/src/rgw/rgw_putobj_processor.cc
new file mode 100644 (file)
index 0000000..9fe46f1
--- /dev/null
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "rgw_putobj_processor.h"
+
+namespace rgw::putobj {
+
+int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
+{
+  const bool flush = (data.length() == 0);
+
+  // capture the first chunk for special handling
+  if (data_offset < head_chunk_size) {
+    if (flush) {
+      // flush partial chunk
+      return process_first_chunk(std::move(head_data), &processor);
+    }
+
+    auto remaining = head_chunk_size - data_offset;
+    auto count = std::min<uint64_t>(data.length(), remaining);
+    data.splice(0, count, &head_data);
+    data_offset += count;
+
+    if (data_offset == head_chunk_size) {
+      // process the first complete chunk
+      ceph_assert(head_data.length() == head_chunk_size);
+      int r = process_first_chunk(std::move(head_data), &processor);
+      if (r < 0) {
+        return r;
+      }
+    }
+    if (data.length() == 0) { // avoid flushing stripe processor
+      return 0;
+    }
+  }
+  ceph_assert(processor); // process_first_chunk() must initialize
+
+  // send everything else through the processor
+  auto write_offset = data_offset;
+  data_offset += data.length();
+  return processor->process(std::move(data), write_offset);
+}
+
+} // namespace rgw::putobj
index d071194f26c2b221305450af0f3ba44993efc94b..a567de434a1b7b80148c616ccfe128967acbc4f6 100644 (file)
@@ -14,6 +14,8 @@
 
 #pragma once
 
+#include <optional>
+
 #include "rgw_putobj.h"
 #include "rgw_rados.h"
 
@@ -35,4 +37,32 @@ class ObjectProcessor : public DataProcessor {
                        rgw_zone_set *zones_trace, bool *canceled) = 0;
 };
 
+// an object processor with special handling for the first chunk of the head.
+// the virtual process_first_chunk() function returns a processor to handle the
+// rest of the object
+class HeadObjectProcessor : public ObjectProcessor {
+  uint64_t head_chunk_size;
+  // buffer to capture the first chunk of the head object
+  bufferlist head_data;
+  // initialized after process_first_chunk() to process everything else
+  DataProcessor *processor = nullptr;
+  uint64_t data_offset = 0; // maximum offset of data written (ie compressed)
+ protected:
+  uint64_t get_actual_size() const { return data_offset; }
+
+  // process the first chunk of data and return a processor for the rest
+  virtual int process_first_chunk(bufferlist&& data,
+                                  DataProcessor **processor) = 0;
+ public:
+  HeadObjectProcessor(uint64_t head_chunk_size)
+    : head_chunk_size(head_chunk_size)
+  {}
+
+  void set_head_chunk_size(uint64_t size) { head_chunk_size = size; }
+
+  // cache first chunk for process_first_chunk(), then forward everything else
+  // to the returned processor
+  int process(bufferlist&& data, uint64_t logical_offset) final override;
+};
+
 } // namespace rgw::putobj