From: Casey Bodley Date: Wed, 10 Oct 2018 19:11:57 +0000 (-0400) Subject: rgw: add rgw::putobj::HeadObjectProcessor X-Git-Tag: v14.1.0~1156^2~24 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2c71d4a9ef5d54b2b2f602b30f6bf6c6e889f19e;p=ceph.git rgw: add rgw::putobj::HeadObjectProcessor a striped object processor with special handling for the first chunk of data Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_putobj_processor.cc b/src/rgw/rgw_putobj_processor.cc new file mode 100644 index 000000000000..9fe46f1bddab --- /dev/null +++ b/src/rgw/rgw_putobj_processor.cc @@ -0,0 +1,55 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "rgw_putobj_processor.h" + +namespace rgw::putobj { + +int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) +{ + const bool flush = (data.length() == 0); + + // capture the first chunk for special handling + if (data_offset < head_chunk_size) { + if (flush) { + // flush partial chunk + return process_first_chunk(std::move(head_data), &processor); + } + + auto remaining = head_chunk_size - data_offset; + auto count = std::min(data.length(), remaining); + data.splice(0, count, &head_data); + data_offset += count; + + if (data_offset == head_chunk_size) { + // process the first complete chunk + ceph_assert(head_data.length() == head_chunk_size); + int r = process_first_chunk(std::move(head_data), &processor); + if (r < 0) { + return r; + } + } + if (data.length() == 0) { // avoid flushing stripe processor + return 0; + } + } + ceph_assert(processor); // process_first_chunk() must initialize + + // send everything else through the processor + auto write_offset = data_offset; + data_offset += data.length(); + return processor->process(std::move(data), write_offset); +} + +} // namespace rgw::putobj diff --git a/src/rgw/rgw_putobj_processor.h b/src/rgw/rgw_putobj_processor.h index d071194f26c2..a567de434a1b 100644 --- a/src/rgw/rgw_putobj_processor.h +++ b/src/rgw/rgw_putobj_processor.h @@ -14,6 +14,8 @@ #pragma once +#include + #include "rgw_putobj.h" #include "rgw_rados.h" @@ -35,4 +37,32 @@ class ObjectProcessor : public DataProcessor { rgw_zone_set *zones_trace, bool *canceled) = 0; }; +// an object processor with special handling for the first chunk of the head. +// the virtual process_first_chunk() function returns a processor to handle the +// rest of the object +class HeadObjectProcessor : public ObjectProcessor { + uint64_t head_chunk_size; + // buffer to capture the first chunk of the head object + bufferlist head_data; + // initialized after process_first_chunk() to process everything else + DataProcessor *processor = nullptr; + uint64_t data_offset = 0; // maximum offset of data written (ie compressed) + protected: + uint64_t get_actual_size() const { return data_offset; } + + // process the first chunk of data and return a processor for the rest + virtual int process_first_chunk(bufferlist&& data, + DataProcessor **processor) = 0; + public: + HeadObjectProcessor(uint64_t head_chunk_size) + : head_chunk_size(head_chunk_size) + {} + + void set_head_chunk_size(uint64_t size) { head_chunk_size = size; } + + // cache first chunk for process_first_chunk(), then forward everything else + // to the returned processor + int process(bufferlist&& data, uint64_t logical_offset) final override; +}; + } // namespace rgw::putobj