return c->is_safe();
}
-// PutObj filter that buffers data so we don't try to compress tiny blocks.
-// libcurl reads in 16k at a time, and we need at least 64k to get a good
-// compression ratio
-class RGWPutObj_Buffer : public RGWPutObj_Filter {
- const unsigned buffer_size;
- bufferlist buffer;
- public:
- RGWPutObj_Buffer(RGWPutObjDataProcessor* next, unsigned buffer_size)
- : RGWPutObj_Filter(next), buffer_size(buffer_size) {
- ceph_assert(isp2(buffer_size)); // must be power of 2
- }
-
- int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj,
- bool *again) override {
- if (*again || !bl.length()) {
- // flush buffered data
- return RGWPutObj_Filter::handle_data(buffer, ofs, phandle, pobj, again);
- }
- // transform offset to the beginning of the buffer
- ofs = ofs - buffer.length();
- buffer.claim_append(bl);
- if (buffer.length() < buffer_size) {
- *again = false; // don't come back until there's more data
- return 0;
- }
- const auto count = p2align(buffer.length(), buffer_size);
- buffer.splice(0, count, &bl);
- return RGWPutObj_Filter::handle_data(bl, ofs, phandle, pobj, again);
- }
-};
-
class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB
{
CephContext* cct;
rgw_obj obj;
- RGWPutObjDataProcessor *filter;
+ rgw::putobj::DataProcessor *filter;
boost::optional<RGWPutObj_Compress>& compressor;
- boost::optional<RGWPutObj_Buffer> buffering;
+ boost::optional<rgw::putobj::ChunkProcessor> buffering;
CompressorRef& plugin;
- RGWPutObjProcessor_Atomic *processor;
+ rgw::putobj::ObjectProcessor *processor;
void (*progress_cb)(off_t, void *);
void *progress_data;
bufferlist extra_data_bl;
uint64_t extra_data_left;
uint64_t data_len;
map<string, bufferlist> src_attrs;
- off_t ofs{0};
- off_t lofs{0}; /* logical ofs */
+ uint64_t ofs{0};
+ uint64_t lofs{0}; /* logical ofs */
public:
RGWRadosPutObj(CephContext* cct,
CompressorRef& plugin,
boost::optional<RGWPutObj_Compress>& compressor,
- RGWPutObjProcessor_Atomic *p,
+ rgw::putobj::ObjectProcessor *p,
void (*_progress_cb)(off_t, void *),
void *_progress_data) :
cct(cct),
if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
//do not compress if object is encrypted
-#if 0
compressor = boost::in_place(cct, plugin, filter);
+ // add a filter that buffers data so we don't try to compress tiny blocks.
+ // libcurl reads in 16k at a time, and we need at least 64k to get a good
+ // compression ratio
constexpr unsigned buffer_size = 512 * 1024;
buffering = boost::in_place(&*compressor, buffer_size);
filter = &*buffering;
-#endif
}
return 0;
}
int handle_data(bufferlist& bl, bool *pause) override {
if (progress_cb) {
- progress_cb(lofs, progress_data);
+ progress_cb(data_len, progress_data);
}
if (extra_data_left) {
- size_t extra_len = bl.length();
+ uint64_t extra_len = bl.length();
if (extra_len > extra_data_left)
extra_len = extra_data_left;
ceph_assert(uint64_t(ofs) >= extra_data_len);
- lofs = ofs - extra_data_len;
-
- data_len += bl.length();
- bool again = false;
-
- do {
- void *handle = NULL;
- rgw_raw_obj obj;
- uint64_t size = bl.length();
- int ret = filter->handle_data(bl, lofs, &handle, &obj, &again);
- if (ret < 0)
- return ret;
-
- ofs += size;
+ uint64_t size = bl.length();
+ ofs += size;
- ret = filter->throttle_data(handle, obj, size, false);
- if (ret < 0)
- return ret;
- } while (again);
+ const uint64_t lofs = data_len;
+ data_len += size;
- return 0;
+ return filter->process(std::move(bl), lofs);
}
int flush() {
- bufferlist bl;
- return put_data_and_throttle(filter, bl, ofs, false);
+ return filter->process({}, data_len);
}
bufferlist& get_extra_data() { return extra_data_bl; }
}
int complete(const string& etag, real_time *mtime, real_time set_mtime,
- map<string, bufferlist>& attrs, real_time delete_at, rgw_zone_set *zones_trace) {
- return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at, NULL, NULL, NULL, zones_trace);
- }
-
- bool is_canceled() {
- return processor->is_canceled();
+ map<string, bufferlist>& attrs, real_time delete_at,
+ rgw_zone_set *zones_trace, bool *canceled) {
+ return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at, NULL, NULL, NULL, zones_trace, canceled);
}
};
#define MAX_COMPLETE_RETRY 100
for (i = 0; i < MAX_COMPLETE_RETRY; i++) {
- ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at, zones_trace);
+ bool canceled = false;
+ ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at, zones_trace, &canceled);
if (ret < 0) {
goto set_err_state;
}
- if (copy_if_newer && cb.is_canceled()) {
+ if (copy_if_newer && canceled) {
ldout(cct, 20) << "raced with another write of obj: " << dest_obj << dendl;
obj_ctx.obj.invalidate(dest_obj); /* object was overwritten */
ret = get_obj_state(&obj_ctx, dest_bucket_info, dest_obj, &dest_state, false);