//------------RGWPutObj_Compress---------------
-int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again)
+int RGWPutObj_Compress::process(bufferlist&& in, uint64_t logical_offset)
{
- bufferlist in_bl;
- if (*again) {
- return next->handle_data(in_bl, ofs, phandle, pobj, again);
- }
- if (bl.length() > 0) {
+ bufferlist out;
+ if (in.length() > 0) {
// compression stuff
- if ((ofs > 0 && compressed) || // if previous part was compressed
- (ofs == 0)) { // or it's the first part
- ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
- int cr = compressor->compress(bl, in_bl);
+ if ((logical_offset > 0 && compressed) || // if previous part was compressed
+ (logical_offset == 0)) { // or it's the first part
+ ldout(cct, 10) << "Compression for rgw is enabled, compress part " << in.length() << dendl;
+ int cr = compressor->compress(in, out);
if (cr < 0) {
- if (ofs > 0) {
+ if (logical_offset > 0) {
lderr(cct) << "Compression failed with exit code " << cr
<< " for next part, compression process failed" << dendl;
return -EIO;
compressed = false;
ldout(cct, 5) << "Compression failed with exit code " << cr
<< " for first part, storing uncompressed" << dendl;
- in_bl.claim(bl);
+ out.claim(in);
} else {
compressed = true;
compression_block newbl;
size_t bs = blocks.size();
- newbl.old_ofs = ofs;
+ newbl.old_ofs = logical_offset;
newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
- newbl.len = in_bl.length();
+ newbl.len = out.length();
blocks.push_back(newbl);
}
} else {
compressed = false;
- in_bl.claim(bl);
+ out.claim(in);
}
// end of compression stuff
}
- return next->handle_data(in_bl, ofs, phandle, pobj, again);
+ return Pipe::process(std::move(out), logical_offset);
}
//----------------RGWGetObj_Decompress---------------------
#include <vector>
#include "compressor/Compressor.h"
+#include "rgw_putobj.h"
#include "rgw_op.h"
class RGWGetObj_Decompress : public RGWGetObj_Filter
};
-class RGWPutObj_Compress : public RGWPutObj_Filter
+class RGWPutObj_Compress : public rgw::putobj::Pipe
{
CephContext* cct;
bool compressed{false};
std::vector<compression_block> blocks;
public:
RGWPutObj_Compress(CephContext* cct_, CompressorRef compressor,
- RGWPutObjDataProcessor* next)
- : RGWPutObj_Filter(next), cct(cct_), compressor(compressor) {}
- ~RGWPutObj_Compress() override{}
- int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override;
+ rgw::putobj::DataProcessor *next)
+ : Pipe(next), cct(cct_), compressor(compressor) {}
+
+ int process(bufferlist&& data, uint64_t logical_offset) override;
bool is_compressed() { return compressed; }
vector<compression_block>& get_compression_blocks() { return blocks; }
}
};
-class ut_put_sink: public RGWPutObjDataProcessor
+class ut_put_sink: public rgw::putobj::DataProcessor
{
bufferlist sink;
public:
- ut_put_sink(){}
- virtual ~ut_put_sink(){}
- int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override
- {
- sink.append(bl);
- *again = false;
- return 0;
- }
- int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) override
+ int process(bufferlist&& bl, uint64_t ofs) override
{
+ sink.claim_append(bl);
return 0;
}
bufferlist& get_sink()
bufferlist bl;
bl.append(bp);
- void* handle;
- rgw_raw_obj obj;
- bool again = false;
-
ut_put_sink c_sink;
RGWPutObj_Compress compressor(g_ceph_context, plugin, &c_sink);
- compressor.handle_data(bl, 0, &handle, &obj, &again);
-
- bufferlist empty;
- compressor.handle_data(empty, s, &handle, &obj, &again);
+ compressor.process(std::move(bl), 0);
+ compressor.process({}, s); // flush
RGWCompressionInfo cs_info;
cs_info.compression_type = plugin->get_type_name();
decompress.fixup_range(f_begin, f_end);
decompress.handle_data(c_sink.get_sink(), 0, c_sink.get_sink().length());
+ bufferlist empty;
decompress.handle_data(empty, 0, 0);
ASSERT_LE(d_sink.get_size(), (size_t)g_ceph_context->_conf->rgw_max_chunk_size);
bufferlist bl;
bl.append(bp);
- void* handle;
- rgw_raw_obj obj;
- bool again = false;
-
for (int i=0; i<1000;i++)
- compressor.handle_data(bl, size*i, &handle, &obj, &again);
-
- bufferlist empty;
- compressor.handle_data(empty, size*1000, &handle, &obj, &again);
+ compressor.process(bufferlist{bl}, size*i);
+ compressor.process({}, size*1000); // flush
RGWCompressionInfo cs_info;
cs_info.compression_type = plugin->get_type_name();
decompress.fixup_range(f_begin, f_end);
decompress.handle_data(c_sink.get_sink(), 0, c_sink.get_sink().length());
+ bufferlist empty;
decompress.handle_data(empty, 0, 0);
ASSERT_EQ(d_sink.get_sink().length() , size*1000);