Source code for vulk.vulkanutil
from contextlib import contextmanager
from vulk import vulkanconstant as vc
from vulk import vulkanobject as vo
class CommandBufferSynchronizedPool():
    '''Pool of command buffers chained together with semaphores.

    If you need an unknown quantity of command buffers to be executed
    with synchronization, this class is for you. It handles command
    buffer pooling and synchronization: every submitted command buffer
    waits on the semaphore signaled by the previous one (the first one
    waits on the semaphores given to `begin`).

    Command buffers and semaphores are created lazily and reused on the
    following `begin`/`end` cycles.

    *Example:*

    ```
    cbpool = CommandBufferSynchronizedPool(context)
    cbpool.begin(context, [semaphore])  # Must be called first each frame
    for action in actions:
        with cbpool.pull() as commandbuffer:
            # Register commands in command buffer
    semaphore_out = cbpool.end()
    ```
    '''

    def __init__(self, context):
        '''
        *Parameters:*

        - `context`: `VulkContext`
        '''
        self.commandpool = self.init_commandpool(context)

        # Pooled objects and the index of the one currently in use.
        # -1 means "nothing pulled yet", same convention as `begin`, so
        # that `end` returns `None` when nothing has been submitted.
        self.commandbuffers = []
        self.commandbuffer_id = -1
        self.semaphores = []
        self.semaphore_id = -1

        # Only set between `begin` and `end`
        self.context = None
        self.semaphores_in = []

        # Not used by this class; kept for backward compatibility
        self.wait_semaphores = []
        self.signal_semaphores = []

    def init_commandpool(self, context):
        '''Initialize transient command pool

        The pool is created on the `graphic` queue family with the
        `TRANSIENT` and `RESET_COMMAND_BUFFER` flags.

        *Parameters:*

        - `context`: `VulkContext`

        *Returns:*

        The created `CommandPool`
        '''
        flags = (vc.CommandPoolCreate.TRANSIENT |
                 vc.CommandPoolCreate.RESET_COMMAND_BUFFER)
        return vo.CommandPool(
            context, context.queue_family_indices['graphic'], flags)

    def begin(self, context, semaphores=None):
        '''
        Begin pooling and synchronization of command buffers.

        Rewinds the command buffer and semaphore cursors so that pooled
        objects are reused from the start.

        *Parameters:*

        - `context`: `VulkContext`
        - `semaphores`: `list` of `Semaphore` to wait on

        **Note: `context` is borrowed until `end` is called**
        '''
        self.context = context
        self.commandbuffer_id = -1
        self.semaphore_id = -1
        if semaphores:
            self.semaphores_in.extend(semaphores)

    def next_commandbuffer(self):
        '''
        Move to the next command buffer, creating it if requested

        The command buffer is reset before being returned.

        *Returns:*

        The command buffer at the new cursor position
        '''
        self.commandbuffer_id += 1

        try:
            cb = self.commandbuffers[self.commandbuffer_id]
        except IndexError:
            # Pool exhausted: allocate one more primary command buffer
            cb = self.commandpool.allocate_buffers(
                self.context, vc.CommandBufferLevel.PRIMARY, 1)[0]
            self.commandbuffers.append(cb)

        cb.reset()
        return cb

    def next_semaphore(self):
        '''
        Move to the next semaphore, creating it if requested

        *Returns:*

        The semaphore at the new cursor position
        '''
        self.semaphore_id += 1

        try:
            semaphore = self.semaphores[self.semaphore_id]
        except IndexError:
            # Pool exhausted: create one more semaphore
            semaphore = vo.Semaphore(self.context)
            self.semaphores.append(semaphore)

        return semaphore

    @contextmanager
    def pull(self):
        '''
        Pull a new command buffer.

        This function is a context manager, you should call it with
        `with` keyword to auto-submit the last created buffer.

        *Returns:*

        `CommandBufferRegister` ready to register commands
        '''
        # Acquire the command buffer before entering the `try`: if the
        # acquisition fails there is nothing to submit.
        cb = self.next_commandbuffer()
        flags = vc.CommandBufferUsage.ONE_TIME_SUBMIT

        try:
            with cb.bind(flags) as cmd:
                yield cmd
        finally:
            # Submission happens even if the caller's block raised
            self.submit()

    def submit(self):
        '''
        Submit the last command buffer

        The first submission after `begin` waits on the semaphores
        passed to `begin`; each following submission waits on the
        semaphore signaled by the previous one.
        '''
        cb_id = self.commandbuffer_id
        signal_semaphore = self.next_semaphore()

        if self.semaphore_id == 0:  # First submit
            wait_semaphores = list(self.semaphores_in)
        else:
            wait_semaphores = [self.semaphores[self.semaphore_id - 1]]

        # Vulkan expects one wait stage per wait semaphore. Keep at least
        # one entry so the "no wait semaphore" case is submitted exactly
        # as before.
        # NOTE(review): assumes `vo.SubmitInfo` forwards this list as the
        # wait stage masks - confirm against vulkanobject.
        stage_count = max(1, len(wait_semaphores))
        wait_stages = [vc.PipelineStage.VERTEX_INPUT] * stage_count

        submit = vo.SubmitInfo(
            wait_semaphores, wait_stages,
            [signal_semaphore], [self.commandbuffers[cb_id]])
        vo.submit_to_graphic_queue(self.context, [submit])

    def end(self):
        '''
        Release the context.

        Also forgets the input semaphores given to `begin`.

        *Returns:*

        Out `Semaphore`: Signal semaphore of last command buffer, or
        `None` if nothing was submitted
        '''
        del self.semaphores_in[:]
        self.context = None

        # No submission return no semaphore
        if self.semaphore_id == -1:
            return None

        return self.semaphores[self.semaphore_id]