diff --git a/README.md b/README.md index ab10050d6..7bcabe406 100644 --- a/README.md +++ b/README.md @@ -478,6 +478,13 @@ or .option("record_length_field", "FIELD1 * 10 + 200") ``` +If the record length field contains a string value that maps to a record size, you can provide the mapping as JSON: +``` +.option("record_format", "F") +.option("record_length_field", "FIELD_STR") +.option("record_length_map", """{"SEG1":100,"SEG2":200}""") +``` + ### Use cases for various variable length formats In order to understand the file format it is often sufficient to look at the first 4 bytes of the file (un case of RDW only files), @@ -1547,6 +1554,7 @@ The output looks like this: | .option("bdw_adjustment", 0) | If there is a mismatch between BDW and record length this option can be used to adjust the difference. | | .option("re_additional_info", "") | Passes a string as an additional info parameter passed to a custom record extractor to its constructor. | | .option("record_length_field", "RECORD-LEN") | Specifies a record length field or expression to use instead of RDW. Use `rdw_adjustment` option if the record length field differs from the actual length by a fixed amount of bytes. The `record_format` should be set to `F`. This option is incompatible with `is_record_sequence`. | +| .option("record_length_map", """{"A":100,"B":50}""") | Specifies a mapping between record length field values and actual record lengths. | | .option("record_extractor", "com.example.record.extractor") | Specifies a class for parsing record in a custom way. The class must inherit `RawRecordExtractor` and `Serializable` traits. See the chapter on record extractors above. | | .option("minimum_record_length", 1) | Specifies the minimum length a record is considered valid, will be skipped otherwise. | | .option("maximum_record_length", 1000) | Specifies the maximum length a record is considered valid, will be skipped otherwise. | diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala index 0656fc901..aab089d95 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala @@ -99,7 +99,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], override def getRecordSize: Int = cobolSchema.copybook.getRecordSize - override def isIndexGenerationNeeded: Boolean = (readerProperties.lengthFieldExpression.isEmpty || readerProperties.isRecordSequence) && readerProperties.isIndexGenerationNeeded + override def isIndexGenerationNeeded: Boolean = readerProperties.isIndexGenerationNeeded override def isRdwBigEndian: Boolean = readerProperties.isRdwBigEndian diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/RecordLengthField.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/RecordLengthField.scala new file mode 100644 index 000000000..378c2c64a --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/RecordLengthField.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.iterator + +import za.co.absa.cobrix.cobol.parser.ast.Primitive +import za.co.absa.cobrix.cobol.parser.expression.NumberExprEvaluator + +case class RecordLengthField( + field: Primitive, + valueMap: Map[String, Int] + ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala index 28a8936a8..c0560254c 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala @@ -18,6 +18,7 @@ package za.co.absa.cobrix.cobol.reader.iterator import za.co.absa.cobrix.cobol.internal.Logging import za.co.absa.cobrix.cobol.parser.Copybook +import za.co.absa.cobrix.cobol.parser.ast.Primitive import za.co.absa.cobrix.cobol.parser.headerparsers.RecordHeaderParser import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor @@ -50,13 +51,16 @@ class VRLRecordReader(cobolSchema: Copybook, private var byteIndex = startingFileOffset private var recordIndex = startRecordId - 1 - private val copyBookRecordSize = cobolSchema.getRecordSize - private val (lengthField, lengthFieldExpr) = ReaderParametersValidator.getEitherFieldAndExpression(readerProperties.lengthFieldExpression, cobolSchema) - private val segmentIdField = ReaderParametersValidator.getSegmentIdField(readerProperties.multisegment, cobolSchema) - private val recordLengthAdjustment = readerProperties.rdwAdjustment - private val useRdw = lengthField.isEmpty && lengthFieldExpr.isEmpty - private val minimumRecordLength = readerProperties.minimumRecordLength - private val maximumRecordLength = readerProperties.maximumRecordLength + final private val copyBookRecordSize = cobolSchema.getRecordSize + final private val (recordLengthField, lengthFieldExpr) = ReaderParametersValidator.getEitherFieldAndExpression(readerProperties.lengthFieldExpression, readerProperties.lengthFieldMap, cobolSchema) + final private val lengthField = recordLengthField.map(_.field) + final private val lengthMap = recordLengthField.map(_.valueMap).getOrElse(Map.empty) + final private val isLengthMapEmpty = lengthMap.isEmpty + final private val segmentIdField = ReaderParametersValidator.getSegmentIdField(readerProperties.multisegment, cobolSchema) + final private val recordLengthAdjustment = readerProperties.rdwAdjustment + final private val useRdw = lengthField.isEmpty && lengthFieldExpr.isEmpty + final private val minimumRecordLength = readerProperties.minimumRecordLength + final private val maximumRecordLength = readerProperties.maximumRecordLength fetchNext() @@ -129,14 +133,8 @@ class VRLRecordReader(cobolSchema: Copybook, } val recordLength = lengthField match { - case Some(lengthAST) => - cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match { - case i: Int => i + recordLengthAdjustment - case l: Long => l.toInt + 
recordLengthAdjustment - case s: String => s.toInt + recordLengthAdjustment - case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.") - } - case None => copyBookRecordSize + case Some(lengthAST) => getRecordLengthFromField(lengthAST, binaryDataStart) + case None => copyBookRecordSize } val restOfDataLength = recordLength - lengthFieldBlock + readerProperties.endOffset @@ -150,6 +148,38 @@ } } + final private def getRecordLengthFromField(lengthAST: Primitive, binaryDataStart: Array[Byte]): Int = { + val length = if (isLengthMapEmpty) { + cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match { + case i: Int => i + case l: Long => l.toInt + case s: String => s.toInt + case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).") + case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.") + } + } else { + cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match { + case i: Int => getRecordLengthFromMapping(i.toString) + case l: Long => getRecordLengthFromMapping(l.toString) + case s: String => getRecordLengthFromMapping(s) + case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).") + case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.") + } + } + length + recordLengthAdjustment + } + + final private def getRecordLengthFromMapping(v: String): Int = { + lengthMap.get(v) match { + case Some(len) => len + case None => throw new IllegalStateException(s"Record length value '$v' is not mapped to a record length.") + } + } + + final private def getBytesAsHexString(bytes: Array[Byte]): String = { + bytes.map("%02X" format _).mkString + } + private def fetchRecordUsingRecordLengthFieldExpression(expr: RecordLengthExpression): Option[Array[Byte]] = { val lengthFieldBlock = expr.requiredBytesToread val evaluator = expr.evaluator diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala index be6deeb50..b002d1edc 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala @@ -42,6 +42,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param minimumRecordLength Minium record length for which the record is considered valid. * @param maximumRecordLength Maximum record length for which the record is considered valid. * @param lengthFieldExpression A name of a field that contains record length. Optional. If not set the copybook record length will be used. + * @param lengthFieldMap Mapping from record length field values to actual record lengths. The field name should be specified in lengthFieldExpression. 
* @param isRecordSequence Does input files have 4 byte record length headers * @param bdw Block descriptor word (if specified), for FB and VB record formats * @param isRdwPartRecLength Does RDW count itself as part of record length itself @@ -88,6 +89,7 @@ case class ReaderParameters( minimumRecordLength: Int = 1, maximumRecordLength: Int = Int.MaxValue, lengthFieldExpression: Option[String] = None, + lengthFieldMap: Map[String, Int] = Map.empty, isRecordSequence: Boolean = false, bdw: Option[Bdw] = None, isRdwBigEndian: Boolean = false, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala index 1f0ec9d28..d2d289ae5 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala @@ -29,6 +29,7 @@ package za.co.absa.cobrix.cobol.reader.parameters * @param rhpAdditionalInfo An optional additional option string passed to a custom record header parser * @param reAdditionalInfo An optional additional option string passed to a custom record extractor * @param recordLengthField A field that stores record length + * @param recordLengthMap A mapping between field value and record size. * @param fileStartOffset A number of bytes to skip at the beginning of each file * @param fileEndOffset A number of bytes to skip at the end of each file * @param generateRecordId Generate a sequential record number for each record to be able to retain the order of the original data @@ -50,6 +51,7 @@ case class VariableLengthParameters( rhpAdditionalInfo: Option[String], reAdditionalInfo: String, recordLengthField: String, + recordLengthMap: Map[String, Int], fileStartOffset: Int, fileEndOffset: Int, generateRecordId: Boolean, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/validator/ReaderParametersValidator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/validator/ReaderParametersValidator.scala index b77041879..8bdc8c258 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/validator/ReaderParametersValidator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/validator/ReaderParametersValidator.scala @@ -19,22 +19,22 @@ package za.co.absa.cobrix.cobol.reader.validator import za.co.absa.cobrix.cobol.parser.Copybook import za.co.absa.cobrix.cobol.parser.ast.Primitive import za.co.absa.cobrix.cobol.parser.expression.NumberExprEvaluator -import za.co.absa.cobrix.cobol.reader.iterator.RecordLengthExpression +import za.co.absa.cobrix.cobol.reader.iterator.{RecordLengthExpression, RecordLengthField} import za.co.absa.cobrix.cobol.reader.parameters.MultisegmentParameters import scala.util.Try object ReaderParametersValidator { - def getEitherFieldAndExpression(fieldOrExpressionOpt: Option[String], cobolSchema: Copybook): (Option[Primitive], Option[RecordLengthExpression]) = { + def getEitherFieldAndExpression(fieldOrExpressionOpt: Option[String], recordLengthMap: Map[String, Int], cobolSchema: Copybook): (Option[RecordLengthField], Option[RecordLengthExpression]) = { fieldOrExpressionOpt match { case Some(fieldOrExpression) => val canBeExpression = fieldOrExpression.exists(c => "+-*/".contains(c)) if (canBeExpression && Try(cobolSchema.getFieldByName(fieldOrExpression)).isSuccess) { - (getLengthField(fieldOrExpression, 
cobolSchema), None) + (getLengthField(fieldOrExpression, recordLengthMap, cobolSchema), None) } else { - (None, getLengthFieldExpr(fieldOrExpression, cobolSchema)) + (None, getLengthFieldExpr(fieldOrExpression, recordLengthMap, cobolSchema)) } case None => (None, None) @@ -43,13 +43,13 @@ object ReaderParametersValidator { } @throws(classOf[IllegalStateException]) - def getLengthField(recordLengthFieldName: String, cobolSchema: Copybook): Option[Primitive] = { + def getLengthField(recordLengthFieldName: String, recordLengthMap: Map[String, Int], cobolSchema: Copybook): Option[RecordLengthField] = { val field = cobolSchema.getFieldByName(recordLengthFieldName) val astNode = field match { case s: Primitive => - if (!s.dataType.isInstanceOf[za.co.absa.cobrix.cobol.parser.ast.datatype.Integral]) { - throw new IllegalStateException(s"The record length field $recordLengthFieldName must be an integral type.") + if (!s.dataType.isInstanceOf[za.co.absa.cobrix.cobol.parser.ast.datatype.Integral] && recordLengthMap.isEmpty) { + throw new IllegalStateException(s"The record length field $recordLengthFieldName must be an integral type or a value mapping must be specified.") } if (s.occurs.isDefined && s.occurs.get > 1) { throw new IllegalStateException(s"The record length field '$recordLengthFieldName' cannot be an array.") @@ -58,17 +58,17 @@ object ReaderParametersValidator { case _ => throw new IllegalStateException(s"The record length field $recordLengthFieldName must have an primitive integral type.") } - Some(astNode) + Some(RecordLengthField(astNode, recordLengthMap)) } @throws(classOf[IllegalStateException]) - def getLengthFieldExpr(recordLengthFieldExpr: String, cobolSchema: Copybook): Option[RecordLengthExpression] = { + def getLengthFieldExpr(recordLengthFieldExpr: String, recordLengthMap: Map[String, Int], cobolSchema: Copybook): Option[RecordLengthExpression] = { val evaluator = new NumberExprEvaluator(recordLengthFieldExpr) val vars = evaluator.getVariables val fields = vars.map { field => - val primitive = getLengthField(field, cobolSchema) + val primitive = getLengthField(field, recordLengthMap, cobolSchema) .getOrElse(throw new IllegalArgumentException(s"The record length expression '$recordLengthFieldExpr' contains an unknown field '$field'.")) - (field, primitive) + (field, primitive.field) } val requiredBytesToRead = if (fields.nonEmpty) { fields.map { case (_, field) => diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala index a25af1c51..727bc2fe4 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala @@ -203,7 +203,7 @@ class VRLRecordReaderSpec extends AnyWordSpec { lengthFieldExpression = Some("LEN")) } - assert(ex.getMessage == "The record length field LEN must be an integral type.") + assert(ex.getMessage == "The record length field LEN must be an integral type or a value mapping must be specified.") } } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index 960c387e5..2ee6d985e 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ 
b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -23,7 +23,7 @@ import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy.DebugFieldsPolicy import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy -import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, DebugFieldsPolicy, FillerNamingPolicy, MetadataPolicy, StringTrimmingPolicy} +import za.co.absa.cobrix.cobol.parser.policies._ import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat._ import za.co.absa.cobrix.cobol.reader.parameters._ @@ -32,6 +32,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten import scala.collection.mutable import scala.collection.mutable.ListBuffer +import scala.util.control.NonFatal /** * This class provides methods for parsing the parameters set as Spark options. @@ -55,6 +56,7 @@ object CobolParametersParser extends Logging { val PARAM_MAXIMUM_RECORD_LENGTH = "maximum_record_length" val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence" val PARAM_RECORD_LENGTH_FIELD = "record_length_field" + val PARAM_RECORD_LENGTH_MAP = "record_length_map" val PARAM_RECORD_START_OFFSET = "record_start_offset" val PARAM_RECORD_END_OFFSET = "record_end_offset" val PARAM_FILE_START_OFFSET = "file_start_offset" @@ -348,6 +350,7 @@ object CobolParametersParser extends Logging { rhpAdditionalInfo = None, reAdditionalInfo = "", recordLengthField = "", + Map.empty, fileStartOffset = 0, fileEndOffset = 0, generateRecordId = false, @@ -380,6 +383,7 @@ object CobolParametersParser extends Logging { minimumRecordLength = parameters.minimumRecordLength.getOrElse(1), maximumRecordLength = parameters.maximumRecordLength.getOrElse(Int.MaxValue), lengthFieldExpression = recordLengthField, + lengthFieldMap = varLenParams.recordLengthMap, isRecordSequence = varLenParams.isRecordSequence, bdw = varLenParams.bdw, isRdwBigEndian = varLenParams.isRdwBigEndian, @@ -461,6 +465,7 @@ object CobolParametersParser extends Logging { params.get(PARAM_RHP_ADDITIONAL_INFO), params.get(PARAM_RE_ADDITIONAL_INFO).getOrElse(""), recordLengthFieldOpt.getOrElse(""), + getRecordLengthMappings(params.getOrElse(PARAM_RECORD_LENGTH_MAP, "{}")), fileStartOffset, fileEndOffset, isRecordIdGenerationEnabled, @@ -912,6 +917,38 @@ object CobolParametersParser extends Logging { } } + /** + * Parses the options for the record length mappings. + * + * @param recordLengthMapJson A JSON string mapping record length field values to record lengths, as passed via spark.read.option(...) + * @return A mapping from record length field values to actual record lengths + */ + @throws(classOf[IllegalArgumentException]) + def getRecordLengthMappings(recordLengthMapJson: String): Map[String, Int] = { + val parser = new ParserJson() + val json = try { + parser.parseMap(recordLengthMapJson) + } catch { + case NonFatal(ex) => throw new IllegalArgumentException(s"Unable to parse record length mapping JSON.", ex) + } + + json.toSeq // Converting to a non-lazy sequence first. If .mapValues() is used the map stays lazy and errors pop up later + .map { case (k, v) => + val vInt = v match { + case num: Int => num + case num: Long => num.toInt + case str: String => + try { + str.toInt + } catch { + case NonFatal(ex) => throw new IllegalArgumentException(s"Unsupported record length value: '$str'. 
Please, use numeric values only.", ex) + } + case any => throw new IllegalArgumentException(s"Unsupported record length value: '$any'. Please, use numeric values only.") + } + (k, vInt) + }.toMap[String, Int] + } + /** * Parses the options for the occurs mappings. * diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala index 42455ad36..159c62e14 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala @@ -69,11 +69,16 @@ private[source] object CobolScanners extends Logging { val fileSystem = path.getFileSystem(sconf.value) logger.info(s"Going to parse file: $filePath") - val dataStream = new FileStreamer(filePath, fileSystem) - val headerStream = new FileStreamer(filePath, fileSystem) - reader.getRowIterator(dataStream, headerStream, 0L, fileOrder, 0L) - } - ) + val startFileOffset = reader.getReaderProperties.fileStartOffset + val maximumFileBytes = if (reader.getReaderProperties.fileEndOffset == 0) { + 0 + } else { + fileSystem.getFileStatus(path).getLen - reader.getReaderProperties.fileEndOffset - startFileOffset + } + val dataStream = new FileStreamer(filePath, fileSystem, startFileOffset, maximumFileBytes) + val headerStream = new FileStreamer(filePath, fileSystem, startFileOffset) + reader.getRowIterator(dataStream, headerStream, startFileOffset, fileOrder, 0L) + }) }) } diff --git a/spark-cobol/src/test/resources/log4j.properties b/spark-cobol/src/test/resources/log4j.properties index 775b79fdc..32cdbf791 100644 --- a/spark-cobol/src/test/resources/log4j.properties +++ b/spark-cobol/src/test/resources/log4j.properties @@ -32,3 +32,4 @@ log4j.logger.za.co.absa.cobrix.spark.cobol.source.index.IndexBuilder$=ERROR log4j.logger.za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer=ERROR log4j.logger.za.co.absa.cobrix.spark.cobol.utils.FileUtils$=OFF log4j.logger.za.co.absa.cobrix.spark.cobol.utils.FileUtils=OFF +log4j.logger.za.co.absa.cobrix.cobol.parser.antlr.ParserJson=OFF diff --git a/spark-cobol/src/test/resources/log4j2.properties b/spark-cobol/src/test/resources/log4j2.properties index 0120d2bbe..84980c30d 100644 --- a/spark-cobol/src/test/resources/log4j2.properties +++ b/spark-cobol/src/test/resources/log4j2.properties @@ -39,3 +39,6 @@ logger.cobrix_file_utils1.level = OFF logger.cobrix_file_utils2.name = za.co.absa.cobrix.cobol.utils.FileUtils logger.cobrix_file_utils2.level = OFF + +logger.parserjson.name = za.co.absa.cobrix.cobol.parser.antlr.ParserJson +logger.parserjson.level = OFF diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/ParametersParsingSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/ParametersParsingSpec.scala index 3cbb5aec9..7fbc3049c 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/ParametersParsingSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/ParametersParsingSpec.scala @@ -101,4 +101,38 @@ class ParametersParsingSpec extends AnyFunSuite { assert(fieldCodePageMap("field_3") == "us-ascii") } + test("Test getRecordLengthMappings() works as expected") { + val map1 = CobolParametersParser.getRecordLengthMappings("""{}""") + assert(map1.isEmpty) + + val map2 = CobolParametersParser.getRecordLengthMappings("""{"A": 12}""") + assert(map2("A") == 12) + 
+ val map3 = CobolParametersParser.getRecordLengthMappings("""{"0A1": "1234", "B": 122}""") + assert(map3("0A1") == 1234) + assert(map3("B") == 122) + } + + test("Test getRecordLengthMappings() exceptional situations") { + val ex = intercept[IllegalArgumentException] { + CobolParametersParser.getRecordLengthMappings("""{"A": "ABC"}""") + } + assert(ex.getMessage == "Unsupported record length value: 'ABC'. Please, use numeric values only.") + + val ex2 = intercept[IllegalArgumentException] { + CobolParametersParser.getRecordLengthMappings("""{"A": {"B": 12}}""") + } + assert(ex2.getMessage == "Unsupported record length value: 'Map(B -> 12)'. Please, use numeric values only.") + + val ex3 = intercept[IllegalArgumentException] { + CobolParametersParser.getRecordLengthMappings("""{"A": {"B": 5000000000}}""") + } + assert(ex3.getMessage == "Unsupported record length value: 'Map(B -> 5.0E9)'. Please, use numeric values only.") + + val ex4 = intercept[IllegalArgumentException] { + CobolParametersParser.getRecordLengthMappings("""Hmm...""") + } + assert(ex4.getMessage == "Unable to parse record length mapping JSON.") + } + } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala new file mode 100644 index 000000000..62b2d29e6 --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala @@ -0,0 +1,170 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.source.integration + +import org.apache.spark.SparkException +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase +import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture + +class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture { + private val copybook = + """ 01 R. + 03 SEG-ID PIC X(1). + 03 TEXT PIC X(7). 
+ """ + + val dataSimple: Array[Byte] = Array( + 0xC1, 0xF1, 0xF2, 0xF3, // record 0 'A123' + 0xC2, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, // record 1 'B123456' + 0xC3, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7).map(_.toByte) // record 2 'C1234567' + + val dataNumeric: Array[Byte] = Array( + 0xF1, 0xF1, 0xF2, 0xF3, // record 0 '1123' + 0xF2, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, // record 1 '2123456' + 0xF3, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7).map(_.toByte) // record 2 '31234567' + + val dataWithFileOffsets: Array[Byte] = Array( + 0x00, // header + 0xC1, 0xF1, 0xF2, 0xF3, // record 0 'A123' + 0xC2, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, // record 1 'B123456' + 0xC3, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, // record 2 'C1234567' + 0x00, 0x00).map(_.toByte) // footer + + "data with record length mapping" should { + "work for simple mappings" in { + withTempBinFile("record_length_mapping", ".tmp", dataSimple) { tempFile => + val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}""" + + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "F") + .option("record_length_field", "SEG-ID") + .option("input_split_records", "2") + .option("pedantic", "true") + .option("record_length_map", """{"A":4,"B":7,"C":8}""") + .load(tempFile) + + val actual = df.orderBy("SEG_ID").toJSON.collect().mkString(",") + + assert(actual == expected) + } + } + + "work for numeric mappings" in { + withTempBinFile("record_length_mapping", ".tmp", dataNumeric) { tempFile => + val expected = """{"SEG_ID":"1","TEXT":"123"},{"SEG_ID":"2","TEXT":"123456"},{"SEG_ID":"3","TEXT":"1234567"}""" + + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "F") + .option("record_length_field", "SEG-ID") + .option("input_split_records", "2") + .option("pedantic", "true") + .option("record_length_map", """{"1":4,"2":7,"3":8}""") + .load(tempFile) + + val actual = df.orderBy("SEG_ID").toJSON.collect().mkString(",") + + assert(actual == expected) + } + } + + "work for data with offsets" in { + withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile => + val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}""" + + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "F") + .option("record_length_field", "SEG-ID") + .option("file_start_offset", 1) + .option("file_end_offset", 2) + .option("pedantic", "true") + .option("record_length_map", """{"A":4,"B":7,"C":8}""") + .load(tempFile) + + val actual = df.orderBy("SEG_ID").toJSON.collect().mkString(",") + + assert(actual == expected) + } + } + + "work for data with offsets and indexes" in { + withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile => + val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}""" + + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "F") + .option("record_length_field", "SEG-ID") + .option("file_start_offset", 1) + .option("file_end_offset", 2) + .option("input_split_records", "2") + .option("pedantic", "true") + .option("record_length_map", """{"A":4,"B":7,"C":8}""") + .load(tempFile) + + val actual = df.orderBy("SEG_ID").toJSON.collect().mkString(",") + + assert(actual == expected) + } + } + + "throw an exception for unknown 
mapping" in { + withTempBinFile("record_length_mapping", ".tmp", dataSimple) { tempFile => + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "F") + .option("record_length_field", "SEG-ID") + .option("record_length_map", """{"A":4,"B":7}""") + .option("pedantic", "true") + .load(tempFile) + + val ex = intercept[SparkException] { + df.count() + } + + assert(ex.getMessage.contains("Record length value 'C' is not mapped to a record length")) + } + } + + "throw an exception for null fields" in { + withTempBinFile("rdw_test", ".tmp", dataWithFileOffsets) { tempFile => + val df = spark.read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "F") + .option("record_length_field", "SEG-ID") + .option("pedantic", "true") + .option("record_length_map", """{"A":4,"B":7,"C":8}""") + .load(tempFile) + + val ex = intercept[SparkException] { + df.count() + } + + assert(ex.getMessage.contains("Null encountered as a record length field (offset: 1, raw value: 00)")) + } + } + } +}
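For reference, here is a minimal end-to-end sketch of the new `record_length_map` option, assembled from the README hunk and the integration test above. The copybook mirrors the test fixture; the application name, local master, and input path are illustrative placeholders and are not part of this change:

```scala
import org.apache.spark.sql.SparkSession

object RecordLengthMapExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("record-length-map-example") // placeholder app name
      .master("local[*]")                   // placeholder master
      .getOrCreate()

    // Each record starts with a 1-byte segment id followed by the payload;
    // the segment id value determines the total record length.
    val copybook =
      """      01 R.
        |         03 SEG-ID PIC X(1).
        |         03 TEXT   PIC X(7).
        |""".stripMargin

    val df = spark.read
      .format("cobol")
      .option("copybook_contents", copybook)
      .option("record_format", "F")                            // fixed format, length taken from a field
      .option("record_length_field", "SEG-ID")                 // non-integral field, so a mapping is required
      .option("record_length_map", """{"A":4,"B":7,"C":8}""")  // segment id -> record length in bytes
      .load("/path/to/mainframe_file.dat")                     // placeholder input path

    df.show(false)
  }
}
```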