| import base64, os |
|
|
| from autogen import ConversableAgent, AssistantAgent |
| from autogen.coding import LocalCommandLineCodeExecutor |
|
|
def read_image_file(image_file_path: str) -> str:
    """Return the contents of a file as Base64-encoded text.

    Args:
        image_file_path: Path of the file to read; it is opened in binary mode.

    Returns:
        The file's bytes, Base64-encoded and decoded to a UTF-8 ``str``.
    """
    with open(image_file_path, mode="rb") as handle:
        encoded = base64.b64encode(handle.read())
    return encoded.decode("utf-8")
|
|
def generate_markdown_code(image_data: str) -> str:
    """Build a Markdown image that embeds Base64 image data inline.

    The previous body returned an empty string, so the encoded image never
    reached the rendered output. The image is embedded as a ``data:`` URI so
    that no separate file has to be served.

    Args:
        image_data: Base64-encoded image bytes. The MIME type is fixed to
            ``image/png`` — NOTE(review): confirm callers only pass PNG data.

    Returns:
        A Markdown image element whose source is the inline data URI.
    """
    return f"![image](data:image/png;base64,{image_data})"
|
|
def read_file(file_path: str) -> str:
    """Return the full text of a UTF-8 encoded file.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The entire file content as a single string.
    """
    with open(file_path, mode="r", encoding="utf-8") as handle:
        contents = handle.read()
    return contents
|
|
def format_as_markdown(code: str) -> str:
    """Wrap text in a Markdown fenced code block.

    Args:
        code: The text to place between the opening and closing fences.

    Returns:
        ``code`` preceded by a fence line and followed by a fence line,
        with no language tag on the opening fence.
    """
    fence = "```"
    return "\n".join([fence, code, fence])
|
|
def run_multi_agent(llm, message) -> str:
    """Run a code-writer/code-executor chat and return its artifacts as markup.

    A ``ConversableAgent`` backed by a ``LocalCommandLineCodeExecutor``
    (60 second timeout, working directory ``coding``) starts a chat of at most
    10 turns with an ``AssistantAgent`` that uses the given model. Afterwards
    the generated chart image and the scripts found in the working directory
    are rendered and concatenated.

    Args:
        llm: Model identifier placed under the ``"model"`` key of the writer
            agent's ``llm_config``.
        message: Initial message sent to the code writer agent.

    Returns:
        The image markup, the ``.sh`` script block and the ``.py`` script
        block, joined by ``"<br />"``. A script block is an empty string when
        no file with that extension exists in the working directory.

    Raises:
        OSError: If the chart image or the working directory cannot be read.
    """
    work_dir = "coding"
    llm_config = {"model": llm}

    executor = LocalCommandLineCodeExecutor(
        timeout=60,
        work_dir=work_dir,
    )

    # The executor agent has no LLM; it only runs code it receives.
    code_executor_agent = ConversableAgent(
        name="code_executor_agent",
        llm_config=False,
        code_execution_config={"executor": executor},
        human_input_mode="NEVER",
        default_auto_reply="Please continue. If everything is done, reply 'TERMINATE'.",
    )

    print("### code_executor_agent.system_message = " + code_executor_agent.system_message)

    # The writer agent proposes code but never executes it itself.
    code_writer_agent = AssistantAgent(
        name="code_writer_agent",
        llm_config=llm_config,
        code_execution_config=False,
        human_input_mode="NEVER",
    )

    print("### code_writer_agent.system_message = " + code_writer_agent.system_message)

    # The chat result is not needed; only the files left on disk are used.
    code_executor_agent.initiate_chat(
        code_writer_agent,
        message=message,
        max_turns=10,
    )

    # NOTE(review): this absolute path is hard-coded and independent of
    # work_dir; it presumably matches the deployment layout — confirm.
    image_data = read_image_file("/home/user/app/coding/ytd_stock_gains.png")
    markdown_code = generate_markdown_code(image_data)

    print("### markdown_code = " + markdown_code)

    files_in_folder = os.listdir(work_dir)

    file_name_py = ""
    file_name_sh = ""

    # If several files share an extension, the last one in listing order wins;
    # os.listdir makes no ordering guarantee.
    print(f"Files in {files_in_folder}:")
    for file in files_in_folder:
        print(file)
        _, file_extension = os.path.splitext(file)
        if file_extension == ".py":
            file_name_py = file
        elif file_extension == ".sh":
            file_name_sh = file

    print("#" + file_name_py)
    print("#" + file_name_sh)

    def _render_script(file_name: str) -> str:
        """Return the named script as a fenced block, or "" if there is none."""
        # An empty name means no matching file was found. Reading the bare
        # directory path would raise, so skip it instead.
        if not file_name:
            return ""
        file_path = os.path.join(work_dir, file_name)
        print("##" + file_path)
        rendered = format_as_markdown(read_file(file_path))
        print("### " + rendered)
        return rendered

    markdown_code_py = _render_script(file_name_py)
    markdown_code_sh = _render_script(file_name_sh)

    return markdown_code + "<br />" + markdown_code_sh + "<br />" + markdown_code_py