diff --git a/hetu-filesystem-client/mysql_ddl.sql b/hetu-filesystem-client/mysql_ddl.sql
new file mode 100644
index 000000000..c0090ff3f
--- /dev/null
+++ b/hetu-filesystem-client/mysql_ddl.sql
@@ -0,0 +1,10 @@
+drop table `olk_fs_catalog` if exists;
+
+CREATE TABLE `olk_fs_catalog`
+(
+ `catalog_name` varchar(256) NOT NULL COMMENT '目录名称',
+ `metadata` text COMMENT '元数据',
+ `properties` text COMMENT '配置信息',
+ `create_time` datetime DEFAULT NULL COMMENT '配置信息',
+ PRIMARY KEY (`catalog_name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
\ No newline at end of file
diff --git a/hetu-filesystem-client/pom.xml b/hetu-filesystem-client/pom.xml
index 4097b370c..5a5fa1b51 100644
--- a/hetu-filesystem-client/pom.xml
+++ b/hetu-filesystem-client/pom.xml
@@ -35,6 +35,19 @@
io.airlift
log
+
+ mysql
+ mysql-connector-java
+
+
+ org.postgresql
+ postgresql
+
+
+ com.alibaba
+ druid
+ 1.2.15
+
org.testng
testng
@@ -57,6 +70,12 @@
true
+
+ maven-dependency-plugin
+
+ true
+
+
\ No newline at end of file
diff --git a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/AbstractWorkspaceFileSystemClient.java b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/AbstractWorkspaceFileSystemClient.java
index e7afe1380..6e99aa314 100644
--- a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/AbstractWorkspaceFileSystemClient.java
+++ b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/AbstractWorkspaceFileSystemClient.java
@@ -51,8 +51,12 @@ public Path getRoot()
public void validate(Path path)
throws AccessDeniedException
{
- if (!path.toAbsolutePath().startsWith(root)) {
- throw new AccessDeniedException(String.format("%s is not in workspace %s. Access has been denied.", path, root));
+ String os = System.getProperty("os.name");
+ //Windows操作系统不校验
+ if (os == null || !os.toLowerCase().startsWith("windows")) {
+ if (!path.toAbsolutePath().startsWith(root)) {
+ throw new AccessDeniedException(String.format("%s is not in workspace %s. Access has been denied.", path, root));
+ }
}
}
}
diff --git a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuFileSystemClientPlugin.java b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuFileSystemClientPlugin.java
index d37a88526..a79f09423 100644
--- a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuFileSystemClientPlugin.java
+++ b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuFileSystemClientPlugin.java
@@ -15,6 +15,7 @@
package io.hetu.core.filesystem;
import com.google.common.collect.ImmutableList;
+import io.hetu.core.filesystem.db.DbFileSystemClientFactory;
import io.prestosql.spi.Plugin;
import io.prestosql.spi.filesystem.HetuFileSystemClientFactory;
@@ -29,6 +30,6 @@ public class HetuFileSystemClientPlugin
@Override
public Iterable getFileSystemClientFactory()
{
- return ImmutableList.of(new LocalFileSystemClientFactory(), new HdfsFileSystemClientFactory());
+ return ImmutableList.of(new LocalFileSystemClientFactory(), new HdfsFileSystemClientFactory(), new DbFileSystemClientFactory());
}
}
diff --git a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbCatalog.java b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbCatalog.java
new file mode 100644
index 000000000..58a544874
--- /dev/null
+++ b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbCatalog.java
@@ -0,0 +1,72 @@
+package io.hetu.core.filesystem.db;
+
+import java.nio.file.Path;
+
+class DbCatalog {
+ private String catalogName;
+
+ private String metadata;
+
+ private Path metadataDirPath;
+
+ private String properties;
+
+ private Path propertiesPath;
+
+ public DbCatalog() {
+ }
+
+ public DbCatalog(String catalogName, String metadata, String properties) {
+ this.catalogName = catalogName;
+ this.metadata = metadata;
+ this.properties = properties;
+ }
+
+ public DbCatalog(String catalogName, String metadata, Path metadataDirPath, String properties, Path propertiesPath) {
+ this.catalogName = catalogName;
+ this.metadata = metadata;
+ this.metadataDirPath = metadataDirPath;
+ this.properties = properties;
+ this.propertiesPath = propertiesPath;
+ }
+
+ public String getCatalogName() {
+ return catalogName;
+ }
+
+ public void setCatalogName(String catalogName) {
+ this.catalogName = catalogName;
+ }
+
+ public String getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(String metadata) {
+ this.metadata = metadata;
+ }
+
+ public Path getMetadataDirPath() {
+ return metadataDirPath;
+ }
+
+ public void setMetadataDirPath(Path metadataDirPath) {
+ this.metadataDirPath = metadataDirPath;
+ }
+
+ public String getProperties() {
+ return properties;
+ }
+
+ public void setProperties(String properties) {
+ this.properties = properties;
+ }
+
+ public Path getPropertiesPath() {
+ return propertiesPath;
+ }
+
+ public void setPropertiesPath(Path propertiesPath) {
+ this.propertiesPath = propertiesPath;
+ }
+}
diff --git a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbConfig.java b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbConfig.java
new file mode 100644
index 000000000..2518e4370
--- /dev/null
+++ b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) 2018-2020. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.hetu.core.filesystem.db;
+
+import io.airlift.log.Logger;
+
+import java.util.Properties;
+
+/**
+ * 动态目录数据库配置
+ */
+public class DbConfig {
+
+ private static final Logger LOG = Logger.get(DbConfig.class);
+
+ private static final String FS_DB_URL = "fs.db.url";
+
+ private static final String FS_DB_DRIVER = "fs.db.driver";
+
+ private static final String FS_DB_USERNAME = "fs.db.username";
+
+ private static final String FS_DB_PASSWORD = "fs.db.password";
+
+ private static final String FS_DB_INITIAL_SIZE = "fs.db.initialSize";
+
+ private static final String FS_DB_MIN_IDLE = "fs.db.minIdle";
+
+ private static final String FS_DB_MAX_ACTIVE = "fs.db.maxActive";
+
+ private static final String FS_DB_MAX_WAIT = "fs.db.maxWait";
+
+ private Properties dbProperties;
+
+ public Properties getDbProperties() {
+ return dbProperties;
+ }
+
+ public DbConfig(Properties properties) {
+ generateDbConfig(properties);
+ }
+
+ private void generateDbConfig(Properties properties) {
+ try {
+ dbProperties = new Properties();
+ dbProperties.setProperty("url", properties.getProperty(FS_DB_URL));
+ dbProperties.setProperty("driverClassName", properties.getProperty(FS_DB_DRIVER));
+ dbProperties.setProperty("username", properties.getProperty(FS_DB_USERNAME));
+ dbProperties.setProperty("password", properties.getProperty(FS_DB_PASSWORD));
+ dbProperties.setProperty("initialSize", configDefault(properties, FS_DB_INITIAL_SIZE, "1"));
+ dbProperties.setProperty("minIdle", configDefault(properties, FS_DB_MIN_IDLE, "1"));
+ dbProperties.setProperty("maxActive", configDefault(properties, FS_DB_MAX_ACTIVE, "1"));
+ dbProperties.setProperty("maxWait", configDefault(properties, FS_DB_MAX_WAIT, "1"));
+ } catch (Exception e) {
+ LOG.error("获取动态目录数据库配置异常, 原因: {}", e.getMessage(), e);
+ throw new IllegalArgumentException("获取动态目录数据库配置异常, 原因: " + e.getMessage());
+ }
+ }
+
+ private String configDefault(Properties properties, String key, String defaultValue) {
+ Object o = properties.get(key);
+ return o != null && o != "" ? o.toString() : defaultValue;
+ }
+}
diff --git a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbFileOutputStream.java b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbFileOutputStream.java
new file mode 100644
index 000000000..9cfcc271b
--- /dev/null
+++ b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbFileOutputStream.java
@@ -0,0 +1,91 @@
+package io.hetu.core.filesystem.db;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import static io.hetu.core.filesystem.db.HetuDbFileSystemClient.readString;
+
+public class DbFileOutputStream extends OutputStream {
+
+ private DataSource dataSource;
+
+ private List catalogBaseDir;
+
+ private Path path;
+
+ private OutputStream out;
+
+ public DbFileOutputStream(DataSource dataSource, List catalogBaseDir, Path path, OutputStream out) {
+ this.dataSource = dataSource;
+ this.catalogBaseDir = catalogBaseDir;
+ this.path = path;
+ this.out = out;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ saveDbCatalog(path);
+ }
+
+ /**
+ * 保存数据库连接器
+ * @param path
+ * @throws Exception
+ */
+ private void saveDbCatalog(Path path) throws IOException {
+ String absolutePath = path.toFile().getParentFile().getAbsolutePath();
+ if (!catalogBaseDir.isEmpty() && catalogBaseDir.stream().filter(absolutePath::contains).count() > 0) {
+ String fileName = path.toFile().getName();
+ String catalogName = null;
+ String metatdata = null;
+ String properties = null;
+ if (fileName.endsWith(".properties") && Files.exists(path.toAbsolutePath())) {
+ catalogName = fileName.substring(0, fileName.lastIndexOf(".properties"));
+ properties = readString(Files.newInputStream(path.toAbsolutePath()));
+ }
+ if (fileName.endsWith(".metadata") && Files.exists(path.toAbsolutePath())) {
+ catalogName = fileName.substring(0, fileName.lastIndexOf(".metadata"));
+ metatdata = readString(Files.newInputStream(path.toAbsolutePath()));
+ }
+ if (catalogName != null) {
+ DbCatalog dbCatalog = DbUtils.selectOne(dataSource, catalogName);
+ if (dbCatalog != null) {
+ if (metatdata != null) {
+ dbCatalog.setMetadata(metatdata);
+ }
+ if (properties != null) {
+ dbCatalog.setProperties(properties);
+ }
+ DbUtils.updateByCatalogName(dataSource, dbCatalog.getCatalogName(), dbCatalog.getMetadata(), dbCatalog.getProperties());
+ } else {
+ DbUtils.insert(dataSource, catalogName, metatdata, properties);
+ }
+ }
+ }
+ }
+}
diff --git a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbFileSystemClientFactory.java b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbFileSystemClientFactory.java
new file mode 100644
index 000000000..ccfd0f9f8
--- /dev/null
+++ b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbFileSystemClientFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2018-2020. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.hetu.core.filesystem.db;
+
+import io.hetu.core.filesystem.LocalFileSystemClientFactory;
+import io.prestosql.spi.filesystem.HetuFileSystemClient;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+/**
+ * 动态目录数据库客户端工厂
+ */
+public class DbFileSystemClientFactory extends LocalFileSystemClientFactory {
+ private static final String NAME_DB = "db";
+
+ @Override
+ public HetuFileSystemClient getFileSystemClient(Properties properties) {
+ return new HetuDbFileSystemClient(new DbConfig(properties), Paths.get("/"));
+ }
+
+ @Override
+ public HetuFileSystemClient getFileSystemClient(Properties properties, Path root) {
+ return new HetuDbFileSystemClient(new DbConfig(properties), root);
+ }
+
+ @Override
+ public String getName() {
+ return NAME_DB;
+ }
+}
diff --git a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbUtils.java b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbUtils.java
new file mode 100644
index 000000000..104f61d71
--- /dev/null
+++ b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/DbUtils.java
@@ -0,0 +1,131 @@
+package io.hetu.core.filesystem.db;
+
+import io.airlift.log.Logger;
+
+import javax.sql.DataSource;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+public class DbUtils {
+
+ private static final Logger LOG = Logger.get(DbUtils.class);
+
+ private static final String SELECT_ALL_SQL = "select * from olk_fs_catalog";
+ private static final String SELECT_ONE_SQL = "select * from olk_fs_catalog where catalog_name = ?";
+ private static final String INSERT_SQL = "insert into olk_fs_catalog (catalog_name, metadata, properties, create_time) values(?,?,?,?)";
+ private static final String UPDATE_SQL = "update olk_fs_catalog set metadata = ?, properties = ? where catalog_name = ?";
+ private static final String DELETE_SQL = "delete from olk_fs_catalog where catalog_name = ?";
+
+
+ public static List selectAll(DataSource dataSource) {
+ List results = new ArrayList<>();
+ execute(dataSource, SELECT_ALL_SQL, (p) -> {
+ try {
+ ResultSet resultSet = p.executeQuery();
+ while (resultSet.next()) {
+ String catalogName = resultSet.getString("catalog_name");
+ String metadata = resultSet.getString("metadata");
+ String properties = resultSet.getString("properties");
+ results.add(new DbCatalog(catalogName, metadata, properties));
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ return results;
+ }
+
+ public static DbCatalog selectOne(DataSource dataSource, String catalog) {
+ List results = new ArrayList<>();
+ execute(dataSource, SELECT_ONE_SQL, (p) -> {
+ try {
+ p.setString(1, catalog);
+ ResultSet resultSet = p.executeQuery();
+ while (resultSet.next()) {
+ String catalogName = resultSet.getString("catalog_name");
+ String metadata = resultSet.getString("metadata");
+ String properties = resultSet.getString("properties");
+ results.add(new DbCatalog(catalogName, metadata, properties));
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ if (!results.isEmpty()) {
+ return results.get(0);
+ }
+ return null;
+ }
+
+ public static void insert(DataSource dataSource, String catalogName, String metadata, String properties) {
+ execute(dataSource, INSERT_SQL, (p) -> {
+ try {
+ p.setString(1, catalogName);
+ p.setString(2, metadata);
+ p.setString(3, properties);
+ p.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
+ p.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ public static void updateByCatalogName(DataSource dataSource, String catalogName, String metadata, String properties) {
+ execute(dataSource, UPDATE_SQL, (p) -> {
+ try {
+ p.setString(1, metadata);
+ p.setString(2, properties);
+ p.setString(3, catalogName);
+ p.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ public static void deleteByCatalogName(DataSource dataSource, String catalogName) {
+ execute(dataSource, DELETE_SQL, (p) -> {
+ try {
+ p.setString(1, catalogName);
+ p.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ public static void execute(DataSource dataSource, String sql, Consumer consumer) {
+ Connection connection = null;
+ PreparedStatement preparedStatement = null;
+ ResultSet resultSet = null;
+ try {
+ connection = dataSource.getConnection();
+ preparedStatement = connection.prepareStatement(sql);
+ consumer.accept(preparedStatement);
+ } catch (Exception e) {
+ LOG.error("执行动态目录数据库语句异常, 原因: %s, %s", e.getMessage(), e);
+ throw new RuntimeException("执行动态目录数据库语句异常, 原因: " + e.getMessage());
+ } finally {
+ closeConn(resultSet, preparedStatement, connection);
+ }
+ }
+
+ private static void closeConn(ResultSet resultSet, Statement statement, Connection connection) {
+ try {
+ if (resultSet != null) {
+ resultSet.close();
+ }
+ if (statement != null) {
+ statement.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/HetuDbFileSystemClient.java b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/HetuDbFileSystemClient.java
new file mode 100644
index 000000000..0e1ff0105
--- /dev/null
+++ b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/db/HetuDbFileSystemClient.java
@@ -0,0 +1,445 @@
+/*
+ * Copyright (C) 2018-2021. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.hetu.core.filesystem.db;
+
+import com.alibaba.druid.pool.DruidDataSourceFactory;
+import io.airlift.log.Logger;
+import io.hetu.core.filesystem.AbstractWorkspaceFileSystemClient;
+import io.prestosql.spi.filesystem.SupportedFileAttributes;
+
+import javax.sql.DataSource;
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.file.*;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static java.nio.file.Files.getFileStore;
+import static java.nio.file.Files.newDirectoryStream;
+
+/**
+ * 动态目录数据库客户端
+ */
+public class HetuDbFileSystemClient extends AbstractWorkspaceFileSystemClient {
+
+ private static final Logger LOG = Logger.get(HetuDbFileSystemClient.class);
+
+ private DataSource dataSource;
+
+ private List catalogBaseDir = new ArrayList<>();
+
+ public HetuDbFileSystemClient(DbConfig config, Path allowAccessRoot) {
+ super(allowAccessRoot);
+ try {
+ dataSource = DruidDataSourceFactory.createDataSource(config.getDbProperties());
+ } catch (Exception e) {
+ LOG.error("获取动态目录数据库配置异常, 原因: {}", e.getMessage(), e);
+ throw new IllegalArgumentException("获取数据库动态目录配置异常, 原因: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public Path createDirectories(Path dir)
+ throws IOException {
+ validate(dir);
+ return Files.createDirectories(dir);
+ }
+
+ @Override
+ public Path createDirectory(Path dir)
+ throws IOException {
+ validate(dir);
+ return Files.createDirectory(dir);
+ }
+
+ /**
+ * Delete a given file or directory. If the given path is a directory it must be empty.
+ *
+ * @param path Path to delete.
+ * @throws IOException Other exceptions.
+ */
+ @Override
+ public void delete(Path path)
+ throws IOException {
+ validate(path);
+ Files.delete(path);
+ }
+
+ /**
+ * Delete a given file or directory. If the given path is a directory it must be empty.
+ * Return the result of deletion.
+ *
+ * @param path Path to delete.
+ * @return Whether the deletion is successful. If the file does not exist, return {@code false}.
+ * @throws IOException Other exceptions.
+ */
+ public boolean deleteIfExistsNoDb(Path path) throws IOException {
+ validate(path);
+ return Files.deleteIfExists(path);
+ }
+
+ public boolean deleteRecursivelyNoDb(Path path)
+ throws FileSystemException {
+ validate(path);
+ if (!exists(path)) {
+ return false;
+ }
+ Collection exceptions = new LinkedList<>();
+ deleteRecursivelyCore(path, exceptions);
+ if (!exceptions.isEmpty()) {
+ FileSystemException exceptionToThrow = new FileSystemException(path.toString(), null,
+ "Failed to delete one or more files. Please checked suppressed exceptions for details");
+ for (IOException ex : exceptions) {
+ exceptionToThrow.addSuppressed(ex);
+ }
+ throw exceptionToThrow;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean deleteIfExists(Path path)
+ throws IOException {
+ // 删除连接器
+ deleteDbCatalog(path);
+ // localfile正常流程
+ validate(path);
+ return Files.deleteIfExists(path);
+ }
+
+ @Override
+ public boolean deleteRecursively(Path path)
+ throws FileSystemException {
+ // 删除连接器
+ deleteDbCatalog(path);
+ // localfile正常流程
+ validate(path);
+ if (!exists(path)) {
+ return false;
+ }
+ Collection exceptions = new LinkedList<>();
+ deleteRecursivelyCore(path, exceptions);
+ if (!exceptions.isEmpty()) {
+ FileSystemException exceptionToThrow = new FileSystemException(path.toString(), null,
+ "Failed to delete one or more files. Please checked suppressed exceptions for details");
+ for (IOException ex : exceptions) {
+ exceptionToThrow.addSuppressed(ex);
+ }
+ throw exceptionToThrow;
+ }
+ return true;
+ }
+
+ private void deleteRecursivelyCore(Path path, Collection exceptions) {
+ if (!exists(path)) {
+ exceptions.add(new FileNotFoundException(path.toString()));
+ return;
+ }
+ if (!Files.isDirectory(path)) {
+ try {
+ delete(path);
+ } catch (IOException ex) {
+ exceptions.add(ex);
+ }
+ } else {
+ try (Stream children = list(path)) {
+ if (children != null) {
+ children.forEach(child -> deleteRecursivelyCore(child, exceptions));
+ }
+ delete(path);
+ } catch (IOException ex) {
+ exceptions.add(ex);
+ }
+ }
+ }
+
+ @Override
+ public boolean exists(Path path) {
+ return Files.exists(path);
+ }
+
+ @Override
+ public void move(Path source, Path target)
+ throws IOException {
+ validate(source);
+ validate(target);
+ Files.move(source, target);
+ }
+
+ @Override
+ public InputStream newInputStream(Path path)
+ throws IOException {
+ // Need inline check to pass security check
+ validate(path);
+ return Files.newInputStream(path);
+ }
+
+ @Override
+ public OutputStream newOutputStream(Path path, OpenOption... options)
+ throws IOException {
+ // Need inline check to pass security check
+ validate(path);
+ return new DbFileOutputStream(dataSource, catalogBaseDir, path, Files.newOutputStream(path, options));
+ }
+
+ @Override
+ public Object getAttribute(Path path, String attribute)
+ throws IOException {
+ validate(path);
+ if (!SupportedFileAttributes.SUPPORTED_ATTRIBUTES.contains(attribute)) {
+ throw new IllegalArgumentException(
+ String.format(Locale.ROOT, "Attribute [%s] is not supported.", attribute));
+ }
+ // Get time in millis instead of date time format
+ if (attribute.equalsIgnoreCase(SupportedFileAttributes.LAST_MODIFIED_TIME)) {
+ return Files.getLastModifiedTime(path).toMillis();
+ }
+ return Files.getAttribute(path, attribute);
+ }
+
+ @Override
+ public boolean isDirectory(Path path) {
+ return Files.isDirectory(path);
+ }
+
+ @Override
+ public Stream list(Path dir)
+ throws IOException {
+ // 同步数据库连接器
+ syncDbCatalog(dir);
+ // localfile正常流程
+ validate(dir);
+ return Files.list(dir);
+ }
+
+ @Override
+ public Stream walk(Path dir)
+ throws IOException {
+ validate(dir);
+ return Files.walk(dir);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public long getTotalSpace(Path path) throws IOException {
+ FileStore fileStore = getFileStore(path);
+ return fileStore.getTotalSpace();
+ }
+
+ @Override
+ public long getUsableSpace(Path path) throws IOException {
+ FileStore fileStore = getFileStore(path);
+ return fileStore.getUsableSpace();
+ }
+
+ @Override
+ public Path createTemporaryFile(Path path, String prefix, String suffix) throws IOException {
+ return Files.createTempFile(path, prefix, suffix);
+ }
+
+ @Override
+ public Path createFile(Path path) throws IOException {
+ return Files.createFile(path);
+ }
+
+ @Override
+ public Stream getDirectoryStream(Path path, String prefix, String suffix) throws IOException {
+ String glob = prefix + "*" + suffix;
+ return StreamSupport.stream(newDirectoryStream(path, glob).spliterator(), false);
+ }
+
+ /**
+ * 同步数据库连接器
+ *
+ * @param dir
+ * @throws Exception
+ */
+ private void syncDbCatalog(Path dir) throws IOException {
+ if (dir.toFile().getName().equals("catalog")) {
+ String absolutePath = dir.toFile().getAbsolutePath();
+ // 保存动态目录的根目录
+ if (!catalogBaseDir.contains(absolutePath)) {
+ catalogBaseDir.add(absolutePath);
+ }
+ // 获取数据源
+ Map dbCatalogs = DbUtils.selectAll(dataSource).stream().collect(Collectors.toMap(DbCatalog::getCatalogName, o -> o));
+ // 获取本地文件
+ Map localCatalogs = getLocalCatalog(dir);
+ // 删除本地文件
+ for (String localKey : localCatalogs.keySet()) {
+ DbCatalog dbCatalog = dbCatalogs.get(localKey);
+ DbCatalog localCatalog = localCatalogs.get(localKey);
+ if (dbCatalog == null) {
+ if (localCatalog.getPropertiesPath() != null) {
+ deleteIfExistsNoDb(localCatalog.getPropertiesPath().toAbsolutePath());
+ }
+ if (localCatalog.getMetadataDirPath() != null) {
+ deleteRecursivelyNoDb(localCatalog.getMetadataDirPath().toAbsolutePath());
+ }
+ }
+ }
+ // 增加本地文件
+ saveLocalFile(absolutePath, dbCatalogs, localCatalogs);
+ }
+ }
+
+ /**
+ * 获取本地配置
+ *
+ * @param dir
+ * @return
+ * @throws IOException
+ */
+ private Map getLocalCatalog(Path dir) throws IOException {
+ Map localCatalogs = new HashMap<>();
+ List propertiesPath = Files.list(dir).map(Path::getFileName).collect(Collectors.toList());
+ if (!propertiesPath.isEmpty()) {
+ for (Path path : propertiesPath) {
+ String fileName = path.toString();
+ if (fileName.endsWith(".properties")) {
+ String catalogName = fileName.substring(0, fileName.lastIndexOf(".properties"));
+ Path propertiesAPath = Paths.get(dir.toString(), path.toString()).toAbsolutePath();
+ localCatalogs.put(catalogName, new DbCatalog(catalogName, null, null, readString(Files.newInputStream(propertiesAPath)), propertiesAPath));
+ }
+ }
+ }
+ List metadataDirPath = Files.list(dir).map(Path::toFile).filter(File::isDirectory).collect(Collectors.toList());
+ if (!metadataDirPath.isEmpty()) {
+ for (File path : metadataDirPath) {
+ File metadataPath = Files.list(path.toPath()).map(Path::toFile).filter(o -> o.getName().endsWith(".metadata")).findFirst().orElse(null);
+ if (metadataPath != null) {
+ // 获取metadata文件加metadata目录
+ String fileName = metadataPath.getName();
+ DbCatalog dbCatalog = localCatalogs.get(fileName.substring(0, fileName.lastIndexOf(".metadata")));
+ if (dbCatalog != null) {
+ dbCatalog.setMetadata(readString(Files.newInputStream(Paths.get(metadataPath.getAbsolutePath()).toAbsolutePath())));
+ dbCatalog.setMetadataDirPath(path.toPath());
+ }
+ } else {
+ // 获取metadata目录
+ DbCatalog dbCatalog = localCatalogs.get(path.getName());
+ if (dbCatalog != null) {
+ dbCatalog.setMetadataDirPath(path.toPath());
+ }
+ }
+ }
+ }
+ return localCatalogs;
+ }
+
+ /**
+ * 保存本地文件
+ *
+ * @param absolutePath
+ * @param dbCatalogs
+ * @param localCatalogs
+ * @throws IOException
+ */
+ private void saveLocalFile(String absolutePath, Map dbCatalogs, Map localCatalogs) throws IOException {
+ // 增加本地文件
+ for (String dbKey : dbCatalogs.keySet()) {
+ DbCatalog dbCatalog = dbCatalogs.get(dbKey);
+ DbCatalog localCatalog = localCatalogs.get(dbKey);
+ if (localCatalog != null) {
+ if (!dbCatalog.getMetadata().equals(localCatalog.getMetadata())
+ || !dbCatalog.getProperties().equals(localCatalog.getProperties())) {
+ if (localCatalog.getPropertiesPath() != null) {
+ deleteIfExistsNoDb(localCatalog.getPropertiesPath().toAbsolutePath());
+ }
+ if (localCatalog.getMetadataDirPath() != null) {
+ deleteRecursivelyNoDb(localCatalog.getMetadataDirPath().toAbsolutePath());
+ }
+ }
+ }
+ // 创建properties
+ Path catalogPropertiesPath = Paths.get(absolutePath, dbCatalog.getCatalogName() + ".properties");
+ Properties catalogProperties = new Properties();
+ catalogProperties.load(new StringReader(dbCatalog.getProperties()));
+ catalogProperties.store(Files.newOutputStream(catalogPropertiesPath), "创建连接器[" + dbCatalog.getCatalogName() + "]的properties文件");
+ // 创建目录
+ Path catalogDir = Paths.get(absolutePath, dbCatalog.getCatalogName());
+ if (!Files.exists(catalogDir)) {
+ Files.createDirectory(catalogDir);
+ }
+ // 创建metadata
+ Path metadataPropertiesPath = Paths.get(absolutePath, dbCatalog.getCatalogName(), dbCatalog.getCatalogName() + ".metadata");
+ Properties metadataProperties = new Properties();
+ metadataProperties.load(new StringReader(dbCatalog.getMetadata()));
+ metadataProperties.store(Files.newOutputStream(metadataPropertiesPath), "创建连接器[" + dbCatalog.getCatalogName() + "]的metadata文件");
+ }
+ }
+
+ /**
+ * 删除数据库连接器
+ *
+ * @param path
+ * @throws Exception
+ */
+ private void deleteDbCatalog(Path path) {
+ // 添加/更新连接器
+ String absolutePath = path.toFile().getParentFile().getAbsolutePath();
+ if (!catalogBaseDir.isEmpty() && catalogBaseDir.stream().filter(absolutePath::contains).count() > 0) {
+ String fileName = path.toFile().getName();
+ String catalogName = null;
+ if (fileName.endsWith(".properties")) {
+ catalogName = fileName.substring(0, fileName.lastIndexOf(".properties"));
+ }
+ if (fileName.endsWith(".metadata")) {
+ catalogName = fileName.substring(0, fileName.lastIndexOf(".metadata"));
+ }
+ if (catalogName != null) {
+ DbCatalog dbCatalog = DbUtils.selectOne(dataSource, catalogName);
+ if (dbCatalog != null) {
+ DbUtils.deleteByCatalogName(dataSource, dbCatalog.getCatalogName());
+ }
+ }
+ }
+ }
+
+ /**
+ * 读取文件内容字符串
+ *
+ * @param stream
+ * @return
+ */
+ public static String readString(InputStream stream) {
+ StringBuilder sb = new StringBuilder();
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(stream, Charset.defaultCharset()))) {
+ String s = null;
+ while ((s = br.readLine()) != null) {
+ sb.append(s);
+ sb.append("\r\n");
+ }
+ br.close();
+ return sb.toString();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ } finally {
+ if (stream != null) {
+ try {
+ stream.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+}