From bf77e7707148bfbc8dd675c65238880366adbc64 Mon Sep 17 00:00:00 2001 From: gaoxq <376340421@qq.com> Date: Mon, 25 Aug 2025 22:57:22 +0800 Subject: [PATCH] =?UTF-8?q?API=E6=95=B0=E6=8D=AE=E8=A1=A8=E6=9B=B4?= =?UTF-8?q?=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/mini/capi/config/springBean.java | 26 ++++ .../java/com/mini/capi/job/taskEnable.java | 130 ++++++++++-------- 2 files changed, 100 insertions(+), 56 deletions(-) create mode 100644 src/main/java/com/mini/capi/config/springBean.java diff --git a/src/main/java/com/mini/capi/config/springBean.java b/src/main/java/com/mini/capi/config/springBean.java new file mode 100644 index 0000000..e17efa5 --- /dev/null +++ b/src/main/java/com/mini/capi/config/springBean.java @@ -0,0 +1,26 @@ +package com.mini.capi.config; + + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +public class springBean { + + @Bean("hostExecutor") + public Executor hostExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + int core = Runtime.getRuntime().availableProcessors(); + executor.setCorePoolSize(core * 2); + executor.setMaxPoolSize(core * 4); + executor.setQueueCapacity(200); + executor.setThreadNamePrefix("host-disk-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } +} diff --git a/src/main/java/com/mini/capi/job/taskEnable.java b/src/main/java/com/mini/capi/job/taskEnable.java index ae91857..c233904 100644 --- a/src/main/java/com/mini/capi/job/taskEnable.java +++ b/src/main/java/com/mini/capi/job/taskEnable.java @@ -9,11 +9,14 @@ import com.mini.capi.utils.vDate; import com.mini.capi.utils.vId; import com.mini.capi.utils.vToken; import jakarta.annotation.Resource; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.function.Function; import java.util.stream.Collectors; @@ -38,68 +41,83 @@ public class taskEnable { private SysHostService sysHostService; + @Resource + private Executor hostExecutor; + + + /** + * 获取容器主机的磁盘使用情况 + */ @GetMapping("/getTaskDockerDiskInfo") public ApiResult jobHostDisk(String token) { if (vToken.isValidToken(token)) { List dockerHosts = dockerHostService.list(); - for (DockerHost host : dockerHosts) { - SshUser sshUser = sshUserService.getById(host.getUserId()); - SshInfo sshInfo = sshInfoService.getById(host.getHostId()); - try { - /* 1. 采集实时数据 */ - HostInfo.Result r = HostInfo.collect( - sshInfo.getHostIp(), - Integer.parseInt(sshInfo.getHostPort()), - sshUser.getCUsername(), - sshUser.getCPassword()); - /* 2. 主机维度 saveOrUpdate */ - SysHost sysHost = r.host; - sysHost.setSysHostId(host.getHostId()); - sysHost.setUpdateTime(vDate.getNow()); - sysHost.setDokerHostId(host.getDokerHostId()); - sysHostService.saveOrUpdate(sysHost); - - /* 3. 处理磁盘:先查库做索引,再比对 */ - List dbDisks = diskMountService.lambdaQuery() - .eq(DiskMount::getSysHostId, host.getHostId()) - .list(); - Map dbDiskMap = dbDisks.stream() - .collect(Collectors.toMap(DiskMount::getMountPoint, Function.identity())); - - List toSaveOrUpdate = new ArrayList<>(); - Set liveMountPoint = new HashSet<>(); - - for (DiskMount d : r.disks) { - liveMountPoint.add(d.getMountPoint()); - DiskMount exist = dbDiskMap.get(d.getMountPoint()); - if (exist != null) { - // 存在 -> 更新 - d.setDiskMountId(exist.getDiskMountId()); - } else { - // 不存在 -> 新增 - d.setDiskMountId(vId.getUid()); - } - d.setSysHostId(host.getHostId()); - d.setUpdateTime(vDate.getNow()); - toSaveOrUpdate.add(d); - } - /* 4. 批量保存/更新 */ - diskMountService.saveOrUpdateBatch(toSaveOrUpdate); - /* 5. 删除实时已消失的盘 */ - List delIds = dbDisks.stream() - .filter(d -> !liveMountPoint.contains(d.getMountPoint())) - .map(DiskMount::getDiskMountId) - .collect(Collectors.toList()); - if (!delIds.isEmpty()) { - diskMountService.removeByIds(delIds); - } - } catch (Exception e) { - System.out.println(e.getMessage()); - } - } - return ApiResult.success(); + List errorList = Collections.synchronizedList(new ArrayList<>()); + // 并行处理所有宿主机 + CompletableFuture[] futures = dockerHosts.stream() + .map(host -> CompletableFuture.runAsync(() -> handleSingleHost(host, errorList), hostExecutor)) + .toArray(CompletableFuture[]::new); + // 等待全部完成 + CompletableFuture.allOf(futures).join(); + return errorList.isEmpty() + ? ApiResult.success() + : ApiResult.error(); } return ApiResult.error(); } + + private void handleSingleHost(DockerHost host, List errorList) { + try { + SshUser sshUser = sshUserService.getById(host.getUserId()); + SshInfo sshInfo = sshInfoService.getById(host.getHostId()); + /* 1. 采集实时数据 */ + HostInfo.Result r = HostInfo.collect( + sshInfo.getHostIp(), + Integer.parseInt(sshInfo.getHostPort()), + sshUser.getCUsername(), + sshUser.getCPassword()); + /* 2. 主机维度 saveOrUpdate */ + SysHost sysHost = r.host; + sysHost.setSysHostId(host.getHostId()); + sysHost.setUpdateTime(vDate.getNow()); + sysHost.setDokerHostId(host.getDokerHostId()); + sysHostService.saveOrUpdate(sysHost); + /* 3. 处理磁盘:先查库做索引,再比对 */ + List dbDisks = diskMountService.lambdaQuery() + .eq(DiskMount::getSysHostId, host.getHostId()) + .list(); + Map dbDiskMap = dbDisks.stream() + .collect(Collectors.toMap(DiskMount::getMountPoint, Function.identity())); + + List toSaveOrUpdate = new ArrayList<>(); + Set liveMountPoint = new HashSet<>(); + for (DiskMount d : r.disks) { + liveMountPoint.add(d.getMountPoint()); + DiskMount exist = dbDiskMap.get(d.getMountPoint()); + if (exist != null) { + d.setDiskMountId(exist.getDiskMountId()); + } else { + d.setDiskMountId(vId.getUid()); + } + d.setSysHostId(host.getHostId()); + d.setUpdateTime(vDate.getNow()); + toSaveOrUpdate.add(d); + } + /* 4. 批量保存/更新 */ + diskMountService.saveOrUpdateBatch(toSaveOrUpdate); + /* 5. 删除实时已消失的盘 */ + List delIds = dbDisks.stream() + .filter(d -> !liveMountPoint.contains(d.getMountPoint())) + .map(DiskMount::getDiskMountId) + .collect(Collectors.toList()); + if (!delIds.isEmpty()) { + diskMountService.removeByIds(delIds); + } + } catch (Exception e) { + // 仅记录异常,不中断其它任务 + errorList.add(String.format("hostId=%s, error=%s", host.getHostId(), e.getMessage())); + } + } + }