Spaces:
Runtime error
Runtime error
| from flask import Flask, request, jsonify, send_file | |
| import requests | |
| import os | |
| import uuid | |
| from pathlib import Path | |
| import threading | |
| import time | |
| from pydub import AudioSegment | |
| def audio_overlay(audio1, audio2, save_filename): | |
| audio_raw1 = AudioSegment.from_file(audio1) | |
| audio_raw2 = AudioSegment.from_file(audio2) | |
| min_length = min(len(audio_raw1), len(audio_raw2)) | |
| audio_raw1 = audio_raw1[:min_length] | |
| audio_raw2 = audio_raw2[:min_length] | |
| overlayed_audio = audio_raw1.overlay(audio_raw2) | |
| save_path = os.path.join("/tmp", save_filename) | |
| overlayed_audio.export(save_path, format="mp3") | |
| # 任务队列和状态存储 | |
| tasks = {} | |
| running_threads = 0 | |
| condition = threading.Condition() | |
| def process_audio_overlay(task_id, audio_url1, audio_url2): | |
| global running_threads | |
| with condition: | |
| while running_threads >= 1: | |
| tasks[task_id] = {"status": "queue"} | |
| condition.wait() | |
| running_threads += 1 | |
| tasks[task_id] = {"status": "processing"} | |
| try: | |
| # 下载音频文件到本地 | |
| response1 = requests.get(audio_url1) | |
| if response1.status_code != 200: | |
| raise Exception("无效的 URL") | |
| audio_filename1 = audio_url1.split('/')[-1] | |
| with open("/tmp/" + audio_filename1, 'wb') as f: | |
| f.write(response1.content) | |
| response2 = requests.get(audio_url2) | |
| if response2.status_code != 200: | |
| raise Exception("无效的 URL") | |
| audio_filename2 = audio_url2.split('/')[-1] | |
| with open("/tmp/" + audio_filename2, 'wb') as f: | |
| f.write(response2.content) | |
| file_id = str(uuid.uuid4()) | |
| # 执行音频合并操作 | |
| audio = audio_overlay("/tmp/" + audio_filename1, "/tmp/" + audio_filename2, os.path.join("/tmp", file_id + ".mp3")) | |
| # 提供文件的永久直链 | |
| result_audio_url = "/download/" + file_id + ".mp3" | |
| # 更新任务状态 | |
| tasks[task_id] = { | |
| "status": "completed", | |
| "result_url": result_audio_url | |
| } | |
| except Exception as e: | |
| tasks[task_id] = { | |
| "status": "error", | |
| "message": str(e) | |
| } | |
| with condition: | |
| running_threads -= 1 | |
| condition.notify_all() | |
| app = Flask(__name__) | |
| def hello(): | |
| return "Hello! This is an api server, and it is running successfully. For usage, please contact the person who hosted this api server." | |
| def audio_separation(): | |
| audio_url1 = request.args.get('url1') | |
| if not audio_url1: | |
| return jsonify({"error": "URL1 parameter is required"}), 400 | |
| audio_url2 = request.args.get('url2') | |
| if not audio_url2: | |
| return jsonify({"error": "URL2 parameter is required"}), 400 | |
| task_id = str(uuid.uuid4()) | |
| tasks[task_id] = {"status": "processing"} | |
| # 异步执行任务 | |
| threading.Thread(target=process_audio_overlay, args=(task_id, audio_url1, audio_url2)).start() | |
| return jsonify({"task_id": task_id}), 202 | |
| def get_task_status(task_id): | |
| task = tasks.get(task_id) | |
| if task: | |
| return jsonify(task) | |
| else: | |
| return jsonify({"error": "Task not found"}), 404 | |
| def download(filename): | |
| return send_file("/tmp/" + filename, as_attachment=True) | |
| if __name__ == '__main__': | |
| app.run(debug=False) |