Skip to content

Commit c489f67

Browse files
nmummaunealmummau
andauthored
mssql improvements, code clean up and fix issue 428 (#431)
* feat(mssql): Minor SQL changes; no functional changes - SET NOCOUNT ON - Wrap keywords in brackets ([Version], [Messages]) - formatting: each column on it's own line. Best practice for maintainability of procedures - proper casing of keywords (INT, AND) - use alias to DELETE statement - semicolons * feat(mssql): Minor SQL changes; no functional changes - SET NOCOUNT ON - Wrap keywords in brackets ([Messages]) - formatting: each column on it's own line. Best practice for maintainability of procedures - proper casing of keywords (INT, BIGINT) - remove 'Messages.' from query since it was redundant - semicolons * feat(mssql): Minor SQL changes; no functional changes - SET NOCOUNT ON - Wrap keywords in brackets ([Version], [Messages]) - formatting: each column on it's own line. Best practice for maintainability of procedures - proper casing of keywords (INT) - remove 'Messages.' from query since it was redundant - semicolons * feat(mssql): Minor SQL changes; no functional changes - SET NOCOUNT ON - Wrap keywords in brackets ([Version], [Messages]) - formatting: each column on it's own line. Best practice for maintainability of procedures - proper casing of keywords (INT) - remove 'Messages.' from query since it was redundant - semicolons * feat(mssql): Minor SQL changes; no functional changes - SET NOCOUNT ON - formatting: each column on it's own line. Best practice for maintainability of procedures - use aliases - semicolons * feat(mssql): Minor SQL changes; no functional changes - SET NOCOUNT ON - formatting: each column on it's own line. Best practice for maintainability of procedures - proper casing of keywords (AND) - semicolons * feat(mssql): Minor SQL changes; no functional changes - SET NOCOUNT ON - remove unused declared variables - specify all parameters when calling [check_stream] procedure - formatting: each column on it's own line. Best practice for maintainability of procedures - proper casing of keywords (IF, ISNULL, NVARCHAR) - use alias to UPDATE statement - remove 'AS' keyword - semicolons * feat(mssql): Minor SQL updates - consistent casing of [Version] column - explicitly specify NULLable columns * fix(mssql): Messages.Created NOT NULL SQL Server schema change so that Messages.Created is NOT NULL. This is safe and accurate to do since append_events will always set this value if a NULL is provided to the procedure. * feat(mssql): json is not null - Messages TABLE - JsonMetaData is NOT NULL - json columns are checked to ensure they contain json data - StreamMessage TYPE - json_metadata is NOT NULL * feat(mssql): DATETIME2 is actually a DATETIME2(7) so explicitly say that in the scripts * feat(mssql): ORDER BY StreamPosition per bito-code-review, and and remove redundencies in the CHECK definition * feat(postgresql): ORDER BY stream_position per bito-code-review * refactor(mssql): remove square brackets to satisfy 'inconsistent table naming' warning that bito-code-review flagged * feat(mssql): append_events improve At first when writing this change I assumed I would be out to fix issue #429. Turns out that is not an issue as the client wraps this procedure call in a transaction. Thus, no transction is needed within the procedure itself. Here are some improvements, nonetheless. - Changed @stream_name parameter to NVARCHAR(850) for consistency - Introduced @Inserted table to capture inserted GlobalPosition values atomically - Normalized optimistic concurrency errors (2627, 2601) to client-detectable messages - Thrown error messages still StartsWith 'WrongExpectedVersion' but now also includes the full SQL error text for client tracing - Removed redundant post-commit queries and error-parsing logic * perf(mssql): enable OPTIMIZE_FOR_SEQUENTIAL_KEY on primary keys for Streams and Messages - Added WITH (OPTIMIZE_FOR_SEQUENTIAL_KEY = ON) to clustered primary keys on __schema__.Streams (PK_Streams) and __schema__.Messages (PK_Events) - Improves insert throughput and reduces latch contention on identity-based keys - No changes to schema shape, constraints, or indexes beyond PK optimization This improves concurrent insert performance under high write workloads by allowing SQL Server to better handle last-page insert contention on sequential identity keys. See docs on OPTIMIZE_FOR_SEQUENTIAL_KEY: https://learn.microsoft.com/en-us/sql/t-sql/statements/alter-table-index-option-transact-sql?view=sql-server-ver17#optimize_for_sequential_key---on--off- * chore(mssql): enable XACT_ABORT for all stored procedures used by Eventuous Added SET XACT_ABORT ON to all SQL stored procedures invoked through GetStoredProcCommand() to ensure atomic rollback behavior when the client transaction encounters an error. This prevents partial commits and inconsistent transactional states during event append and stream operations. Related to issue #429 * fix(mssql): truncate_stream wasn't properly formatting and and sending the error message * refactor(mssql): use SCOPE_IDENTITY() to get new StreamId rather than querying the table for the record we just inserted * fix(mssql): correct backwards read bounds check Replaced invalid guard `@current_version < @from_position + @count` with proper range validation `@from_position < 0 OR @from_position > @current_version`. Previous logic incorrectly returned no results when valid messages existed. New check correctly validates start position independently of page size. * fix(mssql): moved stream version update inside TRY block and added `@@ROWCOUNT` check to detect concurrent updates. If no rows are affected, the procedure now throws a `WrongExpectedVersion` error, aligning behavior with duplicate append handling. - Prevents silent version overwrite on race conditions - Ensures consistency with optimistic concurrency semantics - Keeps existing WrongExpectedVersion error format for client handling --------- Co-authored-by: Neal Mummau <[email protected]>
1 parent f4444af commit c489f67

File tree

9 files changed

+282
-133
lines changed

9 files changed

+282
-133
lines changed

src/Postgres/src/Eventuous.Postgresql/Scripts/6_ReadStreamForwards.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ begin
3333
m.json_data, m.json_metadata, m.created
3434
from __schema__.messages m
3535
where m.stream_id = _stream_id and m.stream_position >= _from_position
36-
order by m.global_position
36+
order by m.stream_position
3737
limit _count;
3838
end;
3939

src/SqlServer/src/Eventuous.SqlServer/Scripts/1_Schema.sql

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ IF OBJECT_ID('__schema__.Streams', 'U') IS NULL
1010
StreamId INT IDENTITY (1,1) NOT NULL,
1111
StreamName NVARCHAR(850) NOT NULL,
1212
[Version] INT DEFAULT (-1) NOT NULL,
13-
CONSTRAINT PK_Streams PRIMARY KEY CLUSTERED (StreamId),
13+
CONSTRAINT PK_Streams PRIMARY KEY CLUSTERED (StreamId) WITH (OPTIMIZE_FOR_SEQUENTIAL_KEY = ON),
1414
CONSTRAINT UQ_StreamName UNIQUE NONCLUSTERED (StreamName),
15-
CONSTRAINT CK_VersionGteNegativeOne CHECK ([version] >= -1)
15+
CONSTRAINT CK_VersionGteNegativeOne CHECK ([Version] >= -1)
1616
);
1717
END
1818

@@ -25,14 +25,16 @@ IF OBJECT_ID('__schema__.Messages', 'U') IS NULL
2525
StreamId INT NOT NULL,
2626
StreamPosition INT NOT NULL,
2727
GlobalPosition BIGINT IDENTITY (0,1) NOT NULL,
28-
JsonData NVARCHAR(max) NOT NULL,
29-
JsonMetadata NVARCHAR(max),
30-
Created DATETIME2,
31-
CONSTRAINT PK_Events PRIMARY KEY CLUSTERED (GlobalPosition),
28+
JsonData NVARCHAR(MAX) NOT NULL,
29+
JsonMetadata NVARCHAR(MAX) NOT NULL,
30+
Created DATETIME2(7) NOT NULL,
31+
CONSTRAINT PK_Events PRIMARY KEY CLUSTERED (GlobalPosition) WITH (OPTIMIZE_FOR_SEQUENTIAL_KEY = ON),
3232
CONSTRAINT FK_MessageStreamId FOREIGN KEY (StreamId) REFERENCES __schema__.Streams (StreamId),
3333
CONSTRAINT UQ_StreamIdAndStreamPosition UNIQUE NONCLUSTERED (StreamId, StreamPosition),
3434
CONSTRAINT UQ_StreamIdAndMessageId UNIQUE NONCLUSTERED (StreamId, MessageId),
35-
CONSTRAINT CK_StreamPositionGteZero CHECK (Messages.StreamPosition >= 0),
35+
CONSTRAINT CK_StreamPositionGteZero CHECK (StreamPosition >= 0),
36+
CONSTRAINT CK_JsonDataIsJson CHECK (ISJSON(JsonData) = 1),
37+
CONSTRAINT CK_JsonMetadataIsJson CHECK (ISJSON(JsonMetadata) = 1),
3638
INDEX IDX_EventsStream (StreamId)
3739
);
3840
END
@@ -42,7 +44,7 @@ IF OBJECT_ID('__schema__.Checkpoints', 'U') IS NULL
4244
CREATE TABLE __schema__.Checkpoints
4345
(
4446
Id NVARCHAR(128) NOT NULL,
45-
Position BIGINT NULL,
47+
Position BIGINT NULL,
4648
CONSTRAINT PK_Checkpoints PRIMARY KEY CLUSTERED (Id),
4749
);
4850
END
@@ -53,8 +55,7 @@ IF TYPE_ID('__schema__.StreamMessage') IS NULL
5355
(
5456
message_id UNIQUEIDENTIFIER NOT NULL,
5557
message_type NVARCHAR(128) NOT NULL,
56-
json_data NVARCHAR(max) NOT NULL,
57-
json_metadata NVARCHAR(max)
58+
json_data NVARCHAR(MAX) NOT NULL,
59+
json_metadata NVARCHAR(MAX) NOT NULL
5860
)
5961
END
60-
Lines changed: 95 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,111 @@
11
CREATE OR ALTER PROCEDURE __schema__.append_events
2-
@stream_name VARCHAR(850),
2+
@stream_name NVARCHAR(850),
33
@expected_version INT,
4-
@created DATETIME2 NULL,
4+
@created DATETIME2(7) NULL,
55
@messages __schema__.StreamMessage READONLY
66
AS
77
BEGIN
8-
DECLARE @current_version INT,
8+
SET NOCOUNT ON;
9+
SET XACT_ABORT ON;
10+
11+
-- Note: This procedure is wrapped in a transaction by the caller. This explains why there is no explicit transaction here within the procedure.
12+
13+
DECLARE
14+
@current_version INT,
915
@stream_id INT,
1016
@position BIGINT,
11-
@customErrorMessage NVARCHAR(200),
12-
@newMessagesCount INT,
13-
@expected_StreamVersionAfterUpdate INT,
14-
@actual_StreamVersionAfterUpdate INT
17+
@count_messages INT,
18+
@new_version INT;
1519

16-
if @created is null
17-
BEGIN
18-
SET @created = SYSUTCDATETIME()
19-
END
20+
-- capture inserted rows to compute final position
21+
DECLARE @inserted TABLE (
22+
GlobalPosition BIGINT
23+
);
2024

21-
EXEC [__schema__].[check_stream] @stream_name, @expected_version, @current_version = @current_version OUTPUT, @stream_id = @stream_id OUTPUT
25+
SELECT @count_messages = COUNT(1) FROM @messages;
26+
27+
EXEC __schema__.check_stream
28+
@stream_name = @stream_name,
29+
@expected_version = @expected_version,
30+
@current_version = @current_version OUTPUT,
31+
@stream_id = @stream_id OUTPUT;
32+
33+
SET @new_version = @current_version + @count_messages;
2234

2335
BEGIN TRY
24-
INSERT INTO __schema__.Messages (MessageId, MessageType, StreamId, StreamPosition, JsonData, JsonMetadata, Created)
25-
SELECT message_id, message_type, @stream_id, @current_version + (ROW_NUMBER() OVER(ORDER BY (SELECT NULL))), json_data, json_metadata, @created
26-
FROM @messages
36+
37+
/*
38+
If another writer raced us, the unique constraint (StreamId,StreamPosition) will throw here.
39+
Translate to WrongExpectedVersion in the CATCH below.
40+
*/
41+
INSERT INTO __schema__.Messages (
42+
MessageId,
43+
MessageType,
44+
StreamId,
45+
StreamPosition,
46+
JsonData,
47+
JsonMetadata,
48+
Created
49+
)
50+
OUTPUT inserted.GlobalPosition
51+
INTO @inserted (GlobalPosition)
52+
SELECT
53+
message_id,
54+
message_type,
55+
@stream_id,
56+
@current_version + CAST(ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS INT),
57+
json_data,
58+
json_metadata,
59+
ISNULL(@created, SYSUTCDATETIME())
60+
FROM @messages;
61+
62+
UPDATE s
63+
SET [Version] = @new_version
64+
FROM __schema__.Streams s
65+
WHERE s.StreamId = @stream_id
66+
AND s.[Version] = @current_version;
67+
68+
IF @@ROWCOUNT = 0
69+
BEGIN
70+
DECLARE @streamUpdateErrorMessage NVARCHAR(4000) = CONCAT(
71+
N'WrongExpectedVersion: concurrent update detected for stream ',
72+
CAST(@stream_id AS NVARCHAR(20))
73+
);
74+
;THROW 50000, @streamUpdateErrorMessage, 1;
75+
END
76+
2777
END TRY
2878
BEGIN CATCH
29-
IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamIdAndStreamPosition', ERROR_MESSAGE())) > 0
79+
DECLARE @errmsg NVARCHAR(2048) = ERROR_MESSAGE();
80+
81+
IF ERROR_NUMBER() IN (
82+
2627, -- Violation of PRIMARY KEY or UNIQUE constraint
83+
2601 -- Cannot insert duplicate key row in object with unique index
84+
)
85+
AND (@errmsg LIKE N'%UQ_StreamIdAndStreamPosition%')
3086
BEGIN
31-
DECLARE @streamIdFromError nvarchar(20) = SUBSTRING(ERROR_MESSAGE(), PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()), PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) - PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()))
32-
DECLARE @streamPositionFromError nvarchar(20) = SUBSTRING(ERROR_MESSAGE(), (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE())) + 2, PATINDEX(N'%).', ERROR_MESSAGE()) - (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) + 2))
87+
-- Must BEGIN with "WrongExpectedVersion" for the client detection of OptimisticConcurrencyException
88+
DECLARE @clientMsg NVARCHAR(4000) =
89+
N'WrongExpectedVersion: duplicate append for stream '
90+
+ CAST(@stream_id AS NVARCHAR(20))
91+
+ N' with expected_version=' + CAST(@expected_version AS NVARCHAR(20))
92+
+ N'. SQL: ' + @errmsg;
3393

34-
-- TODO: There are multiple causes of OptimisticConcurrencyExceptions, but current client code is hard-coded to check for 'WrongExpectedVersion' in message and 50000 as error number.
35-
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion, another message has already been written at stream position %s on stream %s.', @streamIdFromError, @streamPositionFromError);
36-
THROW 50000, @customErrorMessage, 1;
37-
END
94+
THROW 50000, @clientMsg, 1;
95+
END;
3896
ELSE
39-
THROW
40-
END CATCH
41-
42-
SELECT TOP 1 @current_version = StreamPosition, @position = GlobalPosition
43-
FROM __schema__.Messages
44-
WHERE StreamId = @stream_id
45-
ORDER BY GlobalPosition DESC
46-
47-
UPDATE __schema__.Streams SET Version = @current_version WHERE StreamId = @stream_id
48-
49-
SELECT @current_version AS current_version, @position AS position
50-
END
97+
BEGIN
98+
;THROW;
99+
END;
100+
END CATCH;
101+
102+
-- final GlobalPosition value to return
103+
SELECT @position = (
104+
SELECT MAX(GlobalPosition)
105+
FROM @inserted
106+
);
107+
108+
SELECT
109+
@new_version current_version,
110+
@position position;
111+
END;
Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,61 @@
1-
CREATE OR ALTER PROCEDURE __schema__.check_stream @stream_name NVARCHAR(850),
2-
@expected_version int,
3-
@current_version INT OUTPUT,
4-
@stream_id INT OUTPUT
1+
CREATE OR ALTER PROCEDURE __schema__.check_stream
2+
@stream_name NVARCHAR(850),
3+
@expected_version INT,
4+
@current_version INT OUTPUT,
5+
@stream_id INT OUTPUT
56
AS
67
BEGIN
7-
DECLARE @customErrorMessage NVARCHAR(200)
8+
SET NOCOUNT ON;
9+
SET XACT_ABORT ON;
810

9-
SELECT @current_version = [Version], @stream_id = StreamId
11+
DECLARE @customErrorMessage NVARCHAR(200);
12+
13+
SELECT
14+
@current_version = [Version],
15+
@stream_id = StreamId
1016
FROM [__schema__].Streams
11-
WHERE StreamName = @stream_name
17+
WHERE StreamName = @stream_name;
1218

13-
IF @stream_id is null
19+
IF @stream_id IS NULL
1420
BEGIN
1521
IF @expected_version = -2 -- Any
1622
OR @expected_version = -1 -- NoStream
1723
BEGIN
1824
BEGIN TRY
19-
INSERT INTO [__schema__].Streams (StreamName, Version) VALUES (@stream_name, -1);
20-
SELECT @current_version = Version, @stream_id = StreamId
21-
FROM [__schema__].Streams
22-
WHERE StreamName = @stream_name
25+
SET @current_version = -1
26+
INSERT INTO [__schema__].Streams (
27+
StreamName,
28+
[Version]
29+
) VALUES (
30+
@stream_name,
31+
@current_version
32+
);
33+
34+
SET @stream_id = SCOPE_IDENTITY();
2335
END TRY
2436
BEGIN CATCH
2537
IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamName', ERROR_MESSAGE())) > 0
2638
BEGIN
2739
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, stream already exists', @expected_version);
2840
THROW 50000, @customErrorMessage, 1;
29-
END
41+
END;
3042
ELSE
31-
THROW
32-
END CATCH
33-
END
43+
BEGIN
44+
;THROW;
45+
END;
46+
END CATCH;
47+
END;
3448
ELSE
35-
THROW 50001, N'StreamNotFound', 1;
49+
BEGIN
50+
;THROW 50001, N'StreamNotFound', 1;
51+
END;
3652
END
3753
ELSE
38-
IF @expected_version != -2 and @expected_version != @current_version
54+
BEGIN
55+
IF @expected_version != -2 AND @expected_version != @current_version
3956
BEGIN
4057
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, current version %i', @expected_version, @current_version);
4158
THROW 50000, @customErrorMessage, 1;
42-
END
43-
END
59+
END;
60+
END;
61+
END;
Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
CREATE OR ALTER PROCEDURE __schema__.read_all_forwards
2-
@from_position bigint,
3-
@count int
4-
AS
2+
@from_position BIGINT,
3+
@count INT
4+
AS
55
BEGIN
6-
7-
SELECT TOP (@count)
8-
MessageId, MessageType, StreamPosition, GlobalPosition,
9-
JsonData, JsonMetadata, Created, StreamName
10-
FROM __schema__.Messages
11-
INNER JOIN __schema__.Streams ON Messages.StreamId = Streams.StreamId
12-
WHERE Messages.GlobalPosition >= @from_position
13-
ORDER BY Messages.GlobalPosition
6+
SET NOCOUNT ON;
7+
SET XACT_ABORT ON;
148

15-
END
9+
SELECT TOP (@count)
10+
m.MessageId,
11+
m.MessageType,
12+
m.StreamPosition,
13+
m.GlobalPosition,
14+
m.JsonData,
15+
m.JsonMetadata,
16+
m.Created,
17+
s.StreamName
18+
FROM __schema__.Messages m
19+
JOIN __schema__.Streams s ON m.StreamId = s.StreamId
20+
WHERE m.GlobalPosition >= @from_position
21+
ORDER BY m.GlobalPosition;
22+
END;

src/SqlServer/src/Eventuous.SqlServer/Scripts/5_ReadStreamBackwards.sql

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,47 @@ CREATE OR ALTER PROCEDURE __schema__.read_stream_backwards
44
@count INT
55
AS
66
BEGIN
7+
SET NOCOUNT ON;
8+
SET XACT_ABORT ON;
79

8-
DECLARE @current_version int, @stream_id int
10+
DECLARE
11+
@current_version INT,
12+
@stream_id INT;
913

10-
SELECT @current_version = Version, @stream_id = StreamId
11-
FROM __schema__.Streams
12-
WHERE StreamName = @stream_name
14+
SELECT
15+
@current_version = [Version],
16+
@stream_id = StreamId
17+
FROM __schema__.Streams
18+
WHERE StreamName = @stream_name;
1319

14-
IF @stream_id IS NULL
15-
THROW 50001, 'StreamNotFound', 1;
20+
IF @stream_id IS NULL
21+
BEGIN
22+
;THROW 50001, 'StreamNotFound', 1;
23+
END;
1624

17-
IF @current_version < @from_position + @count
18-
RETURN
25+
-- nothing to read / invalid request
26+
IF @count <= 0
27+
BEGIN
28+
RETURN;
29+
END;
1930

20-
SELECT TOP (@count)
21-
MessageId, MessageType, StreamPosition, GlobalPosition,
22-
JsonData, JsonMetadata, Created
23-
FROM __schema__.Messages
24-
WHERE StreamId = @stream_id AND StreamPosition <= @from_position
25-
ORDER BY Messages.StreamPosition DESC
31+
-- Validate the starting position for backwards read.
32+
IF @from_position < 0 -- A negative starting position is invalid
33+
OR @from_position > @current_version -- A starting position greater than the current version means we're trying to read from beyond the head of the stream
34+
BEGIN
35+
RETURN;
36+
END;
2637

27-
END
38+
SELECT TOP (@count)
39+
MessageId,
40+
MessageType,
41+
StreamPosition,
42+
GlobalPosition,
43+
JsonData,
44+
JsonMetadata,
45+
Created
46+
FROM __schema__.Messages
47+
WHERE StreamId = @stream_id
48+
AND StreamPosition <= @from_position
49+
ORDER BY StreamPosition DESC;
50+
END;

0 commit comments

Comments
 (0)