🔨 查询上传任务.

This commit is contained in:
lijiahang
2024-05-10 13:21:08 +08:00
parent cd312ef5c8
commit 564e40a31d
10 changed files with 210 additions and 60 deletions

View File

@@ -40,10 +40,7 @@ import java.util.List;
@SuppressWarnings({"ELValidationInJSP", "SpringElInspection"})
public class UploadTaskController {
// todo create 返回 host, STATUS
// 修改状态元数据
// 上船前检查size, size不对则直接cancel
// cancel 需要设置子元素为 cancel
// TODO 测试空文件上传 0B 取消怎么那么慢 是不是删除也慢 异步cancel cancel 需要设置子元素为 cancel
@Resource
private UploadTaskService uploadTaskService;
@@ -79,7 +76,7 @@ public class UploadTaskController {
@Parameter(name = "id", description = "id", required = true)
@PreAuthorize("@ss.hasPermission('asset:upload-task:query')")
public UploadTaskVO getUploadTask(@RequestParam("id") Long id) {
return uploadTaskService.getUploadTaskById(id);
return uploadTaskService.getUploadTask(id);
}
@IgnoreLog(IgnoreLogMode.RET)
@@ -90,6 +87,15 @@ public class UploadTaskController {
return uploadTaskService.getUploadTaskPage(request);
}
@IgnoreLog(IgnoreLogMode.ALL)
@GetMapping("/status")
@Operation(summary = "查询上传状态")
@Parameter(name = "id", description = "id", required = true)
@PreAuthorize("@ss.hasPermission('asset:upload-task:query')")
public List<UploadTaskVO> getUploadTaskStatus(@RequestParam("idList") List<Long> idList, @RequestParam("queryFiles") Boolean queryFiles) {
return uploadTaskService.getUploadTaskStatus(idList, queryFiles);
}
@OperatorLog(UploadTaskOperatorType.DELETE)
@DeleteMapping("/delete")
@Operation(summary = "删除上传任务")

View File

@@ -28,6 +28,20 @@ public interface UploadTaskFileDAO extends IMapper<UploadTaskFileDO> {
return this.selectList(Conditions.eq(UploadTaskFileDO::getTaskId, taskId));
}
/**
* 通过 taskId 查询
*
* @param taskId taskId
* @return files
*/
default List<UploadTaskFileDO> selectByTaskId(Long taskId, String status) {
// 条件
LambdaQueryWrapper<UploadTaskFileDO> wrapper = this.wrapper()
.eq(UploadTaskFileDO::getTaskId, taskId)
.eq(UploadTaskFileDO::getStatus, status);
return this.selectList(wrapper);
}
/**
* 通过 taskId hostId 更新状态
*

View File

@@ -7,6 +7,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
/**
* 上传任务 视图响应对象
@@ -30,4 +31,7 @@ public class UploadTaskCreateVO implements Serializable {
@Schema(description = "上传 token")
private String token;
@Schema(description = "主机")
private List<HostBaseVO> hosts;
}

View File

@@ -127,7 +127,7 @@ public class FileUploadTask implements IFileUploadTask {
*/
private void createFileUploader() {
// 查询文件
List<UploadTaskFileDO> uploadFiles = uploadTaskFileDAO.selectByTaskId(id);
List<UploadTaskFileDO> uploadFiles = uploadTaskFileDAO.selectByTaskId(id, UploadTaskFileStatusEnum.WAITING.name());
Map<Long, List<UploadTaskFileDO>> hostFileGroup = uploadFiles.stream()
.collect(Collectors.groupingBy(UploadTaskFileDO::getHostId));
hostFileGroup.forEach((k, v) -> {

View File

@@ -141,7 +141,7 @@ public class FileUploader implements IFileUploader {
int read;
while ((read = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, read);
// FIXME test
// todo test
file.setCurrent(file.getCurrent() + read);
}
outputStream.flush();

View File

@@ -34,7 +34,7 @@ public interface UploadTaskService {
* @param id id
* @return row
*/
UploadTaskVO getUploadTaskById(Long id);
UploadTaskVO getUploadTask(Long id);
/**
* 分页查询上传任务
@@ -44,6 +44,15 @@ public interface UploadTaskService {
*/
DataGrid<UploadTaskVO> getUploadTaskPage(UploadTaskQueryRequest request);
/**
* 获取上传任务状态
*
* @param idList idList
* @param queryFiles queryFiles
* @return rows
*/
List<UploadTaskVO> getUploadTaskStatus(List<Long> idList, Boolean queryFiles);
/**
* 获取上传任务数量
*

View File

@@ -6,11 +6,13 @@ import com.orion.lang.define.wrapper.DataGrid;
import com.orion.lang.utils.Arrays1;
import com.orion.lang.utils.Booleans;
import com.orion.lang.utils.Strings;
import com.orion.lang.utils.Threads;
import com.orion.lang.utils.collect.Lists;
import com.orion.lang.utils.collect.Maps;
import com.orion.lang.utils.io.Files1;
import com.orion.lang.utils.time.Dates;
import com.orion.ops.framework.biz.operator.log.core.utils.OperatorLogs;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.framework.common.constant.ErrorMessage;
import com.orion.ops.framework.common.file.FileClient;
import com.orion.ops.framework.common.security.LoginUser;
@@ -19,6 +21,7 @@ import com.orion.ops.framework.mybatis.core.query.Conditions;
import com.orion.ops.framework.security.core.utils.SecurityUtils;
import com.orion.ops.module.asset.convert.HostConvert;
import com.orion.ops.module.asset.convert.UploadTaskConvert;
import com.orion.ops.module.asset.convert.UploadTaskFileConvert;
import com.orion.ops.module.asset.dao.HostDAO;
import com.orion.ops.module.asset.dao.UploadTaskDAO;
import com.orion.ops.module.asset.dao.UploadTaskFileDAO;
@@ -50,6 +53,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.File;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -65,8 +69,6 @@ import java.util.stream.Collectors;
@Service
public class UploadTaskServiceImpl implements UploadTaskService {
// TODO 测试空文件上传 0B
private static final String DEFAULT_DESC = "上传文件 {}";
@Resource
@@ -145,48 +147,19 @@ public class UploadTaskServiceImpl implements UploadTaskService {
return UploadTaskCreateVO.builder()
.id(id)
.token(token)
.hosts(hosts)
.build();
}
@Override
public UploadTaskVO getUploadTaskById(Long id) {
public UploadTaskVO getUploadTask(Long id) {
// 查询任务
UploadTaskDO record = uploadTaskDAO.selectById(id);
Valid.notNull(record, ErrorMessage.DATA_ABSENT);
// 查询任务文件
List<UploadTaskFileVO> files = uploadTaskFileService.getFileByTaskId(id);
// 获取上传任务进度
IFileUploadTask task = fileUploadTaskManager.getTask(id);
Map<Long, FileUploadFileItemDTO> fileItemMap;
if (task == null) {
fileItemMap = Maps.empty();
} else {
fileItemMap = task.getUploaderList()
.stream()
.map(IFileUploader::getFiles)
.flatMap(Collection::stream)
.collect(Collectors.toMap(FileUploadFileItemDTO::getId, Function.identity()));
}
// 设置进度
for (UploadTaskFileVO file : files) {
String status = file.getStatus();
if (UploadTaskFileStatusEnum.WAITING.name().equals(status)) {
file.setCurrent(0L);
} else if (UploadTaskFileStatusEnum.UPLOADING.name().equals(status)) {
// 上传中获取当前值
Long current = Optional.ofNullable(file.getId())
.map(fileItemMap::get)
.map(FileUploadFileItemDTO::getCurrent)
.orElse(file.getFileSize());
file.setCurrent(current);
} else if (UploadTaskFileStatusEnum.FINISHED.name().equals(status)) {
file.setCurrent(file.getFileSize());
} else if (UploadTaskFileStatusEnum.FAILED.name().equals(status)) {
file.setCurrent(0L);
} else if (UploadTaskFileStatusEnum.CANCELED.name().equals(status)) {
file.setCurrent(0L);
}
}
// 计算传输进度
this.computeUploadProgress(id, files);
// 返回
UploadTaskVO uploadTask = UploadTaskConvert.MAPPER.to(record);
uploadTask.setFiles(files);
@@ -203,6 +176,44 @@ public class UploadTaskServiceImpl implements UploadTaskService {
.dataGrid(UploadTaskConvert.MAPPER::to);
}
@Override
public List<UploadTaskVO> getUploadTaskStatus(List<Long> idList, Boolean queryFiles) {
// 查询任务
List<UploadTaskVO> tasks = uploadTaskDAO.of()
.createWrapper()
.select(UploadTaskDO::getId, UploadTaskDO::getStatus,
UploadTaskDO::getStartTime, UploadTaskDO::getEndTime)
.in(UploadTaskDO::getId, idList)
.then()
.list(UploadTaskConvert.MAPPER::to);
if (!Booleans.isTrue(queryFiles)) {
return tasks;
}
// 查询任务文件
Map<Long, List<UploadTaskFileVO>> filesMap = uploadTaskFileDAO.of()
.createWrapper()
.select(UploadTaskFileDO::getId, UploadTaskFileDO::getTaskId,
UploadTaskFileDO::getHostId, UploadTaskFileDO::getStatus,
UploadTaskFileDO::getStartTime, UploadTaskFileDO::getEndTime)
.in(UploadTaskFileDO::getTaskId, idList)
.then()
.stream()
.map(UploadTaskFileConvert.MAPPER::to)
.collect(Collectors.groupingBy(UploadTaskFileVO::getTaskId));
for (UploadTaskVO task : tasks) {
Long id = task.getId();
List<UploadTaskFileVO> files = filesMap.get(id);
if (files == null) {
files = Lists.empty();
} else {
// 计算进度
this.computeUploadProgress(id, files);
}
task.setFiles(files);
}
return tasks;
}
@Override
public Long getUploadTaskCount(UploadTaskQueryRequest request) {
return uploadTaskDAO.selectCount(this.buildQueryWrapper(request));
@@ -249,8 +260,10 @@ public class UploadTaskServiceImpl implements UploadTaskService {
// 查询任务
UploadTaskDO record = uploadTaskDAO.selectById(id);
Valid.notNull(record, ErrorMessage.TASK_ABSENT);
// 检查状态
// 检查任务状态
Valid.eq(record.getStatus(), UploadTaskStatusEnum.WAITING.name(), ErrorMessage.ILLEGAL_STATUS);
// 检查文件完整性
this.checkFileCompleteness(id);
// 执行任务
FileUploadTasks.start(id);
}
@@ -277,6 +290,9 @@ public class UploadTaskServiceImpl implements UploadTaskService {
@Override
public void clearUploadSwapFiles(List<Long> idList) {
if (Lists.isEmpty(idList)) {
return;
}
// 查询记录
List<String> paths = idList.stream()
.map(s -> Strings.format(SWAP_ENDPOINT, s))
@@ -320,6 +336,33 @@ public class UploadTaskServiceImpl implements UploadTaskService {
}
}
/**
* 检查文件完整性
*
* @param id id
*/
private void checkFileCompleteness(Long id) {
List<Long> cancelIdList = new ArrayList<>();
// 查询任务文件
List<UploadTaskFileDO> files = uploadTaskFileDAO.selectByTaskId(id);
Map<String, List<UploadTaskFileDO>> fileIdGroups = files.stream()
.collect(Collectors.groupingBy(UploadTaskFileDO::getFileId));
fileIdGroups.forEach((k, v) -> {
// 获取文件实际路径
String path = localFileClient.getReturnPath(Strings.format(SWAP_ENDPOINT, id) + Const.SLASH + k);
File file = new File(localFileClient.getAbsolutePath(path));
// 文件不存在/大小不正确
if (!Files1.isFile(file) || file.length() != v.get(0).getFileSize()) {
cancelIdList.addAll(Lists.map(v, UploadTaskFileDO::getId));
}
});
if (cancelIdList.isEmpty()) {
return;
}
// 修改任务状态
uploadTaskFileDAO.updateStatusByIdList(cancelIdList, UploadTaskFileStatusEnum.CANCELED.name());
}
/**
* 检查需要取消的任务
*
@@ -334,26 +377,79 @@ public class UploadTaskServiceImpl implements UploadTaskService {
if (cancelableRecords.isEmpty()) {
return;
}
// 更新状态的 id
// 更新状态任务
List<Long> updateIdList = cancelableRecords.stream()
.map(UploadTaskDO::getId)
.filter(s -> fileUploadTaskManager.getTask(s) == null)
.collect(Collectors.toList());
// 取消上传
cancelableRecords.stream()
.map(UploadTaskDO::getId)
.map(fileUploadTaskManager::getTask)
.filter(Objects::nonNull)
.forEach(IFileUploadTask::cancel);
// 更新状态
if (!updateIdList.isEmpty()) {
UploadTaskDO update = new UploadTaskDO();
update.setStatus(status.name());
update.setEndTime(new Date());
// 更新
uploadTaskDAO.update(update, Conditions.in(UploadTaskDO::getId, updateIdList));
// 更新任务状态
UploadTaskDO updateTask = new UploadTaskDO();
updateTask.setStatus(status.name());
updateTask.setEndTime(new Date());
uploadTaskDAO.update(updateTask, Conditions.in(UploadTaskDO::getId, updateIdList));
// 更新任务文件状态
UploadTaskFileDO uploadFile = new UploadTaskFileDO();
uploadFile.setStatus(UploadTaskFileStatusEnum.CANCELED.name());
uploadFile.setEndTime(new Date());
LambdaQueryWrapper<UploadTaskFileDO> updateFileQuery = uploadTaskFileDAO.wrapper()
.in(UploadTaskFileDO::getId, updateIdList)
.in(UploadTaskFileDO::getStatus,
UploadTaskFileStatusEnum.WAITING.name(),
UploadTaskFileStatusEnum.UPLOADING.name());
uploadTaskFileDAO.update(uploadFile, updateFileQuery);
}
// 异步处理
Threads.start(() -> {
// 删除文件
this.clearUploadSwapFiles(updateIdList);
// 取消上传
cancelableRecords.stream()
.map(UploadTaskDO::getId)
.map(fileUploadTaskManager::getTask)
.filter(Objects::nonNull)
.forEach(IFileUploadTask::cancel);
});
}
/**
* 计算传输进度
*
* @param id id
* @param files files
*/
private void computeUploadProgress(Long id, List<UploadTaskFileVO> files) {
// 获取上传任务进度
IFileUploadTask task = fileUploadTaskManager.getTask(id);
Map<Long, FileUploadFileItemDTO> fileItemMap;
if (task == null) {
fileItemMap = Maps.empty();
} else {
fileItemMap = task.getUploaderList()
.stream()
.map(IFileUploader::getFiles)
.flatMap(Collection::stream)
.collect(Collectors.toMap(FileUploadFileItemDTO::getId, Function.identity()));
}
// 设置进度
for (UploadTaskFileVO file : files) {
String status = file.getStatus();
if (UploadTaskFileStatusEnum.WAITING.name().equals(status)) {
file.setCurrent(0L);
} else if (UploadTaskFileStatusEnum.UPLOADING.name().equals(status)) {
// 上传中获取当前值
Long current = Optional.ofNullable(file.getId())
.map(fileItemMap::get)
.map(FileUploadFileItemDTO::getCurrent)
.orElse(file.getFileSize());
file.setCurrent(current);
} else if (UploadTaskFileStatusEnum.FINISHED.name().equals(status)) {
file.setCurrent(file.getFileSize());
} else if (UploadTaskFileStatusEnum.FAILED.name().equals(status)) {
file.setCurrent(0L);
} else if (UploadTaskFileStatusEnum.CANCELED.name().equals(status)) {
file.setCurrent(0L);
}
}
}

View File

@@ -1,4 +1,5 @@
import type { DataGrid, Pagination } from '@/types/global';
import type { HostQueryResponse } from '@/api/asset/host';
import type { TableData } from '@arco-design/web-vue/es/table/interface';
import axios from 'axios';
import qs from 'query-string';
@@ -28,6 +29,7 @@ export interface UploadTaskFileCreateRequest {
export interface UploadTaskCreateResponse {
id: number;
token: string;
hosts: Array<HostQueryResponse>;
}
/**
@@ -110,6 +112,18 @@ export function getUploadTaskPage(request: UploadTaskQueryRequest) {
return axios.post<DataGrid<UploadTaskQueryResponse>>('/asset/upload-task/query', request);
}
/**
* 查询上传任务状态
*/
export function getUploadTaskStatus(idList: Array<number>, queryFiles: boolean) {
return axios.get<Array<UploadTaskQueryResponse>>('/asset/upload-task/status', {
params: { idList, queryFiles },
paramsSerializer: params => {
return qs.stringify(params, { arrayFormat: 'comma' });
}
});
}
/**
* 删除上传任务
*/

View File

@@ -74,7 +74,7 @@
import type { FileItem } from '@arco-design/web-vue';
import type { UploadTaskFileCreateRequest } from '@/api/exec/upload-task';
import type { IFileUploader } from '@/components/system/uploader/const';
import { ref } from 'vue';
import { onUnmounted, ref } from 'vue';
import { getFileSize } from '@/utils/file';
import FileUploader from '@/components/system/uploader/file-uploader';
@@ -127,6 +127,11 @@
defineExpose({ getFiles, startUpload, close });
// 卸载时关闭
onUnmounted(() => {
uploader.value?.close();
});
</script>
<style lang="less" scoped>

View File

@@ -58,8 +58,9 @@
const filesRef = ref();
const hostModal = ref<any>();
// TODO pullstatus
// TODO 测试 error 情况
// TODO pullstatus 按钮显示就可以去掉了吧
// host tab
// status tab
// 设置选中主机
const setSelectedHost = (hosts: Array<number>) => {
@@ -147,6 +148,7 @@
const clear = () => {
status.value = UploadTaskStatus.WAITING;
formModel.value = { ...defaultForm() };
filesRef.value?.close();
};
</script>