@@ -21,16 +21,15 @@

package org.locationtech.rasterframes.expressions.generators

import org.locationtech.rasterframes._
import org.locationtech.rasterframes.encoders.CatalystSerializer._
import org.locationtech.rasterframes.util._
import geotrellis.raster._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.expressions.{Expression, Generator, GenericInternalRow}
import org.apache.spark.sql.rf.TileUDT
import org.apache.spark.sql.types._
import org.locationtech.rasterframes._
import org.locationtech.rasterframes.expressions.DynamicExtractors
import org.locationtech.rasterframes.util._
import spire.syntax.cfor.cfor

/**
@@ -67,8 +66,11 @@ case class ExplodeTiles(
override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
val tiles = Array.ofDim[Tile](children.length)
cfor(0)(_ < tiles.length, _ + 1) { index =>
val row = children(index).eval(input).asInstanceOf[InternalRow]
tiles(index) = if(row != null) row.to[Tile](TileUDT.tileSerializer) else null
val c = children(index)
val row = c.eval(input).asInstanceOf[InternalRow]
tiles(index) = if(row != null)
DynamicExtractors.tileExtractor(c.dataType)(row)._1
else null
}
val dims = tiles.filter(_ != null).map(_.dimensions)
if(dims.isEmpty) Seq.empty[InternalRow]
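
The substance of this change is that the row-to-tile conversion is now selected by the child expression's DataType through DynamicExtractors.tileExtractor (a partial function), rather than assuming every input is a serialized TileUDT; that is what allows a proj_raster struct column to be exploded as well as a plain tile column. A hedged sketch of the calling pattern, reconstructed only from the two call sites visible in this diff (isDefinedAt, and apply followed by ._1), not taken from the actual RasterFrames sources:

import geotrellis.raster.Tile
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.locationtech.rasterframes.expressions.DynamicExtractors

// Sketch: decide whether a child expression can yield a Tile and, if so,
// convert its evaluated InternalRow into one (mirrors the logic in eval above).
def tileFromChild(child: Expression, input: InternalRow): Option[Tile] =
  if (!DynamicExtractors.tileExtractor.isDefinedAt(child.dataType)) None
  else Option(child.eval(input).asInstanceOf[InternalRow])
    .map(row => DynamicExtractors.tileExtractor(child.dataType)(row)._1)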
@@ -21,8 +21,8 @@

package org.locationtech.rasterframes.ml

import org.apache.spark.sql.rf.TileUDT
import org.apache.spark.sql.types.{StructField, StructType}
import org.locationtech.rasterframes.expressions.DynamicExtractors

/**
* Utility mix-in for separating out tile columns from non-tile columns.
@@ -31,13 +31,11 @@ import org.apache.spark.sql.types.{StructField, StructType}
*/
trait TileColumnSupport {
protected def isTile(field: StructField) =
field.dataType.typeName.equalsIgnoreCase(TileUDT.typeName)
DynamicExtractors.tileExtractor.isDefinedAt(field.dataType)

type TileFields = Array[StructField]
type NonTileFields = Array[StructField]
protected def selectTileAndNonTileFields(schema: StructType): (TileFields, NonTileFields) = {
val tiles = schema.fields.filter(isTile)
val nonTiles = schema.fields.filterNot(isTile)
(tiles, nonTiles)
schema.fields.partition(f => DynamicExtractors.tileExtractor.isDefinedAt(f.dataType))
}
}
@@ -21,28 +21,48 @@

package org.locationtech.rasterframes.ml

import org.locationtech.rasterframes.TestData
import geotrellis.raster.Tile
import org.apache.spark.sql.functions.lit
import org.locationtech.rasterframes.TestEnvironment
import geotrellis.proj4.LatLng
import geotrellis.raster.{IntCellType, Tile}
import org.apache.spark.sql.functions.{avg, lit}
import org.locationtech.rasterframes.{TestData, TestEnvironment}
/**
*
* @since 2/16/18
*/
class TileExploderSpec extends TestEnvironment with TestData {
describe("Tile explode transformer") {
it("should explode tiles") {
import spark.implicits._
import spark.implicits._
it("should explode tile") {
val df = Seq[(Tile, Tile)]((byteArrayTile, byteArrayTile)).toDF("tile1", "tile2").withColumn("other", lit("stuff"))

val exploder = new TileExploder()
val newSchema = exploder.transformSchema(df.schema)

val exploded = exploder.transform(df)

assert(newSchema === exploded.schema)
assert(exploded.columns.length === 5)
assert(exploded.count() === 9)
write(exploded)
exploded.agg(avg($"tile1")).as[Double].first() should be (byteArrayTile.statisticsDouble.get.mean)
}

it("should explode proj_raster") {
val randPRT = TestData.projectedRasterTile(10, 10, scala.util.Random.nextInt(), extent, LatLng, IntCellType)

val df = Seq(randPRT).toDF("proj_raster").withColumn("other", lit("stuff"))

val exploder = new TileExploder()
val newSchema = exploder.transformSchema(df.schema)

val exploded = exploder.transform(df)

assert(newSchema === exploded.schema)
assert(exploded.columns.length === 4)
assert(exploded.count() === randPRT.size)
write(exploded)

exploded.agg(avg($"proj_raster")).as[Double].first() should be (randPRT.statisticsDouble.get.mean)
}
}
}
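
As a user-facing summary of what the new proj_raster test exercises: a DataFrame holding a projected raster column can now go straight through TileExploder. A condensed sketch adapted from the spec above, assuming an active SparkSession named spark with RasterFrames initialized and a ProjectedRasterTile value prt already in scope (the import path for TileExploder is inferred from this package and is an assumption):

import org.apache.spark.sql.functions.avg
import org.locationtech.rasterframes._
import org.locationtech.rasterframes.ml.TileExploder
import spark.implicits._

val df = Seq(prt).toDF("proj_raster")            // prt: a ProjectedRasterTile (assumed in scope)
val exploded = new TileExploder().transform(df)  // one output row per cell of the tile
exploded.agg(avg($"proj_raster")).show()         // mean of the exploded cell values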
4 changes: 4 additions & 0 deletions docs/src/main/paradox/release-notes.md
@@ -2,6 +2,10 @@

## 0.8.x

### 0.8.2

* Fixed `TileExploder` to support `proj_raster` struct [(#287)](https://github.com/locationtech/rasterframes/issues/287).

Review comment (Member Author): We probably need to confirm that we've kept this up to date since 0.8.1.

### 0.8.1

* Added `rf_local_no_data`, `rf_local_data` and `rf_interpret_cell_type_as` raster functions.
2 changes: 1 addition & 1 deletion pyrasterframes/src/main/python/docs/vector-data.pymd
@@ -93,7 +93,7 @@ l8_filtered = l8 \
.filter(st_intersects(l8.geom, st_bufferPoint(l8.paducah, lit(50000.0)))) \
.filter(l8.acquisition_date > '2018-02-01') \
.filter(l8.acquisition_date < '2018-04-01')
l8_filtered.select('product_id', 'entity_id', 'acquisition_date', 'cloud_cover_pct').toPandas()
l8_filtered.select('product_id', 'entity_id', 'acquisition_date', 'cloud_cover_pct')
```

[GeoPandas]: http://geopandas.org
1 change: 0 additions & 1 deletion pyrasterframes/src/main/python/tests/ExploderTests.py
@@ -33,7 +33,6 @@

class ExploderTests(TestEnvironment):

@unittest.skip("See issue https://github.com/locationtech/rasterframes/issues/163")
def test_tile_exploder_pipeline_for_prt(self):
# NB the tile is a Projected Raster Tile
df = self.spark.read.raster(self.img_uri)