Skip to content

Commit 1ab927a

Browse files
author
marcin-zlakowski
committed
not working destroy^
1 parent fdc9ee2 commit 1ab927a

File tree

6 files changed

+76
-32
lines changed

6 files changed

+76
-32
lines changed

cyfra-core/src/main/scala/io/computenode/cyfra/core/Allocation.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import izumi.reflect.Tag
1010
import java.nio.ByteBuffer
1111

1212
trait Allocation:
13+
def reportLayout[L <: Layout: LayoutBinding](layout: L): Unit
14+
1315
extension (buffer: GBinding[?])
1416
def read(bb: ByteBuffer, offset: Int = 0): Unit
1517

cyfra-core/src/main/scala/io/computenode/cyfra/core/GBufferRegion.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,13 @@ import io.computenode.cyfra.dsl.Value.FromExpr
99
import io.computenode.cyfra.dsl.binding.GBuffer
1010
import izumi.reflect.Tag
1111

12+
import scala.util.chaining.given
1213
import java.nio.ByteBuffer
1314

1415
sealed trait GBufferRegion[ReqAlloc <: Layout: LayoutBinding, ResAlloc <: Layout: LayoutBinding]:
16+
def reqAllocBinding: LayoutBinding[ReqAlloc] = summon[LayoutBinding[ReqAlloc]]
17+
def resAllocBinding: LayoutBinding[ResAlloc] = summon[LayoutBinding[ResAlloc]]
18+
1519
def map[NewAlloc <: Layout: LayoutBinding](f: Allocation ?=> ResAlloc => NewAlloc): GBufferRegion[ReqAlloc, NewAlloc] =
1620
MapRegion(this, (alloc: Allocation) => (resAlloc: ResAlloc) => f(using alloc)(resAlloc))
1721

@@ -31,13 +35,13 @@ object GBufferRegion:
3135
cyfraRuntime.withAllocation: allocation =>
3236

3337
// noinspection ScalaRedundantCast
34-
val steps: Seq[Allocation => Layout => Layout] = Seq.unfold(region: GBufferRegion[?, ?]):
35-
case _: AllocRegion[?] => None
36-
case MapRegion(req, f) =>
37-
Some((f.asInstanceOf[Allocation => Layout => Layout], req))
38+
val steps: Seq[(Allocation => Layout => Layout, LayoutBinding[Layout])] = Seq.unfold(region: GBufferRegion[?, ?]):
39+
case _: AllocRegion[?] => None
40+
case m @ MapRegion(req, f) =>
41+
Some(((f.asInstanceOf[Allocation => Layout => Layout], req.resAllocBinding.asInstanceOf[LayoutBinding[Layout]]), req))
3842

39-
val initAlloc = init(using allocation)
43+
val initAlloc = init(using allocation).tap(allocation.reportLayout)
4044
val bodyAlloc = steps.foldLeft[Layout](initAlloc): (acc, step) =>
41-
step(allocation)(acc)
45+
step._1(allocation)(acc).tap(allocation.reportLayout(_)(using step._2))
4246

4347
onDone(using allocation)(bodyAlloc.asInstanceOf[ResAlloc])

cyfra-runtime/src/main/scala/io/computenode/cyfra/runtime/ExecutionHandler.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class ExecutionHandler(runtime: VkCyfraRuntime, threadContext: VulkanThreadConte
7474
val externalBindings = getAllBindings(executeSteps).map(VkAllocation.getUnderlying)
7575
val deps = externalBindings.flatMap(_.execution.fold(Seq(_), _.toSeq))
7676
val pe = new PendingExecution(commandBuffer, deps, cleanup)
77+
summon[VkAllocation].addExecution(pe)
7778
externalBindings.foreach(_.execution = Left(pe)) // TODO we assume all accesses are read-write
7879
result
7980

cyfra-runtime/src/main/scala/io/computenode/cyfra/runtime/PendingExecution.scala

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,32 +9,50 @@ import org.lwjgl.vulkan.{VK13, VkCommandBuffer, VkCommandBufferSubmitInfo, VkSem
99

1010
import scala.collection.mutable
1111

12-
class PendingExecution(protected val handle: VkCommandBuffer, val dependencies: Seq[PendingExecution], cleanup: () => Unit)(using Device)
13-
extends VulkanObject[VkCommandBuffer]:
12+
class PendingExecution(protected val handle: VkCommandBuffer, val dependencies: Seq[PendingExecution], cleanup: () => Unit)(using Device):
1413
private val semaphore: Semaphore = Semaphore()
1514
private var fence: Option[Fence] = None
1615

17-
override protected def close(): Unit = cleanup()
18-
19-
private def setFence(otherFence: Fence): Unit =
20-
if fence.isDefined then return
21-
fence = Some(otherFence)
22-
dependencies.foreach(_.setFence(otherFence))
16+
def isPending: Boolean = fence.isEmpty
17+
def isRunning: Boolean = fence.exists(!_.isSignaled)
18+
def isFinished: Boolean = fence.exists(_.isSignaled)
19+
20+
def block(): Unit = fence.foreach(_.block())
21+
22+
private var closed = false
23+
def isClosed: Boolean = closed
24+
private def close(): Unit =
25+
assert(!closed, "PendingExecution already closed")
26+
assert(isFinished, "Cannot close a PendingExecution that is not finished")
27+
cleanup()
28+
closed = true
29+
30+
private var destroyed = false
31+
def destroy(): Unit =
32+
assert(!destroyed, "PendingExecution already destroyed")
33+
assert(isFinished, "Cannot destroy a PendingExecution that is not finished")
34+
if !closed then close()
35+
semaphore.destroy()
36+
fence.foreach(x => if x.isAlive then x.destroy())
37+
destroyed = true
38+
39+
private def setFence(f: Fence): Unit = {
40+
if !isPending then return
41+
fence = Some(f)
42+
dependencies.foreach(_.setFence(f))
43+
}
2344

24-
private def gatherForSubmission: Seq[((VkCommandBuffer, Semaphore), Set[Semaphore])] = {
25-
if fence.isDefined then return Seq.empty
45+
private def gatherForSubmission: Seq[((VkCommandBuffer, Semaphore), Set[Semaphore])] =
46+
if !isPending then return Seq.empty
2647
val mySubmission = ((handle, semaphore), dependencies.map(_.semaphore).toSet)
2748
dependencies.flatMap(_.gatherForSubmission).appended(mySubmission)
28-
}
29-
30-
def block(): Unit =
31-
fence match
32-
case Some(f) => f.block()
33-
case None => throw new IllegalStateException("No fence set for this execution")
3449

3550
object PendingExecution:
3651
def executeAll(executions: Seq[PendingExecution], queue: Queue)(using Device): Fence = pushStack: stack =>
37-
val exec =
52+
assert(executions.forall(_.isPending), "All executions must be pending")
53+
assert(executions.nonEmpty, "At least one execution must be provided")
54+
55+
val exec: Seq[(Set[Semaphore], Set[(VkCommandBuffer, Semaphore)])] =
3856
val gathered = executions.flatMap(_.gatherForSubmission)
3957
val ordering = gathered.zipWithIndex.map(x => (x._1._1._1, x._2)).toMap
4058
gathered.toSet.groupMap(_._2)(_._1).toSeq.sortBy(x => x._2.map(_._1).map(ordering).min)
@@ -79,13 +97,13 @@ object PendingExecution:
7997
submitInfos.flip()
8098

8199
val fence = Fence()
82-
executions.foreach(_.setFence(fence))
83100
check(vkQueueSubmit2(queue.get, submitInfos, fence.get), "Failed to submit command buffer to queue")
101+
executions.foreach(_.setFence(fence))
84102
fence
85103

86104
def cleanupAll(executions: Seq[PendingExecution]): Unit =
87105
def cleanupRec(ex: PendingExecution): Unit =
88-
if !ex.isAlive then return
89-
ex.destroy()
106+
if !ex.isClosed then return
107+
ex.close()
90108
ex.dependencies.foreach(cleanupRec)
91109
executions.foreach(cleanupRec)

cyfra-runtime/src/main/scala/io/computenode/cyfra/runtime/VkAllocation.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,15 @@ import scala.util.chaining.*
2929
class VkAllocation(commandPool: CommandPool, executionHandler: ExecutionHandler)(using Allocator, Device) extends Allocation:
3030
given VkAllocation = this
3131

32+
override def reportLayout[L <: Layout: LayoutBinding](layout: L): Unit =
33+
val executions = summon[LayoutBinding[L]]
34+
.toBindings(layout)
35+
.map(getUnderlying)
36+
.flatMap(_.execution.fold(Seq(_), _.toSeq))
37+
.filter(_.isPending)
38+
39+
PendingExecution.executeAll(executions, commandPool.queue)
40+
3241
extension (buffer: GBinding[?])
3342
def read(bb: ByteBuffer, offset: Int = 0): Unit =
3443
val size = bb.remaining()
@@ -54,6 +63,7 @@ class VkAllocation(commandPool: CommandPool, executionHandler: ExecutionHandler)
5463
commandPool.freeCommandBuffer(cb)
5564
stagingBuffer.destroy()
5665
val pe = new PendingExecution(cb, binding.execution.fold(Seq(_), _.toSeq), cleanup)
66+
addExecution(pe)
5767
binding.execution = Left(pe)
5868
case _ => throw new IllegalArgumentException(s"Tried to write to non-VkBinding $buffer")
5969

@@ -89,8 +99,14 @@ class VkAllocation(commandPool: CommandPool, executionHandler: ExecutionHandler)
8999
case _ => ???
90100
direct(bb)
91101

102+
private val executions = mutable.Buffer[PendingExecution]()
103+
104+
def addExecution(pe: PendingExecution): Unit =
105+
executions += pe
106+
92107
private val bindings = mutable.Buffer[VkUniform[?] | VkBuffer[?]]()
93108
private[cyfra] def close(): Unit =
109+
executions.foreach(_.destroy())
94110
bindings.map(getUnderlying).foreach(_.buffer.destroy())
95111

96112
private def getStagingBuffer(size: Int): Buffer.HostBuffer =

cyfra-runtime/src/main/scala/io/computenode/cyfra/runtime/VkBinding.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,15 @@ sealed abstract class VkBinding[T <: Value: {Tag, FromExpr}](val buffer: Buffer)
3333
*/
3434
var execution: Either[PendingExecution, mutable.Buffer[PendingExecution]] = Right(mutable.Buffer.empty)
3535

36-
def materialise(queue: Queue)(using Device): Unit = execution match
37-
case Left(exec) if exec.isAlive =>
38-
PendingExecution.executeAll(Seq(exec), queue)
39-
exec.block()
40-
PendingExecution.cleanupAll(Seq(exec))
41-
case _ => ()
36+
def materialise(queue: Queue)(using Device): Unit =
37+
val (pendingExecs, runningExecs) = execution.fold(Seq(_), _.toSeq).partition(_.isPending) // TODO better handle read only executions
38+
if pendingExecs.nonEmpty then
39+
val fence = PendingExecution.executeAll(pendingExecs, queue)
40+
fence.block()
41+
PendingExecution.cleanupAll(pendingExecs)
42+
43+
runningExecs.foreach(_.block())
44+
PendingExecution.cleanupAll(runningExecs)
4245

4346
object VkBinding:
4447
def unapply(binding: GBinding[?]): Option[Buffer] = binding match

0 commit comments

Comments
 (0)