Skip to content
7 changes: 5 additions & 2 deletions stamina-core/src/main/scala/stamina/Persisted.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ package stamina
* A simple container holding a persistence key, a version number,
* and the associated serialized bytes.
*/
case class Persisted(key: String, version: Int, bytes: ByteString)
case class Persisted(key: String, version: Int, bytes: Array[Byte]) {
lazy val manifest = Manifest(key, version)
}

object Persisted {
def apply(key: String, version: Int, bytes: Array[Byte]): Persisted = apply(key, version, ByteString(bytes))
def apply(manifest: Manifest, bytes: Array[Byte]): Persisted = apply(manifest.key, manifest.version, bytes)
def apply(key: String, version: Int, bytes: ByteString): Persisted = apply(key, version, bytes.toArray)
}
15 changes: 8 additions & 7 deletions stamina-core/src/main/scala/stamina/Persister.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,31 @@ import scala.util._
*/
abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String) {
lazy val currentVersion = Version.numberFor[V]
lazy val currentManifest = Manifest(key, currentVersion)

def persist(t: T): Persisted
def unpersist(persisted: Persisted): T
def persist(t: T): Array[Byte]
def unpersist(manifest: Manifest, persisted: Array[Byte]): T

def canPersist(a: AnyRef): Boolean = convertToT(a).isDefined
def canUnpersist(p: Persisted): Boolean = p.key == key && p.version <= currentVersion
def canUnpersist(m: Manifest): Boolean = m.key == key && m.version <= currentVersion

private[stamina] def convertToT(any: AnyRef): Option[T] = any match {
case t: T ⇒ Some(t)
case _ ⇒ None
}

private[stamina] def persistAny(any: AnyRef): Persisted = {
private[stamina] def persistAny(any: AnyRef): Array[Byte] = {
convertToT(any).map(persist(_)).getOrElse(
throw new IllegalArgumentException(
s"persistAny() was called on Persister[${implicitly[ClassTag[T]].runtimeClass}] with an instance of ${any.getClass}."
)
)
}

private[stamina] def unpersistAny(persisted: Persisted): AnyRef = {
Try(unpersist(persisted).asInstanceOf[AnyRef]) match {
private[stamina] def unpersistAny(manifest: Manifest, persistedBytes: Array[Byte]): AnyRef = {
Try(unpersist(manifest, persistedBytes).asInstanceOf[AnyRef]) match {
case Success(anyref) ⇒ anyref
case Failure(error) ⇒ throw UnrecoverableDataException(persisted, error)
case Failure(error) ⇒ throw UnrecoverableDataException(manifest, error)
}
}
}
25 changes: 17 additions & 8 deletions stamina-core/src/main/scala/stamina/Persisters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,28 @@ import scala.reflect.ClassTag
*/
case class Persisters(persisters: List[Persister[_, _]]) {
def canPersist(a: AnyRef): Boolean = persisters.exists(_.canPersist(a))
def canUnpersist(p: Persisted): Boolean = persisters.exists(_.canUnpersist(p))
def canUnpersist(manifest: Manifest): Boolean = persisters.exists(_.canUnpersist(manifest))

// format: OFF
private def persister[T <: AnyRef](anyref: T): Persister[T, _] =
persisters
.find(_.canPersist(anyref))
.map(_.asInstanceOf[Persister[T, _]])
.getOrElse(throw UnregisteredTypeException(anyref))

def manifest(anyref: AnyRef): Manifest =
persister(anyref).currentManifest

def persist(anyref: AnyRef): Persisted = {
persisters.find(_.canPersist(anyref))
.map(_.persistAny(anyref))
.getOrElse(throw UnregisteredTypeException(anyref))
val p = persister(anyref)
Persisted(p.currentManifest, p.persistAny(anyref))
}

def unpersist(persisted: Persisted): AnyRef = {
persisters.find(_.canUnpersist(persisted))
.map(_.unpersistAny(persisted))
.getOrElse(throw UnsupportedDataException(persisted))
def unpersist(persisted: Persisted): AnyRef = unpersist(persisted.bytes, persisted.manifest)
def unpersist(payload: Array[Byte], manifest: Manifest): AnyRef = {
persisters.find(_.canUnpersist(manifest))
.map(_.unpersistAny(manifest, payload))
.getOrElse(throw UnsupportedDataException(manifest.key, manifest.version))
}
// format: ON

Expand Down
28 changes: 15 additions & 13 deletions stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,37 @@ import akka.serialization._

/**
* A custom Akka Serializer specifically designed for use with Akka Persistence.
*
* Key and version information is encoded in the manifest.
*/
abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters, codec: PersistedCodec) extends Serializer {
def this(persisters: List[Persister[_, _]], codec: PersistedCodec = DefaultPersistedCodec) = this(Persisters(persisters), codec)
def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec)
abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters) extends SerializerWithStringManifest {
def this(persisters: List[Persister[_, _]]) = this(Persisters(persisters))
def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList))

/** We don't need class manifests since we're using keys to identify types. */
val includeManifest: Boolean = false
/** Uniquely identifies this Serializer. */
val identifier = 490304

/** Uniquely identifies this Serializer by combining the codec with a unique number. */
val identifier = 42 * codec.identifier
def manifest(obj: AnyRef): String =
persisters.manifest(obj).manifest

/**
* @throws UnregisteredTypeException when the specified object is not supported by the persisters.
*/
def toBinary(obj: AnyRef): Array[Byte] = {
if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj)

codec.writePersisted(persisters.persist(obj))
persisters.persist(obj).bytes
}

/**
* @throws UnsupportedDataException when the persisted key and/or version is not supported.
* @throws UnrecoverableDataException when the key and version are supported but recovery throws an exception.
*/
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val persisted = codec.readPersisted(bytes)
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
if (manifest.isEmpty) throw new IllegalArgumentException("No manifest found")
val m = Manifest(manifest)
if (!persisters.canUnpersist(m)) throw UnsupportedDataException(m.key, m.version)

if (!persisters.canUnpersist(persisted)) throw UnsupportedDataException(persisted)

persisters.unpersist(persisted)
persisters.unpersist(Persisted(m, bytes))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package stamina
package codec

import akka.serialization._

/**
* A custom Akka Serializer encoding key and version along with the serialized object.
*
* This is particularly useful when there is no separate field for metadata, such as when
* dealing with pre-akka-2.3 persistence.
*
* Wrapping/unwrapping the metadata around the serialized object is done by the Codec.
*/
abstract class CodecBasedStaminaAkkaSerializer private[stamina] (persisters: Persisters, codec: PersistedCodec) extends Serializer {
def this(persisters: List[Persister[_, _]], codec: PersistedCodec = DefaultPersistedCodec) = this(Persisters(persisters), codec)
def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec)

/** We don't need class manifests since we're using keys to identify types. */
val includeManifest: Boolean = false

/** Uniquely identifies this Serializer by combining the codec with a unique number. */
val identifier = 42 * codec.identifier

/**
* @throws UnregisteredTypeException when the specified object is not supported by the persisters.
*/
def toBinary(obj: AnyRef): Array[Byte] = {
if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj)

codec.writePersisted(persisters.persist(obj))
}

/**
* @throws UnsupportedDataException when the persisted key and/or version is not supported.
* @throws UnrecoverableDataException when the key and version are supported but recovery throws an exception.
*/
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val persisted = codec.readPersisted(bytes)

if (!persisters.canUnpersist(persisted.manifest)) throw UnsupportedDataException(persisted.key, persisted.version)

persisters.unpersist(persisted)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package stamina
package codec

/**
* The encoding used to translate an instance of <code>Persisted</code>
Expand Down Expand Up @@ -33,7 +34,7 @@ object DefaultPersistedCodec extends PersistedCodec {
putInt(keyBytes.length).
putBytes(keyBytes).
putInt(persisted.version).
append(persisted.bytes).
append(ByteString(persisted.bytes)).
result.
toArray
}
Expand Down
16 changes: 12 additions & 4 deletions stamina-core/src/main/scala/stamina/stamina.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ package stamina {
extends RuntimeException(s"No persister registered for class: ${obj.getClass}")
with NoStackTrace

case class UnsupportedDataException(persisted: Persisted)
extends RuntimeException(s"No unpersister registered for key: '${persisted.key}' and version: ${persisted.version}")
case class UnsupportedDataException(key: String, version: Int)
extends RuntimeException(s"No unpersister registered for key: '$key' and version: $version")
with NoStackTrace

case class UnrecoverableDataException(persisted: Persisted, error: Throwable)
extends RuntimeException(s"Error while trying to unpersist data with key '${persisted.key}' and version ${persisted.version}. Cause: ${error}")
case class UnrecoverableDataException(manifest: Manifest, error: Throwable)
extends RuntimeException(s"Error while trying to unpersist data with key '${manifest.key}' and version ${manifest.version}. Cause: ${error}")
with NoStackTrace

case class Manifest(manifest: String) {
lazy val key: String = manifest.substring(manifest.indexOf('-') + 1)
lazy val version: Int = Integer.valueOf(manifest.substring(0, manifest.indexOf('-')))
}
object Manifest {
def apply(key: String, version: Int): Manifest = Manifest(version + "-" + key)
}
}
18 changes: 9 additions & 9 deletions stamina-core/src/test/scala/stamina/PersistersSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ class PersistersSpec extends StaminaSpec {
}

"correctly implement canUnpersist()" in {
canUnpersist(itemPersister.persist(item1)) should be(true)
canUnpersist(cartPersister.persist(cart)) should be(true)
canUnpersist(itemPersister.currentManifest) should be(true)
canUnpersist(cartPersister.currentManifest) should be(true)

canUnpersist(cartCreatedPersister.persist(cartCreated)) should be(false)
canUnpersist(Persisted("unknown", 1, ByteString("..."))) should be(false)
canUnpersist(Persisted("item", 2, ByteString("..."))) should be(false)
canUnpersist(cartCreatedPersister.currentManifest) should be(false)
canUnpersist(Manifest("unknown", 1)) should be(false)
canUnpersist(Manifest("item", 2)) should be(false)

// works because canUnpersist only looks at the key and the version, not at the raw data
canUnpersist(Persisted("item", 1, ByteString("Not an item at all!"))) should be(true)
canUnpersist(Manifest("item", 1)) should be(true)
}

"correctly implement persist() and unpersist()" in {
Expand All @@ -44,17 +44,17 @@ class PersistersSpec extends StaminaSpec {

"throw an UnsupportedDataException when unpersisting data with an unknown key" in {
an[UnsupportedDataException] should
be thrownBy unpersist(Persisted("unknown", 1, ByteString("...")))
be thrownBy unpersist(Array[Byte](), Manifest("unknown", 1))
}

"throw an UnsupportedDataException when deserializing data with an unsupported version" in {
an[UnsupportedDataException] should
be thrownBy unpersist(Persisted("item", 2, ByteString("...")))
be thrownBy unpersist(Array[Byte](), Manifest("item", 2))
}

"throw an UnrecoverableDataException when an exception occurs while deserializing" in {
an[UnrecoverableDataException] should
be thrownBy unpersist(Persisted("item", 1, ByteString("not an item")))
be thrownBy unpersist(ByteString("not an item").toArray, itemPersister.currentManifest)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package stamina
class StaminaAkkaSerializerSpec extends StaminaSpec {
import TestDomain._
import TestOnlyPersister._
import DefaultPersistedCodec._

val itemPersister = persister[Item]("item")
val cartPersister = persister[Cart]("cart")
val cartCreatedPersister = persister[CartCreated]("cart-created")

class MyAkkaSerializer1a extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister))
class MyAkkaSerializer1b extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister), DefaultPersistedCodec)
class MyAkkaSerializer1b extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister))
class MyAkkaSerializer2 extends StaminaAkkaSerializer(itemPersister, cartPersister, cartCreatedPersister)

val serializer = new MyAkkaSerializer1a
Expand All @@ -19,29 +18,29 @@ class StaminaAkkaSerializerSpec extends StaminaSpec {

"The StaminaAkkaSerializer" should {
"correctly serialize and deserialize the current version of the domain" in {
fromBinary(toBinary(item1)) should equal(item1)
fromBinary(toBinary(item2)) should equal(item2)
fromBinary(toBinary(cart)) should equal(cart)
fromBinary(toBinary(cartCreated)) should equal(cartCreated)
fromBinary(toBinary(item1), manifest(item1)) should equal(item1)
fromBinary(toBinary(item2), manifest(item2)) should equal(item2)
fromBinary(toBinary(cart), manifest(cart)) should equal(cart)
fromBinary(toBinary(cartCreated), manifest(cartCreated)) should equal(cartCreated)
}

"throw an UnregisteredTypeException when serializing an unregistered type" in {
a[UnregisteredTypeException] should be thrownBy toBinary("a raw String is not supported")
a[UnregisteredTypeException] should be thrownBy toBinary(ByteString("a raw String is not supported").toArray)
}

"throw an UnsupportedDataException when deserializing data with an unknown key" in {
an[UnsupportedDataException] should
be thrownBy fromBinary(writePersisted(Persisted("unknown", 1, ByteString("..."))))
be thrownBy fromBinary(Array[Byte](), Manifest("unknown", 1).manifest)
}

"throw an UnsupportedDataException when deserializing data with an unsupported version" in {
an[UnsupportedDataException] should
be thrownBy fromBinary(writePersisted(Persisted("item", 2, ByteString("..."))))
be thrownBy fromBinary(Array[Byte](), Manifest("item", 2).manifest)
}

"throw an UnrecoverableDataException when an exception occurs while deserializing" in {
an[UnrecoverableDataException] should
be thrownBy fromBinary(writePersisted(Persisted("item", 1, ByteString("not an item"))))
be thrownBy fromBinary(ByteString("not an item").toArray, itemPersister.currentManifest.manifest)
}
}
}
6 changes: 3 additions & 3 deletions stamina-core/src/test/scala/stamina/TestOnlyPersister.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ object TestOnlyPersister {
def persister[T <: AnyRef: ClassTag](key: String): Persister[T, V1] = new JavaPersister[T](key)

private class JavaPersister[T <: AnyRef: ClassTag](key: String) extends Persister[T, V1](key) {
def persist(t: T): Persisted = Persisted(key, currentVersion, toBinary(t))
def unpersist(p: Persisted): T = {
if (canUnpersist(p)) fromBinary(p.bytes.toArray).asInstanceOf[T]
def persist(t: T): Array[Byte] = toBinary(t)
def unpersist(manifest: Manifest, p: Array[Byte]): T = {
if (canUnpersist(manifest)) fromBinary(p).asInstanceOf[T]
else throw new IllegalArgumentException("")
}
}
Expand Down
30 changes: 16 additions & 14 deletions stamina-json/src/main/scala/stamina/json/json.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,37 @@ package object json {
*/
def persister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]): JsonPersister[T, V] = new VnJsonPersister[T, V](key, migrator)

private[json] def toJsonBytes[T](t: T)(implicit writer: RootJsonWriter[T]): ByteString = ByteString(writer.write(t).compactPrint)
private[json] def fromJsonBytes[T](bytes: ByteString)(implicit reader: RootJsonReader[T]): T = reader.read(parseJson(bytes))
private[json] def parseJson(bytes: ByteString): JsValue = JsonParser(ParserInput(bytes.toArray))
import java.nio.charset.StandardCharsets
val UTF_8: String = StandardCharsets.UTF_8.name()
private[json] def toJsonBytes[T](t: T)(implicit writer: RootJsonWriter[T]): Array[Byte] = writer.write(t).compactPrint.getBytes(UTF_8)
private[json] def fromJsonBytes[T](bytes: Array[Byte])(implicit reader: RootJsonReader[T]): T = reader.read(parseJson(bytes))
private[json] def parseJson(bytes: Array[Byte]): JsValue = JsonParser(ParserInput(bytes.toArray))
}

package json {
/**
* Simple abstract marker superclass to unify (and hide) the two internal Persister implementations.
*/
sealed abstract class JsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](key: String) extends Persister[T, V](key) {
private[json] def cannotUnpersist(p: Persisted) =
s"""JsonPersister[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}, V${currentVersion}](key = "${key}") cannot unpersist data with key "${p.key}" and version ${p.version}."""
private[json] def cannotUnpersist(manifest: Manifest) =
s"""JsonPersister[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}, V${currentVersion}](key = "${key}") cannot unpersist data with manifest "$manifest"."""
}

private[json] class V1JsonPersister[T: RootJsonFormat: ClassTag](key: String) extends JsonPersister[T, V1](key) {
def persist(t: T): Persisted = Persisted(key, currentVersion, toJsonBytes(t))
def unpersist(p: Persisted): T = {
if (canUnpersist(p)) fromJsonBytes[T](p.bytes)
else throw new IllegalArgumentException(cannotUnpersist(p))
def persist(t: T): Array[Byte] = toJsonBytes(t)
def unpersist(manifest: Manifest, p: Array[Byte]): T = {
if (canUnpersist(manifest)) fromJsonBytes[T](p)
else throw new IllegalArgumentException(cannotUnpersist(manifest))
}
}

private[json] class VnJsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]) extends JsonPersister[T, V](key) {
override def canUnpersist(p: Persisted): Boolean = p.key == key && migrator.canMigrate(p.version)
override def canUnpersist(m: Manifest): Boolean = m.key == key && migrator.canMigrate(m.version)

def persist(t: T): Persisted = Persisted(key, currentVersion, toJsonBytes(t))
def unpersist(p: Persisted): T = {
if (canUnpersist(p)) migrator.migrate(parseJson(p.bytes), p.version).convertTo[T]
else throw new IllegalArgumentException(cannotUnpersist(p))
def persist(t: T): Array[Byte] = toJsonBytes(t)
def unpersist(manifest: Manifest, p: Array[Byte]): T = {
if (canUnpersist(manifest)) migrator.migrate(parseJson(p), manifest.version).convertTo[T]
else throw new IllegalArgumentException(cannotUnpersist(manifest))
}
}
}
Loading