From e42a0c4dcd477af5bdce905037eb49e43ef51e52 Mon Sep 17 00:00:00 2001
From: GOEddieUK
Date: Wed, 28 Oct 2020 19:20:43 +0000
Subject: [PATCH 01/11] Check whether file is found before trying to
 dereference it

---
 src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs | 26 ++++++++++++++++----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs b/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs
index 69838b25f..a3b0c7328 100644
--- a/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs
+++ b/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs
@@ -262,18 +262,34 @@ private static Type DeserializeType(TypeData typeData) =>
                 typeData,
                 td =>
                 {
-                    Type type = AssemblyLoader.LoadAssembly(
+                    Assembly assembly = AssemblyLoader.LoadAssembly(
                         td.AssemblyName,
-                        td.AssemblyFileName).GetType(td.Name);
-                    if (type == null)
+                        td.AssemblyFileName);
+                    if (assembly == null)
                     {
+                        string searchPath =
+                            Environment.GetEnvironmentVariable("DOTNET_ASSEMBLY_SEARCH_PATHS");
+
+                        if (String.IsNullOrEmpty(searchPath))
+                        {
+                            searchPath = "Empty";
+                        }
+
                         throw new FileNotFoundException(
                             string.Format(
-                                "Assembly '{0}' file not found '{1}'",
+                                "Assembly '{0}' file not found '{1}'. " +
+                                "Current DOTNET_ASSEMBLY_SEARCH_PATHS '{2}'",
                                 td.AssemblyName,
-                                td.AssemblyFileName));
+                                td.AssemblyFileName,
+                                searchPath));
                     }
 
+                    Type type = assembly.GetType(td.Name);
+                    if (type == null)
+                    {
+                        throw new TypeLoadException(
+                            string.Format("Unable to load Type '{0}' from Assembly '{1}'", td.Name, td.AssemblyName));
+                    }
                     return type;
                 });
         }
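Note on the failure mode patch 01 addresses: before this change, a missed assembly probe surfaced as a bare NullReferenceException, because LoadAssembly's null return was dereferenced immediately by the chained GetType call. A minimal sketch of that pattern (hypothetical names, not repo code):

    using System;
    using System.Reflection;

    // Stand-in for AssemblyLoader.LoadAssembly returning null when no candidate
    // file is found on any search path (hypothetical, not the repo implementation).
    static Assembly LoadAssembly(string assemblyName, string assemblyFileName) => null;

    // The pre-patch shape: chaining GetType onto a possibly-null result turns
    // "assembly file not found" into an uninformative NullReferenceException.
    Type type = LoadAssembly("MyUdfAssembly", "MyUdfAssembly.dll")
        .GetType("MyNamespace.MyUdfType"); // throws NullReferenceException
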
" - + "Current DOTNET_ASSEMBLY_SEARCH_PATHS '{2}'", + "Assembly '{0}' file not found '{1}'", td.AssemblyName, - td.AssemblyFileName, - searchPath)); + td.AssemblyFileName)); } - Type type = assembly.GetType(td.Name); - if (type == null) - { - throw new TypeLoadException( - string.Format("Unable to load Type '{0}' from Assembly '{1}'")); - } return type; }); } From 5dc21430ff244a5154a329b1e0156a2b7bc0622a Mon Sep 17 00:00:00 2001 From: Ed Elliott Date: Thu, 29 Oct 2020 18:23:06 +0000 Subject: [PATCH 03/11] Update src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs Co-authored-by: Steve Suh --- src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs b/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs index 6b60f25d5..0d457188e 100644 --- a/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs +++ b/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs @@ -264,7 +264,7 @@ private static Type DeserializeType(TypeData typeData) => { Type type = AssemblyLoader.LoadAssembly( td.AssemblyName, - td.AssemblyFileName)?.GetType(td.Name); + td.AssemblyFileName)?.GetType(td.Name, true); if (type == null) { throw new FileNotFoundException( From 338210aebcd47c7574cf02d700fe22630a8c2e0e Mon Sep 17 00:00:00 2001 From: GOEddieUK Date: Thu, 14 Oct 2021 22:11:27 +0100 Subject: [PATCH 04/11] Decimal Support --- .../Microsoft.Spark/Interop/Ipc/JvmBridge.cs | 3 +++ .../Interop/Ipc/PayloadHelper.cs | 9 +++++++- .../Microsoft.Spark/Interop/Ipc/SerDe.cs | 22 ++++++++++++++++++- .../org/apache/spark/api/dotnet/SerDe.scala | 18 ++++++++++++++- 4 files changed, 49 insertions(+), 3 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs index 6c8d61840..6b413370f 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs @@ -267,6 +267,9 @@ private object CallJavaMethod( case 'd': returnValue = SerDe.ReadDouble(inputStream); break; + case 'm': + returnValue = decimal.Parse(SerDe.ReadString(inputStream)); + break; case 'b': returnValue = Convert.ToBoolean(inputStream.ReadByte()); break; diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/PayloadHelper.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/PayloadHelper.cs index 3373bca62..ac9914672 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/PayloadHelper.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/PayloadHelper.cs @@ -32,6 +32,7 @@ internal class PayloadHelper private static readonly byte[] s_dictionaryTypeId = new[] { (byte)'e' }; private static readonly byte[] s_rowArrTypeId = new[] { (byte)'R' }; private static readonly byte[] s_objectArrTypeId = new[] { (byte)'O' }; + private static readonly byte[] s_decimalTypeId = new[] { (byte)'m' }; private static readonly ConcurrentDictionary s_isDictionaryTable = new ConcurrentDictionary(); @@ -109,6 +110,10 @@ internal static void ConvertArgsToBytes( case TypeCode.Double: SerDe.Write(destination, (double)arg); break; + + case TypeCode.Decimal: + SerDe.Write(destination, (decimal)arg); + break; case TypeCode.Object: switch (arg) @@ -321,7 +326,9 @@ internal static byte[] GetTypeId(Type type) case TypeCode.Boolean: return s_boolTypeId; case TypeCode.Double: - return s_doubleTypeId; + return s_doubleTypeId; + case TypeCode.Decimal: + return s_decimalTypeId; case TypeCode.Object: if (typeof(IJvmObjectReferenceProvider).IsAssignableFrom(type)) { diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs 
From 338210aebcd47c7574cf02d700fe22630a8c2e0e Mon Sep 17 00:00:00 2001
From: GOEddieUK
Date: Thu, 14 Oct 2021 22:11:27 +0100
Subject: [PATCH 04/11] Decimal Support

---
 .../Microsoft.Spark/Interop/Ipc/JvmBridge.cs  |  3 +++
 .../Interop/Ipc/PayloadHelper.cs              |  9 ++++++++-
 .../Microsoft.Spark/Interop/Ipc/SerDe.cs      | 22 +++++++++++++++++++++-
 .../org/apache/spark/api/dotnet/SerDe.scala   | 18 +++++++++++++++++-
 4 files changed, 49 insertions(+), 3 deletions(-)

diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
index 6c8d61840..6b413370f 100644
--- a/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
+++ b/src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
@@ -267,6 +267,9 @@ private object CallJavaMethod(
                     case 'd':
                         returnValue = SerDe.ReadDouble(inputStream);
                         break;
+                    case 'm':
+                        returnValue = decimal.Parse(SerDe.ReadString(inputStream));
+                        break;
                     case 'b':
                         returnValue = Convert.ToBoolean(inputStream.ReadByte());
                         break;
diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/PayloadHelper.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/PayloadHelper.cs
index 3373bca62..ac9914672 100644
--- a/src/csharp/Microsoft.Spark/Interop/Ipc/PayloadHelper.cs
+++ b/src/csharp/Microsoft.Spark/Interop/Ipc/PayloadHelper.cs
@@ -32,6 +32,7 @@ internal class PayloadHelper
         private static readonly byte[] s_dictionaryTypeId = new[] { (byte)'e' };
         private static readonly byte[] s_rowArrTypeId = new[] { (byte)'R' };
         private static readonly byte[] s_objectArrTypeId = new[] { (byte)'O' };
+        private static readonly byte[] s_decimalTypeId = new[] { (byte)'m' };
 
         private static readonly ConcurrentDictionary<Type, bool> s_isDictionaryTable =
             new ConcurrentDictionary<Type, bool>();
@@ -109,6 +110,10 @@ internal static void ConvertArgsToBytes(
                     case TypeCode.Double:
                         SerDe.Write(destination, (double)arg);
                         break;
+
+                    case TypeCode.Decimal:
+                        SerDe.Write(destination, (decimal)arg);
+                        break;
 
                     case TypeCode.Object:
                         switch (arg)
@@ -321,7 +326,9 @@ internal static byte[] GetTypeId(Type type)
                 case TypeCode.Boolean:
                     return s_boolTypeId;
                 case TypeCode.Double:
-                    return s_doubleTypeId;
+                    return s_doubleTypeId;
+                case TypeCode.Decimal:
+                    return s_decimalTypeId;
                 case TypeCode.Object:
                     if (typeof(IJvmObjectReferenceProvider).IsAssignableFrom(type))
                     {
diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs
index c2c742e87..21a37e7e6 100644
--- a/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs
+++ b/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs
@@ -6,6 +6,7 @@
 using System.Buffers.Binary;
 using System.IO;
 using System.Text;
+using Razorvine.Pickle.Objects;
 
 namespace Microsoft.Spark.Interop.Ipc
 {
@@ -100,7 +101,19 @@ public static double ReadDouble(Stream s)
             TryReadBytes(s, buffer, sizeof(long));
             return BitConverter.Int64BitsToDouble(BinaryPrimitives.ReadInt64BigEndian(buffer));
         }
-
+
+        /// <summary>
+        /// Reads a decimal from a stream.
+        /// </summary>
+        /// <param name="s">The stream to be read</param>
+        /// <returns>The decimal read from stream</returns>
+        public static decimal ReadDecimal(Stream s)
+        {
+            byte[] buffer = GetThreadLocalBuffer(sizeof(long));
+            TryReadBytes(s, buffer, sizeof(long));
+            return BinaryPrimitives.ReadInt64BigEndian(buffer);
+        }
+
         /// <summary>
         /// Reads a string from a stream
         /// </summary>
@@ -322,6 +335,13 @@ public static void Write(Stream s, long value)
         public static void Write(Stream s, double value) =>
             Write(s, BitConverter.DoubleToInt64Bits(value));
 
+        /// <summary>
+        /// Writes a decimal to a stream as a string.
+        /// </summary>
+        /// <param name="s">The stream to write</param>
+        /// <param name="value">The decimal to write</param>
+        public static void Write(Stream s, decimal value) => Write(s, value.ToString());
+
         /// <summary>
         /// Writes a string to a stream.
         /// </summary>
diff --git a/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala b/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
index a3df3788a..d3f5c6afd 100644
--- a/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
+++ b/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
@@ -36,6 +36,7 @@ class SerDe(val tracker: JVMObjectTracker) {
       case 'g' => new java.lang.Long(readLong(dis))
       case 'd' => new java.lang.Double(readDouble(dis))
       case 'b' => new java.lang.Boolean(readBoolean(dis))
+      case 'm' => readDecimal(dis)
       case 'c' => readString(dis)
       case 'e' => readMap(dis)
       case 'r' => readBytes(dis)
@@ -60,6 +61,10 @@ class SerDe(val tracker: JVMObjectTracker) {
     in.readInt()
   }
 
+  private def readDecimal(in: DataInputStream): BigDecimal = {
+    BigDecimal(readString(in))
+  }
+
   private def readLong(in: DataInputStream): Long = {
     in.readLong()
   }
@@ -111,6 +116,11 @@ class SerDe(val tracker: JVMObjectTracker) {
     (0 until len).map(_ => readInt(in)).toArray
   }
 
+  private def readDecimalArr(in: DataInputStream): Array[BigDecimal] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readDecimal(in)).toArray
+  }
+
   private def readLongArr(in: DataInputStream): Array[Long] = {
     val len = readInt(in)
     (0 until len).map(_ => readLong(in)).toArray
@@ -157,6 +167,7 @@ class SerDe(val tracker: JVMObjectTracker) {
       case 'b' => readBooleanArr(dis)
       case 'j' => readStringArr(dis).map(x => tracker.getObject(x))
       case 'r' => readBytesArr(dis)
+      case 'm' => readDecimalArr(dis)
       case _ => throw new IllegalArgumentException(s"Invalid array type $arrType")
     }
   }
@@ -207,6 +218,7 @@ class SerDe(val tracker: JVMObjectTracker) {
       case "long" => dos.writeByte('g')
       case "integer" => dos.writeByte('i')
       case "logical" => dos.writeByte('b')
+      case "bigdecimal" => dos.writeByte('m')
       case "date" => dos.writeByte('D')
       case "time" => dos.writeByte('t')
       case "raw" => dos.writeByte('r')
@@ -217,10 +229,11 @@ class SerDe(val tracker: JVMObjectTracker) {
   }
 
   def writeObject(dos: DataOutputStream, value: Object): Unit = {
+
     if (value == null || value == Unit) {
       writeType(dos, "void")
     } else {
-      value.getClass.getName match {
+      value.getClass.getName match { 
       case "java.lang.String" =>
         writeType(dos, "character")
         writeString(dos, value.asInstanceOf[String])
@@ -239,6 +252,9 @@ class SerDe(val tracker: JVMObjectTracker) {
       case "boolean" | "java.lang.Boolean" =>
         writeType(dos, "logical")
         writeBoolean(dos, value.asInstanceOf[Boolean])
+      case "BigDecimal" | "java.math.BigDecimal" =>
+        writeType(dos, "bigdecimal")
+        writeString(dos, value.toString)
       case "java.sql.Date" =>
         writeType(dos, "date")
         writeDate(dos, value.asInstanceOf[Date])
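Note on the wire format patch 04 establishes: a decimal crosses the IPC boundary as the type byte 'm' followed by its string representation, and the Scala side rebuilds it with BigDecimal(readString(in)). String transport preserves all 28-29 significant digits of a .NET decimal, which no fixed 8-byte encoding could. A simplified sketch of the framing (the exact length-prefixed layout here is an assumption about SerDe's string encoding, and the invariant culture is pinned explicitly, which the patch itself leaves to ToString()):

    using System;
    using System.Globalization;
    using System.IO;
    using System.Text;

    var buffer = new MemoryStream();
    WriteDecimal(buffer, 79228162514264337593543950335m); // decimal.MaxValue
    Console.WriteLine(buffer.Length); // 34 = 1 type byte + 4 length bytes + 29 digits

    static void WriteDecimal(Stream s, decimal value)
    {
        // Type id byte, mirroring the 'm' used by s_decimalTypeId in the patch.
        s.WriteByte((byte)'m');

        // Assumed framing: big-endian Int32 length prefix, then UTF-8 payload.
        // Pinning the invariant culture ensures the JVM's BigDecimal parser
        // always sees '.' as the decimal separator.
        byte[] payload = Encoding.UTF8.GetBytes(
            value.ToString(CultureInfo.InvariantCulture));
        byte[] length = BitConverter.GetBytes(payload.Length);
        if (BitConverter.IsLittleEndian)
        {
            Array.Reverse(length); // match the JVM's big-endian DataInputStream
        }
        s.Write(length, 0, length.Length);
        s.Write(payload, 0, payload.Length);
    }
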
{ case "java.lang.String" => writeType(dos, "character") writeString(dos, value.asInstanceOf[String]) @@ -239,6 +252,9 @@ class SerDe(val tracker: JVMObjectTracker) { case "boolean" | "java.lang.Boolean" => writeType(dos, "logical") writeBoolean(dos, value.asInstanceOf[Boolean]) + case "BigDecimal" | "java.math.BigDecimal" => + writeType(dos, "bigdecimal") + writeString(dos, value.toString) case "java.sql.Date" => writeType(dos, "date") writeDate(dos, value.asInstanceOf[Date]) From 66e357bac3a91b494ef310b794ed24e8921b0d14 Mon Sep 17 00:00:00 2001 From: GOEddieUK Date: Thu, 14 Oct 2021 22:16:38 +0100 Subject: [PATCH 05/11] isnt used --- src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs index 21a37e7e6..f78ea09be 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs @@ -101,19 +101,7 @@ public static double ReadDouble(Stream s) TryReadBytes(s, buffer, sizeof(long)); return BitConverter.Int64BitsToDouble(BinaryPrimitives.ReadInt64BigEndian(buffer)); } - - /// - /// Reads a decimal from a stream. - /// - /// The stream to be read - /// The decimal read from stream - public static decimal ReadDecimal(Stream s) - { - byte[] buffer = GetThreadLocalBuffer(sizeof(long)); - TryReadBytes(s, buffer, sizeof(long)); - return BinaryPrimitives.ReadInt64BigEndian(buffer); - } - + /// /// Reads a string from a stream /// From ceab50f7bd0f81a989c08c3be8098311d4d6033b Mon Sep 17 00:00:00 2001 From: GOEddieUK Date: Thu, 14 Oct 2021 22:17:16 +0100 Subject: [PATCH 06/11] unnecessary import --- src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs index f78ea09be..a36a293a0 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs @@ -6,7 +6,6 @@ using System.Buffers.Binary; using System.IO; using System.Text; -using Razorvine.Pickle.Objects; namespace Microsoft.Spark.Interop.Ipc { From 7ff1c8e630f2d982c44c4b63d812fcee07866550 Mon Sep 17 00:00:00 2001 From: GOEddieUK Date: Thu, 14 Oct 2021 22:18:56 +0100 Subject: [PATCH 07/11] formatting --- .../src/main/scala/org/apache/spark/api/dotnet/SerDe.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala b/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala index d3f5c6afd..2b5dbe4b6 100644 --- a/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala +++ b/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala @@ -233,7 +233,7 @@ class SerDe(val tracker: JVMObjectTracker) { if (value == null || value == Unit) { writeType(dos, "void") } else { - value.getClass.getName match { + value.getClass.getName match { case "java.lang.String" => writeType(dos, "character") writeString(dos, value.asInstanceOf[String]) From 546ba36a1a7ec4d5179acbd749cb8a10d643502b Mon Sep 17 00:00:00 2001 From: GOEddieUK Date: Thu, 14 Oct 2021 22:21:01 +0100 Subject: [PATCH 08/11] other versions --- .../org/apache/spark/api/dotnet/SerDe.scala | 17 +++++++++++++++++ .../org/apache/spark/api/dotnet/SerDe.scala | 16 ++++++++++++++++ 2 files changed, 33 insertions(+) diff --git 
From 546ba36a1a7ec4d5179acbd749cb8a10d643502b Mon Sep 17 00:00:00 2001
From: GOEddieUK
Date: Thu, 14 Oct 2021 22:21:01 +0100
Subject: [PATCH 08/11] other versions

---
 .../org/apache/spark/api/dotnet/SerDe.scala   | 17 +++++++++++++++++
 .../org/apache/spark/api/dotnet/SerDe.scala   | 16 ++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala b/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
index 44cad97c1..2b5dbe4b6 100644
--- a/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
+++ b/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
@@ -19,6 +19,7 @@ import scala.collection.JavaConverters._
  * This implementation of methods is mostly identical to the SerDe implementation in R.
  */
 class SerDe(val tracker: JVMObjectTracker) {
+
   def readObjectType(dis: DataInputStream): Char = {
     dis.readByte().toChar
   }
@@ -35,6 +36,7 @@ class SerDe(val tracker: JVMObjectTracker) {
       case 'g' => new java.lang.Long(readLong(dis))
       case 'd' => new java.lang.Double(readDouble(dis))
       case 'b' => new java.lang.Boolean(readBoolean(dis))
+      case 'm' => readDecimal(dis)
       case 'c' => readString(dis)
       case 'e' => readMap(dis)
       case 'r' => readBytes(dis)
@@ -59,6 +61,10 @@ class SerDe(val tracker: JVMObjectTracker) {
     in.readInt()
   }
 
+  private def readDecimal(in: DataInputStream): BigDecimal = {
+    BigDecimal(readString(in))
+  }
+
   private def readLong(in: DataInputStream): Long = {
     in.readLong()
   }
@@ -110,6 +116,11 @@ class SerDe(val tracker: JVMObjectTracker) {
     (0 until len).map(_ => readInt(in)).toArray
   }
 
+  private def readDecimalArr(in: DataInputStream): Array[BigDecimal] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readDecimal(in)).toArray
+  }
+
   private def readLongArr(in: DataInputStream): Array[Long] = {
     val len = readInt(in)
     (0 until len).map(_ => readLong(in)).toArray
@@ -156,6 +167,7 @@ class SerDe(val tracker: JVMObjectTracker) {
       case 'b' => readBooleanArr(dis)
       case 'j' => readStringArr(dis).map(x => tracker.getObject(x))
      case 'r' => readBytesArr(dis)
+      case 'm' => readDecimalArr(dis)
       case _ => throw new IllegalArgumentException(s"Invalid array type $arrType")
     }
   }
@@ -206,6 +218,7 @@ class SerDe(val tracker: JVMObjectTracker) {
       case "long" => dos.writeByte('g')
       case "integer" => dos.writeByte('i')
       case "logical" => dos.writeByte('b')
+      case "bigdecimal" => dos.writeByte('m')
       case "date" => dos.writeByte('D')
       case "time" => dos.writeByte('t')
       case "raw" => dos.writeByte('r')
@@ -216,6 +229,7 @@ class SerDe(val tracker: JVMObjectTracker) {
   }
 
   def writeObject(dos: DataOutputStream, value: Object): Unit = {
+
     if (value == null || value == Unit) {
       writeType(dos, "void")
     } else {
@@ -238,6 +252,9 @@ class SerDe(val tracker: JVMObjectTracker) {
       case "boolean" | "java.lang.Boolean" =>
         writeType(dos, "logical")
         writeBoolean(dos, value.asInstanceOf[Boolean])
+      case "BigDecimal" | "java.math.BigDecimal" =>
+        writeType(dos, "bigdecimal")
+        writeString(dos, value.toString)
       case "java.sql.Date" =>
         writeType(dos, "date")
         writeDate(dos, value.asInstanceOf[Date])
diff --git a/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala b/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
index a3df3788a..2b5dbe4b6 100644
--- a/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
+++ b/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
@@ -36,6 +36,7 @@ class SerDe(val tracker: JVMObjectTracker) {
       case 'g' => new java.lang.Long(readLong(dis))
       case 'd' => new java.lang.Double(readDouble(dis))
       case 'b' => new java.lang.Boolean(readBoolean(dis))
+      case 'm' => readDecimal(dis)
       case 'c' => readString(dis)
       case 'e' => readMap(dis)
       case 'r' => readBytes(dis)
@@ -60,6 +61,10 @@ class SerDe(val tracker: JVMObjectTracker) {
     in.readInt()
   }
 
+  private def readDecimal(in: DataInputStream): BigDecimal = {
+    BigDecimal(readString(in))
+  }
+
   private def readLong(in: DataInputStream): Long = {
     in.readLong()
   }
@@ -111,6 +116,11 @@ class SerDe(val tracker: JVMObjectTracker) {
     (0 until len).map(_ => readInt(in)).toArray
   }
 
+  private def readDecimalArr(in: DataInputStream): Array[BigDecimal] = {
+    val len = readInt(in)
+    (0 until len).map(_ => readDecimal(in)).toArray
+  }
+
   private def readLongArr(in: DataInputStream): Array[Long] = {
     val len = readInt(in)
     (0 until len).map(_ => readLong(in)).toArray
@@ -157,6 +167,7 @@ class SerDe(val tracker: JVMObjectTracker) {
       case 'b' => readBooleanArr(dis)
       case 'j' => readStringArr(dis).map(x => tracker.getObject(x))
       case 'r' => readBytesArr(dis)
+      case 'm' => readDecimalArr(dis)
       case _ => throw new IllegalArgumentException(s"Invalid array type $arrType")
     }
   }
@@ -207,6 +218,7 @@ class SerDe(val tracker: JVMObjectTracker) {
       case "long" => dos.writeByte('g')
       case "integer" => dos.writeByte('i')
       case "logical" => dos.writeByte('b')
+      case "bigdecimal" => dos.writeByte('m')
       case "date" => dos.writeByte('D')
       case "time" => dos.writeByte('t')
       case "raw" => dos.writeByte('r')
@@ -217,6 +229,7 @@ class SerDe(val tracker: JVMObjectTracker) {
   }
 
   def writeObject(dos: DataOutputStream, value: Object): Unit = {
+
     if (value == null || value == Unit) {
       writeType(dos, "void")
     } else {
@@ -239,6 +252,9 @@ class SerDe(val tracker: JVMObjectTracker) {
       case "boolean" | "java.lang.Boolean" =>
         writeType(dos, "logical")
         writeBoolean(dos, value.asInstanceOf[Boolean])
+      case "BigDecimal" | "java.math.BigDecimal" =>
+        writeType(dos, "bigdecimal")
+        writeString(dos, value.toString)
       case "java.sql.Date" =>
         writeType(dos, "date")
         writeDate(dos, value.asInstanceOf[Date])

From a72e1524f83ef7c5e1ed85f3f59467cb74cedb7e Mon Sep 17 00:00:00 2001
From: GOEddieUK
Date: Thu, 14 Oct 2021 22:22:58 +0100
Subject: [PATCH 09/11] extra line

---
 .../src/main/scala/org/apache/spark/api/dotnet/SerDe.scala | 1 -
 .../src/main/scala/org/apache/spark/api/dotnet/SerDe.scala | 1 -
 .../src/main/scala/org/apache/spark/api/dotnet/SerDe.scala | 1 -
 3 files changed, 3 deletions(-)

diff --git a/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala b/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
index 2b5dbe4b6..31cf97c12 100644
--- a/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
+++ b/src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
@@ -229,7 +229,6 @@ class SerDe(val tracker: JVMObjectTracker) {
   }
 
   def writeObject(dos: DataOutputStream, value: Object): Unit = {
-
     if (value == null || value == Unit) {
       writeType(dos, "void")
     } else {
diff --git a/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala b/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
index 2b5dbe4b6..31cf97c12 100644
--- a/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
+++ b/src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
@@ -229,7 +229,6 @@ class SerDe(val tracker: JVMObjectTracker) {
   }
 
   def writeObject(dos: DataOutputStream, value: Object): Unit = {
-
     if (value == null || value == Unit) {
       writeType(dos, "void")
     } else {
diff --git a/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala b/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
index 2b5dbe4b6..31cf97c12 100644
--- a/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
+++ b/src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/api/dotnet/SerDe.scala
@@ -229,7 +229,6 @@ class SerDe(val tracker: JVMObjectTracker) {
   }
 
   def writeObject(dos: DataOutputStream, value: Object): Unit = {
-
     if (value == null || value == Unit) {
       writeType(dos, "void")
     } else {

From 4c88ece7f3d86b2e5bdae00ca1bbc936bacc615c Mon Sep 17 00:00:00 2001
From: GOEddieUK
Date: Thu, 14 Oct 2021 22:39:02 +0100
Subject: [PATCH 10/11] test

---
 .../IpcTests/Sql/DataTypesTests.cs            | 64 +++++++++++++++++++
 1 file changed, 64 insertions(+)
 create mode 100644 src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataTypesTests.cs

diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataTypesTests.cs
new file mode 100644
index 000000000..910365d88
--- /dev/null
+++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataTypesTests.cs
@@ -0,0 +1,64 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Microsoft.Spark.Sql;
+using Microsoft.Spark.Sql.Types;
+using Xunit;
+
+namespace Microsoft.Spark.E2ETest.IpcTests
+{
+
+    [Collection("Spark E2E Tests")]
+    public class DataTypesTests
+    {
+        private readonly SparkSession _spark;
+
+        public DataTypesTests(SparkFixture fixture)
+        {
+            _spark = fixture.Spark;
+        }
+
+        /// <summary>
+        ///
+        /// </summary>
+        [Fact]
+        public void TestDecimalType()
+        {
+            var df = _spark.CreateDataFrame(
+                new List<GenericRow>
+                {
+                    new GenericRow(
+                        new object[]
+                        {
+                            decimal.MinValue, decimal.MaxValue, decimal.Zero, decimal.MinusOne,
+                            new object[]
+                            {
+                                decimal.MinValue, decimal.MaxValue, decimal.Zero, decimal.MinusOne
+                            }
+                        }),
+                },
+                new StructType(
+                    new List<StructField>()
+                    {
+                        new StructField("min", new DecimalType(38, 0)),
+                        new StructField("max", new DecimalType(38, 0)),
+                        new StructField("zero", new DecimalType(38, 0)),
+                        new StructField("minusOne", new DecimalType(38, 0)),
+                        new StructField("array", new ArrayType(new DecimalType(38,0)))
+                    }));
+
+            Row row = df.Collect().First();
+            Assert.Equal(decimal.MinValue, row[0]);
+            Assert.Equal(decimal.MaxValue, row[1]);
+            Assert.Equal(decimal.Zero, row[2]);
+            Assert.Equal(decimal.MinusOne, row[3]);
+            Assert.Equal(new object[]{decimal.MinValue, decimal.MaxValue, decimal.Zero, decimal.MinusOne},
+                row[4]);
+        }
+
+    }
+}
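Note on the schema the test chooses: DecimalType(38, 0) is Spark's maximum decimal precision, and .NET's widest decimal values are 29 digits, so decimal.MinValue and decimal.MaxValue round-trip without overflow. A quick standalone check of that headroom:

    using System;

    // Spark's DecimalType allows precision up to 38; .NET's decimal.MaxValue
    // has 29 significant digits, so the scale-0 extremes fit with room to spare.
    Console.WriteLine(decimal.MaxValue.ToString().Length); // 29
    Console.WriteLine(decimal.MinValue.ToString().Length); // 30 (leading '-')
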
From 592a57da807056be7cf10f0baabfa6069ccc3558 Mon Sep 17 00:00:00 2001
From: GOEddieUK
Date: Thu, 14 Oct 2021 22:40:12 +0100
Subject: [PATCH 11/11] comment

---
 .../Microsoft.Spark.E2ETest/IpcTests/Sql/DataTypesTests.cs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataTypesTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataTypesTests.cs
index 910365d88..919f61cb8 100644
--- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataTypesTests.cs
+++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataTypesTests.cs
@@ -23,7 +23,8 @@ public DataTypesTests(SparkFixture fixture)
         }
 
         /// <summary>
-        ///
+        /// Tests that we can pass a decimal over to Apache Spark and collect it back again, including a check
+        /// for the minimum and maximum decimal that .NET can represent.
         /// </summary>
         [Fact]
         public void TestDecimalType()