Skip to content

Async multi packet fixes for 6.1.0 #3534

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13112,6 +13112,8 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj
char[] temp = null;
bool buffIsRented = false;
int startOffset = 0;

stateObj.RequestContinue(true);
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();

if (canContinue)
Expand Down Expand Up @@ -13206,7 +13208,7 @@ bool writeDataSizeToSnapshot
if (stateObj._longlen == 0)
{
Debug.Assert(stateObj._longlenleft == 0);
totalCharsRead = 0;
totalCharsRead = startOffsetByteCount / 2;
return TdsOperationStatus.Done; // No data
}

Expand All @@ -13226,14 +13228,14 @@ bool writeDataSizeToSnapshot
// later needing to repeatedly allocate new target buffers and copy data as we discover new data
if (buff == null && stateObj._longlen != TdsEnums.SQL_PLP_UNKNOWNLEN && stateObj._longlen < (int.MaxValue >> 1))
{
if (supportRentedBuff && stateObj._longlen < 1073741824) // 1 Gib
if (supportRentedBuff && stateObj._longlen >> 1 < 1073741824) // 1 Gib
{
buff = ArrayPool<char>.Shared.Rent((int)Math.Min((int)stateObj._longlen, len));
buff = ArrayPool<char>.Shared.Rent((int)Math.Min((int)stateObj._longlen >> 1, len));
rentedBuff = true;
}
else
{
buff = new char[(int)Math.Min((int)stateObj._longlen, len)];
buff = new char[(int)Math.Min((int)stateObj._longlen >> 1, len)];
rentedBuff = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13396,7 +13396,7 @@ bool writeDataSizeToSnapshot
if (stateObj._longlen == 0)
{
Debug.Assert(stateObj._longlenleft == 0);
totalCharsRead = 0;
totalCharsRead = startOffsetByteCount / 2;
return TdsOperationStatus.Done; // No data
}

Expand All @@ -13416,14 +13416,14 @@ bool writeDataSizeToSnapshot
// allocate the whole buffer in one shot instead of realloc'ing and copying over each time
if (buff == null && stateObj._longlen != TdsEnums.SQL_PLP_UNKNOWNLEN && stateObj._longlen < (int.MaxValue >> 1))
{
if (supportRentedBuff && stateObj._longlen < 1073741824) // 1 Gib
if (supportRentedBuff && stateObj._longlen >> 1 < 1073741824) // 1 Gib
{
buff = ArrayPool<char>.Shared.Rent((int)Math.Min((int)stateObj._longlen, len));
buff = ArrayPool<char>.Shared.Rent((int)Math.Min((int)stateObj._longlen >> 1, len));
rentedBuff = true;
}
else
{
buff = new char[(int)Math.Min((int)stateObj._longlen, len)];
buff = new char[(int)Math.Min((int)stateObj._longlen >> 1, len)];
rentedBuff = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ public static bool UseCompatibilityProcessSni
{
if (s_useCompatibilityProcessSni == Tristate.NotInitialized)
{
if (AppContext.TryGetSwitch(UseCompatibilityProcessSniString, out bool returnedValue) && returnedValue)
if (AppContext.TryGetSwitch(UseCompatibilityProcessSniString, out bool returnedValue) && !returnedValue)
{
s_useCompatibilityProcessSni = Tristate.True;
s_useCompatibilityProcessSni = Tristate.False;
}
else
{
s_useCompatibilityProcessSni = Tristate.False;
s_useCompatibilityProcessSni = Tristate.True;
Comment on lines +81 to +87
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ErikEJ in this PR the default is changed to that UseCompatibilityProcessSni is true by default. That means that new async behaviour is false by default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, so in a repro context, we should use:

AppContext.SetSwitch("Switch.Microsoft.Data.SqlClient.UseCompatibilityAsyncBehaviour", false);
AppContext.SetSwitch("Switch.Microsoft.Data.SqlClient.UseCompatibilityProcessSni", false);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For public consumption the settings you give are the defaults.

If you can find an issue I want to know about it so I can fix it, regardless of the settings but you will need to tell me what the settings are so I can try to repro.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. No issues found with latest build.

}
}
return s_useCompatibilityProcessSni == Tristate.True;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser
{
buffer = null;

(bool canContinue, bool isStarting, _) = stateObj.GetSnapshotStatuses();
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();

List<byte[]> cachedBytes = null;
if (canContinue)
if (canContinue && isContinuing)
{
cachedBytes = stateObj.TryTakeSnapshotStorage() as List<byte[]>;
if (isStarting)
Expand Down Expand Up @@ -81,7 +81,7 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser
result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, canContinue, writeDataSizeToSnapshot: false, compatibilityMode: false);
if (result != TdsOperationStatus.Done)
{
if (result == TdsOperationStatus.NeedMoreData && canContinue && cb == byteArr.Length)
if (result == TdsOperationStatus.NeedMoreData && canContinue && cb == byteArr.Length && (isContinuing || !isStarting))
{
// succeeded in getting the data but failed to find the next plp length
returnAfterAdd = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4596,9 +4596,7 @@ private TdsOperationStatus TryResetBlobState()
else
{
Debug.Assert(
(_sharedState._columnDataBytesRemaining == 0 || _sharedState._columnDataBytesRemaining == -1)
&&
(_stateObj._longlen == 0 || _stateObj.IsSnapshotContinuing()),
(_sharedState._columnDataBytesRemaining == 0 || _sharedState._columnDataBytesRemaining == -1),
"Haven't read header yet, but column is partially read?"
);
}
Expand Down Expand Up @@ -5753,6 +5751,10 @@ private static Task<T> GetFieldValueAsyncExecute<T>(Task task, object state)
{
return Task.FromResult<T>(reader.GetFieldValueFromSqlBufferInternal<T>(reader._data[columnIndex], reader._metaData[columnIndex], isAsync: true));
}
else
{
return reader.ExecuteAsyncCall(context);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public void ProcessSniPacket(PacketHandle packet, uint error)
if (PartialPacketContainsCompletePacket())
{
Packet partialPacket = _partialPacket;
// the partial packet can contain more than a single packet worth of data so to consume the
// partial packet we must use the CurrentLength not just the RequiredLength and then later
// the multiplexer will split out the complete packet for consumption and maintain the
// additional data
SetBuffer(partialPacket.Buffer, 0, partialPacket.CurrentLength);
ClearPartialPacket();
getDataError = TdsEnums.SNI_SUCCESS;
Expand All @@ -50,7 +54,7 @@ public void ProcessSniPacket(PacketHandle packet, uint error)
{
if (_inBytesRead != 0)
{
SetBuffer(new byte[_inBuff.Length], 0, 0);
NewBuffer(_inBuff.Length);
}
getDataError = GetSniPacket(packet, ref dataSize);
}
Expand All @@ -76,7 +80,7 @@ public void ProcessSniPacket(PacketHandle packet, uint error)
{
if (recurse && appended)
{
SetBuffer(new byte[_inBuff.Length], 0, 0);
NewBuffer(_inBuff.Length);
appended = false;
}
MultiplexPackets(
Expand All @@ -95,16 +99,19 @@ out recurse
// if a partial packet was reconstructed it must be handled first
if (consumePartialPacket)
{
// the partial packet has been processed by the multiplexer and should now have
// only data from a single packet in it so we should use RequiredLength which
// is defined by the packet header here not CurrentLength
Debug.Assert(PartialPacket != null && PartialPacket.RequiredLength == PartialPacket.CurrentLength);
if (_snapshot != null)
{
_snapshot.AppendPacketData(PartialPacket.Buffer, PartialPacket.CurrentLength);
SetBuffer(new byte[_inBuff.Length], 0, 0);
_snapshot.AppendPacketData(PartialPacket.Buffer, PartialPacket.RequiredLength);
NewBuffer(_inBuff.Length);
appended = true;
}
else
{
SetBuffer(PartialPacket.Buffer, 0, PartialPacket.CurrentLength);

SetBuffer(PartialPacket.Buffer, 0, PartialPacket.RequiredLength);
}
bufferIsPartialCompleted = true;
ClearPartialPacket();
Expand All @@ -125,7 +132,7 @@ out recurse
// if we SetBuffer here to clear the packet buffer we will break the attention handling which relies
// on the attention containing packet remaining in the active buffer even if we're appending to the
// snapshot so we will have to use the appended variable to prevent the same buffer being added again
//// SetBuffer(new byte[_inBuff.Length], 0, 0);
//// NewBuffer(_inBuff.Length);
appended = true;
}
else
Expand All @@ -141,19 +148,28 @@ out recurse
// we don't process it
if (!bufferIsPartialCompleted)
{
SetBuffer(_inBuff, 0, 0);
NewBuffer(_inBuff.Length);
}
}

// if there is a remainder it must be last
if (remainderPacketProduced)
{
SetPartialPacket(remainderPacket);
if (appended && recurse)
{
// if we've appended to the snapshot already we can't recurse and append to it again because the
// snapshot might be cleared by the async cleanup functions
// the only way to get a recurse output from the multiplexer is if it has produced a remainder packet so
// assert that this is the case and the put the remainder packet in the partial packet so that it
// can be picked up in another call.
recurse = false;
}
if (!bufferIsPartialCompleted)
{
// we are keeping the partial packet buffer so replace it with a new one
// unless we have already set the buffer to the partial packet buffer
SetBuffer(new byte[_inBuff.Length], 0, 0);
NewBuffer(_inBuff.Length);
}
}

Expand Down Expand Up @@ -301,7 +317,7 @@ out bool recurse
remainderPacket = new Packet
{
Buffer = new byte[dataBuffer.Length],
CurrentLength = remainderLength,
CurrentLength = remainderLength
};
remainderPacket.SetCreatedBy(1);

Expand Down
Loading