}
-RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid)
+RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid,
+ uint64_t _window_size)
: RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
- store(_store), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0), total_entries(0)
+ store(_store), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0), window_size(_window_size), total_entries(0)
{
}
}
-#define OMAP_APPEND_MAX_ENTRIES 100
int RGWOmapAppend::operate() {
reenter(this) {
for (;;) {
while (consume(&entry)) {
set_status() << "adding entry: " << entry;
entries[entry] = bufferlist();
- if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) {
+ if (entries.size() >= window_size) {
break;
}
}
- if (entries.size() >= OMAP_APPEND_MAX_ENTRIES || going_down) {
+ if (entries.size() >= window_size || going_down) {
set_status() << "flushing to omap";
call(new RGWRadosSetOmapKeysCR(store, pool, oid, entries));
entries.clear();
}
++total_entries;
pending_entries.push_back(s);
- if (++num_pending_entries >= OMAP_APPEND_MAX_ENTRIES) {
+ if (++num_pending_entries >= (int)window_size) {
flush_pending();
}
return true;
int request_complete();
};
+#define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
+
class RGWOmapAppend : public RGWConsumerCR<string> {
RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
map<string, bufferlist> entries;
+ uint64_t window_size;
uint64_t total_entries;
public:
- RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid);
+ RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid,
+ uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT);
int operate();
void flush_pending();
bool append(const string& s);
uint64_t get_total_entries() {
return total_entries;
}
+
+ const rgw_bucket& get_pool() {
+ return pool;
+ }
+
+ const string& get_oid() {
+ return oid;
+ }
};
class RGWAsyncWait : public RGWAsyncRadosRequest {