Skip to content

Commit 9201a5b

Browse files
authored
feat: Add support for uploading to Azure Blob Storage. (#243)
Creates an AzureStorageUploader that can upload to Azure blob storage.
1 parent 6f15951 commit 9201a5b

File tree

4 files changed

+210
-6
lines changed

4 files changed

+210
-6
lines changed

docs/source/cloud_storage.rst

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ https://localhost:8080/foo/bar/dash.mpd to write the manifest (with default
2525
settings).
2626

2727
Cloud storage URLs can be either Google Cloud Storage URLs (beginning with
28-
gs://) or Amazon S3 URLs (beginning with s3://). Like the HTTP support
29-
described above, these are a base URL. If you ask for output to gs://foo/bar/,
30-
Streamer will write to gs://foo/bar/dash.mpd (with default settings).
28+
gs://), Amazon S3 URLs (beginning with s3://), or Azure Blob Storage URLs
29+
(beginning with azure://). Like the HTTP support described above, these are
30+
a base URL. If you ask for output to gs://foo/bar/, Streamer will write to
31+
gs://foo/bar/dash.mpd (with default settings).
3132

3233
Cloud storage output uses the storage provider's Python libraries. Find more
3334
details on setup and authentication below.
@@ -93,5 +94,39 @@ Example command-line for live streaming to Amazon S3:
9394
-o s3://my_s3_bucket/folder/
9495
9596
97+
Azure Blob Storage Setup
98+
~~~~~~~~~~~~~~~~~~~~~~~~
99+
100+
Install the Python modules if you haven't yet:
101+
102+
.. code:: sh
103+
104+
python3 -m pip install azure-storage-blob azure-identity
105+
106+
Azure Blob Storage support uses append blobs for efficient streaming uploads,
107+
making it ideal for live streaming scenarios where data is written sequentially.
108+
Authentication is handled by Azure's DefaultAzureCredential, which automatically
109+
tries multiple authentication methods in order.
110+
111+
The most common authentication methods are:
112+
113+
1. **Azure CLI**: Login using ``az login`` (recommended for development)
114+
2. **Managed Identity**: Automatic when running on Azure resources
115+
3. **Service Principal**: Set ``AZURE_CLIENT_ID``, ``AZURE_CLIENT_SECRET``,
116+
and ``AZURE_TENANT_ID`` environment variables
117+
4. **Interactive Browser**: Fallback authentication method
118+
119+
The Azure URL format is: ``azure://storageaccount.blob.core.windows.net/container/path/``
120+
121+
Example command-line for live streaming to Azure Blob Storage:
122+
123+
.. code:: sh
124+
125+
python3 shaka-streamer \
126+
-i config_files/input_looped_file_config.yaml \
127+
-p config_files/pipeline_live_config.yaml \
128+
-o azure://mystorageaccount.blob.core.windows.net/mycontainer/folder/
129+
130+
96131
.. _boto config file: http://boto.cloudhackers.com/en/latest/boto_config_tut.html
97132
.. _AWS CLI: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html

shaka-streamer

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ See docs: https://shaka-project.github.io/shaka-streamer/cloud_storage.html
6969
'config_files/bitrate_config.yaml)')
7070
parser.add_argument('-c', '--cloud-url',
7171
default=None,
72-
help='The Google Cloud Storage or Amazon S3 URL to ' +
73-
'upload to. (Starts with gs:// or s3://) (DEPRECATED, use -o)')
72+
help='The Google Cloud Storage or Amazon S3 or Azure Blob Storage URL to ' +
73+
'upload to. (Starts with gs:// or s3:// or azure://) (DEPRECATED, use -o)')
7474
parser.add_argument('-o', '--output',
7575
default='output_files',
7676
help='The output folder or URL to write files to. See ' +

streamer/cloud/azure.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Upload to Azure Blob Storage."""
16+
17+
import io
18+
import urllib.parse
19+
from typing import Optional
20+
21+
from azure.storage.blob import BlobServiceClient, BlobClient # type: ignore
22+
from azure.core.exceptions import ResourceNotFoundError # type: ignore
23+
from azure.identity import DefaultAzureCredential # type: ignore
24+
25+
from streamer.cloud.base import CloudUploaderBase
26+
27+
28+
# Azure Append Blobs commit data one block per append_block call, and each
# block is limited to 4 MiB.  We buffer writes and flush in blocks of at
# most this size.
APPEND_BLOB_BUFFER_SIZE = (4 << 20)  # 4MB


class AzureStorageUploader(CloudUploaderBase):
  """Uploads to Azure Blob Storage.  See base class for interface docs."""

  def __init__(self, upload_location: str) -> None:
    # Parse the upload location (URL).
    # Expected format: azure://storageaccount.blob.core.windows.net/container/path
    url = urllib.parse.urlparse(upload_location)
    if not url.netloc:
      raise ValueError(f"Invalid Azure storage URL format: {upload_location}")

    # The netloc is the storage account host
    # (e.g. storageaccount.blob.core.windows.net).
    account_url = f"https://{url.netloc}"

    # Initialize the BlobServiceClient with DefaultAzureCredential, which
    # tries several authentication methods in order (CLI, managed identity,
    # service principal, interactive browser).
    try:
      credential = DefaultAzureCredential()
      self._blob_service_client = BlobServiceClient(
          account_url=account_url, credential=credential)
    except Exception as e:
      # Chain the original exception so the root cause stays visible in
      # the traceback.
      raise RuntimeError(
          f"Failed to initialize Azure credentials for {account_url}: {e}") from e

    # The first path segment is the container name; everything after it is
    # the base path within the container.
    path_parts = url.path.strip('/').split('/', 1)
    if not path_parts or not path_parts[0]:
      raise ValueError(f"Container name not found in URL: {upload_location}")

    self._container_name = path_parts[0]
    # Base path within the container (everything after the container name).
    self._base_path = path_parts[1] if len(path_parts) > 1 else ''

    # State for chunked uploads:
    self._blob_client: Optional[BlobClient] = None
    self._data_buffer: bytes = b''

  def write_non_chunked(self, path: str, data: bytes) -> None:
    """Write the non-chunked data to the destination."""
    full_path = self._get_full_path(path)

    blob_client = self._blob_service_client.get_blob_client(
        container=self._container_name,
        blob=full_path)

    # Upload as a single blob, replacing any existing blob at this path.
    blob_client.upload_blob(
        data=data,
        overwrite=True)

  def start_chunked(self, path: str) -> None:
    """Set up for a chunked transfer to the destination."""
    full_path = self._get_full_path(path)

    self._blob_client = self._blob_service_client.get_blob_client(
        container=self._container_name,
        blob=full_path)

    # Create (or reset) the append blob that chunks will be appended to.
    self._blob_client.create_append_blob()

    # Reset state for the new chunked upload.
    self._data_buffer = b''

  def write_chunk(self, data: bytes, force: bool = False) -> None:
    """Handle a single chunk of data.

    Data is buffered and appended in blocks of at most
    APPEND_BLOB_BUFFER_SIZE bytes each, since Azure rejects append blocks
    larger than 4 MiB.  When |force| is True, any remaining buffered data
    is flushed as well.
    """
    if not self._blob_client:
      raise RuntimeError("start_chunked() must be called before write_chunk()")

    # Accumulate data in the buffer.
    self._data_buffer += data

    # Flush full-size blocks.  The buffer can exceed the limit when a
    # single incoming chunk is large, so append in bounded slices rather
    # than one oversized append_block call (which Azure would reject).
    while len(self._data_buffer) >= APPEND_BLOB_BUFFER_SIZE:
      self._blob_client.append_block(
          data=self._data_buffer[:APPEND_BLOB_BUFFER_SIZE])
      self._data_buffer = self._data_buffer[APPEND_BLOB_BUFFER_SIZE:]

    # When forced (e.g. end of stream), flush whatever remains.
    if force and self._data_buffer:
      self._blob_client.append_block(data=self._data_buffer)
      self._data_buffer = b''

  def end_chunked(self) -> None:
    """End the chunked transfer."""
    if not self._blob_client:
      raise RuntimeError("start_chunked() must be called before end_chunked()")

    # Upload any remaining data in the buffer.
    self.write_chunk(b'', force=True)

    # For append blobs, no additional commit operation is needed: data is
    # already committed with each append_block call.  Just reset state.
    self.reset()

  def delete(self, path: str) -> None:
    """Delete the file from cloud storage."""
    full_path = self._get_full_path(path)

    blob_client = self._blob_service_client.get_blob_client(
        container=self._container_name,
        blob=full_path)

    try:
      blob_client.delete_blob()
    except ResourceNotFoundError:
      # Blob doesn't exist, which is fine for a delete operation.
      pass

  def reset(self) -> None:
    """Reset any chunked output state."""
    self._blob_client = None
    self._data_buffer = b''

  def _get_full_path(self, path: str) -> str:
    """Construct the full blob path by combining base path and relative path."""
    # Remove leading slashes to avoid empty path segments.
    clean_path = path.lstrip('/')

    if self._base_path:
      # Ensure proper path separation between base and relative parts.
      base = self._base_path.rstrip('/')
      return f"{base}/{clean_path}" if clean_path else base
    else:
      return clean_path

streamer/cloud/uploader.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424

2525
# All supported protocols. Used to provide more useful error messages.
26-
ALL_SUPPORTED_PROTOCOLS: list[str] = ['gs', 's3']
26+
ALL_SUPPORTED_PROTOCOLS: list[str] = ['gs', 's3', 'azure']
2727

2828

2929
# Try to load the GCS (Google Cloud Storage) uploader. If we can, the user has
@@ -44,12 +44,23 @@
4444
pass
4545

4646

47+
# Try to load the Azure Blob Storage uploader.  If we can, the user has
# the libraries needed for Azure support.
try:
  from streamer.cloud.azure import AzureStorageUploader
  SUPPORTED_PROTOCOLS.append('azure')
except ImportError:
  # Azure libraries are not installed; 'azure' stays out of
  # SUPPORTED_PROTOCOLS so a useful error can be shown later.  A bare
  # except here would also swallow KeyboardInterrupt/SystemExit.
  pass
54+
55+
4756
def create(upload_location: str) -> CloudUploaderBase:
  """Create an uploader appropriate to the upload location URL."""
  # Dispatch on the URL scheme prefix.  Each uploader name is looked up
  # lazily, only when its prefix matches, since optional uploaders may not
  # have been imported.
  if upload_location.startswith("gs://"):
    return GCSUploader(upload_location)
  if upload_location.startswith("s3://"):
    return S3Uploader(upload_location)
  if upload_location.startswith("azure://"):
    return AzureStorageUploader(upload_location)
  raise RuntimeError("Protocol of {} isn't supported".format(upload_location))

0 commit comments

Comments
 (0)