IBM CloudiOS开发

IBM Storage 存储大对象

IBM Storage 存储大对象分块上传,原理比较简单,就是在客户端把需要上传的大文件分割成块,然后并行上传到IBM Storage中,当所有的大文件都上传成功以后,客户端给IBM Storage发出上传完成的信号,IBM Storage会完成块文件的合并,并显示在IBM Storage中。

官方文档:https://cloud.ibm.com/docs/cloud-object-storage/basics?topic=cloud-object-storage-large-objects

IBM Storage支持大对象分块上传。这样的有很多好处:

  • 文件分块之间可以按任意顺序独立地、并行上传,加快上传的速度。
  • 已完成的分块,会暂时储存在IBM Storage中,这样如果上传过程中出错,只需要续传未完成的文件块就可以了。
  • 网络中断不会导致大型上传操作失败,在一段时间内上传可以暂停并重新启动。

PS:分块上传仅可用于大于 5 MB 的对象。对于小于 50 GB 的对象,建议的分块大小为 20 MB 到 100 MB,以实现最佳性能。对于更大的对象,可以增大分块大小,而不会对性能产生重大影响。分块上传限制为分块数不超过 10,000 个,每个分块 5 GB,最大对象大小为 10 TB。


关于分块上传,IBM Cloud提供了 REST API 和 SDK(Nodejs, Python, Java, Go)等等。本文的以IOS开发中,Swift用 REST API做为例子。

分块上传对象分为五个阶段:
1. 生成Bearer Token

要想使用IBM Storage的API, 首先要取得Bearer Token,首先要利用Rest Api创建一个。

另外,apikey需要预先在IBM Storage服务凭证中获得。

生成Bearer Token的Swift方法如下:

class func createBearerToken(completion: @escaping ([String: Any]) -> Void) {

        guard let url = URL(string: "https://iam.cloud.ibm.com/identity/token") else { return }

        var request = URLRequest(url: url)
        // Http POST提出。
        request.httpMethod = "POST"

         // Http Header设置下面两个属性。
        request.addValue("application/json", forHTTPHeaderField: "Accept")
        request.addValue("application/x-www-form-urlencoded", forHTTPHeaderField: "Content-Type")
        
        // storageApiKey需要从Storage服务凭证中获得
        // 编辑参数字符串,做为body的一部分
        let data: Data = "apikey=\(storageApiKey)&response_type=cloud_iam&grant_type=urn:ibm:params:oauth:grant-type:apikey".data(using: .utf8)!
        request.httpBody = data

        let urlconfig = URLSessionConfiguration.default
        let session = URLSession(configuration: urlconfig, delegate: nil, delegateQueue: OperationQueue.main)

        session.dataTask(with: request) { (data, response, error) in

            var resultArray: [String: Any] = [:]
            if let response = response {
                print(response)
            }

            guard let data = data else { return }
            if let jsonObject = try? JSONSerialization.jsonObject(with: data, options: .allowFragments) {
                if let dicObject = jsonObject as? NSDictionary {
                    if dicObject["errorCode"] != nil {
                        resultArray.updateValue(dicObject["errorDetails"]!, forKey: "message")
                    } else {
                        resultArray.updateValue(dicObject["access_token"]!, forKey: "access_token")
                    }
                    DispatchQueue.main.async {
                        completion(resultArray)
                    }
                }
            }
        }.resume()
    }
返回结果:
{
    "access_token" = "eyJraWQiOiIyMDIxMDYxOTE4xxxxx";
    expiration = 1624711576;
    "expires_in" = 3600;
    "refresh_token" = "not_supported";
    scope = "ibm openid";
    "token_type" = Bearer;
}
2. 通过Rest Api 创建UploadId

每个上传文件会对应一个UploadId,这个UploadId通过Rest APi向IBM Storage请求。

向IBM Storage发出使用查询参数 upload 的 POST 请求会创建新的 UploadId 值,然后要上传的对象的每个分块都会引用此值。

   // Swift中利用Rest Api创建UploadId
   // 参数: bearerToken:步骤1取得的bearerToken   
   //       bucketName:IBM Storage中的存储桶的名字
   //       fileName:上传的文件名字。
   func createUploadId(bearerToken: String, bucketName: String, fileName: String, completion: @escaping ([String: Any]) -> Void) {
        // 编辑url
        guard let url = URL(string: "https://s3.au-syd.cloud-object-storage.appdomain.cloud/\(bucketName)/\(fileName)?uploads") else { return }
        var request = URLRequest(url: url)

        // POST方式提交
        request.httpMethod = "POST"
        // Header设置 bearerToken
        request.addValue("bearer \(bearerToken)", forHTTPHeaderField: "Authorization")

        let urlconfig = URLSessionConfiguration.default
        urlconfig.timeoutIntervalForRequest = 60
        urlconfig.timeoutIntervalForResource = 60

        let session = URLSession(configuration: urlconfig, delegate: nil, delegateQueue: OperationQueue.main)

        session.dataTask(with: request) { (data, response, error) in
            var resultArray: [String: Any] = [:]
            if let response = response {
                print(response)
            }
            if let data = data, let dataString = String(data: data, encoding: .utf8) {
                // 注意 返回结果是XML格式的,这里面按字符串输出,如果要取得各个节点的值,则需要解析XML。
                print("Response data string:\n \(dataString)")
            }
        }.resume()
    }
返回结果:
 <?xml version="1.0" encoding="UTF-8" standalone="yes"?><InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Bucket>xxxx</Bucket><Key>test555.mp4</Key><UploadId>xxxx-485f-xxxx-bb3c-xxxxx</UploadId></InitiateMultipartUploadResult>
3. 在客户端预先分割需要上传的文件。

我们可以在上传前预先分割大文件,把各个块文件块物理存储在设备上。当然我们也可以通过指针偏移读取文件数据,把读取的值放入内存中,下面的代码是真实分割文件块的例子。

  // 文件块的大小20MB
  var filePartSize: Int = 20971520

  // 取得文件块路径,放到document文件夹下面
  let manager = FileManager.default
  let urlForDocument = manager.urls(for: .documentDirectory, in: .userDomainMask)
  let videoPartPath = urlForDocument[0]

  // 参数  videoName:文件名   videoPath:文件路径  videoSize:文件的Size可以通过下面getFileSize方法取得  uploadId:步骤2取得的uploadId
  computeFilePartSize(videoName: String, videoPath: String, videoSize: String, uploadId: String) {
        if Int(videoSize)! <= filePartSize {
            
            let filePartData = readFileByPart(videoName: videoName, videoPath: videoPath, videoBeginOffset: "0", videoPartSize: videoSize)
            createFile(name: videoName, fileBaseUrl:videoPartPath!, data: filePartData!)
            
        } else {
            let loop = Int(videoSize)! / filePartSize
            for i in 0..<loop {
                
                let filePartData = readFileByPart(videoName: videoName, videoPath: videoPath, videoBeginOffset: "\(i * filePartSize)", videoPartSize: "\(filePartSize)")
                createFile(name: videoName, fileBaseUrl:videoPartPath!, data: filePartData!)
            }

            let lastSize = Int(videoSize)! % filePartSize
            if lastSize > 0 {
                
                let filePartData = readFileByPart(videoName: videoName, videoPath: videoPath, videoBeginOffset: "\(loop * filePartSize)", videoPartSize: "\(lastSize)")
                createFile(name: videoName, fileBaseUrl:videoPartPath!, data: filePartData!)
            }
        }
    }

  // 取得文件Size的方法。
  func getFileSize(videoName: String, videoPath: String) -> String {
        let manager = FileManager.default
        let attributes = try? manager.attributesOfItem(atPath: videoPath)
        return "\(attributes![FileAttributeKey.size]!)"
    }

   func readFileByPart(videoName: String, videoPath: String, videoBeginOffset: String, videoPartSize: String) -> Data? {
        let readHandle = FileHandle.init(forReadingAtPath: videoPath)
        // 通过指针的偏移,局部读取文件数据。
        readHandle?.seek(toFileOffset: UInt64(videoBeginOffset)!)
        let data = readHandle?.readData(ofLength: Int(videoPartSize)!)
        return data!
    }
4.上传分块文件

向对象发出使用查询参数 partNumber 和 uploadId 的 PUT 请求将上传对象的一个分块。这些分块可以按序列上传,也可以并行上传,但必须按顺序编号。上传成功以后,Http Response会返回Etag。

 var appDelegateComm: AppDelegate?

// 下面的例子是Swift中,后台上传文件的例子,需要用到urlSession uploadTask。
 // 参数:  bearerToken:步骤1取得到的bearerToken
 //        bucketName:存储桶的名字
 //        fileName:上传的文件名
 //        partNumber:块文件的编号
 //        uploadId:步骤2取得的uploadId
 //        videoPartSize:文件块的Size
 //        fileBaseUrl:块文件的Url地址
  func upLoadFileBackground(bearerToken: String, bucketName: String, fileName: String, partNumber: String, uploadId: String, videoPartSize: String, name: String, fileBaseUrl: URL) {
        // 编辑url
        guard let url = URL(string: "https://s3.au-syd.cloud-object-storage.appdomain.cloud/\(bucketName)/\(fileName)?partNumber=\(partNumber)&uploadId=\(uploadId)") else { return }

        var request = URLRequest(url: url)
        // 注意是PUT提交
        request.httpMethod = "PUT"
        
        // 设定HTTPHeader
        request.addValue("bearer \(bearerToken)", forHTTPHeaderField: "Authorization")
        request.addValue(videoPartSize, forHTTPHeaderField: "Content-Length")

        // 设定background config,delegate
        let config = URLSessionConfiguration.background(withIdentifier: ("\(Date().timeIntervalSince1970)_\(partNumber)"))
        // 重要把 appDelegateComm对象作为代理对象 赋值给config
        let session = URLSession(configuration: config, delegate: appDelegateComm, delegateQueue: OperationQueue.main)
        
        let fileUrl = fileBaseUrl.appendingPathComponent(name)

        // 这里需要注意,必须用【session.uploadTask(with: request, fromFile: fileUrl)】  参数是fromFile,才能后台运行。
        let uploadTask = session.uploadTask(with: request, fromFile: fileUrl)
        uploadTask.resume()

    }

// 重要 AppDelegate实现URLSessionDataDelegate 和 URLSessionDelegate代理协议
class AppDelegate: UIResponder, UIApplicationDelegate, URLSessionDelegate, URLSessionDataDelegate {

    func application(_ application: UIApplication, didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?) -> Bool {
        // Override point for customization after application launch.

        let manager = FileManager.default
        let urlForDocument = manager.urls(for: .documentDirectory, in: .userDomainMask)
        videoPartPath = urlForDocument[0]
    
        session.delegate = self
        // 这里面把 AppDelegate对象 赋值给 appDelegateComm
        appDelegateComm = self

        return true
    }

    // 重要 当uploadTask完成时,会调用下面的方法
    func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
        if let httpResponse = dataTask.response as? HTTPURLResponse {
            print(httpResponse)
            // 从response里面可以拿到返回值
            let etag = httpResponse.allHeaderFields["Etag"] as? String
            print(etag)
       }
    }

    func urlSession(_ session: URLSession, task: URLSessionTask,
        didCompleteWithError error: Error?) {
        upLpadProcess.removeValue(forKey: session.partInfoId)
    }
}
返回结果:
<NSHTTPURLResponse: 0x600002e69b20> { URL: https://s3.au-syd.cloud-object-storage.appdomain.cloud/xxx/test.mp4?partNumber=6&uploadId=0100017a-4891-6d23-60fa-6b0ba4869729 } { Status Code: 200, Headers {
    "Content-Length" =     (
        0
    );
    Date =     (
        "Thu, 01 Jan 1970 00:00:00 GMT"
    );
    Etag =     (
        "\"xxx1e935b62e52c97906011682\""
    );
    Server =     (
        Cleversafe
    );
    "X-Clv-Request-Id" =     (
        "b5f4adaa-a7eb-4de7-9e16-42c6fe891a74"
    );
    "X-Clv-S3-Version" =     (
        "2.5"
    );
    "x-amz-request-id" =     (
        "b5f4adaa-a7eb-4de7-9e16-42c6fe891a74"
    );
} }
5.完成分块上传

向对象发出使用查询参数 uploadId 并且主体中包含相应 XML 块的 PUT 请求将完成分块上传。

 // 编辑好Config的字符串
var configStr = "<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>"7417ca8d45a71b692168f0419c17fe2f"</ETag></Part><Part><PartNumber>2</PartNumber><ETag>"7417ca8d45a71b692168f0419c17fe2f"</ETag></Part></CompleteMultipartUpload>"

func completeUpload(bearerToken: String, bucketName: String, fileName: String, uploadId: String, configStr: String, completion: @escaping ([String: Any]) -> Void) {
        guard let url = URL(string: "https://s3.au-syd.cloud-object-storage.appdomain.cloud/\(bucketName)/\(fileName)?uploadId=\(uploadId)") else { return }

        var request = URLRequest(url: url)
        request.httpMethod = "POST"
        request.addValue("bearer \(bearerToken)", forHTTPHeaderField: "Authorization")
        request.addValue("text/plain; charset=utf-8", forHTTPHeaderField: "Content-Type")

        let urlconfig = URLSessionConfiguration.default
        urlconfig.timeoutIntervalForRequest = 60
        urlconfig.timeoutIntervalForResource = 60

        // 编辑好的configStr字符串。
        let data: Data = configStr.data(using: .utf8)!
        request.httpBody = data

        let session = URLSession(configuration: urlconfig, delegate: nil, delegateQueue: OperationQueue.main)

        session.dataTask(with: request) { (data, response, error) in

            var resultArray: [String: Any] = [:]

            if let httpResponse = response as? HTTPURLResponse {

                if httpResponse.statusCode == 200 {
                    resultArray.updateValue("ok", forKey: "status")
                    DispatchQueue.main.async {
                        completion(resultArray)
                    }
                } else {
                    resultArray.updateValue("error", forKey: "status")
                    DispatchQueue.main.async {
                        completion(resultArray)
                    }
                }
            }
        }.resume()
    }