Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 24 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
[workspace]
resolver = "3"
members = [ "helios", "helios-api", "helios-legacy", "helios-oci", "helios-remote", "helios-remote-model", "helios-state", "helios-util" ]
members = [
"helios",
"helios-api",
"helios-legacy",
"helios-oci",
"helios-remote",
"helios-remote-model",
"helios-state",
"helios-util",
]

[workspace.package]
version = "0.14.0"
Expand All @@ -9,7 +18,7 @@ edition = "2024"
description = "Balena's on-device agent"
homepage = "https://github.com/balena-io/helios"
repository = "https://github.com/balena-io/helios"
authors = [ "Balena Inc. <hello@balena.io>" ]
authors = ["Balena Inc. <hello@balena.io>"]
license = "Apache-2.0"
readme = "README.md"
publish = false
Expand Down Expand Up @@ -61,38 +70,42 @@ version = "0"
[workspace.dependencies.axum]
version = "0.8.6"
default-features = false
features = [ "tokio", "http1", "json", "query" ]
features = ["tokio", "http1", "json", "query"]

[workspace.dependencies.tokio]
version = "1.47.1"
default-features = false
features = [ "rt-multi-thread", "macros", "time", "fs" ]
features = ["rt-multi-thread", "macros", "time", "fs"]

[workspace.dependencies.tokio-stream]
version = "0.1.18"
default-features = false

[workspace.dependencies.tower-http]
version = "0.6.6"
default-features = false
features = [ "trace" ]
features = ["trace"]

[workspace.dependencies.tracing-subscriber]
version = "0.3.20"
features = [ "env-filter", "fmt", "registry", "ansi" ]
features = ["env-filter", "fmt", "registry", "ansi"]

[workspace.dependencies.reqwest]
version = "0.13"
default-features = false
features = [ "json", "rustls", "brotli", "stream" ]
features = ["json", "rustls", "brotli", "stream"]

[workspace.dependencies.serde]
version = "1.0"
features = [ "derive" ]
features = ["derive"]

[workspace.dependencies.clap]
version = "4.5"
features = [ "derive", "env" ]
features = ["derive", "env"]

[workspace.dependencies.uuid]
version = "1.18"
features = [ "v4" ]
features = ["v4"]

[workspace.dependencies.mahler]
version = "0.24"
Expand All @@ -101,18 +114,14 @@ version = "0.24"
version = "0.10.9"
default-features = false

[workspace.dependencies.futures-lite]
version = "2.6.1"
default-features = false

[workspace.dependencies.mockito]
version = "1.7"
default-features = false

[workspace.dependencies.zbus]
version = "5.11"
default-features = false
features = [ "tokio" ]
features = ["tokio"]

[profile.release]
opt-level = "z"
Expand Down
10 changes: 9 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,15 @@ LABEL org.opencontainers.image.licenses=APACHE-2.0

# Install release dependencies
RUN apk add --update --no-cache \
libstdc++ docker-cli jq dbus socat
libstdc++ jq dbus socat

# Install docker v28 since docker v29 no longer supports
# the balena-engine API 1.41
RUN apk add --no-cache \
--repositories-file /dev/null \
--repository https://dl-cdn.alpinelinux.org/alpine/v3.22/main \
--repository https://dl-cdn.alpinelinux.org/alpine/v3.22/community \
"docker-cli<29"

COPY scripts /opt/helios
COPY --from=build /usr/src/app/target/release/helios /usr/bin
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ services:
condition: service_healthy

docker:
image: docker:dind
image: docker:28-dind
privileged: true
volumes:
- dind:/var/run
Expand Down
34 changes: 24 additions & 10 deletions helios-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::{
use helios_legacy::{ProxyConfig, ProxyState, proxy};
use helios_remote::PollRequest;
use helios_state::models::{App, Device, DeviceTarget as LocalDeviceTarget};
use helios_state::{LocalState, SeekRequest, UpdateOpts, UpdateStatus};
use helios_state::{LocalState, SeekRequest, TargetState, UpdateOpts, UpdateStatus};
use helios_util::types::Uuid;

// NOTE: we use the target from the remote backend as input for the API
Expand Down Expand Up @@ -203,9 +203,9 @@ async fn get_app_cur_state(
Path(app_uuid): Path<Uuid>,
) -> Result<Json<App>, StatusCode> {
let state = state_rx.borrow();
let device = state.device.clone();
if let Some(app) = device.apps.get(&app_uuid) {
return Ok(Json(app.clone()));
let mut device = state.device.clone();
if let Some(app) = device.apps.remove(&app_uuid) {
return Ok(Json(app));
}

Err(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -233,9 +233,8 @@ async fn set_app_tgt_state(

if seek_request_tx
.send(SeekRequest {
target,
target: TargetState::Local { target },
opts,
raw_target: None,
})
.is_err()
{
Expand All @@ -258,9 +257,10 @@ mod tests {
watch::Receiver<SeekRequest>,
) {
let (seek_request_tx, seek_rx) = watch::channel(SeekRequest {
target: LocalDeviceTarget::default(),
target: TargetState::Local {
target: LocalDeviceTarget::default(),
},
opts: UpdateOpts::default(),
raw_target: None,
});
let (poll_request_tx, poll_rx) = watch::channel(PollRequest::default());
let device = Device::new(Uuid::default(), "balenaOS 6.3.1".parse().ok());
Expand Down Expand Up @@ -349,7 +349,14 @@ mod tests {
let seek_request = seek_rx.borrow().clone();
assert!(!seek_request.opts.force);
assert!(!seek_request.opts.cancel); // API default via serde is false
assert!(seek_request.target.apps.contains_key(&app_uuid));
if let TargetState::Local {
target: local_target,
} = seek_request.target
{
assert!(local_target.apps.contains_key(&app_uuid));
} else {
panic!("expected local target");
}
}

#[tokio::test]
Expand All @@ -373,6 +380,13 @@ mod tests {
assert!(seek_rx.changed().await.is_ok());
let seek_request = seek_rx.borrow().clone();
assert!(seek_request.opts.force);
assert!(seek_request.target.apps.contains_key(&app_uuid));
if let TargetState::Local {
target: local_target,
} = seek_request.target
{
assert!(local_target.apps.contains_key(&app_uuid));
} else {
panic!("expected local target");
}
}
}
1 change: 0 additions & 1 deletion helios-integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ axum = { version = "0.8.6", default-features = false, features = [
] }
dirs = "6.0.0"
fastrand = "2.3.0"
futures-lite = { version = "2.6.1", default-features = false }
mahler = "0.24"
regex = "1.12"
reqwest = { version = "0.13", default-features = false, features = [
Expand Down
2 changes: 1 addition & 1 deletion helios-oci/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ version.workspace = true
helios-util = { path = "../helios-util", version = "0" }

bollard.workspace = true
futures-lite.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio-stream.workspace = true
tracing.workspace = true
2 changes: 1 addition & 1 deletion helios-oci/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use bollard::{
},
secret::ContainerInspectResponse,
};
use futures_lite::StreamExt;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;

use super::image::ImageConfig;
use super::util::types::ImageUri;
Expand Down
33 changes: 26 additions & 7 deletions helios-oci/src/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::task::{Context, Poll};
use bollard::query_parameters::{
CreateImageOptions, ListImagesOptions, RemoveImageOptions, TagImageOptions,
};
use futures_lite::Stream;
use serde::{Deserialize, Serialize};
use tokio_stream::{Stream, StreamExt};

use super::util::types::ImageUri;
use super::{Client, Credentials, Error, Result, WithContext};
Expand All @@ -33,15 +33,36 @@ impl Image<'_> {
let res = self.0.inner().list_images(Some(opts)).await;
let images = res.map_err(Error::with_context("failed to list images"))?;

images
let uris = images
.into_iter()
.flat_map(|image| {
image
.repo_tags
.repo_digests
.into_iter()
.map(|tag| tag.parse().map_err(Error::unexpected))
.chain(image.repo_tags)
// ignore errors when parsing as the list may contain
// <none>:<none> tags
.flat_map(|tag| tag.parse().ok())
})
.collect()
// Deduplicate digests and tags ImageUri::repo_and_tag
// When repo and tag match, prefer the one with a digest.
.fold(
HashMap::<String, ImageUri>::new(),
|mut acc, uri: ImageUri| {
let repo_and_tag = uri.repo_and_tag();
let exists_with_digest = acc
.get(&repo_and_tag)
.is_some_and(|existing| existing.digest().is_some());
if !exists_with_digest {
acc.insert(repo_and_tag, uri);
}
acc
},
)
.into_values()
.collect();

Ok(uris)
}

/// Tags an image so that it becomes part of a repository.
Expand All @@ -62,8 +83,6 @@ impl Image<'_> {

/// Pulls an image from a registry.
pub async fn pull(&self, image: &ImageUri, creds: Option<Credentials>) -> Result<()> {
use futures_lite::StreamExt;

let mut stream = self.pull_with_progress(image, creds);
while let Some(result) = stream.next().await {
result?;
Expand Down
26 changes: 17 additions & 9 deletions helios-remote/src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use tokio::sync::watch::{Receiver, Sender};
use tokio::time::Instant;
use tracing::{error, info, instrument, trace, warn};

use crate::state::{SeekRequest, UpdateOpts};
use crate::state::{SeekRequest, TargetState, UpdateOpts};
use crate::util::http::Uri;
use crate::util::interrupt::Interrupt;
use crate::util::request::{self, Get};
use crate::util::types::Uuid;

use super::config::{RemoteConfig, RequestConfig};
use super::model::Device as DeviceTarget;
use super::model::Device as RemoteDeviceTarget;

async fn get_poll_client(uuid: &Uuid, remote: &RemoteConfig) -> (Get, Option<Value>) {
let uri = remote.api_endpoint.clone();
Expand Down Expand Up @@ -70,7 +70,7 @@ async fn poll_remote(
}

#[derive(Deserialize, Clone, Debug)]
struct TargetState(HashMap<Uuid, DeviceTarget>);
struct RemoteTargetState(HashMap<Uuid, RemoteDeviceTarget>);

/// An update request coming from the API.
///
Expand Down Expand Up @@ -218,21 +218,29 @@ pub async fn start_poll(
// If there is a new target
if let Some(target_state) = target {
// put the poll back on the channel
match serde_json::from_value::<TargetState>(target_state.clone()) {
Ok(TargetState(mut map)) => {
if let Some(target) = map.remove(&uuid) {
match serde_json::from_value::<RemoteTargetState>(target_state.clone()) {
Ok(RemoteTargetState(mut map)) => {
if let Some(remote_target) = map.remove(&uuid) {
let _ = seek_tx.send(SeekRequest {
target: target.into(),
raw_target: Some(target_state),
target: TargetState::Remote {
target: Some(remote_target.into()),
raw_target: target_state,
},
opts,
});
} else {
error!("no target for uuid {uuid} found on target state");
}
}
Err(e) => {
// FIXME: we'll need to reject the target if it cannot be deserialized
warn!("failed to deserialize target state: {e}");
let _ = seek_tx.send(SeekRequest {
target: TargetState::Remote {
target: None,
raw_target: target_state,
},
opts,
});
}
};
}
Expand Down
Loading
Loading