@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2, SupportsRowLevelOperations, TruncatableTable}
import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
- import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
@@ -40,11 +40,11 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case d @ DeleteFromTable(aliasedTable, cond) if d.resolved =>
EliminateSubqueryAliases(aliasedTable) match {
- case DataSourceV2Relation(_: TruncatableTable, _, _, _, _) if cond == TrueLiteral =>
+ case DataSourceV2Table(_: TruncatableTable) if cond == TrueLiteral =>
// don't rewrite as the table supports truncation
d

- case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, _) =>
+ case r @ DataSourceV2Table(t: SupportsRowLevelOperations) =>
val table = buildOperationTable(t, DELETE, CaseInsensitiveStringMap.empty())
table.operation match {
case _: SupportsDelta =>
@@ -53,7 +53,7 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand {
buildReplaceDataPlan(r, table, cond)
}

- case DataSourceV2Relation(_: SupportsDeleteV2, _, _, _, _) =>
+ case DataSourceV2Table(_: SupportsDeleteV2) =>
// don't rewrite as the table supports deletes only with filters
d

@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE
import org.apache.spark.sql.errors.QueryCompilationErrors
- import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -125,7 +125,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution =>

EliminateSubqueryAliases(aliasedTable) match {
- case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>
+ case r @ DataSourceV2Table(tbl: SupportsRowLevelOperations) =>
validateMergeIntoConditions(m)
val table = buildOperationTable(tbl, MERGE, CaseInsensitiveStringMap.empty())
table.operation match {
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE
- import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -40,7 +40,7 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
if u.resolved && u.rewritable && u.aligned =>

EliminateSubqueryAliases(aliasedTable) match {
- case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>
+ case r @ DataSourceV2Table(tbl: SupportsRowLevelOperations) =>
val table = buildOperationTable(tbl, UPDATE, CaseInsensitiveStringMap.empty())
val updateCond = cond.getOrElse(TrueLiteral)
table.operation match {
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE
import org.apache.spark.sql.errors.QueryCompilationErrors
- import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, DataSourceV2Table}
import org.apache.spark.sql.internal.SQLConf

trait OperationHelper extends AliasHelper with PredicateHelper {
@@ -436,8 +436,7 @@ object GroupBasedRowLevelOperation {
type ReturnType = (ReplaceData, Expression, Option[Expression], LogicalPlan)

def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
- case rd @ ReplaceData(DataSourceV2Relation(table, _, _, _, _),
- cond, query, _, _, groupFilterCond, _) =>
+ case rd @ ReplaceData(DataSourceV2Table(table), cond, query, _, _, groupFilterCond, _) =>
// group-based UPDATEs that are rewritten as UNION read the table twice
val allowMultipleReads = rd.operation.command == UPDATE
val readRelation = findReadRelation(table, query, allowMultipleReads)
@@ -39,7 +39,7 @@ import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowL
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, MERGE, UPDATE}
import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
import org.apache.spark.sql.errors.QueryExecutionErrors
- import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, AtomicType, BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
import org.apache.spark.util.ArrayImplicits._
@@ -263,7 +263,7 @@ case class ReplaceData(

lazy val operation: RowLevelOperation = {
EliminateSubqueryAliases(table) match {
- case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+ case DataSourceV2Table(RowLevelOperationTable(_, operation)) =>
operation
case _ =>
throw new AnalysisException(
@@ -345,7 +345,7 @@ case class WriteDelta(

lazy val operation: SupportsDelta = {
EliminateSubqueryAliases(table) match {
- case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+ case DataSourceV2Table(RowLevelOperationTable(_, operation)) =>
operation.asInstanceOf[SupportsDelta]
case _ =>
throw new AnalysisException(
@@ -834,7 +834,7 @@ case class UpdateTable(

lazy val rewritable: Boolean = {
EliminateSubqueryAliases(table) match {
- case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => true
+ case DataSourceV2Table(_: SupportsRowLevelOperations) => true
case _ => false
}
}
@@ -878,7 +878,7 @@ case class MergeIntoTable(

lazy val rewritable: Boolean = {
EliminateSubqueryAliases(targetTable) match {
- case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => true
+ case DataSourceV2Table(_: SupportsRowLevelOperations) => true
case _ => false
}
}
@@ -231,6 +231,10 @@ case class StreamingDataSourceV2ScanRelation(
override protected def stringArgs: Iterator[Any] = stringArgsVal.iterator
}

+ object DataSourceV2Table {
+   def unapply(relation: DataSourceV2Relation): Option[Table] = Some(relation.table)
+ }

object DataSourceV2Relation {
def create(
table: Table,
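For reference, the extractor added above is what lets the call sites in this diff drop the four ignored fields of `DataSourceV2Relation`. Below is a minimal before/after sketch, assuming the Spark classes shown in this diff are on the classpath; `RowLevelOpsCheck` and its two methods are hypothetical names used only for illustration, mirroring the `rewritable` checks changed above:

```scala
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}

object RowLevelOpsCheck {
  // Old form: the match must enumerate all five fields of DataSourceV2Relation
  // even though only the table is inspected.
  def rewritableOld(aliasedTable: LogicalPlan): Boolean =
    EliminateSubqueryAliases(aliasedTable) match {
      case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => true
      case _ => false
    }

  // New form: DataSourceV2Table surfaces only the table, so the type test
  // stands on its own and stays stable if DataSourceV2Relation gains fields.
  def rewritableNew(aliasedTable: LogicalPlan): Boolean =
    EliminateSubqueryAliases(aliasedTable) match {
      case DataSourceV2Table(_: SupportsRowLevelOperations) => true
      case _ => false
    }
}
```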
@@ -60,7 +60,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelationWithTable
- import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, FileTable}
+ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, DataSourceV2Table, FileTable}
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.stat.StatFunctions
import org.apache.spark.sql.internal.SQLConf
@@ -1733,8 +1733,7 @@ class Dataset[T] private[sql](
fr.inputFiles
case r: HiveTableRelation =>
r.tableMeta.storage.locationUri.map(_.toString).toArray
- case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _),
- _, _, _, _) =>
+ case DataSourceV2ScanRelation(DataSourceV2Table(table: FileTable), _, _, _, _) =>
table.fileIndex.inputFiles
}.flatten
files.toSet.toArray
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
- import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
+ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table, FileTable}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -431,7 +431,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
case _ => false
}

- case DataSourceV2Relation(fileTable: FileTable, _, _, _, _) =>
+ case DataSourceV2Table(fileTable: FileTable) =>
refreshFileIndexIfNecessary(fileTable.fileIndex, fs, qualifiedPath)

case _ => false
@@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.classic.SparkSession
- import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
+ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Table, FileTable}

/**
* Replace the File source V2 table in [[InsertIntoStatement]] to V1 [[FileFormat]].
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoStatement(
- d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _, _) =>
+ d @ DataSourceV2Table(table: FileTable), _, _, _, _, _, _) =>
val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance()
val relation = HadoopFsRelation(
table.fileIndex,
@@ -264,7 +264,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
invalidateCache) :: Nil
}

- case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
+ case AppendData(r @ DataSourceV2Table(v1: SupportsWrite), _, _,
_, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
write match {
case v1Write: V1Write =>
@@ -278,7 +278,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case AppendData(r: DataSourceV2Relation, query, _, _, Some(write), _) =>
AppendDataExec(planLater(query), refreshCache(r), write) :: Nil

- case OverwriteByExpression(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
+ case OverwriteByExpression(r @ DataSourceV2Table(v1: SupportsWrite), _, _,
_, _, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
write match {
case v1Write: V1Write =>
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimeFor
import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
- import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
+ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Table, FileTable}
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -1498,7 +1498,7 @@ class ParquetV2PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {
(1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
queryExecution.analyzed.collectFirst {
- case DataSourceV2Relation(fileTable: FileTable, _, _, _, _) =>
+ case DataSourceV2Table(fileTable: FileTable) =>
assert(fileTable.fileIndex.partitionSpec() === PartitionSpec.emptySpec)
}.getOrElse {
fail(s"Expecting a matching DataSourceV2Relation, but got:\n$queryExecution")
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
- import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
+ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Table, FileScan, FileTable}
import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
import org.apache.spark.sql.execution.streaming.runtime._
import org.apache.spark.sql.execution.streaming.sinks.{FileStreamSink, FileStreamSinkLog, SinkFileStatus}
@@ -776,7 +776,7 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
// Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
// been inferred
val table = df.queryExecution.analyzed.collect {
- case DataSourceV2Relation(table: FileTable, _, _, _, _) => table
+ case DataSourceV2Table(table: FileTable) => table
}
assert(table.size === 1)
assert(table.head.fileIndex.isInstanceOf[MetadataLogFileIndex])
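As a side note, the extractor mechanics can be shown without any Spark dependency. The sketch below uses hypothetical stand-in names (`Relation`, `RelationTable`, `ExtractorDemo`), not Spark classes: `unapply` always succeeds and returns only the table, so the narrowing happens in the caller's typed pattern, exactly as in the matches rewritten above.

```scala
// Dependency-free sketch with stand-in names; not Spark code.
trait Table
trait FileTable extends Table
trait TruncatableTable extends Table

// Stand-in for DataSourceV2Relation: several fields, though matches usually only need the table.
case class Relation(table: Table, columns: Seq[String], options: Map[String, String])

// Stand-in for the DataSourceV2Table extractor added in this PR:
// unapply never fails, it simply exposes the table for further matching.
object RelationTable {
  def unapply(r: Relation): Option[Table] = Some(r.table)
}

object ExtractorDemo {
  def describe(r: Relation): String = r match {
    case RelationTable(_: FileTable)        => "file-backed table"
    case RelationTable(_: TruncatableTable) => "truncatable table"
    case RelationTable(_)                   => "other table"
  }

  def main(args: Array[String]): Unit = {
    println(describe(Relation(new FileTable {}, Seq("id"), Map.empty)))        // file-backed table
    println(describe(Relation(new TruncatableTable {}, Seq("id"), Map.empty))) // truncatable table
  }
}
```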