diff --git a/src/ApacheOrcDotNet.WriterTest.App/Program.cs b/src/ApacheOrcDotNet.WriterTest.App/Program.cs index 37a0f09..62b57ee 100644 --- a/src/ApacheOrcDotNet.WriterTest.App/Program.cs +++ b/src/ApacheOrcDotNet.WriterTest.App/Program.cs @@ -1,9 +1,7 @@ -using System; +using ApacheOrcDotNet.FluentSerialization; +using System; using System.Collections.Generic; using System.IO; -using System.Linq; -using System.Threading.Tasks; -using ApacheOrcDotNet.FluentSerialization; namespace ApacheOrcDotNet.Test.App { @@ -11,58 +9,66 @@ public class Program { public static void Main(string[] args) { - var baseTime = new DateTime(2017, 3, 16, 0, 0, 0, DateTimeKind.Utc); - var rand = new Random(123); - var testElements = new List(); - for (int i = 0; i < 80000; i++) - { - var random = rand.Next(); - var set = i / 10000; - var randomInRange = (random % 10000) + set * 10000 - 40000; - var dec = (DateTime.Now - DateTime.Today).Ticks / (decimal)TimeSpan.TicksPerSecond; - var timestamp = baseTime.AddTicks(random); - var element = new TestClass - { - Random = random, - RandomInRange = randomInRange, - Incrementing = i, - SetNumber = set, - Double = (double)i / (set + 1), - Float = (float)i / (set + 1), - Dec = dec, - Timestamp = timestamp, - Str = $"Random={random}, RandomInRange={randomInRange}, Incrementing={i}, SetNumber={set}, Dec={dec}, Timestamp={timestamp:MM/dd/yyyy hh:mm:ss.fffffff}", - DictionaryStr = $"SetNumber={set}" - }; - testElements.Add(element); - } + var baseTime = new DateTime(2017, 3, 16, 0, 0, 0, DateTimeKind.Utc); + var rand = new Random(123); + var testElements = new List(); + var boolToggler = false; + for (int i = 0; i < 1000; i++) + { + var random = rand.Next(); + var set = i / 10000; + var randomInRange = (random % 10000) + set * 10000 - 40000; + var dec = (DateTime.Now - DateTime.Today).Ticks / (decimal)TimeSpan.TicksPerSecond; + var timestamp = baseTime.AddTicks(random); + var element = new TestClass + { + Random = random, + RandomInRange = randomInRange, + Incrementing = i, + SetNumber = set, + Double = (double)i / (set + 1), + Float = (float)i / (set + 1), + Dec = dec, + Timestamp = timestamp, + Str = $"Random={random}, RandomInRange={randomInRange}, Incrementing={i}, SetNumber={set}, Dec={dec}, Timestamp={timestamp:MM/dd/yyyy hh:mm:ss.fffffff}", + DictionaryStr = $"SetNumber={set}", + Boolean = boolToggler, + NullBooleans = boolToggler ? null : true, + }; - var serializationConfiguration = new SerializationConfiguration() - .ConfigureType() - .ConfigureProperty(x => x.Dec, x => { x.DecimalPrecision = 14; x.DecimalScale = 9; }) - .Build(); + boolToggler = !boolToggler; - using (var fileStream = new FileStream("test.orc", FileMode.Create, FileAccess.Write)) - using (var writer = new OrcWriter(fileStream, new WriterConfiguration(), serializationConfiguration)) //Use the default configuration - { - writer.AddRows(testElements); - } - } - } + testElements.Add(element); + } - class TestClass - { - public int Random { get; set; } - public int RandomInRange { get; set; } - public int Incrementing { get; set; } - public int SetNumber { get; set; } - public int? AllNulls { get; set; } - public double Double { get; set; } - public float Float { get; set; } - public decimal Dec { get; set; } - public decimal? AllNullsDec { get; set; } - public DateTime Timestamp { get; set; } - public string Str { get; set; } - public string DictionaryStr { get; set; } - } + var serializationConfiguration = new SerializationConfiguration() + .ConfigureType() + .ConfigureProperty(x => x.Dec, x => { x.DecimalPrecision = 14; x.DecimalScale = 9; }) + .Build(); + + using (var fileStream = new FileStream("test.orc", FileMode.Create, FileAccess.Write)) + using (var writer = new OrcWriter(fileStream, new WriterConfiguration() { RowIndexStride = 10 }, serializationConfiguration)) //Use the default configuration + { + writer.AddRows(testElements); + } + } + } + + class TestClass + { + public int Random { get; set; } + public int RandomInRange { get; set; } + public int Incrementing { get; set; } + public int SetNumber { get; set; } + public int? AllNulls { get; set; } + public double Double { get; set; } + public float Float { get; set; } + public decimal Dec { get; set; } + public decimal? AllNullsDec { get; set; } + public DateTime Timestamp { get; set; } + public string Str { get; set; } + public string DictionaryStr { get; set; } + public bool Boolean { get; set; } + public bool? NullBooleans { get; set; } + } } diff --git a/src/ApacheOrcDotNet/ColumnTypes/BooleanWriter.cs b/src/ApacheOrcDotNet/ColumnTypes/BooleanWriter.cs index de126ac..136e131 100644 --- a/src/ApacheOrcDotNet/ColumnTypes/BooleanWriter.cs +++ b/src/ApacheOrcDotNet/ColumnTypes/BooleanWriter.cs @@ -1,91 +1,98 @@ using ApacheOrcDotNet.Compression; -using ApacheOrcDotNet.Encodings; using ApacheOrcDotNet.Protocol; -using System; using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; namespace ApacheOrcDotNet.ColumnTypes { public class BooleanWriter : IColumnWriter { - readonly bool _isNullable; - readonly OrcCompressedBuffer _presentBuffer; - readonly OrcCompressedBuffer _dataBuffer; + readonly bool _isNullable; + readonly OrcCompressedBuffer _presentBuffer; + readonly OrcCompressedBuffer _dataBuffer; + private readonly ContinuousBitWriter _dataWriter; + private readonly ContinuousBitWriter _presentWriter; - public BooleanWriter(bool isNullable, OrcCompressedBufferFactory bufferFactory, uint columnId) - { - _isNullable = isNullable; - ColumnId = columnId; + public BooleanWriter(bool isNullable, OrcCompressedBufferFactory bufferFactory, uint columnId) + { + _isNullable = isNullable; + ColumnId = columnId; - if (_isNullable) - { - _presentBuffer = bufferFactory.CreateBuffer(StreamKind.Present); - _presentBuffer.MustBeIncluded = false; - } - _dataBuffer = bufferFactory.CreateBuffer(StreamKind.Data); - } + if (_isNullable) + { + _presentBuffer = bufferFactory.CreateBuffer(StreamKind.Present); + _presentBuffer.MustBeIncluded = false; + _presentWriter = new ContinuousBitWriter(_presentBuffer); + } + _dataBuffer = bufferFactory.CreateBuffer(StreamKind.Data); + _dataWriter = new ContinuousBitWriter(_dataBuffer); + + } - public List Statistics { get; } = new List(); - public long CompressedLength => Buffers.Sum(s => s.Length); - public uint ColumnId { get; } - public OrcCompressedBuffer[] Buffers => _isNullable ? new[] { _presentBuffer, _dataBuffer } : new[] { _dataBuffer }; - public ColumnEncodingKind ColumnEncoding => ColumnEncodingKind.Direct; + public List Statistics { get; } = new List(); + public long CompressedLength => Buffers.Sum(s => s.Length); + public uint ColumnId { get; } + public OrcCompressedBuffer[] Buffers => _isNullable ? new[] { _presentBuffer, _dataBuffer } : new[] { _dataBuffer }; + public ColumnEncodingKind ColumnEncoding => ColumnEncodingKind.Direct; + + public void FlushBuffers() + { + if (_isNullable) + { + _presentWriter.Flush(); + } - public void FlushBuffers() - { - foreach (var buffer in Buffers) - buffer.Flush(); - } + _dataWriter.Flush(); - public void Reset() - { - foreach (var buffer in Buffers) - buffer.Reset(); - if(_isNullable) - _presentBuffer.MustBeIncluded = false; - Statistics.Clear(); - } + foreach (var buffer in Buffers) + buffer.Flush(); + } - public void AddBlock(IList values) - { - var stats = new BooleanWriterStatistics(); - Statistics.Add(stats); + public void Reset() + { + foreach (var buffer in Buffers) + buffer.Reset(); + if (_isNullable) + _presentBuffer.MustBeIncluded = false; + Statistics.Clear(); + } + + public void AddBlock(IList values) + { + var stats = new BooleanWriterStatistics(); + Statistics.Add(stats); if (_isNullable) _presentBuffer.AnnotatePosition(stats, rleValuesToConsume: 0, bitsToConsume: 0); _dataBuffer.AnnotatePosition(stats, rleValuesToConsume: 0, bitsToConsume: 0); - var valList = new List(values.Count); + var valList = new List(values.Count); - if(_isNullable) - { - var presentList = new List(values.Count); + if (_isNullable) + { + var presentList = new List(values.Count); - foreach(var value in values) - { - stats.AddValue(value); - if (value.HasValue) - valList.Add(value.Value); - presentList.Add(value.HasValue); - } + foreach (var value in values) + { + stats.AddValue(value); + if (value.HasValue) + valList.Add(value.Value); + presentList.Add(value.HasValue); + } - var presentEncoder = new BitWriter(_presentBuffer); - presentEncoder.Write(presentList); - if (stats.HasNull) - _presentBuffer.MustBeIncluded = true; - } - else - { - foreach(var value in values) - { - stats.AddValue(value); - valList.Add(value.Value); - } - } + _presentWriter.Write(presentList); + if (stats.HasNull) + _presentBuffer.MustBeIncluded = true; + } + else + { + foreach (var value in values) + { + stats.AddValue(value); + valList.Add(value.Value); + } + } - var valEncoder = new BitWriter(_dataBuffer); - valEncoder.Write(valList); - } - } -} + _dataWriter.Write(valList); + } + } +} \ No newline at end of file diff --git a/src/ApacheOrcDotNet/ColumnTypes/ContinuousBitWriter.cs b/src/ApacheOrcDotNet/ColumnTypes/ContinuousBitWriter.cs new file mode 100644 index 0000000..9153d8b --- /dev/null +++ b/src/ApacheOrcDotNet/ColumnTypes/ContinuousBitWriter.cs @@ -0,0 +1,54 @@ +using ApacheOrcDotNet.Encodings; +using System.Collections.Generic; +using System.IO; + +namespace ApacheOrcDotNet.ColumnTypes +{ + public class ContinuousBitWriter + { + readonly ByteRunLengthEncodingWriter _byteWriter; + private byte byteBuffer; + private int bitIndex; + private bool hasData; + + public ContinuousBitWriter(Stream outputStream) + { + _byteWriter = new ByteRunLengthEncodingWriter(outputStream); + Flush(); + } + + public void Write(IList values) + { + foreach (var value in values) + { + Write(value); + } + } + + public void Write(bool value) + { + if (value) + byteBuffer |= (byte)(1 << bitIndex); + + hasData = true; + bitIndex--; + + if (bitIndex == -1) + { + Flush(); + } + } + + public void Flush() + { + if (hasData) + { + _byteWriter.Write(new[] { byteBuffer }); + } + + byteBuffer = 0; + bitIndex = 7; + hasData = false; + } + } +} diff --git a/test/ApacheOrcDotNet.Test/ColumnTypes/BooleanColumn_Test.cs b/test/ApacheOrcDotNet.Test/ColumnTypes/BooleanColumn_Test.cs index 842c4cc..4a5195c 100644 --- a/test/ApacheOrcDotNet.Test/ColumnTypes/BooleanColumn_Test.cs +++ b/test/ApacheOrcDotNet.Test/ColumnTypes/BooleanColumn_Test.cs @@ -14,10 +14,15 @@ public class BooleanColumn_Test [Fact] public void RoundTrip_BooleanColumn() { - RoundTripSingleBool(70000); + // Default case + RoundTripSingleBool(70000); + + // Problematic cases + RoundTripSingleBool(70000, 1000); + RoundTripSingleBool(70000, 10); } - void RoundTripSingleBool(int numValues) + void RoundTripSingleBool(int numValues, int rowIndexStride = 10000) { var pocos = new List(); var random = new Random(123); @@ -26,7 +31,7 @@ void RoundTripSingleBool(int numValues) var stream = new MemoryStream(); Footer footer; - StripeStreamHelper.Write(stream, pocos, out footer); + StripeStreamHelper.Write(stream, pocos, out footer, rowIndexStride: rowIndexStride); var stripeStreams = StripeStreamHelper.GetStripeStreams(stream, footer); var boolReader = new BooleanReader(stripeStreams, 1); var results = boolReader.Read().ToArray(); diff --git a/test/ApacheOrcDotNet.Test/ColumnTypes/StripeStreamHelper.cs b/test/ApacheOrcDotNet.Test/ColumnTypes/StripeStreamHelper.cs index 7e3a878..7a12ed6 100644 --- a/test/ApacheOrcDotNet.Test/ColumnTypes/StripeStreamHelper.cs +++ b/test/ApacheOrcDotNet.Test/ColumnTypes/StripeStreamHelper.cs @@ -13,10 +13,10 @@ namespace ApacheOrcDotNet.Test.ColumnTypes { public static class StripeStreamHelper { - public static void Write(System.IO.Stream outputStream, IEnumerable values, out Footer footer, SerializationConfiguration serializationConfiguration = null) where T : class + public static void Write(System.IO.Stream outputStream, IEnumerable values, out Footer footer, SerializationConfiguration serializationConfiguration = null, int rowIndexStride = 10000) where T : class { var bufferFactory = new OrcCompressedBufferFactory(256 * 1024, CompressionKind.Zlib, CompressionStrategy.Size); - var stripeWriter = new StripeWriter(typeof(T), outputStream, false, 0.8, 18,6, bufferFactory, 10000, 512 * 1024 * 1024, serializationConfiguration); + var stripeWriter = new StripeWriter(typeof(T), outputStream, false, 0.8, 18,6, bufferFactory, rowIndexStride, 512 * 1024 * 1024, serializationConfiguration); stripeWriter.AddRows(values); stripeWriter.RowAddingCompleted(); footer = stripeWriter.GetFooter();