Skip to content

feat: rust-native ssh support #2081

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,114 changes: 1,082 additions & 32 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions gix-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ async-client = [
## Data structures implement `serde::Serialize` and `serde::Deserialize`.
serde = ["dep:serde"]

russh = ["dep:russh", "dep:tokio", "async-client"]

[[test]]
name = "blocking-transport"
path = "tests/blocking-transport.rs"
Expand Down Expand Up @@ -121,6 +123,9 @@ async-std = { version = "1.12.0", optional = true }

document-features = { version = "0.2.0", optional = true }

russh = { version = "0.53", optional = true }
tokio = { version = "1", optional = true, default-features = false }

[dev-dependencies]
gix-pack = { path = "../gix-pack", default-features = false, features = [
"streaming-input",
Expand Down
18 changes: 11 additions & 7 deletions gix-transport/src/client/async_io/connect.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
pub use crate::client::non_io_types::connect::{Error, Options};

#[cfg(feature = "async-std")]
pub(crate) mod function {
use crate::client::{git, non_io_types::connect::Error};
use crate::client::non_io_types::connect::Error;

/// A general purpose connector connecting to a repository identified by the given `url`.
///
/// This includes connections to
/// [git daemons][crate::client::git::connect()] only at the moment.
/// [git daemons][crate::client::git::connect()] and `ssh`,
///
/// Use `options` to further control specifics of the transport resulting from the connection.
pub async fn connect<Url, E>(
Expand All @@ -18,28 +17,33 @@ pub(crate) mod function {
Url: TryInto<gix_url::Url, Error = E>,
gix_url::parse::Error: From<E>,
{
let mut url = url.try_into().map_err(gix_url::parse::Error::from)?;
let url = url.try_into().map_err(gix_url::parse::Error::from)?;
Ok(match url.scheme {
#[cfg(feature = "async-std")]
gix_url::Scheme::Git => {
if url.user().is_some() {
return Err(Error::UnsupportedUrlTokens {
url: url.to_bstring(),
scheme: url.scheme,
});
}
let path = std::mem::take(&mut url.path);

Box::new(
git::Connection::new_tcp(
crate::client::git::Connection::new_tcp(
url.host().expect("host is present in url"),
url.port,
path,
url.path.clone(),
options.version,
options.trace,
)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
)
}
#[cfg(feature = "russh")]
gix_url::Scheme::Ssh => {
Box::new(crate::client::async_io::ssh::connect(url, options.version, options.trace).await?)
}
scheme => return Err(Error::UnsupportedScheme(scheme)),
})
}
Expand Down
4 changes: 3 additions & 1 deletion gix-transport/src/client/async_io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ pub use traits::{SetServiceResponse, Transport, TransportV2Ext};

///
pub mod connect;
#[cfg(feature = "async-std")]
pub use connect::function::connect;

#[cfg(feature = "russh")]
pub mod ssh;
157 changes: 157 additions & 0 deletions gix-transport/src/client/async_io/ssh/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use std::{ops::DerefMut, sync::Arc, task::ready};

use russh::{
client::{Config, Handle, Handler},
MethodSet,
};

pub enum AuthMode {
UsernamePassword { username: String, password: String },
PublicKey { username: String },
}

#[derive(Clone)]
pub struct Client {
handle: Arc<Handle<ClientHandler>>,
}

impl Client {
pub(super) async fn connect(host: &str, port: u16, auth: AuthMode) -> Result<Self, super::Error> {
let mut handle = russh::client::connect(Arc::new(Config::default()), (host, port), ClientHandler).await?;

Self::authenticate(&mut handle, auth).await?;

Ok(Client {
handle: Arc::new(handle),
})
}

async fn authenticate(handle: &mut Handle<ClientHandler>, auth: AuthMode) -> Result<(), super::Error> {
match auth {
AuthMode::UsernamePassword { username, password } => {
match handle.authenticate_password(username, password).await? {
russh::client::AuthResult::Success => Ok(()),
russh::client::AuthResult::Failure {
remaining_methods,
partial_success: _,
} => Err(super::Error::AuthenticationFailed(remaining_methods)),
}
}
AuthMode::PublicKey { username } => {
let mut agent = russh::keys::agent::client::AgentClient::connect_env().await?;
let rsa_hash = handle.best_supported_rsa_hash().await?.flatten();
let mut methods = MethodSet::empty();
for key in agent.request_identities().await? {
match handle
.authenticate_publickey_with(&username, key, rsa_hash, &mut agent)
.await?
{
russh::client::AuthResult::Success => return Ok(()),
russh::client::AuthResult::Failure {
remaining_methods,
partial_success: _,
} => methods = remaining_methods,
}
}
Err(super::Error::AuthenticationFailed(methods))
}
}
}

pub async fn open_session(
&mut self,
cmd: impl Into<String>,
env: Vec<(String, String)>,
) -> Result<Session, super::Error> {
let channel = self.handle.channel_open_session().await?;

for (key, value) in env {
channel.set_env(false, key, value).await?;
}

channel.exec(false, cmd.into().bytes().collect::<Vec<_>>()).await?;

let stream = channel.into_stream();
Ok(Session {
stream: Arc::new(std::sync::Mutex::new(stream)),
})
}
}

#[derive(Clone)]
pub struct Session {
stream: Arc<std::sync::Mutex<russh::ChannelStream<russh::client::Msg>>>,
}

impl Session {
fn poll_fn<F, R>(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, poll_fn: F) -> std::task::Poll<R>
where
F: FnOnce(
std::pin::Pin<&mut russh::ChannelStream<russh::client::Msg>>,
&mut std::task::Context<'_>,
) -> std::task::Poll<R>,
{
match self.stream.try_lock() {
Ok(mut inner) => {
let pinned = std::pin::Pin::new(inner.deref_mut());
(poll_fn)(pinned, cx)
}
Err(_) => {
cx.waker().wake_by_ref();
std::task::Poll::Pending
}
}
}
}

impl futures_io::AsyncRead for Session {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
slice: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
self.poll_fn(cx, |pinned, cx| {
let mut buf = tokio::io::ReadBuf::new(slice);
ready!(tokio::io::AsyncRead::poll_read(pinned, cx, &mut buf))?;
std::task::Poll::Ready(Ok(buf.filled().len()))
})
}
}

impl futures_io::AsyncWrite for Session {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
self.poll_fn(cx, |pinned, cx| tokio::io::AsyncWrite::poll_write(pinned, cx, buf))
}

fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.poll_fn(cx, tokio::io::AsyncWrite::poll_flush)
}

fn poll_close(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.poll_fn(cx, tokio::io::AsyncWrite::poll_shutdown)
}
}

struct ClientHandler;

impl Handler for ClientHandler {
type Error = super::Error;

async fn check_server_key(
&mut self,
_server_public_key: &russh::keys::ssh_key::PublicKey,
) -> Result<bool, Self::Error> {
// TODO: configurable
Ok(true)
}
}
19 changes: 19 additions & 0 deletions gix-transport/src/client/async_io/ssh/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use russh::MethodSet;

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("The authentication method failed. Remaining methods: {0:?}")]
AuthenticationFailed(MethodSet),
#[error(transparent)]
Ssh(#[from] russh::Error),
#[error(transparent)]
Keys(#[from] russh::keys::Error),
#[error(transparent)]
Agent(#[from] russh::AgentAuthError),
}

impl From<Error> for crate::client::Error {
fn from(err: Error) -> Self {
Self::NativeSshError(err)
}
}
135 changes: 135 additions & 0 deletions gix-transport/src/client/async_io/ssh/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use crate::{
client::{SetServiceResponse, Transport, TransportWithoutIO},
Protocol, Service,
};
use async_trait::async_trait;

mod client;
mod error;

pub use error::Error;

pub struct NativeSsh {
url: gix_url::Url,
desired_version: Protocol,
trace: bool,

identity: Option<gix_sec::identity::Account>,
client: Option<client::Client>,
connection: Option<crate::client::git::Connection<client::Session, client::Session>>,
}

impl TransportWithoutIO for NativeSsh {
fn set_identity(&mut self, identity: gix_sec::identity::Account) -> Result<(), crate::client::Error> {
self.identity = Some(identity);
Ok(())
}

fn request(
&mut self,
write_mode: crate::client::WriteMode,
on_into_read: crate::client::MessageKind,
trace: bool,
) -> Result<super::RequestWriter<'_>, crate::client::Error> {
if let Some(connection) = &mut self.connection {
connection.request(write_mode, on_into_read, trace)
} else {
Err(crate::client::Error::MissingHandshake)
}
}

fn to_url(&self) -> std::borrow::Cow<'_, bstr::BStr> {
self.url.to_bstring().into()
}

fn connection_persists_across_multiple_requests(&self) -> bool {
true
}

fn configure(
&mut self,
_config: &dyn std::any::Any,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
Ok(())
}
}

#[async_trait(?Send)]
impl Transport for NativeSsh {
async fn handshake<'a>(
&mut self,
service: Service,
extra_parameters: &'a [(&'a str, Option<&'a str>)],
) -> Result<SetServiceResponse<'_>, crate::client::Error> {
let host = self.url.host().expect("url has host");
let port = self.url.port_or_default().expect("ssh has a default port");

let auth_mode = match self.identity.as_ref() {
Some(crate::client::Account {
username,
password,
oauth_refresh_token: _,
}) => client::AuthMode::UsernamePassword {
username: username.clone(),
password: password.clone(),
},
None => client::AuthMode::PublicKey {
username: self
.url
.user()
.map(std::string::ToString::to_string)
.unwrap_or_default(),
},
};

let mut client = client::Client::connect(host, port, auth_mode).await?;

let session = client
.open_session(
format!("{} {}", service.as_str(), self.url.path),
vec![(
"GIT_PROTOCOL".to_string(),
format!("version={}", self.desired_version as usize),
)],
)
.await?;

let connection = crate::client::git::Connection::new(
session.clone(),
session,
self.desired_version,
self.url.path.clone(),
None::<(String, _)>,
crate::client::git::ConnectMode::Process,
self.trace,
);

self.client = Some(client);
self.connection = Some(connection);

self.connection
.as_mut()
.expect("connection to be there right after setting it")
.handshake(service, extra_parameters)
.await
}
}

#[allow(clippy::unused_async)]
pub async fn connect(
url: gix_url::Url,
desired_version: Protocol,
trace: bool,
) -> Result<NativeSsh, crate::client::connect::Error> {
if url.scheme != gix_url::Scheme::Ssh {
return Err(crate::client::connect::Error::UnsupportedScheme(url.scheme));
}
Ok(NativeSsh {
url,
desired_version,
trace,
identity: None,
client: None,
connection: None,
})
}
Loading
Loading