RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid)
: RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
- store(_store), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0)
+ store(_store), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0), total_entries(0)
{
}
}
void RGWOmapAppend::append(const string& s) {
+ ++total_entries;
pending_entries.push_back(s);
if (++num_pending_entries >= OMAP_APPEND_MAX_ENTRIES) {
flush_pending();
list<string> pending_entries;
map<string, bufferlist> entries;
+
+ uint64_t total_entries;
public:
RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid);
int operate();
void flush_pending();
void append(const string& s);
void finish();
+
+ uint64_t get_total_entries() {
+ return total_entries;
+ }
};
class RGWAsyncWait : public RGWAsyncRadosRequest {
(*iter)->finish();
}
}
+
+ uint64_t get_total_entries(int shard_id) {
+ return shards[shard_id]->get_total_entries();
+ }
};
class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
state = s;
JSONDecoder::decode_json("marker", marker, obj);
JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
+ JSONDecoder::decode_json("total_entries", total_entries, obj);
+ JSONDecoder::decode_json("pos", pos, obj);
}
void rgw_meta_sync_marker::dump(Formatter *f) const
encode_json("state", (int)state, f);
encode_json("marker", marker, f);
encode_json("next_step_marker", next_step_marker, f);
+ encode_json("total_entries", total_entries, f);
+ encode_json("pos", pos, f);
}
void rgw_meta_sync_status::decode_json(JSONObj *obj)
uint16_t state;
string marker;
string next_step_marker;
+ uint64_t total_entries;
+ uint64_t pos;
- rgw_meta_sync_marker() : state(FullSync) {}
+ rgw_meta_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
::encode(state, bl);
::encode(marker, bl);
::encode(next_step_marker, bl);
+ ::encode(total_entries, bl);
+ ::encode(pos, bl);
ENCODE_FINISH(bl);
}
::decode(state, bl);
::decode(marker, bl);
::decode(next_step_marker, bl);
+ ::decode(total_entries, bl);
+ ::decode(pos, bl);
DECODE_FINISH(bl);
}
RGWContinuousLeaseCR *lease_cr;
bool lost_lock;
+ map<uint32_t, rgw_meta_sync_marker>& markers;
+
public:
- RGWFetchAllMetaCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, int _num_shards) : RGWCoroutine(_store->ctx()), store(_store),
+ RGWFetchAllMetaCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, int _num_shards,
+ map<uint32_t, rgw_meta_sync_marker>& _markers) : RGWCoroutine(_store->ctx()), store(_store),
http_manager(_mgr),
async_rados(_async_rados),
num_shards(_num_shards),
- req_ret(0), entries_index(NULL), lease_cr(NULL), lost_lock(false) {
+ req_ret(0), entries_index(NULL), lease_cr(NULL), lost_lock(false), markers(_markers) {
}
~RGWFetchAllMetaCR() {
}
yield entries_index->finish();
+ for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
+ int shard_id = (int)iter->first;
+ rgw_meta_sync_marker& marker = iter->second;
+ marker.total_entries = entries_index->get_total_entries(shard_id);
+ spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+ RGWMetaSyncStatusManager::shard_obj_name(shard_id), marker), true);
+ }
+
drain_all_but(1); /* the lease cr still needs to run */
yield lease_cr->go_down();
switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
case rgw_meta_sync_info::StateBuildingFullSyncMaps:
ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
- r = run(new RGWFetchAllMetaCR(store, &http_manager, async_rados, num_shards));
+ r = run(new RGWFetchAllMetaCR(store, &http_manager, async_rados, num_shards, sync_status.sync_markers));
if (r == -EBUSY) {
backoff.backoff_sleep();
continue;