Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e081983
initial changes for workloads
tvaron3 Oct 13, 2025
fe85531
create initial read write query workload
tvaron3 Oct 13, 2025
0b463cb
remove unnecessary print statement
tvaron3 Oct 13, 2025
54b06be
add changes to perform reads,writes,queries
tvaron3 Oct 13, 2025
6ceccc4
increase time workload runs
tvaron3 Oct 13, 2025
80da224
refactor and react to comments
tvaron3 Oct 13, 2025
1dd267e
react to comments and add vector search queries
tvaron3 Oct 15, 2025
4e7463c
refactor configs
tvaron3 Oct 15, 2025
318a88c
add client engine to workloads
tvaron3 Oct 19, 2025
502a13a
fix vector policy
tvaron3 Oct 20, 2025
b07d714
fix vector policy
tvaron3 Oct 20, 2025
399c0dd
update instructions
tvaron3 Oct 20, 2025
872c341
update instructions
tvaron3 Oct 20, 2025
b8756fb
change permissions on setup script
tvaron3 Oct 20, 2025
15558fe
change to log to a file
tvaron3 Oct 20, 2025
43b3dcf
additional logging for debugging preferred locations
tvaron3 Oct 20, 2025
b2e85d8
more logging
tvaron3 Oct 20, 2025
28fd089
comment out logs
tvaron3 Oct 20, 2025
8b35f4b
react to comments and add useragent
tvaron3 Oct 22, 2025
8500f77
add aad to workload
tvaron3 Oct 31, 2025
492698b
remove ctx timer, run point reads on other thread
tvaron3 Oct 31, 2025
7a21cde
skip setup with aad
tvaron3 Oct 31, 2025
0c009ab
fix pkfield
tvaron3 Oct 31, 2025
9a3d2b8
revert pkField change
tvaron3 Oct 31, 2025
ee2f36f
fix pkfield
tvaron3 Oct 31, 2025
3cf9d50
Add Allow Tentative Writes Header (#25127)
tvaron3 Aug 25, 2025
f627ed1
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-go into…
tvaron3 Nov 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/data/azcosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
## 1.4.1 (2025-08-27)

### Bugs Fixed
* Fixed bug where the correct header was not being sent for writes on multiple write region accounts. See [PR 25127](https://github.com/Azure/azure-sdk-for-go/pull/25127)

* Fixed bug where the correct header was not being sent for writes on multiple write region accounts. See [PR 25127](https://github.com/Azure/azure-sdk-for-go/pull/25127)

Expand Down
8 changes: 8 additions & 0 deletions sdk/data/azcosmos/cosmos_location_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (lc *locationCache) update(writeLocations []accountRegion, readLocations []
lc.refreshStaleEndpoints()
if readLocations != nil {
availReadEndpointsByLocation, availReadLocations, err := getEndpointsByLocation(readLocations)
// log.Printf("Available read endpoints by location: %v", availReadEndpointsByLocation)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's either remove these altogether or switch them to use the core logging facility. (Basically, I just don't like committing commented-out code ;))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I will remove these. :)

if err != nil {
return err
}
Expand All @@ -98,6 +99,7 @@ func (lc *locationCache) update(writeLocations []accountRegion, readLocations []

if writeLocations != nil {
availWriteEndpointsByLocation, availWriteLocations, err := getEndpointsByLocation(writeLocations)
// log.Printf("Available write endpoints by location: %v", availWriteEndpointsByLocation)
if err != nil {
return err
}
Expand All @@ -124,8 +126,10 @@ func (lc *locationCache) resolveServiceEndpoint(locationIndex int, resourceType
}

endpoints := lc.locationInfo.readEndpoints
// log.Printf("Read endpoints: %v", endpoints)
if isWriteOperation {
endpoints = lc.locationInfo.writeEndpoints
// log.Printf("Write endpoints: %v", endpoints)
}
return endpoints[locationIndex%len(endpoints)]
}
Expand Down Expand Up @@ -245,6 +249,8 @@ func (lc *locationCache) getPrefAvailableEndpoints(endpointsByLoc map[string]url
if lc.canUseMultipleWriteLocs() || availOps&read != 0 {
unavailEndpoints := make([]url.URL, 0)
unavailEndpoints = append(unavailEndpoints, fallbackEndpoint)
// log.Printf("Unavailable endpoints: %v", unavailEndpoints)
// log.Printf("Pref location: %v", lc.locationInfo.prefLocations)
for _, loc := range lc.locationInfo.prefLocations {
if endpoint, ok := endpointsByLoc[loc]; ok && endpoint != fallbackEndpoint {
if lc.isEndpointUnavailable(endpoint, availOps) {
Expand All @@ -256,11 +262,13 @@ func (lc *locationCache) getPrefAvailableEndpoints(endpointsByLoc map[string]url
}
endpoints = append(endpoints, unavailEndpoints...)
} else {
// log.Printf("Pref location: %v", lc.locationInfo.prefLocations)
for _, loc := range locs {
if endpoint, ok := endpointsByLoc[loc]; ok && loc != "" {
endpoints = append(endpoints, endpoint)
}
}
// log.Printf("Endpoints %v", endpoints)
}
}
if len(endpoints) == 0 {
Expand Down
5 changes: 4 additions & 1 deletion sdk/data/azcosmos/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos

go 1.23.0
go 1.23.3

toolchain go1.24.4

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.1
Expand All @@ -10,6 +12,7 @@ require (
)

require (
github.com/Azure/azure-cosmos-client-engine/go/azcosmoscx v0.0.6
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang-jwt/jwt/v5 v5.2.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions sdk/data/azcosmos/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/Azure/azure-cosmos-client-engine/go/azcosmoscx v0.0.6 h1:MlHT6JQ3/fia3o0vlJ3+9v4Ir2WBEGrG9dTPwE+acAY=
github.com/Azure/azure-cosmos-client-engine/go/azcosmoscx v0.0.6/go.mod h1:WeN45W+Vf3Q8XN6mpJnLizqTpcdAd0GaJ8ngTG829l4=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.1 h1:Wc1ml6QlJs2BHQ/9Bqu1jiyggbsSjramq2oUmp5WeIo=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.1/go.mod h1:Ot/6aikWnKWi4l9QB7qVSwa8iMphQNqkWALMoNT3rzM=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.10.1 h1:B+blDbyVIG3WaikNxPnhPiJ1MThR03b3vKGtER95TP4=
Expand Down
32 changes: 32 additions & 0 deletions sdk/data/azcosmos/workloads/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
## SDK Scale Testing
This directory contains the scale testing workloads for the SDK. The workloads are designed to test the performance
and scalability of the SDK under various conditions.

### Setup Scale Testing
1. Create a VM in Azure with the following configuration:
- 8 vCPUs
- 32 GB RAM
- Ubuntu
- Accelerated networking
1. Give the VM necessary [permissions](https://learn.microsoft.com/azure/cosmos-db/nosql/how-to-grant-data-plane-access?tabs=built-in-definition%2Ccsharp&pivots=azure-interface-cli) to access the Cosmos DB account if using AAD (Optional).
1. Fork and clone this repository
1. Go to the `workloads` folder under `azcosmos`
- `cd azure-sdk-for-go/sdk/data/azcosmos/workloads`
1. Checkout the branch with the changes to test.
1. Run `./setup_env.sh`
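1. Set the relevant configs read by `workload_configs.go` (key, host, etc.) using environment variables:
- `COSMOS_URI` - required
- `COSMOS_KEY` - required for key-based auth; leave it unset to use AAD (setup is then skipped, so the database and container must already exist)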
- `COSMOS_DATABASE`
- `COSMOS_CONTAINER`
- `PARTITION_KEY`
- `NUMBER_OF_LOGICAL_PARTITIONS`
- `THROUGHPUT`
- `PREFERRED_LOCATIONS`
1. Set `AZURE_SDK_GO_LOGGING` env variable to "all" for detailed logs
1. Run the scale workload
- `go run ./main/main.go`

### Monitor Run
- `ps -eaf | grep "go"` to see the running processes

119 changes: 119 additions & 0 deletions sdk/data/azcosmos/workloads/initial_setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package workloads

import (
"context"
"errors"
"fmt"
"log"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
)

// createDatabaseIfNotExists sends a create request for dbID and returns a
// client bound to that database. A 409 (conflict) response counts as success
// because it means the database is already there; every other failure is
// returned to the caller unchanged.
func createDatabaseIfNotExists(ctx context.Context, client *azcosmos.Client, dbID string) (*azcosmos.DatabaseClient, error) {
	database, err := client.NewDatabase(dbID)
	if err != nil {
		return nil, err
	}

	if _, createErr := client.CreateDatabase(ctx, azcosmos.DatabaseProperties{ID: dbID}, nil); createErr != nil {
		var respErr *azcore.ResponseError
		alreadyExists := errors.As(createErr, &respErr) && respErr.StatusCode == 409
		if !alreadyExists {
			return nil, createErr
		}
	}
	return database, nil
}

// createContainerIfNotExists sends a create request for containerID in db and
// returns a client bound to that container. The container is partitioned by
// hash on "/"+pkField, provisioned with desiredThroughput manual throughput,
// and declares a 10-dimension float32 cosine vector embedding at /embedding
// backed by a DiskANN vector index. A 409 (conflict) response counts as
// success; every other failure is returned unchanged.
func createContainerIfNotExists(ctx context.Context, db *azcosmos.DatabaseClient, containerID, pkField string, desiredThroughput int32) (*azcosmos.ContainerClient, error) {
	target, err := db.NewContainer(containerID)
	if err != nil {
		return nil, err
	}

	partitionKey := azcosmos.PartitionKeyDefinition{
		Paths: []string{"/" + pkField},
		Kind:  azcosmos.PartitionKeyKindHash,
	}

	embeddingPolicy := &azcosmos.VectorEmbeddingPolicy{
		VectorEmbeddings: []azcosmos.VectorEmbedding{
			{
				Path:             "/embedding",
				DataType:         azcosmos.VectorDataTypeFloat32,
				DistanceFunction: azcosmos.VectorDistanceFunctionCosine,
				Dimensions:       10,
			},
		},
	}

	indexing := &azcosmos.IndexingPolicy{
		Automatic:     true,
		IndexingMode:  azcosmos.IndexingModeConsistent,
		IncludedPaths: []azcosmos.IncludedPath{{Path: "/*"}},
		ExcludedPaths: []azcosmos.ExcludedPath{
			{Path: "/\"_etag\"/?"},
			// The vector path is kept out of the standard index; the vector
			// index below covers it instead.
			{Path: "/embedding/*"},
		},
		VectorIndexes: []azcosmos.VectorIndex{
			{Path: "/embedding", Type: azcosmos.VectorIndexTypeDiskANN},
		},
	}

	definition := azcosmos.ContainerProperties{
		ID:                     containerID,
		PartitionKeyDefinition: partitionKey,
		VectorEmbeddingPolicy:  embeddingPolicy,
		IndexingPolicy:         indexing,
	}

	manual := azcosmos.NewManualThroughputProperties(desiredThroughput)
	options := azcosmos.CreateContainerOptions{ThroughputProperties: &manual}

	if _, createErr := db.CreateContainer(ctx, definition, &options); createErr != nil {
		var respErr *azcore.ResponseError
		alreadyExists := errors.As(createErr, &respErr) && respErr.StatusCode == 409
		if !alreadyExists {
			return nil, createErr
		}
	}
	return target, nil
}

// RunSetup makes sure the configured database and container exist, then seeds
// the container by calling upsertItemsConcurrently with cfg.LogicalPartitions
// as the item count. Each failing stage is returned wrapped with a short
// prefix naming that stage.
func RunSetup(ctx context.Context, client *azcosmos.Client, cfg workloadConfig) error {
	database, err := createDatabaseIfNotExists(ctx, client, cfg.DatabaseID)
	if err != nil {
		return fmt.Errorf("ensure database: %w", err)
	}

	throughput := int32(cfg.Throughput)
	target, err := createContainerIfNotExists(ctx, database, cfg.ContainerID, cfg.PartitionKeyFieldName, throughput)
	if err != nil {
		return fmt.Errorf("ensure container: %w", err)
	}

	total := cfg.LogicalPartitions
	log.Printf("Starting %d concurrent upserts...", total)

	if err := upsertItemsConcurrently(ctx, target, total, cfg.PartitionKeyFieldName); err != nil {
		return fmt.Errorf("upserts failed: %w", err)
	}

	log.Printf("Completed %d upserts.", total)
	return nil
}
50 changes: 50 additions & 0 deletions sdk/data/azcosmos/workloads/main/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package main

import (
"context"
"log"
"os"

"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos/workloads"
)

// main is the entry point of the scale-test workload. It redirects the
// standard logger to workloads.log, loads the configuration, builds the
// Cosmos client, runs the one-time setup when a key is configured, and then
// runs the workload until the context ends. Any failure is logged and
// terminates the process.
func main() {
	// Append to workloads.log in the working directory so repeated runs keep
	// their history.
	logFile, err := os.OpenFile("workloads.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		log.Fatalf("failed to open log file: %v", err)
	}
	// Note: log.Fatalf exits the process without running deferred calls; the
	// OS reclaims the descriptor in that case.
	defer logFile.Close()

	// Send the default logger output to the file, with date, time
	// (microseconds) and the short file name on every line.
	log.SetOutput(logFile)
	log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds | log.Lshortfile)

	ctx := context.Background()

	cfg, err := workloads.LoadConfig()
	if err != nil {
		log.Fatalf("failed to load config: %v", err)
	}

	client, err := workloads.CreateClient(cfg)
	if err != nil {
		log.Fatalf("creating client: %v", err)
	}

	// Setup runs only when a key is configured. Report completion only when
	// it actually ran, so the log never shows "setup completed" right after
	// "Setup skipped".
	if cfg.Key != "" {
		if err := workloads.RunSetup(ctx, client, cfg); err != nil {
			log.Fatalf("setup failed: %v", err)
		}
		log.Println("setup completed")
	} else {
		log.Printf("Setup skipped as AAD is enabled.")
	}

	if err := workloads.RunWorkload(ctx, client, cfg); err != nil {
		log.Fatalf("workload failed: %v", err)
	}
}
87 changes: 87 additions & 0 deletions sdk/data/azcosmos/workloads/r_w_q_vs_workload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package workloads

import (
"context"
"fmt"
"log"
"runtime"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
)

// RunWorkload drives the scale-test traffic against the configured container
// until ctx is cancelled. It starts two goroutines, each locked to its own OS
// thread: one repeatedly calls randomReadWriteQueries, the other repeatedly
// calls vectorSearchQueries. Errors from individual iterations are logged and
// do not stop the loops.
//
// It returns an error only if the database or container client cannot be
// constructed; once the loops are running it blocks until both have exited
// and then returns nil.
func RunWorkload(ctx context.Context, client *azcosmos.Client, cfg workloadConfig) error {
	dbClient, err := client.NewDatabase(cfg.DatabaseID)
	if err != nil {
		return fmt.Errorf("ensure database: %w", err)
	}

	container, err := dbClient.NewContainer(cfg.ContainerID)
	if err != nil {
		return fmt.Errorf("ensure container: %w", err)
	}

	count := cfg.LogicalPartitions

	log.Printf("Starting %d concurrent read/write/queries ...", count)

	// Short pause between iterations to avoid a tight loop when an operation
	// fails immediately.
	const pause = 10 * time.Millisecond

	var wg sync.WaitGroup

	// runPinned starts a goroutine locked to its own OS thread that calls op
	// in a loop until ctx is cancelled. label prefixes the failure log line.
	runPinned := func(label string, op func() error) {
		wg.Add(1)
		go func() {
			runtime.LockOSThread()
			defer runtime.UnlockOSThread()
			defer wg.Done()

			for {
				if ctx.Err() != nil {
					return
				}

				if err := op(); err != nil {
					log.Printf("%s failed: %v", label, err)
				}

				// Wait for the pause, but wake up immediately on
				// cancellation instead of sleeping through it.
				select {
				case <-ctx.Done():
					return
				case <-time.After(pause):
				}
			}
		}()
	}

	runPinned("read/write/queries", func() error {
		return randomReadWriteQueries(ctx, container, count, cfg.PartitionKeyFieldName)
	})
	runPinned("vector search queries", func() error {
		return vectorSearchQueries(ctx, container, count, cfg.PartitionKeyFieldName)
	})

	// The loops exit only after ctx is cancelled, so waiting on the group
	// also waits for cancellation.
	wg.Wait()
	return nil
}
14 changes: 14 additions & 0 deletions sdk/data/azcosmos/workloads/setup_env.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash
# cspell:disable
# setup_env.sh - Automates Azure Cosmos SDK scale testing environment setup
# Usage: ./setup_env.sh   (or: bash setup_env.sh)
#
# Installs Go, Neovim and the Azure CLI on a Debian/Ubuntu machine.
# Requires sudo privileges and network access.

# -e: abort on the first failing command.
# -u: treat use of an unset variable as an error.
# -o pipefail: a failed download in `curl | sudo bash` fails the script
#              instead of being masked by the exit status of bash.
set -euo pipefail

# 1. System update and install dependencies
echo "[Step 1] System update and install dependencies: started."
# apt-get is the stable interface for scripts; -y answers the confirmation
# prompt so the script can run unattended.
sudo apt-get update
sudo apt-get install -y golang-go neovim
# -f makes curl fail on HTTP errors rather than piping an error page into bash.
curl -fsSL https://aka.ms/InstallAzureCLIDeb | sudo bash
echo "[Step 1] System update and install dependencies: completed."
Loading