Skip to content
Open
41 changes: 30 additions & 11 deletions packages/agents-hosting-storage-blob/src/blobsStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ import StreamConsumers from 'stream/consumers'
import { isTokenCredential, TokenCredential } from '@azure/core-auth'
import {
AnonymousCredential,
BlobRequestConditions,
ContainerClient,
StoragePipelineOptions,
StorageSharedKeyCredential,
} from '@azure/storage-blob'
import { Storage, StoreItems } from '@microsoft/agents-hosting'
import { ETagConflictError, ItemAlreadyExistsError, Storage, StorageWriteOptions, StoreItems } from '@microsoft/agents-hosting'
import { sanitizeBlobKey } from './blobsTranscriptStore'
import { ignoreError, isStatusCodeError } from './ignoreError'
import { debug } from '@microsoft/agents-activity/logger'
Expand Down Expand Up @@ -134,30 +135,48 @@ export class BlobsStorage implements Storage {
* @returns A promise that resolves when the write operation is complete
* @throws Will throw if there's a validation error, eTag conflict, or other storage error
*/
async write (changes: StoreItems): Promise<void> {
async write (changes: StoreItems, options?: StorageWriteOptions): Promise<StoreItems> {
z.record(z.unknown()).parse(changes)

await this._initialize()

await Promise.all(
Object.entries(changes).map(async ([key, { eTag = '', ...change }]) => {
const entries = Object.entries(changes)
const results = await Promise.all(
entries.map(async ([key, value]) => {
const { eTag = '', ...change } = value
const conditions: BlobRequestConditions = {}

if (options?.ifNotExists) {
logger.debug(`ifNoneMatch=* condition applied for key: ${key}`)
conditions.ifNoneMatch = '*'
} else if (typeof eTag === 'string' && eTag !== '*') {
logger.debug(`ifMatch=${eTag} condition applied for key: ${key}`)
conditions.ifMatch = eTag
}

try {
const blob = this._containerClient.getBlockBlobClient(sanitizeBlobKey(key))
const serialized = JSON.stringify(change)
logger.debug(`Writing blob: ${key}, eTag: ${eTag}, size: ${serialized.length}`)
return await blob.upload(serialized, serialized.length, {
conditions: typeof eTag === 'string' && eTag !== '*' ? { ifMatch: eTag } : {},
const item = await blob.upload(serialized, serialized.length, {
conditions,
blobHTTPHeaders: { blobContentType: 'application/json' },
})
} catch (err: any) {
if (err.statusCode === 412) {
throw new Error(`Storage: error writing "${key}" due to eTag conflict.`)
} else {
throw err

return { key, eTag: item.etag }
} catch (cause: any) {
if (cause.code === 409) {
throw new ItemAlreadyExistsError(`Unable to write '${key}' because it already exists.`, { cause })
} else if (cause.code === 412) {
throw new ETagConflictError(`Unable to write '${key}' due to eTag conflict.`, { cause })
}

throw cause
}
})
)

return results.reduce((acc, { key, eTag }) => ({ ...acc, [key]: { eTag } }), {})
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { Container, CosmosClient } from '@azure/cosmos'
import { Container, CosmosClient, ItemDefinition, ItemResponse, RequestOptions } from '@azure/cosmos'
import { CosmosDbKeyEscape } from './cosmosDbKeyEscape'
import { DocumentStoreItem } from './documentStoreItem'
import { CosmosDbPartitionedStorageOptions } from './cosmosDbPartitionedStorageOptions'
import { Storage, StoreItems } from '@microsoft/agents-hosting'
import { ETagConflictError, ItemAlreadyExistsError, Storage, StorageWriteOptions, StoreItems } from '@microsoft/agents-hosting'

/**
* A utility class to ensure that a specific asynchronous task is executed only once for a given key.
Expand Down Expand Up @@ -141,40 +141,56 @@ export class CosmosDbPartitionedStorage implements Storage {
* Writes items to storage.
* @param changes The items to write.
*/
async write (changes: StoreItems): Promise<void> {
async write (changes: StoreItems, options?: StorageWriteOptions): Promise<StoreItems> {
if (!changes) {
throw new ReferenceError('Changes are required when writing.')
} else if (changes.length === 0) {
return
} else if (Object.keys(changes).length === 0) {
return {}
}

await this.initialize()

await Promise.all(
Object.entries(changes).map(async ([key, { eTag, ...change }]): Promise<void> => {
const document = new DocumentStoreItem({
id: CosmosDbKeyEscape.escapeKey(
key,
this.cosmosDbStorageOptions.keySuffix,
this.cosmosDbStorageOptions.compatibilityMode
),
realId: key,
document: change,
})
const writePromises = Object.entries(changes).map(async ([key, value]) => {
const { eTag, ...change } = value
const requestOptions: RequestOptions = {}

const accessCondition =
eTag !== '*' && eTag != null && eTag.length > 0
? { accessCondition: { type: 'IfMatch', condition: eTag } }
: undefined
if (eTag !== '*' && eTag != null && eTag.length > 0) {
requestOptions.accessCondition = { type: 'IfMatch', condition: eTag }
}

try {
await this.container.items.upsert(document, accessCondition)
} catch (err: any) {
this.checkForNestingError(change, err)
this.throwInformativeError('Error upserting document', err)
}
const document = new DocumentStoreItem({
id: CosmosDbKeyEscape.escapeKey(
key,
this.cosmosDbStorageOptions.keySuffix,
this.cosmosDbStorageOptions.compatibilityMode
),
realId: key,
document: change,
})
)

try {
let item: ItemResponse<ItemDefinition>
if (options?.ifNotExists) {
requestOptions.accessCondition = { type: 'IfNoneMatch', condition: '*' }
item = await this.container.items.create(document, requestOptions)
} else {
item = await this.container.items.upsert(document, requestOptions)
}
return { key, eTag: item.etag }
} catch (cause: any) {
if (cause.code === 409) {
throw new ItemAlreadyExistsError(`Unable to write '${key}' because it already exists.`, { cause })
} else if (cause.code === 412) {
throw new ETagConflictError(`Unable to write '${key}' due to eTag conflict.`, { cause })
}
this.checkForNestingError(change, cause)
this.throwInformativeError('Error upserting document', cause)
throw cause
}
})

const results = await Promise.all(writePromises)
return results.reduce((acc, { key, eTag }) => ({ ...acc, [key]: { eTag } }), {})
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/agents-hosting/src/app/auth/handlerStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ export class HandlerStorage<TActiveHandler extends ActiveAuthorizationHandler =
/**
* Writes handler state to storage.
*/
public write (data: TActiveHandler) : Promise<void> {
public write (data: TActiveHandler) {
return this.storage.write({ [this.key]: data })
}

/**
* Deletes handler state from storage.
*/
public async delete (): Promise<void> {
public async delete () {
try {
await this.storage.delete([this.key])
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,11 @@ export class AzureBotAuthorization implements AuthorizationHandler {
* @param callback The callback function to be invoked on sign-in failure.
*/
onFailure (callback: (context: TurnContext, reason?: string) => Promise<void> | void): void {
this._onFailure = callback
this._onFailure = async (context, reason) => {
// Clear any token exchange state on failure.
await this.deleteTokenExchange(context)
return callback(context, reason)
}
}

/**
Expand Down Expand Up @@ -274,6 +278,7 @@ export class AzureBotAuthorization implements AuthorizationHandler {
logger.debug(this.prefix(`Signing out User '${user}' from => Channel: '${channel}', Connection: '${connection}'`), context.activity)
const userTokenClient = await this.getUserTokenClient(context)
await userTokenClient.signOut(user, connection, channel)
await this.deleteTokenExchange(context)
return true
}

Expand Down Expand Up @@ -403,6 +408,8 @@ export class AzureBotAuthorization implements AuthorizationHandler {
const oCard = CardFactory.oauthCard(this._options.name!, this._options.title!, this._options.text!, signInResource, this._options.enableSso)
await context.sendActivity(MessageFactory.attachment(oCard))
await storage.write({ activity, id: this.id, ...(active ?? {}), attemptsLeft: this.maxAttempts })
// Clear any previous token exchange state when starting a new sign-in process.
await this.deleteTokenExchange(context)
return AuthorizationHandlerStatus.PENDING
}

Expand Down Expand Up @@ -461,6 +468,16 @@ export class AzureBotAuthorization implements AuthorizationHandler {
return AuthorizationHandlerStatus.PENDING
}

// Duplication check is done after successful token exchange to allow MS Teams show the consent prompt per platform (e.g., web, mobile) in case of failing the token exchange.
// If the duplication check is done before, only one platform will show the consent prompt.
// Note: in case this check needs to be done before token exchange, consider adding the isSsoUserConsentFlow === undefined flag,
// to allow multiple token exchanges when the flag is set (indicating user consent flow), duplicated across platforms will still apply (showing one consent prompt).
if (await this.isTokenExchangeDuplicated(context)) {
logger.debug('Skipping duplicated signin/tokenExchange invoke activity.')
// Using PENDING state to avoid deleting the active session, as the deletion will be done by the first token exchange activity.
return AuthorizationHandlerStatus.PENDING
}

await this.sendInvokeResponse<TokenExchangeInvokeResponse>(context, {
status: 200,
body: { id: tokenExchangeInvokeRequest.id, connectionName: this._options.name! }
Expand Down Expand Up @@ -491,12 +508,7 @@ export class AzureBotAuthorization implements AuthorizationHandler {
/**
* Verifies the magic code provided by the user.
*/
private async codeVerification (storage: HandlerStorage<AzureBotActiveHandler>, context: TurnContext, active?: AzureBotActiveHandler): Promise<{ status: AuthorizationHandlerStatus, code?: string }> {
if (!active) {
logger.debug(this.prefix('No active session found. Skipping code verification.'), context.activity)
return { status: AuthorizationHandlerStatus.IGNORED }
}

private async codeVerification (storage: HandlerStorage<AzureBotActiveHandler>, context: TurnContext, active: AzureBotActiveHandler): Promise<{ status: AuthorizationHandlerStatus, code?: string }> {
const { activity } = context
let state: string | undefined = activity.text

Expand Down Expand Up @@ -603,4 +615,54 @@ export class AzureBotAuthorization implements AuthorizationHandler {
return acc
}, []) ?? []
}

/**
* Generates a storage key for persisting token exchange state.
* @param context The turn context containing activity information.
* @returns The storage key string in format: `auth/azurebot/exchange/{channelId}/{userId}`.
* @throws Will throw an error if required activity properties are missing.
*/
private getTokenExchangeKey (context: TurnContext): string {
const channelId = context.activity.channelId
const userId = context.activity.from?.id
if (!channelId || !userId) {
throw new Error('Both \'activity.channelId\' and \'activity.from.id\' are required to generate the Token Exchange Storage key.')
}
return `auth/azurebot/exchange/${channelId}/${userId}`
}

/**
* Checks if a token exchange request is duplicated.
* This is used to prevent duplicating responses when receiving multiple identical token exchange requests
* from the same user across different platforms (e.g., web and mobile).
* @param context The turn context.
* @returns True if the token exchange request is duplicated, false otherwise.
*/
private async isTokenExchangeDuplicated (context: TurnContext) {
const key = this.getTokenExchangeKey(context)
try {
await this.settings.storage.write({ [key]: { exchanged: true, timestamp: Date.now() } }, { ifNotExists: true })
return false
} catch (error) {
return true
}
}

/**
* Deletes the token exchange state from storage.
* @param context The turn context.
* @returns A promise that resolves when the state is deleted.
*/
private async deleteTokenExchange (context: TurnContext) {
try {
await this.settings.storage.delete([this.getTokenExchangeKey(context)])
} catch (error) {
if ((error as Error).message?.toLowerCase().includes('not found')) {
logger.debug(this.prefix('Token exchange state not found for deletion.'))
return
}

throw error
}
}
}
2 changes: 1 addition & 1 deletion packages/agents-hosting/src/app/turnState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ export class TurnState<
}

if (storage) {
const promises: Promise<void>[] = []
const promises: Promise<any>[] = []
if (changes) {
promises.push(storage.write(changes))
}
Expand Down
11 changes: 11 additions & 0 deletions packages/agents-hosting/src/storage/eTagConflictError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

/**
* Error thrown when there is an eTag conflict during a storage operation.
*/
export class ETagConflictError extends Error {
public readonly code: number = 412
}
13 changes: 9 additions & 4 deletions packages/agents-hosting/src/storage/fileStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

import path from 'path'
import fs from 'fs'
import { Storage, StoreItem } from './storage'
import { Storage, StorageWriteOptions, StoreItem, StoreItems } from './storage'
import { ItemAlreadyExistsError } from './itemAlreadyExistsError'

/**
* A file-based storage implementation that persists data to the local filesystem.
Expand Down Expand Up @@ -50,7 +51,7 @@ import { Storage, StoreItem } from './storage'
*/
export class FileStorage implements Storage {
private _folder: string
private _stateFile: Record<string, string>
private _stateFile: Record<string, any>

/**
* Creates a new FileStorage instance that stores data in the specified folder.
Expand Down Expand Up @@ -123,13 +124,17 @@ export class FileStorage implements Storage {
* > Any eTag values in the changes object are ignored.
*
*/
write (changes: StoreItem) : Promise<void> {
write (changes: StoreItems, options?: StorageWriteOptions) : Promise<StoreItems> {
const keys = Object.keys(changes)
for (const key of keys) {
if (options?.ifNotExists && key in this._stateFile) {
throw new ItemAlreadyExistsError(`The key '${key}' already exists in storage.`)
}
this._stateFile[key] = changes[key]
}

fs.writeFileSync(this._folder + '/state.json', JSON.stringify(this._stateFile, null, 2))
return Promise.resolve()
return Promise.resolve(changes)
}

/**
Expand Down
2 changes: 2 additions & 0 deletions packages/agents-hosting/src/storage/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './storage'
export * from './memoryStorage'
export * from './fileStorage'
export * from './eTagConflictError'
export * from './itemAlreadyExistsError'
11 changes: 11 additions & 0 deletions packages/agents-hosting/src/storage/itemAlreadyExistsError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

/**
* Error thrown when attempting to create an item that already exists in storage.
*/
export class ItemAlreadyExistsError extends Error {
public readonly code: number = 409
}
Loading