通用大文件上传组件.

This commit is contained in:
lijiahang
2024-05-07 19:12:37 +08:00
parent 1379150369
commit f416e63b66
21 changed files with 711 additions and 32 deletions

View File

@@ -0,0 +1,21 @@
package com.orion.ops.module.infra.api;
/**
* 文件上传 对外服务类
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/7 15:58
*/
public interface FileUploadApi {
/**
* 生成上传 uploadToken
*
* @param userId userId
* @param endpoint endpoint
* @return token
*/
String createUploadToken(Long userId, String endpoint);
}

View File

@@ -34,6 +34,12 @@
<artifactId>orion-ops-spring-boot-starter-web</artifactId>
</dependency>
<!-- websocket -->
<dependency>
<groupId>com.orion.ops</groupId>
<artifactId>orion-ops-spring-boot-starter-websocket</artifactId>
</dependency>
<!-- log -->
<dependency>
<groupId>com.orion.ops</groupId>

View File

@@ -0,0 +1,29 @@
package com.orion.ops.module.infra.api.impl;
import com.orion.ops.module.infra.api.FileUploadApi;
import com.orion.ops.module.infra.service.FileUploadService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* 文件上传 对外服务实现类
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/7 19:04
*/
@Slf4j
@Service
public class FileUploadApiImpl implements FileUploadApi {
@Resource
private FileUploadService fileUploadService;
@Override
public String createUploadToken(Long userId, String endpoint) {
return fileUploadService.createUploadToken(userId, endpoint);
}
}

View File

@@ -0,0 +1,39 @@
package com.orion.ops.module.infra.configuration;
import com.orion.ops.module.infra.handler.upload.FileUploadMessageDispatcher;
import com.orion.ops.module.infra.interceptor.FileUploadInterceptor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import javax.annotation.Resource;
/**
* 基建模块 websocket 配置
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/28 11:39
*/
@Configuration
public class InfraWebSocketConfiguration implements WebSocketConfigurer {
@Value("${orion.websocket.prefix}")
private String prefix;
@Resource
private FileUploadInterceptor fileUploadInterceptor;
@Resource
private FileUploadMessageDispatcher fileUploadMessageDispatcher;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 文件上传
registry.addHandler(fileUploadMessageDispatcher, prefix + "/file/upload/{uploadToken}")
.addInterceptors(fileUploadInterceptor)
.setAllowedOrigins("*");
}
}

View File

@@ -0,0 +1,27 @@
package com.orion.ops.module.infra.define.cache;
import com.orion.lang.define.cache.key.CacheKeyBuilder;
import com.orion.lang.define.cache.key.CacheKeyDefine;
import com.orion.lang.define.cache.key.struct.RedisCacheStruct;
import com.orion.ops.module.infra.entity.dto.FileUploadTokenDTO;
import java.util.concurrent.TimeUnit;
/**
* 文明上传缓存 key
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/8/31 11:48
*/
public interface FileUploadCacheKeyDefine {
CacheKeyDefine FILE_UPLOAD = new CacheKeyBuilder()
.key("file:upload:{}")
.desc("文件上传信息 ${token}")
.type(FileUploadTokenDTO.class)
.struct(RedisCacheStruct.STRING)
.timeout(3, TimeUnit.MINUTES)
.build();
}

View File

@@ -0,0 +1,33 @@
package com.orion.ops.module.infra.entity.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 文件上传 token 缓存对象
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023-7-13 18:42
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(name = "FileUploadTokenDTO", description = "文件上传 token 缓存对象")
public class FileUploadTokenDTO implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "userId")
private Long userId;
@Schema(description = "上传父目录")
private String endpoint;
}

View File

@@ -0,0 +1,82 @@
package com.orion.ops.module.infra.handler.upload;
import com.alibaba.fastjson.JSON;
import com.orion.lang.utils.io.Streams;
import com.orion.ops.framework.common.constant.ExtraFieldConst;
import com.orion.ops.framework.common.file.FileClient;
import com.orion.ops.framework.websocket.core.utils.WebSockets;
import com.orion.ops.module.infra.entity.dto.FileUploadTokenDTO;
import com.orion.ops.module.infra.handler.upload.enums.FileUploadOperatorType;
import com.orion.ops.module.infra.handler.upload.handler.FileUploadHandler;
import com.orion.ops.module.infra.handler.upload.handler.IFileUploadHandler;
import com.orion.ops.module.infra.handler.upload.model.FileUploadRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import javax.annotation.Resource;
import java.util.concurrent.ConcurrentHashMap;
/**
* 上传消息处理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/2/21 18:22
*/
@Slf4j
@Component
public class FileUploadMessageDispatcher extends AbstractWebSocketHandler {
private final ConcurrentHashMap<String, IFileUploadHandler> handlers = new ConcurrentHashMap<>();
@Resource
private FileClient localFileClient;
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
// 获取处理器
IFileUploadHandler handler = handlers.computeIfAbsent(session.getId(), s -> {
FileUploadTokenDTO info = WebSockets.getAttr(session, ExtraFieldConst.INFO);
return new FileUploadHandler(session, localFileClient, info.getEndpoint());
});
// 处理消息
FileUploadRequest request = JSON.parseObject(message.getPayload(), FileUploadRequest.class);
FileUploadOperatorType type = FileUploadOperatorType.of(request.getType());
if (FileUploadOperatorType.START.equals(type)) {
// 开始上传
handler.start(request.getFileId());
} else if (FileUploadOperatorType.FINISH.equals(type)) {
// 上传完成
handler.finish();
}
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
handlers.get(session.getId()).write(message.getPayload().array());
}
@Override
public void afterConnectionEstablished(WebSocketSession session) {
log.info("FileUploadMessageDispatcher-afterConnectionEstablished id: {}", session.getId());
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
log.error("FileUploadMessageDispatcher-handleTransportError id: {}", session.getId(), exception);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String id = session.getId();
log.info("FileUploadMessageDispatcher-afterConnectionClosed id: {}, code: {}, reason: {}", id, status.getCode(), status.getReason());
// 关闭会话
Streams.close(handlers.remove(id));
}
}

View File

@@ -0,0 +1,43 @@
package com.orion.ops.module.infra.handler.upload.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 文件上传操作类型
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/7 18:01
*/
@Getter
@AllArgsConstructor
public enum FileUploadOperatorType {
/**
* 开始上传
*/
START("start"),
/**
* 上传完成
*/
FINISH("finish"),
;
private final String type;
public static FileUploadOperatorType of(String type) {
if (type == null) {
return null;
}
for (FileUploadOperatorType value : values()) {
if (value.type.equals(type)) {
return value;
}
}
return null;
}
}

View File

@@ -0,0 +1,48 @@
package com.orion.ops.module.infra.handler.upload.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 文件上传响应类型
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/7 18:01
*/
@Getter
@AllArgsConstructor
public enum FileUploadReceiverType {
/**
* 请求下一块数据
*/
NEXT("next"),
/**
* 上传完成
*/
FINISH("finish"),
/**
* 上传失败
*/
ERROR("error"),
;
private final String type;
public static FileUploadReceiverType of(String type) {
if (type == null) {
return null;
}
for (FileUploadReceiverType value : values()) {
if (value.type.equals(type)) {
return value;
}
}
return null;
}
}

View File

@@ -0,0 +1,127 @@
package com.orion.ops.module.infra.handler.upload.handler;
import com.alibaba.fastjson.JSON;
import com.orion.lang.utils.io.Streams;
import com.orion.ops.framework.common.constant.Const;
import com.orion.ops.framework.common.file.FileClient;
import com.orion.ops.framework.websocket.core.utils.WebSockets;
import com.orion.ops.module.infra.handler.upload.enums.FileUploadReceiverType;
import com.orion.ops.module.infra.handler.upload.model.FileUploadResponse;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.io.OutputStream;
/**
* 文件上传处理器实现
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/7 15:49
*/
public class FileUploadHandler implements IFileUploadHandler {
private final WebSocketSession channel;
private final FileClient fileClient;
private final String endpoint;
private String fileId;
private String filePath;
private OutputStream outputStream;
private boolean closed;
public FileUploadHandler(WebSocketSession channel, FileClient fileClient, String endpoint) {
this.channel = channel;
this.fileClient = fileClient;
this.endpoint = endpoint;
}
@Override
public void start(String fileId) {
// 释放资源
this.close();
// 获取返回路径
this.fileId = fileId;
this.filePath = fileClient.getReturnPath(endpoint + Const.SLASH + fileId);
try {
// 打开文件流
this.outputStream = fileClient.getContentOutputStream(filePath);
this.closed = false;
// 请求下一块数据
FileUploadResponse resp = FileUploadResponse.builder()
.type(FileUploadReceiverType.NEXT.getType())
.fileId(this.fileId)
.build();
this.send(resp);
} catch (Exception e) {
// 释放资源
this.close();
// 返回错误
FileUploadResponse resp = FileUploadResponse.builder()
.type(FileUploadReceiverType.ERROR.getType())
.fileId(this.fileId)
.build();
this.send(resp);
}
}
@Override
public void write(byte[] content) {
try {
// 写入内容
this.outputStream.write(content);
// 请求下一块数据
FileUploadResponse resp = FileUploadResponse.builder()
.type(FileUploadReceiverType.NEXT.getType())
.fileId(this.fileId)
.build();
this.send(resp);
} catch (IOException e) {
// 释放资源
this.close();
// 返回错误
FileUploadResponse resp = FileUploadResponse.builder()
.type(FileUploadReceiverType.ERROR.getType())
.fileId(this.fileId)
.build();
this.send(resp);
}
}
@Override
public void finish() {
// 释放资源
this.close();
// 返回上传路径
FileUploadResponse resp = FileUploadResponse.builder()
.type(FileUploadReceiverType.FINISH.getType())
.fileId(this.fileId)
.path(this.filePath)
.build();
this.send(resp);
}
@Override
public void close() {
if (closed) {
return;
}
this.closed = true;
Streams.close(outputStream);
}
/**
* 发送消息
*
* @param resp resp
*/
private void send(FileUploadResponse resp) {
WebSockets.sendText(channel, JSON.toJSONString(resp));
}
}

View File

@@ -0,0 +1,33 @@
package com.orion.ops.module.infra.handler.upload.handler;
import com.orion.lang.able.SafeCloseable;
/**
* 文件上传处理器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/7 15:49
*/
public interface IFileUploadHandler extends SafeCloseable {
/**
* 开始上传
*
* @param fileId fileId
*/
void start(String fileId);
/**
* 写入内容
*
* @param content content
*/
void write(byte[] content);
/**
* 上传结束
*/
void finish();
}

View File

@@ -0,0 +1,29 @@
package com.orion.ops.module.infra.handler.upload.model;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 文件上传请求 实体对象
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/7 18:12
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(name = "FileUploadRequest", description = "文件上传请求 实体对象")
public class FileUploadRequest {
@Schema(description = "type")
private String type;
@Schema(description = "fileId")
private String fileId;
}

View File

@@ -0,0 +1,32 @@
package com.orion.ops.module.infra.handler.upload.model;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 文件上传响应 实体对象
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/7 18:12
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(name = "FileUploadResponse", description = "文件上传响应 实体对象")
public class FileUploadResponse {
@Schema(description = "type")
private String type;
@Schema(description = "fileId")
private String fileId;
@Schema(description = "路径")
private String path;
}

View File

@@ -0,0 +1,51 @@
package com.orion.ops.module.infra.interceptor;
import com.orion.lang.utils.Urls;
import com.orion.ops.framework.common.constant.ExtraFieldConst;
import com.orion.ops.module.infra.entity.dto.FileUploadTokenDTO;
import com.orion.ops.module.infra.service.FileUploadService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.annotation.Resource;
import java.util.Map;
/**
* 文件上传拦截器
*
* @author Jiahang Li
* @version 1.0.0
* @since 2023/12/27 23:53
*/
@Slf4j
@Component
public class FileUploadInterceptor implements HandshakeInterceptor {
@Resource
private FileUploadService fileUploadService;
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
// 获取 uploadToken
String uploadToken = Urls.getUrlSource(request.getURI().getPath());
log.info("FileUploadInterceptor-beforeHandshake start uploadToken: {}", uploadToken);
// 检查 uploadToken
FileUploadTokenDTO info = fileUploadService.checkUploadToken(uploadToken);
if (info == null) {
log.error("FileUploadInterceptor-beforeHandshake absent uploadToken: {}", uploadToken);
return false;
}
// 设置参数
attributes.put(ExtraFieldConst.INFO, info);
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}

View File

@@ -0,0 +1,31 @@
package com.orion.ops.module.infra.service;
import com.orion.ops.module.infra.entity.dto.FileUploadTokenDTO;
/**
* 文件上传服务
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/7 15:58
*/
public interface FileUploadService {
/**
* 生成上传 uploadToken
*
* @param userId userId
* @param endpoint endpoint
* @return token
*/
String createUploadToken(Long userId, String endpoint);
/**
* 检查 uploadToken
*
* @param token token
* @return info
*/
FileUploadTokenDTO checkUploadToken(String token);
}

View File

@@ -0,0 +1,47 @@
package com.orion.ops.module.infra.service.impl;
import com.orion.lang.id.UUIds;
import com.orion.ops.framework.redis.core.utils.RedisStrings;
import com.orion.ops.module.infra.define.cache.FileUploadCacheKeyDefine;
import com.orion.ops.module.infra.entity.dto.FileUploadTokenDTO;
import com.orion.ops.module.infra.service.FileUploadService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* 文件上传服务
*
* @author Jiahang Li
* @version 1.0.0
* @since 2024/5/7 16:43
*/
@Slf4j
@Service
public class FileUploadServiceImpl implements FileUploadService {
@Override
public String createUploadToken(Long userId, String endpoint) {
String token = UUIds.random32();
String key = FileUploadCacheKeyDefine.FILE_UPLOAD.format(token);
// 设置缓存
FileUploadTokenDTO info = FileUploadTokenDTO.builder()
.userId(userId)
.endpoint(endpoint)
.build();
RedisStrings.setJson(key, FileUploadCacheKeyDefine.FILE_UPLOAD, info);
return token;
}
@Override
public FileUploadTokenDTO checkUploadToken(String token) {
String key = FileUploadCacheKeyDefine.FILE_UPLOAD.format(token);
// 查询缓存
FileUploadTokenDTO info = RedisStrings.getJson(key, FileUploadCacheKeyDefine.FILE_UPLOAD);
if (info != null) {
// 删除缓存
RedisStrings.delete(key);
}
return info;
}
}