@@ -18,7 +18,9 @@ package events
1818
1919import (
2020 "context"
21+ "database/sql/driver"
2122
23+ "github.com/hyperledger/firefly-common/pkg/ffapi"
2224 "github.com/hyperledger/firefly-common/pkg/fftypes"
2325 "github.com/hyperledger/firefly-common/pkg/log"
2426 "github.com/hyperledger/firefly/pkg/core"
@@ -298,22 +300,56 @@ func (em *eventManager) persistBatchContent(ctx context.Context, batch *core.Bat
298300 })
299301 if err != nil {
300302 log .L (ctx ).Debugf ("Batch message insert optimization failed for batch '%s': %s" , batch .ID , err )
301- // Fall back to individual upserts
302- for i , msg := range batch .Payload .Messages {
303- postHookUpdateMessageCache := func () {
304- mm := matchedMsgs [i ]
305- em .data .UpdateMessageCache (mm .message , mm .data )
306- }
307- if err = em .database .UpsertMessage (ctx , msg , database .UpsertOptimizationExisting , postHookUpdateMessageCache ); err != nil {
308- if err == database .HashMismatch {
309- log .L (ctx ).Errorf ("Invalid message entry %d in batch'%s'. Hash mismatch with existing record with same UUID '%s' Hash=%s" , i , batch .ID , msg .Header .ID , msg .Hash )
310- return false , nil // This is not retryable. skip this data entry
303+
304+ // Messages are immutable in their contents, and it's entirely possible we're being sent
305+ // messages we've already been sent in a previous batch, and subsequently modified th
306+ // state of (they've been confirmed etc.).
307+ // So we find a list of those that aren't in the DB and so and insert just those.
308+ var foundIDs []* core.IDAndSequence
309+ foundIDs , err = em .database .GetMessageIDs (ctx , batch .Namespace , messageIDFilter (ctx , batch .Payload .Messages ))
310+ if err == nil {
311+ remainingInserts := make ([]* core.Message , 0 , len (batch .Payload .Messages ))
312+ for _ , m := range batch .Payload .Messages {
313+ isFound := false
314+ for _ , foundID := range foundIDs {
315+ if foundID .ID .Equals (m .Header .ID ) {
316+ isFound = true
317+ log .L (ctx ).Warnf ("Message %s in batch '%s' is a duplicate" , m .Header .ID , batch .ID )
318+ break
319+ }
320+ }
321+ if ! isFound {
322+ remainingInserts = append (remainingInserts , m )
311323 }
312- log .L (ctx ).Errorf ("Failed to insert message entry %d in batch '%s': %s" , i , batch .ID , err )
313- return false , err // a persistence failure here is considered retryable (so returned)
324+ }
325+ if len (remainingInserts ) > 0 {
326+ // Only the remaining ones get updates
327+ err = em .database .InsertMessages (ctx , batch .Payload .Messages , func () {
328+ for _ , mm := range matchedMsgs {
329+ for _ , m := range remainingInserts {
330+ if mm .message .Header .ID .Equals (m .Header .ID ) {
331+ em .data .UpdateMessageCache (mm .message , mm .data )
332+ }
333+ }
334+ }
335+ })
314336 }
315337 }
338+ // If we have an error at this point, we cannot insert (must not be a duplicate)
339+ if err != nil {
340+ log .L (ctx ).Errorf ("Failed to insert messages: %s" , err )
341+ return false , err // a persistence failure here is considered retryable (so returned)
342+ }
316343 }
317344
318345 return true , nil
319346}
347+
348+ func messageIDFilter (ctx context.Context , msgs []* core.Message ) ffapi.Filter {
349+ fb := database .MessageQueryFactory .NewFilter (ctx )
350+ ids := make ([]driver.Value , len (msgs ))
351+ for i , msg := range msgs {
352+ ids [i ] = msg .Header .ID
353+ }
354+ return fb .In ("id" , ids )
355+ }
0 commit comments