}
RGWPutObj_BlockEncrypt::RGWPutObj_BlockEncrypt(CephContext* cct,
- RGWPutObjDataProcessor* next,
- std::unique_ptr<BlockCrypt> crypt):
- RGWPutObj_Filter(next),
+ rgw::putobj::DataProcessor *next,
+ std::unique_ptr<BlockCrypt> crypt)
+ : Pipe(next),
cct(cct),
crypt(std::move(crypt)),
- ofs(0),
- cache()
+ block_size(this->crypt->get_block_size())
{
- block_size = this->crypt->get_block_size();
}
-RGWPutObj_BlockEncrypt::~RGWPutObj_BlockEncrypt() {
-}
+int RGWPutObj_BlockEncrypt::process(bufferlist&& data, uint64_t logical_offset)
+{
+ ldout(cct, 25) << "Encrypt " << data.length() << " bytes" << dendl;
-int RGWPutObj_BlockEncrypt::handle_data(bufferlist& bl,
- off_t in_ofs,
- void **phandle,
- rgw_raw_obj *pobj,
- bool *again) {
- int res = 0;
- ldout(cct, 25) << "Encrypt " << bl.length() << " bytes" << dendl;
-
- if (*again) {
- bufferlist no_data;
- res = next->handle_data(no_data, in_ofs, phandle, pobj, again);
- //if *again is not set to false, we will have endless loop
- //drop info on log
- if (*again) {
- ldout(cct, 20) << "*again==true" << dendl;
- }
- return res;
- }
+ // adjust logical offset to beginning of cached data
+ ceph_assert(logical_offset >= cache.length());
+ logical_offset -= cache.length();
- cache.append(bl);
- off_t proc_size = cache.length() & ~(block_size - 1);
- if (bl.length() == 0) {
+ const bool flush = (data.length() == 0);
+ cache.claim_append(data);
+
+ uint64_t proc_size = cache.length() & ~(block_size - 1);
+ if (flush) {
proc_size = cache.length();
}
if (proc_size > 0) {
- bufferlist data;
- if (! crypt->encrypt(cache, 0, proc_size, data, ofs) ) {
+ bufferlist in, out;
+ cache.splice(0, proc_size, &in);
+ if (!crypt->encrypt(in, 0, proc_size, out, logical_offset)) {
return -ERR_INTERNAL_ERROR;
}
- res = next->handle_data(data, ofs, phandle, pobj, again);
- ofs += proc_size;
- cache.splice(0, proc_size);
- if (res < 0)
- return res;
+ int r = Pipe::process(std::move(out), logical_offset);
+ logical_offset += proc_size;
+ if (r < 0)
+ return r;
}
- if (bl.length() == 0) {
+ if (flush) {
/*replicate 0-sized handle_data*/
- res = next->handle_data(bl, ofs, phandle, pobj, again);
+ return Pipe::process({}, logical_offset);
}
- return res;
+ return 0;
}
-int RGWPutObj_BlockEncrypt::throttle_data(void *handle,
- const rgw_raw_obj& obj,
- uint64_t size,
- bool need_to_wait) {
- return next->throttle_data(handle, obj, size, need_to_wait);
-}
std::string create_random_key_selector(CephContext * const cct) {
char random[AES_256_KEYSIZE];
#include <rgw/rgw_op.h>
#include <rgw/rgw_rest.h>
#include <rgw/rgw_rest_s3.h>
+#include "rgw_putobj.h"
#include <boost/utility/string_view.hpp>
/**
}; /* RGWGetObj_BlockDecrypt */
-class RGWPutObj_BlockEncrypt : public RGWPutObj_Filter
+class RGWPutObj_BlockEncrypt : public rgw::putobj::Pipe
{
CephContext* cct;
std::unique_ptr<BlockCrypt> crypt; /**< already configured stateless BlockCrypt
for operations when enough data is accumulated */
- off_t ofs; /**< stream offset of data we expect to show up next through \ref handle_data */
bufferlist cache; /**< stores extra data that could not (yet) be processed by BlockCrypt */
- size_t block_size; /**< snapshot of \ref BlockCrypt.get_block_size() */
+ const size_t block_size; /**< snapshot of \ref BlockCrypt.get_block_size() */
public:
RGWPutObj_BlockEncrypt(CephContext* cct,
- RGWPutObjDataProcessor* next,
+ rgw::putobj::DataProcessor *next,
std::unique_ptr<BlockCrypt> crypt);
- virtual ~RGWPutObj_BlockEncrypt();
- virtual int handle_data(bufferlist& bl,
- off_t ofs,
- void **phandle,
- rgw_raw_obj *pobj,
- bool *again) override;
- virtual int throttle_data(void *handle,
- const rgw_raw_obj& obj,
- uint64_t size,
- bool need_to_wait) override;
+
+ int process(bufferlist&& data, uint64_t logical_offset) override;
}; /* RGWPutObj_BlockEncrypt */
}
};
-class ut_put_sink: public RGWPutObjDataProcessor
+class ut_put_sink: public rgw::putobj::DataProcessor
{
std::stringstream sink;
public:
- ut_put_sink(){}
- virtual ~ut_put_sink(){}
- int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override
+ int process(bufferlist&& bl, uint64_t ofs) override
{
sink << boost::string_ref(bl.c_str(),bl.length());
- *again = false;
- return 0;
- }
- int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) override
- {
return 0;
}
std::string get_sink()
bufferlist bl;
bl.append(input.c_str()+pos, size);
- void* handle;
- bool again = false;
- rgw_raw_obj ro;
- encrypt.handle_data(bl, 0, &handle, nullptr, &again);
- encrypt.throttle_data(handle, ro, size, false);
+ encrypt.process(std::move(bl), pos);
pos = pos + size;
} while (pos < test_size);
- bufferlist bl;
- void* handle;
- bool again = false;
- encrypt.handle_data(bl, 0, &handle, nullptr, &again);
+ encrypt.process({}, pos);
ASSERT_EQ(put_sink.get_sink().length(), static_cast<size_t>(test_size));
AES_256_CBC_create(g_ceph_context, &key[0], 32) );
bufferlist bl;
bl.append((char*)test_in, test_size);
- void* handle;
- bool again = false;
- rgw_raw_obj ro;
- encrypt.handle_data(bl, 0, &handle, nullptr, &again);
- encrypt.throttle_data(handle, ro, test_size, false);
- bl.clear();
- encrypt.handle_data(bl, 0, &handle, nullptr, &again);
+ encrypt.process(std::move(bl), 0);
+ encrypt.process({}, test_size);
ASSERT_EQ(put_sink.get_sink().length(), test_size);
bl.append(put_sink.get_sink().data(), put_sink.get_sink().length());