35 changes: 28 additions & 7 deletions README.md
@@ -472,7 +472,20 @@ fully qualified class name to the following option:
## Reading ASCII text file
Cobrix is primarily designed to read binary files, but you can use some of its internal functions directly to read ASCII text files. In ASCII text files, records are separated by newlines.

Working example:
Working example 1:
```scala
// A new experimental way
val df = spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("is_text", "true")
.option("encoding", "ascii")
.option("schema_retention_policy", "collapse_root")
.load(tmpFileName)
```

Working example 2:
```scala
val spark = SparkSession
.builder()
@@ -490,16 +503,23 @@ Working example:
| 10 PERSON PIC X(3).
""".stripMargin

val parsedCopybook = CopybookParser.parseTree(ASCII(), copybook, dropGroupFillers = false, segmentRedefines = Seq(), stringTrimmingPolicy = StringTrimmingPolicy.TrimNone, ebcdicCodePage = CodePage.getCodePageByName("common"), nonTerminals = Seq())
val cobolSchema = new CobolSchema(parsedCopybook, SchemaRetentionPolicy.CollapseRoot, false)
val parsedCopybook = CopybookParser.parse(copybook, dataEnncoding = ASCII, stringTrimmingPolicy = StringTrimmingPolicy.TrimNone)
val cobolSchema = new CobolSchema(parsedCopybook, SchemaRetentionPolicy.CollapseRoot, "", false)
val sparkSchema = cobolSchema.getSparkSchema


val rddText = spark.sparkContext.textFile("src/main/resources/mini.txt")

val rddRow = rddText.map(str => {
RowExtractors.extractRecord(parsedCopybook.ast, str.getBytes(), 0, SchemaRetentionPolicy.CollapseRoot)
})
val recordHandler = new RowHandler()

val rddRow = rddText
.filter(str => str.length > 0)
.map(str => {
val record = RecordExtractors.extractRecord[GenericRow](parsedCopybook.ast,
str.getBytes(),
0,
SchemaRetentionPolicy.CollapseRoot, handler = recordHandler)
Row.fromSeq(record)
})

val dfOut = spark.createDataFrame(rddRow, sparkSchema)

@@ -1047,6 +1067,7 @@ Again, the full example is available at
| .option("record_length_field", "RECORD-LEN") | Specifies a record length field to use instead of RDW. Use `rdw_adjustment` option if the record length field differs from the actual length by a fixed amount of bytes. This option is incompatible with `is_record_sequence`. |
| .option("record_header_parser", "com.example.record.header.parser") | Specifies a class for parsing custom record headers. The class must inherit `RecordHeaderParser` and `Serializable` traits. |
| .option("rhp_additional_info", "") | Passes a string as an additional info parameter passed to a custom record header parser (RHP). A custom RHP can get that additional info by overriding `onReceiveAdditionalInfo()` |
| .option("is_text", "true") | `Experimental` If 'true' the file will be considered a text file where records are separated by an end-of-line character. Currently, only ASCII files having UTF-8 charset can be processed this way. |


##### Multisegment files options
@@ -53,6 +53,55 @@ object CopybookParser {

case class RecordBoundary(name: String, begin: Int, end: Int)

/**
* Parses the contents of a COBOL copybook and returns the AST.
*
* @param dataEnncoding Encoding of the data file (either ASCII/EBCDIC). The encoding of the copybook is expected to be ASCII.
* @param copyBookContents A string containing all lines of a copybook
* @param dropGroupFillers Drop groups marked as fillers from the output AST
* @param segmentRedefines A list of redefined fields that correspond to various segments. This needs to be specified for automatically
* resolving segment redefines.
* @param fieldParentMap A segment fields parent mapping
* @param stringTrimmingPolicy Specifies if and how strings should be trimmed when parsed
* @param commentPolicy Specifies a policy for comments truncation inside a copybook
* @param ebcdicCodePage A code page for EBCDIC encoded data
* @param asciiCharset A charset for ASCII encoded data
* @param isUtf16BigEndian If true UTF-16 strings are considered big-endian.
* @param floatingPointFormat A format of floating-point numbers (IBM/IEEE754)
* @param nonTerminals A list of non-terminals that should be extracted as strings
* @param isDebug If true, additional debug fields will be added alongside all non-redefined primitives
* @return The parsed copybook as a `Copybook` object containing the AST
*/
def parse(copyBookContents: String,
dataEnncoding: Encoding = EBCDIC,
dropGroupFillers: Boolean = false,
segmentRedefines: Seq[String] = Nil,
fieldParentMap: Map[String, String] = HashMap[String, String](),
stringTrimmingPolicy: StringTrimmingPolicy = StringTrimmingPolicy.TrimBoth,
commentPolicy: CommentPolicy = CommentPolicy(),
ebcdicCodePage: CodePage = new CodePageCommon,
asciiCharset: Charset = StandardCharsets.US_ASCII,
isUtf16BigEndian: Boolean = true,
floatingPointFormat: FloatingPointFormat = FloatingPointFormat.IBM,
nonTerminals: Seq[String] = Nil,
occursHandlers: Map[String, Map[String, Int]] = Map(),
isDebug: Boolean = false): Copybook = {
parseTree(dataEnncoding,
copyBookContents,
dropGroupFillers,
segmentRedefines,
fieldParentMap,
stringTrimmingPolicy,
commentPolicy,
ebcdicCodePage,
asciiCharset,
isUtf16BigEndian,
floatingPointFormat,
nonTerminals,
occursHandlers,
isDebug)
}
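
A minimal usage sketch of this new overload follows; the copybook text is made up and the import paths are assumed from the surrounding package layout:

```scala
// Minimal sketch: parse an ASCII copybook with string trimming disabled.
// Import paths are assumed; adjust them to the actual package layout.
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.parser.encoding.ASCII
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy

val copybookText =
  """        01  RECORD.
    |        05  ID       PIC 9(4).
    |        05  COMPANY  PIC X(20).
    |""".stripMargin

val parsedCopybook = CopybookParser.parse(
  copybookText,
  dataEnncoding = ASCII,
  stringTrimmingPolicy = StringTrimmingPolicy.TrimNone)
```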

/**
* Parses the contents of a COBOL copybook and returns the AST.
*
@@ -28,6 +28,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param multiCopybookPath Sequence containing the paths to the copybooks.
* @param copybookContent String containing the actual content of the copybook. Either this, the copybookPath, or multiCopybookPath parameter must be specified.
* @param sourcePath String containing the path to the Cobol file to be parsed.
* @param isText If true the input data consists of text files where records are separated by a line ending character
* @param isEbcdic If true the input data file encoding is EBCDIC, otherwise it is ASCII
* @param ebcdicCodePage Specifies what code page to use for EBCDIC to ASCII/Unicode conversions
* @param ebcdicCodePageClass An optional custom code page conversion class provided by a user
@@ -51,6 +52,7 @@ case class CobolParameters(
multiCopybookPath: Seq[String],
copybookContent: Option[String],
sourcePath: Option[String],
isText: Boolean,
isEbcdic: Boolean,
ebcdicCodePage: String,
ebcdicCodePageClass: Option[String],
@@ -74,6 +74,7 @@ object CobolParametersParser {
// Parameters for multisegment variable length files
val PARAM_IS_XCOM = "is_xcom"
val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence"
val PARAM_IS_TEXT = "is_text"
val PARAM_IS_RDW_BIG_ENDIAN = "is_rdw_big_endian"
val PARAM_IS_RDW_PART_REC_LENGTH = "is_rdw_part_of_record_length"
val PARAM_RDW_ADJUSTMENT = "rdw_adjustment"
@@ -193,6 +194,7 @@ object CobolParametersParser {
params.getOrElse(PARAM_MULTI_COPYBOOK_PATH, "").split(','),
getParameter(PARAM_COPYBOOK_CONTENTS, params),
getParameter(PARAM_SOURCE_PATH, params),
params.getOrElse(PARAM_IS_TEXT, "false").toBoolean,
isEbcdic,
ebcdicCodePageName,
ebcdicCodePageClass,
@@ -451,6 +453,8 @@ object CobolParametersParser {
params.contains(PARAM_FILE_END_OFFSET) ||
params.contains(PARAM_RECORD_LENGTH)

val isText = params.getOrElse(PARAM_IS_TEXT, "false").toBoolean

val isPedantic = params.getOrElse(PARAM_PEDANTIC, "false").toBoolean
val keysPassed = params.getMap.keys.toSeq
val unusedKeys = keysPassed.flatMap(key => {
@@ -473,6 +477,21 @@ object CobolParametersParser {
s" or one of these options is set: '$PARAM_RECORD_LENGTH', '$PARAM_FILE_START_OFFSET', '$PARAM_FILE_END_OFFSET'"
throw new IllegalArgumentException(s"Option '$PARAM_INPUT_FILE_COLUMN' is supported only when $recordSequenceCondition")
}
if (isText) {
val incorrectParameters = new ListBuffer[String]
if (params.contains(PARAM_IS_RECORD_SEQUENCE)) {
incorrectParameters += PARAM_IS_RECORD_SEQUENCE
}
if (params.contains(PARAM_IS_XCOM)) {
incorrectParameters += PARAM_IS_XCOM
}
if (params.contains(PARAM_RECORD_LENGTH)) {
incorrectParameters += PARAM_RECORD_LENGTH
}
if (incorrectParameters.nonEmpty) {
throw new IllegalArgumentException(s"Option '$PARAM_IS_TEXT' and ${incorrectParameters.mkString(", ")} cannot be used together.")
}
}
if (unusedKeys.nonEmpty) {
val unusedKeyStr = unusedKeys.mkString(",")
val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
@@ -25,7 +25,6 @@ import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmi
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.{FixedLenNestedReader => ReaderFixedLenNestedReader}
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.spark.cobol.reader.RowHandler
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema


@@ -0,0 +1,84 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.spark.cobol.reader

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types.StructType
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.{FixedLenNestedReader => ReaderFixedLenNestedReader}
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema


/**
* The Cobol data reader from text files that produces nested structure schema
*
* @param copyBookContents A copybook contents.
* @param startOffset Specifies the number of bytes at the beginning of each record that can be ignored.
* @param endOffset Specifies the number of bytes at the end of each record that can be ignored.
* @param schemaRetentionPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
*/
final class FixedLenTextReader(copyBookContents: Seq[String],
isEbcdic: Boolean = true,
ebcdicCodePage: CodePage,
floatingPointFormat: FloatingPointFormat,
startOffset: Int = 0,
endOffset: Int = 0,
schemaRetentionPolicy: SchemaRetentionPolicy,
stringTrimmingPolicy: StringTrimmingPolicy,
dropGroupFillers: Boolean,
nonTerminals: Seq[String],
occursMappings: Map[String, Map[String, Int]],
readerProperties: ReaderParameters
)
extends ReaderFixedLenNestedReader[GenericRow](
copyBookContents, isEbcdic, ebcdicCodePage, floatingPointFormat,
startOffset, endOffset, schemaRetentionPolicy, stringTrimmingPolicy,
dropGroupFillers, nonTerminals, occursMappings, readerProperties,
new RowHandler()
) with FixedLenReader with Serializable {

class RowIterator(private val iterator: Iterator[Seq[Any]]) extends Iterator[Row] {
override def hasNext: Boolean = iterator.hasNext

@throws(classOf[IllegalStateException])
override def next(): Row = Row.fromSeq(iterator.next())
}

override def getCobolSchema: CobolSchema = CobolSchema.fromBaseReader(cobolSchema)

override def getSparkSchema: StructType = getCobolSchema.getSparkSchema

@throws(classOf[Exception])
override def getRowIterator(binaryData: Array[Byte]): Iterator[Row] = {
checkBinaryDataValidity(binaryData)
new RowIterator(getRecordIterator(binaryData))
}

protected override def checkBinaryDataValidity(binaryData: Array[Byte]): Unit = {
if (startOffset < 0) {
throw new IllegalArgumentException(s"Invalid record start offset = $startOffset. A record start offset cannot be negative.")
}
if (endOffset < 0) {
throw new IllegalArgumentException(s"Invalid record end offset = $endOffset. A record end offset cannot be negative.")
}
}
}
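
For illustration, a rough usage sketch of this reader's public API (assuming `textReader` is a `FixedLenTextReader` instance built by `DefaultSource.createTextReader`, shown further below):

```scala
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.Row

// Rough sketch: each newline-separated text record is passed to the reader as raw
// bytes and converted into Spark Rows according to the copybook schema.
val record = "1234Company1            "
val rows: Iterator[Row] = textReader.getRowIterator(record.getBytes(StandardCharsets.US_ASCII))
rows.foreach(println)
```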
@@ -25,13 +25,14 @@ import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.slf4j.LoggerFactory
import za.co.absa.cobrix.spark.cobol.reader.{FixedLenReader, Reader, VarLenReader}
import za.co.absa.cobrix.spark.cobol.reader.{FixedLenReader, FixedLenTextReader, Reader, VarLenReader}
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
import za.co.absa.cobrix.spark.cobol.source.index.IndexBuilder
import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters
import za.co.absa.cobrix.spark.cobol.source.scanners.CobolScanners
import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder
import za.co.absa.cobrix.spark.cobol.utils.FileUtils

import scala.util.control.NonFatal


@@ -84,6 +85,8 @@ class CobolRelation(sourceDir: String,
override def buildScan(): RDD[Row] = {

cobolReader match {
case blockReader: FixedLenTextReader =>
CobolScanners.buildScanForTextFiles(blockReader, sourceDir, parseRecords, sqlContext)
case blockReader: FixedLenReader =>
CobolScanners.buildScanForFixedLength(blockReader, sourceDir, parseRecords, debugIgnoreFileSize, sqlContext)
case streamReader: VarLenReader if streamReader.isIndexGenerationNeeded =>
@@ -22,7 +22,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.spark.cobol.reader.{FixedLenNestedReader, FixedLenReader, Reader, ReaderFactory, VarLenNestedReader, VarLenReader}
import za.co.absa.cobrix.spark.cobol.reader.{FixedLenNestedReader, FixedLenReader, FixedLenTextReader, Reader, ReaderFactory, VarLenNestedReader, VarLenReader}
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.spark.cobol.source.copybook.CopybookContentLoader
import za.co.absa.cobrix.spark.cobol.parameters.CobolParametersParser._
@@ -71,14 +71,36 @@ class DefaultSource
* This method will probably be removed once the correct hierarchy for [[FixedLenReader]] is put in place.
*/
private def buildEitherReader(spark: SparkSession, cobolParameters: CobolParameters): Reader = {
if (cobolParameters.variableLengthParams.isEmpty) {
if (cobolParameters.isText) {
createTextReader(cobolParameters, spark)
} else if (cobolParameters.variableLengthParams.isEmpty) {
createFixedLengthReader(cobolParameters, spark)
}
else {
createVariableLengthReader(cobolParameters, spark)
}
}

/**
* Creates a Reader that knows how to consume text Cobol records.
*/
private def createTextReader(parameters: CobolParameters, spark: SparkSession): FixedLenReader = {
val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
new FixedLenTextReader(copybookContent,
parameters.isEbcdic,
getCodePage(parameters.ebcdicCodePage, parameters.ebcdicCodePageClass),
parameters.floatingPointFormat,
parameters.recordStartOffset,
parameters.recordEndOffset,
parameters.schemaRetentionPolicy,
parameters.stringTrimmingPolicy,
parameters.dropGroupFillers,
parameters.nonTerminals,
parameters.occursMappings,
getReaderProperties(parameters, spark)
)
}

/**
* Creates a Reader that knows how to consume fixed-length Cobol records.
*/