Skip to content

Commit 34ef52a

Browse files
authored
Merge pull request #72 from nrkno/feature/rework-udf
feature: add support for persistent UDFs and UDF references
2 parents 9f0919e + e051431 commit 34ef52a

File tree

13 files changed

+444
-74
lines changed

13 files changed

+444
-74
lines changed

core/src/main/scala/no/nrk/bigquery/BQSqlFrag.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package no.nrk.bigquery
22

3+
import cats.syntax.all._
34
import no.nrk.bigquery.syntax._
45
import no.nrk.bigquery.BQSqlFrag.asSubQuery
5-
import no.nrk.bigquery.UDF.Body
66

77
import scala.annotation.tailrec
88

@@ -67,7 +67,7 @@ sealed trait BQSqlFrag {
6767
}
6868

6969
final lazy val asStringWithUDFs: String = {
70-
val udfs = allReferencedUDFs.map(_.definition.asString)
70+
val udfs = allReferencedUDFs.collect { case udf: UDF.Temporary => udf.definition.asString }
7171
val udfsAsString = udfs.mkString("\n\n") + (if (udfs.nonEmpty) "\n\n" else "")
7272
udfsAsString + asString
7373
}
@@ -77,9 +77,10 @@ sealed trait BQSqlFrag {
7777
f match {
7878
case BQSqlFrag.Frag(_) => Nil
7979
case BQSqlFrag.Call(udf, args) =>
80-
(udf.body match {
81-
case Body.Sql(body) => body :: Nil
82-
case _: Body.Js => Nil
80+
(udf match {
81+
case UDF.Temporary(_, _, UDF.Body.Sql(body), _) => body :: Nil
82+
case UDF.Persistent(_, _, UDF.Body.Sql(body), _) => body :: Nil
83+
case _ => Nil
8384
}) ++ args.toList
8485
case BQSqlFrag.Combined(values) => values.toList
8586
case BQSqlFrag.PartitionRef(_) => Nil
@@ -105,7 +106,7 @@ sealed trait BQSqlFrag {
105106
case BQSqlFrag.FilledTableRef(fill) => fill.tableDef.unpartitioned.assertPartition
106107
}.distinct
107108

108-
final def allReferencedUDFs: Seq[UDF] =
109+
final def allReferencedUDFs: Seq[UDF[UDF.UDFId]] =
109110
this.collect { case BQSqlFrag.Call(udf, _) => udf }.distinct
110111

111112
override def toString: String = asString
@@ -116,10 +117,10 @@ object BQSqlFrag {
116117
def backticks(string: String): BQSqlFrag = Frag("`" + string + "`")
117118

118119
case class Frag(string: String) extends BQSqlFrag
119-
case class Call(udf: UDF, args: List[BQSqlFrag]) extends BQSqlFrag {
120+
case class Call(udf: UDF[UDF.UDFId], args: List[BQSqlFrag]) extends BQSqlFrag {
120121
require(
121122
udf.params.length == args.length,
122-
s"UDF ${udf.name.value}: Expected ${udf.params.length} arguments, got ${args.length}"
123+
show"UDF ${udf.name}: Expected ${udf.params.length} arguments, got ${args.length}"
123124
)
124125
}
125126
case class Combined(values: Seq[BQSqlFrag]) extends BQSqlFrag
@@ -130,7 +131,7 @@ object BQSqlFrag {
130131
val Empty: BQSqlFrag = Frag("")
131132

132133
/*
133-
* Where `BQSqlFrag` is wanted as a parameter, we can instead ask for a `BQSqlFrag.Magnet`,
134+
* Where `BQSqlFrag` is wanted as a parameter, we can instead ask for a `BQSqlFrag.Magnet`,
134135
* to enable implicit conversions to trigger. This means the caller can provide values of any
135136
* type, as long as it is convertible to `BQSqlFrag` through a `BQShow` */
136137
case class Magnet(frag: BQSqlFrag) extends AnyVal

core/src/main/scala/no/nrk/bigquery/BigQueryClient.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,22 @@ class BigQueryClient[F[_]](
533533
}
534534
}
535535
}
536+
537+
def getRoutine(udfId: UDF.UDFId.PersistentId): F[Option[Routine]] =
538+
F.interruptible {
539+
val routineId = RoutineId.of(udfId.dataset.project.value, udfId.dataset.id, udfId.name.value)
540+
Option(bigQuery.getRoutine(routineId)).filter(_.exists())
541+
}
542+
543+
def create(info: RoutineInfo): F[Routine] =
544+
F.delay(bigQuery.create(info))
545+
546+
def update(info: RoutineInfo): F[Routine] =
547+
F.delay(bigQuery.update(info))
548+
549+
def delete(udfId: UDF.UDFId.PersistentId): F[Boolean] =
550+
F.delay(bigQuery.delete(RoutineId.of(udfId.dataset.project.value, udfId.dataset.id, udfId.name.value)))
551+
536552
}
537553

538554
object BigQueryClient {

core/src/main/scala/no/nrk/bigquery/EnsureUpdated.scala

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,24 @@ package no.nrk.bigquery
33
import cats.{Applicative, MonadThrow, Show}
44
import cats.syntax.all._
55
import com.google.cloud.bigquery.{Option => _, _}
6-
import no.nrk.bigquery.internal.TableUpdateOperation
6+
import no.nrk.bigquery.internal.{TableUpdateOperation, UdfUpdateOperation}
77
import org.typelevel.log4cats.LoggerFactory
88

99
sealed trait OperationMeta {
1010
def identifier: String
1111
}
12-
case class TableDefOperationMeta(existingRemoteTable: TableInfo, localTableDef: BQTableDef[Any]) extends OperationMeta {
12+
case class TableDefOperationMeta(
13+
existingRemoteTable: TableInfo,
14+
localTableDef: BQTableDef[Any]
15+
) extends OperationMeta {
1316
def identifier: String = existingRemoteTable.getTableId.toString
1417
}
18+
case class UdfOperationMeta(
19+
routine: RoutineInfo,
20+
persistentUdf: UDF.Persistent
21+
) extends OperationMeta {
22+
override def identifier: String = persistentUdf.name.asString
23+
}
1524

1625
sealed trait UpdateOperation
1726
object UpdateOperation {
@@ -40,6 +49,16 @@ object UpdateOperation {
4049
create: CreateTable
4150
) extends Success
4251

52+
case class CreatePersistentUdf(
53+
persistentUdf: UDF.Persistent,
54+
routine: RoutineInfo
55+
) extends Success
56+
57+
case class UpdatePersistentUdf(
58+
persistentUdf: UDF.Persistent,
59+
routine: RoutineInfo
60+
) extends Success
61+
4362
sealed trait Error extends UpdateOperation
4463

4564
case class Illegal(meta: OperationMeta, reason: String) extends Error
@@ -64,16 +83,19 @@ class EnsureUpdated[F[_]](
6483
TableUpdateOperation.from(template, maybeExisting)
6584
}
6685

86+
def check(persistentUdf: UDF.Persistent): F[UpdateOperation] =
87+
bqClient.getRoutine(persistentUdf.name).map { maybeExisting =>
88+
UdfUpdateOperation.from(persistentUdf, maybeExisting)
89+
}
90+
6791
def perform(updateOperation: UpdateOperation): F[Unit] =
6892
updateOperation match {
6993
case UpdateOperation.Noop(_) =>
7094
Applicative[F].unit
7195

7296
case UpdateOperation.CreateTable(to, table, maybePatchedTable) =>
7397
for {
74-
_ <- logger.warn(
75-
show"Creating ${table.getTableId} of type ${to.getClass.getSimpleName}"
76-
)
98+
_ <- logger.warn(show"Creating ${table.getTableId} of type ${to.getClass.getSimpleName}")
7799
_ <- bqClient.create(table)
78100
_ <- maybePatchedTable match {
79101
case Some(patchedTable) => bqClient.update(patchedTable).void
@@ -86,6 +108,18 @@ class EnsureUpdated[F[_]](
86108
show"Updating ${table.getTableId} of type ${to.getClass.getSimpleName} from ${from.toString}, to ${to.toString}"
87109
logger.warn(msg) >> bqClient.update(table).void
88110

111+
case UpdateOperation.CreatePersistentUdf(udf, routine) =>
112+
for {
113+
_ <- logger.warn(show"Creating ${udf.name} of type PersistentUdf")
114+
_ <- bqClient.create(routine)
115+
} yield ()
116+
117+
case UpdateOperation.UpdatePersistentUdf(udf, routine) =>
118+
for {
119+
_ <- logger.warn(show"Updating ${udf.name} of type PersistentUdf")
120+
_ <- bqClient.update(routine)
121+
} yield ()
122+
89123
case UpdateOperation.RecreateView(from, to, createNew) =>
90124
val msg =
91125
show"Recreating ${to.tableId} of type ${to.getClass.getSimpleName} from ${from.toString}, to ${to.toString}"

core/src/main/scala/no/nrk/bigquery/UDF.scala

Lines changed: 95 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,112 @@ package no.nrk.bigquery
22

33
import cats.data.NonEmptyList
44
import cats.syntax.all._
5+
import cats.Show
6+
import no.nrk.bigquery.UDF._
7+
import no.nrk.bigquery.UDF.UDFId._
58
import no.nrk.bigquery.syntax._
69

7-
case class UDF(
8-
name: Ident,
9-
params: List[UDF.Param],
10-
body: UDF.Body,
11-
returnType: Option[BQType]
12-
) {
13-
lazy val definition: BQSqlFrag = {
14-
val returning = returnType match {
15-
case Some(returnType) => bqfr" RETURNS $returnType"
16-
case None => BQSqlFrag.Empty
17-
}
18-
bqfr"CREATE TEMP FUNCTION $name${params.map(_.definition).mkFragment("(", ", ", ")")}$returning${body.languageFragment} AS ${body.bodyFragment};"
19-
}
20-
10+
sealed trait UDF[+A <: UDFId] {
11+
def name: A
12+
def params: List[UDF.Param]
13+
def returnType: Option[BQType]
2114
def apply(args: BQSqlFrag.Magnet*): BQSqlFrag.Call =
2215
BQSqlFrag.Call(this, args.toList.map(_.frag))
2316
}
24-
2517
object UDF {
18+
19+
case class Temporary(
20+
name: TemporaryId,
21+
params: List[UDF.Param],
22+
body: UDF.Body,
23+
returnType: Option[BQType]
24+
) extends UDF[UDFId.TemporaryId] {
25+
lazy val definition: BQSqlFrag = {
26+
val returning = returnType match {
27+
case Some(returnType) => bqfr" RETURNS $returnType"
28+
case None => BQSqlFrag.Empty
29+
}
30+
bqfr"CREATE TEMP FUNCTION ${name}${params.map(_.definition).mkFragment("(", ", ", ")")}$returning${body.languageFragment} AS ${body.bodyFragment};"
31+
}
32+
}
33+
34+
case class Persistent(
35+
name: PersistentId,
36+
params: List[UDF.Param],
37+
body: UDF.Body,
38+
returnType: Option[BQType]
39+
) extends UDF[UDFId.PersistentId] {
40+
def convertToTemporary: Temporary =
41+
Temporary(TemporaryId(name.name), params, body, returnType)
42+
}
43+
44+
case class Reference(
45+
name: UDFId,
46+
params: List[UDF.Param],
47+
returnType: Option[BQType]
48+
) extends UDF[UDFId]
49+
50+
@deprecated("use UDF.temporary constructor", "0.5")
2651
def apply(
2752
name: Ident,
2853
params: Seq[UDF.Param],
2954
body: BQSqlFrag,
3055
returnType: Option[BQType]
31-
): UDF =
32-
UDF(name, params.toList, UDF.Body.Sql(body), returnType)
56+
): Temporary =
57+
Temporary(UDFId.TemporaryId(name), params.toList, UDF.Body.Sql(body), returnType)
58+
59+
def temporary(
60+
name: Ident,
61+
params: List[UDF.Param],
62+
body: UDF.Body,
63+
returnType: Option[BQType]
64+
): Temporary =
65+
Temporary(UDFId.TemporaryId(name), params, body, returnType)
66+
67+
def persistent(
68+
name: Ident,
69+
dataset: BQDataset,
70+
params: List[UDF.Param],
71+
body: UDF.Body,
72+
returnType: Option[BQType]
73+
): Persistent =
74+
Persistent(UDFId.PersistentId(dataset, name), params, body, returnType)
75+
76+
def reference(
77+
name: Ident,
78+
dataset: BQDataset,
79+
params: List[UDF.Param],
80+
returnType: Option[BQType]
81+
): Reference =
82+
Reference(UDFId.PersistentId(dataset, name), params, returnType)
83+
84+
sealed trait UDFId {
85+
def asString: String
86+
def asFragment: BQSqlFrag
87+
}
88+
89+
object UDFId {
90+
case class TemporaryId(name: Ident) extends UDFId {
91+
override def asString: String = name.value
92+
override def asFragment: BQSqlFrag = name.bqShow
93+
}
94+
95+
object TemporaryId {
96+
implicit val bqShows: BQShow[TemporaryId] = _.asFragment
97+
}
98+
99+
case class PersistentId(dataset: BQDataset, name: Ident) extends UDFId {
100+
override def asString: String = show"${dataset.project.value}.${dataset.id}.$name"
101+
override def asFragment: BQSqlFrag = BQSqlFrag.backticks(asString)
102+
}
103+
104+
object PersistentId {
105+
implicit val bqShow: BQShow[PersistentId] = _.asFragment
106+
}
107+
108+
implicit val bqShow: BQShow[UDFId] = _.asFragment
109+
implicit val show: Show[UDFId] = _.asString
110+
}
33111

34112
case class Param(name: Ident, maybeType: Option[BQType]) {
35113
def definition: BQSqlFrag =

0 commit comments

Comments
 (0)