@@ -29,14 +29,14 @@ import ko from 'knockout';
2929import { container } from 'tsyringe' ;
3030
3131import { Account , ConnectionState , ProcessedEventPayload } from '@wireapp/core' ;
32+ import { PromiseQueue } from '@wireapp/promise-queue' ;
3233import { WebAppEvents } from '@wireapp/webapp-events' ;
3334
3435import { ClientConversationEvent , EventBuilder } from 'Repositories/conversation/EventBuilder' ;
3536import { CryptographyMapper } from 'Repositories/cryptography/CryptographyMapper' ;
3637import { EventName } from 'Repositories/tracking/EventName' ;
3738import { UserState } from 'Repositories/user/UserState' ;
3839import { getLogger , Logger } from 'Util/Logger' ;
39- import { queue } from 'Util/PromiseQueue' ;
4040import { TIME_IN_MILLIS } from 'Util/TimeUtil' ;
4141
4242import { ClientEvent } from './Client' ;
@@ -63,6 +63,8 @@ export class EventRepository {
6363 /** event processors are classes that are able to react and process an incoming event */
6464 private eventProcessors : EventProcessor [ ] = [ ] ;
6565
66+ private eventQueue : PromiseQueue = new PromiseQueue ( ) ;
67+
6668 static get CONFIG ( ) {
6769 return {
6870 E_CALL_EVENT_LIFETIME : TIME_IN_MILLIS . SECOND * 30 ,
@@ -160,17 +162,19 @@ export class EventRepository {
160162 * this function will process any incoming event. It is being queued in case 2 events arrive at the same time.
161163 * Processing events should happen sequentially (thus the queue)
162164 */
163- private readonly handleIncomingEvent = queue ( async ( payload : HandledEventPayload , source : NotificationSource ) => {
164- try {
165- await this . handleEvent ( payload , source ) ;
166- } catch ( error ) {
167- if ( source === EventSource . NOTIFICATION_STREAM ) {
168- this . logger . warn ( `Failed to handle event of type "${ event . type } ": ${ error . message } ` , error ) ;
169- } else {
170- throw error ;
165+ private readonly handleIncomingEvent = async ( payload : HandledEventPayload , source : NotificationSource ) => {
166+ return this . eventQueue . push ( async ( ) => {
167+ try {
168+ await this . handleEvent ( payload , source ) ;
169+ } catch ( error ) {
170+ if ( source === EventSource . NOTIFICATION_STREAM ) {
171+ this . logger . warn ( `Failed to handle event of type "${ event . type } ": ${ error . message } ` , error ) ;
172+ } else {
173+ throw error ;
174+ }
171175 }
172- }
173- } ) ;
176+ } ) ;
177+ } ;
174178
175179 /**
176180 * Import events coming from an external source. This is only useful useful for profiling or debugging.
@@ -324,7 +328,7 @@ export class EventRepository {
324328 }
325329
326330 const conversationId = 'conversation' in event && event . conversation ;
327- const inSelfConversation = conversationId === this . userState . self ( ) . id ;
331+ const inSelfConversation = conversationId === this . userState . self ( ) ? .id ;
328332 if ( ! inSelfConversation ) {
329333 return this . processEvent ( event , source ) ;
330334 }
0 commit comments