新增MySQL和pg数据库的同步
This commit is contained in:
@@ -192,7 +192,6 @@ public class taskDbSync {
|
|||||||
private String mapMySqlTypeToPgType(String mySqlType, int columnSize, int decimalDigits) {
|
private String mapMySqlTypeToPgType(String mySqlType, int columnSize, int decimalDigits) {
|
||||||
// 统一转为大写处理,避免类型字符串大小写问题
|
// 统一转为大写处理,避免类型字符串大小写问题
|
||||||
String type = mySqlType.toUpperCase();
|
String type = mySqlType.toUpperCase();
|
||||||
|
|
||||||
return switch (type) {
|
return switch (type) {
|
||||||
// 整数类型映射
|
// 整数类型映射
|
||||||
case "INT", "INTEGER" -> "INTEGER";
|
case "INT", "INTEGER" -> "INTEGER";
|
||||||
@@ -200,7 +199,6 @@ public class taskDbSync {
|
|||||||
case "SMALLINT" -> "SMALLINT";
|
case "SMALLINT" -> "SMALLINT";
|
||||||
case "MEDIUMINT" -> "INTEGER"; // PostgresSQL无MEDIUMINT,用INTEGER兼容
|
case "MEDIUMINT" -> "INTEGER"; // PostgresSQL无MEDIUMINT,用INTEGER兼容
|
||||||
case "BIGINT" -> "BIGINT";
|
case "BIGINT" -> "BIGINT";
|
||||||
|
|
||||||
// 浮点类型映射
|
// 浮点类型映射
|
||||||
case "FLOAT" -> columnSize > 24 ? "DOUBLE PRECISION" : "REAL"; // FLOAT(24)以下映射为REAL
|
case "FLOAT" -> columnSize > 24 ? "DOUBLE PRECISION" : "REAL"; // FLOAT(24)以下映射为REAL
|
||||||
case "DOUBLE", "DOUBLE PRECISION" -> "DOUBLE PRECISION";
|
case "DOUBLE", "DOUBLE PRECISION" -> "DOUBLE PRECISION";
|
||||||
@@ -209,7 +207,6 @@ public class taskDbSync {
|
|||||||
int scale = Math.max(decimalDigits, 0);
|
int scale = Math.max(decimalDigits, 0);
|
||||||
yield "NUMERIC(" + precision + "," + scale + ")";
|
yield "NUMERIC(" + precision + "," + scale + ")";
|
||||||
}
|
}
|
||||||
|
|
||||||
// 字符串类型映射
|
// 字符串类型映射
|
||||||
case "VARCHAR" -> {
|
case "VARCHAR" -> {
|
||||||
// PostgresSQL VARCHAR无长度限制时建议用TEXT
|
// PostgresSQL VARCHAR无长度限制时建议用TEXT
|
||||||
@@ -219,30 +216,25 @@ public class taskDbSync {
|
|||||||
case "CHAR" -> "CHAR(" + (columnSize > 0 ? columnSize : 1) + ")";
|
case "CHAR" -> "CHAR(" + (columnSize > 0 ? columnSize : 1) + ")";
|
||||||
case "TEXT", "MEDIUMTEXT", "TINYTEXT" -> "TEXT";
|
case "TEXT", "MEDIUMTEXT", "TINYTEXT" -> "TEXT";
|
||||||
case "LONGTEXT" -> "TEXT"; // PostgresSQL TEXT无长度限制
|
case "LONGTEXT" -> "TEXT"; // PostgresSQL TEXT无长度限制
|
||||||
|
|
||||||
// 二进制类型映射
|
// 二进制类型映射
|
||||||
case "BLOB" -> "BYTEA";
|
case "BLOB" -> "BYTEA";
|
||||||
case "TINYBLOB", "MEDIUMBLOB", "LONGBLOB" -> "BYTEA";
|
case "TINYBLOB", "MEDIUMBLOB", "LONGBLOB" -> "BYTEA";
|
||||||
case "BINARY" -> "BYTEA";
|
case "BINARY" -> "BYTEA";
|
||||||
case "VARBINARY" -> "BYTEA";
|
case "VARBINARY" -> "BYTEA";
|
||||||
|
|
||||||
// 日期时间类型映射
|
// 日期时间类型映射
|
||||||
case "DATE" -> "DATE";
|
case "DATE" -> "DATE";
|
||||||
case "TIME" -> "TIME";
|
case "TIME" -> "TIME";
|
||||||
case "DATETIME", "TIMESTAMP" -> "TIMESTAMP";
|
case "DATETIME", "TIMESTAMP" -> "TIMESTAMP";
|
||||||
case "YEAR" -> "SMALLINT"; // YEAR用SMALLINT存储更高效
|
case "YEAR" -> "SMALLINT"; // YEAR用SMALLINT存储更高效
|
||||||
|
|
||||||
// 特殊类型映射
|
// 特殊类型映射
|
||||||
case "BOOLEAN" -> "BOOLEAN";
|
case "BOOLEAN" -> "BOOLEAN";
|
||||||
case "JSON", "JSONB" -> "JSONB"; // PostgresSQL推荐用JSONB
|
case "JSON", "JSONB" -> "JSONB"; // PostgresSQL推荐用JSONB
|
||||||
case "ENUM" -> "VARCHAR(255)"; // ENUM转为字符串存储,需业务层保证合法性
|
case "ENUM" -> "VARCHAR(255)"; // ENUM转为字符串存储,需业务层保证合法性
|
||||||
case "SET" -> "TEXT"; // SET用TEXT存储,逗号分隔
|
case "SET" -> "TEXT"; // SET用TEXT存储,逗号分隔
|
||||||
|
|
||||||
// 几何类型(简化映射)
|
// 几何类型(简化映射)
|
||||||
case "POINT" -> "POINT";
|
case "POINT" -> "POINT";
|
||||||
case "LINESTRING" -> "LINESTRING";
|
case "LINESTRING" -> "LINESTRING";
|
||||||
case "POLYGON" -> "POLYGON";
|
case "POLYGON" -> "POLYGON";
|
||||||
|
|
||||||
// 未匹配类型的默认处理
|
// 未匹配类型的默认处理
|
||||||
default -> {
|
default -> {
|
||||||
// 日志输出未匹配的类型,便于后续优化
|
// 日志输出未匹配的类型,便于后续优化
|
||||||
|
|||||||
@@ -46,7 +46,10 @@ public class taskEnable {
|
|||||||
*/
|
*/
|
||||||
@GetMapping("/getTaskDockerDiskInfo")
|
@GetMapping("/getTaskDockerDiskInfo")
|
||||||
public ApiResult<?> jobHostDisk(String token) {
|
public ApiResult<?> jobHostDisk(String token) {
|
||||||
if (vToken.isValidToken(token)) {
|
if (!vToken.isValidToken(token)) {
|
||||||
|
return ApiResult.error(401, "无效的访问令牌");
|
||||||
|
}
|
||||||
|
try {
|
||||||
List<DockerHost> dockerHosts = dockerHostService.list();
|
List<DockerHost> dockerHosts = dockerHostService.list();
|
||||||
List<String> errorList = Collections.synchronizedList(new ArrayList<>());
|
List<String> errorList = Collections.synchronizedList(new ArrayList<>());
|
||||||
// 并行处理所有宿主机
|
// 并行处理所有宿主机
|
||||||
@@ -58,8 +61,9 @@ public class taskEnable {
|
|||||||
return errorList.isEmpty()
|
return errorList.isEmpty()
|
||||||
? ApiResult.success()
|
? ApiResult.success()
|
||||||
: ApiResult.error();
|
: ApiResult.error();
|
||||||
|
} catch (Exception e) {
|
||||||
|
return ApiResult.error(101, e.getMessage());
|
||||||
}
|
}
|
||||||
return ApiResult.error();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -255,7 +255,11 @@ public class sysController {
|
|||||||
|
|
||||||
@GetMapping("/getApiInfo")
|
@GetMapping("/getApiInfo")
|
||||||
public ApiResult<List<SnapshotDTO>> getApiInfo(String token) {
|
public ApiResult<List<SnapshotDTO>> getApiInfo(String token) {
|
||||||
if (vToken.isValidToken(token)) {
|
if (!vToken.isValidToken(token)) {
|
||||||
|
return ApiResult.error(401, "无效的访问令牌");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
// 1. 新建一个一次性 List
|
// 1. 新建一个一次性 List
|
||||||
List<HostRuntime.Snapshot> snapshots =
|
List<HostRuntime.Snapshot> snapshots =
|
||||||
Collections.synchronizedList(new LinkedList<>());
|
Collections.synchronizedList(new LinkedList<>());
|
||||||
@@ -267,8 +271,9 @@ public class sysController {
|
|||||||
((LinkedList<HostRuntime.Snapshot>) snapshots).removeFirst();
|
((LinkedList<HostRuntime.Snapshot>) snapshots).removeFirst();
|
||||||
}
|
}
|
||||||
return ApiResult.success(Collections.singletonList(SnapshotDTO.from(snap)));
|
return ApiResult.success(Collections.singletonList(SnapshotDTO.from(snap)));
|
||||||
|
} catch (Exception e) {
|
||||||
|
return ApiResult.error(101, e.getMessage());
|
||||||
}
|
}
|
||||||
return ApiResult.error();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -277,7 +282,10 @@ public class sysController {
|
|||||||
*/
|
*/
|
||||||
@GetMapping("/getApiDockerInfo")
|
@GetMapping("/getApiDockerInfo")
|
||||||
public ApiResult<?> getDockerInfo(String token) {
|
public ApiResult<?> getDockerInfo(String token) {
|
||||||
if (vToken.isValidToken(token)) {
|
if (!vToken.isValidToken(token)) {
|
||||||
|
return ApiResult.error(401, "无效的访问令牌");
|
||||||
|
}
|
||||||
|
try {
|
||||||
List<DockerHost> dockerHosts = dockerHostService.list();
|
List<DockerHost> dockerHosts = dockerHostService.list();
|
||||||
List<String> errorList = Collections.synchronizedList(new ArrayList<>());
|
List<String> errorList = Collections.synchronizedList(new ArrayList<>());
|
||||||
CompletableFuture<?>[] futures = dockerHosts.stream()
|
CompletableFuture<?>[] futures = dockerHosts.stream()
|
||||||
@@ -289,8 +297,9 @@ public class sysController {
|
|||||||
return errorList.isEmpty()
|
return errorList.isEmpty()
|
||||||
? ApiResult.success()
|
? ApiResult.success()
|
||||||
: ApiResult.error();
|
: ApiResult.error();
|
||||||
|
} catch (Exception e) {
|
||||||
|
return ApiResult.error(101, e.getMessage());
|
||||||
}
|
}
|
||||||
return ApiResult.error();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -299,29 +308,29 @@ public class sysController {
|
|||||||
*/
|
*/
|
||||||
@GetMapping("/getApiStartDockerInfo")
|
@GetMapping("/getApiStartDockerInfo")
|
||||||
public ApiResult<?> startDockerInfo(String id, String token) {
|
public ApiResult<?> startDockerInfo(String id, String token) {
|
||||||
if (vToken.isValidToken(token)) {
|
if (!vToken.isValidToken(token)) {
|
||||||
|
return ApiResult.error(401, "无效的访问令牌");
|
||||||
|
}
|
||||||
|
try {
|
||||||
DockerContainerInfo cur = dockerInfoService.getById(id);
|
DockerContainerInfo cur = dockerInfoService.getById(id);
|
||||||
DockerHost host = dockerHostService.getById(cur.getDokerHostId());
|
DockerHost host = dockerHostService.getById(cur.getDokerHostId());
|
||||||
try {
|
SshUser sshUser = sshUserService.getById(host.getUserId());
|
||||||
SshUser sshUser = sshUserService.getById(host.getUserId());
|
SshInfo sshInfo = sshInfoService.getById(host.getHostId());
|
||||||
SshInfo sshInfo = sshInfoService.getById(host.getHostId());
|
docker.startDocker(sshInfo.getHostIp(), Long.valueOf(sshInfo.getHostPort()), sshUser.getCUsername(), sshUser.getCPassword(), cur.getContainerId());
|
||||||
docker.startDocker(sshInfo.getHostIp(), Long.valueOf(sshInfo.getHostPort()), sshUser.getCUsername(), sshUser.getCPassword(), cur.getContainerId());
|
/* 2. 取回最新列表 */
|
||||||
/* 2. 取回最新列表 */
|
List<docker.DockerInfo> remoteList = docker.getDockerInfo(
|
||||||
List<docker.DockerInfo> remoteList = docker.getDockerInfo(
|
sshInfo.getHostIp(),
|
||||||
sshInfo.getHostIp(),
|
Long.valueOf(sshInfo.getHostPort()),
|
||||||
Long.valueOf(sshInfo.getHostPort()),
|
sshUser.getCUsername(),
|
||||||
sshUser.getCUsername(),
|
sshUser.getCPassword());
|
||||||
sshUser.getCPassword());
|
/* 3. 有则更新、无则插入、失效删除 */
|
||||||
/* 3. 有则更新、无则插入、失效删除 */
|
refreshContainerTable(cur.getDokerHostId(), remoteList, sshInfo);
|
||||||
refreshContainerTable(cur.getDokerHostId(), remoteList, sshInfo);
|
/* 4. 更新主机运行数 */
|
||||||
/* 4. 更新主机运行数 */
|
updateRunNum(host, remoteList);
|
||||||
updateRunNum(host, remoteList);
|
return ApiResult.success();
|
||||||
return ApiResult.success();
|
} catch (Exception e) {
|
||||||
} catch (Exception e) {
|
return ApiResult.error(101, e.getMessage());
|
||||||
System.out.println(e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ApiResult.error();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -330,29 +339,29 @@ public class sysController {
|
|||||||
*/
|
*/
|
||||||
@GetMapping("/getApiStopDockerInfo")
|
@GetMapping("/getApiStopDockerInfo")
|
||||||
public ApiResult<?> stopDockerInfo(String id, String token) {
|
public ApiResult<?> stopDockerInfo(String id, String token) {
|
||||||
if (vToken.isValidToken(token)) {
|
if (!vToken.isValidToken(token)) {
|
||||||
|
return ApiResult.error(401, "无效的访问令牌");
|
||||||
|
}
|
||||||
|
try {
|
||||||
DockerContainerInfo cur = dockerInfoService.getById(id);
|
DockerContainerInfo cur = dockerInfoService.getById(id);
|
||||||
DockerHost host = dockerHostService.getById(cur.getDokerHostId());
|
DockerHost host = dockerHostService.getById(cur.getDokerHostId());
|
||||||
try {
|
SshUser sshUser = sshUserService.getById(host.getUserId());
|
||||||
SshUser sshUser = sshUserService.getById(host.getUserId());
|
SshInfo sshInfo = sshInfoService.getById(host.getHostId());
|
||||||
SshInfo sshInfo = sshInfoService.getById(host.getHostId());
|
docker.stopDocker(sshInfo.getHostIp(), Long.valueOf(sshInfo.getHostPort()), sshUser.getCUsername(), sshUser.getCPassword(), cur.getContainerId());
|
||||||
docker.stopDocker(sshInfo.getHostIp(), Long.valueOf(sshInfo.getHostPort()), sshUser.getCUsername(), sshUser.getCPassword(), cur.getContainerId());
|
/* 2. 取回最新列表 */
|
||||||
/* 2. 取回最新列表 */
|
List<docker.DockerInfo> remoteList = docker.getDockerInfo(
|
||||||
List<docker.DockerInfo> remoteList = docker.getDockerInfo(
|
sshInfo.getHostIp(),
|
||||||
sshInfo.getHostIp(),
|
Long.valueOf(sshInfo.getHostPort()),
|
||||||
Long.valueOf(sshInfo.getHostPort()),
|
sshUser.getCUsername(),
|
||||||
sshUser.getCUsername(),
|
sshUser.getCPassword());
|
||||||
sshUser.getCPassword());
|
/* 3. 有则更新、无则插入、失效删除 */
|
||||||
/* 3. 有则更新、无则插入、失效删除 */
|
refreshContainerTable(cur.getDokerHostId(), remoteList, sshInfo);
|
||||||
refreshContainerTable(cur.getDokerHostId(), remoteList, sshInfo);
|
/* 4. 更新主机运行数 */
|
||||||
/* 4. 更新主机运行数 */
|
updateRunNum(host, remoteList);
|
||||||
updateRunNum(host, remoteList);
|
return ApiResult.success();
|
||||||
return ApiResult.success();
|
} catch (Exception e) {
|
||||||
} catch (Exception e) {
|
return ApiResult.error(101, e.getMessage());
|
||||||
System.out.println(e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ApiResult.error();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user