diff --git a/README.md b/README.md index 49acd3555..7c13b9c51 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,14 @@ In some scenarios Spark is unable to find "cobol" data source by it's short name Cobrix assumes input data is encoded in EBCDIC. You can load ASCII files as well by specifying the following option: `.option("encoding", "ascii")`. +If the input file is a text file (CRLF / LF are used to split records), use +`.option("is_text", "true")`. + +Multisegment ASCII text files are supported using this option: +`.option("record_format", "D")`. + +Read more on record formats at https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets + ### Streaming Cobol binary files from a directory 1. Create a Spark ```StreamContext``` @@ -267,7 +275,7 @@ You can collect the uber jar of `spark-cobol` either at Then, run `spark-shell` or `spark-submit` adding the fat jar as the option. ```sh -$ spark-shell --jars spark-cobol-assembly-2.3.1-SNAPSHOT.jar +$ spark-shell --jars spark-cobol-assembly-2.4.0-SNAPSHOT.jar ``` ## Other Features @@ -317,10 +325,17 @@ file is copied from a mainframe. To load variable length record file the following option should be specified: ``` -.option("is_record_sequence", "true") +.option("record_format", "V") +``` + +To load a variable block length record file, the following option should be specified: +``` +.option("record_format", "VB") ``` -The space used by the headers should not be mentioned in the copybook if this option is used. Please refer to the +More on record formats: https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets + +The space used by the headers (both BDW and RDW) should not be mentioned in the copybook if this option is used. Please refer to the 'Record headers support' section below. ### Schema collapsing @@ -458,11 +473,11 @@ about record length. You can instruct the reader to use 4 byte record headers to extract records from a mainframe file. ``` -.option("is_record_sequence", "true") +.option("record_format", "V") ``` This is very helpful for multisegment files when segments have different lengths. Since each segment has it's own -copybook it is very convenient to extract segments one by one by combining 'is_record_sequence' option with segment +copybook it is very convenient to extract segments one by one by combining the `record_format = V` option with the segment filter option. ``` @@ -549,6 +564,17 @@ Working example 1: ```` Working example 2: +```scala + // A new experimental way + val df = spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "D") + .load(tmpFileName) +```` + +Working example 3: ```scala val spark = SparkSession .builder() @@ -634,7 +660,7 @@ val df = spark .read .format("cobol") .option("copybook_contents", copybook) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("segment_id_level0", "C") .option("segment_id_level1", "P") @@ -732,7 +758,7 @@ in redefined groups. Here is the copybook for our example: The 'SEGMENT-ID' and 'COMPANY-ID' fields are present in all of the segments. The 'STATIC-DETAILS' group is present only in the root record. The 'CONTACTS' group is present only in child record. Notice that 'CONTACTS' redefine 'STATIC-DETAILS'. -Because the records have different lengths the 'is_record_sequence' option should be set to 'true'.
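+For example, a VB (variable block) file could be loaded as in the following sketch (illustrative only; the copybook path and data path are placeholders, and BDW/RDW endianness and adjustments are left at their defaults):
+```scala
+// Hypothetical paths; adjust the options below to match the actual file
+val df = spark
+  .read
+  .format("cobol")
+  .option("copybook", "/path/to/thecopybook")
+  .option("record_format", "VB")
+  .load("examples/multisegment_data")
+```
+If the BDW or RDW values in the file include the header bytes themselves, the `bdw_adjustment` and `rdw_adjustment` options described below can be used to compensate.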
+Because the records have different lengths, use `record_format = V` or `record_format = VB`, depending on the record format. If you load this file as is you will get the schema and the data similar to this. @@ -742,7 +768,7 @@ val df = spark .read .format("cobol") .option("copybook", "/path/to/thecopybook") - .option("is_record_sequence", "true") + .option("record_format", "V") .load("examples/multisegment_data") ``` @@ -807,7 +833,7 @@ val df = spark .read .format("cobol") .option("copybook", "/path/to/thecopybook") - .option("is_record_sequence", "true") + .option("record_format", "V") // Specifies a field containing a segment id .option("segment_field", "SEGMENT_ID") @@ -960,7 +986,7 @@ val df = spark .read .format("cobol") .option("copybook_contents", copybook) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("segment_id_level0", "C") .option("segment_id_level1", "P") @@ -1130,14 +1156,16 @@ Again, the full example is available at | Option (usage example) | Description | | --------------------------------------------- |:----------------------------------------------------------------------------- | -| .option("is_record_sequence", "true") | If 'true' the parser will look for 4 byte RDW headers to read variable record length files. | +| .option("record_format", "F") | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). | +| .option("is_record_sequence", "true") | _[deprecated]_ If 'true' the parser will look for 4 byte RDW headers to read variable record length files. Use `.option("record_format", "V")` instead. | | .option("is_rdw_big_endian", "true") | Specifies if RDW headers are big endian. They are considered little-endian by default. | -| .option("is_rdw_part_of_record_length", false)| Specifies if RDW headers count themselves as part of record length. By default RDW headers count only payload record in record length, not RDW headers themselves. This is equivalent to `.option("rdw_adjustment", -4)`. | +| .option("is_rdw_part_of_record_length", false)| Specifies if RDW headers count themselves as part of record length. By default RDW headers count only the payload record in the record length, not the RDW headers themselves. This is equivalent to `.option("rdw_adjustment", -4)`. For BDW headers use `.option("bdw_adjustment", -4)`. | | .option("rdw_adjustment", 0) | If there is a mismatch between RDW and record length this option can be used to adjust the difference. | -| .option("record_length_field", "RECORD-LEN") | Specifies a record length field to use instead of RDW. Use `rdw_adjustment` option if the record length field differs from the actual length by a fixed amount of bytes. This option is incompatible with `is_record_sequence`. | +| .option("bdw_adjustment", 0) | If there is a mismatch between BDW and record length this option can be used to adjust the difference. | +| .option("record_length_field", "RECORD-LEN") | Specifies a record length field to use instead of RDW. Use the `rdw_adjustment` option if the record length field differs from the actual length by a fixed amount of bytes. The `record_format` should be set to `F`. This option is incompatible with `is_record_sequence`. | | .option("record_extractor", "com.example.record.extractor") | Specifies a class for parsing records in a custom way.
The class must inherit `RawRecordExtractor` and `Serializable` traits. See the chapter on record extractors above. | -| .option("re_additional_info", "") | Passes a string as an additional info parameter passed to a custom record extractor to its constructor. | -| .option("is_text", "true") | If 'true' the file will be considered a text file where records are separated by an end-of-line character. Currently, only ASCII files having UTF-8 charset can be processed this way. If combined with `is_record_sequence`, multisegment and hierarchical text record files can be loaded. | +| .option("re_additional_info", "") | Passes a string as an additional info parameter to the constructor of a custom record extractor. | +| .option("is_text", "true") | If 'true' the file will be considered a text file where records are separated by an end-of-line character. Currently, only ASCII files having UTF-8 charset can be processed this way. If combined with `record_format = D`, multisegment and hierarchical text record files can be loaded. | ##### Multisegment files options @@ -1213,7 +1241,7 @@ For multisegment variable lengths tests: .format("cobol") .option("copybook_contents", copybook) .option("generate_record_id", true) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("segment_id_level0", "C") .load(args(0)) @@ -1259,6 +1287,13 @@ For multisegment variable lengths tests: ![](performance/images/exp3_multiseg_wide_records_throughput.svg) ![](performance/images/exp3_multiseg_wide_mb_throughput.svg) ## Changelog +- #### 2.4.0 to be released soon. + - [#412](https://github.com/AbsaOSS/cobrix/issues/412) Add support for the [variable block (VB aka VBVR)](https://www.ibm.com/docs/en/zos/2.3.0?topic=formats-format-v-records) record format. Options to adjust BDW settings are added: + - `is_bdw_big_endian` - specifies if BDW is big-endian (false by default) + - `bdw_adjustment` - specifies how the value of a BDW differs from the block payload length. For example, if the size in BDW headers includes the BDW header itself, use `.option("bdw_adjustment", "-4")`. + - Options `is_record_sequence` and `is_xcom` are deprecated. Use `.option("record_format", "V")` instead. + - [#417](https://github.com/AbsaOSS/cobrix/issues/417) Multisegment ASCII text files now have direct support using `record_format = D`. - #### 2.3.0 released 2 August 2021. - [#405](https://github.com/AbsaOSS/cobrix/issues/405) Fix extracting records that contain redefines of the top level GROUPs. - [#406](https://github.com/AbsaOSS/cobrix/issues/406) Use 'collapse_root' retention policy by default.
This is the breaking, diff --git a/cobol-converters/pom.xml b/cobol-converters/pom.xml index 32d6ec0bc..1988d8250 100644 --- a/cobol-converters/pom.xml +++ b/cobol-converters/pom.xml @@ -22,7 +22,7 @@ za.co.absa.cobrix cobrix_2.12 - 2.3.1-SNAPSHOT + 2.4.0-SNAPSHOT ../pom.xml diff --git a/cobol-parser/pom.xml b/cobol-parser/pom.xml index 789915ccd..7e4471115 100644 --- a/cobol-parser/pom.xml +++ b/cobol-parser/pom.xml @@ -22,7 +22,7 @@ za.co.absa.cobrix cobrix_2.12 - 2.3.1-SNAPSHOT + 2.4.0-SNAPSHOT ../pom.xml diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala new file mode 100644 index 000000000..75b588f5e --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.parser.recordformats + +sealed trait RecordFormat + +object RecordFormat { + case object FixedLength extends RecordFormat + case object VariableLength extends RecordFormat + case object VariableBlock extends RecordFormat + case object AsciiText extends RecordFormat + + def withNameOpt(s: String): Option[RecordFormat] = { + s match { + case "F" => Some(FixedLength) + case "V" => Some(VariableLength) + case "VB" => Some(VariableBlock) + case "D" => Some(AsciiText) + case "T" => Some(AsciiText) // Same as 'D' - Cobrix extension + case _ => None + } + } +} diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala index 4916bac23..0a4fb572d 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala @@ -23,12 +23,13 @@ import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC} import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory} import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser} -import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor} +import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor} import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler import za.co.absa.cobrix.cobol.reader.index.IndexGenerator import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry import za.co.absa.cobrix.cobol.reader.iterator.{VarLenHierarchicalIterator, VarLenNestedIterator} import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters +import 
za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParameters} import za.co.absa.cobrix.cobol.reader.schema.CobolSchema import za.co.absa.cobrix.cobol.reader.stream.SimpleStream import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator @@ -61,13 +62,21 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], binaryData: SimpleStream, copybook: Copybook ): Option[RawRecordExtractor] = { - val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, readerProperties.reAdditionalInfo) + val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment) + val bdwParams = RecordHeaderParameters(readerProperties.isBdwBigEndian, readerProperties.bdwAdjustment) + + val rdwDecoder = new RecordHeaderDecoderRdw(rdwParams) + val bdwDecoder = new RecordHeaderDecoderBdw(bdwParams) + + val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, rdwDecoder, bdwDecoder, readerProperties.reAdditionalInfo) readerProperties.recordExtractor match { case Some(recordExtractorClass) => Some(RawRecordExtractorFactory.createRecordHeaderParser(recordExtractorClass, reParams)) case None if readerProperties.isText => Some(new TextRecordExtractor(reParams)) + case None if readerProperties.hasBdw => + Some(new VariableBlockVariableRecordExtractor(reParams)) case None if readerProperties.variableSizeOccurs && readerProperties.recordHeaderParser.isEmpty && !readerProperties.isRecordSequence && diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala index 585573dd0..d1e437476 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala @@ -17,6 +17,7 @@ package za.co.absa.cobrix.cobol.reader.extractors.raw import za.co.absa.cobrix.cobol.parser.Copybook +import za.co.absa.cobrix.cobol.reader.recordheader.RecordHeaderDecoder import za.co.absa.cobrix.cobol.reader.stream.SimpleStream /** @@ -29,5 +30,7 @@ case class RawRecordContext( startingRecordNumber: Long, inputStream: SimpleStream, copybook: Copybook, + rdwDecoder: RecordHeaderDecoder, + bdwDecoder: RecordHeaderDecoder, additionalInfo: String ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractor.scala index 9ac3f37a4..7172ffc59 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractor.scala @@ -18,45 +18,41 @@ package za.co.absa.cobrix.cobol.reader.extractors.raw import scala.collection.mutable -class VariableBlockRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { - VariableBlockRecordExtractor.additionalInfo = ctx.additionalInfo - private var recordQueue = new mutable.Queue[Array[Byte]] - private var initialRead = true - private var recordNumber = ctx.startingRecordNumber +class VariableBlockVariableRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { + private val recordQueue = 
new mutable.Queue[Array[Byte]] + override def offset: Long = ctx.inputStream.offset override def hasNext: Boolean = { - var output: Boolean = true - if (initialRead == true) { + if (recordQueue.isEmpty) { readNextBlock() - initialRead = false - } else { - if (recordQueue.isEmpty && !ctx.inputStream.isEndOfStream) { - readNextBlock() - } else { - if (recordQueue.isEmpty && ctx.inputStream.isEndOfStream) { - output = false - } - } } - output + recordQueue.nonEmpty } private def readNextBlock(): Unit = { + val bdwSize = ctx.bdwDecoder.headerSize + val rdwSize = ctx.rdwDecoder.headerSize if (!ctx.inputStream.isEndOfStream) { - val bdwFirstPart = BigInt(ctx.inputStream.next(2)).toInt - val bdwSecondPart = BigInt(ctx.inputStream.next(2)).toInt - var blockBuffer = ctx.inputStream.next(bdwFirstPart-4) - var blockPointer: Int = 0 - var recordCounter: Int =0 - while (blockPointer < blockBuffer.length) { - recordCounter += 1 - var rdwFirstPart = BigInt(blockBuffer.slice(blockPointer, blockPointer+2)).toInt - var rdwSecondPart = BigInt(blockBuffer.slice(blockPointer+2, blockPointer+4)).toInt - val payload = blockBuffer.slice(blockPointer+4, blockPointer + rdwFirstPart) - recordQueue.enqueue(payload) - blockPointer += rdwFirstPart + val bdwOffset = ctx.inputStream.offset + val bdw = ctx.inputStream.next(bdwSize) + + val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset) + val blockBuffer = ctx.inputStream.next(blockLength) + + var blockIndex = 0 + + while (blockIndex < blockBuffer.length) { + val rdwOffset = bdwOffset + blockIndex + val rdw = blockBuffer.slice(blockIndex, blockIndex + rdwSize) + val recordLength = ctx.rdwDecoder.getRecordLength(rdw, rdwOffset) + + val payload = blockBuffer.slice(blockIndex + rdwSize, blockIndex + recordLength + rdwSize) + if (payload.length > 0) { + recordQueue.enqueue(payload) + } + blockIndex += recordLength + rdwSize } } } @@ -64,15 +60,9 @@ class VariableBlockRecordExtractor(ctx: RawRecordContext) extends Serializable w @throws[NoSuchElementException] override def next(): Array[Byte] = { - var rawRecord: Array[Byte] = new Array[Byte](0) if (!hasNext) { throw new NoSuchElementException } - rawRecord = recordQueue.dequeue() - rawRecord + recordQueue.dequeue() } } - -object VariableBlockRecordExtractor { - var additionalInfo: String = "" -} diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala index ecd718c79..014d389de 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala @@ -20,6 +20,7 @@ import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPoint import za.co.absa.cobrix.cobol.parser.policies.CommentPolicy import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy.DebugFieldsPolicy import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy +import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy /** @@ -28,8 +29,9 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param copybookPath String containing the path to the copybook in a given file system. * @param multiCopybookPath Sequence containing the paths to the copybooks. 
* @param copybookContent String containing the actual content of the copybook. Either this, the copybookPath, or multiCopybookPath parameter must be specified. - * @param sourcePath String containing the path to the Cobol file to be parsed. - * @param isText If true the input data consists of text files where records are separated by a line ending character + * @param sourcePaths The list of source file paths. + * @param recordFormat The record format (F, V, VB, D) + * @param isText [deprecated by recordFormat] If true the input data consists of text files where records are separated by a line ending character * @param isEbcdic If true the input data file encoding is EBCDIC, otherwise it is ASCII * @param ebcdicCodePage Specifies what code page to use for EBCDIC to ASCII/Unicode conversions * @param ebcdicCodePageClass An optional custom code page conversion class provided by a user @@ -56,6 +58,7 @@ case class CobolParameters( multiCopybookPath: Seq[String], copybookContent: Option[String], sourcePaths: Seq[String], + recordFormat: RecordFormat, isText: Boolean, isEbcdic: Boolean, ebcdicCodePage: String, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala index cb1d92bdb..6e360a686 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala @@ -75,9 +75,12 @@ case class ReaderParameters( recordLength: Option[Int] = None , lengthFieldName: Option[String] = None, isRecordSequence: Boolean = false, + hasBdw: Boolean = false, isRdwBigEndian: Boolean = false, + isBdwBigEndian: Boolean = false, isRdwPartRecLength: Boolean = false, rdwAdjustment: Int = 0, + bdwAdjustment: Int = 0, isIndexGenerationNeeded: Boolean = false, inputSplitRecords: Option[Int] = None, inputSplitSizeMB: Option[Int] = None, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala index 9c78333ff..def950152 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala @@ -20,9 +20,12 @@ package za.co.absa.cobrix.cobol.reader.parameters * This class holds the parameters currently used for parsing variable-length records. * * @param isRecordSequence Does input files have 4 byte record length headers + * @param hasBdw Does the file have BDW headers * @param isRdwBigEndian Is RDW big endian? It may depend on flavor of mainframe and/or mainframe to PC transfer method + * @param isBdwBigEndian Is BDW big endian? 
It may depend on flavor of mainframe and/or mainframe to PC transfer method * @param isRdwPartRecLength Does RDW count itself as part of record length itself * @param rdwAdjustment Controls a mismatch between RDW and record length + * @param bdwAdjustment Controls a mismatch between BDW and record length * @param recordHeaderParser An optional custom record header parser for non-standard RDWs * @param recordExtractor An optional custom raw record parser class non-standard record types * @param rhpAdditionalInfo An optional additional option string passed to a custom record header parser @@ -40,10 +43,13 @@ package za.co.absa.cobrix.cobol.reader.parameters * @param inputFileNameColumn A column name to add to the dataframe. The column will contain input file name for each record similar to 'input_file_name()' function */ case class VariableLengthParameters( - isRecordSequence: Boolean, + isRecordSequence: Boolean, // [deprecated by recordFormat] + hasBdw: Boolean, // [deprecated by recordFormat] isRdwBigEndian: Boolean, + isBdwBigEndian: Boolean, isRdwPartRecLength: Boolean, rdwAdjustment: Int, + bdwAdjustment: Int, recordHeaderParser: Option[String], recordExtractor: Option[String], rhpAdditionalInfo: Option[String], diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoder.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoder.scala new file mode 100644 index 000000000..4a516d327 --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoder.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.recordheader + +/** + * This trait represents a contract for parsing and decoding record headers (RDW or BDW) from an array of bytes. + */ +trait RecordHeaderDecoder { + /** Header size (4 bytes most of the time. */ + def headerSize: Int + + /** Get record length from the header. Should throw a runtime exception if the header is invalid. */ + def getRecordLength(header: Array[Byte], offset: Long): Int +} diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderBdw.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderBdw.scala new file mode 100644 index 000000000..5bc8a7eac --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderBdw.scala @@ -0,0 +1,90 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.recordheader + +/** + * This class represents a header decoder for standard BDW headers + * according to: https://www.ibm.com/docs/en/zos/2.3.0?topic=records-record-descriptor-word-rdw + */ +class RecordHeaderDecoderBdw(bdwParameters: RecordHeaderParameters) extends RecordHeaderDecoderCommon { + final val MAX_BDW_BLOCK_SIZE = 256*1024*1024 + final val BDW_HEADER_LENGTH = 4 + + override def headerSize: Int = BDW_HEADER_LENGTH + + def headerName = "BDW" + + override def getRecordLength(header: Array[Byte], offset: Long): Int = { + validateHeader(header, offset) + + val recordLength = if (bdwParameters.isBigEndian) { + if (isExtendedBigEndianBdw(header)) { + parseExtendedBigEndianBdw(header) + } else { + parseNormalBigEndianBdw(header, offset) + } + } else { + if (isExtendedLittleEndianBdw(header)) { + parseExtendedLittleEndianBdw(header) + } else { + parseNormalLittleEndianBdw(header, offset) + } + } + + validateBlockLength(header, offset, recordLength) + recordLength + } + + protected final def isExtendedBigEndianBdw(header: Array[Byte]): Boolean = { + (header(0) & 0x80) > 0 + } + + protected final def isExtendedLittleEndianBdw(header: Array[Byte]): Boolean = { + (header(3) & 0x80) > 0 + } + + protected final def parseNormalBigEndianBdw(header: Array[Byte], offset: Long): Int = { + if (header(2) != 0 || header(3) != 0) reportInvalidHeaderZeros(header, offset) + (header(1) & 0xFF) + 256 * (header(0) & 0x7F) + bdwParameters.adjustment + } + + protected final def parseExtendedBigEndianBdw(header: Array[Byte]): Int = { + (header(3) & 0xFF) + 256 * (header(2) & 0xFF) + 65536 * (header(1) & 0xFF) + 16777216 * (header(0) & 0x7F) + bdwParameters.adjustment + } + + protected final def parseNormalLittleEndianBdw(header: Array[Byte], offset: Long): Int = { + if (header(0) != 0 || header(1) != 0) reportInvalidHeaderZeros(header, offset) + (header(2) & 0xFF) + 256 * (header(3) & 0x7F) + bdwParameters.adjustment + } + + protected final def parseExtendedLittleEndianBdw(header: Array[Byte]): Int = { + (header(0) & 0xFF) + 256 * (header(1) & 0xFF) + 65536 * (header(2) & 0xFF) + 16777216 * (header(3) & 0x7F) + bdwParameters.adjustment + } + + protected def validateBlockLength(header: Array[Byte], offset: Long, blockLength: Int): Unit = { + if (blockLength < 0) { + reportInvalidValue(header, offset, blockLength) + } + if (blockLength < BDW_HEADER_LENGTH) { + reportMinimumLength(header, offset, blockLength, BDW_HEADER_LENGTH) + } + if (blockLength > MAX_BDW_BLOCK_SIZE) { + reportTooLargeBlockLength(header, offset, blockLength) + } + } + +} diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderCommon.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderCommon.scala new file mode 100644 index 000000000..477dd1c8f --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderCommon.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the 
"License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.recordheader + +abstract class RecordHeaderDecoderCommon extends RecordHeaderDecoder { + def headerName: String + + protected def validateHeader(header: Array[Byte], offset: Long): Unit = { + if (header.length != headerSize) reportInvalidHeaderLength(header, offset) + } + + protected def reportInvalidHeaderLength(header: Array[Byte], offset: Long): Unit = { + val rdwHeaders = renderHeader(header) + throw new IllegalStateException(s"The length of $headerName headers is unexpected. Expected: $headerSize, got ${header.length}. Header: $rdwHeaders, offset: $offset.") + } + + protected def reportInvalidHeaderZeros(header: Array[Byte], offset: Long): Unit = { + val rdwHeaders = renderHeader(header) + throw new IllegalStateException(s"$headerName headers contain non-zero values where zeros are expected (check 'rdw_big_endian' flag. Header: $rdwHeaders, offset: $offset.") + } + + protected def reportInvalidValue(header: Array[Byte], offset: Long, value: Int): Unit = { + val rdwHeaders = renderHeader(header) + throw new IllegalStateException(s"$headerName headers contain an invalid value ($value). Header: $rdwHeaders, offset: $offset.") + } + + protected def reportZeroLength(header: Array[Byte], offset: Long): Nothing = { + val rdwHeaders = renderHeader(header) + throw new IllegalStateException(s"$headerName headers should never be zero ($rdwHeaders). Found zero size record at $offset.") + } + + protected def reportMinimumLength(header: Array[Byte], offset: Long, length: Int, minLength: Int): Nothing = { + val rdwHeaders = renderHeader(header) + throw new IllegalStateException(s"$headerName headers should have a length of at least $minLength bytes. Got $length bytes. Header: $rdwHeaders, offset: $offset.") + } + + protected def reportTooLargeBlockLength(header: Array[Byte], offset: Long, blockLength: Int): Nothing = { + val rdwHeaders = renderHeader(header) + throw new IllegalStateException(s"The length of $headerName block is too big. Got $blockLength. Header: $rdwHeaders, offset: $offset.") + } + + protected def renderHeader(header: Array[Byte]): String = { + header.map(_ & 0xFF).mkString(",") + } +} diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderRdw.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderRdw.scala new file mode 100644 index 000000000..faf7dc355 --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderRdw.scala @@ -0,0 +1,54 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.recordheader + +/** + * This class represent a header decoder for standard RDW headers + * according to: https://www.ibm.com/docs/en/zos/2.3.0?topic=records-record-descriptor-word-rdw + */ +class RecordHeaderDecoderRdw(rdwParameters: RecordHeaderParameters) extends RecordHeaderDecoderCommon { + final val RDW_HEADER_LENGTH = 4 + + override def headerSize: Int = RDW_HEADER_LENGTH + + def headerName = "RDW" + + override def getRecordLength(header: Array[Byte], offset: Long): Int = { + validateHeader(header, offset) + + val recordLength = if (rdwParameters.isBigEndian) { + if (header(2) != 0 || header(3) != 0) reportInvalidHeaderZeros(header, offset) + (header(1) & 0xFF) + 256 * (header(0) & 0xFF) + rdwParameters.adjustment + } else { + if (header(0) != 0 || header(1) != 0) reportInvalidHeaderZeros(header, offset) + (header(2) & 0xFF) + 256 * (header(3) & 0xFF) + rdwParameters.adjustment + } + + validateRecordLength(header, offset, recordLength) + recordLength + } + + protected def validateRecordLength(header: Array[Byte], offset: Long, recordLength: Int): Unit = { + if (recordLength < 0) { + reportInvalidValue(header, offset, recordLength) + } + if (recordLength == 0) { + reportZeroLength(header, offset) + } + } + +} diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderParameters.scala new file mode 100644 index 000000000..25dd0db05 --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderParameters.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.recordheader + +/** Parameters for parsing record headers of mainframe files (RDW and BDW) */ +case class RecordHeaderParameters( + isBigEndian: Boolean, + + /* Sometime the size includes only payload, and sometimes it includes headers themselves. + * This allows flexible adjustments. 
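+ * For example, if an RDW value of 84 counts the 4 header bytes as well as an 80-byte payload,
+ * an adjustment of -4 makes the decoder return the 80-byte payload length.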
*/ + adjustment: Int + ) diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala index 8cc9dc783..086ec81a1 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala @@ -22,7 +22,7 @@ import za.co.absa.cobrix.cobol.parser.ast.Primitive import za.co.absa.cobrix.cobol.parser.common.Constants import za.co.absa.cobrix.cobol.parser.encoding.ASCII import za.co.absa.cobrix.cobol.parser.headerparsers.RecordHeaderParserFactory -import za.co.absa.cobrix.cobol.reader.MemoryStream.TestStringStream +import za.co.absa.cobrix.cobol.reader.memorystream.TestStringStream import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, TextRecordExtractor} import za.co.absa.cobrix.cobol.reader.index.IndexGenerator @@ -61,7 +61,7 @@ class SparseIndexSpecSpec extends WordSpec { val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwLittleEndian, 0, 0, 0, 0) - val recordExtractor = new TextRecordExtractor(RawRecordContext(0L, stream, copybook, "")) + val recordExtractor = new TextRecordExtractor(RawRecordContext(0L, stream, copybook, null, null, "")) val indexes = IndexGenerator.sparseIndexGenerator(0, stream, isRdwBigEndian = false, recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None, diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala new file mode 100644 index 000000000..5d99aa6f2 --- /dev/null +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala @@ -0,0 +1,26 @@ +package za.co.absa.cobrix.cobol.reader.extractors.raw + +import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser} +import za.co.absa.cobrix.cobol.reader.memorystream.TestStringStream +import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoder, RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParameters} +import za.co.absa.cobrix.cobol.reader.stream.SimpleStream + +object RawRecordContextFactory { + private val copybookContent = + """ 01 RECORD. + 02 X PIC X(1). 
+ """ + private val copybook = CopybookParser.parseTree(copybookContent) + + def getDummyRawRecordContext( + startingRecordNumber: Long = 0L, + inputStream: SimpleStream = new TestStringStream("A1\nB2\n"), + copybook: Copybook = copybook, + rdwDecoder: RecordHeaderDecoder = new RecordHeaderDecoderBdw(RecordHeaderParameters(isBigEndian = false, 0)), + bdwDecoder: RecordHeaderDecoder = new RecordHeaderDecoderRdw(RecordHeaderParameters(isBigEndian = false, 0)), + additionalInfo: String = "" + ): RawRecordContext = { + RawRecordContext(startingRecordNumber, inputStream, copybook, rdwDecoder, bdwDecoder, additionalInfo) + } + +} diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordExtractorFactorySpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordExtractorFactorySpec.scala index 5b0fcd8ad..1ebff4be9 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordExtractorFactorySpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordExtractorFactorySpec.scala @@ -18,7 +18,8 @@ package za.co.absa.cobrix.cobol.reader.extractors.raw import org.scalatest.WordSpec import za.co.absa.cobrix.cobol.parser.CopybookParser -import za.co.absa.cobrix.cobol.reader.MemoryStream.TestStringStream +import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordContextFactory.getDummyRawRecordContext +import za.co.absa.cobrix.cobol.reader.memorystream.TestStringStream class RawRecordExtractorFactorySpec extends WordSpec { "createRecordHeaderParser()" should { @@ -32,7 +33,7 @@ class RawRecordExtractorFactorySpec extends WordSpec { "be able to create a record extractor" in { val recordExtractor = RawRecordExtractorFactory.createRecordHeaderParser("za.co.absa.cobrix.cobol.reader.extractors.raw.TextRecordExtractor", - RawRecordContext(0L, new TestStringStream("AAA111\nBBB222\n"), copybook, "")) + getDummyRawRecordContext(inputStream = new TestStringStream("AAA111\nBBB222\n"), copybook = copybook)) assert(recordExtractor.isInstanceOf[TextRecordExtractor]) assert(recordExtractor.next().length == 6) @@ -42,7 +43,7 @@ class RawRecordExtractorFactorySpec extends WordSpec { "throw an exception if class not fount" in { val ex = intercept[ClassNotFoundException] { RawRecordExtractorFactory.createRecordHeaderParser("com.example.DoesNotExist", - RawRecordContext(0L, new TestStringStream("A"), copybook, "")) + getDummyRawRecordContext(inputStream = new TestStringStream("A"), copybook = copybook)) } assert(ex.getMessage.contains("com.example.DoesNotExist")) @@ -51,7 +52,7 @@ class RawRecordExtractorFactorySpec extends WordSpec { "throw an exception when the extractor type is wrong" in { val ex = intercept[IllegalArgumentException] { RawRecordExtractorFactory.createRecordHeaderParser("za.co.absa.cobrix.cobol.reader.extractors.raw.DummyTestClass", - RawRecordContext(0L, new TestStringStream("A"), copybook, "")) + getDummyRawRecordContext(inputStream = new TestStringStream("A"), copybook = copybook)) } assert(ex.getMessage.contains("does not extend RawRecordExtractor")) @@ -60,7 +61,7 @@ class RawRecordExtractorFactorySpec extends WordSpec { "throw an exception when constructor parameters are wrong" in { val ex = intercept[IllegalArgumentException] { RawRecordExtractorFactory.createRecordHeaderParser("za.co.absa.cobrix.cobol.reader.extractors.raw.DummyWrongRecordExtractor", - RawRecordContext(0L, new TestStringStream("A"), copybook, "")) + 
getDummyRawRecordContext(inputStream = new TestStringStream("A"), copybook = copybook)) } assert(ex.getMessage.contains("does not conform to the required signature")) diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala new file mode 100644 index 000000000..e7b7285d4 --- /dev/null +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala @@ -0,0 +1,156 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.extractors.raw + +import org.scalatest.WordSpec +import za.co.absa.cobrix.cobol.parser.CopybookParser +import za.co.absa.cobrix.cobol.reader.memorystream.TestByteStream +import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParametersFactory} + +class VariableBlockVariableRecordExtractorSuite extends WordSpec { + private val copybookContent = + """ 01 RECORD. + 02 X PIC X(1). + """ + private val copybook = CopybookParser.parseTree(copybookContent) + + "little-endian block-included record-included case" should { + "be able to read a VBVR file that has no data" in { + val rc = getRawRecordContext(Array[Byte](), bdwBigEndian = true, rdwBigEndian = true, 0, 0) + + val extractor = new VariableBlockVariableRecordExtractor(rc) + + assert(!extractor.hasNext) + + intercept[NoSuchElementException] { + extractor.next() + } + } + + "be able to read a VBVR file that has no records" in { + val rc = getRawRecordContext(Array[Byte](0, 4, 0, 0, 0, 1, 0, 0), bdwBigEndian = true, rdwBigEndian = true, 0, 0) + + val extractor = new VariableBlockVariableRecordExtractor(rc) + + assert(!extractor.hasNext) + + intercept[NoSuchElementException] { + extractor.next() + } + } + + "be able to read a VBVR file that has one record per block" in { + val rc = getRawRecordContext(1, includeBdwInHeaderSize = true, includeRdwInHeaderSize = true, bdwBigEndian = true, rdwBigEndian = true) + + val extractor = new VariableBlockVariableRecordExtractor(rc) + + assert(extractor.hasNext) + + val r0 = extractor.next() + assert(r0.length == 1) + assert(r0.head == 0xF0.toByte) + + assert(extractor.next().head == 0xF1.toByte) + assert(extractor.next().head == 0xF2.toByte) + assert(!extractor.hasNext) + } + + "be able to read a VBVR file that has multiple records per block" in { + val rc = getRawRecordContext(3, includeBdwInHeaderSize = true, includeRdwInHeaderSize = true, bdwBigEndian = true, rdwBigEndian = true) + + val extractor = new VariableBlockVariableRecordExtractor(rc) + + assert(extractor.hasNext) + + val r0 = extractor.next() + assert(r0.length == 1) + assert(r0.head == 0xF0.toByte) + + assert(extractor.next().head == 0xF1.toByte) + assert(extractor.next().head == 0xF2.toByte) + assert(extractor.next().head 
== 0xF3.toByte) + assert(extractor.next().head == 0xF4.toByte) + assert(extractor.next().head == 0xF5.toByte) + assert(extractor.next().head == 0xF6.toByte) + assert(extractor.next().head == 0xF7.toByte) + assert(extractor.next().head == 0xF8.toByte) + assert(!extractor.hasNext) + } + } + + "failures" should { + "throw an exception if a block header is too small" in { + val rc = getRawRecordContext(Array[Byte](0, 22), bdwBigEndian = true, rdwBigEndian = true, 0, 0) + + val extractor = new VariableBlockVariableRecordExtractor(rc) + + val ex = intercept[IllegalStateException] { + extractor.hasNext + } + + assert(ex.getMessage.contains("The length of BDW headers is unexpected. Expected: 4, got 2")) + } + + "throw an exception if a record header is too small" in { + val rc = getRawRecordContext(Array[Byte](0, 4, 0, 0, 0, 1, 0), bdwBigEndian = true, rdwBigEndian = true, 0, 0) + + val extractor = new VariableBlockVariableRecordExtractor(rc) + + val ex = intercept[IllegalStateException] { + extractor.next() + } + + assert(ex.getMessage.contains("The length of RDW headers is unexpected. Expected: 4, got 3")) + } + } + + private def getRawRecordContext(recordsPerBlock: Int, + includeBdwInHeaderSize: Boolean, + includeRdwInHeaderSize: Boolean, + bdwBigEndian: Boolean, + rdwBigEndian: Boolean): RawRecordContext = { + val numOfBlocks = 3 + + val bdwHeaderAdjust = if (includeBdwInHeaderSize) 4 else 0 + val rdwHeaderAdjust = if (includeRdwInHeaderSize) 4 else 0 + + val bytes = Range(0, numOfBlocks) + .flatMap(i => { + Array[Byte](0, (recordsPerBlock * 5 + bdwHeaderAdjust).toByte, 0, 0) ++ Range(0, recordsPerBlock).flatMap(j => { + val num = (i * recordsPerBlock + j) % 10 + Array[Byte](0, (rdwHeaderAdjust + 1).toByte, 0, 0, (0xF0 + num).toByte) + }) + }).toArray[Byte] + + getRawRecordContext(bytes, bdwBigEndian, rdwBigEndian, -bdwHeaderAdjust, -rdwHeaderAdjust) + } + + private def getRawRecordContext(bytes: Array[Byte], + bdwBigEndian: Boolean, + rdwBigEndian: Boolean, + bdwAdjustment: Int, + rdwAdjustment: Int + ): RawRecordContext = { + val ibs = new TestByteStream(bytes) + + val bdwDecoder = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(bdwBigEndian, bdwAdjustment)) + val rdwDecoder = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(rdwBigEndian, rdwAdjustment)) + + RawRecordContext(0, ibs, copybook, rdwDecoder, bdwDecoder, "") + } + +} diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/MemoryStream/MemoryStreamSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/MemoryStreamSpec.scala similarity index 98% rename from cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/MemoryStream/MemoryStreamSpec.scala rename to cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/MemoryStreamSpec.scala index 6c3cbdb23..1922389f4 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/MemoryStream/MemoryStreamSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/MemoryStreamSpec.scala @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package za.co.absa.cobrix.cobol.reader.MemoryStream +package za.co.absa.cobrix.cobol.reader.memorystream import org.scalatest.FunSuite import za.co.absa.cobrix.cobol.reader.stream.{SimpleMemoryStream, SimpleStream} diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala new file mode 100644 index 000000000..ef3ebca28 --- /dev/null +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.memorystream + +import za.co.absa.cobrix.cobol.reader.stream.SimpleStream + +class TestByteStream(bytes: Array[Byte]) extends SimpleStream{ + + var position = 0 + val sz: Int = bytes.length + + override def inputFileName: String = "dummy" + + override def size: Long = sz + + override def offset: Long = position + + override def next(numberOfBytes: Int): Array[Byte] = { + if (position + numberOfBytes < sz) { + val s = bytes.slice(position, position+numberOfBytes) + position += numberOfBytes + s + } else { + if (position < sz ) { + val s = bytes.slice(position, sz) + position = sz + s + } else { + new Array[Byte](0) + } + } + } + + override def close(): Unit = position = sz +} diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/MemoryStream/TestStringStream.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala similarity index 96% rename from cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/MemoryStream/TestStringStream.scala rename to cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala index f06dd3cb3..71d3ed026 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/MemoryStream/TestStringStream.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.cobrix.cobol.reader.MemoryStream +package za.co.absa.cobrix.cobol.reader.memorystream import za.co.absa.cobrix.cobol.reader.stream.SimpleStream diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderBdwSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderBdwSuite.scala new file mode 100644 index 000000000..348151de2 --- /dev/null +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderBdwSuite.scala @@ -0,0 +1,181 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.recordheader + +import org.scalatest.WordSpec + +class RecordHeaderDecoderBdwSuite extends WordSpec { + "headerSize" should { + "always return 4" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters()) + + assert(rhd.headerSize == 4) + } + } + + "getRecordLength for nonextended BDW" should { + "support big-endian non-adjusted headers" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + assert(rhd.getRecordLength(Array[Byte](0, 4, 0, 0), 0) == 4) + assert(rhd.getRecordLength(Array[Byte](0, 10, 0, 0), 0) == 10) + assert(rhd.getRecordLength(Array[Byte](1, 0, 0, 0), 0) == 256) + assert(rhd.getRecordLength(Array[Byte](10, 0, 0, 0), 0) == 2560) + } + + "support big-endian adjusted headers" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 1)) + + assert(rhd.getRecordLength(Array[Byte](0, 4, 0, 0), 0) == 5) + assert(rhd.getRecordLength(Array[Byte](0, 10, 0, 0), 0) == 11) + assert(rhd.getRecordLength(Array[Byte](1, 0, 0, 0), 0) == 257) + assert(rhd.getRecordLength(Array[Byte](10, 0, 0, 0), 0) == 2561) + } + + "support little-endian non-adjusted headers" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(false, 0)) + + assert(rhd.getRecordLength(Array[Byte](0, 0, 4, 0), 0) == 4) + assert(rhd.getRecordLength(Array[Byte](0, 0, 10, 0), 0) == 10) + assert(rhd.getRecordLength(Array[Byte](0, 0, 0, 1), 0) == 256) + assert(rhd.getRecordLength(Array[Byte](0, 0, 0, 10), 0) == 2560) + } + + "support little-endian adjusted headers" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(false, -1)) + + assert(rhd.getRecordLength(Array[Byte](0, 0, 5, 0), 0) == 4) + assert(rhd.getRecordLength(Array[Byte](0, 0, 10, 0), 0) == 9) + assert(rhd.getRecordLength(Array[Byte](0, 0, 0, 1), 0) == 255) + assert(rhd.getRecordLength(Array[Byte](0, 0, 0, 10), 0) == 2559) + } + } + + "getRecordLength for extended BDW" should { + "support big-endian non-adjusted headers" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + assert(rhd.getRecordLength(Array[Byte](128.toByte, 0, 0, 4), 0) == 4) + assert(rhd.getRecordLength(Array[Byte](128.toByte, 0, 0, 10), 0) == 10) + assert(rhd.getRecordLength(Array[Byte](129.toByte, 1, 2, 3), 0) == 16843267) + assert(rhd.getRecordLength(Array[Byte](138.toByte, 1, 2, 3), 0) == 167838211) + } + + "support big-endian adjusted headers" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 1)) + + assert(rhd.getRecordLength(Array[Byte](128.toByte, 0, 0, 4), 0) == 5) + assert(rhd.getRecordLength(Array[Byte](128.toByte, 0, 0, 10), 0) == 11) + assert(rhd.getRecordLength(Array[Byte](129.toByte, 1, 2, 3), 0) == 16843268) + assert(rhd.getRecordLength(Array[Byte](138.toByte, 1, 2, 3), 0) == 167838212) + } + + "support little-endian non-adjusted 
headers" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(false, 0)) + + assert(rhd.getRecordLength(Array[Byte](4, 0, 0, 128.toByte), 0) == 4) + assert(rhd.getRecordLength(Array[Byte](10, 0, 0, 128.toByte), 0) == 10) + assert(rhd.getRecordLength(Array[Byte](3, 2, 1, 129.toByte), 0) == 16843267) + assert(rhd.getRecordLength(Array[Byte](3, 2, 1, 138.toByte), 0) == 167838211) + } + + "support little-endian adjusted headers" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(false, -1)) + + assert(rhd.getRecordLength(Array[Byte](5, 0, 0, 128.toByte), 0) == 4) + assert(rhd.getRecordLength(Array[Byte](10, 0, 0, 128.toByte), 0) == 9) + assert(rhd.getRecordLength(Array[Byte](3, 2, 1, 129.toByte), 0) == 16843266) + assert(rhd.getRecordLength(Array[Byte](3, 2, 1, 138.toByte), 0) == 167838210) + } + } + + "getRecordLength non-happy path" should { + "fail when header size is lesser than expected" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 0, 2), 123) + } + + assert(ex.getMessage.contains("The length of BDW headers is unexpected. Expected: 4, got 3. Header: 0,0,2, offset: 123.")) + } + + "fail when header size is bigger than expected" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 0, 2, 0, 0), 123) + } + + assert(ex.getMessage.contains("The length of BDW headers is unexpected. Expected: 4, got 5. Header: 0,0,2,0,0, offset: 123.")) + } + + "fail when big-endian header is used for little-endian headers" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(false, 0)) + + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 1, 0, 0), 234) + } + + assert(ex.getMessage.contains("BDW headers contain non-zero values where zeros are expected (check 'rdw_big_endian' flag")) + } + + "fail when little-endian header is used for big-endian headers" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 0, 0, 1), 234) + } + + assert(ex.getMessage.contains("BDW headers contain non-zero values where zeros are expected (check 'rdw_big_endian' flag")) + } + + "fail when record size is incorrect" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, -10)) + + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 1, 0, 0), 234) + } + + assert(ex.getMessage.contains("BDW headers contain an invalid value (-9). Header: 0,1,0,0, offset: 234.")) + } + + "fail when record size is too small" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 3, 0, 0), 0) + } + + assert(ex.getMessage.contains("BDW headers should have a length of at least 4 bytes. Got 3 bytes. 
Header: 0,3,0,0, offset: 0.")) + } + + "fail when block size is too big" in { + val rhd = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](228.toByte, 0, 0, 1), 0) + } + + assert(ex.getMessage.contains("The length of BDW block is too big. Got 1677721601. Header: 228,0,0,1, offset: 0.")) + } + } + +} diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderRdwSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderRdwSuite.scala new file mode 100644 index 000000000..71138fca4 --- /dev/null +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderDecoderRdwSuite.scala @@ -0,0 +1,131 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.recordheader + +import org.scalatest.WordSpec + +class RecordHeaderDecoderRdwSuite extends WordSpec { + "headerSize" should { + "always return 4" in { + val rhd = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters()) + + assert(rhd.headerSize == 4) + } + } + + "getRecordLength" should { + "support big-endian non-adjusted headers" in { + val rhd = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + assert(rhd.getRecordLength(Array[Byte](0, 1, 0, 0), 0) == 1) + assert(rhd.getRecordLength(Array[Byte](0, 10, 0, 0), 0) == 10) + assert(rhd.getRecordLength(Array[Byte](1, 0, 0, 0), 0) == 256) + assert(rhd.getRecordLength(Array[Byte](10, 0, 0, 0), 0) == 2560) + } + + "support big-endian adjusted headers" in { + val rhd = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 1)) + + assert(rhd.getRecordLength(Array[Byte](0, 1, 0, 0), 0) == 2) + assert(rhd.getRecordLength(Array[Byte](0, 10, 0, 0), 0) == 11) + assert(rhd.getRecordLength(Array[Byte](1, 0, 0, 0), 0) == 257) + assert(rhd.getRecordLength(Array[Byte](10, 0, 0, 0), 0) == 2561) + } + + "support little-endian non-adjusted headers" in { + val rhd = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(false, 0)) + + assert(rhd.getRecordLength(Array[Byte](0, 0, 1, 0), 0) == 1) + assert(rhd.getRecordLength(Array[Byte](0, 0, 10, 0), 0) == 10) + assert(rhd.getRecordLength(Array[Byte](0, 0 ,0, 1), 0) == 256) + assert(rhd.getRecordLength(Array[Byte](0, 0, 0, 10), 0) == 2560) + } + + "support little-endian adjusted headers" in { + val rhd = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(false, -1)) + + assert(rhd.getRecordLength(Array[Byte](0, 0, 2, 0), 0) == 1) + assert(rhd.getRecordLength(Array[Byte](0, 0, 10, 0), 0) == 9) + assert(rhd.getRecordLength(Array[Byte](0, 0 ,0, 1), 0) == 255) + assert(rhd.getRecordLength(Array[Byte](0, 0, 0, 10), 0) == 2559) + } + + "fail when 
header size is lesser than expected" in { + val rhd = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 0, 2), 123) + } + + assert(ex.getMessage.contains("The length of RDW headers is unexpected. Expected: 4, got 3. Header: 0,0,2, offset: 123.")) + } + + "fail when header size is bigger than expected" in { + val rhd = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 0, 2, 0, 0), 123) + } + + assert(ex.getMessage.contains("The length of RDW headers is unexpected. Expected: 4, got 5. Header: 0,0,2,0,0, offset: 123.")) + } + + "fail when big-endian header is used for little-endian headers" in { + val rhd = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(false, 0)) + + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 1, 0, 0), 234) + } + + assert(ex.getMessage.contains("RDW headers contain non-zero values where zeros are expected (check 'rdw_big_endian' flag")) + } + + "fail when little-endian header is used for big-endian headers" in { + val rhd = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 0, 0, 1), 234) + } + + assert(ex.getMessage.contains("RDW headers contain non-zero values where zeros are expected (check 'rdw_big_endian' flag")) + } + + "fail when record size is incorrect" in { + val rhd = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, -10)) + + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 1, 0, 0), 234) + } + + assert(ex.getMessage.contains("RDW headers contain an invalid value (-9). Header: 0,1,0,0, offset: 234.")) + } + + "fail when record size is zero" in { + val rhd = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(true, 0)) + + val ex = intercept[IllegalStateException] { + rhd.getRecordLength(Array[Byte](0, 0, 0, 0), 0) + } + + assert(ex.getMessage.contains("RDW headers should never be zero (0,0,0,0). Found zero size record at 0.")) + } + } + +} diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderParametersFactory.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderParametersFactory.scala new file mode 100644 index 000000000..1b3442390 --- /dev/null +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/recordheader/RecordHeaderParametersFactory.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.cobrix.cobol.reader.recordheader + +object RecordHeaderParametersFactory { + def getDummyRecordHeaderParameters(isBigEndian: Boolean = false, + adjustment: Int = 0): RecordHeaderParameters = { + RecordHeaderParameters(isBigEndian, adjustment) + } +} diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/CobolSparkExample2.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/CobolSparkExample2.scala index e8e21c108..060876484 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/CobolSparkExample2.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/CobolSparkExample2.scala @@ -59,7 +59,7 @@ object CobolSparkExample2 { .format("cobol") .option("copybook_contents", copybook) .option("schema_retention_policy", "collapse_root") // Collapses the root group returning it's field on the top level of the schema - .option("is_record_sequence", "true") // Specifies that the input file is a sequence of records having RDW headers + .option("record_format", "V") // Specifies that the input file is a sequence of records having RDW headers .load("../../examples/example_data/multisegment_data/COMP.DETAILS.SEP30.DATA.dat") import spark.implicits._ diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/CobolSparkExample3.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/CobolSparkExample3.scala index 241d4241a..d1de7de67 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/CobolSparkExample3.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/CobolSparkExample3.scala @@ -61,7 +61,7 @@ object CobolSparkExample3 { .option("copybook_contents", copybook) //.option("generate_record_id", true) // Generates File_Id and Record_Id fields for line order dependent data .option("schema_retention_policy", "collapse_root") // Collapses the root group returning it's field on the top level of the schema - .option("is_record_sequence", "true") // Specifies that the input file is a sequence of records having RDW headers + .option("record_format", "V") // Specifies that the input file is a sequence of records having RDW headers .option("segment_field", "SEGMENT_ID") // Specified that segment id field is 'SEGMENT_ID' .option("segment_id_level0", "C") // If SEGMENT_ID='C' then the segment contains company's info .option("segment_id_level1", "P") // If SEGMENT_ID='P' then the segment contains contact person's info diff --git a/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCobolApp.scala b/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCobolApp.scala index bf2f7cdba..d6101ba9d 100755 --- a/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCobolApp.scala +++ b/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCobolApp.scala @@ -105,7 +105,7 @@ object SparkCobolApp { .read .format("cobol") // Alternatively can use "za.co.absa.cobrix.spark.cobol.source" .option("copybook_contents", copybook) // A copybook can be provided inline - .option("is_record_sequence", "true") // This file is a sequence of records with 4 byte record headers + .option("record_format", "V") // This file is a sequence of records with 4 byte record headers //.option("copybook", "data/companies_copybook.cpy") // Or as a path 
name in Hadoop(HDFS/S3, etc). For local filesystem use file:// prefix //.option("generate_record_id", true) // Generates File_Id and Record_Id fields for line order dependent data .option("schema_retention_policy", "collapse_root") // Collapses the root group returning it's field on the top level of the schema diff --git a/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCobolHierarchical.scala b/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCobolHierarchical.scala index a29401f31..b9de4b7e3 100755 --- a/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCobolHierarchical.scala +++ b/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCobolHierarchical.scala @@ -83,7 +83,7 @@ object SparkCobolHierarchical { .format("cobol") // Alternatively can use "za.co.absa.cobrix.spark.cobol.source" .option("copybook_contents", copybook) // A copybook can be provided inline //.option("copybook", "../example_data/companies_copybook.cpy") // Or as a path name in HDFS. For local filesystem use file:// prefix - .option("is_record_sequence", "true") // This file is a sequence of records with 4 byte record headers + .option("record_format", "V") // This file is a sequence of records with 4 byte record headers //.option("generate_record_id", true) // Generates File_Id and Record_Id fields for line order dependent data .option("schema_retention_policy", "collapse_root") // Collapses the root group returning it's field on the top level of the schema .option("segment_field", "SEGMENT_ID") // The SEGMENT_ID field contains IDs of segments diff --git a/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCodecApp.scala b/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCodecApp.scala index 454546ce5..43a911885 100755 --- a/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCodecApp.scala +++ b/examples/spark-cobol-app/src/main/scala/com/example/spark/cobol/app/SparkCodecApp.scala @@ -41,7 +41,7 @@ object SparkCodecApp { .read .format("cobol") .option("copybook", "../example_data/copybook_codec.cob") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("generate_record_id", true) .option("schema_retention_policy", "collapse_root") .option("record_header_parser", "com.example.spark.cobol.app.CustomRecordHeadersParser") // Custom record header parser class diff --git a/pom.xml b/pom.xml index 9d22161ff..2057d4835 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ za.co.absa.cobrix cobrix_2.12 - 2.3.1-SNAPSHOT + 2.4.0-SNAPSHOT pom diff --git a/spark-cobol/pom.xml b/spark-cobol/pom.xml index b4f7279bb..29de93e4a 100644 --- a/spark-cobol/pom.xml +++ b/spark-cobol/pom.xml @@ -22,7 +22,7 @@ za.co.absa.cobrix cobrix_2.12 - 2.3.1-SNAPSHOT + 2.4.0-SNAPSHOT ../pom.xml diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index bb0983bff..88d2e9feb 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -24,6 +24,8 @@ import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPoint import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy.DebugFieldsPolicy import 
za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, DebugFieldsPolicy, StringTrimmingPolicy} +import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat +import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat._ import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, MultisegmentParameters, VariableLengthParameters} import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy @@ -77,13 +79,16 @@ object CobolParametersParser { val PARAM_IMPROVED_NULL_DETECTION = "improved_null_detection" // Parameters for multisegment variable length files + val PARAM_RECORD_FORMAT = "record_format" val PARAM_RECORD_LENGTH = "record_length" val PARAM_IS_XCOM = "is_xcom" val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence" val PARAM_IS_TEXT = "is_text" val PARAM_IS_RDW_BIG_ENDIAN = "is_rdw_big_endian" + val PARAM_IS_BDW_BIG_ENDIAN = "is_bdw_big_endian" val PARAM_IS_RDW_PART_REC_LENGTH = "is_rdw_part_of_record_length" val PARAM_RDW_ADJUSTMENT = "rdw_adjustment" + val PARAM_BDW_ADJUSTMENT = "bdw_adjustment" val PARAM_SEGMENT_FIELD = "segment_field" val PARAM_SEGMENT_ID_ROOT = "segment_id_root" val PARAM_SEGMENT_FILTER = "segment_filter" @@ -197,9 +202,20 @@ object CobolParametersParser { val ebcdicCodePageClass = params.get(PARAM_EBCDIC_CODE_PAGE_CLASS) val asciiCharset = params.getOrElse(PARAM_ASCII_CHARSET, "") + val recordFormat = getRecordFormat(params) + val encoding = params.getOrElse(PARAM_ENCODING, "") val isEbcdic = { - if (encoding.isEmpty || encoding.compareToIgnoreCase("ebcdic") == 0) { + if (encoding.isEmpty) { + if (recordFormat == AsciiText) { + false + } else { + true + } + } else if (encoding.compareToIgnoreCase("ebcdic") == 0) { + if (recordFormat == AsciiText) { + logger.warn(s"$PARAM_RECORD_FORMAT = D and $PARAM_ENCODING = $encoding are used together. 
Most of the time the encoding should be ASCII for text files.") + } true } else { if (encoding.compareToIgnoreCase("ascii") == 0) { @@ -217,7 +233,8 @@ object CobolParametersParser { params.getOrElse(PARAM_MULTI_COPYBOOK_PATH, "").split(','), getParameter(PARAM_COPYBOOK_CONTENTS, params), paths, - params.getOrElse(PARAM_IS_TEXT, "false").toBoolean, + recordFormat, + recordFormat == AsciiText || params.getOrElse(PARAM_IS_TEXT, "false").toBoolean, isEbcdic, ebcdicCodePageName, ebcdicCodePageClass, @@ -227,7 +244,7 @@ object CobolParametersParser { params.getOrElse(PARAM_RECORD_START_OFFSET, "0").toInt, params.getOrElse(PARAM_RECORD_END_OFFSET, "0").toInt, params.get(PARAM_RECORD_LENGTH).map(_.toInt), - parseVariableLengthParameters(params), + parseVariableLengthParameters(params, recordFormat), schemaRetentionPolicy, stringTrimmingPolicy, parseMultisegmentParameters(params), @@ -240,13 +257,13 @@ object CobolParametersParser { getDebuggingFieldsPolicy(params), params.getOrElse(PARAM_DEBUG_IGNORE_FILE_SIZE, "false").toBoolean ) - validateSparkCobolOptions(params) + validateSparkCobolOptions(params, recordFormat) cobolParameters } - private def parseVariableLengthParameters(params: Parameters): Option[VariableLengthParameters] = { + private def parseVariableLengthParameters(params: Parameters, recordFormat: RecordFormat): Option[VariableLengthParameters] = { val recordLengthFieldOpt = params.get(PARAM_RECORD_LENGTH_FIELD) - val isRecordSequence = params.getOrElse(PARAM_IS_XCOM, params.getOrElse(PARAM_IS_RECORD_SEQUENCE, "false")).toBoolean + val isRecordSequence = Seq(VariableLength, VariableBlock, AsciiText).contains(recordFormat) val isRecordIdGenerationEnabled = params.getOrElse(PARAM_GENERATE_RECORD_ID, "false").toBoolean val fileStartOffset = params.getOrElse(PARAM_FILE_START_OFFSET, "0").toInt val fileEndOffset = params.getOrElse(PARAM_FILE_END_OFFSET, "0").toInt @@ -269,9 +286,12 @@ object CobolParametersParser { Some(VariableLengthParameters ( isRecordSequence, + recordFormat == VariableBlock, params.getOrElse(PARAM_IS_RDW_BIG_ENDIAN, "false").toBoolean, + params.getOrElse(PARAM_IS_BDW_BIG_ENDIAN, "false").toBoolean, params.getOrElse(PARAM_IS_RDW_PART_REC_LENGTH, "false").toBoolean, params.getOrElse(PARAM_RDW_ADJUSTMENT, "0").toInt, + params.getOrElse(PARAM_BDW_ADJUSTMENT, "0").toInt, params.get(PARAM_RECORD_HEADER_PARSER), params.get(PARAM_RECORD_EXTRACTOR), params.get(PARAM_RHP_ADDITIONAL_INFO), @@ -294,6 +314,36 @@ object CobolParametersParser { } } + private def getRecordFormat(params: Parameters): RecordFormat = { + if (params.contains(PARAM_RECORD_FORMAT)) { + val recordFormatStr = params(PARAM_RECORD_FORMAT) + + RecordFormat.withNameOpt(recordFormatStr).getOrElse(throw new IllegalArgumentException(s"Unknown record format: $recordFormatStr")) + } else { + val hasRdw = params.getOrElse(PARAM_IS_XCOM, params.getOrElse(PARAM_IS_RECORD_SEQUENCE, "false")).toBoolean + + val q = "\"" + + if (params.contains(PARAM_IS_XCOM)) { + logger.warn(s"Option '$PARAM_IS_XCOM' is deprecated. Use .option($q$PARAM_RECORD_FORMAT$q, ${q}V$q)") + } + + if (params.contains(PARAM_IS_RECORD_SEQUENCE)) { + logger.warn(s"Option '$PARAM_IS_RECORD_SEQUENCE' is deprecated. 
Use .option($q$PARAM_RECORD_FORMAT$q, ${q}V$q)") + } + + if (hasRdw) { + VariableLength + } else { + if (params.getOrElse(PARAM_IS_TEXT, "false").toBoolean) { + AsciiText + } else { + FixedLength + } + } + } + } + + /** * Parses parameters for reading multisegment mainframe files * @@ -475,13 +525,29 @@ object CobolParametersParser { * * @param params Parameters provided by spark.read.option(...) */ - private def validateSparkCobolOptions(params: Parameters): Unit = { + private def validateSparkCobolOptions(params: Parameters, recordFormat: RecordFormat): Unit = { val isRecordSequence = params.getOrElse(PARAM_IS_XCOM, "false").toBoolean || params.getOrElse(PARAM_IS_RECORD_SEQUENCE, "false").toBoolean || params.getOrElse(PARAM_VARIABLE_SIZE_OCCURS, "false").toBoolean || params.contains(PARAM_FILE_START_OFFSET) || params.contains(PARAM_FILE_END_OFFSET) || - params.contains(PARAM_RECORD_LENGTH_FIELD) + params.contains(PARAM_RECORD_LENGTH_FIELD) || + recordFormat == VariableLength || + recordFormat == VariableBlock + + if (params.contains(PARAM_RECORD_FORMAT)) { + if (params.contains(PARAM_IS_XCOM)) { + throw new IllegalArgumentException(s"Option '$PARAM_RECORD_FORMAT' and '$PARAM_IS_XCOM' cannot be used together. The use of $PARAM_RECORD_FORMAT is preferable.") + } + + if (params.contains(PARAM_IS_RECORD_SEQUENCE)) { + throw new IllegalArgumentException(s"Option '$PARAM_RECORD_FORMAT' and '$PARAM_IS_RECORD_SEQUENCE' cannot be used together. The use of $PARAM_RECORD_FORMAT is preferable.") + } + + if (params.contains(PARAM_IS_TEXT)) { + throw new IllegalArgumentException(s"Option '$PARAM_RECORD_FORMAT' and '$PARAM_IS_TEXT' cannot be used together. The use of $PARAM_RECORD_FORMAT is preferable.") + } + } val hasRecordExtractor = params.contains(PARAM_RECORD_EXTRACTOR) @@ -579,7 +645,7 @@ object CobolParametersParser { } } if (!isRecordSequence && params.contains(PARAM_INPUT_FILE_COLUMN)) { - val recordSequenceCondition = s"one of this holds: '$PARAM_IS_RECORD_SEQUENCE' = true or '$PARAM_VARIABLE_SIZE_OCCURS' = true" + + val recordSequenceCondition = s"one of these holds: '$PARAM_RECORD_FORMAT' = V or '$PARAM_RECORD_FORMAT' = VB or '$PARAM_IS_RECORD_SEQUENCE' = true or '$PARAM_VARIABLE_SIZE_OCCURS' = true" + s" or one of these options is set: '$PARAM_RECORD_LENGTH_FIELD', '$PARAM_FILE_START_OFFSET', '$PARAM_FILE_END_OFFSET' or " + "a custom record extractor is specified" throw new IllegalArgumentException(s"Option '$PARAM_INPUT_FILE_COLUMN' is supported only when $recordSequenceCondition") diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala index 567744e62..64fbc5e46 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala @@ -142,9 +142,12 @@ class DefaultSource val varLenParams: VariableLengthParameters = parameters.variableLengthParams .getOrElse( VariableLengthParameters(isRecordSequence = false, + hasBdw = false, isRdwBigEndian = false, + isBdwBigEndian = false, isRdwPartRecLength = false, rdwAdjustment = 0, + bdwAdjustment = 0, recordHeaderParser = None, recordExtractor = None, rhpAdditionalInfo = None, @@ -179,9 +182,12 @@ class DefaultSource recordLength = parameters.recordLength, lengthFieldName = recordLengthField, isRecordSequence = varLenParams.isRecordSequence, + hasBdw = varLenParams.hasBdw, isRdwBigEndian = 
varLenParams.isRdwBigEndian, + isBdwBigEndian = varLenParams.isBdwBigEndian, isRdwPartRecLength = varLenParams.isRdwPartRecLength, rdwAdjustment = varLenParams.rdwAdjustment, + bdwAdjustment = varLenParams.bdwAdjustment, isIndexGenerationNeeded = varLenParams.isUsingIndex, inputSplitRecords = varLenParams.inputSplitRecords, inputSplitSizeMB = varLenParams.inputSplitSizeMB, diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CommentsTruncationSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CommentsTruncationSpec.scala index 52ae4d584..0d30a912d 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CommentsTruncationSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CommentsTruncationSpec.scala @@ -125,7 +125,7 @@ class CommentsTruncationSpec extends FunSuite with SparkTestBase with BinaryFile .read .format("cobol") .option("copybook_contents", copybookWithComments) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .load(tmpFileName) @@ -141,7 +141,7 @@ class CommentsTruncationSpec extends FunSuite with SparkTestBase with BinaryFile .read .format("cobol") .option("copybook_contents", copybookWithTruncatedComments) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("comments_lbound", 3) .option("comments_ubound", 50) .option("schema_retention_policy", "collapse_root") @@ -159,7 +159,7 @@ class CommentsTruncationSpec extends FunSuite with SparkTestBase with BinaryFile .read .format("cobol") .option("copybook_contents", copybookWithNoCommentTruncation) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("truncate_comments", "false") .option("schema_retention_policy", "collapse_root") .load(tmpFileName) @@ -177,7 +177,7 @@ class CommentsTruncationSpec extends FunSuite with SparkTestBase with BinaryFile .read .format("cobol") .option("copybook_contents", copybookWithTruncatedComments) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("comments_lbound", 3) .option("truncate_comments", "false") .option("schema_retention_policy", "collapse_root") @@ -194,7 +194,7 @@ class CommentsTruncationSpec extends FunSuite with SparkTestBase with BinaryFile .read .format("cobol") .option("copybook_contents", copybookWithTruncatedComments) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("comments_ubound", 50) .option("truncate_comments", "false") .option("schema_retention_policy", "collapse_root") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/VarSizeArraysSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/VarSizeArraysSpec.scala index d9b54e9fd..8d6b3ad83 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/VarSizeArraysSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/VarSizeArraysSpec.scala @@ -113,7 +113,7 @@ class VarSizeArraysSpec extends FunSuite with SparkTestBase with BinaryFileFixtu .read .format("cobol") .option("copybook_contents", copybook) - .option("is_xcom", true) + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("variable_size_occurs", "false") .load(tmpFileName) @@ -130,7 +130,7 @@ class VarSizeArraysSpec extends FunSuite with SparkTestBase with BinaryFileFixtu .read .format("cobol") .option("copybook_contents", copybook) - .option("is_xcom", true) + 
.option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("variable_size_occurs", "true") .load(tmpFileName) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test11CustomRDWParser.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test11CustomRDWParser.scala index 93d6ef541..68887c933 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test11CustomRDWParser.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test11CustomRDWParser.scala @@ -60,7 +60,7 @@ class Test11CustomRDWParser extends FunSuite with SparkTestBase { .read .format("cobol") .option("copybook", inputCopybookPath) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("generate_record_id", "true") .option("schema_retention_policy", "collapse_root") .option("record_header_parser", "za.co.absa.cobrix.spark.cobol.source.utils.Test10CustomRDWParser") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test13bVarLenFileHeadersSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test13bVarLenFileHeadersSpec.scala index ecb6d00d5..89e1891b4 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test13bVarLenFileHeadersSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test13bVarLenFileHeadersSpec.scala @@ -61,7 +61,7 @@ class Test13bVarLenFileHeadersSpec extends FunSuite with SparkTestBase { .format("cobol") .option("copybook_contents", copybookContents) .option("schema_retention_policy", "collapse_root") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("is_rdw_big_endian", "true") .option("segment_field", "SEGMENT_ID") .option("segment_id_level0", "C") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test14RdwAdjustmentsSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test14RdwAdjustmentsSpec.scala index 8b56e1098..b4473aaaa 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test14RdwAdjustmentsSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test14RdwAdjustmentsSpec.scala @@ -41,7 +41,7 @@ class Test14RdwAdjustmentsSpec extends FunSuite with SparkTestBase { .read .format("cobol") .option("copybook", inputCopybookPath) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("segment_id_level0", "C") .option("segment_id_level1", "P") @@ -84,7 +84,7 @@ class Test14RdwAdjustmentsSpec extends FunSuite with SparkTestBase { .read .format("cobol") .option("copybook", inputCopybookPath) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("segment_id_level0", "C") .option("segment_id_level1", "P") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala index fcaa7deac..c33edfa77 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala @@ -51,7 
+51,7 @@ class Test17HierarchicalSpec extends WordSpec with SparkTestBase with CobolTestB .format("cobol") .option("copybook", inputCopybookPath) .option("pedantic", "true") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("generate_record_id", "true") .option("schema_retention_policy", "collapse_root") .option("segment_field", "SEGMENT_ID") @@ -87,7 +87,7 @@ class Test17HierarchicalSpec extends WordSpec with SparkTestBase with CobolTestB .format("cobol") .option("copybook", inputCopybookPath) .option("pedantic", "true") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("generate_record_id", "true") .option("schema_retention_policy", "collapse_root") .option("segment_field", "SEGMENT_ID") @@ -127,7 +127,7 @@ class Test17HierarchicalSpec extends WordSpec with SparkTestBase with CobolTestB .format("cobol") .option("copybook", inputCopybookPath) .option("pedantic", "true") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("generate_record_id", "true") .option("schema_retention_policy", "collapse_root") .option("segment_field", "SEGMENT_ID") @@ -162,7 +162,7 @@ class Test17HierarchicalSpec extends WordSpec with SparkTestBase with CobolTestB .format("cobol") .option("copybook", inputCopybookPath) .option("pedantic", "true") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("generate_record_id", "true") .option("schema_retention_policy", "collapse_root") .option("segment_field", "SEGMENT_ID") @@ -210,7 +210,7 @@ class Test17HierarchicalSpec extends WordSpec with SparkTestBase with CobolTestB .format("cobol") .option("copybook", inputCopybookPath) .option("encoding", "ascii") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("redefine_segment_id_map:1", "STATIC-DETAILS => C") .option("redefine-segment-id-map:2", "CONTACTS => P") @@ -240,7 +240,7 @@ class Test17HierarchicalSpec extends WordSpec with SparkTestBase with CobolTestB .format("cobol") .option("copybook", inputCopybookPath) .option("encoding", "ascii") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("redefine_segment_id_map:1", "STATIC-DETAILS => C") .option("redefine-segment-id-map:2", "CONTACTS => P") @@ -272,7 +272,7 @@ class Test17HierarchicalSpec extends WordSpec with SparkTestBase with CobolTestB .format("cobol") .option("copybook", inputCopybookPath) .option("encoding", "ascii") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("schema_retention_policy", "keep_original") .option("segment_field", "SEGMENT_ID") .option("redefine_segment_id_map:1", "STATIC-DETAILS => C") @@ -301,7 +301,7 @@ class Test17HierarchicalSpec extends WordSpec with SparkTestBase with CobolTestB .format("cobol") .option("copybook", inputCopybookPath) .option("encoding", "ascii") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("redefine_segment_id_map:1", "STATIC-DETAILS => C") .option("redefine-segment-id-map:2", "CONTACTS => P") @@ -328,7 +328,7 @@ class Test17HierarchicalSpec extends WordSpec with SparkTestBase with CobolTestB .format("cobol") .option("copybook", inputCopybookPath) .option("encoding", "ascii") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("redefine_segment_id_map:1", "STATIC-DETAILS => C") 
.option("redefine-segment-id-map:2", "CONTACTS => P") @@ -347,7 +347,7 @@ class Test17HierarchicalSpec extends WordSpec with SparkTestBase with CobolTestB .format("cobol") .option("copybook", inputCopybookPath) .option("encoding", "ascii") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("redefine_segment_id_map:1", "STATIC-DETAILS => C") .option("redefine-segment-id-map:2", "CONTACTS => P") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test18PathSpecialCharSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test18PathSpecialCharSpec.scala index 17e4a757d..6c49454a6 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test18PathSpecialCharSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test18PathSpecialCharSpec.scala @@ -49,7 +49,7 @@ class Test18PathSpecialCharSpec extends WordSpec with SparkTestBase with CobolTe .format("cobol") .option("copybook", inputCopybookPath) .option("pedantic", "true") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("generate_record_id", "true") .option("schema_retention_policy", "collapse_root") .option("segment_field", "SEGMENT_ID") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala index 6258adce5..1da7f8688 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test20InputFileNameSpec.scala @@ -98,7 +98,7 @@ class Test20InputFileNameSpec extends WordSpec with SparkTestBase { .read .format("cobol") .option("copybook", inputCopybookPath) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("encoding", "ascii") .option("with_input_file_name_col", "F") .load(inputDataPath + "/COMP.DETAILS.SEP30.DATA.dat") @@ -115,7 +115,7 @@ class Test20InputFileNameSpec extends WordSpec with SparkTestBase { .read .format("cobol") .option("copybook", inputCopybookPath) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("encoding", "ascii") .option("with_input_file_name_col", "F") @@ -133,7 +133,7 @@ class Test20InputFileNameSpec extends WordSpec with SparkTestBase { .read .format("cobol") .option("copybook", inputCopybookPath) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("encoding", "ascii") .option("with_input_file_name_col", "F") .option("generate_record_id", "true") @@ -151,7 +151,7 @@ class Test20InputFileNameSpec extends WordSpec with SparkTestBase { .read .format("cobol") .option("copybook", inputCopybookPath) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("encoding", "ascii") .option("with_input_file_name_col", "F") @@ -176,7 +176,7 @@ class Test20InputFileNameSpec extends WordSpec with SparkTestBase { .format("cobol") .option("copybook", inputCopybookPath) .option("pedantic", "true") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("redefine_segment_id_map:1", "COMPANY => 1") 
.option("redefine-segment-id-map:2", "DEPT => 2") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala index 0f2b74bda..26744c16e 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala @@ -44,7 +44,7 @@ class Test21VariableOccurs extends FunSuite with SparkTestBase { val inputStream = new FSStream(s"$inputDataPath/data.dat") val copybookContents = Files.readAllLines(Paths.get("../data/test21_copybook.cob"), StandardCharsets.ISO_8859_1).toArray.mkString("\n") val copybook = CopybookParser.parse(copybookContents, ASCII) - val recordExtractor = new VarOccursRecordExtractor(RawRecordContext(0L, inputStream, copybook, "")) + val recordExtractor = new VarOccursRecordExtractor(RawRecordContext(0L, inputStream, copybook, null, null, "")) val expectedRecords = ListBuffer(Array(48.toByte), Array(49.toByte, 48.toByte), diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test22HierarchicalOccursSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test22HierarchicalOccursSpec.scala index 58eb6e6b5..4a5c2c165 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test22HierarchicalOccursSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test22HierarchicalOccursSpec.scala @@ -92,7 +92,7 @@ class Test22HierarchicalOccursSpec extends FunSuite with SparkTestBase with Bina .format("cobol") .option("copybook_contents", copybook) .option("pedantic", "true") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("generate_record_id", "true") .option("variable_size_occurs", "true") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala index c4c880102..354a6e664 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala @@ -67,7 +67,7 @@ class Test25OccursMappings extends FunSuite with SparkTestBase { ) val copybook = CopybookParser.parse(copybookContents, ASCII, occursHandlers = occursMapping) - val recordExtractor = new VarOccursRecordExtractor(RawRecordContext(0L, inputStream, copybook, "")) + val recordExtractor = new VarOccursRecordExtractor(RawRecordContext(0L, inputStream, copybook, null, null, "")) val expectedRecords = ListBuffer( "1AX".getBytes, diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala index 0b4a6571d..aa989f88b 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala @@ -24,7 +24,7 @@ import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture //noinspection 
NameBooleanParameters class Test27RecordLengthSpec extends WordSpec with SparkTestBase with BinaryFileFixture { - private val exampleName = "Test26 (custom record extractor)" + private val exampleName = "Test27 (record length option)" private val copybook = """ 01 R. diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala index 112e6b031..149b7e3a9 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala @@ -25,7 +25,7 @@ import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture //noinspection NameBooleanParameters class Test28MultipartLoadSpec extends WordSpec with SparkTestBase with BinaryFileFixture { - private val exampleName = "Test26 (custom record extractor)" + private val exampleName = "Test28 (multipart load)" private val copybook = """ 01 R. diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala new file mode 100644 index 000000000..554c772b3 --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala @@ -0,0 +1,145 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.source.integration + +import org.apache.spark.SparkException +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.col +import org.scalatest.WordSpec +import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase +import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture + +//noinspection NameBooleanParameters +class Test29BdwFileSpec extends WordSpec with SparkTestBase with BinaryFileFixture { + + private val exampleName = "Test29 (VB record format BDW+RDW)" + + private val copybook = + """ 01 R. + 03 A PIC X(1). 
+ """ + + val expected2Records = """[{"A":"0"},{"A":"1"}]""" + val expected4Records = """[{"A":"0"},{"A":"1"},{"A":"2"},{"A":"3"}]""" + + "VB record (little-endian BDW, little-endian RDW)" should { + "load data without adjustments" in { + testVbRecordLoad(false, false, 0, 0, 2, 1, expected2Records) + } + "load data with adjustments" in { + testVbRecordLoad(false, false, -1, 1, 2, 1, expected2Records) + } + } + + "VB record (big-endian BDW, little-endian RDW)" should { + "load data without adjustments" in { + testVbRecordLoad(true, false, 0, 0, 1, 2, expected2Records) + } + "load data with adjustments" in { + testVbRecordLoad(true, false, -1, 1, 1, 2, expected2Records) + } + } + + "VB record (little-endian BDW, big-endian RDW)" should { + "load data without adjustments" in { + testVbRecordLoad(false, true, 0, 0, 1, 2, expected2Records) + } + "load data with adjustments" in { + testVbRecordLoad(false, true, -1, 1, 1, 2, expected2Records) + } + } + + "VB record (big-endian BDW, big-endian RDW)" should { + "load data without adjustments" in { + testVbRecordLoad(true, true, 0, 0, 2, 2, expected4Records) + } + "load data with adjustments" in { + testVbRecordLoad(true, true, -1, 1, 2, 2, expected4Records) + } + } + + "in case of failures" should { + "thrown an exception if there is only BDW, but nor RDW" in { + val record: Seq[Byte] = getHeader(5, false, 0) ++ Seq(0xF0.toByte) + + withTempBinFile("rec", ".dat", record.toArray) { tmpFileName1 => + val df = spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "VB") + .load(tmpFileName1) + + intercept[SparkException] { + df.orderBy(col("A")) + .toJSON + .collect() + } + } + } + } + + private def testVbRecordLoad(bdwBigEndian: Boolean, + rdwBigEndian: Boolean, + bdwAdjustment: Int, + rdwAdjustment: Int, + blocks: Int, + records: Int, + expected: String): Unit = { + val record: Seq[Byte] = Range(0, blocks).flatMap(blockNum => { + getHeader(records * 5, bdwBigEndian, bdwAdjustment) ++ + Range(0, records).flatMap(recordNum => { + val idx = (blockNum * records + recordNum) % 10 + getHeader(1, rdwBigEndian, rdwAdjustment) ++ Seq((0xF0 + idx).toByte) + }) + }) + + withTempBinFile("rec", ".dat", record.toArray) { tmpFileName1 => + val df = spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "VB") + .option("is_bdw_big_endian", bdwBigEndian) + .option("is_rdw_big_endian", rdwBigEndian) + .option("bdw_adjustment", -bdwAdjustment) + .option("rdw_adjustment", -rdwAdjustment) + .load(tmpFileName1) + + val actual = df + .orderBy(col("A")) + .toJSON + .collect() + .mkString("[", ",", "]") + + assert(df.count() == blocks * records) + assert(actual == expected) + } + } + + private def getHeader(payloadSize: Int, bigEndian: Boolean, adjustment: Int): Seq[Byte] = { + val byte0 = ((payloadSize + adjustment) % 256).toByte + val byte1 = ((payloadSize + adjustment) / 256).toByte + if (bigEndian) { + Seq(byte1, byte0, 0, 0) + } else { + Seq(0, 0, byte0, byte1) + } + } + + +} diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test4MultisegmentSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test4MultisegmentSpec.scala index 8216eaf1b..056c797f4 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test4MultisegmentSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test4MultisegmentSpec.scala @@ -44,7 +44,7 @@ class 
Test4MultisegmentSpec extends FunSuite with SparkTestBase { .format("cobol") .option("copybook", inputCopybookPath) .option("encoding", "ascii") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("segment_id_level0", "C") .option("segment_id_level1", "P") @@ -92,7 +92,7 @@ class Test4MultisegmentSpec extends FunSuite with SparkTestBase { .option("copybook", inputCopybookPath) .option("encoding", "ascii") .option("ascii_charset", "ISO-8859-1") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("segment_id_level0", "C") .option("segment_id_level1", "P") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test5MultisegmentSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test5MultisegmentSpec.scala index 5d925eba1..c2fd517fa 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test5MultisegmentSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test5MultisegmentSpec.scala @@ -52,7 +52,7 @@ class Test5MultisegmentSpec extends FunSuite with SparkTestBase { .read .format("cobol") .option("copybook", inputCopybookPath) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("segment_field", "SEGMENT_ID") .option("segment_id_level0", "C") .option("segment_id_level1", "P") @@ -106,7 +106,7 @@ class Test5MultisegmentSpec extends FunSuite with SparkTestBase { .read .format("cobol") .option("copybook", inputCopybookPath) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("input_split_records", "100") .option("segment_field", "SEGMENT_ID") .option("segment_id_root", "C") @@ -158,7 +158,7 @@ class Test5MultisegmentSpec extends FunSuite with SparkTestBase { .read .format("cobol") .option("copybook", inputCopybookPath) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("input_split_records", "100") .option("segment_field", "SEGMENT_ID") .option("segment_id_root", "C") @@ -228,7 +228,7 @@ class Test5MultisegmentSpec extends FunSuite with SparkTestBase { .read .format("cobol") .option("copybook", inputCopybookPath) - .option("is_record_sequence", "true") + .option("record_format", "V") .option("is_rdw_big_endian", "true") .option("segment_field", "SEGMENT_ID") .option("segment_id_level0", "C") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test01RecordIdSequence.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test01RecordIdSequence.scala index 7c11615b3..32a2612b3 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test01RecordIdSequence.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test01RecordIdSequence.scala @@ -118,7 +118,7 @@ class Test01RecordIdSequence extends FunSuite with BeforeAndAfter with SparkTest .option("copybook_contents", copybook) .option("generate_record_id", true) .option("input_split_records", 5) - .option("is_xcom", true) + .option("record_format", "V") .option("schema_retention_policy", "keep_original") .load(fileName) logger.debug("Rows Count (No Segment Filter): " + dfRecordAll.count()) @@ -134,7 +134,7 @@ class Test01RecordIdSequence extends FunSuite with BeforeAndAfter with SparkTest .option("generate_record_id", true) .option("schema_retention_policy", 
"keep_original") .option("input_split_records", 5) - .option("is_xcom", true) + .option("record_format", "V") .option("segment_field", "I") .option("segment_filter", "1") .load(fileName) @@ -152,7 +152,7 @@ class Test01RecordIdSequence extends FunSuite with BeforeAndAfter with SparkTest .option("generate_record_id", true) .option("schema_retention_policy", "keep_original") .option("input_split_records", 5) - .option("is_xcom", true) + .option("record_format", "V") .option("segment_field", "I") .option("segment_filter", "1") .option("segment_id_root", "1") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test02SparseIndexGenerator.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test02SparseIndexGenerator.scala index 9b6f00611..502384dce 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test02SparseIndexGenerator.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test02SparseIndexGenerator.scala @@ -75,7 +75,7 @@ class Test02SparseIndexGenerator extends FunSuite with BeforeAndAfterAll with Sp .option("copybook_contents", copybook) .option("generate_record_id", true) .option("input_split_records", 5) - .option("is_xcom", true) + .option("record_format", "V") .load(dataWithHeaderPath) logger.debug("Data With Header, No segment options # partitions: " + df.rdd.partitions.size) assert(df.rdd.partitions.size == 2) @@ -89,7 +89,7 @@ class Test02SparseIndexGenerator extends FunSuite with BeforeAndAfterAll with Sp .option("copybook_contents", copybook) .option("generate_record_id", true) .option("input_split_records", 5) - .option("is_xcom", true) + .option("record_format", "V") .option("segment_field", "I") .option("segment_filter", "1") .load(dataWithHeaderPath) @@ -105,7 +105,7 @@ class Test02SparseIndexGenerator extends FunSuite with BeforeAndAfterAll with Sp .option("copybook_contents", copybook) .option("generate_record_id", true) .option("input_split_records", 5) - .option("is_xcom", true) + .option("record_format", "V") .option("segment_field", "I") .option("segment_filter", "1,0") .option("segment_id_root", "1") @@ -122,7 +122,7 @@ class Test02SparseIndexGenerator extends FunSuite with BeforeAndAfterAll with Sp .option("copybook_contents", copybook) .option("generate_record_id", true) .option("input_split_records", 5) - .option("is_xcom", true) + .option("record_format", "V") .load(dataWithoutHeaderPath) logger.debug("Data Without Header, No segment options # partitions: " + df.rdd.partitions.size) assert(df.rdd.partitions.size == 2) @@ -136,7 +136,7 @@ class Test02SparseIndexGenerator extends FunSuite with BeforeAndAfterAll with Sp .option("copybook_contents", copybook) .option("generate_record_id", true) .option("input_split_records", 5) - .option("is_xcom", true) + .option("record_format", "V") .option("segment_field", "I") .option("segment_filter", "1") .load(dataWithoutHeaderPath) @@ -152,7 +152,7 @@ class Test02SparseIndexGenerator extends FunSuite with BeforeAndAfterAll with Sp .option("copybook_contents", copybook) .option("generate_record_id", true) .option("input_split_records", 5) - .option("is_xcom", true) + .option("record_format", "V") .option("segment_field", "I") .option("segment_filter", "1,0") .option("segment_id_root", "1") @@ -169,7 +169,7 @@ class Test02SparseIndexGenerator extends FunSuite with BeforeAndAfterAll with Sp .option("copybook_contents", copybook) .option("generate_record_id", true) .option("input_split_records", 
5) - .option("is_xcom", true) + .option("record_format", "V") .load(dataWithHeaderOnly) logger.debug("Data With Header only, No segment options # partitions: " + df.rdd.partitions.size) assert(df.rdd.partitions.size == 1) @@ -183,7 +183,7 @@ class Test02SparseIndexGenerator extends FunSuite with BeforeAndAfterAll with Sp .option("copybook_contents", copybook) .option("generate_record_id", true) .option("input_split_records", 5) - .option("is_xcom", true) + .option("record_format", "V") .option("segment_field", "I") .option("segment_filter", "1") .load(dataWithHeaderOnly) @@ -199,7 +199,7 @@ class Test02SparseIndexGenerator extends FunSuite with BeforeAndAfterAll with Sp .option("copybook_contents", copybook) .option("generate_record_id", true) .option("input_split_records", 5) - .option("is_xcom", true) + .option("record_format", "V") .option("segment_field", "I") .option("segment_filter", "1,0") .option("segment_id_root", "1") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test03IbmFloats.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test03IbmFloats.scala index a463bf6de..5188cce21 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test03IbmFloats.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test03IbmFloats.scala @@ -104,7 +104,7 @@ class Test03IbmFloats extends FunSuite with BeforeAndAfterAll with SparkTestBase .format("cobol") .option("copybook_contents", copybook) .option("generate_record_id", true) - .option("is_xcom", true) + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("floating_point_format", "IBM") .load(ibmBigEndianPath) @@ -122,7 +122,7 @@ class Test03IbmFloats extends FunSuite with BeforeAndAfterAll with SparkTestBase .format("cobol") .option("copybook_contents", copybook) .option("generate_record_id", true) - .option("is_xcom", true) + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("floating_point_format", "IBM_little_endian") .load(ibmLittleEndianPath) @@ -140,7 +140,7 @@ class Test03IbmFloats extends FunSuite with BeforeAndAfterAll with SparkTestBase .format("cobol") .option("copybook_contents", copybook) .option("generate_record_id", true) - .option("is_xcom", true) + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("floating_point_format", "IEEE754") .load(ieee754BigEndianPath) @@ -158,7 +158,7 @@ class Test03IbmFloats extends FunSuite with BeforeAndAfterAll with SparkTestBase .format("cobol") .option("copybook_contents", copybook) .option("generate_record_id", true) - .option("is_xcom", true) + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("floating_point_format", "IEEE754_little_endian") .load(ieee754LittleEndianPath) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test04VarcharFields.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test04VarcharFields.scala index 4080fc986..7e9d06cad 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test04VarcharFields.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test04VarcharFields.scala @@ -79,7 +79,7 @@ class Test04VarcharFields extends FunSuite with SparkTestBase with BinaryFileFix .format("cobol") .option("copybook_contents", copybook) 
.option("generate_record_id", true) - .option("is_xcom", true) + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .load(tmpFileName) @@ -111,7 +111,7 @@ class Test04VarcharFields extends FunSuite with SparkTestBase with BinaryFileFix .format("cobol") .option("copybook_contents", copybook) .option("generate_record_id", true) - .option("is_xcom", true) + .option("record_format", "V") .option("string_trimming_policy", "none") .option("schema_retention_policy", "collapse_root") .load(tmpFileName) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test06EmptySegmentIds.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test06EmptySegmentIds.scala index 934552abd..d6f49ad8c 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test06EmptySegmentIds.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test06EmptySegmentIds.scala @@ -63,7 +63,7 @@ class Test06EmptySegmentIds extends FunSuite with SparkTestBase with BinaryFileF .format("cobol") .option("copybook_contents", copybook) .option("pedantic", "true") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("segment_field", "SEGMENT_ID") .option("redefine_segment_id_map:1", "SEG1 => A") @@ -86,7 +86,7 @@ class Test06EmptySegmentIds extends FunSuite with SparkTestBase with BinaryFileF .format("cobol") .option("copybook_contents", copybook) .option("pedantic", "true") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("segment_field", "SEGMENT_ID") .option("redefine_segment_id_map:1", "SEG1 => A") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test10DeepSegmentRedefines.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test10DeepSegmentRedefines.scala index f7abce222..e8c5ac709 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test10DeepSegmentRedefines.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test10DeepSegmentRedefines.scala @@ -58,7 +58,7 @@ class Test10DeepSegmentRedefines extends FunSuite with SparkTestBase with Binary .format("cobol") .option("copybook_contents", copybook) .option("pedantic", "true") - .option("is_record_sequence", "true") + .option("record_format", "V") .option("schema_retention_policy", "collapse_root") .option("segment_field", "ID") .option("redefine_segment_id_map:1", "SEG1 => A") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test13AsciiCrLfText.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test13AsciiCrLfText.scala index ef644d46d..2ab7de977 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test13AsciiCrLfText.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test13AsciiCrLfText.scala @@ -79,9 +79,7 @@ class Test13AsciiCrLfText extends WordSpec with SparkTestBase with BinaryFileFix .format("cobol") .option("copybook_contents", copybook) .option("pedantic", "true") - .option("is_record_sequence", "true") - .option("is_text", "true") - .option("encoding", "ascii") + .option("record_format", "D") .load(tmpFileName) val expected = """[{"A":"fd"},{"A":"hd"},{"A":"sd"}]""" @@ -120,9 +118,7 @@ 
class Test13AsciiCrLfText extends WordSpec with SparkTestBase with BinaryFileFix .format("cobol") .option("copybook_contents", copybook) .option("pedantic", "true") - .option("is_record_sequence", "true") - .option("is_text", "true") - .option("encoding", "ascii") + .option("record_format", "D") .load(tmpFileName) val count = df.count() diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test03AsciiMultisegment.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test03AsciiMultisegment.scala index 023fa2df0..7d71e2354 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test03AsciiMultisegment.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test03AsciiMultisegment.scala @@ -132,9 +132,7 @@ class Test03AsciiMultisegment extends WordSpec with SparkTestBase with BinaryFil .format("cobol") .option("copybook_contents", copybook) .option("pedantic", "true") - .option("is_text", "true") - .option("encoding", "ascii") - .option("is_record_sequence", "true") + .option("record_format", "D") .option("schema_retention_policy", "collapse_root") .option("segment_field", "T") .option("redefine-segment-id-map:00", "R1 => 1") @@ -179,10 +177,7 @@ class Test03AsciiMultisegment extends WordSpec with SparkTestBase with BinaryFil .format("cobol") .option("copybook_contents", copybook) .option("pedantic", "true") - .option("is_text", "true") - .option("encoding", "ascii") - .option("is_record_sequence", "true") - .option("schema_retention_policy", "collapse_root") + .option("record_format", "D") .option("segment_field", "T") .option("redefine-segment-id-map:00", "R1 => 1") .option("redefine-segment-id-map:01", "R2 => 2") diff --git a/version.sbt b/version.sbt index 5783339cc..341046bbb 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "2.3.1-SNAPSHOT" +version in ThisBuild := "2.4.0-SNAPSHOT"