
IBM Storage 存储大对象分块上传,原理比较简单,就是在客户端把需要上传的大文件分割成块,然后并行上传到IBM Storage中,当所有的大文件都上传成功以后,客户端给IBM Storage发出上传完成的信号,IBM Storage会完成块文件的合并,并显示在IBM Storage中。
官方文档:https://cloud.ibm.com/docs/cloud-object-storage/basics?topic=cloud-object-storage-large-objects
IBM Storage支持大对象分块上传。这样的有很多好处:
- 文件分块之间可以按任意顺序独立地、并行上传,加快上传的速度。
- 已完成的分块,会暂时储存在IBM Storage中,这样如果上传过程中出错,只需要续传未完成的文件块就可以了。
- 网络中断不会导致大型上传操作失败,在一段时间内上传可以暂停并重新启动。
PS:分块上传仅可用于大于 5 MB 的对象。对于小于 50 GB 的对象,建议的分块大小为 20 MB 到 100 MB,以实现最佳性能。对于更大的对象,可以增大分块大小,而不会对性能产生重大影响。分块上传限制为分块数不超过 10,000 个,每个分块 5 GB,最大对象大小为 10 TB。


关于分块上传,IBM Cloud提供了 REST API 和 SDK(Nodejs, Python, Java, Go)等等。本文的以IOS开发中,Swift用 REST API做为例子。
分块上传对象分为五个阶段:
1. 生成Bearer Token
要想使用IBM Storage的API, 首先要取得Bearer Token,首先要利用Rest Api创建一个。
另外,apikey需要预先在IBM Storage服务凭证中获得。
生成Bearer Token的Swift方法如下:
class func createBearerToken(completion: @escaping ([String: Any]) -> Void) { guard let url = URL(string: "https://iam.cloud.ibm.com/identity/token") else { return } var request = URLRequest(url: url) // Http POST提出。 request.httpMethod = "POST" // Http Header设置下面两个属性。 request.addValue("application/json", forHTTPHeaderField: "Accept") request.addValue("application/x-www-form-urlencoded", forHTTPHeaderField: "Content-Type") // storageApiKey需要从Storage服务凭证中获得 // 编辑参数字符串,做为body的一部分 let data: Data = "apikey=\(storageApiKey)&response_type=cloud_iam&grant_type=urn:ibm:params:oauth:grant-type:apikey".data(using: .utf8)! request.httpBody = data let urlconfig = URLSessionConfiguration.default let session = URLSession(configuration: urlconfig, delegate: nil, delegateQueue: OperationQueue.main) session.dataTask(with: request) { (data, response, error) in var resultArray: [String: Any] = [:] if let response = response { print(response) } guard let data = data else { return } if let jsonObject = try? JSONSerialization.jsonObject(with: data, options: .allowFragments) { if let dicObject = jsonObject as? NSDictionary { if dicObject["errorCode"] != nil { resultArray.updateValue(dicObject["errorDetails"]!, forKey: "message") } else { resultArray.updateValue(dicObject["access_token"]!, forKey: "access_token") } DispatchQueue.main.async { completion(resultArray) } } } }.resume() }
返回结果: { "access_token" = "eyJraWQiOiIyMDIxMDYxOTE4xxxxx"; expiration = 1624711576; "expires_in" = 3600; "refresh_token" = "not_supported"; scope = "ibm openid"; "token_type" = Bearer; }
2. 通过Rest Api 创建UploadId
每个上传文件会对应一个UploadId,这个UploadId通过Rest APi向IBM Storage请求。
向IBM Storage发出使用查询参数 upload
的 POST
请求会创建新的 UploadId
值,然后要上传的对象的每个分块都会引用此值。


// Swift中利用Rest Api创建UploadId // 参数: bearerToken:步骤1取得的bearerToken // bucketName:IBM Storage中的存储桶的名字 // fileName:上传的文件名字。 func createUploadId(bearerToken: String, bucketName: String, fileName: String, completion: @escaping ([String: Any]) -> Void) { // 编辑url guard let url = URL(string: "https://s3.au-syd.cloud-object-storage.appdomain.cloud/\(bucketName)/\(fileName)?uploads") else { return } var request = URLRequest(url: url) // POST方式提交 request.httpMethod = "POST" // Header设置 bearerToken request.addValue("bearer \(bearerToken)", forHTTPHeaderField: "Authorization") let urlconfig = URLSessionConfiguration.default urlconfig.timeoutIntervalForRequest = 60 urlconfig.timeoutIntervalForResource = 60 let session = URLSession(configuration: urlconfig, delegate: nil, delegateQueue: OperationQueue.main) session.dataTask(with: request) { (data, response, error) in var resultArray: [String: Any] = [:] if let response = response { print(response) } if let data = data, let dataString = String(data: data, encoding: .utf8) { // 注意 返回结果是XML格式的,这里面按字符串输出,如果要取得各个节点的值,则需要解析XML。 print("Response data string:\n \(dataString)") } }.resume() }
返回结果: <?xml version="1.0" encoding="UTF-8" standalone="yes"?><InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Bucket>xxxx</Bucket><Key>test555.mp4</Key><UploadId>xxxx-485f-xxxx-bb3c-xxxxx</UploadId></InitiateMultipartUploadResult>
3. 在客户端预先分割需要上传的文件。
我们可以在上传前预先分割大文件,把各个块文件块物理存储在设备上。当然我们也可以通过指针偏移读取文件数据,把读取的值放入内存中,下面的代码是真实分割文件块的例子。
// 文件块的大小20MB var filePartSize: Int = 20971520 // 取得文件块路径,放到document文件夹下面 let manager = FileManager.default let urlForDocument = manager.urls(for: .documentDirectory, in: .userDomainMask) let videoPartPath = urlForDocument[0] // 参数 videoName:文件名 videoPath:文件路径 videoSize:文件的Size可以通过下面getFileSize方法取得 uploadId:步骤2取得的uploadId computeFilePartSize(videoName: String, videoPath: String, videoSize: String, uploadId: String) { if Int(videoSize)! <= filePartSize { let filePartData = readFileByPart(videoName: videoName, videoPath: videoPath, videoBeginOffset: "0", videoPartSize: videoSize) createFile(name: videoName, fileBaseUrl:videoPartPath!, data: filePartData!) } else { let loop = Int(videoSize)! / filePartSize for i in 0..<loop { let filePartData = readFileByPart(videoName: videoName, videoPath: videoPath, videoBeginOffset: "\(i * filePartSize)", videoPartSize: "\(filePartSize)") createFile(name: videoName, fileBaseUrl:videoPartPath!, data: filePartData!) } let lastSize = Int(videoSize)! % filePartSize if lastSize > 0 { let filePartData = readFileByPart(videoName: videoName, videoPath: videoPath, videoBeginOffset: "\(loop * filePartSize)", videoPartSize: "\(lastSize)") createFile(name: videoName, fileBaseUrl:videoPartPath!, data: filePartData!) } } } // 取得文件Size的方法。 func getFileSize(videoName: String, videoPath: String) -> String { let manager = FileManager.default let attributes = try? manager.attributesOfItem(atPath: videoPath) return "\(attributes![FileAttributeKey.size]!)" } func readFileByPart(videoName: String, videoPath: String, videoBeginOffset: String, videoPartSize: String) -> Data? { let readHandle = FileHandle.init(forReadingAtPath: videoPath) // 通过指针的偏移,局部读取文件数据。 readHandle?.seek(toFileOffset: UInt64(videoBeginOffset)!) let data = readHandle?.readData(ofLength: Int(videoPartSize)!) return data! }
4.上传分块文件
向对象发出使用查询参数 partNumber
和 uploadId
的 PUT
请求将上传对象的一个分块。这些分块可以按序列上传,也可以并行上传,但必须按顺序编号。上传成功以后,Http Response会返回Etag。


var appDelegateComm: AppDelegate? // 下面的例子是Swift中,后台上传文件的例子,需要用到urlSession uploadTask。 // 参数: bearerToken:步骤1取得到的bearerToken // bucketName:存储桶的名字 // fileName:上传的文件名 // partNumber:块文件的编号 // uploadId:步骤2取得的uploadId // videoPartSize:文件块的Size // fileBaseUrl:块文件的Url地址 func upLoadFileBackground(bearerToken: String, bucketName: String, fileName: String, partNumber: String, uploadId: String, videoPartSize: String, name: String, fileBaseUrl: URL) { // 编辑url guard let url = URL(string: "https://s3.au-syd.cloud-object-storage.appdomain.cloud/\(bucketName)/\(fileName)?partNumber=\(partNumber)&uploadId=\(uploadId)") else { return } var request = URLRequest(url: url) // 注意是PUT提交 request.httpMethod = "PUT" // 设定HTTPHeader request.addValue("bearer \(bearerToken)", forHTTPHeaderField: "Authorization") request.addValue(videoPartSize, forHTTPHeaderField: "Content-Length") // 设定background config,delegate let config = URLSessionConfiguration.background(withIdentifier: ("\(Date().timeIntervalSince1970)_\(partNumber)")) // 重要把 appDelegateComm对象作为代理对象 赋值给config let session = URLSession(configuration: config, delegate: appDelegateComm, delegateQueue: OperationQueue.main) let fileUrl = fileBaseUrl.appendingPathComponent(name) // 这里需要注意,必须用【session.uploadTask(with: request, fromFile: fileUrl)】 参数是fromFile,才能后台运行。 let uploadTask = session.uploadTask(with: request, fromFile: fileUrl) uploadTask.resume() } // 重要 AppDelegate实现URLSessionDataDelegate 和 URLSessionDelegate代理协议 class AppDelegate: UIResponder, UIApplicationDelegate, URLSessionDelegate, URLSessionDataDelegate { func application(_ application: UIApplication, didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?) -> Bool { // Override point for customization after application launch. let manager = FileManager.default let urlForDocument = manager.urls(for: .documentDirectory, in: .userDomainMask) videoPartPath = urlForDocument[0] session.delegate = self // 这里面把 AppDelegate对象 赋值给 appDelegateComm appDelegateComm = self return true } // 重要 当uploadTask完成时,会调用下面的方法 func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) { if let httpResponse = dataTask.response as? HTTPURLResponse { print(httpResponse) // 从response里面可以拿到返回值 let etag = httpResponse.allHeaderFields["Etag"] as? String print(etag) } } func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { upLpadProcess.removeValue(forKey: session.partInfoId) } }
返回结果: <NSHTTPURLResponse: 0x600002e69b20> { URL: https://s3.au-syd.cloud-object-storage.appdomain.cloud/xxx/test.mp4?partNumber=6&uploadId=0100017a-4891-6d23-60fa-6b0ba4869729 } { Status Code: 200, Headers { "Content-Length" = ( 0 ); Date = ( "Thu, 01 Jan 1970 00:00:00 GMT" ); Etag = ( "\"xxx1e935b62e52c97906011682\"" ); Server = ( Cleversafe ); "X-Clv-Request-Id" = ( "b5f4adaa-a7eb-4de7-9e16-42c6fe891a74" ); "X-Clv-S3-Version" = ( "2.5" ); "x-amz-request-id" = ( "b5f4adaa-a7eb-4de7-9e16-42c6fe891a74" ); } }
5.完成分块上传
向对象发出使用查询参数 uploadId
并且主体中包含相应 XML 块的 PUT
请求将完成分块上传。


// 编辑好Config的字符串 var configStr = "<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>"7417ca8d45a71b692168f0419c17fe2f"</ETag></Part><Part><PartNumber>2</PartNumber><ETag>"7417ca8d45a71b692168f0419c17fe2f"</ETag></Part></CompleteMultipartUpload>" func completeUpload(bearerToken: String, bucketName: String, fileName: String, uploadId: String, configStr: String, completion: @escaping ([String: Any]) -> Void) { guard let url = URL(string: "https://s3.au-syd.cloud-object-storage.appdomain.cloud/\(bucketName)/\(fileName)?uploadId=\(uploadId)") else { return } var request = URLRequest(url: url) request.httpMethod = "POST" request.addValue("bearer \(bearerToken)", forHTTPHeaderField: "Authorization") request.addValue("text/plain; charset=utf-8", forHTTPHeaderField: "Content-Type") let urlconfig = URLSessionConfiguration.default urlconfig.timeoutIntervalForRequest = 60 urlconfig.timeoutIntervalForResource = 60 // 编辑好的configStr字符串。 let data: Data = configStr.data(using: .utf8)! request.httpBody = data let session = URLSession(configuration: urlconfig, delegate: nil, delegateQueue: OperationQueue.main) session.dataTask(with: request) { (data, response, error) in var resultArray: [String: Any] = [:] if let httpResponse = response as? HTTPURLResponse { if httpResponse.statusCode == 200 { resultArray.updateValue("ok", forKey: "status") DispatchQueue.main.async { completion(resultArray) } } else { resultArray.updateValue("error", forKey: "status") DispatchQueue.main.async { completion(resultArray) } } } }.resume() }