diff --git a/pom.xml b/pom.xml index 13aafd1..a1624f2 100644 --- a/pom.xml +++ b/pom.xml @@ -277,11 +277,6 @@ curator-recipes 5.6.0 - - com.hierynomus - sshj - 0.38.0 - org.junit.jupiter @@ -305,7 +300,6 @@ org.apache.sshd sshd-sftp 2.12.1 - test diff --git a/src/main/java/org/mortbay/jetty/orchestrator/Cluster.java b/src/main/java/org/mortbay/jetty/orchestrator/Cluster.java index 46f5708..03f04af 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/Cluster.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/Cluster.java @@ -115,18 +115,24 @@ private void init() throws Exception throw new IllegalStateException("No configured host launcher to start node on " + hostname); futures.add(executor.submit(() -> { + if (LOG.isDebugEnabled()) + LOG.debug("launching {}", globalNodeId); String remoteConnectString = launcher.launch(globalNodeId, connectString); + if (LOG.isDebugEnabled()) + LOG.debug("launched {}", globalNodeId); return new AbstractMap.SimpleImmutableEntry<>(globalNodeId, remoteConnectString); })); } executor.shutdown(); for (Future> future : futures) { - Map.Entry entry = future.get(); + Map.Entry entry = future.get(120, TimeUnit.SECONDS); GlobalNodeId globalNodeId = entry.getKey(); String remoteConnectString = entry.getValue(); hosts.put(globalNodeId, new Host(globalNodeId, new RpcClient(curator, globalNodeId), remoteConnectString)); } + if (LOG.isDebugEnabled()) + LOG.debug("All hosts nodes connected to cluster, spawning node arrays..."); // start all worker nodes for (NodeArrayConfiguration nodeArrayConfig : configuration.nodeArrays()) @@ -164,6 +170,9 @@ public void run() } } }, RpcClient.HEALTH_CHECK_DELAY_MS, RpcClient.HEALTH_CHECK_DELAY_MS); + + if (LOG.isDebugEnabled()) + LOG.info("Cluster initialized, requested host nodes to spawn their node arrays: {}", hosts.values()); } public ClusterTools tools() diff --git a/src/main/java/org/mortbay/jetty/orchestrator/NodeArray.java b/src/main/java/org/mortbay/jetty/orchestrator/NodeArray.java index c06d72c..7596d77 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/NodeArray.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/NodeArray.java @@ -59,7 +59,7 @@ public Path rootPathOf(String id) } else { - URI uri = URI.create(NodeFileSystemProvider.PREFIX + ":" + node.globalNodeId.getHostId() + "!/." + NodeFileSystemProvider.PREFIX + "/" + node.globalNodeId.getNodeId()); + URI uri = URI.create(NodeFileSystemProvider.SCHEME + ":" + node.globalNodeId.getHostId() + "!/." + NodeFileSystemProvider.SCHEME + "/" + node.globalNodeId.getNodeId()); return Paths.get(uri); } } diff --git a/src/main/java/org/mortbay/jetty/orchestrator/configuration/LocalHostLauncher.java b/src/main/java/org/mortbay/jetty/orchestrator/configuration/LocalHostLauncher.java index fcd7f02..e0082c3 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/configuration/LocalHostLauncher.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/configuration/LocalHostLauncher.java @@ -114,7 +114,7 @@ public void close() throws Exception public static File rootPathOf(String hostId) { - return new File(System.getProperty("user.home") + "/." + NodeFileSystemProvider.PREFIX + "/" + hostId); + return new File(System.getProperty("user.home") + "/." + NodeFileSystemProvider.SCHEME + "/" + hostId); } private static void copyFile(String hostId, String filename, InputStream contents) throws Exception diff --git a/src/main/java/org/mortbay/jetty/orchestrator/configuration/SshRemoteHostLauncher.java b/src/main/java/org/mortbay/jetty/orchestrator/configuration/SshRemoteHostLauncher.java index d16f2af..04b0fe4 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/configuration/SshRemoteHostLauncher.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/configuration/SshRemoteHostLauncher.java @@ -13,17 +13,18 @@ package org.mortbay.jetty.orchestrator.configuration; +import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; +import java.io.OutputStream; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.nio.file.FileSystem; import java.nio.file.FileSystems; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -32,17 +33,16 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import net.schmizz.sshj.SSHClient; -import net.schmizz.sshj.connection.channel.Channel; -import net.schmizz.sshj.connection.channel.direct.Session; -import net.schmizz.sshj.connection.channel.direct.Signal; -import net.schmizz.sshj.connection.channel.forwarded.ConnectListener; -import net.schmizz.sshj.connection.channel.forwarded.RemotePortForwarder; -import net.schmizz.sshj.sftp.RemoteResourceInfo; -import net.schmizz.sshj.sftp.SFTPClient; -import net.schmizz.sshj.transport.verification.PromiscuousVerifier; -import net.schmizz.sshj.xfer.FileSystemFile; -import net.schmizz.sshj.xfer.LocalSourceFile; +import org.apache.sshd.client.SshClient; +import org.apache.sshd.client.channel.ChannelExec; +import org.apache.sshd.client.channel.ClientChannel; +import org.apache.sshd.client.channel.ClientChannelEvent; +import org.apache.sshd.client.session.ClientSession; +import org.apache.sshd.client.session.forward.PortForwardingTracker; +import org.apache.sshd.common.util.net.SshdSocketAddress; +import org.apache.sshd.server.forward.AcceptAllForwardingFilter; +import org.apache.sshd.sftp.client.SftpClient; +import org.apache.sshd.sftp.client.SftpClientFactory; import org.mortbay.jetty.orchestrator.nodefs.NodeFileSystemProvider; import org.mortbay.jetty.orchestrator.rpc.GlobalNodeId; import org.mortbay.jetty.orchestrator.rpc.NodeProcess; @@ -115,76 +115,85 @@ public String launch(GlobalNodeId globalNodeId, String connectString) throws Exc if (nodes.putIfAbsent(nodeId.getHostname(), RemoteNodeHolder.NULL) != null) throw new IllegalArgumentException("ssh launcher already launched node on host " + nodeId.getHostname()); - SSHClient sshClient = new SSHClient(); + SshClient sshClient = SshClient.setUpDefaultClient(); + sshClient.setForwardingFilter(new AcceptAllForwardingFilter()); // must be set, otherwise port forwarding does not work + sshClient.start(); FileSystem fileSystem = null; - SocketForwardingConnectListener forwardingConnectListener = null; - AutoCloseable forwarding = null; - Session.Command cmd = null; - Session session = null; + PortForwardingTracker forwarding = null; + ClientChannel clientChannel = null; + ClientSession session = null; try { - sshClient.addHostKeyVerifier(new PromiscuousVerifier()); // or loadKnownHosts() instead? - sshClient.connect(nodeId.getHostname(), port); + session = sshClient.connect(username, nodeId.getHostname(), port) + .verify(30, TimeUnit.SECONDS) + .getSession(); if (LOG.isDebugEnabled()) - LOG.debug("ssh to {} with username {} and empty password {}", nodeId.getHostname(), username, password == null); + LOG.debug("ssh to {} with username {} and password {}", nodeId.getHostname(), username, password == null ? null : "'" + new String(password) + "'"); - if (password == null) - sshClient.authPublickey(username); // public key auth - else - sshClient.authPassword(username, password); // pw auth + if (password != null && password.length > 0) + session.addPasswordIdentity(new String(password)); // pw auth + + session.auth().verify(30, TimeUnit.SECONDS); // detect windows - boolean windows = isWindows(sshClient); + boolean windows = isWindows(session); // do remote port forwarding int zkPort = Integer.parseInt(connectString.split(":")[1]); - forwardingConnectListener = new SocketForwardingConnectListener(nodeId.getHostname(), new InetSocketAddress("localhost", zkPort)); - RemotePortForwarder.Forward forward = sshClient.getRemotePortForwarder().bind( - new RemotePortForwarder.Forward(0), // remote port, dynamically choose one - forwardingConnectListener + forwarding = session.createRemotePortForwardingTracker( + new SshdSocketAddress("localhost", 0), // remote port, dynamically choose one + new SshdSocketAddress("localhost", zkPort) ); - forwarding = () -> sshClient.getRemotePortForwarder().cancel(forward); - String remoteConnectString = "localhost:" + forward.getPort(); + String remoteConnectString = forwarding.getBoundAddress().toString(); // read the dynamically chosen port + // create remote filesystem HashMap env = new HashMap<>(); - env.put(SFTPClient.class.getName(), sshClient.newStatefulSFTPClient()); - env.put(NodeFileSystemProvider.IS_WINDOWS_ENV_PROPERTY, windows); - fileSystem = FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.PREFIX + ":" + nodeId.getHostId()), env); - + env.put(SshClient.class.getName(), sshClient); + env.put(NodeFileSystemProvider.SFTP_HOST_ENV, nodeId.getHostname()); + env.put(NodeFileSystemProvider.SFTP_PORT_ENV, port); + env.put(NodeFileSystemProvider.SFTP_USERNAME_ENV, username); + env.put(NodeFileSystemProvider.SFTP_PASSWORD_ENV, password); + env.put(NodeFileSystemProvider.IS_WINDOWS_ENV, windows); + fileSystem = FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.SCHEME + ":" + nodeId.getHostId()), env); + + // upload classpath List remoteClasspathEntries = new ArrayList<>(); String[] classpathEntries = System.getProperty("java.class.path").split(File.pathSeparator); String delimiter = windows ? "\\" : "/"; - try (SFTPClient sftpClient = sshClient.newStatefulSFTPClient()) + try (SftpClient sftpClient = SftpClientFactory.instance().createSftpClient(session)) { for (String classpathEntry : classpathEntries) { File cpFile = new File(classpathEntry); String cpFileName = cpFile.getName(); - if (!cpFileName.endsWith(".jar") && !cpFileName.endsWith(".JAR")) - remoteClasspathEntries.add("." + NodeFileSystemProvider.PREFIX + delimiter + nodeId.getHostId() + delimiter + NodeProcess.CLASSPATH_FOLDER_NAME + delimiter + cpFileName); + if (!cpFileName.toLowerCase(Locale.ROOT).endsWith(".jar")) + remoteClasspathEntries.add("." + NodeFileSystemProvider.SCHEME + delimiter + nodeId.getHostId() + delimiter + NodeProcess.CLASSPATH_FOLDER_NAME + delimiter + cpFileName); if (cpFile.isDirectory()) copyDir(sftpClient, nodeId.getHostId(), cpFile, 1); else - copyFile(sftpClient, nodeId.getHostId(), cpFileName, new FileSystemFile(cpFile)); + copyFile(sftpClient, nodeId.getHostId(), cpFileName, cpFile); } } - remoteClasspathEntries.add("." + NodeFileSystemProvider.PREFIX + delimiter + nodeId.getHostId() + delimiter + NodeProcess.CLASSPATH_FOLDER_NAME + delimiter + "*"); + remoteClasspathEntries.add("." + NodeFileSystemProvider.SCHEME + delimiter + nodeId.getHostId() + delimiter + NodeProcess.CLASSPATH_FOLDER_NAME + delimiter + "*"); + // spawn remote node jvm String cmdLine = String.join(" ", buildCommandLine(fileSystem, jvm, remoteClasspathEntries, windows ? ";" : ":", nodeId.getHostId(), nodeId.getHostname(), remoteConnectString)); - session = sshClient.startSession(); - cmd = session.exec(cmdLine); - new StreamCopier(cmd.getInputStream(), System.out, true).spawnDaemon(nodeId.getHostname() + "-stdout"); - new StreamCopier(cmd.getErrorStream(), System.err, true).spawnDaemon(nodeId.getHostname() + "-stderr"); + LOG.info("spawning node command '{}'...", cmdLine); + clientChannel = session.createExecChannel(cmdLine); + clientChannel.setRedirectErrorStream(true); + clientChannel.open().verify(30, TimeUnit.SECONDS); - RemoteNodeHolder remoteNodeHolder = new RemoteNodeHolder(nodeId, fileSystem, sshClient, forwardingConnectListener, forwarding, session, cmd); + new StreamCopier(clientChannel.getInvertedOut(), System.out, true).spawnDaemon(nodeId.getHostname() + "-stdout"); + + RemoteNodeHolder remoteNodeHolder = new RemoteNodeHolder(nodeId, fileSystem, sshClient, forwarding, session, clientChannel); nodes.put(nodeId.getHostname(), remoteNodeHolder); return remoteConnectString; } catch (Exception e) { - IOUtil.close(fileSystem, cmd, session, forwardingConnectListener, forwarding, sshClient); + IOUtil.close(fileSystem, clientChannel, session, forwarding, sshClient); throw new Exception("Error launching host '" + nodeId.getHostname() + "'", e); } finally @@ -194,37 +203,33 @@ public String launch(GlobalNodeId globalNodeId, String connectString) throws Exc } } - private static boolean isWindows(SSHClient sshClient) throws IOException + private static boolean isWindows(ClientSession session) throws IOException { - try (Session session = sshClient.startSession()) + String output; + Integer exitStatus; + try (ChannelExec channel = session.createExecChannel("uname -s")) { - Session.Command uname = session.exec("uname -s"); - uname.join(); - InputStream is = uname.getInputStream(); - StringBuilder sb = new StringBuilder(); - while (true) - { - int read = is.read(); - if (read == -1) - break; - sb.append((char)read); - } - String output = sb.toString().toLowerCase(Locale.ROOT); - uname.close(); - Integer exitStatus = uname.getExitStatus(); - if (exitStatus == null) - throw new IOException("Executing 'uname' command did not provide an exit status"); - - // Cannot run "uname -s"? Assume windows. - if (exitStatus != 0) - return true; - // Outputs a well-known windows uname? Assume windows. - for (String winUname : COMMON_WIN_UNAMES) - if (output.contains(winUname.toLowerCase(Locale.ROOT))) - return true; - // Assume *nix. - return false; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + channel.setOut(baos); + channel.setRedirectErrorStream(true); + channel.open().verify(30, TimeUnit.SECONDS); + channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0L); + exitStatus = channel.getExitStatus(); + output = baos.toString(StandardCharsets.UTF_8).toLowerCase(Locale.ROOT); } + + if (exitStatus == null) + throw new IOException("Executing 'uname' command did not provide an exit status"); + + // Cannot run "uname -s"? Assume windows. + if (exitStatus != 0) + return true; + // Outputs a well-known windows uname? Assume windows. + for (String winUname : COMMON_WIN_UNAMES) + if (output.contains(winUname.toLowerCase(Locale.ROOT))) + return true; + // Assume *nix. + return false; } private static List buildCommandLine(FileSystem fileSystem, Jvm jvm, List remoteClasspathEntries, String delimiter, String nodeId, String hostname, String connectString) @@ -243,19 +248,21 @@ private static List buildCommandLine(FileSystem fileSystem, Jvm jvm, Lis private static List filterOutEmptyStrings(List opts) { - return opts.stream().filter(s -> !s.trim().equals("")).collect(Collectors.toList()); + return opts.stream().filter(s -> !s.trim().isEmpty()).collect(Collectors.toList()); } - private static void copyFile(SFTPClient sftpClient, String hostId, String filename, LocalSourceFile localSourceFile) throws Exception + private static void copyFile(SftpClient sftpClient, String hostId, String filename, File localSourceFile) throws Exception { - String destFilename = "." + NodeFileSystemProvider.PREFIX + "/" + hostId + "/" + NodeProcess.CLASSPATH_FOLDER_NAME + "/" + filename; - String parentFilename = destFilename.substring(0, destFilename.lastIndexOf('/')); + String destFilename = "." + NodeFileSystemProvider.SCHEME + "/" + hostId + "/" + NodeProcess.CLASSPATH_FOLDER_NAME + "/" + filename; - sftpClient.mkdirs(parentFilename); - sftpClient.put(localSourceFile, destFilename); + try (OutputStream os = sftpClient.write(destFilename); + FileInputStream is = new FileInputStream(localSourceFile)) + { + IOUtil.copy(is, os); + } } - private static void copyDir(SFTPClient sftpClient, String hostId, File cpFile, int depth) throws Exception + private static void copyDir(SftpClient sftpClient, String hostId, File cpFile, int depth) throws Exception { File[] files = cpFile.listFiles(); if (files == null) @@ -265,6 +272,14 @@ private static void copyDir(SFTPClient sftpClient, String hostId, File cpFile, i { if (file.isDirectory()) { + try + { + sftpClient.lstat(file.getName()); + } + catch (IOException e) + { + sftpClient.mkdir(file.getName()); + } copyDir(sftpClient, hostId, file, depth + 1); } else @@ -276,27 +291,25 @@ private static void copyDir(SFTPClient sftpClient, String hostId, File cpFile, i currentFile = currentFile.getParentFile(); filename = currentFile.getName() + "/" + filename; } - copyFile(sftpClient, hostId, filename, new FileSystemFile(file)); + copyFile(sftpClient, hostId, filename, file); } } } private static class RemoteNodeHolder implements AutoCloseable { - private static final RemoteNodeHolder NULL = new RemoteNodeHolder(null, null, null, null, null, null, null); + private static final RemoteNodeHolder NULL = new RemoteNodeHolder(null, null, null, null, null, null); private final GlobalNodeId nodeId; private final FileSystem fileSystem; - private final SSHClient sshClient; - private final SocketForwardingConnectListener forwardingConnectListener; + private final SshClient sshClient; private final AutoCloseable forwarding; - private final Session session; - private final Session.Command command; + private final ClientSession session; + private final ClientChannel command; - private RemoteNodeHolder(GlobalNodeId nodeId, FileSystem fileSystem, SSHClient sshClient, SocketForwardingConnectListener forwardingConnectListener, AutoCloseable forwarding, Session session, Session.Command command) { + private RemoteNodeHolder(GlobalNodeId nodeId, FileSystem fileSystem, SshClient sshClient, AutoCloseable forwarding, ClientSession session, ClientChannel command) { this.nodeId = nodeId; this.fileSystem = fileSystem; this.sshClient = sshClient; - this.forwardingConnectListener = forwardingConnectListener; this.forwarding = forwarding; this.session = session; this.command = command; @@ -305,84 +318,45 @@ private RemoteNodeHolder(GlobalNodeId nodeId, FileSystem fileSystem, SSHClient s @Override public void close() throws Exception { + if (LOG.isDebugEnabled()) + LOG.debug("closing remote node holder of node id {}", nodeId); IOUtil.close(fileSystem); // 0x03 is the character for CTRL-C -> send it to the remote PTY - session.getOutputStream().write(0x03); - // also send TERM signal - command.signal(Signal.TERM); + command.getInvertedIn().write(0x03); try { - command.join(10, TimeUnit.SECONDS); + command.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0L); } catch (Exception e) { // timeout? error? too late, try to kill the process - command.signal(Signal.KILL); + command.close(true); } IOUtil.close(command); IOUtil.close(session); if (!LocalHostLauncher.skipDiskCleanup()) { - try (SFTPClient sftpClient = sshClient.newStatefulSFTPClient()) + try (SftpClient sftpClient = SftpClientFactory.instance().createSftpClient(session)) { - deltree(sftpClient, "." + NodeFileSystemProvider.PREFIX + "/" + nodeId.getClusterId()); + deltree(sftpClient, "." + NodeFileSystemProvider.SCHEME + "/" + nodeId.getClusterId()); } } - IOUtil.close(forwardingConnectListener); IOUtil.close(forwarding); IOUtil.close(sshClient); } - private static void deltree(SFTPClient sftpClient, String path) throws IOException + private static void deltree(SftpClient sftpClient, String path) throws IOException { - List ls = sftpClient.ls(path); - for (RemoteResourceInfo l : ls) + Iterable ls = sftpClient.readDir(path); + for (SftpClient.DirEntry l : ls) { - if (l.isDirectory()) - deltree(sftpClient, l.getPath()); + if (l.getAttributes().isDirectory()) + deltree(sftpClient, l.getLongFilename()); else - sftpClient.rm(l.getPath()); + sftpClient.remove(l.getFilename()); } sftpClient.rmdir(path); } } - - private static class SocketForwardingConnectListener implements ConnectListener, AutoCloseable - { - private final String threadNamePrefix; - private final SocketAddress addr; - private Socket socket; - private Channel.Forwarded channel; - - private SocketForwardingConnectListener(String threadNamePrefix, SocketAddress addr) - { - this.threadNamePrefix = threadNamePrefix; - this.addr = addr; - } - - @Override - public void close() - { - IOUtil.close(channel, socket); - } - - @Override - public void gotConnect(Channel.Forwarded channel) throws IOException - { - this.channel = channel; - socket = new Socket(); - socket.setSendBufferSize(channel.getLocalMaxPacketSize()); - socket.setReceiveBufferSize(channel.getRemoteMaxPacketSize()); - socket.connect(addr); - - channel.confirm(); - - new StreamCopier(socket.getInputStream(), channel.getOutputStream(), channel.getRemoteMaxPacketSize(), false) - .spawnDaemon(threadNamePrefix + "-soc2chan"); - - new StreamCopier(channel.getInputStream(), socket.getOutputStream(), channel.getLocalMaxPacketSize(), false) - .spawnDaemon(threadNamePrefix + "-chan2soc"); - } - } } diff --git a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/InMemoryFile.java b/src/main/java/org/mortbay/jetty/orchestrator/nodefs/InMemoryFile.java deleted file mode 100644 index 01031e8..0000000 --- a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/InMemoryFile.java +++ /dev/null @@ -1,45 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== -// - -package org.mortbay.jetty.orchestrator.nodefs; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -import net.schmizz.sshj.xfer.InMemoryDestFile; - -class InMemoryFile extends InMemoryDestFile -{ - private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - - // Method used by SFTP - @Override - public OutputStream getOutputStream(boolean append) throws IOException - { - return outputStream; - } - - @Override - public ByteArrayOutputStream getOutputStream() - { - return outputStream; - } - - // Method used by SFTP - @Override - public long getLength() - { - return -1; - } -} diff --git a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileAttributes.java b/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileAttributes.java index be2e82b..8ef75d6 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileAttributes.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileAttributes.java @@ -13,78 +13,71 @@ package org.mortbay.jetty.orchestrator.nodefs; -import java.nio.file.attribute.BasicFileAttributes; -import java.nio.file.attribute.FileTime; -import java.util.concurrent.TimeUnit; - -import net.schmizz.sshj.sftp.FileAttributes; -import net.schmizz.sshj.sftp.FileMode; - -class NodeFileAttributes implements BasicFileAttributes +class NodeFileAttributes // implements BasicFileAttributes { - private final FileAttributes lstat; - - NodeFileAttributes(FileAttributes lstat) - { - this.lstat = lstat; - } - - public FileAttributes getLstat() - { - return lstat; - } - - @Override - public FileTime lastModifiedTime() - { - return FileTime.from(lstat.getMtime(), TimeUnit.MILLISECONDS); - } - - @Override - public FileTime lastAccessTime() - { - return FileTime.from(lstat.getAtime(), TimeUnit.MILLISECONDS); - } - - @Override - public FileTime creationTime() - { - return lastModifiedTime(); - } - - @Override - public boolean isRegularFile() - { - return lstat.getType() == FileMode.Type.REGULAR; - } - - @Override - public boolean isDirectory() - { - return lstat.getType() == FileMode.Type.DIRECTORY; - } - - @Override - public boolean isSymbolicLink() - { - return lstat.getType() == FileMode.Type.SYMLINK; - } - - @Override - public boolean isOther() - { - return !isDirectory() && !isRegularFile() && !isSymbolicLink(); - } - - @Override - public long size() - { - return lstat.getSize(); - } - - @Override - public Object fileKey() - { - return null; - } +// private final FileAttributes lstat; +// +// NodeFileAttributes(FileAttributes lstat) +// { +// this.lstat = lstat; +// } +// +// public FileAttributes getLstat() +// { +// return lstat; +// } +// +// @Override +// public FileTime lastModifiedTime() +// { +// return FileTime.from(lstat.getMtime(), TimeUnit.MILLISECONDS); +// } +// +// @Override +// public FileTime lastAccessTime() +// { +// return FileTime.from(lstat.getAtime(), TimeUnit.MILLISECONDS); +// } +// +// @Override +// public FileTime creationTime() +// { +// return lastModifiedTime(); +// } +// +// @Override +// public boolean isRegularFile() +// { +// return lstat.getType() == FileMode.Type.REGULAR; +// } +// +// @Override +// public boolean isDirectory() +// { +// return lstat.getType() == FileMode.Type.DIRECTORY; +// } +// +// @Override +// public boolean isSymbolicLink() +// { +// return lstat.getType() == FileMode.Type.SYMLINK; +// } +// +// @Override +// public boolean isOther() +// { +// return !isDirectory() && !isRegularFile() && !isSymbolicLink(); +// } +// +// @Override +// public long size() +// { +// return lstat.getSize(); +// } +// +// @Override +// public Object fileKey() +// { +// return null; +// } } diff --git a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystem.java b/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystem.java index 2daae2d..4d460ef 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystem.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystem.java @@ -13,33 +13,23 @@ package org.mortbay.jetty.orchestrator.nodefs; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.channels.SeekableByteChannel; -import java.nio.file.DirectoryStream; +import java.net.URI; import java.nio.file.FileStore; import java.nio.file.FileSystem; -import java.nio.file.LinkOption; -import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.PathMatcher; import java.nio.file.WatchService; -import java.nio.file.attribute.BasicFileAttributes; -import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.UserPrincipalLookupService; import java.nio.file.spi.FileSystemProvider; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Set; -import net.schmizz.sshj.sftp.FileAttributes; -import net.schmizz.sshj.sftp.RemoteResourceInfo; -import net.schmizz.sshj.sftp.SFTPClient; -import org.mortbay.jetty.orchestrator.util.IOUtil; +import org.apache.sshd.client.SshClient; +import org.apache.sshd.sftp.client.fs.SftpFileSystemProvider; +import org.apache.sshd.sftp.common.SftpConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,238 +39,53 @@ class NodeFileSystem extends FileSystem static final String PATH_SEPARATOR = "/"; private final NodeFileSystemProvider provider; - private final SFTPClient sftpClient; private final String hostId; private final boolean windows; private final NodePath homePath; private final NodePath cwdPath; - private volatile boolean closed; + private final FileSystem delegate; - NodeFileSystem(NodeFileSystemProvider provider, SFTPClient sftpClient, String hostId, List cwd, boolean windows) + NodeFileSystem(NodeFileSystemProvider provider, SshClient sshClient, String hostId, String cwd, boolean windows, String sftpHost, int sftpPort, String sftpUsername, char[] sftpPassword) { this.provider = provider; - this.sftpClient = sftpClient; this.hostId = hostId; this.windows = windows; - try - { - this.homePath = new NodePath(this, null, NodePath.toSegments(sftpClient.canonicalize("."))); - this.cwdPath = new NodePath(this, homePath, cwd); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - boolean isWindows() - { - return windows; - } + SftpFileSystemProvider sftpFileSystemProvider = new SftpFileSystemProvider(sshClient); - String getHostId() - { - return hostId; - } - private Path relativeFromHomeOrAbsolute(NodePath dir) - { - try - { - return homePath.relativize(dir); - } - catch (IllegalArgumentException e) - { - return dir.toAbsolutePath(); - } - } + URI uri = SftpFileSystemProvider.createFileSystemURI(sftpHost, sftpPort, sftpUsername, sftpPassword == null || sftpPassword.length == 0 ? null : new String(sftpPassword)); - SeekableByteChannel newByteChannel(NodePath path, Set options, FileAttribute... attrs) throws IOException - { - byte[] data; try { - InMemoryFile inMemoryFile = new InMemoryFile(); - sftpClient.get(relativeFromHomeOrAbsolute(path).toString(), inMemoryFile); - data = inMemoryFile.getOutputStream().toByteArray(); + //String userAuth = SftpFileSystemProvider.encodeCredentials(sftpUsername, sftpPassword == null || sftpPassword.length == 0 ? null : new String(sftpPassword)); + //URI uri = new URI(SftpConstants.SFTP_SUBSYSTEM_NAME, userAuth, sftpHost, sftpPort, cwd, null, null); + delegate = sftpFileSystemProvider.newFileSystem(uri, Collections.emptyMap()); + this.homePath = new NodePath(this, null, NodePath.toSegments(delegate.getPath(".").toAbsolutePath().normalize().toString())); + this.cwdPath = new NodePath(this, homePath, NodePath.toSegments(cwd)); } catch (IOException e) { - throw new IOException("Unable to open byte channel for path: " + path, e); + throw new RuntimeException(e); } - - return new SeekableByteChannel() - { - private long position; - - @Override - public void close() - { - } - - @Override - public boolean isOpen() - { - return true; - } - - @Override - public long position() - { - return position; - } - - @Override - public SeekableByteChannel position(long newPosition) - { - position = newPosition; - return this; - } - - @Override - public int read(ByteBuffer dst) - { - int l = (int)Math.min(dst.remaining(), size() - position); - dst.put(data, (int)position, l); - position += l; - return l; - } - - @Override - public long size() - { - return data.length; - } - - @Override - public SeekableByteChannel truncate(long size) - { - throw new UnsupportedOperationException(); - } - - @Override - public int write(ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - }; } - DirectoryStream newDirectoryStream(NodePath dir, DirectoryStream.Filter filter) throws IOException + public Path delegatePath(NodePath path) { - List filteredPaths = new ArrayList<>(); - try - { - List content = sftpClient.ls(relativeFromHomeOrAbsolute(dir).toString()); - for (RemoteResourceInfo remoteResourceInfo : content) - { - Path resolved = dir.resolve(remoteResourceInfo.getName()); - if (filter.accept(resolved)) - filteredPaths.add(resolved); - } - } - catch (IOException e) - { - throw new IOException("Unable to open directory stream for path: " + dir, e); - } - - return new DirectoryStream() - { - @Override - public Iterator iterator() - { - return new Iterator() - { - private final Iterator delegate = filteredPaths.iterator(); - - @Override - public boolean hasNext() - { - return delegate.hasNext(); - } - - @Override - public Path next() - { - return delegate.next(); - } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }; - } - @Override - public void close() - { - } - }; + if (!path.isAbsolute()) + return delegate.getPath(cwdPath.resolve(path).toAbsolutePath().toString()); + else + return delegate.getPath(path.toAbsolutePath().toString()); } - InputStream newInputStream(NodePath path, OpenOption... options) throws IOException + boolean isWindows() { - String sftpPath = relativeFromHomeOrAbsolute(path).toString(); - long fileSize; - try - { - fileSize = sftpClient.lstat(sftpPath).getSize(); - } - catch (IOException e) - { - throw new IOException("Unable to open input stream for path: " + path, e); - } - if (fileSize > 1024 * 1024) - { - // use piping if file to download is > 1MB - PipingFile pipingFile = new PipingFile(); - Thread t = new Thread(() -> - { - try - { - sftpClient.get(sftpPath, pipingFile); - } - catch (IOException e) - { - if (LOG.isDebugEnabled()) - LOG.debug("Error copying " + sftpPath + " over sftp", e); - } - finally - { - IOUtil.close(pipingFile.getOutputStream()); - } - }); - t.setDaemon(true); - t.start(); - return pipingFile.getInputStream(); - } - else - { - InMemoryFile inMemoryFile = new InMemoryFile(); - sftpClient.get(sftpPath, inMemoryFile); - byte[] data = inMemoryFile.getOutputStream().toByteArray(); - return new ByteArrayInputStream(data); - } + return windows; } - @SuppressWarnings("unchecked") - A readAttributes(NodePath path, Class type, LinkOption... options) throws IOException + String getHostId() { - if (!type.equals(BasicFileAttributes.class) && !type.equals(NodeFileAttributes.class)) - throw new UnsupportedOperationException(); - - String sftpPath = relativeFromHomeOrAbsolute(path).toString(); - try - { - FileAttributes lstat = sftpClient.lstat(sftpPath); - NodeFileAttributes nodeFileAttributes = new NodeFileAttributes(lstat); - return (A)nodeFileAttributes; - } - catch (IOException e) - { - throw new IOException("Error reading attributes of path: " + path, e); - } + return hostId; } @Override @@ -289,36 +94,33 @@ public FileSystemProvider provider() return provider; } + public FileSystemProvider delegateProvider() + { + return delegate.provider(); + } + @Override public void close() throws IOException { - try - { - sftpClient.close(); - } - finally - { - provider.remove(hostId); - closed = true; - } + delegate.close(); } @Override public boolean isOpen() { - return !closed; + return delegate.isOpen(); } @Override public boolean isReadOnly() { - return true; + return delegate.isReadOnly(); } @Override public String getSeparator() { - return PATH_SEPARATOR; + return delegate.getSeparator(); } @Override @@ -330,13 +132,13 @@ public Iterable getRootDirectories() @Override public Iterable getFileStores() { - return Collections.emptySet(); + return delegate.getFileStores(); } @Override public Set supportedFileAttributeViews() { - return Collections.emptySet(); + return delegate.supportedFileAttributeViews(); } @Override @@ -357,19 +159,19 @@ Path getPath(boolean absolute, List segments) @Override public PathMatcher getPathMatcher(String syntaxAndPattern) { - throw new UnsupportedOperationException(); + return delegate.getPathMatcher(syntaxAndPattern); } @Override public UserPrincipalLookupService getUserPrincipalLookupService() { - throw new UnsupportedOperationException(); + return delegate.getUserPrincipalLookupService(); } @Override - public WatchService newWatchService() + public WatchService newWatchService() throws IOException { - throw new UnsupportedOperationException(); + return delegate.newWatchService(); } @Override diff --git a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystemProvider.java b/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystemProvider.java index 4759077..893b8b2 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystemProvider.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystemProvider.java @@ -40,8 +40,7 @@ import java.util.Map; import java.util.Set; -import net.schmizz.sshj.sftp.SFTPClient; -import net.schmizz.sshj.xfer.FilePermission; +import org.apache.sshd.client.SshClient; /** * URI format is: @@ -49,8 +48,13 @@ */ public class NodeFileSystemProvider extends FileSystemProvider { - public static final String PREFIX = "jco"; - public static final String IS_WINDOWS_ENV_PROPERTY = "windows"; + public static final String SCHEME = "jco"; + public static final String IS_WINDOWS_ENV = "windows"; + public static final String SFTP_HOST_ENV = "host"; + public static final String SFTP_PORT_ENV = "port"; + public static final String SFTP_USERNAME_ENV = "username"; + public static final String SFTP_PASSWORD_ENV = "password"; + private static final Map ACCESS_MODES_MASKS = new EnumMap<>(AccessMode.class); static { @@ -100,13 +104,17 @@ public FileSystem newFileSystem(URI uri, Map env) { synchronized (fileSystems) { - boolean windows = (Boolean)env.get(IS_WINDOWS_ENV_PROPERTY); - SFTPClient sftpClient = (SFTPClient)env.get(SFTPClient.class.getName()); + boolean windows = (Boolean)env.get(IS_WINDOWS_ENV); + String sftpHost = (String)env.get(SFTP_HOST_ENV); + Integer sftpPort = (Integer)env.get(SFTP_PORT_ENV); + String sftpUsername = (String)env.get(SFTP_USERNAME_ENV); + char[] sftpPassword = (char[])env.get(SFTP_PASSWORD_ENV); + SshClient sshClient = (SshClient)env.get(SshClient.class.getName()); String hostId = extractHostId(uri); if (fileSystems.containsKey(hostId)) throw new FileSystemAlreadyExistsException("FileSystem already exists: " + hostId); - NodeFileSystem fileSystem = new NodeFileSystem(this, sftpClient, hostId, extractPath(uri), windows); + NodeFileSystem fileSystem = new NodeFileSystem(this, sshClient, hostId, extractPath(uri), windows, sftpHost, sftpPort, sftpUsername, sftpPassword); fileSystems.put(hostId, fileSystem); return fileSystem; } @@ -142,13 +150,22 @@ private static String extractHostId(URI uri) return nodeId; } - private static List extractPath(URI uri) + private static String extractPath(URI uri) { String nodeId = uri.getSchemeSpecificPart(); int i = nodeId.indexOf("!/"); if (i == -1) - return Collections.emptyList(); - return NodePath.toSegments(nodeId.substring(i + 1)); + return ""; + return nodeId.substring(i + 1); + } + + private static String extractPathAsString(URI uri) + { + String nodeId = uri.getSchemeSpecificPart(); + int i = nodeId.indexOf("!/"); + if (i == -1) + return "."; + return nodeId.substring(i + 1); } @Override @@ -160,14 +177,14 @@ public Path getPath(URI uri) NodeFileSystem fileSystem = fileSystems.get(hostId); if (fileSystem == null) throw new FileSystemNotFoundException(uri.toString()); - return fileSystem.getPath(false, extractPath(uri)); + return fileSystem.getPath(extractPathAsString(uri)); } } @Override public String getScheme() { - return PREFIX; + return SCHEME; } @Override @@ -191,31 +208,29 @@ public void move(Path source, Path target, CopyOption... options) @Override public InputStream newInputStream(Path path, OpenOption... options) throws IOException { - if (!(path instanceof NodePath)) - throw new ProviderMismatchException(); - return ((NodeFileSystem)path.getFileSystem()).newInputStream((NodePath)path, options); + return path.getFileSystem().provider().newInputStream(path, options); } @Override public SeekableByteChannel newByteChannel(Path path, Set options, FileAttribute... attrs) throws IOException { - if (!(path instanceof NodePath)) - throw new ProviderMismatchException(); - return ((NodeFileSystem)path.getFileSystem()).newByteChannel((NodePath)path, options, attrs); + return path.getFileSystem().provider().newByteChannel(path, options, attrs); } @Override public DirectoryStream newDirectoryStream(Path dir, DirectoryStream.Filter filter) throws IOException { if (!(dir instanceof NodePath)) - throw new ProviderMismatchException(); - return ((NodeFileSystem)dir.getFileSystem()).newDirectoryStream((NodePath)dir, filter); + throw new ProviderMismatchException("Path is not NodePath: " + dir.getClass()); + NodeFileSystem delegateFileSystem = (NodeFileSystem)dir.getFileSystem(); + Path path = delegateFileSystem.delegatePath((NodePath) dir); + return delegateFileSystem.delegateProvider().newDirectoryStream(path, filter); } @Override public A readAttributes(Path path, Class type, LinkOption... options) throws IOException { - return ((NodeFileSystem)path.getFileSystem()).readAttributes((NodePath)path, type, options); + return path.getFileSystem().provider().readAttributes(path, type, options); } @Override @@ -241,17 +256,17 @@ public void checkAccess(Path path, AccessMode... modes) throws IOException masks[i] = mask; } - NodeFileAttributes attributes = readAttributes(path, NodeFileAttributes.class); - for (FilePermission permission : attributes.getLstat().getPermissions()) - { - if (masks.length == 0) // existence check - return; - for (int mask : masks) - { - if (permission.isIn(mask)) - return; - } - } +// NodeFileAttributes attributes = readAttributes(path, NodeFileAttributes.class); +// for (FilePermission permission : attributes.getLstat().getPermissions()) +// { +// if (masks.length == 0) // existence check +// return; +// for (int mask : masks) +// { +// if (permission.isIn(mask)) +// return; +// } +// } throw new IOException("Access check failed"); } } diff --git a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/PipingFile.java b/src/main/java/org/mortbay/jetty/orchestrator/nodefs/PipingFile.java deleted file mode 100644 index 5af4d1f..0000000 --- a/src/main/java/org/mortbay/jetty/orchestrator/nodefs/PipingFile.java +++ /dev/null @@ -1,56 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== -// - -package org.mortbay.jetty.orchestrator.nodefs; - -import java.io.IOException; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; - -import net.schmizz.sshj.xfer.InMemoryDestFile; - -class PipingFile extends InMemoryDestFile -{ - private final PipedOutputStream outputStream = new PipedOutputStream(); - private final PipedInputStream inputStream; - - public PipingFile() throws IOException - { - inputStream = new PipedInputStream(outputStream); - } - - // Method used by SFTP - @Override - public long getLength() - { - return -1; - } - - // Method used by SFTP - @Override - public PipedOutputStream getOutputStream(boolean append) throws IOException - { - return outputStream; - } - - @Override - public PipedOutputStream getOutputStream() - { - return outputStream; - } - - public PipedInputStream getInputStream() - { - return inputStream; - } -} diff --git a/src/main/java/org/mortbay/jetty/orchestrator/rpc/GlobalNodeId.java b/src/main/java/org/mortbay/jetty/orchestrator/rpc/GlobalNodeId.java index 28177f1..4a7990a 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/rpc/GlobalNodeId.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/rpc/GlobalNodeId.java @@ -119,4 +119,16 @@ public int hashCode() { return Objects.hash(nodeId); } + + @Override + public String toString() + { + return "GlobalNodeId{" + + "clusterId='" + clusterId + '\'' + + ", hostname='" + hostname + '\'' + + ", hostId='" + hostId + '\'' + + ", nodeId='" + nodeId + '\'' + + ", local=" + local + + '}'; + } } diff --git a/src/main/java/org/mortbay/jetty/orchestrator/rpc/NodeProcess.java b/src/main/java/org/mortbay/jetty/orchestrator/rpc/NodeProcess.java index 9262766..4e663c2 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/rpc/NodeProcess.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/rpc/NodeProcess.java @@ -127,7 +127,16 @@ public static void main(String[] args) throws Exception }); Runtime.getRuntime().addShutdownHook(shutdown); - rpcServer.run(); + if (LOG.isDebugEnabled()) + LOG.debug("Node [{}] ready to serve", nodeId); + try + { + rpcServer.run(); + } + catch (Exception e) + { + LOG.error("RPC server failed on node {}; aborting", nodeId, e); + } if (LOG.isDebugEnabled()) LOG.debug("Node [{}] disconnecting from {}", nodeId, connectString); shutdown.run(); // do not start that thread, run its runnable on the current thread @@ -183,7 +192,7 @@ public static Thread spawnThread(String nodeId, String connectString) private static File defaultRootPath(String hostId) { - return new File(System.getProperty("user.home") + "/." + NodeFileSystemProvider.PREFIX + "/" + hostId); + return new File(System.getProperty("user.home") + "/." + NodeFileSystemProvider.SCHEME + "/" + hostId); } private static File defaultLibPath(String hostId) diff --git a/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcClient.java b/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcClient.java index f578cc2..0ea5e2e 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcClient.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcClient.java @@ -73,6 +73,8 @@ public RpcClient(CuratorFramework curator, GlobalNodeId globalNodeId) future.complete(resp.getResult()); } }); + if (LOG.isDebugEnabled()) + LOG.debug("RPC client started on {}", globalNodeId.getNodeId()); } public CompletableFuture callAsync(Command command) throws Exception diff --git a/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcServer.java b/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcServer.java index 75ed00a..a60db20 100644 --- a/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcServer.java +++ b/src/main/java/org/mortbay/jetty/orchestrator/rpc/RpcServer.java @@ -56,6 +56,8 @@ public RpcServer(CuratorFramework curator, GlobalNodeId globalNodeId) return thread; }); clusterTools = new ClusterTools(curator, globalNodeId); + if (LOG.isDebugEnabled()) + LOG.debug("RPC server started on {}", globalNodeId.getNodeId()); } public long getLastCommandTimestamp() @@ -88,7 +90,7 @@ private void abort() { // does not matter, ZK is shutting down if this happens if (LOG.isDebugEnabled()) - LOG.debug("", e); + LOG.debug("exception caught during abort", e); } } @@ -107,6 +109,8 @@ public void run() if (request.getCommand().getClass() == AbortCommand.class) { active = false; + if (LOG.isDebugEnabled()) + LOG.debug("RPC server got Abort command"); return; } @@ -159,6 +163,8 @@ public void run() catch (InterruptedException e) { active = false; + if (LOG.isDebugEnabled()) + LOG.debug("RPC server interrupted", e); return; } catch (Exception e) @@ -167,6 +173,8 @@ public void run() throw new RuntimeException("Error reading command on node " + globalNodeId.getNodeId(), e); } } + if (LOG.isDebugEnabled()) + LOG.debug("RPC server is now inactive; request queue processing returning"); } private static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException diff --git a/src/test/java/org/mortbay/jetty/orchestrator/NodeArrayFutureTest.java b/src/test/java/org/mortbay/jetty/orchestrator/NodeArrayFutureTest.java index f2108f0..180cd40 100644 --- a/src/test/java/org/mortbay/jetty/orchestrator/NodeArrayFutureTest.java +++ b/src/test/java/org/mortbay/jetty/orchestrator/NodeArrayFutureTest.java @@ -26,6 +26,7 @@ import org.mortbay.jetty.orchestrator.configuration.SimpleNodeArrayConfiguration; import org.mortbay.jetty.orchestrator.configuration.SshRemoteHostLauncher; import sshd.AbstractSshTest; +import utils.JvmUtil; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -37,7 +38,7 @@ public class NodeArrayFutureTest extends AbstractSshTest public void testJvmOptionWithStar() throws Exception { ClusterConfiguration cfg = new SimpleClusterConfiguration() - .jvm(new Jvm((fs, h) -> "java", "-Dmyprop=*")) + .jvm(new Jvm((fs, h) -> JvmUtil.findCurrentJavaExecutable().toAbsolutePath().toString(), "-Dmyprop=*")) .nodeArray(new SimpleNodeArrayConfiguration("my-array").node(new Node("1", InetAddress.getLocalHost().getHostName()))) .hostLauncher(new SshRemoteHostLauncher(System.getProperty("user.name"), new char[0], sshd.getPort())) ; @@ -54,6 +55,7 @@ public void testJvmOptionWithStar() throws Exception public void testDetectProcessDeath() throws Exception { ClusterConfiguration cfg = new SimpleClusterConfiguration() + .jvm(new Jvm((fs, h) -> JvmUtil.findCurrentJavaExecutable().toAbsolutePath().toString())) .nodeArray(new SimpleNodeArrayConfiguration("my-array").node(new Node("1", InetAddress.getLocalHost().getHostName()))) .hostLauncher(new SshRemoteHostLauncher(System.getProperty("user.name"), new char[0], sshd.getPort())) ; @@ -73,6 +75,7 @@ public void testDetectProcessDeath() throws Exception public void testDetectTimeout() throws Exception { ClusterConfiguration cfg = new SimpleClusterConfiguration() + .jvm(new Jvm((fs, h) -> JvmUtil.findCurrentJavaExecutable().toAbsolutePath().toString())) .nodeArray(new SimpleNodeArrayConfiguration("my-array").node(new Node("1", InetAddress.getLocalHost().getHostName())).node(new Node("2", InetAddress.getLocalHost().getHostName()))) .hostLauncher(new SshRemoteHostLauncher(System.getProperty("user.name"), new char[0], sshd.getPort())) ; @@ -96,6 +99,7 @@ public void testDetectTimeout() throws Exception public void testZeroTimeoutThenDetectDeath() throws Exception { ClusterConfiguration cfg = new SimpleClusterConfiguration() + .jvm(new Jvm((fs, h) -> JvmUtil.findCurrentJavaExecutable().toAbsolutePath().toString())) .nodeArray(new SimpleNodeArrayConfiguration("my-array").node(new Node("1", InetAddress.getLocalHost().getHostName()))) .hostLauncher(new SshRemoteHostLauncher(System.getProperty("user.name"), new char[0], sshd.getPort())) ; @@ -124,6 +128,7 @@ public void testZeroTimeoutThenDetectDeath() throws Exception public void testTimeoutIsSpread() throws Exception { ClusterConfiguration cfg = new SimpleClusterConfiguration() + .jvm(new Jvm((fs, h) -> JvmUtil.findCurrentJavaExecutable().toAbsolutePath().toString())) .nodeArray(new SimpleNodeArrayConfiguration("my-array").node(new Node("1", InetAddress.getLocalHost().getHostName())).node(new Node("2", InetAddress.getLocalHost().getHostName()))) .hostLauncher(new SshRemoteHostLauncher(System.getProperty("user.name"), new char[0], sshd.getPort())) ; diff --git a/src/test/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystemTest.java b/src/test/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystemTest.java index ab0ec88..ac6b6dc 100644 --- a/src/test/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystemTest.java +++ b/src/test/java/org/mortbay/jetty/orchestrator/nodefs/NodeFileSystemTest.java @@ -16,21 +16,25 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.URI; import java.nio.file.DirectoryStream; import java.nio.file.FileSystem; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.HashMap; import java.util.Iterator; +import java.util.Objects; import java.util.Spliterator; import java.util.Spliterators; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import java.util.stream.StreamSupport; -import net.schmizz.sshj.SSHClient; -import net.schmizz.sshj.sftp.SFTPClient; -import net.schmizz.sshj.transport.verification.PromiscuousVerifier; +import org.apache.sshd.client.SshClient; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -46,6 +50,8 @@ public class NodeFileSystemTest { + private static final boolean IS_WINDOWS = System.getProperty("os.name").startsWith("Windows"); + private Closer closer; @BeforeEach @@ -55,7 +61,7 @@ public void setUp() } @AfterEach - public void tearDown() throws Exception + public void tearDown() { closer.close(); } @@ -63,18 +69,25 @@ public void tearDown() throws Exception @Test public void testNodeIdFolder() throws Exception { - new File("target/testNodeIdFolder/." + NodeFileSystemProvider.PREFIX + "/the-test/myhost/a").mkdirs(); + new File("target/testNodeIdFolder/." + NodeFileSystemProvider.SCHEME + "/the-test/myhost/a").mkdirs(); TestSshServer testSshServer = closer.register(new TestSshServer("target/testNodeIdFolder")); - SSHClient sshClient = closer.register(new SSHClient()); - sshClient.addHostKeyVerifier(new PromiscuousVerifier()); - sshClient.connect("localhost", testSshServer.getPort()); - sshClient.authPassword("username", new char[0]); + SshClient sshClient = closer.register(SshClient.setUpDefaultClient()); + sshClient.start(); + closer.register(sshClient.connect(null, "localhost", testSshServer.getPort()) + .verify(30, TimeUnit.SECONDS) + .getSession()); HashMap env = new HashMap<>(); - env.put(NodeFileSystemProvider.IS_WINDOWS_ENV_PROPERTY, false); - env.put(SFTPClient.class.getName(), sshClient.newStatefulSFTPClient()); - NodeFileSystem fileSystem = closer.register((NodeFileSystem)FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.PREFIX + ":the-test/myhost!/." + NodeFileSystemProvider.PREFIX + "/the-test/myhost"), env)); + env.put(NodeFileSystemProvider.IS_WINDOWS_ENV, IS_WINDOWS); + env.put(NodeFileSystemProvider.SFTP_HOST_ENV, "localhost"); + env.put(NodeFileSystemProvider.SFTP_PORT_ENV, testSshServer.getPort()); + env.put(NodeFileSystemProvider.SFTP_USERNAME_ENV, System.getProperty("user.name")); + env.put(SshClient.class.getName(), sshClient); + NodeFileSystem fileSystem = closer.register((NodeFileSystem)FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.SCHEME + ":the-test/myhost!/." + NodeFileSystemProvider.SCHEME + "/the-test/myhost"), env)); + + Files.newDirectoryStream(fileSystem.getPath(".")).forEach((s) -> System.out.println(s)); + Files.newDirectoryStream(fileSystem.getPath(".jco")).forEach((s) -> System.out.println(s)); DirectoryStream paths = Files.newDirectoryStream(fileSystem.getPath(".")); Iterator iterator = paths.iterator(); @@ -83,21 +96,115 @@ public void testNodeIdFolder() throws Exception assertThat(iterator.hasNext(), is(false)); } + @Test + public void testCopy() throws Exception + { + new File("target/testCopy/." + NodeFileSystemProvider.SCHEME + "/the-test/myhost/a").mkdirs(); + + for (String classpathEntry : System.getProperty("java.class.path").split(File.pathSeparator)) + { + File cpFile = new File(classpathEntry); + if (cpFile.isDirectory()) + continue; + copyFile(cpFile.toPath(), Paths.get("target/testCopy/.jco/the-test/myhost", cpFile.getName())); + } + + TestSshServer testSshServer = closer.register(new TestSshServer("target/testCopy")); + SshClient sshClient = closer.register(SshClient.setUpDefaultClient()); + sshClient.start(); + closer.register(sshClient.connect(null, "localhost", testSshServer.getPort()) + .verify(30, TimeUnit.SECONDS) + .getSession()); + + HashMap env = new HashMap<>(); + env.put(NodeFileSystemProvider.IS_WINDOWS_ENV, IS_WINDOWS); + env.put(NodeFileSystemProvider.SFTP_HOST_ENV, "localhost"); + env.put(NodeFileSystemProvider.SFTP_PORT_ENV, testSshServer.getPort()); + env.put(NodeFileSystemProvider.SFTP_USERNAME_ENV, System.getProperty("user.name")); + env.put(SshClient.class.getName(), sshClient); + NodeFileSystem fileSystem = closer.register((NodeFileSystem)FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.SCHEME + ":the-test/myhost!/." + NodeFileSystemProvider.SCHEME + "/the-test/myhost"), env)); + + Path targetPath = Paths.get("target/testCopy-target/"); + Files.createDirectories(targetPath); + Path sourcePath = fileSystem.getPath("."); + + copyDir(sourcePath, targetPath); + } + + public static void copyDir(Path srcDir, Path destDir) throws IOException + { + if (!Files.isDirectory(Objects.requireNonNull(srcDir))) + throw new IllegalArgumentException("Source is not a directory: " + srcDir); + Objects.requireNonNull(destDir); + if (Files.exists(destDir) && !Files.isDirectory(destDir)) + throw new IllegalArgumentException("Destination is not a directory: " + destDir); + else if (!Files.exists(destDir)) + Files.createDirectory(destDir); // only attempt top create 1 level of directory (parent must exist) + + try (Stream sourceStream = Files.walk(srcDir)) + { + Iterator iterFiles = sourceStream + .filter(Files::isRegularFile) + .iterator(); + while (iterFiles.hasNext()) + { + Path sourceFile = iterFiles.next(); + Path relative = srcDir.relativize(sourceFile); + Path destFile = resolvePath(destDir, relative); + if (!Files.exists(destFile.getParent())) + Files.createDirectories(destFile.getParent()); + copyFile(sourceFile, destFile); + } + } + } + + public static void copyFile(Path srcFile, Path destFile) throws IOException + { + if (!Files.isRegularFile(Objects.requireNonNull(srcFile))) + throw new IllegalArgumentException("Source is not a file: " + srcFile); + Objects.requireNonNull(destFile); + + try (OutputStream out = Files.newOutputStream(destFile, + StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) + { + Files.copy(srcFile, out); + } + } + + public static Path resolvePath(Path basePath, Path relative) + { + if (relative.isAbsolute()) + throw new IllegalArgumentException("Relative path cannot be absolute"); + + if (basePath.getFileSystem().equals(relative.getFileSystem())) + { + return basePath.resolve(relative); + } + else + { + for (Path segment : relative) + basePath = basePath.resolve(segment.toString()); + return basePath; + } + } + + @Test public void testHomeFolderIsDefault() throws Exception { - new File("target/testHomeFolderIsDefault/." + NodeFileSystemProvider.PREFIX + "/the-test/myhost").mkdirs(); + new File("target/testHomeFolderIsDefault/." + NodeFileSystemProvider.SCHEME + "/the-test/myhost").mkdirs(); TestSshServer testSshServer = closer.register(new TestSshServer("target/testHomeFolderIsDefault")); - SSHClient sshClient = closer.register(new SSHClient()); - sshClient.addHostKeyVerifier(new PromiscuousVerifier()); - sshClient.connect("localhost", testSshServer.getPort()); - sshClient.authPassword("username", new char[0]); + SshClient sshClient = closer.register(new SshClient()); + sshClient.start(); + closer.register(sshClient.connect(null, "localhost", testSshServer.getPort()) + .verify(30, TimeUnit.SECONDS) + .getSession()); HashMap env = new HashMap<>(); - env.put(NodeFileSystemProvider.IS_WINDOWS_ENV_PROPERTY, false); - env.put(SFTPClient.class.getName(), sshClient.newStatefulSFTPClient()); - FileSystem fileSystem = closer.register(FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.PREFIX + ":the-test/myhost"), env)); + env.put(NodeFileSystemProvider.IS_WINDOWS_ENV, IS_WINDOWS); + env.put(SshClient.class.getName(), sshClient); + FileSystem fileSystem = closer.register(FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.SCHEME + ":the-test/myhost"), env)); DirectoryStream paths = Files.newDirectoryStream(fileSystem.getPath(".")); Iterator iterator = paths.iterator(); @@ -112,15 +219,16 @@ public void testAbsolutePath() throws Exception new File("target/testAbsolutePath").mkdirs(); TestSshServer testSshServer = closer.register(new TestSshServer("target/testAbsolutePath")); - SSHClient sshClient = closer.register(new SSHClient()); - sshClient.addHostKeyVerifier(new PromiscuousVerifier()); - sshClient.connect("localhost", testSshServer.getPort()); - sshClient.authPassword("username", new char[0]); + SshClient sshClient = closer.register(new SshClient()); + sshClient.start(); + closer.register(sshClient.connect(null, "localhost", testSshServer.getPort()) + .verify(30, TimeUnit.SECONDS) + .getSession()); HashMap env = new HashMap<>(); - env.put(NodeFileSystemProvider.IS_WINDOWS_ENV_PROPERTY, false); - env.put(SFTPClient.class.getName(), sshClient.newStatefulSFTPClient()); - FileSystem fileSystem = closer.register(FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.PREFIX + ":the-test/myhost"), env)); + env.put(NodeFileSystemProvider.IS_WINDOWS_ENV, IS_WINDOWS); + env.put(SshClient.class.getName(), sshClient); + FileSystem fileSystem = closer.register(FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.SCHEME + ":the-test/myhost"), env)); DirectoryStream directoryStream = Files.newDirectoryStream(fileSystem.getPath("/")); long pathCount = StreamSupport.stream(Spliterators.spliteratorUnknownSize(directoryStream.iterator(), Spliterator.ORDERED), false).count(); @@ -139,15 +247,16 @@ public void testJvmFilenameSupplierFound() throws Exception javaFile.setExecutable(true); TestSshServer testSshServer = closer.register(new TestSshServer(home.getPath())); - SSHClient sshClient = closer.register(new SSHClient()); - sshClient.addHostKeyVerifier(new PromiscuousVerifier()); - sshClient.connect("localhost", testSshServer.getPort()); - sshClient.authPassword("username", new char[0]); + SshClient sshClient = closer.register(new SshClient()); + sshClient.start(); + closer.register(sshClient.connect(null, "localhost", testSshServer.getPort()) + .verify(30, TimeUnit.SECONDS) + .getSession()); HashMap env = new HashMap<>(); - env.put(NodeFileSystemProvider.IS_WINDOWS_ENV_PROPERTY, false); - env.put(SFTPClient.class.getName(), sshClient.newStatefulSFTPClient()); - FileSystem fileSystem = closer.register(FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.PREFIX + ":the-test/myhost"), env)); + env.put(NodeFileSystemProvider.IS_WINDOWS_ENV, IS_WINDOWS); + env.put(SshClient.class.getName(), sshClient); + FileSystem fileSystem = closer.register(FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.SCHEME + ":the-test/myhost"), env)); Jvm jvm = new Jvm((fs, h) -> { @@ -176,15 +285,16 @@ public void testJvmFilenameSupplierNotFound() throws Exception folder.mkdirs(); TestSshServer testSshServer = closer.register(new TestSshServer(home.getPath())); - SSHClient sshClient = closer.register(new SSHClient()); - sshClient.addHostKeyVerifier(new PromiscuousVerifier()); - sshClient.connect("localhost", testSshServer.getPort()); - sshClient.authPassword("username", new char[0]); + SshClient sshClient = closer.register(new SshClient()); + sshClient.start(); + closer.register(sshClient.connect(null, "localhost", testSshServer.getPort()) + .verify(30, TimeUnit.SECONDS) + .getSession()); HashMap env = new HashMap<>(); - env.put(NodeFileSystemProvider.IS_WINDOWS_ENV_PROPERTY, false); - env.put(SFTPClient.class.getName(), sshClient.newStatefulSFTPClient()); - FileSystem fileSystem = closer.register(FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.PREFIX + ":the-test/myhost"), env)); + env.put(NodeFileSystemProvider.IS_WINDOWS_ENV, IS_WINDOWS); + env.put(SshClient.class.getName(), sshClient); + FileSystem fileSystem = closer.register(FileSystems.newFileSystem(URI.create(NodeFileSystemProvider.SCHEME + ":the-test/myhost"), env)); assertThrows(NoFileException.class, () -> new Jvm((fs, h) -> { diff --git a/src/test/java/sshd/HomeProcessShell.java b/src/test/java/sshd/HomeProcessShell.java index 334958a..523dfa2 100644 --- a/src/test/java/sshd/HomeProcessShell.java +++ b/src/test/java/sshd/HomeProcessShell.java @@ -13,10 +13,12 @@ package sshd; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -106,12 +108,21 @@ public void start(ChannelSession channel, Environment env) throws IOException builder.directory(new File(homePath)); - process = builder.start(); - Map modes = resolveShellTtyOptions(env.getPtyModes()); - out = new TtyFilterInputStream(process.getInputStream(), modes); - err = new TtyFilterInputStream(process.getErrorStream(), modes); - in = new TtyFilterOutputStream(process.getOutputStream(), err, modes); + try + { + process = builder.start(); + out = new TtyFilterInputStream(process.getInputStream(), modes); + err = new TtyFilterInputStream(process.getErrorStream(), modes); + in = new TtyFilterOutputStream(process.getOutputStream(), err, modes); + } + catch (IOException ioe) + { + out = new TtyFilterInputStream(InputStream.nullInputStream(), modes); + ByteArrayInputStream errStream = new ByteArrayInputStream((ioe.getMessage() + "\n").getBytes(StandardCharsets.UTF_8)); + err = new TtyFilterInputStream(errStream, modes); + in = new TtyFilterOutputStream(OutputStream.nullOutputStream(), err, modes); + } } protected Map resolveShellEnvironment(Map env) { @@ -144,7 +155,7 @@ public InputStream getErrorStream() { @Override public boolean isAlive() { - return process.isAlive(); + return process != null && process.isAlive(); } @Override @@ -156,7 +167,7 @@ public int exitValue() { throw new RuntimeException(e); } } else { - return process.exitValue(); + return process == null ? 127 : process.exitValue(); } } diff --git a/src/test/java/utils/JvmUtil.java b/src/test/java/utils/JvmUtil.java new file mode 100644 index 0000000..4ea6726 --- /dev/null +++ b/src/test/java/utils/JvmUtil.java @@ -0,0 +1,40 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package utils; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class JvmUtil +{ + public static Path findCurrentJavaExecutable() + { + String javaHome = System.getProperty("java.home"); + Path javaHomePath = Paths.get(javaHome); + return findJavaExecutable(javaHomePath); + } + + public static Path findJavaExecutable(Path javaHomePath) + { + Path javaExec = javaHomePath.resolve("bin").resolve("java"); // *nix + if (!Files.isExecutable(javaExec)) + javaExec = javaHomePath.resolve("Contents").resolve("Home").resolve("bin").resolve("java"); // OSX + if (!Files.isExecutable(javaExec)) + javaExec = javaHomePath.resolve("bin").resolve("java.exe"); // Windows + if (!Files.isExecutable(javaExec)) + return null; + return javaExec; + } +}