From 4a9f79a6ad8f8bc80bb3ae3bccf36e8986edf22c Mon Sep 17 00:00:00 2001 From: Yang Bo Date: Wed, 26 Apr 2017 11:17:44 +0800 Subject: [PATCH 1/4] Implement Event.duplicate --- .../main/scala/com/thoughtworks/deeplearning/OpenCL.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/OpenCL/src/main/scala/com/thoughtworks/deeplearning/OpenCL.scala b/OpenCL/src/main/scala/com/thoughtworks/deeplearning/OpenCL.scala index 5cbb1049..2455aede 100644 --- a/OpenCL/src/main/scala/com/thoughtworks/deeplearning/OpenCL.scala +++ b/OpenCL/src/main/scala/com/thoughtworks/deeplearning/OpenCL.scala @@ -495,7 +495,10 @@ data: $data""") final class Event(val handle: Address) extends AssertionAutoCloseable with AssertionFinalizer { - def duplicate = new Event(handle) + def duplicate(): Event = { + checkErrorCode(clRetainEvent(handle.toLong)) + new Event(handle) + } override protected def forceClose(): Unit = { checkErrorCode(clReleaseEvent(handle.toLong)) From 66b757f37192df51b48784ec7f5fe5f3980d5dc9 Mon Sep 17 00:00:00 2001 From: Yang Bo Date: Wed, 26 Apr 2017 17:18:51 +0800 Subject: [PATCH 2/4] Add FutureIsomorphism and AsynchronousSemaphore libraries --- AsynchronousSemaphore/build.sbt | 3 + .../deeplearning/AsynchronousSemaphore.scala | 78 +++++++++++++++++++ FutureIsomorphism/build.sbt | 5 ++ .../deeplearning/FutureIsomorphism.scala | 20 +++++ build.sbt | 8 +- 5 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 AsynchronousSemaphore/build.sbt create mode 100644 AsynchronousSemaphore/src/main/scala/com/thoughtworks/deeplearning/AsynchronousSemaphore.scala create mode 100644 FutureIsomorphism/build.sbt create mode 100644 FutureIsomorphism/src/main/scala/com/thoughtworks/deeplearning/FutureIsomorphism.scala diff --git a/AsynchronousSemaphore/build.sbt b/AsynchronousSemaphore/build.sbt new file mode 100644 index 00000000..3822722a --- /dev/null +++ b/AsynchronousSemaphore/build.sbt @@ -0,0 +1,3 @@ +libraryDependencies += "org.scalaz" %% 
"scalaz-core" % "7.2.10"
+
+libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.1" % Test
diff --git a/AsynchronousSemaphore/src/main/scala/com/thoughtworks/deeplearning/AsynchronousSemaphore.scala b/AsynchronousSemaphore/src/main/scala/com/thoughtworks/deeplearning/AsynchronousSemaphore.scala
new file mode 100644
index 00000000..ee7b8f61
--- /dev/null
+++ b/AsynchronousSemaphore/src/main/scala/com/thoughtworks/deeplearning/AsynchronousSemaphore.scala
@@ -0,0 +1,106 @@
+package com.thoughtworks.deeplearning
+
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.immutable.Queue
+import scalaz.{ContT, Trampoline}
+import scalaz.Free.Trampoline
+
+object AsynchronousSemaphore {
+
+  /** The state held in the semaphore's atomic reference. */
+  sealed trait State
+
+  /** At least one permit is free; `restNumberOfPermits` is how many. */
+  final case class Available(restNumberOfPermits: Int) extends State
+
+  /** No permit is free; `waiters` are the continuations queued by `acquire`, oldest first. */
+  final case class Unavailable(waiters: Queue[Unit => Trampoline[Unit]]) extends State
+
+  /** Creates a semaphore that starts with `numberOfPermits` free permits (asserted to be positive). */
+  @inline
+  def apply(numberOfPermits: Int): AsynchronousSemaphore = {
+    numberOfPermits.ensuring(_ > 0)
+    new AtomicReference[State](Available(numberOfPermits)) with AsynchronousSemaphore {
+      override protected def state: AtomicReference[State] = this
+    }
+  }
+}
+
+/**
+  * A non-blocking counting semaphore whose state is updated with compare-and-set retries.
+  *
+  * @author 杨博 (Yang Bo) <pop.atry@gmail.com>
+  */
+trait AsynchronousSemaphore {
+  import AsynchronousSemaphore._
+  protected def state: AtomicReference[State]
+
+  /**
+    * Takes one permit.
+    *
+    * When the returned `ContT` is run and a permit is free, the continuation is called right away;
+    * otherwise it is queued and called by a later [[release]].
+    */
+  final def acquire(): ContT[Trampoline, Unit, Unit] = {
+    ContT[Trampoline, Unit, Unit]({ waiter: (Unit => Trampoline[Unit]) =>
+      @tailrec
+      def retry(): Trampoline[Unit] = {
+        state.get() match {
+          case oldState @ Available(1) =>
+            if (state.compareAndSet(oldState, Unavailable(Queue.empty))) {
+              waiter(())
+            } else {
+              retry()
+            }
+          case oldState @ Available(restNumberOfPermits) if restNumberOfPermits > 1 =>
+            if (state.compareAndSet(oldState, Available(restNumberOfPermits - 1))) { // TODO
+              waiter(())
+            } else {
+              retry()
+            }
+          case oldState @ Unavailable(waiters) =>
+            if (state.compareAndSet(oldState, Unavailable(waiters.enqueue(waiter)))) {
+              Trampoline.done(())
+            } else {
+              retry()
+            }
+        }
+      }
+      retry()
+    })
+  }
+
+  /**
+    * Gives one permit back.
+    *
+    * When continuations are queued, the permit is handed straight to the oldest one, whose
+    * trampoline is returned. Otherwise the number of free permits grows by one.
+    */
+  @tailrec
+  final def release(): Trampoline[Unit] = {
+    state.get() match {
+      case oldState @ Unavailable(waiters) if waiters.isEmpty =>
+        // Nobody is waiting: `dequeue` would throw on the empty queue, so the permit becomes free.
+        if (state.compareAndSet(oldState, Available(1))) {
+          Trampoline.done(())
+        } else {
+          release()
+        }
+      case oldState @ Unavailable(waiters) =>
+        val (head, tail) = waiters.dequeue
+        if (state.compareAndSet(oldState, Unavailable(tail))) {
+          head(())
+        } else {
+          release()
+        }
+      case oldState @ Available(restNumberOfPermits) =>
+        if (state.compareAndSet(oldState, Available(restNumberOfPermits + 1))) {
+          Trampoline.done(())
+        } else {
+          release()
+        }
+    }
+  }
+}
diff --git a/FutureIsomorphism/build.sbt b/FutureIsomorphism/build.sbt
new file mode 100644
index 00000000..36a9be41
--- /dev/null
+++ b/FutureIsomorphism/build.sbt
@@ -0,0 +1,5 @@
+addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.3")
+
+libraryDependencies += "org.scalaz" %% "scalaz-concurrent" % "7.2.10"
+
+libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.1" % Test
diff --git a/FutureIsomorphism/src/main/scala/com/thoughtworks/deeplearning/FutureIsomorphism.scala b/FutureIsomorphism/src/main/scala/com/thoughtworks/deeplearning/FutureIsomorphism.scala
new file mode 100644
index 00000000..502f0664
--- /dev/null
+++ b/FutureIsomorphism/src/main/scala/com/thoughtworks/deeplearning/FutureIsomorphism.scala
@@ -0,0 +1,20 @@
+package com.thoughtworks.deeplearning
+
+import scalaz.{ContT, Trampoline}
+import scalaz.Free.Trampoline
+import scalaz.concurrent.Future
+
+/**
+  * @author 杨博 (Yang Bo) <pop.atry@gmail.com>
+  */
+object FutureIsomorphism extends scalaz.Isomorphism.IsoFunctorTemplate[Future, ContT[Trampoline, Unit, ?]] {
+  override def to[A](fa: Future[A]): ContT[Trampoline, Unit, A] = ContT[Trampoline, Unit, A] { continue =>
+    Trampoline.delay(fa.unsafePerformListen(continue))
+  }
+
+  override def from[A](ga: ContT[Trampoline, Unit, A]): Future[A] = {
+    Future.Async { continue =>
+      ga(continue).run
+    }
+  }
+}
diff --git a/build.sbt b/build.sbt
index 33b6dc27..013527d6 100644
--- a/build.sbt
+++ 
b/build.sbt @@ -5,7 +5,9 @@ lazy val DifferentiableKernel = project.dependsOn( OpenCL, OpenCLCodeGenerator, - TapeTaskFactory + TapeTaskFactory, + FutureIsomorphism, + AsynchronousSemaphore ) lazy val OpenCLCodeGenerator = project.dependsOn(Memory) @@ -40,6 +42,10 @@ lazy val TapeTask = project.dependsOn(Tape, ProjectRef(file("RAII.scala"), "RAII lazy val LogRecords = project +lazy val AsynchronousSemaphore = project + +lazy val FutureIsomorphism = project + //lazy val DifferentiableDouble = // project.dependsOn(Layer, // CumulativeTape, From 632401d4d75354458f03ce64c026680f45e5fe69 Mon Sep 17 00:00:00 2001 From: Yang Bo Date: Wed, 26 Apr 2017 17:28:16 +0800 Subject: [PATCH 3/4] Use PendingBuffer instead of OpenCL.Buffer --- .../deeplearning/DifferentiableKernel.scala | 73 ++++++++++++++----- .../DifferentiableKernelSpec.scala | 17 +++-- 2 files changed, 64 insertions(+), 26 deletions(-) diff --git a/DifferentiableKernel/src/main/scala/com/thoughtworks/deeplearning/DifferentiableKernel.scala b/DifferentiableKernel/src/main/scala/com/thoughtworks/deeplearning/DifferentiableKernel.scala index c2ed53a9..5e9e792e 100644 --- a/DifferentiableKernel/src/main/scala/com/thoughtworks/deeplearning/DifferentiableKernel.scala +++ b/DifferentiableKernel/src/main/scala/com/thoughtworks/deeplearning/DifferentiableKernel.scala @@ -1,5 +1,8 @@ package com.thoughtworks.deeplearning +import java.util.concurrent.Semaphore + +import com.thoughtworks.deeplearning.Closeables.{AssertionAutoCloseable, AssertionFinalizer} import com.thoughtworks.deeplearning.DifferentiableKernel.Zero.FloatZero import com.thoughtworks.deeplearning.Memory.Address import com.thoughtworks.deeplearning.OpenCL.CommandQueue.GlobalWorkSizeOnlyDimension @@ -7,13 +10,14 @@ import com.thoughtworks.deeplearning.OpenCL.{CommandQueue, Device, Kernel} import com.thoughtworks.deeplearning.OpenCLCodeGenerator.DslType.{DslBuffer, DslDouble, DslFloat, DslInt} import com.thoughtworks.deeplearning.OpenCLCodeGenerator._ import 
com.thoughtworks.each.Monadic._ -import com.thoughtworks.raii.RAIITask +import com.thoughtworks.raii.ResourceFactoryT.ResourceT +import com.thoughtworks.raii.{RAIITask, ResourceFactoryT} import shapeless.labelled._ import shapeless._ import scala.concurrent.ExecutionContext import scala.util.control.NonFatal -import scalaz.{@@, Monad, Monoid} +import scalaz.{@@, Monad, Monoid, \/, \/-} import scalaz.Tags.{Multiplication, Parallel} import scalaz.concurrent.Future import scalaz.concurrent.Future.{ParallelFuture, futureParallelApplicativeInstance} @@ -24,13 +28,15 @@ import scala.language.higherKinds object DifferentiableKernel { + final case class PendingBuffer[Element](buffer: OpenCL.Buffer[Element], events: List[OpenCL.Event]) + private[DifferentiableKernel] trait StaticDslTypeExtractor { type AbstractType[A] <: DslType implicit def dslDouble: AbstractType[Double] implicit def dslFloat: AbstractType[Float] implicit def dslInt: AbstractType[Int] - implicit def dslBuffer[Element: AbstractType]: AbstractType[OpenCL.Buffer[Element]] + implicit def dslBuffer[Element: AbstractType]: AbstractType[PendingBuffer[Element]] } private[DifferentiableKernel] trait StaticDslExpressionExtractor { @@ -73,14 +79,14 @@ object DifferentiableKernel { import OpenCLLayer._ - def compile(context: OpenCL.Context, device: Device, commandQueue: CommandQueue)( + def compile(context: OpenCL.Context, device: Device, commandQueue: CommandQueue, semaphore: AsynchronousSemaphore)( implicit compiler: Compiler[OutputElementData, OutputElementDelta, LocalDelta], outputDataMemory: Memory[OutputElementData], outputDeltaMemory: Memory[OutputElementDelta], outputDataType: StaticDslType[OutputElementData], outputDeltaType: StaticDslType[OutputElementDelta], executor: ExecutionContext): RAIITask[(Int, compiler.ParameterRecord) => RAIITask[ - Tape.Aux[OpenCL.Buffer[OutputElementData], OpenCL.Buffer[OutputElementDelta]]]] = throwableMonadic[RAIITask] { + Tape.Aux[PendingBuffer[OutputElementData], 
PendingBuffer[OutputElementDelta]]]] = throwableMonadic[RAIITask] { RAIITask.jump().each @@ -101,20 +107,32 @@ object DifferentiableKernel { { (expectedSize: Int, inputParameterMap: compiler.ParameterRecord) => throwableMonadic[RAIITask] { val kernel = forwardKernelTask.each - val outputBuffer = - RAIITask.managed(context.createBuffer[OutputElementData](expectedSize)(outputDataMemory)).each + val outputBuffer = context.createBuffer[OutputElementData](expectedSize)(outputDataMemory) + compiler.setKernelInputArguments(kernel, 1, inputParameterMap) kernel.setArg(0, outputBuffer) - val event = + + RAIITask.unmanaged(semaphore.acquire()).each + val event = try { RAIITask .managed( commandQueue.enqueueNDRangeKernel(kernel, Seq(GlobalWorkSizeOnlyDimension(Address(expectedSize))))) .each + } catch { + case e if NonFatal(e) => + semaphore.release().run + (throw e): OpenCL.Event + } + event.waitForComplete().unsafePerformAsync { _ => + semaphore.release().run + } + RAIITask.unmanaged(event.waitForComplete()).each new Tape { - override def data: OpenCL.Buffer[OutputElementData] = outputBuffer + // borrow + override val data: PendingBuffer[OutputElementData] = PendingBuffer(outputBuffer, List(event)) - override def backward[OutputDeltaBuffer <: OpenCL.Buffer[OutputElementDelta]]( + override def backward[OutputDeltaBuffer <: PendingBuffer[OutputElementDelta]]( outputDeltaTask: RAIITask[OutputDeltaBuffer]): Future[Unit] = { Future.suspend { Future.now(()) // TODO: backward @@ -123,9 +141,9 @@ object DifferentiableKernel { } // TODO: Change OutputData and OutputDelta to a pair of OpenCL.Buffer and OpenCL.Event - override type Data = OpenCL.Buffer[OutputElementData] - override type Delta = OpenCL.Buffer[OutputElementDelta] - }: Tape.Aux[OpenCL.Buffer[OutputElementData], OpenCL.Buffer[OutputElementDelta]] + override type Data = PendingBuffer[OutputElementData] + override type Delta = PendingBuffer[OutputElementDelta] + }: Tape.Aux[PendingBuffer[OutputElementData], 
PendingBuffer[OutputElementDelta]] } } } @@ -291,10 +309,10 @@ object DifferentiableKernel { } def bufferIdentifier[Data, Delta]( - key: Witness): OpenCLLayer[OpenCL.Buffer[Data], - OpenCL.Buffer[Delta], + key: Witness): OpenCLLayer[PendingBuffer[Data], + PendingBuffer[Delta], FieldType[key.T, JacobianMatrix[Data, Delta]] :: HNil] = { - OpenCLLayer[OpenCL.Buffer[Data], OpenCL.Buffer[Delta], FieldType[key.T, JacobianMatrix[Data, Delta]] :: HNil]( + OpenCLLayer[PendingBuffer[Data], PendingBuffer[Delta], FieldType[key.T, JacobianMatrix[Data, Delta]] :: HNil]( StaticDslExpression(DslExpression.Identifier(key.value)), field[key.T](JacobianMatrix.Identity[Data, Delta]()) :: HNil ) @@ -314,7 +332,7 @@ object DifferentiableKernel { IndexLocalDelta <: HList, ElementLocalDelta <: HList, LocalDelta <: HList]( - buffer: OpenCLLayer[OpenCL.Buffer[ElementData], OpenCL.Buffer[ElementDelta], BufferLocalDelta], + buffer: OpenCLLayer[PendingBuffer[ElementData], PendingBuffer[ElementDelta], BufferLocalDelta], index: OpenCLLayer[Int, Float, IndexLocalDelta])( implicit elementDataType: StaticDslType[ElementData], zero: Zero.Aux[IndexLocalDelta], @@ -355,6 +373,8 @@ object DifferentiableKernel { def forwardParameter: Parameter def setArgument(kernel: Kernel, index: Int, input: Input): Unit + + def borrowEvents(input: Input): List[OpenCL.Event] } object InputCompiler { @@ -368,14 +388,19 @@ object DifferentiableKernel { elementDataType: StaticDslType[InputElementData]) : InputCompiler.Aux[Key, JacobianMatrix.Row[InputElementData, InputElementDelta], - Tape.Aux[OpenCL.Buffer[InputElementData], OpenCL.Buffer[InputElementDelta]]] = + Tape.Aux[PendingBuffer[InputElementData], PendingBuffer[InputElementDelta]]] = new InputCompiler[Key, JacobianMatrix.Row[InputElementData, InputElementDelta]] { - override type Input = Tape.Aux[OpenCL.Buffer[InputElementData], OpenCL.Buffer[InputElementDelta]] + override type Input = Tape.Aux[PendingBuffer[InputElementData], PendingBuffer[InputElementDelta]] 
override def forwardParameter: Parameter = Parameter(witness.value, DslType.DslBuffer(elementDataType)) override def setArgument(kernel: Kernel, index: Int, input: Input): Unit = { - kernel.setArg[OpenCL.Buffer[InputElementData]](index, input.data) + kernel.setArg[OpenCL.Buffer[InputElementData]](index, input.data.buffer) + } + + override def borrowEvents( + input: Tape.Aux[PendingBuffer[InputElementData], PendingBuffer[InputElementDelta]]): List[OpenCL.Event] = { + input.data.events } } @@ -395,6 +420,8 @@ object DifferentiableKernel { Parameter(OutputId, DslType.DslBuffer(outputDataType)) :: forwardInputParameters def setKernelInputArguments(kernel: Kernel, startIndex: Int, parameters: ParameterRecord) + + def borrowEvents(parameters: ParameterRecord): List[OpenCL.Event] } object Compiler { @@ -415,6 +442,8 @@ object DifferentiableKernel { override def forwardInputParameters: Nil.type = Nil override def setKernelInputArguments(kernel: Kernel, startIndex: Int, parameters: HNil): Unit = {} + + override def borrowEvents(parameters: HNil): List[OpenCL.Event] = Nil } implicit def hconsFill[OutputElementData, @@ -441,6 +470,10 @@ object DifferentiableKernel { headInputCompiler.setArgument(kernel, startIndex, parameters.head) tailCompiler.setKernelInputArguments(kernel, startIndex + 1, parameters.tail) } + + override def borrowEvents(parameters: ::[FieldType[Key, Input], TailParameterRecord]): List[OpenCL.Event] = { + headInputCompiler.borrowEvents(parameters.head) ::: tailCompiler.borrowEvents(parameters.tail) + } } } diff --git a/DifferentiableKernel/src/test/scala/com/thoughtworks/deeplearning/DifferentiableKernelSpec.scala b/DifferentiableKernel/src/test/scala/com/thoughtworks/deeplearning/DifferentiableKernelSpec.scala index 7a91a6ca..5bc7441b 100644 --- a/DifferentiableKernel/src/test/scala/com/thoughtworks/deeplearning/DifferentiableKernelSpec.scala +++ b/DifferentiableKernel/src/test/scala/com/thoughtworks/deeplearning/DifferentiableKernelSpec.scala @@ -56,6 
+56,7 @@ class DifferentiableKernelSpec extends AsyncFreeSpec with Matchers { (context, commandQueue) } + val semaphore = AsynchronousSemaphore(3) "When fill a buffer with 42.0f" - { val differentiableKernel = { @@ -71,12 +72,14 @@ class DifferentiableKernelSpec extends AsyncFreeSpec with Matchers { RAIITask.unmanaged( RAIITask.run( throwableMonadic[RAIITask] { - val layer = differentiableKernel.compile(context, device, commandQueue).each + val layer = differentiableKernel.compile(context, device, commandQueue, semaphore).each val outputTape = layer(1, HNil).each - val delta = RAIITask.managed(context.createBuffer[Float](1)) + val delta = RAIITask.managed(context.createBuffer[Float](1)).map(PendingBuffer(_, Nil)) RAIITask.unmanaged(outputTape.backward(delta)).each val f = BufferUtils.createFloatBuffer(1) - val event = RAIITask.managed(commandQueue.enqueueReadBuffer(outputTape.data, f)).each + val event = RAIITask + .managed(commandQueue.enqueueReadBuffer(outputTape.data.buffer, f, outputTape.data.events: _*)) + .each RAIITask.unmanaged(event.waitForComplete()).each f } @@ -119,12 +122,14 @@ class DifferentiableKernelSpec extends AsyncFreeSpec with Matchers { RAIITask.unmanaged( RAIITask.run( throwableMonadic[RAIITask] { - val layer = differentiableKernel.compile(context, device, commandQueue).each + val layer = differentiableKernel.compile(context, device, commandQueue, semaphore).each val outputTape = layer(1, ??? 
:: HNil).each - val delta = RAIITask.managed(context.createBuffer[Float](1)) + val delta = RAIITask.managed(context.createBuffer[Float](1)).map(PendingBuffer(_, Nil)) RAIITask.unmanaged(outputTape.backward(delta)).each val f = BufferUtils.createFloatBuffer(1) - val event = RAIITask.managed(commandQueue.enqueueReadBuffer(outputTape.data, f)).each + val event = RAIITask + .managed(commandQueue.enqueueReadBuffer(outputTape.data.buffer, f, outputTape.data.events: _*)) + .each RAIITask.unmanaged(event.waitForComplete()).each f } From 8cd868b98911bb3d11f9bc1fc64576eb4d4330ab Mon Sep 17 00:00:00 2001 From: Yang Bo Date: Wed, 26 Apr 2017 17:51:20 +0800 Subject: [PATCH 4/4] Upgrade RAII.scala --- RAII.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RAII.scala b/RAII.scala index d442cd5f..87115f9c 160000 --- a/RAII.scala +++ b/RAII.scala @@ -1 +1 @@ -Subproject commit d442cd5fb6437e02eac7e7b84ff1d1cf3ddb4e4e +Subproject commit 87115f9ccbdd713dd053e9592abd1dc413dec035