Skip to content

Commit dc7d583

Browse files
committed
feat(sql): append_events improve
At first when writing this change I assumed I would be out to fix issue #429. Turns out that is not an issue as the client wraps this procedure call in a transaction. Thus, no transction is needed within the procedure itself. Here are some improvements, nonetheless. - Changed @stream_name parameter to NVARCHAR(850) for consistency - Introduced @Inserted table to capture inserted GlobalPosition values atomically - Normalized optimistic concurrency errors (2627, 2601) to client-detectable messages - Thrown error messages still StartsWith 'WrongExpectedVersion' but now also includes the full SQL error text for client tracing - Removed redundant post-commit queries and error-parsing logic
1 parent 7b38d67 commit dc7d583

File tree

1 file changed

+45
-25
lines changed

1 file changed

+45
-25
lines changed
Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,41 @@
11
CREATE OR ALTER PROCEDURE __schema__.append_events
2-
@stream_name VARCHAR(850),
2+
@stream_name NVARCHAR(850),
33
@expected_version INT,
44
@created DATETIME2(7) NULL,
55
@messages __schema__.StreamMessage READONLY
66
AS
77
BEGIN
88
SET NOCOUNT ON;
99

10+
-- Note: This procedure is wrapped in a transaction by the caller. This explains why there is no explicit transaction here within the procedure.
11+
1012
DECLARE
1113
@current_version INT,
1214
@stream_id INT,
1315
@position BIGINT,
14-
@customErrorMessage NVARCHAR(200);
16+
@count_messages INT,
17+
@new_version INT;
1518

16-
IF @created IS NULL
17-
BEGIN
18-
SET @created = SYSUTCDATETIME();
19-
END
19+
-- capture inserted rows to compute final position
20+
DECLARE @inserted TABLE (
21+
GlobalPosition BIGINT
22+
);
2023

21-
EXEC [__schema__].check_stream
24+
SELECT @count_messages = COUNT(1) FROM @messages;
25+
26+
EXEC __schema__.check_stream
2227
@stream_name = @stream_name,
2328
@expected_version = @expected_version,
2429
@current_version = @current_version OUTPUT,
2530
@stream_id = @stream_id OUTPUT;
2631

2732
BEGIN TRY
33+
SET @new_version = @current_version + @count_messages;
34+
35+
/*
36+
If another writer raced us, the unique constraint (StreamId,StreamPosition) will throw here.
37+
Translate to WrongExpectedVersion in the CATCH below.
38+
*/
2839
INSERT INTO __schema__.Messages (
2940
MessageId,
3041
MessageType,
@@ -34,45 +45,54 @@ BEGIN
3445
JsonMetadata,
3546
Created
3647
)
48+
OUTPUT inserted.GlobalPosition
49+
INTO @inserted (GlobalPosition)
3750
SELECT
3851
message_id,
3952
message_type,
4053
@stream_id,
41-
@current_version + (ROW_NUMBER() OVER(ORDER BY (SELECT NULL))),
54+
@current_version + CAST(ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS INT),
4255
json_data,
4356
json_metadata,
44-
@created
45-
FROM @messages
57+
ISNULL(@created, SYSUTCDATETIME())
58+
FROM @messages;
4659
END TRY
4760
BEGIN CATCH
48-
IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamIdAndStreamPosition', ERROR_MESSAGE())) > 0
61+
DECLARE @errmsg NVARCHAR(2048) = ERROR_MESSAGE();
62+
63+
IF ERROR_NUMBER() IN (
64+
2627, -- Violation of PRIMARY KEY or UNIQUE constraint
65+
2601 -- Cannot insert duplicate key row in object with unique index
66+
)
67+
AND (@errmsg LIKE N'%UQ_StreamIdAndStreamPosition%')
4968
BEGIN
50-
DECLARE @streamIdFromError NVARCHAR(20) = SUBSTRING(ERROR_MESSAGE(), PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()), PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) - PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()))
51-
DECLARE @streamPositionFromError NVARCHAR(20) = SUBSTRING(ERROR_MESSAGE(), (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE())) + 2, PATINDEX(N'%).', ERROR_MESSAGE()) - (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) + 2))
69+
-- Must BEGIN with "WrongExpectedVersion" for the client detection of OptimisticConcurrencyException
70+
DECLARE @clientMsg NVARCHAR(4000) =
71+
N'WrongExpectedVersion: duplicate append for stream '
72+
+ CAST(@stream_id AS NVARCHAR(20))
73+
+ N' with expected_version=' + CAST(@expected_version AS NVARCHAR(20))
74+
+ N'. SQL: ' + @errmsg;
5275

53-
-- TODO: There are multiple causes of OptimisticConcurrencyExceptions, but current client code is hard-coded to check for 'WrongExpectedVersion' in message and 50000 as error number.
54-
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion, another message has already been written at stream position %s on stream %s.', @streamIdFromError, @streamPositionFromError);
55-
THROW 50000, @customErrorMessage, 1;
76+
THROW 50000, @clientMsg, 1;
5677
END;
5778
ELSE
5879
BEGIN
5980
;THROW;
6081
END;
6182
END CATCH;
6283

63-
SELECT TOP (1)
64-
@current_version = StreamPosition,
65-
@position = GlobalPosition
66-
FROM __schema__.Messages
67-
WHERE StreamId = @stream_id
68-
ORDER BY GlobalPosition DESC;
69-
7084
UPDATE s
71-
SET [Version] = @current_version
85+
SET [Version] = @new_version
7286
FROM __schema__.Streams s
7387
WHERE s.StreamId = @stream_id;
7488

89+
-- final GlobalPosition value to return
90+
SELECT @position = (
91+
SELECT MAX(GlobalPosition)
92+
FROM @inserted
93+
);
94+
7595
SELECT
76-
@current_version current_version,
96+
@new_version current_version,
7797
@position position;
7898
END;

0 commit comments

Comments
 (0)