from io import BytesIO

from ufile import filemanager, config, httprequest

from const import PUBLIC_KEY, PRIVATE_KEY, UPLOAD_SUFFIX, DOWNLOAD_SUFFIX


def _download_file_save_to_bio(url, header):
    """Download ``url`` with an HTTP GET into an in-memory buffer.

    The return shape depends on the outcome (kept as-is so existing callers
    continue to work):

    * status code 200 or 206: a ``BytesIO`` containing the response body.
      The stream position is left at the end of the written data, so use
      ``getvalue()`` or ``seek(0)`` before reading from it.
    * any other status code: ``None``.
    * the request raised ``RequestException``: the tuple
      ``(None, ResponseInfo(None, exception))``.

    :param url: string, the URL to download.
    :param header: dict, HTTP request headers passed to ``requests.get``.
    :return: ``BytesIO``, ``None`` or ``(None, ResponseInfo)`` as described
        above.
    """
    try:
        response = httprequest.requests.get(url, headers=header, stream=True)
    except httprequest.RequestException as e:
        return None, httprequest.ResponseInfo(None, e)

    # The response is streamed, so it must be closed explicitly to release
    # the underlying connection, including on the non-200/206 path where the
    # body is never read.
    try:
        if response.status_code not in (200, 206):
            return None

        bio = BytesIO()
        for block in response.iter_content(config.BLOCKSIZE):
            bio.write(block)
        return bio
    finally:
        response.close()


class BioFileManager(filemanager.FileManager):
    """``FileManager`` extension that downloads files into memory."""

    def download_bytes_file(self, bucket, key, isprivate=True, expires=None, content_range=None, header=None):
        """Download a file from a UFile bucket into a ``BytesIO`` buffer.

        :param bucket: string, UFile bucket name
        :param key: string, name of the file to download within the bucket
        :param isprivate: boolean, True if the bucket is private
        :param expires: integer, validity period of the private file link;
            defaults to ``config.get_default('expires')`` when None
        :param content_range: tuple of two integers; when given, it is sent
            as a ``Range: bytes=<first>-<second>`` request header
        :param header: dict, HTTP request headers with string keys and
            values, e.g. {'User-Agent': 'Google Chrome'}. The dict passed in
            is not modified.
        :return: a ``BytesIO`` holding the file content when the HTTP status
            code is 200 or 206 (stream position at the end of the data),
            ``None`` for any other status code, or the tuple
            ``(None, ResponseInfo)`` when the request fails with a network
            error.
        """
        if header is None:
            header = dict()
        else:
            filemanager._check_dict(header)
            # Work on a copy so the 'User-Agent' / 'Range' entries added
            # below do not leak into a dict the caller may reuse.
            header = dict(header)

        if expires is None:
            expires = config.get_default('expires')

        if 'User-Agent' not in header:
            header['User-Agent'] = config.get_default('user_agent')

        if isinstance(content_range, tuple) and len(content_range) == 2:
            header['Range'] = 'bytes=' + '-'.join(map(str, content_range))

        if not isprivate:
            url = self.public_download_url(bucket, key)
        else:
            url = self.private_download_url(bucket, key, expires, header, True)

        return _download_file_save_to_bio(url, header)


client = BioFileManager(PUBLIC_KEY, PRIVATE_KEY, UPLOAD_SUFFIX, DOWNLOAD_SUFFIX)