X Tutup
#region License
// The PostgreSQL License
//
// Copyright (C) 2015 The Npgsql Development Team
//
// Permission to use, copy, modify, and distribute this software and its
// documentation for any purpose, without fee, and without a written
// agreement is hereby granted, provided that the above copyright notice
// and this paragraph and the following two paragraphs appear in all copies.
//
// IN NO EVENT SHALL THE NPGSQL DEVELOPMENT TEAM BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES,
// INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
// DOCUMENTATION, EVEN IF THE NPGSQL DEVELOPMENT TEAM HAS BEEN ADVISED OF
// THE POSSIBILITY OF SUCH DAMAGE.
//
// THE NPGSQL DEVELOPMENT TEAM SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
// ON AN "AS IS" BASIS, AND THE NPGSQL DEVELOPMENT TEAM HAS NO OBLIGATIONS
// TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#endregion

using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using Npgsql.BackendMessages;
using Npgsql.FrontendMessages;
using NpgsqlTypes;

namespace Npgsql
{
    /// <summary>
    /// Provides an API for a binary COPY FROM operation, a high-performance data import mechanism to
    /// a PostgreSQL table. Instances are created by the driver itself (the constructor is internal)
    /// from the COPY command that starts the import.
    /// </summary>
    /// <remarks>
    /// See http://www.postgresql.org/docs/current/static/sql-copy.html.
    /// </remarks>
    public class NpgsqlBinaryImporter : IDisposable, ICancelable
    {
        #region Fields and Properties

        NpgsqlConnector _connector;
        NpgsqlBuffer _buf;
        TypeHandlerRegistry _registry;
        LengthCache _lengthCache;

        // Set once the operation has ended (closed, cancelled, or failed during a write).
        bool _isDisposed;

        // True while a CopyData message header has been written to the buffer and not yet flushed.
        bool _writingDataMsg;

        /// <summary>
        /// The number of columns in the current (not-yet-written) row.
        /// -1 means that no row has been started yet.
        /// </summary>
        short _column;

        /// <summary>
        /// The number of columns, as returned from the backend in the CopyInResponse.
        /// </summary>
        internal int NumColumns { get; private set; }

        /// <summary>
        /// NpgsqlParameter instance needed in order to pass the <see cref="NpgsqlParameter.ConvertedValue"/> from
        /// the validation phase to the writing phase.
        /// </summary>
        NpgsqlParameter _dummyParam;

        #endregion

        #region Construction / Initialization

        /// <summary>
        /// Sends <paramref name="copyFromCommand"/> to the backend, verifies that it started a binary
        /// transfer, records the column count and writes the binary COPY header.
        /// Any failure breaks the connector before the exception is rethrown.
        /// </summary>
        /// <param name="connector">The connector on which the COPY operation takes place.</param>
        /// <param name="copyFromCommand">The COPY command to execute.</param>
        /// <exception cref="ArgumentException">
        /// The command triggered a text transfer rather than a binary one.
        /// </exception>
        internal NpgsqlBinaryImporter(NpgsqlConnector connector, string copyFromCommand)
        {
            _connector = connector;
            _buf = connector.Buffer;
            _registry = connector.TypeHandlerRegistry;
            _lengthCache = new LengthCache();
            _column = -1;
            _dummyParam = new NpgsqlParameter();

            try
            {
                _connector.SendSingleMessage(new QueryMessage(copyFromCommand));

                // TODO: Failure will break the connection (e.g. if we get CopyOutResponse), handle more gracefully
                // NOTE(review): the generic type argument was lost in the extracted source and has been
                // restored as CopyInResponseMessage - confirm against Npgsql.BackendMessages.
                var copyInResponse = _connector.ReadExpecting<CopyInResponseMessage>();
                if (!copyInResponse.IsBinary)
                {
                    throw new ArgumentException("copyFromCommand triggered a text transfer, only binary is allowed", "copyFromCommand");
                }
                NumColumns = copyInResponse.NumColumns;
                WriteHeader();
            }
            catch
            {
                _connector.Break();
                throw;
            }
        }

        /// <summary>
        /// Opens the first CopyData message and writes the binary COPY header into it:
        /// the signature, the flags field and the header extension area length.
        /// </summary>
        void WriteHeader()
        {
            EnsureDataMessage();
            _buf.WriteBytes(NpgsqlRawCopyStream.BinarySignature, 0, NpgsqlRawCopyStream.BinarySignature.Length);
            _buf.WriteInt32(0);   // Flags field. OID inclusion not supported at the moment.
            _buf.WriteInt32(0);   // Header extension area length
        }

        #endregion

        #region Write

        /// <summary>
        /// Starts writing a single row, must be invoked before writing any columns.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The COPY operation has already ended.</exception>
        /// <exception cref="InvalidOperationException">
        /// The previous row was started but not all of its columns have been written.
        /// </exception>
        public void StartRow()
        {
            CheckDisposed();

            if (_column != -1 && _column != NumColumns)
            {
                throw new InvalidOperationException("Row has already been started and must be finished");
            }

            // The row starts with a 2-byte column count
            if (_buf.WriteSpaceLeft < 2)
            {
                FlushAndStartDataMessage();
            }
            _buf.WriteInt16(NumColumns);

            _column = 0;
        }

        /// <summary>
        /// Writes a single column in the current row.
        /// </summary>
        /// <param name="value">The value to be written</param>
        /// <typeparam name="T">
        /// The type of the column to be written. This must correspond to the actual type or data
        /// corruption will occur. If in doubt, use <see cref="Write{T}(T, NpgsqlDbType)"/> to manually
        /// specify the type.
        /// </typeparam>
        /// <exception cref="ObjectDisposedException">The COPY operation has already ended.</exception>
        /// <exception cref="InvalidOperationException">No row has been started.</exception>
        public void Write<T>(T value)
        {
            CheckDisposed();
            if (_column == -1)
            {
                throw new InvalidOperationException("A row hasn't been started");
            }

            // The handler is looked up from the value itself
            var handler = _registry[value];
            DoWrite(handler, value);
        }

        /// <summary>
        /// Writes a single column in the current row as type <paramref name="type"/>.
        /// </summary>
        /// <param name="value">The value to be written</param>
        /// <param name="type">
        /// In some cases <typeparamref name="T"/> isn't enough to infer the data type to be written to
        /// the database. This parameter can be used to unambiguously specify the type. An example is
        /// the JSONB type, for which <typeparamref name="T"/> will be a simple string but for which
        /// <paramref name="type"/> must be specified as <see cref="NpgsqlDbType.Jsonb"/>.
        /// </param>
        /// <typeparam name="T">The .NET type of the column to be written.</typeparam>
        /// <exception cref="ObjectDisposedException">The COPY operation has already ended.</exception>
        /// <exception cref="InvalidOperationException">No row has been started.</exception>
        public void Write<T>(T value, NpgsqlDbType type)
        {
            CheckDisposed();
            if (_column == -1)
            {
                throw new InvalidOperationException("A row hasn't been started");
            }

            // The handler is looked up from the explicitly requested database type
            var handler = _registry[type];
            DoWrite(handler, value);
        }

        /// <summary>
        /// Writes one column value (a 4-byte length followed by the data, or a length of -1 for null)
        /// using the given type handler, flushing the buffer whenever it runs out of space.
        /// Any exception breaks the connector and ends this operation before being rethrown.
        /// </summary>
        void DoWrite<T>(TypeHandler handler, T value)
        {
            try
            {
                // Every column starts with a 4-byte length
                if (_buf.WriteSpaceLeft < 4)
                {
                    FlushAndStartDataMessage();
                }

                var asObject = (object)value;   // TODO: Implement boxless writing in the future
                if (asObject == null)
                {
                    // Null is written as a length of -1 with no data
                    _buf.WriteInt32(-1);
                    _column++;
                    return;
                }

                _dummyParam.ConvertedValue = null;

                var asSimple = handler as ISimpleTypeWriter;
                if (asSimple != null)
                {
                    var len = asSimple.ValidateAndGetLength(asObject, _dummyParam);
                    _buf.WriteInt32(len);
                    if (_buf.WriteSpaceLeft < len)
                    {
                        // A simple writer writes its value in one go, so it has to fit in an empty buffer
                        Contract.Assume(_buf.Size >= len);
                        FlushAndStartDataMessage();
                    }
                    asSimple.Write(asObject, _buf, _dummyParam);
                    _column++;
                    return;
                }

                var asChunking = handler as IChunkingTypeWriter;
                if (asChunking != null)
                {
                    _lengthCache.Clear();
                    var len = asChunking.ValidateAndGetLength(asObject, ref _lengthCache, _dummyParam);
                    _buf.WriteInt32(len);

                    // If the type handler used the length cache, rewind it to skip the first position:
                    // it contains the entire value length which we already have in len.
                    if (_lengthCache.Position > 0)
                    {
                        _lengthCache.Rewind();
                        _lengthCache.Position++;
                    }

                    asChunking.PrepareWrite(asObject, _buf, _lengthCache, _dummyParam);
                    var directBuf = new DirectBuffer();
                    // Write returns false as long as the handler has more to write
                    while (!asChunking.Write(ref directBuf))
                    {
                        Flush();

                        // The following is an optimization hack for writing large byte arrays without passing
                        // through our buffer
                        if (directBuf.Buffer != null)
                        {
                            len = directBuf.Size == 0 ? directBuf.Buffer.Length : directBuf.Size;
                            // Only the 4-byte length at offset 1 is rewritten: the payload length plus the
                            // 4 length bytes themselves.
                            // NOTE(review): offset 0 presumably still holds the CopyData code of the message
                            // that was just flushed - confirm that NpgsqlBuffer.Flush doesn't clear it.
                            _buf.WritePosition = 1;
                            _buf.WriteInt32(len + 4);
                            _buf.Flush();
                            _writingDataMsg = false;
                            _buf.Underlying.Write(directBuf.Buffer, directBuf.Offset, len);
                            directBuf.Buffer = null;
                            directBuf.Size = 0;
                        }
                        EnsureDataMessage();
                    }
                    _column++;
                    return;
                }

                throw PGUtil.ThrowIfReached();
            }
            catch
            {
                _connector.Break();
                Cleanup();
                throw;
            }
        }

        /// <summary>
        /// Writes a single null column value.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The COPY operation has already ended.</exception>
        /// <exception cref="InvalidOperationException">No row has been started.</exception>
        public void WriteNull()
        {
            CheckDisposed();
            if (_column == -1)
            {
                throw new InvalidOperationException("A row hasn't been started");
            }

            if (_buf.WriteSpaceLeft < 4)
            {
                FlushAndStartDataMessage();
            }

            _buf.WriteInt32(-1);
            _column++;
        }

        /// <summary>
        /// Writes an entire row of columns.
        /// Equivalent to calling <see cref="StartRow"/>, followed by multiple <see cref="Write{T}(T)"/>
        /// on each value.
        /// </summary>
        /// <param name="values">An array of column values to be written as a single row</param>
        public void WriteRow(params object[] values)
        {
            StartRow();
            foreach (var value in values)
            {
                Write(value);
            }
        }

        /// <summary>
        /// Sends the CopyData message currently in the buffer and immediately starts a new one.
        /// </summary>
        void FlushAndStartDataMessage()
        {
            Flush();
            EnsureDataMessage();
        }

        /// <summary>
        /// Fills in the length of the CopyData message being built and flushes the buffer.
        /// Does nothing if no CopyData message is in progress.
        /// </summary>
        void Flush()
        {
            if (!_writingDataMsg)
            {
                return;
            }

            // Need to update the length for the CopyData about to be sent
            // (the length field sits at offset 1 and doesn't count the message code byte at offset 0)
            var pos = _buf.WritePosition;
            _buf.WritePosition = 1;
            _buf.WriteInt32(pos - 1);
            _buf.WritePosition = pos;
            _buf.Flush();
            _writingDataMsg = false;
        }

        /// <summary>
        /// Starts a new CopyData message in the (empty) buffer unless one is already in progress.
        /// </summary>
        void EnsureDataMessage()
        {
            if (_writingDataMsg)
            {
                return;
            }

            Contract.Assert(_buf.WritePosition == 0);
            _buf.WriteByte((byte)BackendMessageCode.CopyData);
            // Leave space for the message length
            _buf.WriteInt32(0);
            _writingDataMsg = true;
        }

        #endregion

        #region Cancel / Close / Dispose

        /// <summary>
        /// Cancels and terminates an ongoing import. Any data already written will be discarded.
        /// Does nothing if the operation has already ended (closed, cancelled or failed).
        /// </summary>
        /// <exception cref="Exception">
        /// The backend answered the CopyFail with something other than an error response; the
        /// connector is broken in this case.
        /// </exception>
        public void Cancel()
        {
            // Once the operation has ended the buffer and connector references may already have been
            // released by Cleanup(), and the connection is no longer in COPY mode: sending another
            // CopyFail would be wrong. Behave like Close() and silently do nothing.
            if (_isDisposed)
            {
                return;
            }

            _isDisposed = true;
            _buf.Clear();
            _connector.SendSingleMessage(new CopyFailMessage());
            try
            {
                var msg = _connector.ReadSingleMessage();
                // The CopyFail should immediately trigger an exception from the read above.
                _connector.Break();
                throw new Exception("Expected ErrorResponse when cancelling COPY but got: " + msg.Code);
            }
            catch (NpgsqlException e)
            {
                // 57014 is the error code PostgreSQL reports for a cancelled query, which is the
                // expected outcome here. Anything else is a genuine error.
                if (e.Code == "57014")
                {
                    return;
                }
                throw;
            }
        }

        /// <summary>
        /// Completes the binary import and sets the connection back to idle state
        /// </summary>
        public void Dispose()
        {
            Close();
        }

        /// <summary>
        /// Completes the import process and signals to the database to write everything.
        /// Does nothing if the operation has already ended.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// A row has been started but not all of its columns have been written.
        /// </exception>
        public void Close()
        {
            if (_isDisposed)
            {
                return;
            }

            if (_column != -1 && _column != NumColumns)
            {
                throw new InvalidOperationException("Can't close writer, a row is still in progress, end it first");
            }

            WriteTrailer();

            _connector.SendSingleMessage(CopyDoneMessage.Instance);
            // NOTE(review): the generic type arguments of the two reads below were lost in the extracted
            // source and have been restored as the messages expected after CopyDone - confirm against
            // Npgsql.BackendMessages.
            _connector.ReadExpecting<CommandCompleteMessage>();
            _connector.ReadExpecting<ReadyForQueryMessage>();
            _connector.CurrentCopyOperation = null;
            _connector.EndUserAction();

            Cleanup();
        }

        /// <summary>
        /// Releases the references held by this importer and marks it as disposed.
        /// </summary>
        void Cleanup()
        {
            _connector = null;
            _registry = null;
            _buf = null;
            _isDisposed = true;
        }

        /// <summary>
        /// Writes the binary COPY trailer (a 16-bit -1) and flushes the last CopyData message.
        /// </summary>
        void WriteTrailer()
        {
            if (_buf.WriteSpaceLeft < 2)
            {
                FlushAndStartDataMessage();
            }
            _buf.WriteInt16(-1);
            Flush();
        }

        /// <summary>
        /// Throws if the COPY operation has already ended.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The COPY operation has already ended.</exception>
        void CheckDisposed()
        {
            if (_isDisposed)
            {
                throw new ObjectDisposedException(GetType().FullName, "The COPY operation has already ended.");
            }
        }

        [ContractInvariantMethod]
        void ObjectInvariants()
        {
            // While the operation is alive, a CopyData message is always in progress
            Contract.Invariant(_isDisposed || _writingDataMsg);
        }

        #endregion
    }
}
X Tutup