*/
void RecoveryQueue::advance()
{
- dout(10) << "RecoveryQueue::advance " << file_recover_queue.size() << " queued, "
+ dout(10) << file_recover_queue.size() << " queued, "
+ << file_recover_queue_front.size() << " prioritized, "
<< file_recovering.size() << " recovering" << dendl;
- while (file_recovering.size() < 5 &&
- !file_recover_queue.empty()) {
- CInode *in = *file_recover_queue.begin();
- file_recover_queue.erase(in);
+ while (file_recovering.size() < g_conf->mds_max_file_recover) {
+ if (!file_recover_queue_front.empty()) {
+ CInode *in = *file_recover_queue_front.begin();
+ file_recover_queue_front.erase(file_recover_queue_front.begin());
+ file_recover_queue.erase(in);
+ _start(in);
+ } else if (!file_recover_queue.empty()) {
+ CInode *in = *file_recover_queue.begin();
+ file_recover_queue.erase(file_recover_queue.begin());
+ _start(in);
+ } else {
+ break;
+ }
+ }
+}
- inode_t *pi = in->get_projected_inode();
+void RecoveryQueue::_start(CInode *in)
+{
+ inode_t *pi = in->get_projected_inode();
- // blech
- if (pi->client_ranges.size() && !pi->get_max_size()) {
- mds->clog->warn() << "bad client_range " << pi->client_ranges
- << " on ino " << pi->ino << "\n";
- }
+ // blech
+ if (pi->client_ranges.size() && !pi->get_max_size()) {
- mds->clog.warn() << "bad client_range " << pi->client_ranges
- << " on ino " << pi->ino << "\n";
++ mds->clog->warn() << "bad client_range " << pi->client_ranges
++ << " on ino " << pi->ino << "\n";
+ }
- if (pi->client_ranges.size() && pi->get_max_size()) {
- dout(10) << "do_file_recover starting " << in->inode.size << " " << pi->client_ranges
- << " " << *in << dendl;
- file_recovering.insert(in);
+ if (pi->client_ranges.size() && pi->get_max_size()) {
+ dout(10) << "starting " << in->inode.size << " " << pi->client_ranges
+ << " " << *in << dendl;
+ file_recovering.insert(in);
+
+ C_MDC_Recover *fin = new C_MDC_Recover(this, in);
+ mds->filer->probe(in->inode.ino, &in->inode.layout, in->last,
+ pi->get_max_size(), &fin->size, &fin->mtime, false,
+ 0, fin);
+ } else {
+ dout(10) << "skipping " << in->inode.size << " " << *in << dendl;
+ in->state_clear(CInode::STATE_RECOVERING);
+ mds->locker->eval(in, CEPH_LOCK_IFILE);
+ in->auth_unpin(this);
+ }
+}
- C_MDC_Recover *fin = new C_MDC_Recover(this, in);
- mds->filer->probe(in->inode.ino, &in->inode.layout, in->last,
- pi->get_max_size(), &fin->size, &fin->mtime, false,
- 0, fin);
- } else {
- dout(10) << "do_file_recover skipping " << in->inode.size
- << " " << *in << dendl;
- in->state_clear(CInode::STATE_RECOVERING);
- mds->locker->eval(in, CEPH_LOCK_IFILE);
- in->auth_unpin(this);
- }
+void RecoveryQueue::prioritize(CInode *in)
+{
+ if (file_recovering.count(in)) {
+ dout(10) << "already working on " << *in << dendl;
+ return;
+ }
+
+ if (file_recover_queue.count(in)) {
+ dout(20) << *in << dendl;
+ file_recover_queue_front.insert(in);
+ return;
}
+
+ dout(10) << "not queued " << *in << dendl;
}
map<string,string> *erasure_code_profile_map,
stringstream &ss)
{
- int r = get_str_map(g_conf->osd_pool_default_erasure_code_profile,
- ss,
- erasure_code_profile_map);
+ int r = get_json_str_map(g_conf->osd_pool_default_erasure_code_profile,
+ ss,
+ erasure_code_profile_map);
if (r)
return r;
- (*erasure_code_profile_map)["directory"] =
- g_conf->osd_pool_default_erasure_code_directory;
-
+ assert((*erasure_code_profile_map).count("plugin"));
+ string default_plugin = (*erasure_code_profile_map)["plugin"];
+ map<string,string> user_map;
for (vector<string>::const_iterator i = erasure_code_profile.begin();
i != erasure_code_profile.end();
++i) {