From 4cba289250e3e5127ff3afeef767587953da77ab Mon Sep 17 00:00:00 2001 From: yangpeng <183851063@qq.com> Date: Mon, 28 Nov 2022 14:27:44 +0800 Subject: [PATCH] add resuamable upload and download --- samples/Resumable.php | 77 ++++++ src/OSS/Core/OssUtil.php | 31 +++ src/OSS/OssClient.php | 286 +++++++++++++++++++++ tests/OSS/Tests/OssClientResumableTest.php | 229 +++++++++++++++++ 4 files changed, 623 insertions(+) create mode 100644 samples/Resumable.php create mode 100644 tests/OSS/Tests/OssClientResumableTest.php diff --git a/samples/Resumable.php b/samples/Resumable.php new file mode 100644 index 0000000..a3ce6c7 --- /dev/null +++ b/samples/Resumable.php @@ -0,0 +1,77 @@ +resumableUpload($bucket, "a.file", __FILE__, array()); +Common::println("local file " . __FILE__ . " is uploaded to the bucket $bucket, a.file"); + + +// Dwonload file useing resumable dwonload +$ossClient->resumableDownload($bucket, "b.file", $options = array( + OssClient::OSS_FILE_DOWNLOAD => "./c.file", +)); +Common::println("b.file is fetched to the local file: c.file"); + + +//******************************* For complete usage, see the following functions **************************************************** + +resumableUpload($ossClient,$bucket); +resumableDownload($ossClient,$bucket); + +/** + * Upload files using resumable upload + * + * @param OssClient $ossClient OssClient instance + * @param string $bucket bucket name + * @return null + */ +function resumableUpload($ossClient, $bucket) +{ + $object = "test/multipart-test.txt"; + $file = __FILE__; + $options = array(); + + try { + $ossClient->resumableUpload($bucket, $object, $file, $options); + } catch (OssException $e) { + printf(__FUNCTION__ . ": FAILED\n"); + printf($e->getMessage() . "\n"); + return; + } + print(__FUNCTION__ . ": OK" . "\n"); +} + + +/** + * Download files using resumable download + * + * @param OssClient $ossClient OssClient instance + * @param string $bucket bucket name + * @return null + */ +function resumableDownload($ossClient, $bucket) +{ + $object = "test/multipart-test.txt"; + $options = array( + OssClient::OSS_FILE_DOWNLOAD => "./c.file", + ); + + try { + $ossClient->resumableDownload($bucket, $object, $options); + } catch (OssException $e) { + printf(__FUNCTION__ . ": FAILED\n"); + printf($e->getMessage() . "\n"); + return; + } + print(__FUNCTION__ . ": OK" . "\n"); +} \ No newline at end of file diff --git a/src/OSS/Core/OssUtil.php b/src/OSS/Core/OssUtil.php index 93e2f4f..2c1acac 100644 --- a/src/OSS/Core/OssUtil.php +++ b/src/OSS/Core/OssUtil.php @@ -531,4 +531,35 @@ public static function decodeKey($key, $encoding) throw new OssException("Unrecognized encoding type: " . $encoding); } } + + /** + * getUploadCpFilePath return the file path of the checkpoint + * @param string $filePath + * @param $bucketName + * @param $objectKey + * @return string + */ + public static function getCpFilePath($bucketName,$objectKey,$versionId=""){ + $dest = sprintf("oss://%s/%s", $bucketName, $objectKey); + $cpFileName = self::getCpFileName("", $dest, $versionId); + return sys_get_temp_dir(). DIRECTORY_SEPARATOR . $cpFileName; + } + + /** + * getCpFileName return the name of the checkpoint file + * @param string $src + * @param string $dest + * @param string $versionId + * @return string + */ + public static function getCpFileName($src,$dest,$versionId){ + $srcCheckSum = md5($src); + $destCheckSum = md5($dest); + if ($versionId == ''){ + return sprintf("%s-%s.cp", $srcCheckSum, $destCheckSum); + } + $versionCheckSum = md5($versionId); + return sprintf("%s-%s-%s.cp", $srcCheckSum, $destCheckSum,$versionCheckSum); + } + } diff --git a/src/OSS/OssClient.php b/src/OSS/OssClient.php index c953344..8b73f16 100644 --- a/src/OSS/OssClient.php +++ b/src/OSS/OssClient.php @@ -2718,6 +2718,292 @@ public function multiuploadFile($bucket, $object, $file, $options = null) return $this->completeMultipartUpload($bucket, $object, $uploadId, $uploadParts, $cmp_options); } + + /** + * A higher level API for uploading a file with multipart upload and resumeable upload. It consists of initialization, parts upload and completion. + * + * @param string $bucket bucket name + * @param string $object object name + * @param string $file The local file to upload + * @param array $options Key-Value array + * @return null + * @throws OssException + */ + public function resumableUpload($bucket, $object, $file, $options = null) + { + $cpFilePath = OssUtil::getCpFilePath($bucket,$object); + $resumable = false; + if (file_exists($cpFilePath)){ + $content = file_get_contents($cpFilePath); + $uploadInfo = json_decode($content,true); + $uploadId = $uploadInfo['uploadId']; + $parts = $uploadInfo['parts']; + $object = $uploadInfo['object']; + $partSize = $uploadInfo['partSize']; + $totalSize = $uploadInfo['totalSize']; + if ($this->isValidUpload($file,$uploadInfo)){ + $resumable = true; + } + } + $this->precheckCommon($bucket, $object, $options); + if (isset($options[self::OSS_LENGTH])) { + $options[self::OSS_CONTENT_LENGTH] = $options[self::OSS_LENGTH]; + unset($options[self::OSS_LENGTH]); + } + if (empty($file)) { + throw new OssException("parameter invalid, file is empty"); + } + $uploadFile = OssUtil::encodePath($file); + if (!isset($options[self::OSS_CONTENT_TYPE])) { + $options[self::OSS_CONTENT_TYPE] = $this->getMimeType($object, $uploadFile); + } + $position = isset($options[self::OSS_SEEK_TO]) ? (integer)$options[self::OSS_SEEK_TO] : 0; + + if ($resumable){ + $num = count($parts); + $upload_position = $position + $options[self::OSS_PART_SIZE]*$num; + $upload_file_size = $totalSize -= $upload_position; + }else{ + $upload_position = $position; + if (isset($options[self::OSS_CONTENT_LENGTH])) { + $upload_file_size = (integer)$options[self::OSS_CONTENT_LENGTH]; + } else { + $upload_file_size = sprintf('%u',filesize($uploadFile)); + if ($upload_file_size !== false) { + $upload_file_size -= $upload_position; + } + } + } + if ($upload_position === false || !isset($upload_file_size) || $upload_file_size === false || $upload_file_size < 0) { + throw new OssException('The size of `fileUpload` cannot be determined in ' . __FUNCTION__ . '().'); + } + if ($resumable){ + $options[self::OSS_PART_SIZE] = $partSize; + }else{ + // Computes the part size and assign it to options. + if (isset($options[self::OSS_PART_SIZE])) { + $options[self::OSS_PART_SIZE] = $this->computePartSize($options[self::OSS_PART_SIZE]); + } else { + $options[self::OSS_PART_SIZE] = self::OSS_MID_PART_SIZE; + } + } + $is_check_md5 = $this->isCheckMD5($options); + // if the file size is less than part size, use simple file upload. + if ($upload_file_size < $options[self::OSS_PART_SIZE] && !isset($options[self::OSS_UPLOAD_ID])) { + return $this->uploadFile($bucket, $object, $uploadFile, $options); + } + if (!$resumable){ + // Using multipart upload, initialize if no OSS_UPLOAD_ID is specified in options. + if (isset($options[self::OSS_UPLOAD_ID])) { + $uploadId = $options[self::OSS_UPLOAD_ID]; + } else { + // initialize + $uploadId = $this->initiateMultipartUpload($bucket, $object, $options); + } + } + $upload_position = isset($options[self::OSS_SEEK_TO]) ? (integer)$options[self::OSS_SEEK_TO] : 0; + if (!$resumable){ + $uploadInfo = array( + 'file'=>$uploadFile, + 'fileStat'=>array( + 'size'=>$upload_file_size, + 'lastModified'=>filemtime($uploadFile), + ), + 'uploadId' =>$uploadId, + 'object'=>$object, + 'partSize'=>$options[self::OSS_PART_SIZE], + 'totalSize'=>$upload_file_size, + ); + } + // generates the parts information and upload them one by one + $pieces = $this->generateMultiuploadParts($upload_file_size, (integer)$options[self::OSS_PART_SIZE]); + if($resumable){ + $num = count($parts); + $todo_pieces = array_slice($pieces,$num,-1,true); + }else{ + $todo_pieces = $pieces; + } + $response_upload_part = array(); + foreach ($todo_pieces as $i => $piece) { + $from_pos = $upload_position + (integer)$piece[self::OSS_SEEK_TO]; + $to_pos = (integer)$piece[self::OSS_LENGTH] + $from_pos - 1; + $up_options = array( + self::OSS_FILE_UPLOAD => $uploadFile, + self::OSS_PART_NUM => ($i + 1), + self::OSS_SEEK_TO => $from_pos, + self::OSS_LENGTH => $to_pos - $from_pos + 1, + self::OSS_CHECK_MD5 => $is_check_md5, + ); + if ($is_check_md5) { + $content_md5 = OssUtil::getMd5SumForFile($uploadFile, $from_pos, $to_pos); + $up_options[self::OSS_CONTENT_MD5] = $content_md5; + } + $response_upload_part[] = $this->uploadPart($bucket, $object, $uploadId, $up_options); + if ($resumable){ + $uploadInfo['parts'] = array_merge($parts,$response_upload_part); + }else{ + $uploadInfo['parts'] = $response_upload_part; + } + file_put_contents($cpFilePath,json_encode($uploadInfo)); + if (isset($options['uploadPartHooker']) && $i+1 == $options['uploadPartHooker']){ + throw new OssException('ErrorHooker in ' . __FUNCTION__ . '().'); + } + } + $uploadParts = array(); + foreach ($uploadInfo['parts'] as $i => $etag) { + $uploadParts[] = array( + 'PartNumber' => ($i + 1), + 'ETag' => $etag, + ); + } + //build complete options + $cmp_options = null; + if (isset($options[self::OSS_HEADERS]) && isset($options[self::OSS_HEADERS][self::OSS_REQUEST_PAYER])) { + $cmp_options = array( + OssClient::OSS_HEADERS => array( + OssClient::OSS_REQUEST_PAYER => $options[self::OSS_HEADERS][self::OSS_REQUEST_PAYER], + )); + } + $result = $this->completeMultipartUpload($bucket, $object, $uploadId, $uploadParts, $cmp_options); + unlink($cpFilePath); + return $result; + } + + /** + * A higher level API for download a file with get object. + * + * @param string $bucket bucket name + * @param string $object object name + * @param array $options Key-Value array + * @return null + * @throws OssException + */ + public function resumableDownload($bucket, $object, $options = null) + { + if (!isset($options[self::OSS_FILE_DOWNLOAD])){ + throw new OssException('options file download can not be empty! '); + } + if (isset($options[self::OSS_VERSION_ID])){ + $cpFilePath = OssUtil::getCpFilePath($bucket,$object,$options[self::OSS_VERSION_ID]); + }else{ + $cpFilePath = OssUtil::getCpFilePath($bucket,$object); + } + $resumable = false; + $objectMeta = $this->getObjectMeta($bucket, $object, $options); + $size = $objectMeta['content-length']; + if (file_exists($cpFilePath)){ + $content = file_get_contents($cpFilePath); + $downloadInfo = json_decode($content,true); + $num = $downloadInfo['parts']; + $pieces = $downloadInfo['pieces']; + $object = $downloadInfo['object']; + $partSize = $downloadInfo['partSize']; + if ($this->isValidDownload($objectMeta,$downloadInfo)){ + $resumable = true; + }else{ + if (file_exists($options[self::OSS_FILE_DOWNLOAD])){ + unlink($options[self::OSS_FILE_DOWNLOAD]); + } + } + } + if ($resumable){ + $options[self::OSS_PART_SIZE] = $partSize; + }else{ + // Computes the part size and assign it to options. + if (isset($options[self::OSS_PART_SIZE])) { + $options[self::OSS_PART_SIZE] = $this->computePartSize($options[self::OSS_PART_SIZE]); + } else { + $options[self::OSS_PART_SIZE] = self::OSS_MID_PART_SIZE; + } + } + // if the file size is less than part size, use simple file download. + if ($size < $options[self::OSS_PART_SIZE]) { + return $this->getObject($bucket, $object, $options); + } + if (!$resumable){ + // Computes the part size and assign it to options. + if (isset($options[self::OSS_PART_SIZE])) { + $options[self::OSS_PART_SIZE] = $this->computePartSize($options[self::OSS_PART_SIZE]); + } else { + $options[self::OSS_PART_SIZE] = self::OSS_MID_PART_SIZE; + } + $todo_pieces = $pieces = $this->generateMultiuploadParts($size, $options[self::OSS_PART_SIZE]); + $downloadPosition = 0; + $downloadArray = array( + "object" => $object, + "partSize" => $options[self::OSS_PART_SIZE], + 'fileSize' => $size, + "pieces" => $pieces, + 'fileStat'=>array( + 'size'=>$size, + 'lastModified'=>$objectMeta['last-modified'], + ), + ); + }else{ + $todo_pieces = array_slice($pieces,$num,-1,true); + } + foreach ($todo_pieces as $i => $piece) { + $fromPos = $downloadPosition + (integer)$piece[self::OSS_SEEK_TO]; + $toPos = (integer)$piece[self::OSS_LENGTH] + $fromPos - 1; + if (isset($options[self::OSS_VERSION_ID])){ + $downOptions = array( + OssClient::OSS_RANGE => $fromPos.'-'.$toPos, + OssClient::OSS_VERSION_ID => $options[self::OSS_VERSION_ID], + ); + }else{ + $downOptions = array( + OssClient::OSS_RANGE => $fromPos.'-'.$toPos, + ); + } + $downloadArray['parts'] = $i+1; + $content = $this->getObject($bucket,$object,$downOptions); + $fh = fopen($options[self::OSS_FILE_DOWNLOAD], 'a'); + if ($fh === false){ + throw new OssException('open '+$options[self::OSS_FILE_DOWNLOAD]+' failed!'); + } + fseek($fh, $fromPos,SEEK_SET); + if (fwrite($fh, $content) === false){ + throw new OssException('write '+$options[self::OSS_FILE_DOWNLOAD]+' failed!'); + } + fclose($fh); + file_put_contents($cpFilePath, json_encode($downloadArray)); + if ($i+1 == count($pieces)){ + unlink($cpFilePath); + } + if (isset($options['uploadPartHooker']) && $i+1 == $options['uploadPartHooker']){ + throw new OssException('ErrorHooker in ' . __FUNCTION__ . '().'); + } + } + } + + + /** + * Valid this upload file + * @param string $file + * @param array $uploadInfo + * @return bool + */ + private function isValidUpload($file,$uploadInfo){ + $fileInfo = $uploadInfo['fileStat']; + if ($fileInfo['size'] != sprintf('%u',filesize($file)) || filemtime($file) != $fileInfo['lastModified']){ + return false; + } + return true; + } + + /** + * Valid this download file + * @param array $meta + * @param array $downloadInfo + * @return bool + */ + private function isValidDownload($meta,$downloadInfo){ + if ($downloadInfo['size'] != $meta['content-length'] || $downloadInfo['lastModified'] != $meta['last-modified']){ + return false; + } + return true; + } + /** * Uploads the local directory to the specified bucket into specified folder (prefix) * diff --git a/tests/OSS/Tests/OssClientResumableTest.php b/tests/OSS/Tests/OssClientResumableTest.php new file mode 100644 index 0000000..784e0e2 --- /dev/null +++ b/tests/OSS/Tests/OssClientResumableTest.php @@ -0,0 +1,229 @@ +ossClient->resumableUpload($this->bucket, $object, $bigFileName, array(OssClient::OSS_PART_SIZE => 1)); + $options = array(OssClient::OSS_FILE_DOWNLOAD => $localFilename); + $this->ossClient->getObject($this->bucket, $object, $options); + $this->assertEquals(md5_file($bigFileName), md5_file($localFilename)); + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertFalse(true); + } + unlink($bigFileName); + unlink($localFilename); + } + + + public function testResumableUpload() + { + $bigFileName = __DIR__ . DIRECTORY_SEPARATOR . "/bigfile.tmp"; + $localFilename = __DIR__ . DIRECTORY_SEPARATOR . "/localfile.tmp"; + OssUtil::generateFile($bigFileName, 6 * 1024 * 1024); + $object = 'mpu/multipart-bigfile-test.tmp'; + try { + $options = array( + OssClient::OSS_PART_SIZE => 1024*1024, + OssClient::OSS_CHECK_MD5 => true, + "uploadPartHooker"=>3 + ); + $this->ossClient->resumableUpload($this->bucket, $object, $bigFileName, $options); + + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertTrue(true); + } + + try { + $options = array( + OssClient::OSS_PART_SIZE => 1024*1024, + OssClient::OSS_CHECK_MD5 => true, + "uploadPartHooker"=>5 + ); + $this->ossClient->resumableUpload($this->bucket, $object, $bigFileName, $options); + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertTrue(true); + } + + try { + $options = array( + OssClient::OSS_PART_SIZE => 1024*1024, + OssClient::OSS_CHECK_MD5 => true, + ); + $this->ossClient->resumableUpload($this->bucket, $object, $bigFileName, $options); + $options = array(OssClient::OSS_FILE_DOWNLOAD => $localFilename); + $this->ossClient->getObject($this->bucket, $object, $options); + $this->assertEquals(md5_file($bigFileName), md5_file($localFilename)); + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertFalse(true); + } + unlink($bigFileName); + unlink($localFilename); + } + + + public function testResumableDownload() + { + $bigFileName = __DIR__ . DIRECTORY_SEPARATOR . "/bigfile.tmp"; + $localfile = __DIR__ . DIRECTORY_SEPARATOR . "/localfile.tmp"; + OssUtil::generateFile($bigFileName, 6 * 1024 * 1024); + $object = 'mpu/multipart-bigfile-test.tmp'; + try { + $this->ossClient->resumableUpload($this->bucket, $object, $bigFileName); + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertTrue(false); + } + + try { + $options = array( + OssClient::OSS_FILE_DOWNLOAD => $localfile, + OssClient::OSS_PART_SIZE => 1024*1024, + "uploadPartHooker"=>3 + ); + $this->ossClient->resumableDownload($this->bucket, $object, $options); + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertTrue(true); + } + + try { + $options = array( + OssClient::OSS_FILE_DOWNLOAD => $localfile, + OssClient::OSS_PART_SIZE => 1024*1024, + "uploadPartHooker"=>5 + ); + $this->ossClient->resumableDownload($this->bucket, $object, $options); + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertTrue(true); + } + + try { + $options = array( + OssClient::OSS_FILE_DOWNLOAD => $localfile, + OssClient::OSS_PART_SIZE => 1024*1024, + ); + $this->ossClient->resumableDownload($this->bucket, $object, $options); + $this->assertEquals(md5_file($bigFileName), md5_file($localfile)); + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertFalse(true); + } + + unlink($bigFileName); + unlink($localfile); + } + + + public function testResumableDownloadWithVersionId() + { + $bigFileName = __DIR__ . DIRECTORY_SEPARATOR . "/bigfile.tmp"; + $localfile = __DIR__ . DIRECTORY_SEPARATOR . "/localfile.tmp"; + $localfile2 = __DIR__ . DIRECTORY_SEPARATOR . "/localfile2.tmp"; + OssUtil::generateFile($bigFileName, 6 * 1024 * 1024); + $object = 'mpu/multipart-bigfile-test.tmp'; + $this->ossClient->putBucketVersioning($this->bucket, "Enabled"); + try { + $result = $this->ossClient->putObject($this->bucket, $object, $bigFileName); + $versionId = $result[OssClient::OSS_HEADER_VERSION_ID]; + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertTrue(false); + } + + try { + $options = array( + OssClient::OSS_FILE_DOWNLOAD => $localfile2, + OssClient::OSS_VERSION_ID=>$versionId, + ); + $this->ossClient->getObject($this->bucket, $object, $options); + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertTrue(true); + } + + try { + $options2 = array( + OssClient::OSS_FILE_DOWNLOAD => $localfile, + OssClient::OSS_PART_SIZE => 1024*1024, + OssClient::OSS_VERSION_ID=>$versionId, + "uploadPartHooker"=>3 + ); + $this->ossClient->resumableDownload($this->bucket, $object, $options); + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertTrue(true); + } + + try { + $options2 = array( + OssClient::OSS_FILE_DOWNLOAD => $localfile, + OssClient::OSS_PART_SIZE => 1024*1024, + OssClient::OSS_VERSION_ID=>$versionId, + ); + $this->ossClient->resumableDownload($this->bucket, $object, $options2); + $this->assertEquals(md5_file($localfile2), md5_file($localfile)); + } catch (OssException $e) { + var_dump($e->getMessage()); + $this->assertFalse(true); + } + + + + unlink($bigFileName); + unlink($localfile); + unlink($localfile2); + } + + protected function tearDown(): void + { + if (!$this->ossClient->doesBucketExist($this->bucket)) { + return; + } + + $this->ossClient->putBucketVersioning($this->bucket, "Suspended"); + + $result = $this->ossClient->listObjectVersions( + $this->bucket, array('max-keys' => 1000, 'delimiter' => '')); + + $versions = $result->getObjectVersionList(); + $deleteMarkers = $result->getDeleteMarkerList(); + + foreach ($versions as $obj) { + $options = array( + OssClient::OSS_VERSION_ID => $obj->getVersionId(), + ); + $this->ossClient->deleteObject($this->bucket, $obj->getKey(), $options); + } + + foreach ($deleteMarkers as $del) { + $options = array( + OssClient::OSS_VERSION_ID => $del->getVersionId(), + ); + $this->ossClient->deleteObject($this->bucket, $del->getKey(), $options); + } + + parent::tearDown(); + } + +}