@@ -1,6 +1,7 @@
 package it.agilelab.bigdata.wasp.consumers.spark.plugins.kafka
 
 import com.typesafe.config.Config
+import it.agilelab.bigdata.utils.NonEmptyList
 import it.agilelab.bigdata.wasp.consumers.spark.utils.AvroSerializerExpression
 import it.agilelab.bigdata.wasp.core.logging.Logging
 import it.agilelab.bigdata.wasp.core.utils.SubjectUtils
@@ -40,21 +41,17 @@ object KafkaWriters extends Logging {
   private[kafka] def prepareDfToWrite(
       df: DataFrame,
       topicFieldNameOpt: Option[String],
-      topics: Seq[TopicModel],
+      topics: NonEmptyList[TopicModel],
       darwinConf: Option[Config]
   ) = {
 
-    val throwException = udf { s: String =>
-      throw new Exception(s"Unknown topic name $s")
-    }
-
     topicFieldNameOpt match {
       case Some(topicFieldName) =>
         require(topics.size > 1, s"Got topicFieldName = $topicFieldName but only one topic to write ($topics)")
-        val keyCol: Option[Column] = keyExpression(topics, topicFieldNameOpt, throwException, df.col, darwinConf)
-        val headersCol: Option[Column] = headerExpression(topics, topicFieldNameOpt, throwException)
+        val keyCol: Option[Column] = keyExpression(topics, topicFieldNameOpt, df.col, darwinConf)
+        val headersCol: Option[Column] = headerExpression(topics, topicFieldNameOpt)
         val topicCol: Column = col(topicFieldName)
-        val valueCol: Column = valueExpression(topics, topicFieldNameOpt, df.schema, df.col, throwException, darwinConf)
+        val valueCol: Column = valueExpression(topics, topicFieldNameOpt, df.schema, df.col, darwinConf)
 
         val columns =
           (keyCol.map(_.as("key")) ++
@@ -68,9 +65,9 @@ object KafkaWriters extends Logging {
           topics.size == 1,
           "More than one topic to write specified but there's no column containing the topics' name."
         )
-        val keyCol: Option[Column] = keyExpression(topics, topicFieldNameOpt, throwException, df.col, darwinConf)
-        val headersCol: Option[Column] = headerExpression(topics, topicFieldNameOpt, throwException)
-        val valueCol: Column = valueExpression(topics, topicFieldNameOpt, df.schema, df.col, throwException, darwinConf)
+        val keyCol: Option[Column] = keyExpression(topics, topicFieldNameOpt, df.col, darwinConf)
+        val headersCol: Option[Column] = headerExpression(topics, topicFieldNameOpt)
+        val valueCol: Column = valueExpression(topics, topicFieldNameOpt, df.schema, df.col, darwinConf)
 
         val columns =
           (keyCol.map(_.as("key")) ++
@@ -83,53 +80,38 @@ object KafkaWriters extends Logging {
   }
 
   private def keyExpression(
-      topics: Seq[TopicModel],
+      topics: NonEmptyList[TopicModel],
       topicFieldName: Option[String],
-      exceptionUdf: UserDefinedFunction,
       columnExtractor: String => Column,
       darwinConf: Option[Config]
-  ) = {
+  ): Option[Column] = {
 
     def valueOfKey(topicModel: TopicModel): Column = {
-      val keyField = topicModel.keyFieldName.get
-      topicModel.topicDataType match {
-        case "avro" => convertKeyForAvro(columnExtractor(keyField), topicModel, darwinConf)
-        case dataType if dataType == "json" || dataType == "binary" || dataType == "plaintext" =>
-          convertKeyToBinary(columnExtractor(keyField))
-        case unknown => throw new UnsupportedOperationException(s"Unknown topic data type $unknown")
-      }
+      topicModel.keyFieldName
+        .map(keyField =>
+          topicModel.topicDataType match {
+            case "avro" => convertKeyForAvro(columnExtractor(keyField), topicModel, darwinConf)
+            case "json" | "binary" | "plaintext" => convertKeyToBinary(columnExtractor(keyField))
+            case unknown => throw new UnsupportedOperationException(s"Unknown topic data type $unknown")
+          }
+        )
+        .getOrElse(lit(null).cast(BinaryType))
     }
-
     if (topics.exists(_.keyFieldName.isDefined)) {
-
-      if (topicFieldName.isDefined) {
-        val head = topics.head
-        val tail = topics.tail
-
-        Some(
-          tail
-            .foldLeft(when(conditionOnTopicName(topicFieldName.get, head), valueOfKey(head))) { (z, x) =>
-              z.when(conditionOnTopicName(topicFieldName.get, x), valueOfKey(x))
-            }
-            .otherwise(exceptionUdf(col(topicFieldName.get)))
-        )
-      } else {
-        Some(valueOfKey(topics.head))
-      }
-
+      Some(computeFieldExpression(topics, topicFieldName, valueOfKey))
     } else {
       None
     }
+
   }
 
   private def valueExpression(
-      topics: Seq[TopicModel],
+      topics: NonEmptyList[TopicModel],
       topicFieldName: Option[String],
       dfSchema: StructType,
       columnExtractor: String => Column,
-      exceptionUdf: UserDefinedFunction,
       darwinConf: Option[Config]
-  ) = {
+  ): Column = {
 
     def valueOfValue(topicModel: TopicModel): Column = {
       val columnsInValues = topicModel.valueFieldsNames match {
@@ -145,56 +127,47 @@ object KafkaWriters extends Logging {
         case unknown => throw new UnsupportedOperationException(s"Unknown topic data type $unknown")
       }
     }
-
-    if (topicFieldName.isDefined) {
-      val head = topics.head
-      val tail = topics.tail
-
-      tail
-        .foldLeft(when(conditionOnTopicName(topicFieldName.get, head), valueOfValue(head))) {
-          (z: Column, x: TopicModel) =>
-            z.when(conditionOnTopicName(topicFieldName.get, x), valueOfValue(x))
-        }
-        .otherwise(exceptionUdf(col(topicFieldName.get)))
-
-    } else {
-      valueOfValue(topics.head)
-    }
-
+    computeFieldExpression(topics, topicFieldName, valueOfValue)
   }
 
   private def headerExpression(
-      topics: Seq[TopicModel],
-      topicFieldName: Option[String],
-      exceptionUdf: UserDefinedFunction
-  ) = {
+      topics: NonEmptyList[TopicModel],
+      topicFieldName: Option[String]
+  ): Option[Column] = {
 
     def valueOfHeader(head: TopicModel) = {
       head.headersFieldName.map(col).getOrElse(lit(null))
     }
 
     if (topics.exists(_.headersFieldName.isDefined)) {
-
-      if (topicFieldName.isDefined) {
-        val head = topics.head
-        val tail = topics.tail
-        Some(
-          tail
-            .foldLeft(when(conditionOnTopicName(topicFieldName.get, head), valueOfHeader(head))) {
-              (z: Column, x: TopicModel) =>
-                z.when(conditionOnTopicName(topicFieldName.get, x), valueOfHeader(x))
-            }
-            .otherwise(exceptionUdf(col(topicFieldName.get)))
-        )
-      } else {
-        Some(valueOfHeader(topics.head))
-      }
+      Some(computeFieldExpression(topics, topicFieldName, valueOfHeader))
     } else {
       None
     }
 
   }
 
+  private val unknownTopicExpression: UserDefinedFunction = udf { s: String =>
+    throw new Exception(s"Unknown topic name $s")
+  }
+
+  private def computeFieldExpression(
+      topics: NonEmptyList[TopicModel], // this list is non empty
+      maybeTopicFieldName: Option[String],
+      valueExtractor: TopicModel => Column
+  ): Column = {
+    val NonEmptyList(head, tail) = topics
+    maybeTopicFieldName
+      .map(topicFieldName =>
+        tail
+          .foldLeft(when(conditionOnTopicName(topicFieldName, head), valueExtractor(head))) { (z, x) =>
+            z.when(conditionOnTopicName(topicFieldName, x), valueExtractor(x))
+          }
+          .otherwise(unknownTopicExpression(col(topicFieldName)))
+      )
+      .getOrElse(valueExtractor(head))
+  }
+
   private def conditionOnTopicName(topicFieldName: String, head: TopicModel) = {
     col(topicFieldName).equalTo(head.name)
   }
@@ -352,10 +325,9 @@ object KafkaWriters extends Logging {
       _ => ()
     )
 
-    val topicsToWrite = if (topics.isEmpty) {
-      List(mainTopicModel.asInstanceOf[TopicModel])
-    } else {
-      topics
+    val topicsToWrite = topics match {
+      case Nil => NonEmptyList.one(mainTopicModel.asInstanceOf[TopicModel])
+      case head :: tail => NonEmptyList(head, tail)
     }
 
     logger.info(s"Writing with topic models: ${topicsToWrite.map(_.name).mkString(" ")}")
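
The recurring pattern in this change is collapsing a non-empty list of topic models into a single Spark Column that dispatches on the topic-name column. Below is a minimal, self-contained sketch of that fold; Topic, nonEmptyDispatch, and failOnUnknown are illustrative stand-ins for WASP's TopicModel, computeFieldExpression, and unknownTopicExpression, not the project's actual API.

// Minimal sketch (not WASP code): fold a non-empty topic list into one dispatch Column.
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, udf, when}

final case class Topic(name: String) // stand-in for TopicModel

object TopicDispatchSketch {
  // Stand-in for unknownTopicExpression: fails the query when a row names a topic with no model.
  private def failOnUnknown(s: String): String = throw new Exception(s"Unknown topic name $s")
  private val unknownTopic = udf(failOnUnknown _)

  // Stand-in for computeFieldExpression: (head, tail) plays the role of NonEmptyList.
  def nonEmptyDispatch(head: Topic, tail: List[Topic], topicFieldName: Option[String])(
      valueExtractor: Topic => Column
  ): Column =
    topicFieldName match {
      case Some(field) =>
        // One `when` per topic, keyed on the topic-name column, failing on anything else.
        tail
          .foldLeft(when(col(field) === head.name, valueExtractor(head))) { (acc, t) =>
            acc.when(col(field) === t.name, valueExtractor(t))
          }
          .otherwise(unknownTopic(col(field)))
      case None =>
        // Single-topic writes have no dispatch column: use the head topic's expression directly.
        valueExtractor(head)
    }
}

With the chain built in one place, each of keyExpression, valueExpression, and headerExpression only supplies its own valueExtractor, which is why the throwException UDF and the three hand-rolled when/otherwise blocks disappear from the call sites.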