Skip to content

scio-test SCollectionMatchers not working with SideOutput #1587

@sgireddy

Description

@sgireddy
class SCollectionSideoutTests extends FlatSpec with BeforeAndAfterEach with Matchers with Serializable with SCollectionMatchers  {

  private var _sc: ScioContext = _
  def sc: ScioContext = _sc

  override def beforeEach(): Unit = {
    super.beforeEach()
    _sc = ContextAndArgs(Array.empty)._1
  }

  override def afterEach(): Unit = {
    _sc.close().waitUntilFinish()
    super.afterEach()
  }

  val nSideOut = SideOutput[Int]
  
  def getEventWithOddSideOut(numbers: SCollection[Int]): (SCollection[Int], SideOutputCollections) = {
    numbers
      .withSideOutputs(nSideOut)
      .flatMap((n, ctx) => {
        if (n % 2 == 0)
          Some(n)
        else {
          ctx.output(nSideOut, n)
          None
        }
      })
  }

  "Scio-matchers" should "handle sideoutputs" in {
    val expected = List(2, 4, 6, 8, 10)
    val elements = sc.parallelize(1 to 10)
    val (even, odd) = getEventWithOddSideOut(elements)
    even should containInAnyOrder(expected)
  }
}

Error:

Unable to return a default Coder for flatMap@{SCollectionSideoutTests.scala:28}2.out1 [PCollection]. Correct one of the following root causes:
No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V.
Building a Coder using a registered CoderProvider failed.
See suppressed exceptions for detailed failures.
Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
java.lang.IllegalStateException: Unable to return a default Coder for flatMap@{SCollectionSideoutTests.scala:28}2.out1 [PCollection]. Correct one of the following root causes:
No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V.
Building a Coder using a registered CoderProvider failed.
See suppressed exceptions for detailed failures.
Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:278)
at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifying(TransformHierarchy.java:257)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:243)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:577)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:312)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:489)
at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:476)
at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:542)
at com.spotify.scio.ScioContext.close(ScioContext.scala:476)
at test.SCollectionSideoutTests.afterEach(SCollectionSideoutTests.scala:19)
at org.scalatest.BeforeAndAfterEach$$anonfun$1.apply$mcV$sp(BeforeAndAfterEach.scala:234)
at org.scalatest.Status$$anonfun$withAfterEffect$1.apply(Status.scala:379)
at org.scalatest.Status$$anonfun$withAfterEffect$1.apply(Status.scala:375)
at org.scalatest.SucceededStatus$.whenCompleted(Status.scala:454)
at org.scalatest.Status$class.withAfterEffect(Status.scala:375)
at org.scalatest.SucceededStatus$.withAfterEffect(Status.scala:426)
at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:232)
at test.SCollectionSideoutTests.runTest(SCollectionSideoutTests.scala:8)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:410)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1750)
at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1685)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1685)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1795)
at org.scalatest.FlatSpec.run(FlatSpec.scala:1685)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1346)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions