Skip to content

ParquetLoader - Save Schema to context to support loading the model without files. #472

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 18, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 110 additions & 35 deletions src/Microsoft.ML.Parquet/ParquetLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Microsoft.ML.Runtime;
using Microsoft.ML.Runtime.CommandLine;
using Microsoft.ML.Runtime.Data;
using Microsoft.ML.Runtime.Data.IO;
using Microsoft.ML.Runtime.Internal.Utilities;
using Microsoft.ML.Runtime.Model;
using Parquet;
Expand Down Expand Up @@ -88,48 +89,29 @@ public sealed class Arguments
internal const string ShortName = "Parquet";
internal const string ModelSignature = "PARQELDR";

private const string SchemaCtxName = "Schema.idv";

private readonly IHost _host;
private readonly Stream _parquetStream;
private readonly ParquetOptions _parquetOptions;
private readonly int _columnChunkReadSize;
private readonly Column[] _columnsLoaded;
private readonly DataSet _schemaDataSet;
private const int _defaultColumnChunkReadSize = 1000000;

private bool _disposed;
private long? _rowCount;

private static VersionInfo GetVersionInfo()
{
return new VersionInfo(
modelSignature: ModelSignature,
verWrittenCur: 0x00010001, // Initial
verReadableCur: 0x00010001,
//verWrittenCur: 0x00010001, // Initial
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this commented out code provide value? Can it be removed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we generally prefer to avoid checking in commented-out code, the version history of past versions is different, since we want to know why we had to bump the version number each time, and we like to have that first version in there. See e.g., the text loader model, which is the most extreme example I am aware of.

verWrittenCur: 0x00010002, // Add Schema to Model Context
verReadableCur: 0x00010002,
verWeCanReadBack: 0x00010001,
loaderSignature: LoaderSignature);
}

public static ParquetLoader Create(IHostEnvironment env, ModelLoadContext ctx, IMultiStreamSource files)
{
Contracts.CheckValue(env, nameof(env));
IHost host = env.Register(LoaderName);

env.CheckValue(ctx, nameof(ctx));
ctx.CheckAtModel(GetVersionInfo());
env.CheckValue(files, nameof(files));

// *** Binary format ***
// int: cached chunk size
// bool: TreatBigIntegersAsDates flag

Arguments args = new Arguments
{
ColumnChunkReadSize = ctx.Reader.ReadInt32(),
TreatBigIntegersAsDates = ctx.Reader.ReadBoolean()
};
return host.Apply("Loading Model",
ch => new ParquetLoader(args, host, OpenStream(files)));
}

public ParquetLoader(IHostEnvironment env, Arguments args, IMultiStreamSource files)
: this(env, args, OpenStream(files))
{
Expand Down Expand Up @@ -165,6 +147,8 @@ private ParquetLoader(Arguments args, IHost host, Stream stream)
TreatBigIntegersAsDates = args.TreatBigIntegersAsDates
};

DataSet schemaDataSet;

try
{
// We only care about the schema so ignore the rows.
Expand All @@ -173,36 +157,112 @@ private ParquetLoader(Arguments args, IHost host, Stream stream)
Count = 0,
Offset = 0
};
_schemaDataSet = ParquetReader.Read(stream, _parquetOptions, readerOptions);
schemaDataSet = ParquetReader.Read(stream, _parquetOptions, readerOptions);
_rowCount = schemaDataSet.TotalRowCount;
}
catch (Exception ex)
{
throw new InvalidDataException("Cannot read Parquet file", ex);
}

_columnChunkReadSize = args.ColumnChunkReadSize;
InitColumns(ch, out _columnsLoaded);
_columnsLoaded = InitColumns(schemaDataSet);
Schema = CreateSchema(_host, _columnsLoaded);
}
}

private ParquetLoader(IHost host, ModelLoadContext ctx, IMultiStreamSource files)
{
Contracts.AssertValue(host);
_host = host;
_host.AssertValue(ctx);
_host.AssertValue(files);

// *** Binary format ***
// int: cached chunk size
// bool: TreatBigIntegersAsDates flag
// Schema of the loader (0x00010002)

_columnChunkReadSize = ctx.Reader.ReadInt32();
bool treatBigIntegersAsDates = ctx.Reader.ReadBoolean();

if (ctx.Header.ModelVerWritten >= 0x00010002)
{
// Load the schema
byte[] buffer = null;
if (!ctx.TryLoadBinaryStream(SchemaCtxName, r => buffer = r.ReadByteArray()))
throw _host.ExceptDecode();
var strm = new MemoryStream(buffer, writable: false);
var loader = new BinaryLoader(_host, new BinaryLoader.Arguments(), strm);
Schema = loader.Schema;
}

// Only load Parquet related data if a file is present. Otherwise, just the Schema is valid.
if (files.Count > 0)
{
_parquetOptions = new ParquetOptions()
{
TreatByteArrayAsString = true,
TreatBigIntegersAsDates = treatBigIntegersAsDates
};

_parquetStream = OpenStream(files);
DataSet schemaDataSet;

try
{
// We only care about the schema so ignore the rows.
ReaderOptions readerOptions = new ReaderOptions()
{
Count = 0,
Offset = 0
};
schemaDataSet = ParquetReader.Read(_parquetStream, _parquetOptions, readerOptions);
_rowCount = schemaDataSet.TotalRowCount;
}
catch (Exception ex)
{
throw new InvalidDataException("Cannot read Parquet file", ex);
}

_columnsLoaded = InitColumns(schemaDataSet);
Copy link
Contributor

@TomFinley TomFinley Jul 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_columnsLoaded = InitColumns(schemaDataSet); [](start = 16, length = 44)

What happens if you load a schema from the model, but then the parquet loader does not "agree" with that schema? I don't see how this case is handled here. #Closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added schema check.


In reply to: 199612477 [](ancestors = 199612477)

Schema = CreateSchema(_host, _columnsLoaded);
}
else if (Schema == null)
{
throw _host.Except("Parquet loader must be created with one file");
}
}

public static ParquetLoader Create(IHostEnvironment env, ModelLoadContext ctx, IMultiStreamSource files)
{
Contracts.CheckValue(env, nameof(env));
IHost host = env.Register(LoaderName);

env.CheckValue(ctx, nameof(ctx));
ctx.CheckAtModel(GetVersionInfo());
env.CheckValue(files, nameof(files));

return host.Apply("Loading Model",
ch => new ParquetLoader(host, ctx, files));
}

/// <summary>
/// Helper function called by the ParquetLoader constructor to initialize the Columns that belong in the Parquet file.
/// Composite data fields are flattened; for example, a Map Field in Parquet is flattened into a Key column and a Value
/// column.
/// </summary>
/// <param name="ch">Communication channel for error reporting.</param>
/// <param name="cols">The array of flattened columns instantiated from the parquet file.</param>
private void InitColumns(IChannel ch, out Column[] cols)
/// <param name="dataSet">The schema data set.</param>
/// <returns>The array of flattened columns instantiated from the parquet file.</returns>
private Column[] InitColumns(DataSet dataSet)
{
cols = null;
List<Column> columnsLoaded = new List<Column>();

foreach (var parquetField in _schemaDataSet.Schema.Fields)
foreach (var parquetField in dataSet.Schema.Fields)
{
FlattenFields(parquetField, ref columnsLoaded, false);
}
cols = columnsLoaded.ToArray();
return columnsLoaded.ToArray();
}

private void FlattenFields(Field field, ref List<Column> cols, bool isRepeatable)
Expand Down Expand Up @@ -239,7 +299,7 @@ private void FlattenFields(Field field, ref List<Column> cols, bool isRepeatable
}
else
{
throw new InvalidDataException("Encountered unknown Parquet field type(Currently recognizes data, map, list, and struct).");
throw _host.ExceptNotSupp("Encountered unknown Parquet field type(Currently recognizes data, map, list, and struct).");
}
}

Expand Down Expand Up @@ -326,7 +386,7 @@ private static Stream OpenStream(string filename)

public long? GetRowCount(bool lazy = true)
{
return _schemaDataSet.TotalRowCount;
return _rowCount;
}

public IRowCursor GetRowCursor(Func<int, bool> predicate, IRandom rand = null)
Expand All @@ -353,9 +413,22 @@ public void Save(ModelSaveContext ctx)
// *** Binary format ***
// int: cached chunk size
// bool: TreatBigIntegersAsDates flag
// Schema of the loader

ctx.Writer.Write(_columnChunkReadSize);
ctx.Writer.Write(_parquetOptions.TreatBigIntegersAsDates);

// Save the schema
var noRows = new EmptyDataView(_host, Schema);
var saverArgs = new BinarySaver.Arguments();
saverArgs.Silent = true;
var saver = new BinarySaver(_host, saverArgs);
using (var strm = new MemoryStream())
{
var allColumns = Enumerable.Range(0, Schema.ColumnCount).ToArray();
saver.SaveData(strm, noRows, allColumns);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this possible to refactor? It seems inefficient to first save it to a MemoryStream, and then write that memory stream out. Can we just do it in a single step?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this specific case not necessarily, since the binary saver requires a seekable writable stream. (E.g., it writes data, then seeks back to the header so it can store in the header the offsets of various records in the file.) The repository writer, on the other hand, is based on a zip archive, which AFAIK does not provide seekable writable streams.

ctx.SaveBinaryStream(SchemaCtxName, w => w.WriteByteArray(strm.ToArray()));
}
}

private sealed class Cursor : RootCursorBase, IRowCursor
Expand All @@ -377,6 +450,8 @@ public Cursor(ParquetLoader parent, Func<int, bool> predicate, IRandom rand)
: base(parent._host)
{
Ch.AssertValue(predicate);
Ch.AssertValue(parent._parquetStream);

_loader = parent;
_fileStream = parent._parquetStream;
_parquetConversions = new ParquetConversions(Ch);
Expand Down