
IBM Storage 存储大对象分块上传,原理比较简单,就是在客户端把需要上传的大文件分割成块,然后并行上传到IBM Storage中,当所有的大文件都上传成功以后,客户端给IBM Storage发出上传完成的信号,IBM Storage会完成块文件的合并,并显示在IBM Storage中。
官方文档:https://cloud.ibm.com/docs/cloud-object-storage/basics?topic=cloud-object-storage-large-objects
IBM Storage支持大对象分块上传。这样的有很多好处:
- 文件分块之间可以按任意顺序独立地、并行上传,加快上传的速度。
- 已完成的分块,会暂时储存在IBM Storage中,这样如果上传过程中出错,只需要续传未完成的文件块就可以了。
- 网络中断不会导致大型上传操作失败,在一段时间内上传可以暂停并重新启动。
PS:分块上传仅可用于大于 5 MB 的对象。对于小于 50 GB 的对象,建议的分块大小为 20 MB 到 100 MB,以实现最佳性能。对于更大的对象,可以增大分块大小,而不会对性能产生重大影响。分块上传限制为分块数不超过 10,000 个,每个分块 5 GB,最大对象大小为 10 TB。


关于分块上传,IBM Cloud提供了 REST API 和 SDK(Nodejs, Python, Java, Go)等等。本文的以IOS开发中,Swift用 REST API做为例子。
分块上传对象分为五个阶段:
1. 生成Bearer Token
要想使用IBM Storage的API, 首先要取得Bearer Token,首先要利用Rest Api创建一个。
另外,apikey需要预先在IBM Storage服务凭证中获得。
生成Bearer Token的Swift方法如下:
class func createBearerToken(completion: @escaping ([String: Any]) -> Void) {
guard let url = URL(string: "https://iam.cloud.ibm.com/identity/token") else { return }
var request = URLRequest(url: url)
// Http POST提出。
request.httpMethod = "POST"
// Http Header设置下面两个属性。
request.addValue("application/json", forHTTPHeaderField: "Accept")
request.addValue("application/x-www-form-urlencoded", forHTTPHeaderField: "Content-Type")
// storageApiKey需要从Storage服务凭证中获得
// 编辑参数字符串,做为body的一部分
let data: Data = "apikey=\(storageApiKey)&response_type=cloud_iam&grant_type=urn:ibm:params:oauth:grant-type:apikey".data(using: .utf8)!
request.httpBody = data
let urlconfig = URLSessionConfiguration.default
let session = URLSession(configuration: urlconfig, delegate: nil, delegateQueue: OperationQueue.main)
session.dataTask(with: request) { (data, response, error) in
var resultArray: [String: Any] = [:]
if let response = response {
print(response)
}
guard let data = data else { return }
if let jsonObject = try? JSONSerialization.jsonObject(with: data, options: .allowFragments) {
if let dicObject = jsonObject as? NSDictionary {
if dicObject["errorCode"] != nil {
resultArray.updateValue(dicObject["errorDetails"]!, forKey: "message")
} else {
resultArray.updateValue(dicObject["access_token"]!, forKey: "access_token")
}
DispatchQueue.main.async {
completion(resultArray)
}
}
}
}.resume()
}
返回结果:
{
"access_token" = "eyJraWQiOiIyMDIxMDYxOTE4xxxxx";
expiration = 1624711576;
"expires_in" = 3600;
"refresh_token" = "not_supported";
scope = "ibm openid";
"token_type" = Bearer;
}
2. 通过Rest Api 创建UploadId
每个上传文件会对应一个UploadId,这个UploadId通过Rest APi向IBM Storage请求。
向IBM Storage发出使用查询参数 upload 的 POST 请求会创建新的 UploadId 值,然后要上传的对象的每个分块都会引用此值。


// Swift中利用Rest Api创建UploadId
// 参数: bearerToken:步骤1取得的bearerToken
// bucketName:IBM Storage中的存储桶的名字
// fileName:上传的文件名字。
func createUploadId(bearerToken: String, bucketName: String, fileName: String, completion: @escaping ([String: Any]) -> Void) {
// 编辑url
guard let url = URL(string: "https://s3.au-syd.cloud-object-storage.appdomain.cloud/\(bucketName)/\(fileName)?uploads") else { return }
var request = URLRequest(url: url)
// POST方式提交
request.httpMethod = "POST"
// Header设置 bearerToken
request.addValue("bearer \(bearerToken)", forHTTPHeaderField: "Authorization")
let urlconfig = URLSessionConfiguration.default
urlconfig.timeoutIntervalForRequest = 60
urlconfig.timeoutIntervalForResource = 60
let session = URLSession(configuration: urlconfig, delegate: nil, delegateQueue: OperationQueue.main)
session.dataTask(with: request) { (data, response, error) in
var resultArray: [String: Any] = [:]
if let response = response {
print(response)
}
if let data = data, let dataString = String(data: data, encoding: .utf8) {
// 注意 返回结果是XML格式的,这里面按字符串输出,如果要取得各个节点的值,则需要解析XML。
print("Response data string:\n \(dataString)")
}
}.resume()
}
返回结果: <?xml version="1.0" encoding="UTF-8" standalone="yes"?><InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Bucket>xxxx</Bucket><Key>test555.mp4</Key><UploadId>xxxx-485f-xxxx-bb3c-xxxxx</UploadId></InitiateMultipartUploadResult>
3. 在客户端预先分割需要上传的文件。
我们可以在上传前预先分割大文件,把各个块文件块物理存储在设备上。当然我们也可以通过指针偏移读取文件数据,把读取的值放入内存中,下面的代码是真实分割文件块的例子。
// 文件块的大小20MB
var filePartSize: Int = 20971520
// 取得文件块路径,放到document文件夹下面
let manager = FileManager.default
let urlForDocument = manager.urls(for: .documentDirectory, in: .userDomainMask)
let videoPartPath = urlForDocument[0]
// 参数 videoName:文件名 videoPath:文件路径 videoSize:文件的Size可以通过下面getFileSize方法取得 uploadId:步骤2取得的uploadId
computeFilePartSize(videoName: String, videoPath: String, videoSize: String, uploadId: String) {
if Int(videoSize)! <= filePartSize {
let filePartData = readFileByPart(videoName: videoName, videoPath: videoPath, videoBeginOffset: "0", videoPartSize: videoSize)
createFile(name: videoName, fileBaseUrl:videoPartPath!, data: filePartData!)
} else {
let loop = Int(videoSize)! / filePartSize
for i in 0..<loop {
let filePartData = readFileByPart(videoName: videoName, videoPath: videoPath, videoBeginOffset: "\(i * filePartSize)", videoPartSize: "\(filePartSize)")
createFile(name: videoName, fileBaseUrl:videoPartPath!, data: filePartData!)
}
let lastSize = Int(videoSize)! % filePartSize
if lastSize > 0 {
let filePartData = readFileByPart(videoName: videoName, videoPath: videoPath, videoBeginOffset: "\(loop * filePartSize)", videoPartSize: "\(lastSize)")
createFile(name: videoName, fileBaseUrl:videoPartPath!, data: filePartData!)
}
}
}
// 取得文件Size的方法。
func getFileSize(videoName: String, videoPath: String) -> String {
let manager = FileManager.default
let attributes = try? manager.attributesOfItem(atPath: videoPath)
return "\(attributes![FileAttributeKey.size]!)"
}
func readFileByPart(videoName: String, videoPath: String, videoBeginOffset: String, videoPartSize: String) -> Data? {
let readHandle = FileHandle.init(forReadingAtPath: videoPath)
// 通过指针的偏移,局部读取文件数据。
readHandle?.seek(toFileOffset: UInt64(videoBeginOffset)!)
let data = readHandle?.readData(ofLength: Int(videoPartSize)!)
return data!
}
4.上传分块文件
向对象发出使用查询参数 partNumber 和 uploadId 的 PUT 请求将上传对象的一个分块。这些分块可以按序列上传,也可以并行上传,但必须按顺序编号。上传成功以后,Http Response会返回Etag。


var appDelegateComm: AppDelegate?
// 下面的例子是Swift中,后台上传文件的例子,需要用到urlSession uploadTask。
// 参数: bearerToken:步骤1取得到的bearerToken
// bucketName:存储桶的名字
// fileName:上传的文件名
// partNumber:块文件的编号
// uploadId:步骤2取得的uploadId
// videoPartSize:文件块的Size
// fileBaseUrl:块文件的Url地址
func upLoadFileBackground(bearerToken: String, bucketName: String, fileName: String, partNumber: String, uploadId: String, videoPartSize: String, name: String, fileBaseUrl: URL) {
// 编辑url
guard let url = URL(string: "https://s3.au-syd.cloud-object-storage.appdomain.cloud/\(bucketName)/\(fileName)?partNumber=\(partNumber)&uploadId=\(uploadId)") else { return }
var request = URLRequest(url: url)
// 注意是PUT提交
request.httpMethod = "PUT"
// 设定HTTPHeader
request.addValue("bearer \(bearerToken)", forHTTPHeaderField: "Authorization")
request.addValue(videoPartSize, forHTTPHeaderField: "Content-Length")
// 设定background config,delegate
let config = URLSessionConfiguration.background(withIdentifier: ("\(Date().timeIntervalSince1970)_\(partNumber)"))
// 重要把 appDelegateComm对象作为代理对象 赋值给config
let session = URLSession(configuration: config, delegate: appDelegateComm, delegateQueue: OperationQueue.main)
let fileUrl = fileBaseUrl.appendingPathComponent(name)
// 这里需要注意,必须用【session.uploadTask(with: request, fromFile: fileUrl)】 参数是fromFile,才能后台运行。
let uploadTask = session.uploadTask(with: request, fromFile: fileUrl)
uploadTask.resume()
}
// 重要 AppDelegate实现URLSessionDataDelegate 和 URLSessionDelegate代理协议
class AppDelegate: UIResponder, UIApplicationDelegate, URLSessionDelegate, URLSessionDataDelegate {
func application(_ application: UIApplication, didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?) -> Bool {
// Override point for customization after application launch.
let manager = FileManager.default
let urlForDocument = manager.urls(for: .documentDirectory, in: .userDomainMask)
videoPartPath = urlForDocument[0]
session.delegate = self
// 这里面把 AppDelegate对象 赋值给 appDelegateComm
appDelegateComm = self
return true
}
// 重要 当uploadTask完成时,会调用下面的方法
func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
if let httpResponse = dataTask.response as? HTTPURLResponse {
print(httpResponse)
// 从response里面可以拿到返回值
let etag = httpResponse.allHeaderFields["Etag"] as? String
print(etag)
}
}
func urlSession(_ session: URLSession, task: URLSessionTask,
didCompleteWithError error: Error?) {
upLpadProcess.removeValue(forKey: session.partInfoId)
}
}
返回结果:
<NSHTTPURLResponse: 0x600002e69b20> { URL: https://s3.au-syd.cloud-object-storage.appdomain.cloud/xxx/test.mp4?partNumber=6&uploadId=0100017a-4891-6d23-60fa-6b0ba4869729 } { Status Code: 200, Headers {
"Content-Length" = (
0
);
Date = (
"Thu, 01 Jan 1970 00:00:00 GMT"
);
Etag = (
"\"xxx1e935b62e52c97906011682\""
);
Server = (
Cleversafe
);
"X-Clv-Request-Id" = (
"b5f4adaa-a7eb-4de7-9e16-42c6fe891a74"
);
"X-Clv-S3-Version" = (
"2.5"
);
"x-amz-request-id" = (
"b5f4adaa-a7eb-4de7-9e16-42c6fe891a74"
);
} }
5.完成分块上传
向对象发出使用查询参数 uploadId 并且主体中包含相应 XML 块的 PUT 请求将完成分块上传。


// 编辑好Config的字符串
var configStr = "<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>"7417ca8d45a71b692168f0419c17fe2f"</ETag></Part><Part><PartNumber>2</PartNumber><ETag>"7417ca8d45a71b692168f0419c17fe2f"</ETag></Part></CompleteMultipartUpload>"
func completeUpload(bearerToken: String, bucketName: String, fileName: String, uploadId: String, configStr: String, completion: @escaping ([String: Any]) -> Void) {
guard let url = URL(string: "https://s3.au-syd.cloud-object-storage.appdomain.cloud/\(bucketName)/\(fileName)?uploadId=\(uploadId)") else { return }
var request = URLRequest(url: url)
request.httpMethod = "POST"
request.addValue("bearer \(bearerToken)", forHTTPHeaderField: "Authorization")
request.addValue("text/plain; charset=utf-8", forHTTPHeaderField: "Content-Type")
let urlconfig = URLSessionConfiguration.default
urlconfig.timeoutIntervalForRequest = 60
urlconfig.timeoutIntervalForResource = 60
// 编辑好的configStr字符串。
let data: Data = configStr.data(using: .utf8)!
request.httpBody = data
let session = URLSession(configuration: urlconfig, delegate: nil, delegateQueue: OperationQueue.main)
session.dataTask(with: request) { (data, response, error) in
var resultArray: [String: Any] = [:]
if let httpResponse = response as? HTTPURLResponse {
if httpResponse.statusCode == 200 {
resultArray.updateValue("ok", forKey: "status")
DispatchQueue.main.async {
completion(resultArray)
}
} else {
resultArray.updateValue("error", forKey: "status")
DispatchQueue.main.async {
completion(resultArray)
}
}
}
}.resume()
}