From c8a46358d95d2dd2656aa0759637409ed8396cf1 Mon Sep 17 00:00:00 2001 From: Matt Hughes Date: Sat, 17 Jun 2023 09:55:57 -0400 Subject: [PATCH 1/2] Add test case that exhibits portal does not exist problem. --- .../src/test/scala/DescribeCacheTest.scala | 59 +++++++++++++++++-- world/world.sql | 5 ++ 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/modules/tests/shared/src/test/scala/DescribeCacheTest.scala b/modules/tests/shared/src/test/scala/DescribeCacheTest.scala index c636e8a6d..8dc664749 100644 --- a/modules/tests/shared/src/test/scala/DescribeCacheTest.scala +++ b/modules/tests/shared/src/test/scala/DescribeCacheTest.scala @@ -4,16 +4,17 @@ package tests -import skunk.implicits._ -import skunk.codec.numeric.int4 -import cats.syntax.all._ import cats.effect.IO -import skunk.exception.PostgresErrorException import cats.effect.Resource -import skunk.Session +import cats.syntax.all._ import org.typelevel.otel4s.trace.Tracer +import skunk.codec.numeric.int4 +import skunk.codec.text +import skunk._ +import skunk.exception.PostgresErrorException +import skunk.implicits._ -class DescribeCacheTest extends SkunkTest { +class DescribeCacheTest extends SkunkTest(true) { implicit val tracer: Tracer[IO] = Tracer.noop @@ -82,6 +83,52 @@ class DescribeCacheTest extends SkunkTest { } yield "ok" } + val runs = 100 + // This should not fail + pooledTest("portal1 - concurrent portal with normal flatMap") { pool => + import scala.concurrent.duration._ + def cmd(idx: Int): Command[String *: Int *: EmptyTuple] = sql"INSERT INTO scalars VALUES (${text.varchar}, ${int4}) -- #${idx.toString}".command + 1.to(runs).toList.parTraverse{ idx => + pool.use { s => + s.transaction.use { _ => + s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).flatMap (_ => + IO.sleep(1.second) >> s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).void + ) + } + } + } + } + + // This should not fail + pooledTest("portal2 - concurrent portal with map / identity *inside tx*") { pool => + import scala.concurrent.duration._ + def cmd(idx: Int): Command[String *: Int *: EmptyTuple] = sql"INSERT INTO scalars VALUES (${text.varchar}, ${int4}) -- #${idx.toString}".command + 1.to(runs).toList.parTraverse{ idx => + pool.use { s => + s.transaction.use { _ => + s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).map (_ => + IO.sleep(1.second) >> s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).void + ).flatMap(identity) + } + } + } + } + + // This will fail if run enough + pooledTest("portal3 - concurrent portal with map / identity *outside tx*") { pool => + import scala.concurrent.duration._ + def cmd(idx: Int): Command[String *: Int *: EmptyTuple] = sql"INSERT INTO scalars VALUES (${text.varchar}, ${int4}) -- #${idx.toString}".command + 1.to(runs).toList.parTraverse{ idx => + pool.use { s => + s.transaction.use { _ => + s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).map (_ => + IO.sleep(1.second) >> s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).void + ) + } + }.flatMap(identity) + } + } + sessionTest("command should not be cached after cache is cleared") { s => val cmd = sql"commit".command for { diff --git a/world/world.sql b/world/world.sql index ec93a04da..6a52692ea 100644 --- a/world/world.sql +++ b/world/world.sql @@ -7,6 +7,11 @@ CREATE TYPE myenum AS ENUM ('foo', 'bar'); +CREATE TABLE IF NOT EXISTS scalars( + a_string varchar not null, + a_int integer not null +); + CREATE TABLE IF NOT EXISTS city ( id integer NOT NULL, name varchar NOT NULL, From 865c21a397e3bb772367ea030d3c815c0ee4a55c Mon Sep 17 00:00:00 2001 From: Matt Hughes Date: Sat, 17 Jun 2023 09:58:32 -0400 Subject: [PATCH 2/2] Add another example with flatten. --- .../shared/src/test/scala/DescribeCacheTest.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/modules/tests/shared/src/test/scala/DescribeCacheTest.scala b/modules/tests/shared/src/test/scala/DescribeCacheTest.scala index 8dc664749..2780b4892 100644 --- a/modules/tests/shared/src/test/scala/DescribeCacheTest.scala +++ b/modules/tests/shared/src/test/scala/DescribeCacheTest.scala @@ -128,6 +128,21 @@ class DescribeCacheTest extends SkunkTest(true) { }.flatMap(identity) } } + + // This will fail if run enough + pooledTest("portal4 - concurrent portal with flatten *outside tx*") { pool => + import scala.concurrent.duration._ + def cmd(idx: Int): Command[String *: Int *: EmptyTuple] = sql"INSERT INTO scalars VALUES (${text.varchar}, ${int4}) -- #${idx.toString}".command + 1.to(runs).toList.parTraverse{ idx => + pool.use { s => + s.transaction.use { _ => + s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).map (_ => + IO.sleep(1.second) >> s.prepare(cmd(idx)).flatMap(_.execute((s"name$idx", idx))).void + ) + } + }.flatten + } + } sessionTest("command should not be cached after cache is cleared") { s => val cmd = sql"commit".command