delete base;
}
-int Infiniband::MemoryManager::Cluster::add(uint32_t num)
+int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
{
+ assert(!base);
uint32_t bytes = chunk_size * num;
- //cihar* base = (char*)malloc(bytes);
if (manager.enabled_huge_page) {
base = (char*)manager.malloc_huge_pages(bytes);
} else {
base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
}
+ end = base + bytes;
assert(base);
chunk_base = (char*)::malloc(sizeof(Chunk) * num);
memset(chunk_base, 0, sizeof(Chunk) * num);
+ free_chunks.reserve(num);
char *ptr = chunk_base;
for (uint32_t offset = 0; offset < bytes; offset += chunk_size){
Chunk *chunk = reinterpret_cast<Chunk*>(ptr);
assert(m);
new(chunk) Chunk(m, chunk_size, base+offset);
free_chunks.push_back(chunk);
- all_buffers.insert(chunk->buffer);
ptr += sizeof(Chunk);
}
return 0;
assert(device);
assert(pd);
channel = new Cluster(*this, size);
- channel->add(rx_num);
+ channel->fill(rx_num);
send = new Cluster(*this, size);
- send->add(tx_num);
+ send->fill(tx_num);
}
void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
Cluster(MemoryManager& m, uint32_t s);
~Cluster();
- int add(uint32_t num);
+ int fill(uint32_t num);
void take_back(std::vector<Chunk*> &ck);
int get_buffers(std::vector<Chunk*> &chunks, size_t bytes);
Chunk *get_chunk_by_buffer(const char *c) {
Chunk *chunk = reinterpret_cast<Chunk*>(chunk_base + sizeof(Chunk) * idx);
return chunk;
}
+ bool is_my_buffer(const char *c) const {
+ return c >= base && c < end;
+ }
MemoryManager& manager;
uint32_t chunk_size;
Mutex lock;
std::vector<Chunk*> free_chunks;
- std::set<const char*> all_buffers;
- char* base;
+ char *base = nullptr;
+ char *end = nullptr;
char* chunk_base;
};
void return_tx(std::vector<Chunk*> &chunks);
int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes);
- // TODO: optimize via address judgement
- bool is_tx_buffer(const char* c) { return send->all_buffers.count(c); }
- bool is_rx_buffer(const char* c) { return channel->all_buffers.count(c); }
+ bool is_tx_buffer(const char* c) { return send->is_my_buffer(c); }
+ bool is_rx_buffer(const char* c) { return channel->is_my_buffer(c); }
Chunk *get_tx_chunk_by_buffer(const char *c) {
return send->get_chunk_by_buffer(c);
}