Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # | |
| # Copyright (c) 2018-2022 Leonardo Taccari | |
| # All rights reserved. | |
| # | |
| # Redistribution and use in source and binary forms, with or without | |
| # modification, are permitted provided that the following conditions | |
| # are met: | |
| # | |
| # 1. Redistributions of source code must retain the above copyright | |
| # notice, this list of conditions and the following disclaimer. | |
| # 2. Redistributions in binary form must reproduce the above copyright | |
| # notice, this list of conditions and the following disclaimer in the | |
| # documentation and/or other materials provided with the distribution. | |
| # | |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED | |
| # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR | |
| # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS | |
| # BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
| # POSSIBILITY OF SUCH DAMAGE. | |
| # | |
| """ | |
| Download/upload files via wetransfer.com | |
| transferwee is a script/module to download/upload files via wetransfer.com. | |
| It exposes `download' and `upload' subcommands, used respectively to download | |
| files from `we.tl' or `wetransfer.com/downloads' URLs and to upload files that | |
| will be shared via email or link. | |
| """ | |
| from typing import List, Optional | |
| import logging | |
| import os.path | |
| import re | |
| import urllib.parse | |
| import zlib | |
| import requests | |
# Base endpoint; every other URL below is derived from it.  The
# `{transfer_id}' and `{file_id}' placeholders are filled in later via
# str.format() by the functions that use them.
WETRANSFER_API_URL = 'https://wetransfer.com/api/v4/transfers'
WETRANSFER_DOWNLOAD_URL = f'{WETRANSFER_API_URL}/{{transfer_id}}/download'
WETRANSFER_UPLOAD_EMAIL_URL = f'{WETRANSFER_API_URL}/email'
WETRANSFER_VERIFY_URL = f'{WETRANSFER_API_URL}/{{transfer_id}}/verify'
WETRANSFER_UPLOAD_LINK_URL = f'{WETRANSFER_API_URL}/link'
WETRANSFER_FILES_URL = f'{WETRANSFER_API_URL}/{{transfer_id}}/files'
WETRANSFER_PART_PUT_URL = f'{WETRANSFER_FILES_URL}/{{file_id}}/part-put-url'
WETRANSFER_FINALIZE_MPP_URL = f'{WETRANSFER_FILES_URL}/{{file_id}}/finalize-mpp'
WETRANSFER_FINALIZE_URL = f'{WETRANSFER_API_URL}/{{transfer_id}}/finalize'

# Number of bytes read from a file per uploaded chunk (5 MiB).
WETRANSFER_DEFAULT_CHUNK_SIZE = 5 * 1024 * 1024
# Sent as `expire_in' when verifying an email upload
# (presumably seconds, i.e. one week -- confirm against the API).
WETRANSFER_EXPIRE_IN = 7 * 24 * 60 * 60

logger = logging.getLogger(__name__)
def download_url(url: str) -> Optional[str]:
    """Given a wetransfer.com download URL return the downloadable URL.

    The URL should be of the form `https://we.tl/' or
    `https://wetransfer.com/downloads/'. If it is a short URL (i.e. `we.tl')
    the redirect is followed in order to retrieve the corresponding
    `wetransfer.com/downloads/' URL.

    The following type of URLs are supported:

     - `https://we.tl/<short_url_id>`:
        received via link upload, via email to the sender and printed by
        `upload` action
     - `https://wetransfer.com/downloads/<transfer_id>/<security_hash>`:
        directly not shared in any ways but the short URLs actually redirect
        to them
     - `https://wetransfer.com/downloads/<transfer_id>/<recipient_id>/<security_hash>`:
        received via email by recipients when the files are shared via email
        upload

    Return the download URL (AKA `direct_link') as a str or None if the URL
    could not be parsed (or the response carries no `direct_link').

    Raise ConnectionError if the wetransfer.com session can not be prepared.
    """
    logger.debug(f'Getting download URL of {url}')

    # Follow the redirect if we have a short URL
    if url.startswith('https://we.tl/'):
        r = requests.head(url, allow_redirects=True)
        logger.debug(f'Short URL {url} redirects to {r.url}')
        url = r.url

    # The path is `/downloads/<...>': drop the leading empty component and
    # the first path segment, keeping only the identifiers.
    recipient_id = None
    params = urllib.parse.urlparse(url).path.split('/')[2:]

    if len(params) == 2:
        transfer_id, security_hash = params
    elif len(params) == 3:
        transfer_id, recipient_id, security_hash = params
    else:
        return None

    logger.debug(f'Getting direct_link of {url}')

    j = {
        "intent": "entire_transfer",
        "security_hash": security_hash,
    }
    if recipient_id:
        j["recipient_id"] = recipient_id

    s = _prepare_session()
    if not s:
        raise ConnectionError('Could not prepare session')
    try:
        r = s.post(WETRANSFER_DOWNLOAD_URL.format(transfer_id=transfer_id),
                   json=j)
    finally:
        # Always release the session, even when the request raises.
        _close_session(s)

    return r.json().get('direct_link')
def _file_unquote(file: str) -> str:
    """Decode a URL encoded file name and make it safe to use locally.

    After percent-decoding, every `../' sequence is dropped and then all
    remaining `/' and `\\' characters are removed, so that the result can
    not be used for directory traversals.
    """
    decoded = urllib.parse.unquote(file)
    without_parent_refs = decoded.replace('../', '')
    # Delete both kinds of path separators in a single pass.
    return without_parent_refs.translate({ord('/'): None, ord('\\'): None})
def download(url: str, file: str = '') -> None:
    """Given a `we.tl/' or `wetransfer.com/downloads/' download it.

    First a direct link is retrieved (via download_url()). The output
    filename can be provided via the optional `file' argument; if it is not
    provided the filename is extracted from the last path component of the
    direct link (sanitized via _file_unquote()). The file is then fetched
    and stored relative to the current working directory.

    If no direct link can be found an error is logged and nothing is
    downloaded.
    """
    logger.debug(f'Downloading {url}')
    dl_url = download_url(url)
    if not dl_url:
        logger.error(f'Could not find direct link of {url}')
        return None
    if not file:
        file = _file_unquote(urllib.parse.urlparse(dl_url).path.split('/')[-1])

    logger.debug(f'Fetching {dl_url}')
    # Use the response as a context manager so that the streamed connection
    # is released even if writing the file fails midway.
    with requests.get(dl_url, stream=True) as r:
        with open(file, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024):
                f.write(chunk)
def _file_name_and_size(file: str) -> dict:
    """Describe a file by its base name and its size.

    Return a dictionary with a "name" key (the path without its directory
    part) and a "size" key (as reported by os.path.getsize()).
    """
    return dict(name=os.path.basename(file), size=os.path.getsize(file))
def _prepare_session() -> Optional[requests.Session]:
    """Prepare a wetransfer.com session.

    Fetch the wetransfer.com home page with a fresh requests session (so
    that its cookies are populated) and extract the `csrf-token' from the
    returned HTML.

    Return a requests session that will always pass the required headers
    (`x-csrf-token' and `x-requested-with') and that can be used for
    wetransfer requests, or None if no csrf-token could be found. When None
    is returned, or when the initial request raises, the session is closed
    before leaving this function.
    """
    s = requests.Session()
    try:
        r = s.get('https://wetransfer.com/')
    except Exception:
        # Do not leak the session if the home page can not be fetched.
        s.close()
        raise

    m = re.search(r'name="csrf-token" content="([^"]+)"', r.text)
    if not m:
        logger.error('Could not find any csrf-token')
        # The caller only gets None back, so nobody else can close it.
        s.close()
        return None

    s.headers.update({
        'x-csrf-token': m.group(1),
        'x-requested-with': 'XMLHttpRequest',
    })
    return s
def _close_session(s: requests.Session) -> None:
    """Terminate a wetransfer.com session.

    Counterpart of _prepare_session(): close the given requests session
    once it is no longer needed.
    """
    s.close()
def _prepare_email_upload(filenames: List[str], display_name: str, message: str,
                          sender: str, recipients: List[str],
                          session: requests.Session) -> dict:
    """Announce an email upload to wetransfer.com.

    The name and size of every file in `filenames', together with the
    sender, the display name, the message and the recipients are POSTed to
    the email upload endpoint using `session'.

    Return the parsed JSON response.
    """
    payload = {
        "files": list(map(_file_name_and_size, filenames)),
        "from": sender,
        "display_name": display_name,
        "message": message,
        "recipients": recipients,
        "ui_language": "en",
    }
    response = session.post(WETRANSFER_UPLOAD_EMAIL_URL, json=payload)

    return response.json()
def _verify_email_upload(transfer_id: str, session: requests.Session) -> dict:
    """Given a transfer_id, verify the corresponding email upload.

    The verification code is read interactively from standard input (after
    printing the `Code:' prompt) and POSTed, together with the expiration
    time WETRANSFER_EXPIRE_IN, to the verify endpoint of the transfer.

    Return the parsed JSON response.
    """
    code = input('Code:')

    j = {
        "code": code,
        "expire_in": WETRANSFER_EXPIRE_IN,
    }

    r = session.post(WETRANSFER_VERIFY_URL.format(transfer_id=transfer_id),
                     json=j)
    return r.json()
def _prepare_link_upload(filenames: List[str], display_name: str, message: str,
                         session: requests.Session) -> dict:
    """Announce a link upload to wetransfer.com.

    The name and size of every file in `filenames', together with the
    display name and the message are POSTed to the link upload endpoint
    using `session'.

    Return the parsed JSON response.
    """
    payload = {
        "files": list(map(_file_name_and_size, filenames)),
        "display_name": display_name,
        "message": message,
        "ui_language": "en",
    }
    response = session.post(WETRANSFER_UPLOAD_LINK_URL, json=payload)

    return response.json()
def _prepare_file_upload(transfer_id: str, file: str,
                         session: requests.Session) -> dict:
    """Register a single file as part of the transfer `transfer_id'.

    The name and size of `file' are POSTed to the files endpoint of the
    transfer using `session'.

    Return the parsed JSON response.
    """
    files_url = WETRANSFER_FILES_URL.format(transfer_id=transfer_id)
    response = session.post(files_url, json=_file_name_and_size(file))

    return response.json()
def _upload_chunks(transfer_id: str, file_id: str, file: str,
                   session: requests.Session,
                   default_chunk_size: int = WETRANSFER_DEFAULT_CHUNK_SIZE) -> dict:
    """Given a transfer_id, file_id and file upload it.

    The file is read in chunks of at most `default_chunk_size' bytes. For
    every chunk:

     - its CRC32, number (starting from 1) and size are POSTed to the
       part-put-url endpoint and the `url' field of the response is read
     - an OPTIONS request is sent to that URL (it looks like it mimics the
       CORS preflight done by a browser -- confirm)
     - the chunk data is PUT to that URL

    Finally the number of uploaded chunks is PUT to the finalize-mpp
    endpoint.

    Return the parsed JSON response of the finalize-mpp request.
    """
    part_put_url = WETRANSFER_PART_PUT_URL.format(transfer_id=transfer_id,
                                                  file_id=file_id)

    # Stays 0 when the file is empty and no chunk is ever read.
    chunk_number = 0
    with open(file, 'rb') as f:
        # iter() with a sentinel stops as soon as read() returns b'' (EOF).
        chunks = iter(lambda: f.read(default_chunk_size), b'')
        for chunk_number, chunk in enumerate(chunks, start=1):
            j = {
                "chunk_crc": zlib.crc32(chunk),
                "chunk_number": chunk_number,
                "chunk_size": len(chunk),
                "retries": 0
            }

            r = session.post(part_put_url, json=j)
            url = r.json().get('url')

            # These two requests deliberately do not go through `session'.
            requests.options(url,
                             headers={
                                 'Origin': 'https://wetransfer.com',
                                 'Access-Control-Request-Method': 'PUT',
                             })
            requests.put(url, data=chunk)

    j = {
        'chunk_count': chunk_number
    }
    r = session.put(
        WETRANSFER_FINALIZE_MPP_URL.format(transfer_id=transfer_id,
                                           file_id=file_id),
        json=j)

    return r.json()
def _finalize_upload(transfer_id: str, session: requests.Session) -> dict:
    """Mark the transfer `transfer_id' as complete.

    A PUT request without any body is sent to the finalize endpoint of the
    transfer using `session'.

    Return the parsed JSON response.
    """
    finalize_url = WETRANSFER_FINALIZE_URL.format(transfer_id=transfer_id)
    response = session.put(finalize_url)

    return response.json()
def upload(files: List[str], display_name: str = '', message: str = '',
           sender: Optional[str] = None,
           recipients: Optional[List[str]] = None) -> str:
    """Given a list of files upload them and return the corresponding URL.

    Also accepts optional parameters:
     - `display_name': name used as a title of the transfer
     - `message': message used as a description of the transfer
     - `sender': email address used to receive an ACK if the upload is
                 successful. For every download by the recipients an email
                 will be also sent
     - `recipients': list of email addresses of recipients. When the upload
                     succeeds every recipient will receive an email with a
                     link

    If both sender and recipient parameters are passed the email upload will
    be used (the verification code is then asked on standard input).
    Otherwise, the link upload will be used.

    Return the short URL of the transfer on success.

    Raise FileNotFoundError if one of the files does not exist,
    FileExistsError if two files share the same base name and
    ConnectionError if the wetransfer.com session can not be prepared.
    """
    # Check that all files exists
    logger.debug('Checking that all files exists')
    for f in files:
        if not os.path.exists(f):
            raise FileNotFoundError(f)

    # Check that there are no duplicates filenames
    # (despite possible different dirname())
    logger.debug('Checking for no duplicate filenames')
    filenames = [os.path.basename(f) for f in files]
    if len(filenames) != len(set(filenames)):
        raise FileExistsError('Duplicate filenames')

    logger.debug('Preparing to upload')
    s = _prepare_session()
    if not s:
        raise ConnectionError('Could not prepare session')

    # Whatever happens from here on, the session must be released.
    try:
        if sender and recipients:
            # email upload
            transfer_id = _prepare_email_upload(
                files, display_name, message, sender, recipients, s)['id']
            _verify_email_upload(transfer_id, s)
        else:
            # link upload
            transfer_id = _prepare_link_upload(
                files, display_name, message, s)['id']

        logger.debug(f'Get transfer id {transfer_id}')
        for f in files:
            logger.debug(f'Uploading {f} as part of transfer_id {transfer_id}')
            file_id = _prepare_file_upload(transfer_id, f, s)['id']
            _upload_chunks(transfer_id, file_id, f, s)

        logger.debug(f'Finalizing upload with transfer id {transfer_id}')
        return _finalize_upload(transfer_id, s)['shortened_url']
    finally:
        _close_session(s)
if __name__ == '__main__':
    import argparse
    import sys

    # Command line entry point: print log records of this module on the
    # default StreamHandler, INFO level unless `-v' is passed.
    cli_logger = logging.getLogger(__name__)
    cli_logger.setLevel(logging.INFO)
    cli_logger.addHandler(logging.StreamHandler())

    parser = argparse.ArgumentParser(
        prog='transferwee',
        description='Download/upload files via wetransfer.com'
    )
    subparsers = parser.add_subparsers(dest='action', help='action',
                                       required=True)

    # download subcommand
    download_parser = subparsers.add_parser('download', help='download files')
    download_parser.add_argument(
        '-g', action='store_true',
        help='only print the direct link (without downloading it)')
    download_parser.add_argument(
        '-o', type=str, default='', metavar='file',
        help='output file to be used')
    download_parser.add_argument(
        '-v', action='store_true',
        help='get verbose/debug logging')
    download_parser.add_argument(
        'url', nargs='+', type=str, metavar='url',
        help='URL (we.tl/... or wetransfer.com/downloads/...)')

    # upload subcommand
    upload_parser = subparsers.add_parser('upload', help='upload files')
    upload_parser.add_argument(
        '-n', type=str, default='', metavar='display_name',
        help='title for the transfer')
    upload_parser.add_argument(
        '-m', type=str, default='', metavar='message',
        help='message description for the transfer')
    upload_parser.add_argument(
        '-f', type=str, metavar='from', help='sender email')
    upload_parser.add_argument(
        '-t', nargs='+', type=str, metavar='to',
        help='recipient emails')
    upload_parser.add_argument(
        '-v', action='store_true',
        help='get verbose/debug logging')
    upload_parser.add_argument(
        'files', nargs='+', type=str, metavar='file',
        help='files to upload')

    args = parser.parse_args()

    if args.v:
        cli_logger.setLevel(logging.DEBUG)

    if args.action == 'download':
        # With `-g' only the direct links are printed, otherwise every URL
        # is downloaded (to the `-o' file when one is given).
        for link in args.url:
            if args.g:
                print(download_url(link))
            else:
                download(link, args.o)
        sys.exit(0)

    if args.action == 'upload':
        print(upload(args.files, args.n, args.m, args.f, args.t))
        sys.exit(0)