API数据表更新

This commit is contained in:
2025-08-25 22:57:22 +08:00
parent 1d44083cf1
commit bf77e77071
2 changed files with 100 additions and 56 deletions

View File

@@ -0,0 +1,26 @@
package com.mini.capi.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class springBean {
@Bean("hostExecutor")
public Executor hostExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int core = Runtime.getRuntime().availableProcessors();
executor.setCorePoolSize(core * 2);
executor.setMaxPoolSize(core * 4);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("host-disk-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}

View File

@@ -9,11 +9,14 @@ import com.mini.capi.utils.vDate;
import com.mini.capi.utils.vId;
import com.mini.capi.utils.vToken;
import jakarta.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -38,68 +41,83 @@ public class taskEnable {
private SysHostService sysHostService;
@Resource
private Executor hostExecutor;
/**
* 获取容器主机的磁盘使用情况
*/
@GetMapping("/getTaskDockerDiskInfo")
public ApiResult<?> jobHostDisk(String token) {
if (vToken.isValidToken(token)) {
List<DockerHost> dockerHosts = dockerHostService.list();
for (DockerHost host : dockerHosts) {
SshUser sshUser = sshUserService.getById(host.getUserId());
SshInfo sshInfo = sshInfoService.getById(host.getHostId());
try {
/* 1. 采集实时数据 */
HostInfo.Result r = HostInfo.collect(
sshInfo.getHostIp(),
Integer.parseInt(sshInfo.getHostPort()),
sshUser.getCUsername(),
sshUser.getCPassword());
/* 2. 主机维度 saveOrUpdate */
SysHost sysHost = r.host;
sysHost.setSysHostId(host.getHostId());
sysHost.setUpdateTime(vDate.getNow());
sysHost.setDokerHostId(host.getDokerHostId());
sysHostService.saveOrUpdate(sysHost);
/* 3. 处理磁盘:先查库做索引,再比对 */
List<DiskMount> dbDisks = diskMountService.lambdaQuery()
.eq(DiskMount::getSysHostId, host.getHostId())
.list();
Map<String, DiskMount> dbDiskMap = dbDisks.stream()
.collect(Collectors.toMap(DiskMount::getMountPoint, Function.identity()));
List<DiskMount> toSaveOrUpdate = new ArrayList<>();
Set<String> liveMountPoint = new HashSet<>();
for (DiskMount d : r.disks) {
liveMountPoint.add(d.getMountPoint());
DiskMount exist = dbDiskMap.get(d.getMountPoint());
if (exist != null) {
// 存在 -> 更新
d.setDiskMountId(exist.getDiskMountId());
} else {
// 不存在 -> 新增
d.setDiskMountId(vId.getUid());
}
d.setSysHostId(host.getHostId());
d.setUpdateTime(vDate.getNow());
toSaveOrUpdate.add(d);
}
/* 4. 批量保存/更新 */
diskMountService.saveOrUpdateBatch(toSaveOrUpdate);
/* 5. 删除实时已消失的盘 */
List<String> delIds = dbDisks.stream()
.filter(d -> !liveMountPoint.contains(d.getMountPoint()))
.map(DiskMount::getDiskMountId)
.collect(Collectors.toList());
if (!delIds.isEmpty()) {
diskMountService.removeByIds(delIds);
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
return ApiResult.success();
List<String> errorList = Collections.synchronizedList(new ArrayList<>());
// 并行处理所有宿主机
CompletableFuture<?>[] futures = dockerHosts.stream()
.map(host -> CompletableFuture.runAsync(() -> handleSingleHost(host, errorList), hostExecutor))
.toArray(CompletableFuture[]::new);
// 等待全部完成
CompletableFuture.allOf(futures).join();
return errorList.isEmpty()
? ApiResult.success()
: ApiResult.error();
}
return ApiResult.error();
}
private void handleSingleHost(DockerHost host, List<String> errorList) {
try {
SshUser sshUser = sshUserService.getById(host.getUserId());
SshInfo sshInfo = sshInfoService.getById(host.getHostId());
/* 1. 采集实时数据 */
HostInfo.Result r = HostInfo.collect(
sshInfo.getHostIp(),
Integer.parseInt(sshInfo.getHostPort()),
sshUser.getCUsername(),
sshUser.getCPassword());
/* 2. 主机维度 saveOrUpdate */
SysHost sysHost = r.host;
sysHost.setSysHostId(host.getHostId());
sysHost.setUpdateTime(vDate.getNow());
sysHost.setDokerHostId(host.getDokerHostId());
sysHostService.saveOrUpdate(sysHost);
/* 3. 处理磁盘:先查库做索引,再比对 */
List<DiskMount> dbDisks = diskMountService.lambdaQuery()
.eq(DiskMount::getSysHostId, host.getHostId())
.list();
Map<String, DiskMount> dbDiskMap = dbDisks.stream()
.collect(Collectors.toMap(DiskMount::getMountPoint, Function.identity()));
List<DiskMount> toSaveOrUpdate = new ArrayList<>();
Set<String> liveMountPoint = new HashSet<>();
for (DiskMount d : r.disks) {
liveMountPoint.add(d.getMountPoint());
DiskMount exist = dbDiskMap.get(d.getMountPoint());
if (exist != null) {
d.setDiskMountId(exist.getDiskMountId());
} else {
d.setDiskMountId(vId.getUid());
}
d.setSysHostId(host.getHostId());
d.setUpdateTime(vDate.getNow());
toSaveOrUpdate.add(d);
}
/* 4. 批量保存/更新 */
diskMountService.saveOrUpdateBatch(toSaveOrUpdate);
/* 5. 删除实时已消失的盘 */
List<String> delIds = dbDisks.stream()
.filter(d -> !liveMountPoint.contains(d.getMountPoint()))
.map(DiskMount::getDiskMountId)
.collect(Collectors.toList());
if (!delIds.isEmpty()) {
diskMountService.removeByIds(delIds);
}
} catch (Exception e) {
// 仅记录异常,不中断其它任务
errorList.add(String.format("hostId=%s, error=%s", host.getHostId(), e.getMessage()));
}
}
}