use std::cmp;
use std::iter;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use buffer::BufferUsage;
use buffer::sys::BufferCreationError;
use buffer::sys::SparseLevel;
use buffer::sys::UnsafeBuffer;
use buffer::traits::BufferAccess;
use buffer::traits::BufferInner;
use buffer::traits::TypedBufferAccess;
use device::Device;
use device::DeviceOwned;
use device::Queue;
use image::ImageAccess;
use memory::DedicatedAlloc;
use memory::DeviceMemoryAllocError;
use memory::pool::AllocFromRequirementsFilter;
use memory::pool::AllocLayout;
use memory::pool::MappingRequirement;
use memory::pool::MemoryPool;
use memory::pool::MemoryPoolAlloc;
use memory::pool::PotentialDedicatedAllocation;
use memory::pool::StdMemoryPool;
use sync::AccessError;
use sync::Sharing;
use OomError;
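/// Pool of buffers from which "sub-buffers" can be allocated one write at a time.
///
/// This pool is meant for short-lived data such as per-frame uploads: each call to `next` or
/// `chunk` copies the given data into a free range of the current underlying buffer, and a new,
/// larger buffer is allocated transparently whenever the current one is full. Sub-buffers are
/// reclaimed automatically once every CPU and GPU reference to them has been dropped.
///
/// A minimal usage sketch (assuming `device` is an `Arc<Device>` you have already created):
///
/// ```ignore
/// let pool = CpuBufferPool::<u32>::upload(device.clone());
/// // Each frame, copy a fresh value into the pool and use the returned sub-buffer.
/// let subbuffer = pool.next(42).unwrap();
/// ```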
pub struct CpuBufferPool<T, A = Arc<StdMemoryPool>>
where A: MemoryPool
{
device: Arc<Device>,
pool: A,
current_buffer: Mutex<Option<Arc<ActualBuffer<A>>>>,
usage: BufferUsage,
marker: PhantomData<Box<T>>,
}
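// One of the actual Vulkan buffers backing a `CpuBufferPool`. The pool holds at most one of
// these as its current buffer; older buffers stay alive as long as chunks still reference them.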
struct ActualBuffer<A>
where A: MemoryPool
{
    // The underlying Vulkan buffer.
    inner: UnsafeBuffer,
    // Host-mappable memory bound to `inner`.
    memory: PotentialDedicatedAllocation<A::Alloc>,
    // Ranges of the buffer that are currently handed out as chunks or sub-buffers.
    chunks_in_use: Mutex<Vec<ActualBufferChunk>>,
    // Element index at which the next allocation will tentatively start.
    next_index: AtomicUsize,
    // Size of the buffer, in number of elements.
    capacity: usize,
}
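// Bookkeeping entry for a range of an `ActualBuffer` that is currently handed out, expressed
// in number of elements.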
#[derive(Debug)]
struct ActualBufferChunk {
index: usize,
len: usize,
num_cpu_accesses: usize,
num_gpu_accesses: usize,
}
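/// A chunk of data allocated from a `CpuBufferPool`, holding zero or more elements of `T`.
///
/// Dropping every clone of the chunk releases its range of the underlying buffer back to the
/// pool.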
pub struct CpuBufferPoolChunk<T, A>
where A: MemoryPool
{
    // The actual buffer this chunk was carved from; keeps it alive even if the pool has since
    // switched to a larger buffer.
    buffer: Arc<ActualBuffer<A>>,
    // Element index of the start of the reserved range within the actual buffer.
    index: usize,
    // Number of bytes to add to `index * size_of::<T>()` to reach the aligned start of the data.
    align_offset: usize,
    // Number of elements written by the user; the reserved range may be slightly longer because
    // of alignment padding.
    requested_len: usize,
marker: PhantomData<Box<T>>,
}
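/// A sub-buffer allocated from a `CpuBufferPool`, holding exactly one element of `T`.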
pub struct CpuBufferPoolSubbuffer<T, A>
where A: MemoryPool
{
chunk: CpuBufferPoolChunk<T, A>,
}
impl<T> CpuBufferPool<T> {
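    /// Builds a `CpuBufferPool` whose sub-buffers will have the given usage.
    ///
    /// No memory is allocated until the first sub-buffer is requested.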
#[inline]
pub fn new(device: Arc<Device>, usage: BufferUsage) -> CpuBufferPool<T> {
let pool = Device::standard_pool(&device);
CpuBufferPool {
            device,
            pool,
            current_buffer: Mutex::new(None),
            usage,
marker: PhantomData,
}
}
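    /// Builds a `CpuBufferPool` whose sub-buffers can only be used as transfer sources,
    /// suitable for staging data that is then copied to device-local memory.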
#[inline]
pub fn upload(device: Arc<Device>) -> CpuBufferPool<T> {
CpuBufferPool::new(device, BufferUsage::transfer_source())
}
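    /// Builds a `CpuBufferPool` whose sub-buffers can only be used as transfer destinations,
    /// suitable for reading data back from the GPU.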
#[inline]
pub fn download(device: Arc<Device>) -> CpuBufferPool<T> {
CpuBufferPool::new(device, BufferUsage::transfer_destination())
}
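    /// Builds a `CpuBufferPool` whose sub-buffers can be used as uniform buffers.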
#[inline]
pub fn uniform_buffer(device: Arc<Device>) -> CpuBufferPool<T> {
CpuBufferPool::new(device, BufferUsage::uniform_buffer())
}
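    /// Builds a `CpuBufferPool` whose sub-buffers can be used as vertex buffers.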
#[inline]
pub fn vertex_buffer(device: Arc<Device>) -> CpuBufferPool<T> {
CpuBufferPool::new(device, BufferUsage::vertex_buffer())
}
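    /// Builds a `CpuBufferPool` whose sub-buffers can be used as the source of indirect draw
    /// or dispatch commands.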
#[inline]
pub fn indirect_buffer(device: Arc<Device>) -> CpuBufferPool<T> {
CpuBufferPool::new(device, BufferUsage::indirect_buffer())
}
}
impl<T, A> CpuBufferPool<T, A>
where A: MemoryPool
{
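    /// Returns the number of elements the current underlying buffer can hold, or 0 if no buffer
    /// has been allocated yet.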
pub fn capacity(&self) -> usize {
match *self.current_buffer.lock().unwrap() {
None => 0,
Some(ref buf) => buf.capacity,
}
}
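    /// Makes sure the current underlying buffer can hold at least `capacity` elements,
    /// allocating a new one if necessary. Chunks allocated from the previous buffer remain
    /// valid and keep it alive until they are dropped.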
pub fn reserve(&self, capacity: usize) -> Result<(), DeviceMemoryAllocError> {
let mut cur_buf = self.current_buffer.lock().unwrap();
match *cur_buf {
Some(ref buf) if buf.capacity >= capacity => {
return Ok(());
},
_ => (),
};
self.reset_buf(&mut cur_buf, capacity)
}
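    /// Copies a single element into the pool and returns a sub-buffer containing it.
    ///
    /// A new underlying buffer is allocated if the current one is missing or full.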
#[inline]
pub fn next(&self, data: T) -> Result<CpuBufferPoolSubbuffer<T, A>, DeviceMemoryAllocError> {
Ok(CpuBufferPoolSubbuffer { chunk: self.chunk(iter::once(data))? })
}
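    /// Copies every element produced by `data` into the pool and returns a chunk containing
    /// them. A new underlying buffer is allocated if the current one is missing or too small.
    ///
    /// Panics if the iterator yields a different number of elements than its reported length.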
pub fn chunk<I>(&self, data: I) -> Result<CpuBufferPoolChunk<T, A>, DeviceMemoryAllocError>
where I: IntoIterator<Item = T>,
I::IntoIter: ExactSizeIterator
{
let data = data.into_iter();
let mut mutex = self.current_buffer.lock().unwrap();
let data = match self.try_next_impl(&mut mutex, data) {
Ok(n) => return Ok(n),
Err(d) => d,
};
let next_capacity = match *mutex {
Some(ref b) if data.len() < b.capacity => 2 * b.capacity,
_ => 2 * data.len(),
};
self.reset_buf(&mut mutex, next_capacity)?;
match self.try_next_impl(&mut mutex, data) {
Ok(n) => Ok(n),
Err(_) => unreachable!(),
}
}
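    /// Same as `next`, except that it never allocates a new buffer. Returns `None` if there is
    /// no current buffer or if it doesn't have enough free space for the element.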
#[inline]
pub fn try_next(&self, data: T) -> Option<CpuBufferPoolSubbuffer<T, A>> {
let mut mutex = self.current_buffer.lock().unwrap();
self.try_next_impl(&mut mutex, iter::once(data))
.map(|c| CpuBufferPoolSubbuffer { chunk: c })
.ok()
}
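    // Allocates a new actual buffer able to hold `capacity` elements and makes it the pool's
    // current buffer. Chunks allocated from the previous buffer keep it alive through their
    // `Arc` until they are dropped.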
fn reset_buf(&self, cur_buf_mutex: &mut MutexGuard<Option<Arc<ActualBuffer<A>>>>,
capacity: usize)
-> Result<(), DeviceMemoryAllocError> {
unsafe {
let (buffer, mem_reqs) = {
let size_bytes = match mem::size_of::<T>().checked_mul(capacity) {
Some(s) => s,
None =>
return Err(DeviceMemoryAllocError::OomError(OomError::OutOfDeviceMemory)),
};
match UnsafeBuffer::new(self.device.clone(),
size_bytes,
self.usage,
Sharing::Exclusive::<iter::Empty<_>>,
SparseLevel::none()) {
Ok(b) => b,
Err(BufferCreationError::AllocError(err)) => return Err(err),
Err(_) => unreachable!(),
}
};
let mem = MemoryPool::alloc_from_requirements(&self.pool,
&mem_reqs,
AllocLayout::Linear,
MappingRequirement::Map,
DedicatedAlloc::Buffer(&buffer),
|_| AllocFromRequirementsFilter::Allowed)?;
debug_assert!((mem.offset() % mem_reqs.alignment) == 0);
debug_assert!(mem.mapped_memory().is_some());
buffer.bind_memory(mem.memory(), mem.offset())?;
**cur_buf_mutex = Some(Arc::new(ActualBuffer {
inner: buffer,
memory: mem,
chunks_in_use: Mutex::new(vec![]),
next_index: AtomicUsize::new(0),
capacity: capacity,
}));
Ok(())
}
}
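    // Tries to carve a chunk for `data` out of the current buffer without allocating a new one.
    // On success the data has already been written to mapped memory; on failure the unconsumed
    // iterator is returned so the caller can grow the pool and retry.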
fn try_next_impl<I>(&self, cur_buf_mutex: &mut MutexGuard<Option<Arc<ActualBuffer<A>>>>,
mut data: I)
-> Result<CpuBufferPoolChunk<T, A>, I>
where I: ExactSizeIterator<Item = T>
{
let current_buffer = match cur_buf_mutex.clone() {
Some(b) => b,
None => return Err(data),
};
let mut chunks_in_use = current_buffer.chunks_in_use.lock().unwrap();
debug_assert!(!chunks_in_use.iter().any(|c| c.len == 0));
let requested_len = data.len();
if requested_len == 0 {
assert!(data.next().is_none(),
"Expected iterator passed to CpuBufferPool::chunk to be empty");
return Ok(CpuBufferPoolChunk {
buffer: current_buffer.clone(),
index: 0,
align_offset: 0,
requested_len: 0,
marker: PhantomData,
});
}
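        // Try to find a free range in the current buffer: first right after the previous
        // allocation (padding the start so that uniform/storage offset alignment requirements
        // are met), otherwise wrapping around to the beginning of the buffer.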
let (index, occupied_len, align_offset) = {
let (tentative_index, tentative_len, tentative_align_offset) = {
let idx = current_buffer.next_index.load(Ordering::SeqCst);
                let align_bytes = cmp::max(
                    if self.usage.uniform_buffer {
                        self.device()
                            .physical_device()
                            .limits()
                            .min_uniform_buffer_offset_alignment() as usize
                    } else {
                        1
                    },
                    if self.usage.storage_buffer {
                        self.device()
                            .physical_device()
                            .limits()
                            .min_storage_buffer_offset_alignment() as usize
                    } else {
                        1
                    },
                );
let tentative_align_offset =
(align_bytes - ((idx * mem::size_of::<T>()) % align_bytes)) % align_bytes;
let additional_len = if tentative_align_offset == 0 {
0
} else {
1 + (tentative_align_offset - 1) / mem::size_of::<T>()
};
(idx, requested_len + additional_len, tentative_align_offset)
};
            if tentative_index + tentative_len <= current_buffer.capacity &&
                !chunks_in_use.iter().any(|c| {
                    (c.index >= tentative_index &&
                     c.index < tentative_index + tentative_len) ||
                    (c.index <= tentative_index &&
                     c.index + c.len > tentative_index)
                })
            {
                (tentative_index, tentative_len, tentative_align_offset)
            } else if requested_len <= current_buffer.capacity &&
                !chunks_in_use.iter().any(|c| c.index < requested_len)
            {
                // Couldn't fit after the previous allocation; wrap around to the start.
                (0, requested_len, 0)
            } else {
                // The buffer is full; the caller will have to allocate a bigger one.
                return Err(data);
            }
};
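        // Copy the data into the mapped memory of the chosen range, element by element.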
unsafe {
let mem_off = current_buffer.memory.offset();
let range_start = index * mem::size_of::<T>() + align_offset + mem_off;
let range_end = (index + requested_len) * mem::size_of::<T>() + align_offset + mem_off;
let mut mapping = current_buffer
.memory
.mapped_memory()
.unwrap()
.read_write::<[T]>(range_start .. range_end);
let mut written = 0;
for (o, i) in mapping.iter_mut().zip(data) {
ptr::write(o, i);
written += 1;
}
assert_eq!(written,
requested_len,
"Iterator passed to CpuBufferPool::chunk has a mismatch between reported \
length and actual number of elements");
}
current_buffer
.next_index
.store(index + occupied_len, Ordering::SeqCst);
chunks_in_use.push(ActualBufferChunk {
index,
len: occupied_len,
num_cpu_accesses: 1,
num_gpu_accesses: 0,
});
Ok(CpuBufferPoolChunk {
buffer: current_buffer.clone(),
index: index,
align_offset,
requested_len,
marker: PhantomData,
})
}
}
impl<T, A> Clone for CpuBufferPool<T, A>
where A: MemoryPool + Clone
{
fn clone(&self) -> Self {
let buf = self.current_buffer.lock().unwrap();
CpuBufferPool {
device: self.device.clone(),
pool: self.pool.clone(),
current_buffer: Mutex::new(buf.clone()),
usage: self.usage.clone(),
marker: PhantomData,
}
}
}
unsafe impl<T, A> DeviceOwned for CpuBufferPool<T, A>
where A: MemoryPool
{
#[inline]
fn device(&self) -> &Arc<Device> {
&self.device
}
}
impl<T, A> Clone for CpuBufferPoolChunk<T, A>
where A: MemoryPool
{
fn clone(&self) -> CpuBufferPoolChunk<T, A> {
let mut chunks_in_use_lock = self.buffer.chunks_in_use.lock().unwrap();
let chunk = chunks_in_use_lock
.iter_mut()
.find(|c| c.index == self.index)
.unwrap();
debug_assert!(chunk.num_cpu_accesses >= 1);
chunk.num_cpu_accesses = chunk
.num_cpu_accesses
.checked_add(1)
.expect("Overflow in CPU accesses");
CpuBufferPoolChunk {
buffer: self.buffer.clone(),
index: self.index,
align_offset: self.align_offset,
requested_len: self.requested_len,
marker: PhantomData,
}
}
}
unsafe impl<T, A> BufferAccess for CpuBufferPoolChunk<T, A>
where A: MemoryPool
{
#[inline]
fn inner(&self) -> BufferInner {
BufferInner {
buffer: &self.buffer.inner,
offset: self.index * mem::size_of::<T>() + self.align_offset,
}
}
#[inline]
fn size(&self) -> usize {
self.requested_len * mem::size_of::<T>()
}
#[inline]
fn conflicts_buffer(&self, other: &dyn BufferAccess) -> bool {
self.conflict_key() == other.conflict_key()
}
#[inline]
    fn conflicts_image(&self, _other: &dyn ImageAccess) -> bool {
false
}
#[inline]
fn conflict_key(&self) -> (u64, usize) {
(
self.buffer.inner.key(),
if self.requested_len == 0 { usize::max_value() } else { self.index }
)
}
#[inline]
fn try_gpu_lock(&self, _: bool, _: &Queue) -> Result<(), AccessError> {
if self.requested_len == 0 {
return Ok(());
}
let mut chunks_in_use_lock = self.buffer.chunks_in_use.lock().unwrap();
let chunk = chunks_in_use_lock
.iter_mut()
.find(|c| c.index == self.index)
.unwrap();
if chunk.num_gpu_accesses != 0 {
return Err(AccessError::AlreadyInUse);
}
chunk.num_gpu_accesses = 1;
Ok(())
}
#[inline]
unsafe fn increase_gpu_lock(&self) {
if self.requested_len == 0 {
return;
}
let mut chunks_in_use_lock = self.buffer.chunks_in_use.lock().unwrap();
let chunk = chunks_in_use_lock
.iter_mut()
.find(|c| c.index == self.index)
.unwrap();
debug_assert!(chunk.num_gpu_accesses >= 1);
chunk.num_gpu_accesses = chunk
.num_gpu_accesses
.checked_add(1)
.expect("Overflow in GPU usages");
}
#[inline]
unsafe fn unlock(&self) {
if self.requested_len == 0 {
return;
}
let mut chunks_in_use_lock = self.buffer.chunks_in_use.lock().unwrap();
let chunk = chunks_in_use_lock
.iter_mut()
.find(|c| c.index == self.index)
.unwrap();
debug_assert!(chunk.num_gpu_accesses >= 1);
chunk.num_gpu_accesses -= 1;
}
}
impl<T, A> Drop for CpuBufferPoolChunk<T, A>
where A: MemoryPool
{
fn drop(&mut self) {
if self.requested_len == 0 {
return;
}
let mut chunks_in_use_lock = self.buffer.chunks_in_use.lock().unwrap();
let chunk_num = chunks_in_use_lock
.iter_mut()
.position(|c| c.index == self.index)
.unwrap();
if chunks_in_use_lock[chunk_num].num_cpu_accesses >= 2 {
chunks_in_use_lock[chunk_num].num_cpu_accesses -= 1;
} else {
debug_assert_eq!(chunks_in_use_lock[chunk_num].num_gpu_accesses, 0);
chunks_in_use_lock.remove(chunk_num);
}
}
}
unsafe impl<T, A> TypedBufferAccess for CpuBufferPoolChunk<T, A>
where A: MemoryPool
{
type Content = [T];
}
unsafe impl<T, A> DeviceOwned for CpuBufferPoolChunk<T, A>
where A: MemoryPool
{
#[inline]
fn device(&self) -> &Arc<Device> {
self.buffer.inner.device()
}
}
impl<T, A> Clone for CpuBufferPoolSubbuffer<T, A>
where A: MemoryPool
{
fn clone(&self) -> CpuBufferPoolSubbuffer<T, A> {
CpuBufferPoolSubbuffer { chunk: self.chunk.clone() }
}
}
unsafe impl<T, A> BufferAccess for CpuBufferPoolSubbuffer<T, A>
where A: MemoryPool
{
#[inline]
fn inner(&self) -> BufferInner {
self.chunk.inner()
}
#[inline]
fn size(&self) -> usize {
self.chunk.size()
}
#[inline]
fn conflicts_buffer(&self, other: &dyn BufferAccess) -> bool {
self.conflict_key() == other.conflict_key()
}
#[inline]
    fn conflicts_image(&self, _other: &dyn ImageAccess) -> bool {
false
}
#[inline]
fn conflict_key(&self) -> (u64, usize) {
self.chunk.conflict_key()
}
#[inline]
fn try_gpu_lock(&self, e: bool, q: &Queue) -> Result<(), AccessError> {
self.chunk.try_gpu_lock(e, q)
}
#[inline]
unsafe fn increase_gpu_lock(&self) {
self.chunk.increase_gpu_lock()
}
#[inline]
unsafe fn unlock(&self) {
self.chunk.unlock()
}
}
unsafe impl<T, A> TypedBufferAccess for CpuBufferPoolSubbuffer<T, A>
where A: MemoryPool
{
type Content = T;
}
unsafe impl<T, A> DeviceOwned for CpuBufferPoolSubbuffer<T, A>
where A: MemoryPool
{
#[inline]
fn device(&self) -> &Arc<Device> {
self.chunk.buffer.inner.device()
}
}
#[cfg(test)]
mod tests {
use buffer::CpuBufferPool;
use std::mem;
#[test]
fn basic_create() {
let (device, _) = gfx_dev_and_queue!();
let _ = CpuBufferPool::<u8>::upload(device);
}
#[test]
fn reserve() {
let (device, _) = gfx_dev_and_queue!();
let pool = CpuBufferPool::<u8>::upload(device);
assert_eq!(pool.capacity(), 0);
pool.reserve(83).unwrap();
assert_eq!(pool.capacity(), 83);
}
#[test]
fn capacity_increase() {
let (device, _) = gfx_dev_and_queue!();
let pool = CpuBufferPool::upload(device);
assert_eq!(pool.capacity(), 0);
pool.next(12).unwrap();
let first_cap = pool.capacity();
assert!(first_cap >= 1);
for _ in 0 .. first_cap + 5 {
mem::forget(pool.next(12).unwrap());
}
assert!(pool.capacity() > first_cap);
}
#[test]
fn reuse_subbuffers() {
let (device, _) = gfx_dev_and_queue!();
let pool = CpuBufferPool::upload(device);
assert_eq!(pool.capacity(), 0);
let mut capacity = None;
for _ in 0 .. 64 {
pool.next(12).unwrap();
let new_cap = pool.capacity();
assert!(new_cap >= 1);
match capacity {
None => capacity = Some(new_cap),
Some(c) => assert_eq!(c, new_cap),
}
}
}
#[test]
fn chunk_loopback() {
let (device, _) = gfx_dev_and_queue!();
let pool = CpuBufferPool::<u8>::upload(device);
pool.reserve(5).unwrap();
let a = pool.chunk(vec![0, 0]).unwrap();
let b = pool.chunk(vec![0, 0]).unwrap();
assert_eq!(b.index, 2);
drop(a);
let c = pool.chunk(vec![0, 0]).unwrap();
assert_eq!(c.index, 0);
assert_eq!(pool.capacity(), 5);
}
#[test]
fn chunk_0_elems_doesnt_pollute() {
let (device, _) = gfx_dev_and_queue!();
let pool = CpuBufferPool::<u8>::upload(device);
let _ = pool.chunk(vec![]).unwrap();
let _ = pool.chunk(vec![0, 0]).unwrap();
}
}