Object的分块上传
更新时间:2025-04-01
分块上传的场景
除了通过putObject()方法上传文件到BOS以外,BOS还提供了另外一种上传模式:分块上传(Multipart Upload)。用户可以在如下的应用场景内(但不仅限于此),使用分块上传模式,如:
- 需要支持断点上传。
- 上传超过5GB大小的文件。
- 网络条件较差,和BOS的服务器之间的连接经常断开。
- 需要流式地上传文件。
- 上传文件之前,无法确定上传文件的大小。
Multipart Upload分块上传流程
假设有一个文件,本地路径为/path/to/file.zip
,由于文件比较大,使用分块上传其传输到BOS中。 基本流程:
- 初始化Multipart Upload。
- 上传分块。
- 完成分块上传。
初始化Multipart Upload
使用initiateMultipartUpload
方法来初始化一个分块上传事件:
Plain Text
1let bucketName = "test-harmony-bucket"
2let objectName = "test-multi-upload-object"
3let args = new InitiateMultipartUploadArgs();
4// 设置上传的object为标准存储类型
5args.storageClass = "STANDARD";
6// initialization
7let initMultiUploadResult: InitiateMultipartUploadResult;
8try {
9 // contentType参数设置为undefined占位, 使用默认值
10 initMultiUploadResult = await bosClient.initiateMultipartUpload(bucketName, objectName, undefined, args);
11 logger.info(`init multi upload success, info: ${JSON.stringify(initMultiUploadResult)}`)
12} catch(bosResponse) {
13 logger.error(`errCode: ${bosResponse.error.code}`)
14 logger.error(`requestId: ${bosResponse.error.requestId}`)
15 logger.error(`errMessage: ${bosResponse.error.message}`)
16 logger.error(`statusCode: ${bosResponse.statusCode}`)
17}
说明:
initiateMultipartUpload
的返回结果中含有UploadId
,它是区分分块上传事件的唯一标识,在后面的操作中,我们将用到它。
上传分块
Plain Text
1let stat = fs.lstatSync("/path/to/file.zip");
2let file = fs.openSync(filePath, fs.OpenMode.READ_ONLY);
3if (stat.size < MAX_SINGLE_OBJECT_SIZE) { //BOS最大文件大小为48.8TB
4 return Promise.reject(fillBceError(`The file size exceeds ${MAX_SINGLE_OBJECT_SIZE}`));
5}
6// 如果文件小于一个块的大小,就不需要使用分块上传
7if (stat.size < MIN_MULTIPART_SIZE || this._clientImpl.clientOptions.multiPartSize < MIN_MULTIPART_SIZE) {
8 return Promise.reject(fillBceError(`multipart size should not be less than ${MIN_MULTIPART_SIZE}`));
9}
10// 计算每块的大小的尺寸
11let partSize = (this._clientImpl.clientOptions.multiPartSize + MULTIPART_ALIGN - 1) / MULTIPART_ALIGN * MULTIPART_ALIGN;
12let partNum = Math.floor((stat.size + partSize - 1) / partSize);
13// 如果块的数量超过10000,将重新计算每块的大小,不再使用默认的块大小
14if (partNum > MAX_PART_NUMBER) {
15 partSize = (stat.size + MAX_PART_NUMBER - 1) / MAX_PART_NUMBER;
16 partSize = (partSize + MULTIPART_ALIGN - 1) / MULTIPART_ALIGN * MULTIPART_ALIGN;
17 partNum = Math.floor((stat.size + partSize - 1) / partSize);
18}
19logger.debug(`start to upload super file, total parts: ${partNum}, part size: ${partSize}`);
20let args = new InitiateMultipartUploadArgs();
21if (storageClass && isValidStorageClass(storageClass)) {
22 args.storageClass = storageClass;
23}
24// 使用初始化分块上传的结果返回的uploadId
25let uploadId = initMultiUploadResult.uploadId as string;
26// group task by partNum
27let completeArgs = new CompleteMultipartUploadArgs();
28completeArgs.partInfo = new PartInfo();
29completeArgs.partInfo.parts = [];
30let partIndex = 1;
31// 依次上传每一块
32while (partIndex <= partNum) {
33 logger.debug(`upload part: ${partIndex}`);
34 let uploadSize = partSize;
35 let offset = (partIndex - 1) * partSize;
36 if (uploadSize > stat.size - offset) {
37 uploadSize = stat.size - offset;
38 }
39 let data = new ArrayBuffer(uploadSize);
40 fs.readSync(file.fd, data, {offset: offset, length: uploadSize});
41 try {
42 let etag = await bosClient.uploadPart(bucketName, objectName, uploadId, partIndex, data);
43 // 每块上传成功后都需要记录eTag信息和partNumber信息
44 let uploadInfo = new UploadInfoType();
45 uploadInfo.partNumber = partIndex;
46 uploadInfo.eTag = etag;
47 completeArgs.partInfo.parts.push(uploadInfo);
48 } catch (bosResponse) { // 如果上传过程有失败的情况
49 logger.error(`upload error, info: ${JSON.stringify(bosResponse.error)}`)
50 await bosClient.abortMultipartUpload(bucketName, objectName, uploadId); //放弃本次分块上传
51 fs.closeSync(file);
52 return Promise.reject(bosResponse);
53 }
54 partIndex++;
55}
56fs.closeSync(file.fd); // 关闭本地文件
注意:
- UploadPart 方法要求Part大小是1MB的整数倍或大于5MB。但是Upload Part接口并不会立即校验上传Part的大小;只有当Complete Multipart Upload的时候才会校验。
- 为了保证数据在网络传输过程中不出现错误,建议您在UploadPart后,使用每个分块BOS返回的Content-MD5值分别验证已上传分块数据的正确性。当所有分块数据合成一个Object后,不再含MD5值。
- Part号码的范围是1~10000。如果超出这个范围,BOS将返回InvalidArgument的错误码。
- 每次上传Part之后,BOS的返回结果会包含一个 PartETag 对象,它是上传块的ETag与块编号(PartNumber)的组合,在后续完成分块上传的步骤中会用到它,因此需要将其保存起来。一般来讲这些 PartETag 对象将被保存到List中。
完成分块上传
Plain Text
1let completeMultiUploadResult:CompleteMultipartUploadResult;
2try {
3 completeMultiUploadResult = await bosClient.completeMultipartUpload(bucketName, objectName, uploadId, completeArgs);
4} catch(bosResponse) {
5 logger.error(`complete multipart fail, info: ${JSON.stringify(bosResponse.error)}`)
6 await bosClient.abortMultipartUpload(bucketName, objectName, uploadId);
7}
完整示例
该部分已经封装到BosClient.uploadSuperFile方法中,用户可以直接调用即可:
Plain Text
1import { logger, Credential, BosClient, ClientOptions } from "bos"
2import { CompleteMultipartUploadResult } from "bos/src/main/ets/bos/api/DataType"
3
4let credential = new Credential(AccessKeyID, SecretAccessKey, Token); //STS返回的临时AK/SK及Token
5let clientOptions = new ClientOptions();
6clientOptions.endpoint = "bj.bcebos.com"; //传入Bucket所在区域域名
7let bosClient = new BosClient(credential, clientOptions); // 创建BosClient
8
9let bucketName = "test-harmony-bucket"
10let objectName = "multi_upload_object.txt"
11let path = "/path/to/file.zip"
12
13let result: CompleteMultipartUploadResult;
14
15try {
16 result = await bosClient.uploadSuperFile(bucketName, objectName, path);
17 logger.info(`upload super file success, result : ${JSON.stringify(result)}`);
18} catch (bosResponse) {
19 logger.error(`errCode: ${bosResponse.error.code}`)
20 logger.error(`requestId: ${bosResponse.error.requestId}`)
21 logger.error(`errMessage: ${bosResponse.error.message}`)
22 logger.error(`statusCode: ${bosResponse.statusCode}`)
23}
取消分块上传
用户可以使用abortMultipartUpload方法取消分块上传。
Plain Text
1try {
2 await bosClient.abortMultipartUpload(bucketName, objectName, uploadId);
3} catch(bosResponse) {
4 logger.error(`errCode: ${bosResponse.error.code}`)
5 logger.error(`requestId: ${bosResponse.error.requestId}`)
6 logger.error(`errMessage: ${bosResponse.error.message}`)
7 logger.error(`statusCode: ${bosResponse.statusCode}`)
8}
获取未完成的分块上传
用户可以使用listMultipartUploads
方法获取Bucket内未完成的分块上传事件。
示例代码
Plain Text
1try {
2let listMultipartUploadsResult = await bosClient.listMultipartUploads(bucketName);
3 for (let multiUpload of listMultipartUploadsResult.uploads as ListMultipartUploadsType[]) {
4 logger.info(`key: ${multiUpload.key}, uploadId: ${multiUpload.uploadId}`);
5 }
6} catch(bosResponse) {
7 logger.error(`errCode: ${bosResponse.error.code}`)
8 logger.error(`requestId: ${bosResponse.error.requestId}`)
9 logger.error(`errMessage: ${bosResponse.error.message}`)
10 logger.error(`statusCode: ${bosResponse.statusCode}`)
11}
注意:
- 默认情况下,如果Bucket中的分块上传事件的数目大于1000,则只会返回1000个Object,并且返回结果中IsTruncated的值为True,同时返回NextKeyMarker作为下次读取的起点。
- 若想获取更多分块上传事件,可以访问ListMultipartUploadsResult实例中的keyMarker参数分次读取。
获取所有已上传的分块信息
用户可以使用listParts
方法获取某个上传事件中所有已上传的块。
示例代码
Plain Text
1try {
2 let listPartsResult = await bosClient.listParts(bucketName, objectName, "e8583fb592ec699d7ecca085fc46fdd8");
3 for (let part of listPartsResult.parts as ListPartType[]) {
4 logger.info(`partNumber: ${part.partNumber}, eTag: ${part.eTag}`);
5 }
6} catch(bosResponse) {
7 logger.error(`errCode: ${bosResponse.error.code}`)
8 logger.error(`requestId: ${bosResponse.error.requestId}`)
9 logger.error(`errMessage: ${bosResponse.error.message}`)
10 logger.error(`statusCode: ${bosResponse.statusCode}`)
11}
注意:
- 默认情况下,如果Bucket中的分块上传事件的数目大于1000,则只会返回1000个Object,并且返回结果中IsTruncated的值为True,同时返回NextPartNumberMarker作为下次读取的起点。
- 若想获取更多已上传的分块信息,可以访问ListPartsResult中的partNumberMarker参数分次读取。