Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions packages/agents-hosting-dialogs/src/agentStateSet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,22 @@ export class AgentStateSet {
*
* @remarks
* This will trigger all of the plugins to read in their state in parallel.
*
* If any individual load operation fails, it will be logged but won't fail the entire operation.
*/
async loadAll (context: TurnContext, force = false): Promise<void> {
const promises: Promise<any>[] = this.agentStates.map((agentstate: AgentState) => agentstate.load(context, force))
if (this.agentStates.length === 0) {
return
}

const promises: Promise<any>[] = this.agentStates.map(async (agentstate: AgentState) => {
try {
return await agentstate.load(context, force)
} catch (error) {
// Log individual errors but don't fail the entire operation
console.error(`Failed to load state for agent state: ${error}`)
throw error // Re-throw to maintain original behavior
}
})

await Promise.all(promises)
}
Expand All @@ -67,11 +79,22 @@ export class AgentStateSet {
*
* @remarks
* This will trigger all of the plugins to write out their state in parallel.
* If any individual save operation fails, it will be logged but won't fail the entire operation.
*/
async saveAllChanges (context: TurnContext, force = false): Promise<void> {
const promises: Promise<void>[] = this.agentStates.map((agentstate: AgentState) =>
agentstate.saveChanges(context, force)
)
if (this.agentStates.length === 0) {
return
}

const promises: Promise<void>[] = this.agentStates.map(async (agentstate: AgentState) => {
try {
return await agentstate.saveChanges(context, force)
} catch (error) {
// Log individual errors but don't fail the entire operation
console.error(`Failed to save changes for agent state: ${error}`)
throw error // Re-throw to maintain original behavior
}
})

await Promise.all(promises)
}
Expand Down
37 changes: 25 additions & 12 deletions packages/agents-hosting-dialogs/src/memory/dialogStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -478,19 +478,24 @@ export class DialogStateManager {
* @returns Normalized paths to pass to `anyPathChanged` method.
*/
trackPaths (paths: string[]): string[] {
if (!paths || paths.length === 0) {
return []
}

const allPaths: string[] = []
paths.forEach((path) => {
for (const path of paths) {
const tpath = this.transformPath(path)
const segments = this.parsePath(tpath, false)
if (segments.length > 0 && (segments.length === 1 || !segments[1].toString().startsWith('_'))) {
// Normalize path and initialize change tracker
// Use join with pre-allocated buffer for better performance
const npath = segments.join('_').toLowerCase()
this.setValue(`${PATH_TRACKER}.${npath}`, 0)

// Return normalized path
allPaths.push(npath)
}
})
}

return allPaths
}
Expand All @@ -503,17 +508,17 @@ export class DialogStateManager {
* @returns True if any path has changed since counter.
*/
anyPathChanged (counter: number, paths: string[]): boolean {
let found = false
if (paths) {
for (let i = 0; i < paths.length; i++) {
if (this.getValue(`${PATH_TRACKER}.${paths[i]}`, 0) > counter) {
found = true
break
}
if (!paths || paths.length === 0) {
return false
}

for (const path of paths) {
if (this.getValue(`${PATH_TRACKER}.${path}`, 0) > counter) {
return true
}
}

return found
return false
}

/**
Expand All @@ -524,9 +529,17 @@ export class DialogStateManager {
// Normalize path and scan for any matches or children that match.
// - We're appending an extra '_' so that we can do substring matches and
// avoid any false positives.
let counter: number
const npath = this.parsePath(path, false).join('_') + '_'
const segments = this.parsePath(path, false)
if (segments.length === 0) {
return
}

const npath = segments.join('_') + '_'
const tracking: object = this.getValue(PATH_TRACKER) || {}

// Get counter once if needed
let counter: number | undefined

for (const key in tracking) {
if (`${key}_`.startsWith(npath)) {
// Populate counter on first use
Expand Down
183 changes: 120 additions & 63 deletions packages/agents-hosting/src/app/agentApplication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -590,87 +590,36 @@ export class AgentApplication<TState extends TurnState> {
logger.info('Running application with activity:', turnContext.activity.id!)
return await this.startLongRunningCall(turnContext, async (context) => {
try {
if (this._options.startTypingTimer) {
this.startTypingTimer(context)
}

if (this._options.removeRecipientMention && context.activity.type === ActivityTypes.Message) {
context.activity.removeRecipientMention()
}

if (this._options.normalizeMentions && context.activity.type === ActivityTypes.Message) {
context.activity.normalizeMentions()
}
await this.initializeTurn(context)

const { storage, turnStateFactory } = this._options
const state = turnStateFactory()
await state.load(context, storage)

const signInState : SignInState = state.getValue('user.__SIGNIN_STATE_')
logger.debug('SignIn State:', signInState)
if (this._authorization && signInState && signInState.completed === false) {
const flowState = await this._authorization.authHandlers[signInState.handlerId!]?.flow?.getFlowState(context)
logger.debug('Flow State:', flowState)
if (flowState && flowState.flowStarted === true) {
const tokenResponse = await this._authorization.beginOrContinueFlow(turnContext, state, signInState?.handlerId!)
const savedAct = Activity.fromObject(signInState?.continuationActivity!)
if (tokenResponse?.token && tokenResponse.token.length > 0) {
logger.info('resending continuation activity:', savedAct.text)
await this.run(new TurnContext(context.adapter, savedAct))
await state.deleteValue('user.__SIGNIN_STATE_')
return true
}
}

// return true
// Handle authentication flows
const authResult = await this.handleAuthentication(context, state)
if (authResult) {
return true
}

// Execute before-turn handlers
if (!(await this.callEventHandlers(context, state, this._beforeTurn))) {
await state.save(context, storage)
return false
}

if (Array.isArray(this._options.fileDownloaders) && this._options.fileDownloaders.length > 0) {
const inputFiles = state.temp.inputFiles ?? []
for (let i = 0; i < this._options.fileDownloaders.length; i++) {
const files = await this._options.fileDownloaders[i].downloadFiles(context, state)
inputFiles.push(...files)
}
state.temp.inputFiles = inputFiles
}

for (const route of this._routes) {
if (await route.selector(context)) {
if (route.authHandlers === undefined || route.authHandlers.length === 0) {
await route.handler(context, state)
} else {
let signInComplete = false
for (const authHandlerId of route.authHandlers) {
logger.info(`Executing route handler for authHandlerId: ${authHandlerId}`)
const tokenResponse = await this._authorization?.beginOrContinueFlow(turnContext, state, authHandlerId)
signInComplete = (tokenResponse?.token !== undefined && tokenResponse?.token.length > 0)
if (!signInComplete) {
break
}
}
if (signInComplete) {
await route.handler(context, state)
}
}
// Process file downloads
await this.processFileDownloads(context, state)

if (await this.callEventHandlers(context, state, this._afterTurn)) {
await state.save(context, storage)
}

return true
}
}
// Route to handlers
const routeHandled = await this.processRoutes(context, state)

// Execute after-turn handlers and save state
if (await this.callEventHandlers(context, state, this._afterTurn)) {
await state.save(context, storage)
}

return false
return routeHandled
} catch (err: any) {
logger.error(err)
throw err
Expand All @@ -680,6 +629,114 @@ export class AgentApplication<TState extends TurnState> {
})
}

/**
* Initializes the turn by starting typing timer and processing mentions.
*/
protected async initializeTurn (context: TurnContext): Promise<void> {
if (this._options.startTypingTimer) {
this.startTypingTimer(context)
}

if (context.activity.type === ActivityTypes.Message) {
if (this._options.removeRecipientMention) {
context.activity.removeRecipientMention()
}

if (this._options.normalizeMentions) {
context.activity.normalizeMentions()
}
}
}

/**
* Handles authentication flows if needed.
* @returns True if authentication handling completed the turn, false to continue processing.
*/
protected async handleAuthentication (context: TurnContext, state: TState): Promise<boolean> {
if (!this._authorization) {
return false
}

const signInState: SignInState = state.getValue('user.__SIGNIN_STATE_')
logger.debug('SignIn State:', signInState)

if (signInState && signInState.completed === false) {
const flowState = await this._authorization.authHandlers[signInState.handlerId!]?.flow?.getFlowState(context)
logger.debug('Flow State:', flowState)

if (flowState && flowState.flowStarted === true) {
const tokenResponse = await this._authorization.beginOrContinueFlow(context, state, signInState?.handlerId!)
const savedAct = Activity.fromObject(signInState?.continuationActivity!)

if (tokenResponse?.token && tokenResponse.token.length > 0) {
logger.info('resending continuation activity:', savedAct.text)
await this.run(new TurnContext(context.adapter, savedAct))
await state.deleteValue('user.__SIGNIN_STATE_')
return true
}
}
}

return false
}

/**
* Processes file downloads if file downloaders are configured.
*/
protected async processFileDownloads (context: TurnContext, state: TState): Promise<void> {
if (!Array.isArray(this._options.fileDownloaders) || this._options.fileDownloaders.length === 0) {
return
}

const inputFiles = state.temp.inputFiles ?? []
for (const downloader of this._options.fileDownloaders) {
const files = await downloader.downloadFiles(context, state)
inputFiles.push(...files)
}
state.temp.inputFiles = inputFiles
}

/**
* Processes route matching and handler execution.
* @returns True if a route was matched and handled, false otherwise.
*/
protected async processRoutes (context: TurnContext, state: TState): Promise<boolean> {
for (const route of this._routes) {
if (await route.selector(context)) {
if (!route.authHandlers || route.authHandlers.length === 0) {
await route.handler(context, state)
} else {
const signInComplete = await this.processRouteAuthentication(context, state, route.authHandlers)
if (signInComplete) {
await route.handler(context, state)
}
}

return true
}
}

return false
}

/**
* Processes authentication for routes that require it.
* @returns True if all required authentications are complete, false otherwise.
*/
protected async processRouteAuthentication (context: TurnContext, state: TState, authHandlers: string[]): Promise<boolean> {
for (const authHandlerId of authHandlers) {
logger.info(`Executing route handler for authHandlerId: ${authHandlerId}`)
const tokenResponse = await this._authorization?.beginOrContinueFlow(context, state, authHandlerId)
const signInComplete = (tokenResponse?.token !== undefined && tokenResponse?.token.length > 0)

if (!signInComplete) {
return false
}
}

return true
}

/**
* Sends a proactive message to a conversation.
*
Expand Down
30 changes: 13 additions & 17 deletions packages/agents-hosting/src/app/turnState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,43 +378,39 @@ export class TurnState<
throw new Error(this._stateNotLoadedString)
}

let changes: StoreItems | undefined
let deletions: string[] | undefined
// Pre-allocate arrays to avoid repeated allocation
const changes: StoreItems = {}
const deletions: string[] = []
let hasChanges = false
let hasDeletions = false

for (const key in this._scopes) {
if (!Object.prototype.hasOwnProperty.call(this._scopes, key)) {
continue
}
const entry = this._scopes[key]
if (entry.storageKey) {
if (entry.isDeleted) {
if (deletions) {
deletions.push(entry.storageKey)
} else {
deletions = [entry.storageKey]
}
deletions.push(entry.storageKey)
hasDeletions = true
} else if (entry.hasChanged) {
if (!changes) {
changes = {}
}

changes[entry.storageKey] = entry.value
hasChanges = true
}
}
}

if (storage) {
if (storage && (hasChanges || hasDeletions)) {
const promises: Promise<void>[] = []
if (changes) {
if (hasChanges) {
promises.push(storage.write(changes))
}

if (deletions) {
if (hasDeletions) {
promises.push(storage.delete(deletions))
}

if (promises.length > 0) {
await Promise.all(promises)
}
await Promise.all(promises)
}
}

Expand Down
Loading
Loading