
New features: #412 and #417 (#418)


Merged: 17 commits, Sep 7, 2021
67 changes: 51 additions & 16 deletions README.md
@@ -150,6 +150,14 @@ In some scenarios Spark is unable to find "cobol" data source by its short name
Cobrix assumes input data is encoded in EBCDIC. You can load ASCII files as well by specifying the following option:
`.option("encoding", "ascii")`.

If the input file is a text file (CRLF / LF are used to split records), use
`.option("is_text", "true")`.

Multisegment ASCII text files are supported using this option:
`.option("record_format", "D")`.

Read more on record formats at https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets
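Putting these options together, loading a multisegment ASCII text file might look like the sketch below (the paths and copybook are hypothetical, and `spark` is an active `SparkSession`):

```scala
val df = spark
  .read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy") // hypothetical path
  .option("encoding", "ascii")
  .option("record_format", "D")
  .load("/path/to/ascii/records.txt")          // hypothetical path
```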

### Streaming Cobol binary files from a directory

1. Create a Spark ```StreamContext```
@@ -267,7 +275,7 @@ You can collect the uber jar of `spark-cobol` either at

Then, run `spark-shell` or `spark-submit` adding the fat jar via the `--jars` option:
```sh
-$ spark-shell --jars spark-cobol-assembly-2.3.1-SNAPSHOT.jar
+$ spark-shell --jars spark-cobol-assembly-2.4.0-SNAPSHOT.jar
```

## Other Features
@@ -317,10 +325,17 @@ file is copied from a mainframe.

To load a variable-length record file, the following option should be specified:
```
.option("is_record_sequence", "true")
.option("record_format", "V")
```

To load a variable block (VB) record file, the following option should be specified:
```
.option("record_format", "VB")
```

-The space used by the headers should not be mentioned in the copybook if this option is used. Please refer to the
+More on record formats: https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets

+The space used by the headers (both BDW and RDW) should not be mentioned in the copybook if this option is used. Please refer to the
'Record headers support' section below.
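For example, a variable-length EBCDIC file with RDW headers could be loaded as follows (a minimal sketch with hypothetical paths; switch `"V"` to `"VB"` when the file also has BDW block headers):

```scala
val df = spark
  .read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy") // headers are NOT described in the copybook
  .option("record_format", "V")                // or "VB" for blocked files
  .load("/path/to/data")
```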

### Schema collapsing
@@ -458,11 +473,11 @@ about record length.
You can instruct the reader to use 4 byte record headers to extract records from a mainframe file.

```
.option("is_record_sequence", "true")
.option("record_format", "V")
```

This is very helpful for multisegment files when segments have different lengths. Since each segment has its own
-copybook it is very convenient to extract segments one by one by combining 'is_record_sequence' option with segment
+copybook it is very convenient to extract segments one by one by combining the `record_format = V` option with the segment
filter option.

```
@@ -549,6 +564,17 @@ Working example 1:
```

Working example 2:
```scala
// A new experimental way
val df = spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("record_format", "D")
.load(tmpFileName)
```

Working example 3:
```scala
val spark = SparkSession
.builder()
@@ -634,7 +660,7 @@ val df = spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("is_record_sequence", "true")
.option("record_format", "V")
.option("segment_field", "SEGMENT_ID")
.option("segment_id_level0", "C")
.option("segment_id_level1", "P")
@@ -732,7 +758,7 @@ in redefined groups. Here is the copybook for our example:
The 'SEGMENT-ID' and 'COMPANY-ID' fields are present in all of the segments. The 'STATIC-DETAILS' group is present only in
the root record. The 'CONTACTS' group is present only in the child record. Notice that 'CONTACTS' redefines 'STATIC-DETAILS'.

-Because the records have different lengths the 'is_record_sequence' option should be set to 'true'.
+Because the records have different lengths, use `record_format = V` or `record_format = VB`, depending on the record format.

If you load this file as is, you will get a schema and data similar to this.

Expand All @@ -742,7 +768,7 @@ val df = spark
.read
.format("cobol")
.option("copybook", "/path/to/thecopybook")
.option("is_record_sequence", "true")
.option("record_format", "V")
.load("examples/multisegment_data")
```

@@ -807,7 +833,7 @@ val df = spark
.read
.format("cobol")
.option("copybook", "/path/to/thecopybook")
.option("is_record_sequence", "true")
.option("record_format", "V")

// Specifies a field containing a segment id
.option("segment_field", "SEGMENT_ID")
@@ -960,7 +986,7 @@ val df = spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("is_record_sequence", "true")
.option("record_format", "V")
.option("segment_field", "SEGMENT_ID")
.option("segment_id_level0", "C")
.option("segment_id_level1", "P")
@@ -1130,14 +1156,16 @@ Again, the full example is available at

| Option (usage example) | Description |
| --------------------------------------------- |:----------------------------------------------------------------------------- |
| .option("is_record_sequence", "true") | If 'true' the parser will look for 4 byte RDW headers to read variable record length files. |
| .option("record_format", "F") | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). |
| .option("is_record_sequence", "true") | _[deprecated]_ If 'true' the parser will look for 4 byte RDW headers to read variable record length files. Use `.option("record_format", "V")` instead. |
| .option("is_rdw_big_endian", "true") | Specifies if RDW headers are big endian. They are considered little-endian by default. |
| .option("is_rdw_part_of_record_length", false)| Specifies if RDW headers count themselves as part of record length. By default RDW headers count only payload record in record length, not RDW headers themselves. This is equivalent to `.option("rdw_adjustment", -4)`. |
| .option("is_rdw_part_of_record_length", false)| Specifies if RDW headers count themselves as part of record length. By default RDW headers count only payload record in record length, not RDW headers themselves. This is equivalent to `.option("rdw_adjustment", -4)`. For BDW use `.option("bdw_adjustment", -4)` |
| .option("rdw_adjustment", 0) | If there is a mismatch between RDW and record length this option can be used to adjust the difference. |
| .option("record_length_field", "RECORD-LEN") | Specifies a record length field to use instead of RDW. Use `rdw_adjustment` option if the record length field differs from the actual length by a fixed amount of bytes. This option is incompatible with `is_record_sequence`. |
| .option("bdw_adjustment", 0) | If there is a mismatch between BDW and record length this option can be used to adjust the difference. |
| .option("record_length_field", "RECORD-LEN") | Specifies a record length field to use instead of RDW. Use `rdw_adjustment` option if the record length field differs from the actual length by a fixed amount of bytes. The `record_format` should be set to `F`. This option is incompatible with `is_record_sequence`. |
| .option("record_extractor", "com.example.record.extractor") | Specifies a class for parsing record in a custom way. The class must inherit `RawRecordExtractor` and `Serializable` traits. See the chapter on record extractors above. |
| .option("re_additional_info", "") | Passes a string as an additional info parameter passed to a custom record extractor to its constructor. |
| .option("is_text", "true") | If 'true' the file will be considered a text file where records are separated by an end-of-line character. Currently, only ASCII files having UTF-8 charset can be processed this way. If combined with `is_record_sequence`, multisegment and hierarchical text record files can be loaded. |
| .option("re_additional_info", "") | Passes a string as an additional info parameter passed to a custom record extractor to its constructor. |
| .option("is_text", "true") | If 'true' the file will be considered a text file where records are separated by an end-of-line character. Currently, only ASCII files having UTF-8 charset can be processed this way. If combined with `record_format = D`, multisegment and hierarchical text record files can be loaded. |


##### Multisegment files options
@@ -1213,7 +1241,7 @@ For multisegment variable lengths tests:
.format("cobol")
.option("copybook_contents", copybook)
.option("generate_record_id", true)
.option("is_record_sequence", "true")
.option("record_format", "V")
.option("segment_field", "SEGMENT_ID")
.option("segment_id_level0", "C")
.load(args(0))
@@ -1259,6 +1287,13 @@ For multisegment variable lengths tests:
![](performance/images/exp3_multiseg_wide_records_throughput.svg) ![](performance/images/exp3_multiseg_wide_mb_throughput.svg)

## Changelog
- #### 2.4.0 to be released soon.
- [#412](https://github.com/AbsaOSS/cobrix/issues/412) Add support for [variable block (VB aka VBVR)](https://www.ibm.com/docs/en/zos/2.3.0?topic=formats-format-v-records) record format.
Options to adjust BDW settings are added:
- `is_bdw_big_endian` - specifies if BDW is big-endian (false by default)
- `bdw_adjustment` - specifies how the value of a BDW differs from the block payload. For example, if the size in BDW headers includes the BDW header itself, use `.option("bdw_adjustment", "-4")`.
- Options `is_record_sequence` and `is_xcom` are deprecated. Use `.option("record_format", "V")` instead.
- [#417](https://github.com/AbsaOSS/cobrix/issues/417) Multisegment ASCII text files now have direct support via `record_format = D`.
- #### 2.3.0 released 2 August 2021.
- [#405](https://github.com/AbsaOSS/cobrix/issues/405) Fix extracting records that contain redefines of the top level GROUPs.
- [#406](https://github.com/AbsaOSS/cobrix/issues/406) Use 'collapse_root' retention policy by default. This is the breaking,
2 changes: 1 addition & 1 deletion cobol-converters/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>za.co.absa.cobrix</groupId>
<artifactId>cobrix_2.12</artifactId>
-        <version>2.3.1-SNAPSHOT</version>
+        <version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion cobol-parser/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>za.co.absa.cobrix</groupId>
<artifactId>cobrix_2.12</artifactId>
-        <version>2.3.1-SNAPSHOT</version>
+        <version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

RecordFormat.scala (new file)
@@ -0,0 +1,37 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.parser.recordformats

sealed trait RecordFormat

object RecordFormat {
case object FixedLength extends RecordFormat
case object VariableLength extends RecordFormat
case object VariableBlock extends RecordFormat
case object AsciiText extends RecordFormat

def withNameOpt(s: String): Option[RecordFormat] = {
s match {
case "F" => Some(FixedLength)
case "V" => Some(VariableLength)
case "VB" => Some(VariableBlock)
case "D" => Some(AsciiText)
case "T" => Some(AsciiText) // Same as 'D' - Cobrix extension
case _ => None
}
}
}
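A quick illustration of the mapping (a hypothetical REPL session, not part of the PR):

```scala
RecordFormat.withNameOpt("VB") // Some(VariableBlock)
RecordFormat.withNameOpt("T")  // Some(AsciiText), the Cobrix alias for "D"
RecordFormat.withNameOpt("X")  // None: unrecognized format
```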
VarLenNestedReader.scala
@@ -23,12 +23,13 @@ import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC}
import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory}
import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
-import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor}
+import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor}
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
import za.co.absa.cobrix.cobol.reader.index.IndexGenerator
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
import za.co.absa.cobrix.cobol.reader.iterator.{VarLenHierarchicalIterator, VarLenNestedIterator}
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
+import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParameters}
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator
@@ -61,13 +62,21 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
binaryData: SimpleStream,
copybook: Copybook
): Option[RawRecordExtractor] = {
-    val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, readerProperties.reAdditionalInfo)
+    val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment)
+    val bdwParams = RecordHeaderParameters(readerProperties.isBdwBigEndian, readerProperties.bdwAdjustment)
+
+    val rdwDecoder = new RecordHeaderDecoderRdw(rdwParams)
+    val bdwDecoder = new RecordHeaderDecoderBdw(bdwParams)
+
+    val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, rdwDecoder, bdwDecoder, readerProperties.reAdditionalInfo)

readerProperties.recordExtractor match {
case Some(recordExtractorClass) =>
Some(RawRecordExtractorFactory.createRecordHeaderParser(recordExtractorClass, reParams))
case None if readerProperties.isText =>
Some(new TextRecordExtractor(reParams))
+      case None if readerProperties.hasBdw =>
+        Some(new VariableBlockVariableRecordExtractor(reParams))
case None if readerProperties.variableSizeOccurs &&
readerProperties.recordHeaderParser.isEmpty &&
!readerProperties.isRecordSequence &&
RawRecordContext.scala
@@ -17,6 +17,7 @@
package za.co.absa.cobrix.cobol.reader.extractors.raw

import za.co.absa.cobrix.cobol.parser.Copybook
+import za.co.absa.cobrix.cobol.reader.recordheader.RecordHeaderDecoder
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

/**
@@ -29,5 +30,7 @@ case class RawRecordContext(
startingRecordNumber: Long,
inputStream: SimpleStream,
copybook: Copybook,
+  rdwDecoder: RecordHeaderDecoder,
+  bdwDecoder: RecordHeaderDecoder,
additionalInfo: String
)
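With the two decoder fields in place, a custom `RawRecordExtractor` can delegate header decoding instead of parsing header bytes itself. Below is a minimal sketch (hypothetical, not part of this PR) for a plain RDW-per-record layout:

```scala
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}

// Hypothetical extractor: reads one RDW-prefixed record at a time, delegating
// header decoding to the RDW decoder supplied via RawRecordContext.
class SimpleRdwExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
  override def offset: Long = ctx.inputStream.offset

  override def hasNext: Boolean = !ctx.inputStream.isEndOfStream

  override def next(): Array[Byte] = {
    val rdwOffset = ctx.inputStream.offset
    val rdw = ctx.inputStream.next(ctx.rdwDecoder.headerSize)         // read the RDW bytes
    val recordLength = ctx.rdwDecoder.getRecordLength(rdw, rdwOffset) // decode the payload length
    ctx.inputStream.next(recordLength)                                // read the payload
  }
}
```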
VariableBlockVariableRecordExtractor.scala
@@ -18,61 +18,51 @@ package za.co.absa.cobrix.cobol.reader.extractors.raw

import scala.collection.mutable

-class VariableBlockRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
-  VariableBlockRecordExtractor.additionalInfo = ctx.additionalInfo
-  private var recordQueue = new mutable.Queue[Array[Byte]]
-  private var initialRead = true
-  private var recordNumber = ctx.startingRecordNumber
+class VariableBlockVariableRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
+  private val recordQueue = new mutable.Queue[Array[Byte]]

  override def offset: Long = ctx.inputStream.offset

  override def hasNext: Boolean = {
-    var output: Boolean = true
-    if (initialRead == true) {
+    if (recordQueue.isEmpty) {
      readNextBlock()
-      initialRead = false
-    } else {
-      if (recordQueue.isEmpty && !ctx.inputStream.isEndOfStream) {
-        readNextBlock()
-      } else {
-        if (recordQueue.isEmpty && ctx.inputStream.isEndOfStream) {
-          output = false
-        }
-      }
    }
-    output
+    recordQueue.nonEmpty
  }

  private def readNextBlock(): Unit = {
+    val bdwSize = ctx.bdwDecoder.headerSize
+    val rdwSize = ctx.rdwDecoder.headerSize
+
    if (!ctx.inputStream.isEndOfStream) {
-      val bdwFirstPart = BigInt(ctx.inputStream.next(2)).toInt
-      val bdwSecondPart = BigInt(ctx.inputStream.next(2)).toInt
-      var blockBuffer = ctx.inputStream.next(bdwFirstPart-4)
-      var blockPointer: Int = 0
-      var recordCounter: Int = 0
-      while (blockPointer < blockBuffer.length) {
-        recordCounter += 1
-        var rdwFirstPart = BigInt(blockBuffer.slice(blockPointer, blockPointer+2)).toInt
-        var rdwSecondPart = BigInt(blockBuffer.slice(blockPointer+2, blockPointer+4)).toInt
-        val payload = blockBuffer.slice(blockPointer+4, blockPointer + rdwFirstPart)
-        recordQueue.enqueue(payload)
-        blockPointer += rdwFirstPart
+      val bdwOffset = ctx.inputStream.offset
+      val bdw = ctx.inputStream.next(bdwSize)
+
+      val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset)
+      val blockBuffer = ctx.inputStream.next(blockLength)
+
+      var blockIndex = 0
+
+      while (blockIndex < blockBuffer.length) {
+        val rdwOffset = bdwOffset + blockIndex
+        val rdw = blockBuffer.slice(blockIndex, blockIndex + rdwSize)
+        val recordLength = ctx.rdwDecoder.getRecordLength(rdw, rdwOffset)
+
+        val payload = blockBuffer.slice(blockIndex + rdwSize, blockIndex + recordLength + rdwSize)
+        if (payload.length > 0) {
+          recordQueue.enqueue(payload)
+        }
+        blockIndex += recordLength + rdwSize
      }
    }
  }

-
  @throws[NoSuchElementException]
  override def next(): Array[Byte] = {
-    var rawRecord: Array[Byte] = new Array[Byte](0)
    if (!hasNext) {
      throw new NoSuchElementException
    }
-    rawRecord = recordQueue.dequeue()
-    rawRecord
+    recordQueue.dequeue()
  }
}
-
-object VariableBlockRecordExtractor {
-  var additionalInfo: String = ""
-}
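To make the header arithmetic concrete, here is a worked sketch (hypothetical sizes) of how block and record lengths relate under Cobrix's default convention, where neither BDW nor RDW values count their own 4 bytes:

```scala
// A block containing two records with 10 and 20 payload bytes:
val rdwSize = 4                                   // each record descriptor word is 4 bytes
val rdw1    = 10                                  // RDW value for record 1 (payload only)
val rdw2    = 20                                  // RDW value for record 2 (payload only)
val bdw     = (rdwSize + rdw1) + (rdwSize + rdw2) // 38 bytes of block payload
// If the headers DO count themselves (the IBM convention), every value above
// grows by 4; that is exactly what rdw_adjustment = -4 / bdw_adjustment = -4 undo.
```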