return 0;
}
-int BlueFS::downgrade_wal_to_v1() {
- _init_logger();
+int BlueFS::downgrade_wal_to_v1(
+ const std::string& dir,
+ const std::string& name)
+{
+ int r;
+ string tmp_name("__tmp_name__.log");
+ FileWriter* writer = nullptr;
+ FileReader* reader = nullptr;
+ // we use dir for wals and name like wal; should get proper hint
+ r = open_for_write(dir, tmp_name, &writer, false);
+ // use normal v1 write path by marking node type to legacy
+ writer->file->fnode.type = bluefs_node_type::REGULAR;
+ ceph_assert(r == 0);
+ r = open_for_read(dir, name, &reader);
+ ceph_assert(r == 0);
+ uint64_t offset = 0;
+ uint64_t len = 1024 * 1024;
+ bufferlist bl;
+ while (true) {
+ r = read(reader, offset, len, &bl, nullptr);
+ if (r <= 0) break;
+ append_try_flush(writer, bl.c_str(), bl.length());
+ offset += r;
+ }
+ if (r == 0) {
+ r = fsync(writer);
+ ceph_assert(r == 0);
+ }
+ delete reader;
+ close_writer(writer);
+ r = rename(dir, tmp_name, dir, name);
+ ceph_assert(r == 0);
+ return 0;
+}
+
+int BlueFS::downgrade_wal_to_v1()
+{
string wal_dir("db.wal");
auto dir_it = nodes.dir_map.find(wal_dir);
if (dir_it == nodes.dir_map.end()) {
return 0;
}
- DirRef dir = dir_it->second;
-
- size_t wal_v2_aggregated_size = 0;
- std::vector<std::pair<string, FileRef>> files_to_migrate;
- files_to_migrate.reserve(dir->file_map.size());
-
- // get info about wal files
- for (const auto& [file_name, file] : dir->file_map) {
+ // copy, so it does not change
+ auto dir_copy = dir_it->second->file_map;
+ for (const auto& [file_name, file] : dir_copy) {
if(file->is_new_wal()) {
- wal_v2_aggregated_size += file->fnode.wal_size;
- files_to_migrate.push_back({ file_name, file });
- }
- }
-
- // check free space
- uint8_t prefer_bdev = vselector->select_prefer_bdev(vselector->get_hint_by_dir(wal_dir));
- uint64_t free_space = alloc[prefer_bdev]->get_free();
- if (free_space < wal_v2_aggregated_size) {
- dout(1) << fmt::format("{} Error trying to migrate new WAL V2 to V1. Aggregated size of WAL files is {}, free space is {}", __func__, wal_v2_aggregated_size, free_space) << dendl;
- return -ENOSPC;
- }
-
- // do migration
- size_t file_migrated_count = 0;
- string tmp_name_prefix("__tmp_name_");
- uint64_t buffer_size = 1024*1024*50; // 50 MiB?
- bool failure_while_migrating = false;
- for (auto &[name, file] : files_to_migrate) {
- FileWriter *writer;
- string new_file_temp_name = fmt::format("{}{}", tmp_name_prefix, file_migrated_count);
- int r = open_for_write(wal_dir, new_file_temp_name, &writer, false);
- ceph_assert(r == 0);
- auto close_writer_path = make_scope_guard([&] {
- close_writer(writer);
- });
-
- // use normal v1 write path by marking node type to legacy
- writer->file->fnode.type = bluefs_node_type::REGULAR;
-
- FileReader* previous_reader;
- r = open_for_read(wal_dir, name, &previous_reader);
- auto read_writer_path = make_scope_guard([&] {
- delete previous_reader;
- });
- ceph_assert(r == 0);
-
- // move contents from previous to new file
- uint64_t previous_size = previous_reader->file->fnode.wal_size;
- uint64_t left_to_read = previous_size;
- bufferlist read_buffer;
- uint64_t read_offset = 0;
- while(left_to_read > 0) {
- // read from prevoius file
- read_buffer.clear();
- uint64_t to_read = std::min(buffer_size, left_to_read);
- int r = _read_wal(previous_reader, read_offset, to_read, &read_buffer, nullptr);
- ceph_assert(r >= 0);
- if (uint64_t(r) < to_read) {
- dout(5) << fmt::format("{} read less than expect while migrating file {} to wal v1", __func__, name) << dendl;
- failure_while_migrating = true;
- break;
- }
-
- // add to new file
- append_try_flush(writer, read_buffer.c_str(), read_buffer.length());
-
- // update state
- read_offset += to_read;
- left_to_read -= to_read;
- }
- if (failure_while_migrating) {
- break;
- }
- // todo things left to read, do cleanup
-
- int fsync_ret = fsync(writer);
- if (fsync_ret != 0) {
- derr << fmt::format("{} fsync error {} while moving data for file {} ", __func__, fsync_ret, name) << dendl;
- failure_while_migrating = true;
- break;
- }
-
- file_migrated_count++;
- }
-
- if (failure_while_migrating) {
- derr << fmt::format("{} couldn't properly move data from v2 to v1, unlinking temp files...", __func__) << dendl;
- size_t file_migrated_count = 0;
- for (size_t i = 0; i < files_to_migrate.size(); i++) {
- string new_file_temp_name = fmt::format("{}{}", tmp_name_prefix, file_migrated_count);
- derr << fmt::format("{} unlinking {}", __func__, new_file_temp_name) << dendl;
- int unlink_ret = unlink(wal_dir, new_file_temp_name);
- ceph_assert(unlink_ret == 0);
- file_migrated_count++;
+ downgrade_wal_to_v1(wal_dir, file_name);
+ sync_metadata(true);
+ dout(10) << __func__ << fmt::format(" {} v2=>v1", file_name) << dendl;
+ } else {
+ dout(10) << __func__ << fmt::format(" {} in v1", file_name) << dendl;
}
- return -EIO;
- }
-
- // everything went okay, rename files
- file_migrated_count = 0;
- for (auto &[name, file] : files_to_migrate) {
-
- dout(5) << fmt::format("{} unlinking {}", __func__, name) << dendl;
- int unlink_ret = unlink(wal_dir, name);
- ceph_assert(unlink_ret == 0);
-
- string new_file_temp_name = fmt::format("{}{}", tmp_name_prefix, file_migrated_count);
- dout(5) << fmt::format("{} renaming {} to {}", __func__, tmp_name_prefix, name) << dendl;
- int rename_ret = rename(wal_dir, new_file_temp_name, wal_dir, name);
- ceph_assert(rename_ret == 0);
- file_migrated_count++;
}
// Ensure no dangling wal v2 files are inside transactions.
_compact_log_sync_LNF_LD();
-
+ // TODO assert on presence of wal_v2
_write_super(BDEV_DB, 1);
dout(5) << fmt::format("{} success moving data", __func__) << dendl;