diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs index b7939ece6b..f5d044a118 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs @@ -21,8 +21,8 @@ namespace Microsoft.Data.SqlClient // with ColumnOrdinals from the source. internal sealed class _ColumnMapping { - internal int _sourceColumnOrdinal; - internal _SqlMetaData _metadata; + internal readonly int _sourceColumnOrdinal; + internal readonly _SqlMetaData _metadata; internal _ColumnMapping(int columnId, _SqlMetaData metadata) { @@ -33,28 +33,16 @@ internal _ColumnMapping(int columnId, _SqlMetaData metadata) internal sealed class Row { - private object[] _dataFields; + private readonly object[] _dataFields; internal Row(int rowCount) { _dataFields = new object[rowCount]; } - internal object[] DataFields - { - get - { - return _dataFields; - } - } + internal object[] DataFields => _dataFields; - internal object this[int index] - { - get - { - return _dataFields[index]; - } - } + internal object this[int index] => _dataFields[index]; } // The controlling class for one result (metadata + rows) @@ -75,10 +63,7 @@ internal Result(_SqlMetaDataSet metadata) internal Row this[int index] => _rowset[index]; - internal void AddRow(Row row) - { - _rowset.Add(row); - } + internal void AddRow(Row row) => _rowset.Add(row); } // A wrapper object for metadata and rowsets returned by our initial queries @@ -93,7 +78,6 @@ internal BulkCopySimpleResultSet() _results = new List(); } - // Indexer internal Result this[int idx] => _results[idx]; // Callback function for the tdsparser @@ -177,10 +161,13 @@ public SourceColumnMetadata(ValueMethod method, bool isSqlType, bool isDataFeed) private const int DefaultCommandTimeout = 30; + /// + public event SqlRowsCopiedEventHandler SqlRowsCopied; + private bool _enableStreaming = false; private int _batchSize; - private bool _ownConnection; - private SqlBulkCopyOptions _copyOptions; + private readonly bool _ownConnection; + private readonly SqlBulkCopyOptions _copyOptions; private int _timeout = DefaultCommandTimeout; private string _destinationTableName; private int _rowsCopied; @@ -189,8 +176,8 @@ public SourceColumnMetadata(ValueMethod method, bool isSqlType, bool isDataFeed) private bool _insideRowsCopiedEvent; private object _rowSource; - private SqlDataReader _SqlDataReaderRowSource; - private DbDataReader _DbDataReaderRowSource; + private SqlDataReader _sqlDataReaderRowSource; + private DbDataReader _dbDataReaderRowSource; private DataTable _dataTableSource; private SqlBulkCopyColumnMappingCollection _columnMappings; @@ -234,8 +221,6 @@ private int RowNumber private TdsParserStateObject _stateObj; private List<_ColumnMapping> _sortedColumnMappings; - private SqlRowsCopiedEventHandler _rowsCopiedEventHandler; - private static int _objectTypeCount; // EventSource Counter internal readonly int _objectID = Interlocked.Increment(ref _objectTypeCount); @@ -249,17 +234,11 @@ private int RowNumber private SourceColumnMetadata[] _currentRowMetadata; #if DEBUG - internal static bool _setAlwaysTaskOnWrite = false; //when set and in DEBUG mode, TdsParser::WriteBulkCopyValue will always return a task + internal static bool s_setAlwaysTaskOnWrite; //when set and in DEBUG mode, TdsParser::WriteBulkCopyValue will always return a task internal static bool 
SetAlwaysTaskOnWrite { - set - { - _setAlwaysTaskOnWrite = value; - } - get - { - return _setAlwaysTaskOnWrite; - } + set => s_setAlwaysTaskOnWrite = value; + get => s_setAlwaysTaskOnWrite; } #endif @@ -314,10 +293,7 @@ public SqlBulkCopy(string connectionString, SqlBulkCopyOptions copyOptions) /// public int BatchSize { - get - { - return _batchSize; - } + get => _batchSize; set { if (value >= 0) @@ -334,10 +310,7 @@ public int BatchSize /// public int BulkCopyTimeout { - get - { - return _timeout; - } + get => _timeout; set { if (value < 0) @@ -351,24 +324,12 @@ public int BulkCopyTimeout /// public bool EnableStreaming { - get - { - return _enableStreaming; - } - set - { - _enableStreaming = value; - } + get => _enableStreaming; + set => _enableStreaming = value; } /// - public SqlBulkCopyColumnMappingCollection ColumnMappings - { - get - { - return _columnMappings; - } - } + public SqlBulkCopyColumnMappingCollection ColumnMappings => _columnMappings; /// public SqlBulkCopyColumnOrderHintCollection ColumnOrderHints @@ -379,10 +340,7 @@ public SqlBulkCopyColumnOrderHintCollection ColumnOrderHints /// public string DestinationTableName { - get - { - return _destinationTableName; - } + get => _destinationTableName; set { if (value == null) @@ -400,10 +358,7 @@ public string DestinationTableName /// public int NotifyAfter { - get - { - return _notifyAfter; - } + get => _notifyAfter; set { if (value >= 0) @@ -417,35 +372,10 @@ public int NotifyAfter } } - internal int ObjectID - { - get - { - return _objectID; - } - } - - /// - public event SqlRowsCopiedEventHandler SqlRowsCopied - { - add - { - _rowsCopiedEventHandler += value; - } - remove - { - _rowsCopiedEventHandler -= value; - } - } + internal int ObjectID => _objectID; /// - public int RowsCopied - { - get - { - return _rowsCopied; - } - } + public int RowsCopied => _rowsCopied; internal SqlStatistics Statistics { @@ -464,7 +394,7 @@ internal SqlStatistics Statistics void IDisposable.Dispose() { - this.Dispose(true); + Dispose(true); GC.SuppressFinalize(this); } @@ -476,19 +406,18 @@ private string CreateInitialQuery() string[] parts; try { - parts = MultipartIdentifier.ParseMultipartIdentifier(this.DestinationTableName, "[\"", "]\"", Strings.SQL_BulkCopyDestinationTableName, true); + parts = MultipartIdentifier.ParseMultipartIdentifier(DestinationTableName, "[\"", "]\"", Strings.SQL_BulkCopyDestinationTableName, true); } catch (Exception e) { - throw SQL.BulkLoadInvalidDestinationTable(this.DestinationTableName, e); + throw SQL.BulkLoadInvalidDestinationTable(DestinationTableName, e); } if (string.IsNullOrEmpty(parts[MultipartIdentifier.TableIndex])) { - throw SQL.BulkLoadInvalidDestinationTable(this.DestinationTableName, null); + throw SQL.BulkLoadInvalidDestinationTable(DestinationTableName, null); } string TDSCommand; - TDSCommand = "select @@trancount; SET FMTONLY ON select * from " + ADP.BuildMultiPartName(parts) + " SET FMTONLY OFF "; string TableCollationsStoredProc; @@ -555,7 +484,7 @@ private Task CreateAndExecuteInitialQueryAsync(out Bulk string TDSCommand = CreateInitialQuery(); SqlClientEventSource.Log.TryTraceEvent("SqlBulkCopy.CreateAndExecuteInitialQueryAsync | Info | Initial Query: '{0}'", TDSCommand); SqlClientEventSource.Log.TryCorrelationTraceEvent("SqlBulkCopy.CreateAndExecuteInitialQueryAsync | Info | Correlation | Object Id {0}, Activity Id {1}", ObjectID, ActivityCorrelator.Current); - Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, this.BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, 
callerHasConnectionLock: true); + Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, callerHasConnectionLock: true); if (executeTask == null) { @@ -597,7 +526,7 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i throw SQL.BulkLoadNoCollation(); } - string[] parts = MultipartIdentifier.ParseMultipartIdentifier(this.DestinationTableName, "[\"", "]\"", Strings.SQL_BulkCopyDestinationTableName, true); + string[] parts = MultipartIdentifier.ParseMultipartIdentifier(DestinationTableName, "[\"", "]\"", Strings.SQL_BulkCopyDestinationTableName, true); updateBulkCommandText.AppendFormat("insert bulk {0} (", ADP.BuildMultiPartName(parts)); int nmatched = 0; // Number of columns that match and are accepted int nrejected = 0; // Number of columns that match but were rejected @@ -750,15 +679,15 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i { updateBulkCommandText.Append(" COLLATE " + collation_name.Value); // Compare collations only if the collation value was set on the metadata - if (null != _SqlDataReaderRowSource && metadata.collation != null) + if (null != _sqlDataReaderRowSource && metadata.collation != null) { // On SqlDataReader we can verify the sourcecolumn collation! int sourceColumnId = _localColumnMappings[assocId]._internalSourceColumnOrdinal; int destinationLcid = metadata.collation.LCID; - int sourceLcid = _SqlDataReaderRowSource.GetLocaleId(sourceColumnId); + int sourceLcid = _sqlDataReaderRowSource.GetLocaleId(sourceColumnId); if (sourceLcid != destinationLcid) { - throw SQL.BulkLoadLcidMismatch(sourceLcid, _SqlDataReaderRowSource.GetName(sourceColumnId), destinationLcid, metadata.column); + throw SQL.BulkLoadLcidMismatch(sourceLcid, _sqlDataReaderRowSource.GetName(sourceColumnId), destinationLcid, metadata.column); } } } @@ -845,7 +774,7 @@ private string TryGetOrderHintText(HashSet destColumnNames) } } - orderHintText.Length = orderHintText.Length - 2; + orderHintText.Length -= 2; orderHintText.Append(")"); return orderHintText.ToString(); } @@ -853,7 +782,7 @@ private string TryGetOrderHintText(HashSet destColumnNames) private Task SubmitUpdateBulkCommand(string TDSCommand) { SqlClientEventSource.Log.TryCorrelationTraceEvent("SqlBulkCopy.SubmitUpdateBulkCommand | Info | Correlation | Object Id {0}, Activity Id {1}", ObjectID, ActivityCorrelator.Current); - Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, this.BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, callerHasConnectionLock: true); + Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, callerHasConnectionLock: true); if (executeTask == null) { @@ -881,7 +810,7 @@ private Task SubmitUpdateBulkCommand(string TDSCommand) // Starts writing the Bulkcopy data stream private void WriteMetaData(BulkCopySimpleResultSet internalResults) { - _stateObj.SetTimeoutSeconds(this.BulkCopyTimeout); + _stateObj.SetTimeoutSeconds(BulkCopyTimeout); _SqlMetaDataSet metadataCollection = internalResults[MetaDataResultId].MetaData; _stateObj._outputMessageType = TdsEnums.MT_BULK; @@ -956,7 +885,7 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b // Handle data feeds (common for both DbDataReader and SqlDataReader) if (_currentRowMetadata[destRowIndex].IsDataFeed) { - if (_DbDataReaderRowSource.IsDBNull(sourceOrdinal)) + if (_dbDataReaderRowSource.IsDBNull(sourceOrdinal)) { isSqlType = false; 
isDataFeed = false; @@ -971,25 +900,25 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b switch (_currentRowMetadata[destRowIndex].Method) { case ValueMethod.DataFeedStream: - return new StreamDataFeed(_DbDataReaderRowSource.GetStream(sourceOrdinal)); + return new StreamDataFeed(_dbDataReaderRowSource.GetStream(sourceOrdinal)); case ValueMethod.DataFeedText: - return new TextDataFeed(_DbDataReaderRowSource.GetTextReader(sourceOrdinal)); + return new TextDataFeed(_dbDataReaderRowSource.GetTextReader(sourceOrdinal)); case ValueMethod.DataFeedXml: // Only SqlDataReader supports an XmlReader // There is no GetXmlReader on DbDataReader, however if GetValue returns XmlReader we will read it as stream if it is assigned to XML field - Debug.Assert(_SqlDataReaderRowSource != null, "Should not be reading row as an XmlReader if bulk copy source is not a SqlDataReader"); - return new XmlDataFeed(_SqlDataReaderRowSource.GetXmlReader(sourceOrdinal)); + Debug.Assert(_sqlDataReaderRowSource != null, "Should not be reading row as an XmlReader if bulk copy source is not a SqlDataReader"); + return new XmlDataFeed(_sqlDataReaderRowSource.GetXmlReader(sourceOrdinal)); default: Debug.Fail($"Current column is marked as being a DataFeed, but no DataFeed compatible method was provided. Method: {_currentRowMetadata[destRowIndex].Method}"); isDataFeed = false; - object columnValue = _DbDataReaderRowSource.GetValue(sourceOrdinal); + object columnValue = _dbDataReaderRowSource.GetValue(sourceOrdinal); ADP.IsNullOrSqlType(columnValue, out isNull, out isSqlType); return columnValue; } } } // SqlDataReader-specific logic - else if (null != _SqlDataReaderRowSource) + else if (null != _sqlDataReaderRowSource) { if (_currentRowMetadata[destRowIndex].IsSqlType) { @@ -999,19 +928,19 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b switch (_currentRowMetadata[destRowIndex].Method) { case ValueMethod.SqlTypeSqlDecimal: - value = _SqlDataReaderRowSource.GetSqlDecimal(sourceOrdinal); + value = _sqlDataReaderRowSource.GetSqlDecimal(sourceOrdinal); break; case ValueMethod.SqlTypeSqlDouble: // use cast to handle IsNull correctly because no public constructor allows it - value = (SqlDecimal)_SqlDataReaderRowSource.GetSqlDouble(sourceOrdinal); + value = (SqlDecimal)_sqlDataReaderRowSource.GetSqlDouble(sourceOrdinal); break; case ValueMethod.SqlTypeSqlSingle: // use cast to handle IsNull correctly because no public constructor allows it - value = (SqlDecimal)_SqlDataReaderRowSource.GetSqlSingle(sourceOrdinal); + value = (SqlDecimal)_sqlDataReaderRowSource.GetSqlSingle(sourceOrdinal); break; default: Debug.Fail($"Current column is marked as being a SqlType, but no SqlType compatible method was provided. 
Method: {_currentRowMetadata[destRowIndex].Method}"); - value = (INullable)_SqlDataReaderRowSource.GetSqlValue(sourceOrdinal); + value = (INullable)_sqlDataReaderRowSource.GetSqlValue(sourceOrdinal); break; } @@ -1023,7 +952,7 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b isSqlType = false; isDataFeed = false; - object value = _SqlDataReaderRowSource.GetValue(sourceOrdinal); + object value = _sqlDataReaderRowSource.GetValue(sourceOrdinal); isNull = ((value == null) || (value == DBNull.Value)); if ((!isNull) && (metadata.type == SqlDbType.Udt)) { @@ -1046,7 +975,7 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b IDataReader rowSourceAsIDataReader = (IDataReader)_rowSource; // Only use IsDbNull when streaming is enabled and only for non-SqlDataReader - if ((_enableStreaming) && (_SqlDataReaderRowSource == null) && (rowSourceAsIDataReader.IsDBNull(sourceOrdinal))) + if ((_enableStreaming) && (_sqlDataReaderRowSource == null) && (rowSourceAsIDataReader.IsDBNull(sourceOrdinal))) { isSqlType = false; isNull = true; @@ -1059,6 +988,7 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b return columnValue; } } + case ValueSourceType.DataTable: case ValueSourceType.RowArray: { @@ -1145,10 +1075,10 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b // "more" -- should be used by the caller only when the return value is null. private Task ReadFromRowSourceAsync(CancellationToken cts) { - if (_isAsyncBulkCopy && _DbDataReaderRowSource != null) + if (_isAsyncBulkCopy && _dbDataReaderRowSource != null) { // This will call ReadAsync for DbDataReader (for SqlDataReader it will be truly async read; for non-SqlDataReader it may block.) 
- return _DbDataReaderRowSource.ReadAsync(cts).ContinueWith((t) => + return _dbDataReaderRowSource.ReadAsync(cts).ContinueWith((t) => { if (t.Status == TaskStatus.RanToCompletion) { @@ -1230,7 +1160,8 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) ValueMethod method; bool isSqlType; bool isDataFeed; - if (((_SqlDataReaderRowSource != null) || (_dataTableSource != null)) && ((metadata.metaType.NullableType == TdsEnums.SQLDECIMALN) || (metadata.metaType.NullableType == TdsEnums.SQLNUMERICN))) + + if (((_sqlDataReaderRowSource != null) || (_dataTableSource != null)) && ((metadata.metaType.NullableType == TdsEnums.SQLDECIMALN) || (metadata.metaType.NullableType == TdsEnums.SQLNUMERICN))) { isDataFeed = false; @@ -1239,7 +1170,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) { case ValueSourceType.DbDataReader: case ValueSourceType.IDataReader: - t = _SqlDataReaderRowSource.GetFieldType(sourceOrdinal); + t = _sqlDataReaderRowSource.GetFieldType(sourceOrdinal); break; case ValueSourceType.DataTable: case ValueSourceType.RowArray: @@ -1277,13 +1208,13 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) { isSqlType = false; - if (_SqlDataReaderRowSource != null) + if (_sqlDataReaderRowSource != null) { // MetaData property is not set for SMI, but since streaming is disabled we do not need it - MetaType mtSource = _SqlDataReaderRowSource.MetaData[sourceOrdinal].metaType; + MetaType mtSource = _sqlDataReaderRowSource.MetaData[sourceOrdinal].metaType; // There is no memory gain for non-sequential access for binary - if ((metadata.type == SqlDbType.VarBinary) && (mtSource.IsBinType) && (mtSource.SqlDbType != SqlDbType.Timestamp) && _SqlDataReaderRowSource.IsCommandBehavior(CommandBehavior.SequentialAccess)) + if ((metadata.type == SqlDbType.VarBinary) && (mtSource.IsBinType) && (mtSource.SqlDbType != SqlDbType.Timestamp) && _sqlDataReaderRowSource.IsCommandBehavior(CommandBehavior.SequentialAccess)) { isDataFeed = true; method = ValueMethod.DataFeedStream; @@ -1305,7 +1236,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) method = ValueMethod.GetValue; } } - else if (_DbDataReaderRowSource != null) + else if (_dbDataReaderRowSource != null) { if (metadata.type == SqlDbType.VarBinary) { @@ -1701,8 +1632,8 @@ public void WriteToServer(DbDataReader reader) { statistics = SqlStatistics.StartTimer(Statistics); _rowSource = reader; - _DbDataReaderRowSource = reader; - _SqlDataReaderRowSource = reader as SqlDataReader; + _dbDataReaderRowSource = reader; + _sqlDataReaderRowSource = reader as SqlDataReader; _dataTableSource = null; _rowSourceType = ValueSourceType.DbDataReader; @@ -1734,8 +1665,8 @@ public void WriteToServer(IDataReader reader) { statistics = SqlStatistics.StartTimer(Statistics); _rowSource = reader; - _SqlDataReaderRowSource = _rowSource as SqlDataReader; - _DbDataReaderRowSource = _rowSource as DbDataReader; + _sqlDataReaderRowSource = _rowSource as SqlDataReader; + _dbDataReaderRowSource = _rowSource as DbDataReader; _dataTableSource = null; _rowSourceType = ValueSourceType.IDataReader; _isAsyncBulkCopy = false; @@ -1770,7 +1701,7 @@ public void WriteToServer(DataTable table, DataRowState rowState) _rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? 
DataRowState.Deleted : ~rowState | DataRowState.Deleted; _rowSource = table; _dataTableSource = table; - _SqlDataReaderRowSource = null; + _sqlDataReaderRowSource = null; _rowSourceType = ValueSourceType.DataTable; _rowEnumerator = table.Rows.GetEnumerator(); _isAsyncBulkCopy = false; @@ -1812,7 +1743,7 @@ public void WriteToServer(DataRow[] rows) _rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows _rowSource = rows; _dataTableSource = table; - _SqlDataReaderRowSource = null; + _sqlDataReaderRowSource = null; _rowSourceType = ValueSourceType.RowArray; _rowEnumerator = rows.GetEnumerator(); _isAsyncBulkCopy = false; @@ -1860,7 +1791,7 @@ public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationTok _rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows _rowSource = rows; _dataTableSource = table; - _SqlDataReaderRowSource = null; + _sqlDataReaderRowSource = null; _rowSourceType = ValueSourceType.RowArray; _rowEnumerator = rows.GetEnumerator(); _isAsyncBulkCopy = true; @@ -1895,8 +1826,8 @@ public Task WriteToServerAsync(DbDataReader reader, CancellationToken cancellati { statistics = SqlStatistics.StartTimer(Statistics); _rowSource = reader; - _SqlDataReaderRowSource = reader as SqlDataReader; - _DbDataReaderRowSource = reader; + _sqlDataReaderRowSource = reader as SqlDataReader; + _dbDataReaderRowSource = reader; _dataTableSource = null; _rowSourceType = ValueSourceType.DbDataReader; _isAsyncBulkCopy = true; @@ -1932,8 +1863,8 @@ public Task WriteToServerAsync(IDataReader reader, CancellationToken cancellatio { statistics = SqlStatistics.StartTimer(Statistics); _rowSource = reader; - _SqlDataReaderRowSource = _rowSource as SqlDataReader; - _DbDataReaderRowSource = _rowSource as DbDataReader; + _sqlDataReaderRowSource = _rowSource as SqlDataReader; + _dbDataReaderRowSource = _rowSource as DbDataReader; _dataTableSource = null; _rowSourceType = ValueSourceType.IDataReader; _isAsyncBulkCopy = true; @@ -1976,7 +1907,7 @@ public Task WriteToServerAsync(DataTable table, DataRowState rowState, Cancellat statistics = SqlStatistics.StartTimer(Statistics); _rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? 
DataRowState.Deleted : ~rowState | DataRowState.Deleted; _rowSource = table; - _SqlDataReaderRowSource = null; + _sqlDataReaderRowSource = null; _dataTableSource = table; _rowSourceType = ValueSourceType.DataTable; _rowEnumerator = table.Rows.GetEnumerator(); @@ -2159,14 +2090,13 @@ private void WriteRowSourceToServerCommon(int columnCount) } catch (IndexOutOfRangeException e) { - throw (SQL.BulkLoadNonMatchingColumnName(unquotedColumnName, e)); + throw SQL.BulkLoadNonMatchingColumnName(unquotedColumnName, e); } break; } - if (index == -1) { - throw (SQL.BulkLoadNonMatchingColumnName(unquotedColumnName)); + throw SQL.BulkLoadNonMatchingColumnName(unquotedColumnName); } bulkCopyColumn._internalSourceColumnOrdinal = index; } @@ -2184,15 +2114,6 @@ internal void OnConnectionClosed() } } - private void OnRowsCopied(SqlRowsCopiedEventArgs value) - { - SqlRowsCopiedEventHandler handler = _rowsCopiedEventHandler; - if (handler != null) - { - handler(this, value); - } - } - private bool FireRowsCopiedEvent(long rowsCopied) { // Release lock to prevent possible deadlocks @@ -2204,7 +2125,7 @@ private bool FireRowsCopiedEvent(long rowsCopied) try { _insideRowsCopiedEvent = true; - this.OnRowsCopied(eventArgs); + SqlRowsCopied?.Invoke(this, eventArgs); } finally { @@ -2252,9 +2173,9 @@ private Task ReadWriteColumnValueAsync(int col) // Target type shouldn't be encrypted Debug.Assert(!metadata.isEncrypted, "Can't encrypt SQL Variant type"); SqlBuffer.StorageType variantInternalType = SqlBuffer.StorageType.Empty; - if ((_SqlDataReaderRowSource != null) && (_connection.IsKatmaiOrNewer)) + if ((_sqlDataReaderRowSource != null) && (_connection.IsKatmaiOrNewer)) { - variantInternalType = _SqlDataReaderRowSource.GetVariantInternalStorageType(_sortedColumnMappings[col]._sourceColumnOrdinal); + variantInternalType = _sqlDataReaderRowSource.GetVariantInternalStorageType(_sortedColumnMappings[col]._sourceColumnOrdinal); } if (variantInternalType == SqlBuffer.StorageType.DateTime2) @@ -2333,17 +2254,18 @@ private Task CopyColumnsAsync(int col, TaskCompletionSource source = nul // This is in its own method to avoid always allocating the lambda in CopyColumnsAsync private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource source, Task task, int i) { - AsyncHelper.ContinueTask(task, source, () => - { - if (i + 1 < _sortedColumnMappings.Count) + AsyncHelper.ContinueTask(task, source, + onSuccess: () => { - CopyColumnsAsync(i + 1, source); //continue from the next column - } - else - { - source.SetResult(null); + if (i + 1 < _sortedColumnMappings.Count) + { + CopyColumnsAsync(i + 1, source); //continue from the next column + } + else + { + source.SetResult(null); + } } - } ); } @@ -2479,7 +2401,9 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, } resultTask = source.Task; - AsyncHelper.ContinueTask(readTask, source, () => CopyRowsAsync(i + 1, totalRows, cts, source)); + AsyncHelper.ContinueTask(readTask, source, + onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source) + ); return resultTask; // Associated task will be completed when all rows are copied to server/exception/cancelled. } } @@ -2488,22 +2412,25 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, source = source ?? new TaskCompletionSource(); resultTask = source.Task; - AsyncHelper.ContinueTask(task, source, onSuccess: () => - { - CheckAndRaiseNotification(); // Check for notification now as the current row copy is done at this moment. 
- - Task readTask = ReadFromRowSourceAsync(cts); - if (readTask == null) - { - CopyRowsAsync(i + 1, totalRows, cts, source); - } - else + AsyncHelper.ContinueTask(task, source, + onSuccess: () => { - AsyncHelper.ContinueTask(readTask, source, onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source)); - } - } + CheckAndRaiseNotification(); // Check for notification now as the current row copy is done at this moment. + + Task readTask = ReadFromRowSourceAsync(cts); + if (readTask == null) + { + CopyRowsAsync(i + 1, totalRows, cts, source); + } + else + { + AsyncHelper.ContinueTask(readTask, source, + onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source) + ); + } + } ); - return resultTask; + return resultTask; } } @@ -2572,7 +2499,7 @@ private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string up } AsyncHelper.ContinueTask(commandTask, source, - () => + onSuccess: () => { Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source); if (continuedTask == null) @@ -2645,7 +2572,7 @@ private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults, CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source); } }, - onFailure: (_) => CopyBatchesAsyncContinuedOnError(cleanupParser: false), + onFailure: _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false), onCancellation: () => CopyBatchesAsyncContinuedOnError(cleanupParser: true) ); @@ -2711,7 +2638,7 @@ private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internal // Always call back into CopyBatchesAsync CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source); }, - onFailure: (_) => CopyBatchesAsyncContinuedOnError(cleanupParser: false) + onFailure: _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false) ); return source.Task; } @@ -2832,7 +2759,7 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int source = new TaskCompletionSource(); } AsyncHelper.ContinueTask(task, source, - () => + onSuccess: () => { // Bulk copy task is completed at this moment. 
if (task.IsCanceled) @@ -2993,7 +2920,7 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio { try { - AsyncHelper.WaitForCompletion(reconnectTask, this.BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); }); + AsyncHelper.WaitForCompletion(reconnectTask, BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); }); } catch (SqlException ex) { @@ -3034,7 +2961,9 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio if (internalResultsTask != null) { - AsyncHelper.ContinueTask(internalResultsTask, source, () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source)); + AsyncHelper.ContinueTask(internalResultsTask, source, + onSuccess: () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source) + ); } else { @@ -3106,7 +3035,7 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken) { Debug.Assert(_isAsyncBulkCopy, "Read must not return a Task in the Sync mode"); AsyncHelper.ContinueTask(readTask, source, - () => + onSuccess: () => { if (!_hasMoreRowToCopy) { diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs index f95a76348d..03ad16c5bf 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs @@ -16,20 +16,14 @@ using System.Xml; using Microsoft.Data.Common; -// todo list: -// * An ID column need to be ignored - even if there is an association -// * Spec: ID columns will be ignored - even if there is an association -// * Spec: How do we publish CommandTimeout on the bcpoperation? -// - namespace Microsoft.Data.SqlClient { // This internal class helps us to associate the metadata from the target. // with ColumnOrdinals from the source. 
internal sealed class _ColumnMapping { - internal int _sourceColumnOrdinal; - internal _SqlMetaData _metadata; + internal readonly int _sourceColumnOrdinal; + internal readonly _SqlMetaData _metadata; internal _ColumnMapping(int columnId, _SqlMetaData metadata) { @@ -38,134 +32,83 @@ internal _ColumnMapping(int columnId, _SqlMetaData metadata) } } - sealed internal class Row + internal sealed class Row { - private object[] _dataFields; + private readonly object[] _dataFields; internal Row(int rowCount) { _dataFields = new object[rowCount]; } - internal object[] DataFields - { - get - { - return _dataFields; - } - } + internal object[] DataFields => _dataFields; - internal object this[int index] - { - get - { - return _dataFields[index]; - } - } + internal object this[int index] => _dataFields[index]; } - // the controlling class for one result (metadata + rows) - // - sealed internal class Result + // The controlling class for one result (metadata + rows) + internal sealed class Result { - private _SqlMetaDataSet _metadata; - private List<Row> _rowset; + private readonly _SqlMetaDataSet _metadata; + private readonly List<Row> _rowset; internal Result(_SqlMetaDataSet metadata) { - this._metadata = metadata; - this._rowset = new List<Row>(); + _metadata = metadata; + _rowset = new List<Row>(); } - internal int Count - { - get - { - return _rowset.Count; - } - } + internal int Count => _rowset.Count; - internal _SqlMetaDataSet MetaData - { - get - { - return _metadata; - } - } + internal _SqlMetaDataSet MetaData => _metadata; - internal Row this[int index] - { - get - { - return (Row)_rowset[index]; - } - } + internal Row this[int index] => _rowset[index]; - internal void AddRow(Row row) - { - _rowset.Add(row); - } + internal void AddRow(Row row) => _rowset.Add(row); } // A wrapper object for metadata and rowsets returned by our initial queries - // - sealed internal class BulkCopySimpleResultSet + internal sealed class BulkCopySimpleResultSet { - private List<Result> _results; // the list of results - private Result resultSet; // the current result - private int[] indexmap; // associates columnids with indexes in the rowarray + private readonly List<Result> _results; // The list of results + private Result _resultSet; // The current result + private int[] _indexmap; // Associates columnids with indexes in the rowarray - // c-tor - // internal BulkCopySimpleResultSet() { _results = new List<Result>(); } - // indexer - // - internal Result this[int idx] - { - get - { - return (Result)_results[idx]; - } - } - // callback function for the tdsparser - // note that setting the metadata adds a resultset - // + internal Result this[int idx] => _results[idx]; + + // Callback function for the tdsparser + // (note that setting the metadata adds a resultset) internal void SetMetaData(_SqlMetaDataSet metadata) { - resultSet = new Result(metadata); - _results.Add(resultSet); + _resultSet = new Result(metadata); + _results.Add(_resultSet); - indexmap = new int[resultSet.MetaData.Length]; - for (int i = 0; i < indexmap.Length; i++) + _indexmap = new int[_resultSet.MetaData.Length]; + for (int i = 0; i < _indexmap.Length; i++) { - indexmap[i] = i; + _indexmap[i] = i; } } - // callback function for the tdsparser - // this will create an indexmap for the active resultset - // - internal int[] CreateIndexMap() - { - return indexmap; - } + // Callback function for the tdsparser. + // This will create an indexmap for the active resultset.
+ internal int[] CreateIndexMap() => _indexmap; - // callback function for the tdsparser - // this will return an array of rows to store the rowdata - // + // Callback function for the tdsparser. + // This will return an array of rows to store the rowdata. internal object[] CreateRowBuffer() { - Row row = new Row(resultSet.MetaData.Length); - resultSet.AddRow(row); + Row row = new Row(_resultSet.MetaData.Length); + _resultSet.AddRow(row); return row.DataFields; } } - // ------------------------------------------------------------------------------------------------- /// public sealed class SqlBulkCopy : IDisposable { @@ -199,7 +142,7 @@ private enum ValueMethod : byte } // Used to hold column metadata for SqlDataReader case - private struct SourceColumnMetadata + private readonly struct SourceColumnMetadata { public SourceColumnMetadata(ValueMethod method, bool isSqlType, bool isDataFeed) { @@ -234,10 +177,13 @@ public SourceColumnMetadata(ValueMethod method, bool isSqlType, bool isDataFeed) private const int DefaultCommandTimeout = 30; + /// + public event SqlRowsCopiedEventHandler SqlRowsCopied; + private bool _enableStreaming = false; private int _batchSize; - private bool _ownConnection; - private SqlBulkCopyOptions _copyOptions; + private readonly bool _ownConnection; + private readonly SqlBulkCopyOptions _copyOptions; private int _timeout = DefaultCommandTimeout; private string _destinationTableName; private int _rowsCopied; @@ -246,9 +192,9 @@ public SourceColumnMetadata(ValueMethod method, bool isSqlType, bool isDataFeed) private bool _insideRowsCopiedEvent; private object _rowSource; - private SqlDataReader _SqlDataReaderRowSource; + private SqlDataReader _sqlDataReaderRowSource; private bool _rowSourceIsSqlDataReaderSmi; - private DbDataReader _DbDataReaderRowSource; + private DbDataReader _dbDataReaderRowSource; private DataTable _dataTableSource; private SqlBulkCopyColumnMappingCollection _columnMappings; @@ -292,13 +238,11 @@ private int RowNumber private TdsParserStateObject _stateObj; private List<_ColumnMapping> _sortedColumnMappings; - private SqlRowsCopiedEventHandler _rowsCopiedEventHandler; - private static int _objectTypeCount; // EventSource Counter - internal readonly int _objectID = System.Threading.Interlocked.Increment(ref _objectTypeCount); + internal readonly int _objectID = Interlocked.Increment(ref _objectTypeCount); - //newly added member variables for Async modification, m = member variable to bcp - private int _savedBatchSize = 0; //save the batchsize so that changes are not affected unexpectedly + // Newly added member variables for Async modification, m = member variable to bcp. + private int _savedBatchSize = 0; // Save the batchsize so that changes are not affected unexpectedly. private bool _hasMoreRowToCopy = false; private bool _isAsyncBulkCopy = false; private bool _isBulkCopyingInProgress = false; @@ -306,30 +250,21 @@ private int RowNumber private SourceColumnMetadata[] _currentRowMetadata; - // for debug purpose only. - // TODO: I will make this internal to use Reflection. 
#if DEBUG - internal static bool _setAlwaysTaskOnWrite = false; //when set and in DEBUG mode, TdsParser::WriteBulkCopyValue will always return a task + internal static bool s_setAlwaysTaskOnWrite; //when set and in DEBUG mode, TdsParser::WriteBulkCopyValue will always return a task internal static bool SetAlwaysTaskOnWrite { - set - { - _setAlwaysTaskOnWrite = value; - } - get - { - return _setAlwaysTaskOnWrite; - } + set => s_setAlwaysTaskOnWrite = value; + get => s_setAlwaysTaskOnWrite; } #endif - // ctor - // + /// public SqlBulkCopy(SqlConnection connection) { if (connection == null) { - throw ADP.ArgumentNull("connection"); + throw ADP.ArgumentNull(nameof(connection)); } _connection = connection; _columnMappings = new SqlBulkCopyColumnMappingCollection(); @@ -340,7 +275,6 @@ public SqlBulkCopy(SqlConnection connection) public SqlBulkCopy(SqlConnection connection, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction) : this(connection) { - _copyOptions = copyOptions; if (externalTransaction != null && IsCopyOption(SqlBulkCopyOptions.UseInternalTransaction)) { @@ -354,11 +288,11 @@ public SqlBulkCopy(SqlConnection connection, SqlBulkCopyOptions copyOptions, Sql } /// - public SqlBulkCopy(string connectionString) : this(new SqlConnection(connectionString)) + public SqlBulkCopy(string connectionString) { if (connectionString == null) { - throw ADP.ArgumentNull("connectionString"); + throw ADP.ArgumentNull(nameof(connectionString)); } _connection = new SqlConnection(connectionString); _columnMappings = new SqlBulkCopyColumnMappingCollection(); @@ -376,10 +310,7 @@ public SqlBulkCopy(string connectionString, SqlBulkCopyOptions copyOptions) /// public int BatchSize { - get - { - return _batchSize; - } + get => _batchSize; set { if (value >= 0) @@ -388,7 +319,7 @@ public int BatchSize } else { - throw ADP.ArgumentOutOfRange("BatchSize"); + throw ADP.ArgumentOutOfRange(nameof(BatchSize)); } } } @@ -396,10 +327,7 @@ public int BatchSize /// public int BulkCopyTimeout { - get - { - return _timeout; - } + get => _timeout; set { if (value < 0) @@ -413,24 +341,12 @@ public int BulkCopyTimeout /// public bool EnableStreaming { - get - { - return _enableStreaming; - } - set - { - _enableStreaming = value; - } + get => _enableStreaming; + set => _enableStreaming = value; } /// - public SqlBulkCopyColumnMappingCollection ColumnMappings - { - get - { - return _columnMappings; - } - } + public SqlBulkCopyColumnMappingCollection ColumnMappings => _columnMappings; /// public SqlBulkCopyColumnOrderHintCollection ColumnOrderHints @@ -441,19 +357,16 @@ public SqlBulkCopyColumnOrderHintCollection ColumnOrderHints /// public string DestinationTableName { - get - { - return _destinationTableName; - } + get => _destinationTableName; set { if (value == null) { - throw ADP.ArgumentNull("DestinationTableName"); + throw ADP.ArgumentNull(nameof(DestinationTableName)); } else if (value.Length == 0) { - throw ADP.ArgumentOutOfRange("DestinationTableName"); + throw ADP.ArgumentOutOfRange(nameof(DestinationTableName)); } _destinationTableName = value; } @@ -462,10 +375,7 @@ public string DestinationTableName /// public int NotifyAfter { - get - { - return _notifyAfter; - } + get => _notifyAfter; set { if (value >= 0) @@ -474,41 +384,15 @@ public int NotifyAfter } else { - throw ADP.ArgumentOutOfRange("NotifyAfter"); + throw ADP.ArgumentOutOfRange(nameof(NotifyAfter)); } } } - internal int ObjectID - { - get - { - return _objectID; - } - } - - /// - public event SqlRowsCopiedEventHandler SqlRowsCopied - { - 
add - { - _rowsCopiedEventHandler += value; - } - remove - { - _rowsCopiedEventHandler -= value; - } - - } + internal int ObjectID => _objectID; /// - public int RowsCopied - { - get - { - return _rowsCopied; - } - } + public int RowsCopied => _rowsCopied; internal SqlStatistics Statistics { @@ -525,38 +409,29 @@ internal SqlStatistics Statistics } } - //================================================================ - // IDisposable - //================================================================ - /// void IDisposable.Dispose() { - this.Dispose(true); + Dispose(true); GC.SuppressFinalize(this); - } - private bool IsCopyOption(SqlBulkCopyOptions copyOption) - { - return (_copyOptions & copyOption) == copyOption; - } + private bool IsCopyOption(SqlBulkCopyOptions copyOption) => ((_copyOptions & copyOption) == copyOption); //Creates the initial query string, but does not execute it. - // private string CreateInitialQuery() { string[] parts; try { - parts = MultipartIdentifier.ParseMultipartIdentifier(this.DestinationTableName, "[\"", "]\"", Strings.SQL_BulkCopyDestinationTableName, true); + parts = MultipartIdentifier.ParseMultipartIdentifier(DestinationTableName, "[\"", "]\"", Strings.SQL_BulkCopyDestinationTableName, true); } catch (Exception e) { - throw SQL.BulkLoadInvalidDestinationTable(this.DestinationTableName, e); + throw SQL.BulkLoadInvalidDestinationTable(DestinationTableName, e); } if (ADP.IsEmpty(parts[MultipartIdentifier.TableIndex])) { - throw SQL.BulkLoadInvalidDestinationTable(this.DestinationTableName, null); + throw SQL.BulkLoadInvalidDestinationTable(DestinationTableName, null); } string TDSCommand; @@ -634,7 +509,7 @@ private Task CreateAndExecuteInitialQueryAsync(out Bulk string TDSCommand = CreateInitialQuery(); SqlClientEventSource.Log.TryTraceEvent(" Initial Query: '{0}'", TDSCommand); SqlClientEventSource.Log.TryCorrelationTraceEvent(" ObjectID {0}, ActivityID {1}", ObjectID, ActivityCorrelator.Current); - Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, this.BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, callerHasConnectionLock: true); + Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, callerHasConnectionLock: true); if (executeTask == null) { @@ -663,11 +538,12 @@ private Task CreateAndExecuteInitialQueryAsync(out Bulk } } - // Matches associated columns with metadata from initial query - // builds and executes the update bulk command - // + // Matches associated columns with metadata from initial query. + // Builds and executes the update bulk command. 
private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet internalResults) { + Debug.Assert(internalResults != null, "Where are the results from the initial query?"); + StringBuilder updateBulkCommandText = new StringBuilder(); if (_connection.IsShiloh && 0 == internalResults[CollationResultId].Count) @@ -675,13 +551,11 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i throw SQL.BulkLoadNoCollation(); } - Debug.Assert((internalResults != null), "Where are the results from the initial query?"); - - string[] parts = MultipartIdentifier.ParseMultipartIdentifier(this.DestinationTableName, "[\"", "]\"", Strings.SQL_BulkCopyDestinationTableName, true); + string[] parts = MultipartIdentifier.ParseMultipartIdentifier(DestinationTableName, "[\"", "]\"", Strings.SQL_BulkCopyDestinationTableName, true); updateBulkCommandText.AppendFormat("insert bulk {0} (", ADP.BuildMultiPartName(parts)); - int nmatched = 0; // number of columns that match and are accepted - int nrejected = 0; // number of columns that match but were rejected - bool rejectColumn; // true if a column is rejected because of an excluded type + int nmatched = 0; // Number of columns that match and are accepted + int nrejected = 0; // Number of columns that match but were rejected + bool rejectColumn; // True if a column is rejected because of an excluded type bool isInTransaction; @@ -702,8 +576,7 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i HashSet destColumnNames = new HashSet(); - // loop over the metadata for each column - // + // Loop over the metadata for each column _SqlMetaDataSet metaDataSet = internalResults[MetaDataResultId].MetaData; _sortedColumnMappings = new List<_ColumnMapping>(metaDataSet.Length); for (int i = 0; i < metaDataSet.Length; i++) @@ -712,17 +585,16 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i rejectColumn = false; // Check for excluded types - // if ((metadata.type == SqlDbType.Timestamp) || ((metadata.IsIdentity) && !IsCopyOption(SqlBulkCopyOptions.KeepIdentity))) { - // remove metadata for excluded columns + // Remove metadata for excluded columns metaDataSet[i] = null; rejectColumn = true; - // we still need to find a matching column association + // We still need to find a matching column association } - // find out if this column is associated + // Find out if this column is associated int assocId; for (assocId = 0; assocId < _localColumnMappings.Count; assocId++) { @@ -731,7 +603,7 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i { if (rejectColumn) { - nrejected++; // count matched columns only + nrejected++; // Count matched columns only break; } @@ -741,18 +613,16 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i if (nmatched > 1) { - updateBulkCommandText.Append(", "); // a leading comma for all but the first one + updateBulkCommandText.Append(", "); // A leading comma for all but the first one } - // some datatypes need special handling ... - // + // Some datatypes need special handling ... 
if (metadata.type == SqlDbType.Variant) { AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "sql_variant"); } else if (metadata.type == SqlDbType.Udt) { - // UDTs are sent as varbinary AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "varbinary"); } else @@ -764,8 +634,7 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i { case TdsEnums.SQLNUMERICN: case TdsEnums.SQLDECIMALN: - // decimal and numeric need to include precision and scale - // + // Decimal and numeric need to include precision and scale updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0},{1})", metadata.precision, metadata.scale); break; case TdsEnums.SQLUDT: @@ -785,13 +654,11 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i case TdsEnums.SQLDATETIME2: case TdsEnums.SQLDATETIMEOFFSET: // date, dateime2, and datetimeoffset need to include scale - // updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", metadata.scale); break; default: { - // for non-long non-fixed types we need to add the Size - // + // For non-long non-fixed types we need to add the Size if (!metadata.metaType.IsFixed && !metadata.metaType.IsLong) { int size = metadata.length; @@ -850,32 +717,31 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i { updateBulkCommandText.Append(" COLLATE " + collation_name.Value); // VSTFDEVDIV 461426: compare collations only if the collation value was set on the metadata - if (null != _SqlDataReaderRowSource && metadata.collation != null) + if (null != _sqlDataReaderRowSource && metadata.collation != null) { // On SqlDataReader we can verify the sourcecolumn collation! int sourceColumnId = _localColumnMappings[assocId]._internalSourceColumnOrdinal; int destinationLcid = metadata.collation.LCID; - int sourceLcid = _SqlDataReaderRowSource.GetLocaleId(sourceColumnId); + int sourceLcid = _sqlDataReaderRowSource.GetLocaleId(sourceColumnId); if (sourceLcid != destinationLcid) { - throw SQL.BulkLoadLcidMismatch(sourceLcid, _SqlDataReaderRowSource.GetName(sourceColumnId), destinationLcid, metadata.column); + throw SQL.BulkLoadLcidMismatch(sourceLcid, _sqlDataReaderRowSource.GetName(sourceColumnId), destinationLcid, metadata.column); } } } } } - break; - } // end if found - } // end of (inner) for loop + } + } if (assocId == _localColumnMappings.Count) { - // remove metadata for unmatched columns + // Remove metadata for unmatched columns metaDataSet[i] = null; } - } // end of (outer) for loop + } - // all columnmappings should have matched up + // All columnmappings should have matched up if (nmatched + nrejected != _localColumnMappings.Count) { throw (SQL.BulkLoadNonMatchingColumnMapping()); @@ -891,7 +757,7 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i | SqlBulkCopyOptions.AllowEncryptedValueModifications)) != SqlBulkCopyOptions.Default) || ColumnOrderHints.Count > 0) { - bool addSeparator = false; // insert a comma character if multiple options in list ... 
+ bool addSeparator = false; // Insert a comma character if multiple options in list updateBulkCommandText.Append(" with ("); if (IsCopyOption(SqlBulkCopyOptions.KeepNulls)) { @@ -947,17 +813,15 @@ private string TryGetOrderHintText(HashSet destColumnNames) } } - orderHintText.Length = orderHintText.Length - 2; + orderHintText.Length -= 2; orderHintText.Append(")"); return orderHintText.ToString(); } - // submitts the updatebulk command - // private Task SubmitUpdateBulkCommand(string TDSCommand) { SqlClientEventSource.Log.TryCorrelationTraceEvent(" ObjectID{0}, ActivityID {1}", ObjectID, ActivityCorrelator.Current); - Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, this.BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, callerHasConnectionLock: true); + Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, callerHasConnectionLock: true); if (executeTask == null) { @@ -983,22 +847,17 @@ private Task SubmitUpdateBulkCommand(string TDSCommand) } // Starts writing the Bulkcopy data stream - // private void WriteMetaData(BulkCopySimpleResultSet internalResults) { - _stateObj.SetTimeoutSeconds(this.BulkCopyTimeout); + _stateObj.SetTimeoutSeconds(BulkCopyTimeout); _SqlMetaDataSet metadataCollection = internalResults[MetaDataResultId].MetaData; _stateObj._outputMessageType = TdsEnums.MT_BULK; _parser.WriteBulkCopyMetaData(metadataCollection, _sortedColumnMappings.Count, _stateObj); } - //================================================================ - // Close() - // // Terminates the bulk copy operation. // Must be called at the end of the bulk copy session. - //================================================================ /// public void Close() { @@ -1014,7 +873,7 @@ private void Dispose(bool disposing) { if (disposing) { - // dispose dependend objects + // Dispose dependent objects _columnMappings = null; _parser = null; try @@ -1032,7 +891,6 @@ private void Dispose(bool disposing) } catch (Exception e) { - // UNDONE - should not be catching all exceptions!!! 
if (!ADP.IsCatchableExceptionType(e)) { throw; @@ -1052,11 +910,9 @@ private void Dispose(bool disposing) } } } - // free unmanaged objects } - // unified method to read a value from the current row - // + // Unified method to read a value from the current row private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out bool isDataFeed, out bool isNull) { _SqlMetaData metadata = _sortedColumnMappings[destRowIndex]._metadata; @@ -1069,7 +925,7 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b // Handle data feeds (common for both DbDataReader and SqlDataReader) if (_currentRowMetadata[destRowIndex].IsDataFeed) { - if (_DbDataReaderRowSource.IsDBNull(sourceOrdinal)) + if (_dbDataReaderRowSource.IsDBNull(sourceOrdinal)) { isSqlType = false; isDataFeed = false; @@ -1084,25 +940,25 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b switch (_currentRowMetadata[destRowIndex].Method) { case ValueMethod.DataFeedStream: - return new StreamDataFeed(_DbDataReaderRowSource.GetStream(sourceOrdinal)); + return new StreamDataFeed(_dbDataReaderRowSource.GetStream(sourceOrdinal)); case ValueMethod.DataFeedText: - return new TextDataFeed(_DbDataReaderRowSource.GetTextReader(sourceOrdinal)); + return new TextDataFeed(_dbDataReaderRowSource.GetTextReader(sourceOrdinal)); case ValueMethod.DataFeedXml: // Only SqlDataReader supports an XmlReader // There is no GetXmlReader on DbDataReader, however if GetValue returns XmlReader we will read it as stream if it is assigned to XML field - Debug.Assert(_SqlDataReaderRowSource != null, "Should not be reading row as an XmlReader if bulk copy source is not a SqlDataReader"); - return new XmlDataFeed(_SqlDataReaderRowSource.GetXmlReader(sourceOrdinal)); + Debug.Assert(_sqlDataReaderRowSource != null, "Should not be reading row as an XmlReader if bulk copy source is not a SqlDataReader"); + return new XmlDataFeed(_sqlDataReaderRowSource.GetXmlReader(sourceOrdinal)); default: - Debug.Assert(false, string.Format("Current column is marked as being a DataFeed, but no DataFeed compatible method was provided. Method: {0}", _currentRowMetadata[destRowIndex].Method)); + Debug.Fail($"Current column is marked as being a DataFeed, but no DataFeed compatible method was provided. 
Method: {_currentRowMetadata[destRowIndex].Method}"); isDataFeed = false; - object columnValue = _DbDataReaderRowSource.GetValue(sourceOrdinal); + object columnValue = _dbDataReaderRowSource.GetValue(sourceOrdinal); ADP.IsNullOrSqlType(columnValue, out isNull, out isSqlType); return columnValue; } } } // SqlDataReader-specific logic - else if (null != _SqlDataReaderRowSource) + else if (null != _sqlDataReaderRowSource) { if (_currentRowMetadata[destRowIndex].IsSqlType) { @@ -1112,19 +968,19 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b switch (_currentRowMetadata[destRowIndex].Method) { case ValueMethod.SqlTypeSqlDecimal: - value = _SqlDataReaderRowSource.GetSqlDecimal(sourceOrdinal); + value = _sqlDataReaderRowSource.GetSqlDecimal(sourceOrdinal); break; case ValueMethod.SqlTypeSqlDouble: // use cast to handle IsNull correctly because no public constructor allows it - value = (SqlDecimal)_SqlDataReaderRowSource.GetSqlDouble(sourceOrdinal); + value = (SqlDecimal)_sqlDataReaderRowSource.GetSqlDouble(sourceOrdinal); break; case ValueMethod.SqlTypeSqlSingle: // use cast to handle IsNull correctly because no public constructor allows it - value = (SqlDecimal)_SqlDataReaderRowSource.GetSqlSingle(sourceOrdinal); + value = (SqlDecimal)_sqlDataReaderRowSource.GetSqlSingle(sourceOrdinal); break; default: - Debug.Assert(false, string.Format("Current column is marked as being a SqlType, but no SqlType compatible method was provided. Method: {0}", _currentRowMetadata[destRowIndex].Method)); - value = (INullable)_SqlDataReaderRowSource.GetSqlValue(sourceOrdinal); + Debug.Fail($"Current column is marked as being a SqlType, but no SqlType compatible method was provided. Method: {_currentRowMetadata[destRowIndex].Method}"); + value = (INullable)_sqlDataReaderRowSource.GetSqlValue(sourceOrdinal); break; } @@ -1136,7 +992,7 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b isSqlType = false; isDataFeed = false; - object value = _SqlDataReaderRowSource.GetValue(sourceOrdinal); + object value = _sqlDataReaderRowSource.GetValue(sourceOrdinal); isNull = ((value == null) || (value == DBNull.Value)); if ((!isNull) && (metadata.type == SqlDbType.Udt)) { @@ -1158,8 +1014,8 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b IDataReader rowSourceAsIDataReader = (IDataReader)_rowSource; - // Back-compat with 4.0 and 4.5 - only use IsDbNull when streaming is enabled and only for non-SqlDataReader - if ((_enableStreaming) && (_SqlDataReaderRowSource == null) && (rowSourceAsIDataReader.IsDBNull(sourceOrdinal))) + // Only use IsDbNull when streaming is enabled and only for non-SqlDataReader + if ((_enableStreaming) && (_sqlDataReaderRowSource == null) && (rowSourceAsIDataReader.IsDBNull(sourceOrdinal))) { isSqlType = false; isNull = true; @@ -1231,12 +1087,12 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b else { isSqlType = true; - return new SqlDecimal((Decimal)currentRowValue); + return new SqlDecimal((decimal)currentRowValue); } } default: { - Debug.Assert(false, string.Format("Current column is marked as being a SqlType, but no SqlType compatible method was provided. Method: {0}", _currentRowMetadata[destRowIndex].Method)); + Debug.Fail($"Current column is marked as being a SqlType, but no SqlType compatible method was provided. 
Method: {_currentRowMetadata[destRowIndex].Method}"); break; } } @@ -1247,22 +1103,22 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b } default: { - Debug.Assert(false, "ValueSourcType unspecified"); + Debug.Fail("ValueSourceType unspecified"); throw ADP.NotSupported(); } } } - // unified method to read a row from the current rowsource + // Unified method to read a row from the current rowsource. // When _isAsyncBulkCopy == true (i.e. async copy): returns Task when IDataReader is a DbDataReader, Null for others. // When _isAsyncBulkCopy == false (i.e. sync copy): returns null. Uses ReadFromRowSource to get the boolean value. // "more" -- should be used by the caller only when the return value is null. private Task ReadFromRowSourceAsync(CancellationToken cts) { - if (_isAsyncBulkCopy && (_DbDataReaderRowSource != null)) + if (_isAsyncBulkCopy && _dbDataReaderRowSource != null) { - // This will call ReadAsync for DbDataReader (for SqlDataReader it will be truely async read; for non-SqlDataReader it may block.) - return _DbDataReaderRowSource.ReadAsync(cts).ContinueWith((t) => + // This will call ReadAsync for DbDataReader (for SqlDataReader it will be truly async read; for non-SqlDataReader it may block.) + return _dbDataReaderRowSource.ReadAsync(cts).ContinueWith((t) => { if (t.Status == TaskStatus.RanToCompletion) { @@ -1281,15 +1137,13 @@ private Task ReadFromRowSourceAsync(CancellationToken cts) _hasMoreRowToCopy = false; try { - _hasMoreRowToCopy = ReadFromRowSource(); //Synchronous calls for DataRows and DataTable won't block. For IDataReader, it may block. + _hasMoreRowToCopy = ReadFromRowSource(); // Synchronous calls for DataRows and DataTable won't block. For IDataReader, it may block. } catch (Exception ex) { if (_isAsyncBulkCopy) { - TaskCompletionSource tcs = new TaskCompletionSource(); - tcs.SetException(ex); - return tcs.Task; + return Task.FromException(ex); } else { @@ -1312,13 +1166,13 @@ private bool ReadFromRowSource() case ValueSourceType.IDataReader: return ((IDataReader)_rowSource).Read(); - // treatment for RowArray case is same as for DataTable, prevent code duplicate + // Treatment for RowArray case is same as for DataTable, prevent code duplicate case ValueSourceType.RowArray: case ValueSourceType.DataTable: Debug.Assert(_rowEnumerator != null, "uninitialized _rowEnumerator"); Debug.Assert((_rowStateToSkip & DataRowState.Deleted) != 0, "Deleted is a permitted rowstate?"); - // repeat until we get a row that is not deleted or there are no more rows ...
+ // Repeat until we get a row that is not deleted or there are no more rows do { if (!_rowEnumerator.MoveNext()) @@ -1326,15 +1180,13 @@ private bool ReadFromRowSource() return false; } _currentRow = (DataRow)_rowEnumerator.Current; - } while ((_currentRow.RowState & _rowStateToSkip) != 0); // repeat if there is an unexpected rowstate + } while ((_currentRow.RowState & _rowStateToSkip) != 0); // Repeat if there is an unexpected rowstate - // SQLBUVSTS01:36286 - move this line out of loop because - // ItemArray raises exception when used on deleted row _currentRowLength = _currentRow.ItemArray.Length; return true; default: - Debug.Assert(false, "ValueSourcType unspecified"); + Debug.Fail("ValueSourceType unspecified"); throw ADP.NotSupported(); } } @@ -1349,7 +1201,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) bool isSqlType; bool isDataFeed; - if (((_SqlDataReaderRowSource != null) || (_dataTableSource != null)) && ((metadata.metaType.NullableType == TdsEnums.SQLDECIMALN) || (metadata.metaType.NullableType == TdsEnums.SQLNUMERICN))) + if (((_sqlDataReaderRowSource != null) || (_dataTableSource != null)) && ((metadata.metaType.NullableType == TdsEnums.SQLDECIMALN) || (metadata.metaType.NullableType == TdsEnums.SQLNUMERICN))) { isDataFeed = false; @@ -1358,7 +1210,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) { case ValueSourceType.DbDataReader: case ValueSourceType.IDataReader: - t = _SqlDataReaderRowSource.GetFieldType(sourceOrdinal); + t = _sqlDataReaderRowSource.GetFieldType(sourceOrdinal); break; case ValueSourceType.DataTable: case ValueSourceType.RowArray: @@ -1366,11 +1218,11 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) break; default: t = null; - Debug.Assert(false, string.Format("Unknown value source: {0}", _rowSourceType)); + Debug.Fail($"Unknown value source: {_rowSourceType}"); break; } - if (typeof(SqlDecimal) == t || typeof(Decimal) == t) + if (typeof(SqlDecimal) == t || typeof(decimal) == t) { isSqlType = true; method = ValueMethod.SqlTypeSqlDecimal; // Source Type Decimal @@ -1396,13 +1248,13 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) { isSqlType = false; - if (_SqlDataReaderRowSource != null) + if (_sqlDataReaderRowSource != null) { // MetaData property is not set for SMI, but since streaming is disabled we do not need it - MetaType mtSource = _SqlDataReaderRowSource.MetaData[sourceOrdinal].metaType; + MetaType mtSource = _sqlDataReaderRowSource.MetaData[sourceOrdinal].metaType; // There is no memory gain for non-sequential access for binary - if ((metadata.type == SqlDbType.VarBinary) && (mtSource.IsBinType) && (mtSource.SqlDbType != SqlDbType.Timestamp) && _SqlDataReaderRowSource.IsCommandBehavior(CommandBehavior.SequentialAccess)) + if ((metadata.type == SqlDbType.VarBinary) && (mtSource.IsBinType) && (mtSource.SqlDbType != SqlDbType.Timestamp) && _sqlDataReaderRowSource.IsCommandBehavior(CommandBehavior.SequentialAccess)) { isDataFeed = true; method = ValueMethod.DataFeedStream; @@ -1424,7 +1276,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) method = ValueMethod.GetValue; } } - else if (_DbDataReaderRowSource != null) + else if (_dbDataReaderRowSource != null) { if (metadata.type == SqlDbType.VarBinary) { @@ -1458,8 +1310,6 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) return new SourceColumnMetadata(method, isSqlType, isDataFeed); } - // - // private void CreateOrValidateConnection(string method) { if (null == _connection) @@ -1477,12 +1327,12 @@ private void 
CreateOrValidateConnection(string method) _connection.Open(); } - // close any non MARS dead readers, if applicable, and then throw if still busy. + // Close any non-MARS dead readers, if applicable, and then throw if still busy. _connection.ValidateConnectionForExecute(method, null); - // if we have a transaction, check to ensure that the active + // If we have a transaction, check to ensure that the active // connection property matches the connection associated with - // the transaction + // the transaction. if (null != _externalTransaction && _connection != _externalTransaction.Connection) { throw ADP.TransactionConnectionMismatch(); @@ -1490,7 +1340,7 @@ private void CreateOrValidateConnection(string method) } // Runs the _parser until it is done and ensures that ThreadHasParserLockForClose is correctly set and unset - // Ensure that you only call this inside of a Reliabilty Section + // Ensure that you only call this inside of a Reliability Section private void RunParser(BulkCopySimpleResultSet bulkCopyHandler = null) { // In case of error while reading, we should let the connection know that we already own the _parserLock @@ -1529,10 +1379,10 @@ private void CommitTransaction() if (null != _internalTransaction) { SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection(); - internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we have the lock + internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we have the lock try { - _internalTransaction.Commit(); //commit. + _internalTransaction.Commit(); _internalTransaction.Dispose(); _internalTransaction = null; } @@ -1550,7 +1400,7 @@ private void AbortTransaction() if (!_internalTransaction.IsZombied) { SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection(); - internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we have the lock + internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we have the lock try { _internalTransaction.Rollback(); @@ -1565,11 +1415,10 @@ private void AbortTransaction() } } - // Appends columnname in square brackets, a space and the typename to the query - // putting the name in quotes also requires doubling existing ']' so that they are not mistaken for - // the closing quote - // example: abc will become [abc] but abc[] will becom [abc[]]] - // + // Appends columnname in square brackets, a space, and the typename to the query. + // Putting the name in quotes also requires doubling existing ']' so that they are not mistaken for + // the closing quote. + // example: abc will become [abc] but abc[] will become [abc[]]] private void AppendColumnNameAndTypeName(StringBuilder query, string columnName, string typeName) { SqlServerEscapeHelper.EscapeIdentifier(query, columnName); @@ -1584,7 +1433,7 @@ private string UnquotedName(string name) if (name[0] == '[') { int l = name.Length; - Debug.Assert(name[l - 1] == ']', "Name starts with [ but doesn not end with ]"); + Debug.Assert(name[l - 1] == ']', "Name starts with [ but does not end with ]"); name = name.Substring(1, l - 2); } return name; @@ -1592,11 +1441,10 @@ private string UnquotedName(string name) private object ValidateBulkCopyVariant(object value) { - // from the spec: + // From the spec: // "The only acceptable types are ..." 
// GUID, BIGVARBINARY, BIGBINARY, BIGVARCHAR, BIGCHAR, NVARCHAR, NCHAR, BIT, INT1, INT2, INT4, INT8, // MONEY4, MONEY, DECIMALN, NUMERICN, FTL4, FLT8, DATETIME4 and DATETIME - // MetaType metatype = MetaType.GetMetaTypeFromValue(value); switch (metatype.TDSType) { @@ -1674,16 +1522,10 @@ private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, re value = SqlParameter.CoerceValue(value, mt, out coercedToDataFeed, out typeChanged, false); // Convert Source Decimal Precision and Scale to Destination Precision and Scale - // Fix Bug: 385971 sql decimal data could get corrupted on insert if the scale of - // the source and destination weren't the same. The BCP protocol, specifies the + // Sql decimal data could get corrupted on insert if the scale of + // the source and destination weren't the same. The BCP protocol, specifies the // scale of the incoming data in the insert statement, we just tell the server we - // are inserting the same scale back. This then created a bug inside the BCP operation - // if the scales didn't match. The fix is to do the same thing that SQL Parameter does, - // and adjust the scale before writing. In Orcas is scale adjustment should be removed from - // SqlParameter and SqlBulkCopy and Isolated inside SqlParameter.CoerceValue, but because of - // where we are in the cycle, the changes must be kept at minimum, so I'm just bringing the - // code over to SqlBulkCopy. - + // are inserting the same scale back. SqlDecimal sqlValue; if ((isSqlType) && (!typeChanged)) { @@ -1691,7 +1533,7 @@ private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, re } else { - sqlValue = new SqlDecimal((Decimal)value); + sqlValue = new SqlDecimal((decimal)value); } if (sqlValue.Scale != scale) @@ -1718,7 +1560,7 @@ private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, re // Perf: It is more efficient to write a SqlDecimal than a decimal since we need to break it into its 'bits' when writing value = sqlValue; isSqlType = true; - typeChanged = false; // Setting this to false as SqlParameter.CoerceValue will only set it to true when converting to a CLR type + typeChanged = false; // Setting this to false as SqlParameter.CoerceValue will only set it to true when converting to a CLR type break; case TdsEnums.SQLINTN: @@ -1799,7 +1641,7 @@ private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, re break; default: - Debug.Assert(false, "Unknown TdsType!" + type.NullableType.ToString("x2", (IFormatProvider)null)); + Debug.Fail("Unknown TdsType!" 
+ type.NullableType.ToString("x2", (IFormatProvider)null)); throw SQL.BulkLoadCannotConvertValue(value.GetType(), type, metadata.ordinal, RowNumber, metadata.isEncrypted, metadata.column, value.ToString(), null); } @@ -1828,7 +1670,7 @@ public void WriteToServer(DbDataReader reader) if (reader == null) { - throw new ArgumentNullException("reader"); + throw new ArgumentNullException(nameof(reader)); } if (_isBulkCopyingInProgress) @@ -1841,15 +1683,16 @@ public void WriteToServer(DbDataReader reader) { statistics = SqlStatistics.StartTimer(Statistics); _rowSource = reader; - _DbDataReaderRowSource = reader; - _SqlDataReaderRowSource = reader as SqlDataReader; + _dbDataReaderRowSource = reader; + _sqlDataReaderRowSource = reader as SqlDataReader; - if (_SqlDataReaderRowSource != null) + if (_sqlDataReaderRowSource != null) { - _rowSourceIsSqlDataReaderSmi = _SqlDataReaderRowSource is SqlDataReaderSmi; + _rowSourceIsSqlDataReaderSmi = _sqlDataReaderRowSource is SqlDataReaderSmi; } _dataTableSource = null; _rowSourceType = ValueSourceType.DbDataReader; + _isAsyncBulkCopy = false; WriteRowSourceToServerAsync(reader.FieldCount, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false; } @@ -1866,7 +1709,7 @@ public void WriteToServer(IDataReader reader) if (reader == null) { - throw new ArgumentNullException("reader"); + throw new ArgumentNullException(nameof(reader)); } if (_isBulkCopyingInProgress) @@ -1879,12 +1722,12 @@ public void WriteToServer(IDataReader reader) { statistics = SqlStatistics.StartTimer(Statistics); _rowSource = reader; - _SqlDataReaderRowSource = _rowSource as SqlDataReader; - if (_SqlDataReaderRowSource != null) + _sqlDataReaderRowSource = _rowSource as SqlDataReader; + if (_sqlDataReaderRowSource != null) { - _rowSourceIsSqlDataReaderSmi = _SqlDataReaderRowSource is SqlDataReaderSmi; + _rowSourceIsSqlDataReaderSmi = _sqlDataReaderRowSource is SqlDataReaderSmi; } - _DbDataReaderRowSource = _rowSource as DbDataReader; + _dbDataReaderRowSource = _rowSource as DbDataReader; _dataTableSource = null; _rowSourceType = ValueSourceType.IDataReader; _isAsyncBulkCopy = false; @@ -1897,10 +1740,7 @@ public void WriteToServer(IDataReader reader) } /// - public void WriteToServer(DataTable table) - { - WriteToServer(table, 0); - } + public void WriteToServer(DataTable table) => WriteToServer(table, 0); /// public void WriteToServer(DataTable table, DataRowState rowState) @@ -1909,7 +1749,7 @@ public void WriteToServer(DataTable table, DataRowState rowState) if (table == null) { - throw new ArgumentNullException("table"); + throw new ArgumentNullException(nameof(table)); } if (_isBulkCopyingInProgress) @@ -1924,7 +1764,7 @@ public void WriteToServer(DataTable table, DataRowState rowState) _rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted; _rowSource = table; _dataTableSource = table; - _SqlDataReaderRowSource = null; + _sqlDataReaderRowSource = null; _rowSourceType = ValueSourceType.DataTable; _rowEnumerator = table.Rows.GetEnumerator(); _isAsyncBulkCopy = false; @@ -1946,7 +1786,7 @@ public void WriteToServer(DataRow[] rows) if (rows == null) { - throw new ArgumentNullException("rows"); + throw new ArgumentNullException(nameof(rows)); } if (_isBulkCopyingInProgress) @@ -1956,7 +1796,7 @@ public void WriteToServer(DataRow[] rows) if (rows.Length == 0) { - return; // nothing to do. user passed us an empty array + return; // Nothing to do. 
user passed us an empty array } try @@ -1968,7 +1808,7 @@ public void WriteToServer(DataRow[] rows) _rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows _rowSource = rows; _dataTableSource = table; - _SqlDataReaderRowSource = null; + _sqlDataReaderRowSource = null; _rowSourceType = ValueSourceType.RowArray; _rowEnumerator = rows.GetEnumerator(); _isAsyncBulkCopy = false; @@ -1982,11 +1822,7 @@ public void WriteToServer(DataRow[] rows) } /// - /*Async overloads start here*/ - public Task WriteToServerAsync(DataRow[] rows) - { - return WriteToServerAsync(rows, CancellationToken.None); - } + public Task WriteToServerAsync(DataRow[] rows) => WriteToServerAsync(rows, CancellationToken.None); /// public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationToken) @@ -1996,7 +1832,7 @@ public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationTok if (rows == null) { - throw new ArgumentNullException("rows"); + throw new ArgumentNullException(nameof(rows)); } if (_isBulkCopyingInProgress) @@ -2026,14 +1862,14 @@ public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationTok DataTable table = rows[0].Table; Debug.Assert(null != table, "How can we have rows without a table?"); - _rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows + _rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows _rowSource = rows; _dataTableSource = table; - _SqlDataReaderRowSource = null; + _sqlDataReaderRowSource = null; _rowSourceType = ValueSourceType.RowArray; _rowEnumerator = rows.GetEnumerator(); _isAsyncBulkCopy = true; - resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; + resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); // It returns Task since _isAsyncBulkCopy = true; } finally { @@ -2043,10 +1879,7 @@ public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationTok } /// - public Task WriteToServerAsync(DbDataReader reader) - { - return WriteToServerAsync(reader, CancellationToken.None); - } + public Task WriteToServerAsync(DbDataReader reader) => WriteToServerAsync(reader, CancellationToken.None); /// public Task WriteToServerAsync(DbDataReader reader, CancellationToken cancellationToken) @@ -2056,7 +1889,7 @@ public Task WriteToServerAsync(DbDataReader reader, CancellationToken cancellati if (reader == null) { - throw new ArgumentNullException("reader"); + throw new ArgumentNullException(nameof(reader)); } if (_isBulkCopyingInProgress) @@ -2069,12 +1902,12 @@ public Task WriteToServerAsync(DbDataReader reader, CancellationToken cancellati { statistics = SqlStatistics.StartTimer(Statistics); _rowSource = reader; - _SqlDataReaderRowSource = reader as SqlDataReader; - _DbDataReaderRowSource = reader; + _sqlDataReaderRowSource = reader as SqlDataReader; + _dbDataReaderRowSource = reader; _dataTableSource = null; _rowSourceType = ValueSourceType.DbDataReader; _isAsyncBulkCopy = true; - resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; + resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); // It returns Task since _isAsyncBulkCopy = true; } finally { @@ -2084,10 +1917,7 @@ public Task WriteToServerAsync(DbDataReader reader, CancellationToken cancellati } /// - public Task WriteToServerAsync(IDataReader reader) - { - return WriteToServerAsync(reader, CancellationToken.None); - } + public Task 
WriteToServerAsync(IDataReader reader) => WriteToServerAsync(reader, CancellationToken.None); /// public Task WriteToServerAsync(IDataReader reader, CancellationToken cancellationToken) @@ -2097,7 +1927,7 @@ public Task WriteToServerAsync(IDataReader reader, CancellationToken cancellatio if (reader == null) { - throw new ArgumentNullException("reader"); + throw new ArgumentNullException(nameof(reader)); } if (_isBulkCopyingInProgress) @@ -2110,12 +1940,12 @@ public Task WriteToServerAsync(IDataReader reader, CancellationToken cancellatio { statistics = SqlStatistics.StartTimer(Statistics); _rowSource = reader; - _SqlDataReaderRowSource = _rowSource as SqlDataReader; - _DbDataReaderRowSource = _rowSource as DbDataReader; + _sqlDataReaderRowSource = _rowSource as SqlDataReader; + _dbDataReaderRowSource = _rowSource as DbDataReader; _dataTableSource = null; _rowSourceType = ValueSourceType.IDataReader; _isAsyncBulkCopy = true; - resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; + resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); // It returns Task since _isAsyncBulkCopy = true; } finally { @@ -2125,22 +1955,13 @@ public Task WriteToServerAsync(IDataReader reader, CancellationToken cancellatio } /// - public Task WriteToServerAsync(DataTable table) - { - return WriteToServerAsync(table, 0, CancellationToken.None); - } + public Task WriteToServerAsync(DataTable table) => WriteToServerAsync(table, 0, CancellationToken.None); /// - public Task WriteToServerAsync(DataTable table, CancellationToken cancellationToken) - { - return WriteToServerAsync(table, 0, cancellationToken); - } + public Task WriteToServerAsync(DataTable table, CancellationToken cancellationToken) => WriteToServerAsync(table, 0, cancellationToken); /// - public Task WriteToServerAsync(DataTable table, DataRowState rowState) - { - return WriteToServerAsync(table, rowState, CancellationToken.None); - } + public Task WriteToServerAsync(DataTable table, DataRowState rowState) => WriteToServerAsync(table, rowState, CancellationToken.None); /// public Task WriteToServerAsync(DataTable table, DataRowState rowState, CancellationToken cancellationToken) @@ -2150,7 +1971,7 @@ public Task WriteToServerAsync(DataTable table, DataRowState rowState, Cancellat if (table == null) { - throw new ArgumentNullException("table"); + throw new ArgumentNullException(nameof(table)); } if (_isBulkCopyingInProgress) @@ -2164,12 +1985,12 @@ public Task WriteToServerAsync(DataTable table, DataRowState rowState, Cancellat statistics = SqlStatistics.StartTimer(Statistics); _rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted; _rowSource = table; - _SqlDataReaderRowSource = null; + _sqlDataReaderRowSource = null; _dataTableSource = table; _rowSourceType = ValueSourceType.DataTable; _rowEnumerator = table.Rows.GetEnumerator(); _isAsyncBulkCopy = true; - resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; + resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); // It returns Task since _isAsyncBulkCopy = true; } finally { @@ -2178,8 +1999,6 @@ public Task WriteToServerAsync(DataTable table, DataRowState rowState, Cancellat return resultTask; } - // Writes row source. 
- // private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctoken) { // If user's token is canceled, return a canceled task @@ -2192,7 +2011,7 @@ private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctok Task reconnectTask = _connection._currentReconnectionTask; if (reconnectTask != null && !reconnectTask.IsCompleted) { - if (this._isAsyncBulkCopy) + if (_isAsyncBulkCopy) { TaskCompletionSource tcs = new TaskCompletionSource(); reconnectTask.ContinueWith((t) => @@ -2204,9 +2023,11 @@ private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctok } else { - AsyncHelper.ContinueTask(writeTask, tcs, () => tcs.SetResult(null)); + AsyncHelper.ContinueTask(writeTask, tcs, + onSuccess: () => tcs.SetResult(null) + ); } - }, ctoken); // we do not need to propagate exception etc. from reconnect task, we just need to wait for it to finish + }, ctoken); // We do not need to propagate exception, etc, from reconnect task, we just need to wait for it to finish. return tcs.Task; } else @@ -2300,7 +2121,7 @@ private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctok { try { - AbortTransaction(); // if there is one, on success transactions will be committed + AbortTransaction(); // If there is one, on success transactions will be committed. } finally { @@ -2319,9 +2140,7 @@ private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctok } } - // Handles the column mapping. - // private void WriteRowSourceToServerCommon(int columnCount) { bool unspecifiedColumnOrdinals = false; @@ -2347,14 +2166,12 @@ private void WriteRowSourceToServerCommon(int columnCount) } // perf: If the user specified all column ordinals we do not need to get a schematable - // if (unspecifiedColumnOrdinals) { int index = -1; unspecifiedColumnOrdinals = false; // Match up sourceColumn names with sourceColumn ordinals - // if (_localColumnMappings.Count > 0) { foreach (SqlBulkCopyColumnMapping bulkCopyColumn in _localColumnMappings) @@ -2363,7 +2180,7 @@ private void WriteRowSourceToServerCommon(int columnCount) { string unquotedColumnName = UnquotedName(bulkCopyColumn.SourceColumn); - switch (this._rowSourceType) + switch (_rowSourceType) { case ValueSourceType.DataTable: index = ((DataTable)_rowSource).Columns.IndexOf(unquotedColumnName); @@ -2375,17 +2192,17 @@ private void WriteRowSourceToServerCommon(int columnCount) case ValueSourceType.IDataReader: try { - index = ((IDataRecord)this._rowSource).GetOrdinal(unquotedColumnName); + index = ((IDataRecord)_rowSource).GetOrdinal(unquotedColumnName); } catch (IndexOutOfRangeException e) { - throw (SQL.BulkLoadNonMatchingColumnName(unquotedColumnName, e)); + throw SQL.BulkLoadNonMatchingColumnName(unquotedColumnName, e); } break; } if (index == -1) { - throw (SQL.BulkLoadNonMatchingColumnName(unquotedColumnName)); + throw SQL.BulkLoadNonMatchingColumnName(unquotedColumnName); } bulkCopyColumn._internalSourceColumnOrdinal = index; } @@ -2403,21 +2220,9 @@ internal void OnConnectionClosed() } } - /// - private void OnRowsCopied(SqlRowsCopiedEventArgs value) - { - SqlRowsCopiedEventHandler handler = _rowsCopiedEventHandler; - if (handler != null) - { - handler(this, value); - } - } - // fxcop: - // Use the .NET Event System whenever appropriate. 
- private bool FireRowsCopiedEvent(long rowsCopied) { - // release lock to prevent possible deadlocks + // Release lock to prevent possible deadlocks SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection(); bool semaphoreLock = internalConnection._parserLock.CanBeReleasedFromAnyThread; internalConnection._parserLock.Release(); @@ -2426,7 +2231,7 @@ private bool FireRowsCopiedEvent(long rowsCopied) try { _insideRowsCopiedEvent = true; - this.OnRowsCopied(eventArgs); + SqlRowsCopied?.Invoke(this, eventArgs); } finally { @@ -2439,14 +2244,13 @@ private bool FireRowsCopiedEvent(long rowsCopied) // Reads a cell and then writes it. // Read may block at this moment since there is no getValueAsync or DownStream async at this moment. // When _isAsyncBulkCopy == true: Write will return Task (when async method runs asynchronously) or Null (when async call actually ran synchronously) for performance. - // When _isAsyncBulkCopy == false: Writes are purely sync. This method reutrn null at the end. - // + // When _isAsyncBulkCopy == false: Writes are purely sync. This method returns null at the end. private Task ReadWriteColumnValueAsync(int col) { bool isSqlType; bool isDataFeed; bool isNull; - Object value = GetValueFromSourceRow(col, out isSqlType, out isDataFeed, out isNull); //this will return Task/null in future: as rTask + object value = GetValueFromSourceRow(col, out isSqlType, out isDataFeed, out isNull); //this will return Task/null in future: as rTask _SqlMetaData metadata = _sortedColumnMappings[col]._metadata; if (!isDataFeed) { @@ -2475,9 +2279,9 @@ private Task ReadWriteColumnValueAsync(int col) // Target type shouldn't be encrypted Debug.Assert(!metadata.isEncrypted, "Can't encrypt SQL Variant type"); SqlBuffer.StorageType variantInternalType = SqlBuffer.StorageType.Empty; - if ((_SqlDataReaderRowSource != null) && (_connection.IsKatmaiOrNewer)) + if ((_sqlDataReaderRowSource != null) && (_connection.IsKatmaiOrNewer)) { - variantInternalType = _SqlDataReaderRowSource.GetVariantInternalStorageType(_sortedColumnMappings[col]._sourceColumnOrdinal); + variantInternalType = _sqlDataReaderRowSource.GetVariantInternalStorageType(_sortedColumnMappings[col]._sourceColumnOrdinal); } if (variantInternalType == SqlBuffer.StorageType.DateTime2) @@ -2510,9 +2314,8 @@ private void RegisterForConnectionCloseNotification(ref Task outterTask) } // Runs a loop to copy all columns of a single row. - // maintains a state by remembering #columns copied so far (int col) + // Maintains a state by remembering #columns copied so far (int col). // Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, (2) _isAsyncBulkCopy == true but all async writes finished synchronously. - // private Task CopyColumnsAsync(int col, TaskCompletionSource source = null) { Task resultTask = null, task = null; @@ -2533,7 +2336,7 @@ private Task CopyColumnsAsync(int col, TaskCompletionSource source = nul resultTask = source.Task; } CopyColumnsAsyncSetupContinuation(source, task, i); - return resultTask; //associated task will be completed when all colums (i.e. the entire row) is written + return resultTask; //associated task will be completed when all columns (i.e. 
the entire row) is written } if (source != null) { @@ -2557,23 +2360,23 @@ private Task CopyColumnsAsync(int col, TaskCompletionSource source = nul // This is in its own method to avoid always allocating the lambda in CopyColumnsAsync private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource source, Task task, int i) { - AsyncHelper.ContinueTask(task, source, () => - { - if (i + 1 < _sortedColumnMappings.Count) + AsyncHelper.ContinueTask(task, source, + onSuccess: () => { - CopyColumnsAsync(i + 1, source); //continue from the next column - } - else - { - source.SetResult(null); - } - }, - _connection.GetOpenTdsConnection()); + if (i + 1 < _sortedColumnMappings.Count) + { + CopyColumnsAsync(i + 1, source); //continue from the next column + } + else + { + source.SetResult(null); + } + }, + connectionToDoom: _connection.GetOpenTdsConnection() + ); } - // The notification logic. - // private void CheckAndRaiseNotification() { bool abortOperation = false; //returns if the operation needs to be aborted. @@ -2583,36 +2386,34 @@ private void CheckAndRaiseNotification() // Fire event logic if (_notifyAfter > 0) - { // no action if no value specified - // (0=no notification) + { if (_rowsUntilNotification > 0) - { // > 0? + { if (--_rowsUntilNotification == 0) - { // decrement counter - // Fire event during operation. This is the users chance to abort the operation + { + // Fire event during operation. This is the users chance to abort the operation. try { - // it's also the user's chance to cause an exception ... + // It's also the user's chance to cause an exception. _stateObj.BcpLock = true; abortOperation = FireRowsCopiedEvent(_rowsCopied); SqlClientEventSource.Log.TryTraceEvent(""); - // just in case some pathological person closes the target connection ... + // In case the target connection is closed accidentally. if (ConnectionState.Open != _connection.State) { - exception = ADP.OpenConnectionRequired("CheckAndRaiseNotification", _connection.State); + exception = ADP.OpenConnectionRequired(nameof(CheckAndRaiseNotification), _connection.State); } } catch (Exception e) { - // UNDONE - should not be catching all exceptions!!! if (!ADP.IsCatchableExceptionType(e)) { exception = e; } else { - exception = Microsoft.Data.OperationAbortedException.Aborted(e); + exception = OperationAbortedException.Aborted(e); } } finally @@ -2627,12 +2428,12 @@ private void CheckAndRaiseNotification() } } if (!abortOperation && _rowsUntilNotification > _notifyAfter) - { // if the specified counter decreased we update - _rowsUntilNotification = _notifyAfter; // decreased we update otherwise not + { + _rowsUntilNotification = _notifyAfter; // Update on decrement of count } if (exception == null && abortOperation) { - exception = Microsoft.Data.OperationAbortedException.Aborted(null); + exception = OperationAbortedException.Aborted(null); } if (_connection.State != ConnectionState.Open) { @@ -2641,7 +2442,7 @@ private void CheckAndRaiseNotification() if (exception != null) { _parser._asyncWrite = false; - Task writeTask = _parser.WriteBulkCopyDone(_stateObj); //We should complete the current batch upto this row. + Task writeTask = _parser.WriteBulkCopyDone(_stateObj); //We should complete the current batch up to this row. Debug.Assert(writeTask == null, "Task should not pend while doing sync bulk copy"); RunParser(); AbortTransaction(); @@ -2650,7 +2451,7 @@ private void CheckAndRaiseNotification() } // Checks for cancellation. 
If cancel requested, cancels the task and returns the cancelled task - Task CheckForCancellation(CancellationToken cts, TaskCompletionSource tcs) + private Task CheckForCancellation(CancellationToken cts, TaskCompletionSource tcs) { if (cts.IsCancellationRequested) { @@ -2669,7 +2470,6 @@ Task CheckForCancellation(CancellationToken cts, TaskCompletionSource tc private TaskCompletionSource ContinueTaskPend(Task task, TaskCompletionSource source, Func> action) { - if (task == null) { return action(); @@ -2677,19 +2477,20 @@ private TaskCompletionSource ContinueTaskPend(Task task, TaskCompletionS else { Debug.Assert(source != null, "source should already be initialized if task is not null"); - AsyncHelper.ContinueTask(task, source, () => - { - TaskCompletionSource newSource = action(); - Debug.Assert(newSource == null, "Shouldn't create a new source when one already exists"); - }); + AsyncHelper.ContinueTask(task, source, + onSuccess: () => + { + TaskCompletionSource newSource = action(); + Debug.Assert(newSource == null, "Shouldn't create a new source when one already exists"); + } + ); } return null; } - // Copies all the rows in a batch - // maintains state machine with state variable: rowSoFar + // Copies all the rows in a batch. + // Maintains state machine with state variable: rowSoFar. // Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously. - // private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, TaskCompletionSource source = null) { Task resultTask = null; @@ -2697,7 +2498,7 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, int i; try { - //totalRows is batchsize which is 0 by default. In that case, we keep copying till the end (until _hasMoreRowToCopy == false). + // totalRows is batchsize which is 0 by default. In that case, we keep copying till the end (until _hasMoreRowToCopy == false). for (i = rowsSoFar; (totalRows <= 0 || i < totalRows) && _hasMoreRowToCopy == true; i++) { if (_isAsyncBulkCopy == true) @@ -2705,20 +2506,20 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, resultTask = CheckForCancellation(cts, source); if (resultTask != null) { - return resultTask; // task got cancelled! + return resultTask; // Task got cancelled! } } _stateObj.WriteByte(TdsEnums.SQLROW); - task = CopyColumnsAsync(0); //copy 1 row + task = CopyColumnsAsync(0); // Copy 1 row if (task == null) - { //tsk is done. - CheckAndRaiseNotification(); //check notification logic after copying the row + { // Task is done. + CheckAndRaiseNotification(); // Check notification logic after copying the row - //now we will read the next row. - Task readTask = ReadFromRowSourceAsync(cts); // read the next row. Caution: more is only valid if the task returns null. Otherwise, we wait for Task.Result + // Now we will read the next row. + Task readTask = ReadFromRowSourceAsync(cts); // Read the next row. Caution: more is only valid if the task returns null. Otherwise, we wait for Task.Result if (readTask != null) { if (source == null) @@ -2727,36 +2528,45 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, } resultTask = source.Task; - AsyncHelper.ContinueTask(readTask, source, () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection()); - return resultTask; //associated task will be completed when all rows are copied to server/exception/cancelled. 
+ AsyncHelper.ContinueTask(readTask, source, + onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source), + connectionToDoom: _connection.GetOpenTdsConnection() + ); + return resultTask; // Associated task will be completed when all rows are copied to server/exception/cancelled. } } else - { //tsk != null, we add continuation for it. + { // task != null, so add continuation for it. source = source ?? new TaskCompletionSource(); resultTask = source.Task; - AsyncHelper.ContinueTask(task, source, onSuccess: () => - { - CheckAndRaiseNotification(); //check for notification now as the current row copy is done at this moment. - - Task readTask = ReadFromRowSourceAsync(cts); - if (readTask == null) + AsyncHelper.ContinueTask(task, source, + onSuccess: () => { - CopyRowsAsync(i + 1, totalRows, cts, source); - } - else - { - AsyncHelper.ContinueTask(readTask, source, onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection()); - } - }, connectionToDoom: _connection.GetOpenTdsConnection()); + CheckAndRaiseNotification(); // Check for notification now as the current row copy is done at this moment. + + Task readTask = ReadFromRowSourceAsync(cts); + if (readTask == null) + { + CopyRowsAsync(i + 1, totalRows, cts, source); + } + else + { + AsyncHelper.ContinueTask(readTask, source, + onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source), + connectionToDoom: _connection.GetOpenTdsConnection() + ); + } + }, + connectionToDoom: _connection.GetOpenTdsConnection() + ); return resultTask; } } if (source != null) { - source.TrySetResult(null); // this is set only on the last call of async copy. But may not be set if everything runs synchronously. + source.TrySetResult(null); // This is set only on the last call of async copy. But may not be set if everything runs synchronously. } } catch (Exception ex) @@ -2776,7 +2586,6 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, // Copies all the batches in a loop. One iteration for one batch. // state variable is essentially not needed. (however, _hasMoreRowToCopy might be thought as a state variable) // Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously. 
- // private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource source = null) { Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!"); @@ -2819,15 +2628,18 @@ private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string up source = new TaskCompletionSource(); } - AsyncHelper.ContinueTask(commandTask, source, () => - { - Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source); - if (continuedTask == null) + AsyncHelper.ContinueTask(commandTask, source, + onSuccess: () => { - // Continuation finished sync, recall into CopyBatchesAsync to continue - CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source); - } - }, _connection.GetOpenTdsConnection()); + Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source); + if (continuedTask == null) + { + // Continuation finished sync, recall into CopyBatchesAsync to continue + CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source); + } + }, + connectionToDoom: _connection.GetOpenTdsConnection() + ); return source.Task; } } @@ -2857,8 +2669,8 @@ private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string up } } - // Writes the MetaData and a single batch - // If this returns true, then the caller is responsible for starting the next stage + // Writes the MetaData and a single batch. + // If this returns true, then the caller is responsible for starting the next stage. private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource source) { Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!"); @@ -2871,25 +2683,30 @@ private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults, internalResults[MetaDataResultId].MetaData, _connection.DataSource); - Task task = CopyRowsAsync(0, _savedBatchSize, cts); //this is copying 1 batch of rows and setting _hasMoreRowToCopy = true/false. + Task task = CopyRowsAsync(0, _savedBatchSize, cts); // This is copying 1 batch of rows and setting _hasMoreRowToCopy = true/false. 
- //post->after every batch + // post->after every batch if (task != null) { Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy"); if (source == null) - { //first time only + { // First time only source = new TaskCompletionSource(); } - AsyncHelper.ContinueTask(task, source, () => - { - Task continuedTask = CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source); - if (continuedTask == null) + AsyncHelper.ContinueTask(task, source, + onSuccess: () => { - // Continuation finished sync, recall into CopyBatchesAsync to continue - CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source); - } - }, _connection.GetOpenTdsConnection(), _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false), () => CopyBatchesAsyncContinuedOnError(cleanupParser: true)); + Task continuedTask = CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source); + if (continuedTask == null) + { + // Continuation finished sync, recall into CopyBatchesAsync to continue + CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source); + } + }, + connectionToDoom: _connection.GetOpenTdsConnection(), + onFailure: _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false), + onCancellation: () => CopyBatchesAsyncContinuedOnError(cleanupParser: true) + ); return source.Task; } @@ -2912,8 +2729,8 @@ private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults, } } - // Takes care of finishing a single batch (write done, run parser, commit transaction) - // If this returns true, then the caller is responsible for starting the next stage + // Takes care of finishing a single batch (write done, run parser, commit transaction). + // If this returns true, then the caller is responsible for starting the next stage. private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource source) { Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!"); @@ -2936,23 +2753,26 @@ private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internal source = new TaskCompletionSource(); } - AsyncHelper.ContinueTask(writeTask, source, () => - { - try - { - RunParser(); - CommitTransaction(); - } - catch (Exception) + AsyncHelper.ContinueTask(writeTask, source, + onSuccess: () => { - CopyBatchesAsyncContinuedOnError(cleanupParser: false); - throw; - } - - // Always call back into CopyBatchesAsync - CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source); + try + { + RunParser(); + CommitTransaction(); + } + catch (Exception) + { + CopyBatchesAsyncContinuedOnError(cleanupParser: false); + throw; + } - }, connectionToDoom: _connection.GetOpenTdsConnection(), onFailure: _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false)); + // Always call back into CopyBatchesAsync + CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source); + }, + connectionToDoom: _connection.GetOpenTdsConnection(), + onFailure: _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false) + ); return source.Task; } } @@ -2970,7 +2790,7 @@ private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internal } } - // Takes care of cleaning up the parser, stateObj and transaction when CopyBatchesAsync fails + // Takes care of cleaning up the parser, stateObj and transaction when CopyBatchesAsync fails. 
private void CopyBatchesAsyncContinuedOnError(bool cleanupParser) { SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection(); @@ -3023,8 +2843,7 @@ private void CopyBatchesAsyncContinuedOnError(bool cleanupParser) AbortTransaction(); } - //Cleans the stateobj. Used in a number of places, specially in exceptions - // + // Cleans the stateobj. Used in a number of places, especially in exceptions. private void CleanUpStateObject(bool isCancelRequested = true) { if (_stateObj != null) { @@ -3034,7 +2853,7 @@ private void CleanUpStateObject(bool isCancelRequested = true) { _stateObj.ResetBuffer(); _stateObj.ResetPacketCounters(); - //If _parser is closed, sending attention will raise debug assertion, so we avoid it but not calling CancelRequest; + // If _parser is closed, sending attention will raise a debug assertion, so we avoid it by not calling CancelRequest. if (isCancelRequested && (_parser.State == TdsParserState.OpenNotLoggedIn || _parser.State == TdsParserState.OpenLoggedIn)) { _stateObj.CancelRequest(); @@ -3056,7 +2875,6 @@ private void CleanUpStateObject(bool isCancelRequested = true) // It carries on the source which is passed from the WriteToServerInternalRest and performs SetResult when the entire copy is done. // The carried on source may be null in case of Sync copy. So no need to SetResult at that time. // It launches the copy operation. - // private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet internalResults, CancellationToken cts, TaskCompletionSource source) { Task task = null; @@ -3069,7 +2887,7 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int if (_sortedColumnMappings.Count != 0) { _stateObj.SniContext = SniContext.Snix_SendRows; - _savedBatchSize = _batchSize; // for safety. If someone changes the batchsize during copy we still be using _savedBatchSize + _savedBatchSize = _batchSize; // For safety. If someone changes the batchsize during copy, we will still be using _savedBatchSize. _rowsUntilNotification = _notifyAfter; _rowsCopied = 0; @@ -3079,7 +2897,7 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int _currentRowMetadata[i] = GetColumnMetadata(i); } - task = CopyBatchesAsync(internalResults, updateBulkCommandText, cts); //launch the BulkCopy + task = CopyBatchesAsync(internalResults, updateBulkCommandText, cts); // Launch the BulkCopy } if (task != null) { @@ -3088,49 +2906,51 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int { source = new TaskCompletionSource(); } - AsyncHelper.ContinueTask(task, source, () => - { - //Bulk copy task is completed at this moment. - //Todo: The cases may be combined for code reuse. - if (task.IsCanceled) + AsyncHelper.ContinueTask(task, source, + onSuccess: () => { - _localColumnMappings = null; - try + // Bulk copy task is completed at this moment. 
+ if (task.IsCanceled) { - CleanUpStateObject(); + _localColumnMappings = null; + try + { + CleanUpStateObject(); + } + finally + { + source.SetCanceled(); + } } - finally + else if (task.Exception != null) { - source.SetCanceled(); + source.SetException(task.Exception.InnerException); } - } - else if (task.Exception != null) - { - source.SetException(task.Exception.InnerException); - } - else - { - _localColumnMappings = null; - try - { - CleanUpStateObject(isCancelRequested: false); - } - finally + else { - if (source != null) + _localColumnMappings = null; + try { - if (cts.IsCancellationRequested) - { //We may get cancellation req even after the entire copy. - source.SetCanceled(); - } - else + CleanUpStateObject(isCancelRequested: false); + } + finally + { + if (source != null) { - source.SetResult(null); + if (cts.IsCancellationRequested) + { // We may get cancellation req even after the entire copy. + source.SetCanceled(); + } + else + { + source.SetResult(null); + } } } } - } - }, _connection.GetOpenTdsConnection()); + }, + connectionToDoom: _connection.GetOpenTdsConnection() + ); return; } else @@ -3180,7 +3000,6 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int // It carries on the source from its caller WriteToServerInternal. // source is null in case of Sync bcp. But valid in case of Async bcp. // It calls the WriteToServerInternalRestContinuedAsync as a continuation of the initial query task. - // private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletionSource source) { Debug.Assert(_hasMoreRowToCopy, "first time it is true, otherwise this method would not have been called."); @@ -3191,7 +3010,7 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio try { _parser = _connection.Parser; - _parser._asyncWrite = _isAsyncBulkCopy; //very important! + _parser._asyncWrite = _isAsyncBulkCopy; // Very important! 
Task reconnectTask; try @@ -3221,12 +3040,14 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio { regReconnectCancel = cts.Register(() => cancellableReconnectTS.TrySetCanceled()); } - AsyncHelper.ContinueTask(reconnectTask, cancellableReconnectTS, () => { cancellableReconnectTS.SetResult(null); }); - // no need to cancel timer since SqlBulkCopy creates specific task source for reconnection + AsyncHelper.ContinueTask(reconnectTask, cancellableReconnectTS, + onSuccess: () => { cancellableReconnectTS.SetResult(null); } + ); + // No need to cancel timer since SqlBulkCopy creates specific task source for reconnection AsyncHelper.SetTimeoutException(cancellableReconnectTS, BulkCopyTimeout, () => { return SQL.BulkLoadInvalidDestinationTable(_destinationTableName, SQL.CR_ReconnectTimeout()); }, CancellationToken.None); AsyncHelper.ContinueTask(cancellableReconnectTS.Task, source, - () => + onSuccess: () => { regReconnectCancel.Dispose(); if (_parserLock != null) @@ -3241,18 +3062,19 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio connectionToAbort: _connection, onFailure: (e) => { regReconnectCancel.Dispose(); }, onCancellation: () => { regReconnectCancel.Dispose(); }, - exceptionConverter: (ex) => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex)); + exceptionConverter: (ex) => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex) + ); return; } else { try { - AsyncHelper.WaitForCompletion(reconnectTask, this.BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); }); + AsyncHelper.WaitForCompletion(reconnectTask, BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); }); } catch (SqlException ex) { - throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex); // preserve behavior (throw InvalidOperationException on failure to connect) + throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex); // Preserve behavior (throw InvalidOperationException on failure to connect) } _parserLock = _connection.GetOpenTdsConnection()._parserLock; _parserLock.Wait(canReleaseFromAnyThread: false); @@ -3265,7 +3087,7 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio _connection.AddWeakReference(this, SqlReferenceCollection.BulkCopyTag); } - internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we already have the parser lock + internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we already have the parser lock. 
try { @@ -3280,7 +3102,7 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio try { - internalResultsTask = CreateAndExecuteInitialQueryAsync(out internalResults); //Task/Null + internalResultsTask = CreateAndExecuteInitialQueryAsync(out internalResults); // Task/Null } catch (SqlException ex) { @@ -3289,12 +3111,15 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio if (internalResultsTask != null) { - AsyncHelper.ContinueTask(internalResultsTask, source, () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source), _connection.GetOpenTdsConnection()); + AsyncHelper.ContinueTask(internalResultsTask, source, + onSuccess: () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source), + connectionToDoom: _connection.GetOpenTdsConnection() + ); } else { Debug.Assert(internalResults != null, "Executing initial query finished synchronously, but there were no results"); - WriteToServerInternalRestContinuedAsync(internalResults, cts, source); //internalResults is valid here. + WriteToServerInternalRestContinuedAsync(internalResults, cts, source); // internalResults is valid here. } } catch (Exception ex) @@ -3311,7 +3136,6 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio } // This returns Task for Async, Null for Sync - // private Task WriteToServerInternalAsync(CancellationToken ctoken) { TaskCompletionSource source = null; @@ -3319,7 +3143,7 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken) if (_isAsyncBulkCopy) { - source = new TaskCompletionSource(); //creating the completion source/Task that we pass to application + source = new TaskCompletionSource(); // Creating the completion source/Task that we pass to application resultTask = source.Task; RegisterForConnectionCloseNotification(ref resultTask); @@ -3329,7 +3153,7 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken) { if (source != null) { - source.SetException(SQL.BulkLoadMissingDestinationTable()); //no table to copy + source.SetException(SQL.BulkLoadMissingDestinationTable()); // No table to copy } else { @@ -3343,9 +3167,9 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken) Task readTask = ReadFromRowSourceAsync(ctoken); // readTask == reading task. This is the first read call. "more" is valid only if readTask == null; if (readTask == null) - { //synchronously finished reading. + { // Synchronously finished reading. if (!_hasMoreRowToCopy) - { //no rows in the source to copy! + { // No rows in the source to copy! if (source != null) { source.SetResult(null); @@ -3353,7 +3177,7 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken) return resultTask; } else - { //true, we have more rows. + { // True, we have more rows. WriteToServerInternalRestAsync(ctoken, source); //rest of the method, passing the same completion and returning the incomplete task (ret). return resultTask; } @@ -3361,17 +3185,20 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken) else { Debug.Assert(_isAsyncBulkCopy, "Read must not return a Task in the Sync mode"); - AsyncHelper.ContinueTask(readTask, source, () => - { - if (!_hasMoreRowToCopy) + AsyncHelper.ContinueTask(readTask, source, + onSuccess: () => { - source.SetResult(null); //no rows to copy! - } - else - { - WriteToServerInternalRestAsync(ctoken, source); //passing the same completion which will be completed by the Callee. 
- } - }, _connection.GetOpenTdsConnection()); + if (!_hasMoreRowToCopy) + { + source.SetResult(null); // No rows to copy! + } + else + { + WriteToServerInternalRestAsync(ctoken, source); // Passing the same completion source, which will be completed by the callee. + } + }, + connectionToDoom: _connection.GetOpenTdsConnection() + ); return resultTask; } } @@ -3388,5 +3215,5 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken) } return resultTask; } - }//End of SqlBulkCopy Class -}//End of namespace + } +}
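The refactoring above is internal only; the public surface it exercises (the field-like SqlRowsCopied event, NotifyAfter, BatchSize, EnableStreaming, and the Task-returning WriteToServerAsync overloads) is consumed exactly as before. Below is a minimal usage sketch, not part of this diff, assuming a reachable SQL Server instance; the connection string, source query, and table names (dbo.SourceTable, dbo.DestinationTable) are placeholders for illustration only.

// Usage sketch for the API surface touched by this change (placeholder names throughout).
using System;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;

internal static class BulkCopyUsageSketch
{
    public static async Task CopyAsync(CancellationToken cancellationToken)
    {
        // Placeholder connection string; point it at a real server/database before running.
        const string connectionString = "Server=.;Database=Example;Integrated Security=true;TrustServerCertificate=true";

        using SqlConnection source = new SqlConnection(connectionString);
        using SqlConnection destination = new SqlConnection(connectionString);
        await source.OpenAsync(cancellationToken);
        await destination.OpenAsync(cancellationToken);

        // Placeholder source query.
        using SqlCommand command = new SqlCommand("SELECT * FROM dbo.SourceTable", source);
        using DbDataReader reader = await command.ExecuteReaderAsync(cancellationToken);

        using SqlBulkCopy bulkCopy = new SqlBulkCopy(destination)
        {
            DestinationTableName = "dbo.DestinationTable", // placeholder destination table
            BatchSize = 5000,       // rows per batch (_batchSize above)
            NotifyAfter = 1000,     // rows between SqlRowsCopied callbacks (_notifyAfter above)
            EnableStreaming = true  // stream large values instead of buffering them
        };

        // The field-like event added in this change; it fires every NotifyAfter rows.
        // Setting e.Abort = true inside the handler aborts the copy, which surfaces as an
        // OperationAbortedException from WriteToServerAsync.
        bulkCopy.SqlRowsCopied += (sender, e) =>
        {
            Console.WriteLine($"Copied {e.RowsCopied} rows so far");
        };

        // Async copy path (WriteToServerAsync(DbDataReader, CancellationToken) above).
        await bulkCopy.WriteToServerAsync(reader, cancellationToken);
    }
}

With NotifyAfter = 1000 the handler above fires once per thousand rows, which corresponds to the _rowsUntilNotification countdown performed in CheckAndRaiseNotification.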