@@ -620,5 +620,207 @@ describe('DataStreamClient', () => {
620620 await expect ( companyPromise ) . rejects . toThrow ( 'Company not found' ) ;
621621 } ) ;
622622
623+ describe ( 'two-level caching' , ( ) => {
624+ const multiKeyCompany = {
625+ id : 'company-multi' ,
626+ account_id : 'account-123' ,
627+ environment_id : 'env-123' ,
628+ keys : { name : 'acme' , slug : 'acme-corp' , external_id : 'ext-1' } ,
629+ traits : [ ] ,
630+ rules : [ ] ,
631+ metrics : [ ] ,
632+ plan_ids : [ ] ,
633+ billing_product_ids : [ ] ,
634+ crm_product_ids : [ ] ,
635+ credit_balances : { } ,
636+ } as unknown as Schematic . RulesengineCompany ;
637+
638+ const multiKeyUser = {
639+ id : 'user-multi' ,
640+ account_id : 'account-123' ,
641+ environment_id : 'env-123' ,
642+ keys : { email : 'alice@example.com' , user_id : 'u-1' } ,
643+ traits : [ ] ,
644+ rules : [ ] ,
645+ } as unknown as Schematic . RulesengineUser ;
646+
647+ let messageHandler : ( message : DataStreamResp ) => Promise < void > ;
648+
649+ beforeEach ( async ( ) => {
650+ await client . start ( ) ;
651+ const DatastreamWSClientMock = DatastreamWSClient as jest . MockedClass < typeof DatastreamWSClient > ;
652+ messageHandler = DatastreamWSClientMock . mock . calls [ 0 ] [ 0 ] . messageHandler ;
653+ } ) ;
654+
655+ test ( 'should retrieve company by any of its keys after caching' , async ( ) => {
656+ await messageHandler ( {
657+ entity_type : EntityType . COMPANY ,
658+ message_type : MessageType . FULL ,
659+ data : multiKeyCompany ,
660+ } ) ;
661+
662+ // Look up by each key individually
663+ const byName = await client . getCompany ( { name : 'acme' } ) ;
664+ const bySlug = await client . getCompany ( { slug : 'acme-corp' } ) ;
665+ const byExtId = await client . getCompany ( { external_id : 'ext-1' } ) ;
666+
667+ expect ( byName ) . toEqual ( multiKeyCompany ) ;
668+ expect ( bySlug ) . toEqual ( multiKeyCompany ) ;
669+ expect ( byExtId ) . toEqual ( multiKeyCompany ) ;
670+ } ) ;
671+
672+ test ( 'should retrieve user by any of its keys after caching' , async ( ) => {
673+ await messageHandler ( {
674+ entity_type : EntityType . USER ,
675+ message_type : MessageType . FULL ,
676+ data : multiKeyUser ,
677+ } ) ;
678+
679+ const byEmail = await client . getUser ( { email : 'alice@example.com' } ) ;
680+ const byUserId = await client . getUser ( { user_id : 'u-1' } ) ;
681+
682+ expect ( byEmail ) . toEqual ( multiKeyUser ) ;
683+ expect ( byUserId ) . toEqual ( multiKeyUser ) ;
684+ } ) ;
685+
686+ test ( 'should remove company from cache on DELETE for all keys' , async ( ) => {
687+ mockDatastreamWSClientInstance . isConnected . mockReturnValue ( true ) ;
688+
689+ // Cache the company first
690+ await messageHandler ( {
691+ entity_type : EntityType . COMPANY ,
692+ message_type : MessageType . FULL ,
693+ data : multiKeyCompany ,
694+ } ) ;
695+
696+ // Verify it's cached — returns from cache without sending a WS request
697+ mockDatastreamWSClientInstance . sendMessage . mockClear ( ) ;
698+ const cached = await client . getCompany ( { name : 'acme' } ) ;
699+ expect ( cached ) . toEqual ( multiKeyCompany ) ;
700+ expect ( mockDatastreamWSClientInstance . sendMessage ) . not . toHaveBeenCalled ( ) ;
701+
702+ // Send DELETE
703+ await messageHandler ( {
704+ entity_type : EntityType . COMPANY ,
705+ message_type : MessageType . DELETE ,
706+ data : multiKeyCompany ,
707+ } ) ;
708+
709+ // Don't respond — leave requests pending so we can inspect sendMessage calls
710+ mockDatastreamWSClientInstance . sendMessage . mockResolvedValue ( undefined ) ;
711+
712+ // After delete, each key should miss cache and trigger a WS request
713+ for ( const [ key , value ] of Object . entries ( multiKeyCompany . keys ! ) ) {
714+ mockDatastreamWSClientInstance . sendMessage . mockClear ( ) ;
715+ // Don't await — the promise will pend waiting for a WS response
716+ const pending = client . getCompany ( { [ key ] : value } ) ;
717+ // Let the async cache lookup resolve before checking
718+ await new Promise ( resolve => process . nextTick ( resolve ) ) ;
719+ // A WS request proves the cache was empty for this key
720+ expect ( mockDatastreamWSClientInstance . sendMessage ) . toHaveBeenCalledTimes ( 1 ) ;
721+ // Resolve the pending request so it doesn't leak
722+ await messageHandler ( {
723+ entity_type : EntityType . COMPANY ,
724+ message_type : MessageType . FULL ,
725+ data : { ...multiKeyCompany , keys : { [ key ] : value } } ,
726+ } ) ;
727+ await pending ;
728+ }
729+ } ) ;
730+
731+ test ( 'should remove user from cache on DELETE for all keys' , async ( ) => {
732+ mockDatastreamWSClientInstance . isConnected . mockReturnValue ( true ) ;
733+
734+ // Cache the user
735+ await messageHandler ( {
736+ entity_type : EntityType . USER ,
737+ message_type : MessageType . FULL ,
738+ data : multiKeyUser ,
739+ } ) ;
740+
741+ // Verify it's cached — returns from cache without sending a WS request
742+ mockDatastreamWSClientInstance . sendMessage . mockClear ( ) ;
743+ const cached = await client . getUser ( { email : 'alice@example.com' } ) ;
744+ expect ( cached ) . toEqual ( multiKeyUser ) ;
745+ expect ( mockDatastreamWSClientInstance . sendMessage ) . not . toHaveBeenCalled ( ) ;
746+
747+ // Send DELETE
748+ await messageHandler ( {
749+ entity_type : EntityType . USER ,
750+ message_type : MessageType . DELETE ,
751+ data : multiKeyUser ,
752+ } ) ;
753+
754+ // Don't respond — leave requests pending so we can inspect sendMessage calls
755+ mockDatastreamWSClientInstance . sendMessage . mockResolvedValue ( undefined ) ;
756+
757+ // After delete, each key should miss cache and trigger a WS request
758+ for ( const [ key , value ] of Object . entries ( multiKeyUser . keys ! ) ) {
759+ mockDatastreamWSClientInstance . sendMessage . mockClear ( ) ;
760+ const pending = client . getUser ( { [ key ] : value } ) ;
761+ await new Promise ( resolve => process . nextTick ( resolve ) ) ;
762+ // A WS request proves the cache was empty for this key
763+ expect ( mockDatastreamWSClientInstance . sendMessage ) . toHaveBeenCalledTimes ( 1 ) ;
764+ // Resolve the pending request so it doesn't leak
765+ await messageHandler ( {
766+ entity_type : EntityType . USER ,
767+ message_type : MessageType . FULL ,
768+ data : { ...multiKeyUser , keys : { [ key ] : value } } ,
769+ } ) ;
770+ await pending ;
771+ }
772+ } ) ;
773+
774+ test ( 'should update company in cache and reflect changes across all keys' , async ( ) => {
775+ // Cache initial company
776+ await messageHandler ( {
777+ entity_type : EntityType . COMPANY ,
778+ message_type : MessageType . FULL ,
779+ data : multiKeyCompany ,
780+ } ) ;
781+
782+ // Send updated company with same keys but different data
783+ const updatedCompany = {
784+ ...multiKeyCompany ,
785+ traits : [ { key : 'tier' , value : 'enterprise' } ] ,
786+ } as unknown as Schematic . RulesengineCompany ;
787+
788+ await messageHandler ( {
789+ entity_type : EntityType . COMPANY ,
790+ message_type : MessageType . FULL ,
791+ data : updatedCompany ,
792+ } ) ;
793+
794+ // All keys should return the updated company
795+ const byName = await client . getCompany ( { name : 'acme' } ) ;
796+ const bySlug = await client . getCompany ( { slug : 'acme-corp' } ) ;
797+
798+ expect ( byName ) . toEqual ( updatedCompany ) ;
799+ expect ( bySlug ) . toEqual ( updatedCompany ) ;
800+ } ) ;
801+
802+ test ( 'should update company metrics and reflect via all keys' , async ( ) => {
803+ const companyWithMetrics = {
804+ ...multiKeyCompany ,
805+ metrics : [
806+ { eventSubtype : 'api-call' , value : 10 } ,
807+ ] ,
808+ } as unknown as Schematic . RulesengineCompany ;
809+
810+ await messageHandler ( {
811+ entity_type : EntityType . COMPANY ,
812+ message_type : MessageType . FULL ,
813+ data : companyWithMetrics ,
814+ } ) ;
815+
816+ // Update metrics via one key set
817+ await client . updateCompanyMetrics ( { name : 'acme' } , 'api-call' , 5 ) ;
818+
819+ // Read back via a different key
820+ const bySlug = await client . getCompany ( { slug : 'acme-corp' } ) ;
821+ const metric = ( bySlug as any ) . metrics . find ( ( m : any ) => m . eventSubtype === 'api-call' ) ;
822+ expect ( metric . value ) . toBe ( 15 ) ;
823+ } ) ;
824+ } ) ;
623825
624826} ) ;
0 commit comments