#666 Add support for record length value mapping #674

Merged
merged 4 commits into from
Apr 22, 2024
8 changes: 8 additions & 0 deletions README.md
@@ -478,6 +478,13 @@ or
.option("record_length_field", "FIELD1 * 10 + 200")
```

If the record length field contains a string value that can be mapped to a record size, you can provide the mapping as JSON:
```
.option("record_format", "F")
.option("record_length_field", "FIELD_STR")
.option("record_length_map", """{"SEG1":100,"SEG2":200}""")
```
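
For a fuller picture, a complete read using such a mapping could look like the sketch below (the copybook layout, the field name `SEG-ID`, and the paths are hypothetical; `SEG-ID` plays the role of `FIELD_STR` above):
```
// A minimal sketch; 'spark' is assumed to be an active SparkSession.
// SEG-ID holds values such as "SEG1" or "SEG2"; the mapping translates them
// into record lengths, and PAYLOAD is sized for the largest possible record.
val copybook =
  """      01  REC.
    |         05  SEG-ID        PIC X(4).
    |         05  PAYLOAD       PIC X(196).
    |""".stripMargin

val df = spark.read
  .format("cobol")
  .option("copybook_contents", copybook)
  .option("record_format", "F")
  .option("record_length_field", "SEG-ID")
  .option("record_length_map", """{"SEG1":100,"SEG2":200}""")
  .load("/path/to/mainframe/data")
```
A length field value that is not present in the mapping results in an error.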

### Use cases for various variable length formats

In order to understand the file format it is often sufficient to look at the first 4 bytes of the file (in the case of RDW-only files),
@@ -1547,6 +1554,7 @@ The output looks like this:
| .option("bdw_adjustment", 0) | If there is a mismatch between BDW and record length this option can be used to adjust the difference. |
| .option("re_additional_info", "") | Passes a string as an additional info parameter passed to a custom record extractor to its constructor. |
| .option("record_length_field", "RECORD-LEN") | Specifies a record length field or expression to use instead of RDW. Use `rdw_adjustment` option if the record length field differs from the actual length by a fixed amount of bytes. The `record_format` should be set to `F`. This option is incompatible with `is_record_sequence`. |
| .option("record_length_map", """{"A":100,"B":50}""") | Specifies a mapping between record length field values and actual record lengths. |
| .option("record_extractor", "com.example.record.extractor") | Specifies a class for parsing record in a custom way. The class must inherit `RawRecordExtractor` and `Serializable` traits. See the chapter on record extractors above. |
| .option("minimum_record_length", 1) | Specifies the minimum length a record is considered valid, will be skipped otherwise. |
| .option("maximum_record_length", 1000) | Specifies the maximum length a record is considered valid, will be skipped otherwise. |
@@ -99,7 +99,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],

override def getRecordSize: Int = cobolSchema.copybook.getRecordSize

override def isIndexGenerationNeeded: Boolean = (readerProperties.lengthFieldExpression.isEmpty || readerProperties.isRecordSequence) && readerProperties.isIndexGenerationNeeded
override def isIndexGenerationNeeded: Boolean = readerProperties.isIndexGenerationNeeded

override def isRdwBigEndian: Boolean = readerProperties.isRdwBigEndian

@@ -0,0 +1,25 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.reader.iterator

import za.co.absa.cobrix.cobol.parser.ast.Primitive
import za.co.absa.cobrix.cobol.parser.expression.NumberExprEvaluator

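/**
  * Represents the record length field together with an optional mapping from the
  * field's values to record lengths (empty when the field itself contains the length).
  */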
case class RecordLengthField(
field: Primitive,
valueMap: Map[String, Int]
)
@@ -18,6 +18,7 @@ package za.co.absa.cobrix.cobol.reader.iterator

import za.co.absa.cobrix.cobol.internal.Logging
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.parser.ast.Primitive
import za.co.absa.cobrix.cobol.parser.headerparsers.RecordHeaderParser
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
@@ -50,13 +51,16 @@ class VRLRecordReader(cobolSchema: Copybook,
private var byteIndex = startingFileOffset
private var recordIndex = startRecordId - 1

private val copyBookRecordSize = cobolSchema.getRecordSize
private val (lengthField, lengthFieldExpr) = ReaderParametersValidator.getEitherFieldAndExpression(readerProperties.lengthFieldExpression, cobolSchema)
private val segmentIdField = ReaderParametersValidator.getSegmentIdField(readerProperties.multisegment, cobolSchema)
private val recordLengthAdjustment = readerProperties.rdwAdjustment
private val useRdw = lengthField.isEmpty && lengthFieldExpr.isEmpty
private val minimumRecordLength = readerProperties.minimumRecordLength
private val maximumRecordLength = readerProperties.maximumRecordLength
final private val copyBookRecordSize = cobolSchema.getRecordSize
final private val (recordLengthField, lengthFieldExpr) = ReaderParametersValidator.getEitherFieldAndExpression(readerProperties.lengthFieldExpression, readerProperties.lengthFieldMap, cobolSchema)
final private val lengthField = recordLengthField.map(_.field)
final private val lengthMap = recordLengthField.map(_.valueMap).getOrElse(Map.empty)
final private val isLengthMapEmpty = lengthMap.isEmpty
final private val segmentIdField = ReaderParametersValidator.getSegmentIdField(readerProperties.multisegment, cobolSchema)
final private val recordLengthAdjustment = readerProperties.rdwAdjustment
final private val useRdw = lengthField.isEmpty && lengthFieldExpr.isEmpty
final private val minimumRecordLength = readerProperties.minimumRecordLength
final private val maximumRecordLength = readerProperties.maximumRecordLength

fetchNext()

@@ -129,14 +133,8 @@
}

val recordLength = lengthField match {
case Some(lengthAST) =>
cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
case i: Int => i + recordLengthAdjustment
case l: Long => l.toInt + recordLengthAdjustment
case s: String => s.toInt + recordLengthAdjustment
case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
}
case None => copyBookRecordSize
case Some(lengthAST) => getRecordLengthFromField(lengthAST, binaryDataStart)
case None => copyBookRecordSize
}

val restOfDataLength = recordLength - lengthFieldBlock + readerProperties.endOffset
@@ -150,6 +148,38 @@
}
}

final private def getRecordLengthFromField(lengthAST: Primitive, binaryDataStart: Array[Byte]): Int = {
val length = if (isLengthMapEmpty) {
cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
case i: Int => i
case l: Long => l.toInt
case s: String => s.toInt
case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).")
case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
}
} else {
cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
case i: Int => getRecordLengthFromMapping(i.toString)
case l: Long => getRecordLengthFromMapping(l.toString)
case s: String => getRecordLengthFromMapping(s)
case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).")
case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
}
}
length + recordLengthAdjustment
}

final private def getRecordLengthFromMapping(v: String): Int = {
lengthMap.get(v) match {
case Some(len) => len
case None => throw new IllegalStateException(s"Record length value '$v' is not mapped to a record length.")
}
}

final private def getBytesAsHexString(bytes: Array[Byte]): String = {
bytes.map("%02X" format _).mkString
}

private def fetchRecordUsingRecordLengthFieldExpression(expr: RecordLengthExpression): Option[Array[Byte]] = {
val lengthFieldBlock = expr.requiredBytesToread
val evaluator = expr.evaluator
@@ -42,6 +42,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param minimumRecordLength Minimum record length for which the record is considered valid.
* @param maximumRecordLength Maximum record length for which the record is considered valid.
* @param lengthFieldExpression A name of a field that contains record length. Optional. If not set the copybook record length will be used.
* @param lengthFieldMap Mapping from record length field values to actual record lengths. The field name should be specified in lengthFieldExpression.
* @param isRecordSequence Do input files have 4 byte record length headers
* @param bdw Block descriptor word (if specified), for FB and VB record formats
* @param isRdwPartRecLength Does RDW count itself as part of the record length
@@ -88,6 +89,7 @@ case class ReaderParameters(
minimumRecordLength: Int = 1,
maximumRecordLength: Int = Int.MaxValue,
lengthFieldExpression: Option[String] = None,
lengthFieldMap: Map[String, Int] = Map.empty,
isRecordSequence: Boolean = false,
bdw: Option[Bdw] = None,
isRdwBigEndian: Boolean = false,
@@ -29,6 +29,7 @@ package za.co.absa.cobrix.cobol.reader.parameters
* @param rhpAdditionalInfo An optional additional option string passed to a custom record header parser
* @param reAdditionalInfo An optional additional option string passed to a custom record extractor
* @param recordLengthField A field that stores record length
* @param recordLengthMap A mapping between record length field values and record sizes.
* @param fileStartOffset A number of bytes to skip at the beginning of each file
* @param fileEndOffset A number of bytes to skip at the end of each file
* @param generateRecordId Generate a sequential record number for each record to be able to retain the order of the original data
@@ -50,6 +51,7 @@ case class VariableLengthParameters(
rhpAdditionalInfo: Option[String],
reAdditionalInfo: String,
recordLengthField: String,
recordLengthMap: Map[String, Int],
fileStartOffset: Int,
fileEndOffset: Int,
generateRecordId: Boolean,
@@ -19,22 +19,22 @@ package za.co.absa.cobrix.cobol.reader.validator
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.parser.ast.Primitive
import za.co.absa.cobrix.cobol.parser.expression.NumberExprEvaluator
import za.co.absa.cobrix.cobol.reader.iterator.RecordLengthExpression
import za.co.absa.cobrix.cobol.reader.iterator.{RecordLengthExpression, RecordLengthField}
import za.co.absa.cobrix.cobol.reader.parameters.MultisegmentParameters

import scala.util.Try

object ReaderParametersValidator {

def getEitherFieldAndExpression(fieldOrExpressionOpt: Option[String], cobolSchema: Copybook): (Option[Primitive], Option[RecordLengthExpression]) = {
def getEitherFieldAndExpression(fieldOrExpressionOpt: Option[String], recordLengthMap: Map[String, Int], cobolSchema: Copybook): (Option[RecordLengthField], Option[RecordLengthExpression]) = {
fieldOrExpressionOpt match {
case Some(fieldOrExpression) =>
val canBeExpression = fieldOrExpression.exists(c => "+-*/".contains(c))

if (canBeExpression && Try(cobolSchema.getFieldByName(fieldOrExpression)).isSuccess) {
(getLengthField(fieldOrExpression, cobolSchema), None)
(getLengthField(fieldOrExpression, recordLengthMap, cobolSchema), None)
} else {
(None, getLengthFieldExpr(fieldOrExpression, cobolSchema))
(None, getLengthFieldExpr(fieldOrExpression, recordLengthMap, cobolSchema))
}
case None =>
(None, None)
@@ -43,13 +43,13 @@
}

@throws(classOf[IllegalStateException])
def getLengthField(recordLengthFieldName: String, cobolSchema: Copybook): Option[Primitive] = {
def getLengthField(recordLengthFieldName: String, recordLengthMap: Map[String, Int], cobolSchema: Copybook): Option[RecordLengthField] = {
val field = cobolSchema.getFieldByName(recordLengthFieldName)

val astNode = field match {
case s: Primitive =>
if (!s.dataType.isInstanceOf[za.co.absa.cobrix.cobol.parser.ast.datatype.Integral]) {
throw new IllegalStateException(s"The record length field $recordLengthFieldName must be an integral type.")
if (!s.dataType.isInstanceOf[za.co.absa.cobrix.cobol.parser.ast.datatype.Integral] && recordLengthMap.isEmpty) {
throw new IllegalStateException(s"The record length field $recordLengthFieldName must be an integral type or a value mapping must be specified.")
}
if (s.occurs.isDefined && s.occurs.get > 1) {
throw new IllegalStateException(s"The record length field '$recordLengthFieldName' cannot be an array.")
@@ -58,17 +58,17 @@
case _ =>
throw new IllegalStateException(s"The record length field $recordLengthFieldName must have a primitive integral type.")
}
Some(astNode)
Some(RecordLengthField(astNode, recordLengthMap))
}

@throws(classOf[IllegalStateException])
def getLengthFieldExpr(recordLengthFieldExpr: String, cobolSchema: Copybook): Option[RecordLengthExpression] = {
def getLengthFieldExpr(recordLengthFieldExpr: String, recordLengthMap: Map[String, Int], cobolSchema: Copybook): Option[RecordLengthExpression] = {
val evaluator = new NumberExprEvaluator(recordLengthFieldExpr)
val vars = evaluator.getVariables
val fields = vars.map { field =>
val primitive = getLengthField(field, cobolSchema)
val primitive = getLengthField(field, recordLengthMap, cobolSchema)
.getOrElse(throw new IllegalArgumentException(s"The record length expression '$recordLengthFieldExpr' contains an unknown field '$field'."))
(field, primitive)
(field, primitive.field)
}
val requiredBytesToRead = if (fields.nonEmpty) {
fields.map { case (_, field) =>
@@ -203,7 +203,7 @@ class VRLRecordReaderSpec extends AnyWordSpec {
lengthFieldExpression = Some("LEN"))
}

assert(ex.getMessage == "The record length field LEN must be an integral type.")
assert(ex.getMessage == "The record length field LEN must be an integral type or a value mapping must be specified.")
}
}
