Skip to content

Use PendingBuffer instead of OpenCL.Buffer #36

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions AsynchronousSemaphore/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
libraryDependencies += "org.scalaz" %% "scalaz-core" % "7.2.10"

libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.1" % Test
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.thoughtworks.deeplearning

import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scalaz.{ContT, Trampoline}
import scalaz.Free.Trampoline

object AsynchronousSemaphore {
sealed trait State
final case class Available(restNumberOfPermits: Int) extends State
final case class Unavailable(waiters: Queue[Unit => Trampoline[Unit]]) extends State

@inline
def apply(numberOfPermits: Int): AsynchronousSemaphore = {
numberOfPermits.ensuring(_ > 0)
new AtomicReference[State](Available(numberOfPermits)) with AsynchronousSemaphore {
override protected def state: AtomicReference[State] = this
}
}
}

/**
* @author 杨博 (Yang Bo) <[email protected]>
*/
trait AsynchronousSemaphore {
import AsynchronousSemaphore._
protected def state: AtomicReference[State]

final def acquire(): ContT[Trampoline, Unit, Unit] = {
ContT[Trampoline, Unit, Unit]({ waiter: (Unit => Trampoline[Unit]) =>
@tailrec
def retry(): Trampoline[Unit] = {
state.get() match {
case oldState @ Available(1) =>
if (state.compareAndSet(oldState, Unavailable(Queue.empty))) {
waiter(())
} else {
retry()
}
case oldState @ Available(restNumberOfPermits) if restNumberOfPermits > 1 =>
if (state.compareAndSet(oldState, Available(restNumberOfPermits - 1))) { // TODO
waiter(())
} else {
retry()
}
case oldState @ Unavailable(waiters) =>
if (state.compareAndSet(oldState, Unavailable(waiters.enqueue(waiter)))) {
Trampoline.done(())
} else {
retry()
}
}
}
retry()
})
}

@tailrec
final def release(): Trampoline[Unit] = {
state.get() match {
case oldState @ Unavailable(waiters) =>
val (head, tail) = waiters.dequeue
if (state.compareAndSet(oldState, Unavailable(tail))) {
head(())
} else {
release()
}
case oldState @ Available(restNumberOfPermits) =>
if (state.compareAndSet(oldState, Available(restNumberOfPermits + 1))) {
Trampoline.done(())
} else {
release()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package com.thoughtworks.deeplearning

import java.util.concurrent.Semaphore

import com.thoughtworks.deeplearning.Closeables.{AssertionAutoCloseable, AssertionFinalizer}
import com.thoughtworks.deeplearning.DifferentiableKernel.Zero.FloatZero
import com.thoughtworks.deeplearning.Memory.Address
import com.thoughtworks.deeplearning.OpenCL.CommandQueue.GlobalWorkSizeOnlyDimension
import com.thoughtworks.deeplearning.OpenCL.{CommandQueue, Device, Kernel}
import com.thoughtworks.deeplearning.OpenCLCodeGenerator.DslType.{DslBuffer, DslDouble, DslFloat, DslInt}
import com.thoughtworks.deeplearning.OpenCLCodeGenerator._
import com.thoughtworks.each.Monadic._
import com.thoughtworks.raii.RAIITask
import com.thoughtworks.raii.ResourceFactoryT.ResourceT
import com.thoughtworks.raii.{RAIITask, ResourceFactoryT}
import shapeless.labelled._
import shapeless._

import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import scalaz.{@@, Monad, Monoid}
import scalaz.{@@, Monad, Monoid, \/, \/-}
import scalaz.Tags.{Multiplication, Parallel}
import scalaz.concurrent.Future
import scalaz.concurrent.Future.{ParallelFuture, futureParallelApplicativeInstance}
Expand All @@ -24,13 +28,15 @@ import scala.language.higherKinds

object DifferentiableKernel {

final case class PendingBuffer[Element](buffer: OpenCL.Buffer[Element], events: List[OpenCL.Event])

private[DifferentiableKernel] trait StaticDslTypeExtractor {
type AbstractType[A] <: DslType

implicit def dslDouble: AbstractType[Double]
implicit def dslFloat: AbstractType[Float]
implicit def dslInt: AbstractType[Int]
implicit def dslBuffer[Element: AbstractType]: AbstractType[OpenCL.Buffer[Element]]
implicit def dslBuffer[Element: AbstractType]: AbstractType[PendingBuffer[Element]]
}

private[DifferentiableKernel] trait StaticDslExpressionExtractor {
Expand Down Expand Up @@ -73,14 +79,14 @@ object DifferentiableKernel {

import OpenCLLayer._

def compile(context: OpenCL.Context, device: Device, commandQueue: CommandQueue)(
def compile(context: OpenCL.Context, device: Device, commandQueue: CommandQueue, semaphore: AsynchronousSemaphore)(
implicit compiler: Compiler[OutputElementData, OutputElementDelta, LocalDelta],
outputDataMemory: Memory[OutputElementData],
outputDeltaMemory: Memory[OutputElementDelta],
outputDataType: StaticDslType[OutputElementData],
outputDeltaType: StaticDslType[OutputElementDelta],
executor: ExecutionContext): RAIITask[(Int, compiler.ParameterRecord) => RAIITask[
Tape.Aux[OpenCL.Buffer[OutputElementData], OpenCL.Buffer[OutputElementDelta]]]] = throwableMonadic[RAIITask] {
Tape.Aux[PendingBuffer[OutputElementData], PendingBuffer[OutputElementDelta]]]] = throwableMonadic[RAIITask] {

RAIITask.jump().each

Expand All @@ -101,20 +107,32 @@ object DifferentiableKernel {
{ (expectedSize: Int, inputParameterMap: compiler.ParameterRecord) =>
throwableMonadic[RAIITask] {
val kernel = forwardKernelTask.each
val outputBuffer =
RAIITask.managed(context.createBuffer[OutputElementData](expectedSize)(outputDataMemory)).each
val outputBuffer = context.createBuffer[OutputElementData](expectedSize)(outputDataMemory)

compiler.setKernelInputArguments(kernel, 1, inputParameterMap)
kernel.setArg(0, outputBuffer)
val event =

RAIITask.unmanaged(semaphore.acquire()).each
val event = try {
RAIITask
.managed(
commandQueue.enqueueNDRangeKernel(kernel, Seq(GlobalWorkSizeOnlyDimension(Address(expectedSize)))))
.each
} catch {
case e if NonFatal(e) =>
semaphore.release().run
(throw e): OpenCL.Event
}
event.waitForComplete().unsafePerformAsync { _ =>
semaphore.release().run
}

RAIITask.unmanaged(event.waitForComplete()).each
new Tape {
override def data: OpenCL.Buffer[OutputElementData] = outputBuffer
// borrow
override val data: PendingBuffer[OutputElementData] = PendingBuffer(outputBuffer, List(event))

override def backward[OutputDeltaBuffer <: OpenCL.Buffer[OutputElementDelta]](
override def backward[OutputDeltaBuffer <: PendingBuffer[OutputElementDelta]](
outputDeltaTask: RAIITask[OutputDeltaBuffer]): Future[Unit] = {
Future.suspend {
Future.now(()) // TODO: backward
Expand All @@ -123,9 +141,9 @@ object DifferentiableKernel {
}

// TODO: Change OutputData and OutputDelta to a pair of OpenCL.Buffer and OpenCL.Event
override type Data = OpenCL.Buffer[OutputElementData]
override type Delta = OpenCL.Buffer[OutputElementDelta]
}: Tape.Aux[OpenCL.Buffer[OutputElementData], OpenCL.Buffer[OutputElementDelta]]
override type Data = PendingBuffer[OutputElementData]
override type Delta = PendingBuffer[OutputElementDelta]
}: Tape.Aux[PendingBuffer[OutputElementData], PendingBuffer[OutputElementDelta]]
}
}
}
Expand Down Expand Up @@ -291,10 +309,10 @@ object DifferentiableKernel {
}

def bufferIdentifier[Data, Delta](
key: Witness): OpenCLLayer[OpenCL.Buffer[Data],
OpenCL.Buffer[Delta],
key: Witness): OpenCLLayer[PendingBuffer[Data],
PendingBuffer[Delta],
FieldType[key.T, JacobianMatrix[Data, Delta]] :: HNil] = {
OpenCLLayer[OpenCL.Buffer[Data], OpenCL.Buffer[Delta], FieldType[key.T, JacobianMatrix[Data, Delta]] :: HNil](
OpenCLLayer[PendingBuffer[Data], PendingBuffer[Delta], FieldType[key.T, JacobianMatrix[Data, Delta]] :: HNil](
StaticDslExpression(DslExpression.Identifier(key.value)),
field[key.T](JacobianMatrix.Identity[Data, Delta]()) :: HNil
)
Expand All @@ -314,7 +332,7 @@ object DifferentiableKernel {
IndexLocalDelta <: HList,
ElementLocalDelta <: HList,
LocalDelta <: HList](
buffer: OpenCLLayer[OpenCL.Buffer[ElementData], OpenCL.Buffer[ElementDelta], BufferLocalDelta],
buffer: OpenCLLayer[PendingBuffer[ElementData], PendingBuffer[ElementDelta], BufferLocalDelta],
index: OpenCLLayer[Int, Float, IndexLocalDelta])(
implicit elementDataType: StaticDslType[ElementData],
zero: Zero.Aux[IndexLocalDelta],
Expand Down Expand Up @@ -355,6 +373,8 @@ object DifferentiableKernel {
def forwardParameter: Parameter

def setArgument(kernel: Kernel, index: Int, input: Input): Unit

def borrowEvents(input: Input): List[OpenCL.Event]
}

object InputCompiler {
Expand All @@ -368,14 +388,19 @@ object DifferentiableKernel {
elementDataType: StaticDslType[InputElementData])
: InputCompiler.Aux[Key,
JacobianMatrix.Row[InputElementData, InputElementDelta],
Tape.Aux[OpenCL.Buffer[InputElementData], OpenCL.Buffer[InputElementDelta]]] =
Tape.Aux[PendingBuffer[InputElementData], PendingBuffer[InputElementDelta]]] =
new InputCompiler[Key, JacobianMatrix.Row[InputElementData, InputElementDelta]] {

override type Input = Tape.Aux[OpenCL.Buffer[InputElementData], OpenCL.Buffer[InputElementDelta]]
override type Input = Tape.Aux[PendingBuffer[InputElementData], PendingBuffer[InputElementDelta]]
override def forwardParameter: Parameter = Parameter(witness.value, DslType.DslBuffer(elementDataType))

override def setArgument(kernel: Kernel, index: Int, input: Input): Unit = {
kernel.setArg[OpenCL.Buffer[InputElementData]](index, input.data)
kernel.setArg[OpenCL.Buffer[InputElementData]](index, input.data.buffer)
}

override def borrowEvents(
input: Tape.Aux[PendingBuffer[InputElementData], PendingBuffer[InputElementDelta]]): List[OpenCL.Event] = {
input.data.events
}
}

Expand All @@ -395,6 +420,8 @@ object DifferentiableKernel {
Parameter(OutputId, DslType.DslBuffer(outputDataType)) :: forwardInputParameters

def setKernelInputArguments(kernel: Kernel, startIndex: Int, parameters: ParameterRecord)

def borrowEvents(parameters: ParameterRecord): List[OpenCL.Event]
}

object Compiler {
Expand All @@ -415,6 +442,8 @@ object DifferentiableKernel {
override def forwardInputParameters: Nil.type = Nil

override def setKernelInputArguments(kernel: Kernel, startIndex: Int, parameters: HNil): Unit = {}

override def borrowEvents(parameters: HNil): List[OpenCL.Event] = Nil
}

implicit def hconsFill[OutputElementData,
Expand All @@ -441,6 +470,10 @@ object DifferentiableKernel {
headInputCompiler.setArgument(kernel, startIndex, parameters.head)
tailCompiler.setKernelInputArguments(kernel, startIndex + 1, parameters.tail)
}

override def borrowEvents(parameters: ::[FieldType[Key, Input], TailParameterRecord]): List[OpenCL.Event] = {
headInputCompiler.borrowEvents(parameters.head) ::: tailCompiler.borrowEvents(parameters.tail)
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class DifferentiableKernelSpec extends AsyncFreeSpec with Matchers {

(context, commandQueue)
}
val semaphore = AsynchronousSemaphore(3)

"When fill a buffer with 42.0f" - {
val differentiableKernel = {
Expand All @@ -71,12 +72,14 @@ class DifferentiableKernelSpec extends AsyncFreeSpec with Matchers {
RAIITask.unmanaged(
RAIITask.run(
throwableMonadic[RAIITask] {
val layer = differentiableKernel.compile(context, device, commandQueue).each
val layer = differentiableKernel.compile(context, device, commandQueue, semaphore).each
val outputTape = layer(1, HNil).each
val delta = RAIITask.managed(context.createBuffer[Float](1))
val delta = RAIITask.managed(context.createBuffer[Float](1)).map(PendingBuffer(_, Nil))
RAIITask.unmanaged(outputTape.backward(delta)).each
val f = BufferUtils.createFloatBuffer(1)
val event = RAIITask.managed(commandQueue.enqueueReadBuffer(outputTape.data, f)).each
val event = RAIITask
.managed(commandQueue.enqueueReadBuffer(outputTape.data.buffer, f, outputTape.data.events: _*))
.each
RAIITask.unmanaged(event.waitForComplete()).each
f
}
Expand Down Expand Up @@ -119,12 +122,14 @@ class DifferentiableKernelSpec extends AsyncFreeSpec with Matchers {
RAIITask.unmanaged(
RAIITask.run(
throwableMonadic[RAIITask] {
val layer = differentiableKernel.compile(context, device, commandQueue).each
val layer = differentiableKernel.compile(context, device, commandQueue, semaphore).each
val outputTape = layer(1, ??? :: HNil).each
val delta = RAIITask.managed(context.createBuffer[Float](1))
val delta = RAIITask.managed(context.createBuffer[Float](1)).map(PendingBuffer(_, Nil))
RAIITask.unmanaged(outputTape.backward(delta)).each
val f = BufferUtils.createFloatBuffer(1)
val event = RAIITask.managed(commandQueue.enqueueReadBuffer(outputTape.data, f)).each
val event = RAIITask
.managed(commandQueue.enqueueReadBuffer(outputTape.data.buffer, f, outputTape.data.events: _*))
.each
RAIITask.unmanaged(event.waitForComplete()).each
f
}
Expand Down
5 changes: 5 additions & 0 deletions FutureIsomorphism/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.3")

libraryDependencies += "org.scalaz" %% "scalaz-concurrent" % "7.2.10"

libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.1" % Test
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.thoughtworks.deeplearning

import scalaz.{ContT, Trampoline}
import scalaz.Free.Trampoline
import scalaz.concurrent.Future

/**
* @author 杨博 (Yang Bo) &lt;[email protected]&gt;
*/
object FutureIsomorphism extends scalaz.Isomorphism.IsoFunctorTemplate[Future, ContT[Trampoline, Unit, ?]] {
override def to[A](fa: Future[A]): ContT[Trampoline, Unit, A] = ContT[Trampoline, Unit, A] { continue =>
Trampoline.delay(fa.unsafePerformListen(continue))
}

override def from[A](ga: ContT[Trampoline, Unit, A]): Future[A] = {
Future.Async { continue =>
ga(continue).run
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,10 @@ data: $data""")

final class Event(val handle: Address) extends AssertionAutoCloseable with AssertionFinalizer {

def duplicate = new Event(handle)
def duplicate(): Event = {
checkErrorCode(clRetainEvent(handle.toLong))
new Event(handle)
}

override protected def forceClose(): Unit = {
checkErrorCode(clReleaseEvent(handle.toLong))
Expand Down
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ lazy val DifferentiableKernel =
project.dependsOn(
OpenCL,
OpenCLCodeGenerator,
TapeTaskFactory
TapeTaskFactory,
FutureIsomorphism,
AsynchronousSemaphore
)

lazy val OpenCLCodeGenerator = project.dependsOn(Memory)
Expand Down Expand Up @@ -40,6 +42,10 @@ lazy val TapeTask = project.dependsOn(Tape, ProjectRef(file("RAII.scala"), "RAII

lazy val LogRecords = project

lazy val AsynchronousSemaphore = project

lazy val FutureIsomorphism = project

//lazy val DifferentiableDouble =
// project.dependsOn(Layer,
// CumulativeTape,
Expand Down