std::vector<bufferlist*> &bls_to_read,
uint64_t entry_hit_length, Extent hit_extent,
pwl::C_ReadRequest *read_ctx) {
- // Make a bl for this hit extent. This will add references to the
- // write_entry->cache_bl */
- ldout(m_image_ctx.cct, 5) << dendl;
- auto write_entry = static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
- buffer::list hit_bl;
- hit_bl = write_entry->get_cache_bl();
- bool writesame = write_entry->is_writesame_entry();
- auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
- hit_extent, hit_bl, true, read_buffer_offset, writesame);
- read_ctx->read_extents.push_back(hit_extent_buf);
-
- if(!hit_bl.length()) {
- ldout(m_image_ctx.cct, 5) << "didn't hit RAM" << dendl;
- auto read_extent = read_ctx->read_extents.back();
- log_entries_to_read.push_back(&write_entry->ram_entry);
- bls_to_read.push_back(&read_extent->m_bl);
- }
+  // Make a bl for this hit extent. This will add references to the
+  // write_entry->cache_bl
+ ldout(m_image_ctx.cct, 5) << dendl;
+ auto write_entry = static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
+ buffer::list hit_bl;
+ hit_bl = write_entry->get_cache_bl();
+ bool writesame = write_entry->is_writesame_entry();
+ auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
+ hit_extent, hit_bl, true, read_buffer_offset, writesame);
+ read_ctx->read_extents.push_back(hit_extent_buf);
+
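+  // An empty bufferlist means this entry's data is not resident in RAM:
+  // queue its ram_entry and the extent's bufferlist so the data can be
+  // read back from SSD.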
+ if (!hit_bl.length()) {
+ ldout(m_image_ctx.cct, 5) << "didn't hit RAM" << dendl;
+ auto read_extent = read_ctx->read_extents.back();
+ log_entries_to_read.push_back(&write_entry->ram_entry);
+ bls_to_read.push_back(&read_extent->m_bl);
+ }
}
template <typename I>
}
this->m_total_log_entries = new_root->num_log_entries;
this->m_free_log_entries = new_root->num_log_entries - 1;
- } else {
- m_cache_state->present = true;
- bdev = BlockDevice::create(
- cct, this->m_log_pool_name, aio_cache_cb,
- static_cast<void*>(this), nullptr, static_cast<void*>(this));
- int r = bdev->open(this->m_log_pool_name);
- if (r < 0) {
- delete bdev;
- on_finish->complete(r);
- return false;
- }
- load_existing_entries(later);
- if (m_first_free_entry < m_first_valid_entry) {
+ } else {
+ m_cache_state->present = true;
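+    // An existing cache pool was found: open its block device and rebuild
+    // the in-memory log state from the entries already stored on disk.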
+ bdev = BlockDevice::create(
+ cct, this->m_log_pool_name, aio_cache_cb,
+ static_cast<void*>(this), nullptr, static_cast<void*>(this));
+ int r = bdev->open(this->m_log_pool_name);
+ if (r < 0) {
+ delete bdev;
+ on_finish->complete(r);
+ return false;
+ }
+ load_existing_entries(later);
+ if (m_first_free_entry < m_first_valid_entry) {
/* Valid entries wrap around the end of the ring, so first_free is lower
* than first_valid. If first_valid was == first_free+1, the entry at
* first_free would be empty. The last entry is never used, so in
* that case there would be zero free log entries. */
- this->m_free_log_entries = this->m_total_log_entries -
- (m_first_valid_entry - m_first_free_entry) - 1;
- } else {
+ this->m_free_log_entries = this->m_total_log_entries -
+ (m_first_valid_entry - m_first_free_entry) - 1;
+ } else {
/* first_valid is <= first_free. If they are == we have zero valid log
* entries, and n-1 free log entries */
- this->m_free_log_entries = this->m_total_log_entries -
- (m_first_free_entry - m_first_valid_entry) - 1;
- }
- m_cache_state->clean = this->m_dirty_log_entries.empty();
- m_cache_state->empty = m_log_entries.empty();
+ this->m_free_log_entries = this->m_total_log_entries -
+ (m_first_free_entry - m_first_valid_entry) - 1;
+ }
+ m_cache_state->clean = this->m_dirty_log_entries.empty();
+ m_cache_state->empty = m_log_entries.empty();
}
return true;
}
if (next_log_pos >= this->m_log_pool_actual_size) {
next_log_pos = next_log_pos % this->m_log_pool_actual_size + DATA_RING_BUFFER_OFFSET;
}
- }
+ }
this->update_sync_points(missing_sync_points, sync_point_entries, later,
MIN_WRITE_ALLOC_SSD_SIZE);
}
this->m_async_append_ops++;
this->m_async_op_tracker.start_op();
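+  // Hand the append off to the work queue; m_async_append_ops and the op
+  // tracker record the operation while append_scheduled_ops() is in flight.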
Context *append_ctx = new LambdaContext([this](int r) {
- append_scheduled_ops();
- });
+ append_scheduled_ops();
+ });
this->m_work_queue.queue(append_ctx);
}
+
/*
* Takes custody of ops. They'll all get their log entries appended,
* and have their on_write_persist contexts completed once they and
});
uint64_t *new_first_free_entry = new(uint64_t);
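+  // new_first_free_entry is heap-allocated so it outlives this function;
+  // append_ops() records the new position in it and the completion context
+  // below consumes and deletes it.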
Context *append_ctx = new LambdaContext(
- [this, new_first_free_entry, ops, ctx](int r) {
+ [this, new_first_free_entry, ops, ctx](int r) {
std::shared_ptr<WriteLogPoolRoot> new_root;
{
ldout(m_image_ctx.cct, 20) << "Finished appending at "
delete new_first_free_entry;
schedule_update_root(new_root, ctx);
}
- });
+ });
// Append logs and update first_free_update
uint64_t bytes_allocated_updated;
append_ops(ops, append_ctx, new_first_free_entry, bytes_allocated_updated);
if (invalidating) {
return ctx;
}
- if(log_entry->is_write_entry()) {
- bufferlist *read_bl_ptr = new bufferlist;
- ctx = new LambdaContext(
- [this, log_entry, read_bl_ptr, ctx](int r) {
- bufferlist captured_entry_bl;
- captured_entry_bl.claim_append(*read_bl_ptr);
- free(read_bl_ptr);
- m_image_ctx.op_work_queue->queue(new LambdaContext(
- [this, log_entry, entry_bl=move(captured_entry_bl), ctx](int r) {
- auto captured_entry_bl = std::move(entry_bl);
- ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
- << " " << *log_entry << dendl;
- log_entry->writeback_bl(this->m_image_writeback, ctx,
- std::move(captured_entry_bl));
- }), 0);
+ if (log_entry->is_write_entry()) {
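+    // Contexts are chained back to front: the context returned below first
+    // reads this entry's data from SSD into read_bl_ptr, and on completion
+    // the context built here hands that data to writeback_bl().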
+ bufferlist *read_bl_ptr = new bufferlist;
+ ctx = new LambdaContext(
+ [this, log_entry, read_bl_ptr, ctx](int r) {
+ bufferlist captured_entry_bl;
+ captured_entry_bl.claim_append(*read_bl_ptr);
+        delete read_bl_ptr;
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+          [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
+ auto captured_entry_bl = std::move(entry_bl);
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback_bl(this->m_image_writeback, ctx,
+ std::move(captured_entry_bl));
+ }), 0);
});
- ctx = new LambdaContext(
- [this, log_entry, read_bl_ptr, ctx](int r) {
- aio_read_data_block(&log_entry->ram_entry, read_bl_ptr, ctx);
+ ctx = new LambdaContext(
+ [this, log_entry, read_bl_ptr, ctx](int r) {
+ aio_read_data_block(&log_entry->ram_entry, read_bl_ptr, ctx);
});
return ctx;
} else {
first_valid_entry = entry->log_entry_index + MIN_WRITE_ALLOC_SSD_SIZE;
}
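+  // Wrap around the ring: positions past the end of the pool restart at
+  // DATA_RING_BUFFER_OFFSET rather than zero, so the space ahead of the
+  // data ring buffer is never reused.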
if (first_valid_entry >= this->m_log_pool_config_size) {
- first_valid_entry = first_valid_entry % this->m_log_pool_config_size +
+ first_valid_entry = first_valid_entry % this->m_log_pool_config_size +
DATA_RING_BUFFER_OFFSET;
}
ceph_assert(first_valid_entry != initial_first_valid_entry);
pool_root.first_valid_entry = first_valid_entry;
Context *ctx = new LambdaContext(
- [this, flushed_sync_gen, first_valid_entry,
- initial_first_valid_entry, retiring_entries](int r) {
- uint64_t allocated_bytes = 0;
- uint64_t cached_bytes = 0;
- uint64_t former_log_pos = 0;
- for (auto &entry : retiring_entries) {
- ceph_assert(entry->log_entry_index != 0);
- if (entry->log_entry_index != former_log_pos ) {
- // Space for control blocks
- allocated_bytes += MIN_WRITE_ALLOC_SSD_SIZE;
- former_log_pos = entry->log_entry_index;
- }
- if (entry->is_write_entry()) {
- cached_bytes += entry->write_bytes();
- //space for userdata
- allocated_bytes += entry->get_aligned_data_size();
- }
+ [this, flushed_sync_gen, first_valid_entry,
+ initial_first_valid_entry, retiring_entries](int r) {
+ uint64_t allocated_bytes = 0;
+ uint64_t cached_bytes = 0;
+ uint64_t former_log_pos = 0;
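+      // Count control-block space once per distinct log_entry_index and
+      // user-data space for every write entry being retired.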
+ for (auto &entry : retiring_entries) {
+ ceph_assert(entry->log_entry_index != 0);
+        if (entry->log_entry_index != former_log_pos) {
+ // Space for control blocks
+ allocated_bytes += MIN_WRITE_ALLOC_SSD_SIZE;
+ former_log_pos = entry->log_entry_index;
}
- {
- std::lock_guard locker(m_lock);
- m_first_valid_entry = first_valid_entry;
- ceph_assert(m_first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0);
- this->m_free_log_entries += retiring_entries.size();
- ceph_assert(this->m_bytes_cached >= cached_bytes);
- this->m_bytes_cached -= cached_bytes;
-
- ldout(m_image_ctx.cct, 20)
- << "Finished root update: " << "initial_first_valid_entry="
- << initial_first_valid_entry << ", " << "m_first_valid_entry="
- << m_first_valid_entry << "," << "release space = "
- << allocated_bytes << "," << "m_bytes_allocated="
- << m_bytes_allocated << "," << "release cached space="
- << allocated_bytes << "," << "m_bytes_cached="
- << this->m_bytes_cached << dendl;
-
- this->m_alloc_failed_since_retire = false;
- this->wake_up();
- m_async_update_superblock--;
- this->m_async_op_tracker.finish_op();
+ if (entry->is_write_entry()) {
+ cached_bytes += entry->write_bytes();
+          // Space for user data
+ allocated_bytes += entry->get_aligned_data_size();
}
+ }
+ {
+ std::lock_guard locker(m_lock);
+ m_first_valid_entry = first_valid_entry;
+ ceph_assert(m_first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0);
+ this->m_free_log_entries += retiring_entries.size();
+ ceph_assert(this->m_bytes_cached >= cached_bytes);
+ this->m_bytes_cached -= cached_bytes;
+
+ ldout(m_image_ctx.cct, 20)
+ << "Finished root update: " << "initial_first_valid_entry="
+ << initial_first_valid_entry << ", " << "m_first_valid_entry="
+          << m_first_valid_entry << ", " << "release space="
+          << allocated_bytes << ", " << "m_bytes_allocated="
+          << m_bytes_allocated << ", " << "release cached space="
+          << cached_bytes << ", " << "m_bytes_cached="
+ << this->m_bytes_cached << dendl;
+
+ this->m_alloc_failed_since_retire = false;
+ this->wake_up();
+ m_async_update_superblock--;
+ this->m_async_op_tracker.finish_op();
+ }
- this->dispatch_deferred_writes();
- this->process_writeback_dirty_entries();
- });
+ this->dispatch_deferred_writes();
+ this->process_writeback_dirty_entries();
+ });
std::lock_guard locker(m_lock);
schedule_update_root(new_root, ctx);
}
template <typename I>
-void WriteLog<I>::aio_read_data_block(
- WriteLogCacheEntry *log_entry, bufferlist *bl, Context *ctx) {
+void WriteLog<I>::aio_read_data_block(WriteLogCacheEntry *log_entry,
+ bufferlist *bl, Context *ctx) {
std::vector<WriteLogCacheEntry*> log_entries {log_entry};
std::vector<bufferlist *> bls {bl};
aio_read_data_block(log_entries, bls, ctx);
bls[i]->clear();
bls[i]->append(valid_data_bl);
}
- ctx->complete(r);
+ ctx->complete(r);
});
CephContext *cct = m_image_ctx.cct;