Spaces:
Sleeping
Sleeping
| ## Utility Functions | |
| ## Note: edit ~/.bigqueryrc to set global settings for bq command line tool | |
| using CSV, DataFrames, JSON3 | |
| function read_json(file_path::String) | |
| json_data = JSON3.read(open(file_path, "r")) | |
| return json_data | |
| end | |
| """ | |
| ## ostreacultura_bq_auth() | |
| - Activate the service account using the credentials file | |
| """ | |
| function ostreacultura_bq_auth() | |
| if isfile("ostreacultura-credentials.json") | |
| run(`gcloud auth activate-service-account --key-file=ostreacultura-credentials.json`) | |
| else | |
| println("Credentials file not found") | |
| end | |
| end | |
| """ | |
| ## julia_to_bq_type(julia_type::DataType) | |
| - Map Julia types to BigQuery types | |
| Arguments: | |
| - julia_type: The Julia data type to map | |
| Returns: | |
| - The corresponding BigQuery type as a string | |
| """ | |
| function julia_to_bq_type(julia_type::DataType) | |
| if julia_type == String | |
| return "STRING" | |
| elseif julia_type == Int64 | |
| return "INTEGER" | |
| elseif julia_type == Float64 | |
| return "FLOAT" | |
| elseif julia_type <: AbstractArray{Float64} | |
| return "FLOAT64" | |
| elseif julia_type <: AbstractArray{Int64} | |
| return "INTEGER" | |
| else | |
| return "STRING" | |
| end | |
| end | |
| """ | |
| ## create_bq_schema(df::DataFrame) | |
| - Create a BigQuery schema from a DataFrame | |
| Arguments: | |
| - df: The DataFrame to create the schema from | |
| Returns: | |
| - The schema as a string in BigQuery format | |
| Example: | |
| df = DataFrame(text = ["Alice", "Bob"], embed = [rand(3), rand(3)]) | |
| create_bq_schema(df) | |
| """ | |
| function create_bq_schema(df::DataFrame) | |
| schema = [] | |
| for col in names(df) | |
| if eltype(df[!, col]) <: AbstractArray | |
| push!(schema, Dict("name" => col, "type" => "FLOAT64", "mode" => "REPEATED")) | |
| else | |
| push!(schema, Dict("name" => col, "type" => julia_to_bq_type(eltype(df[!, col])), "mode" => "NULLABLE")) | |
| end | |
| end | |
| return JSON3.write(schema) | |
| end | |
| """ | |
| ## dataframe_to_json(df::DataFrame, file_path::String) | |
| - Convert a DataFrame to JSON format and save to a file | |
| Arguments: | |
| - df: The DataFrame to convert | |
| - file_path: The path where the JSON file should be saved | |
| """ | |
| function dataframe_to_json(df::DataFrame, file_path::String) | |
| open(file_path, "w") do io | |
| for row in eachrow(df) | |
| JSON.print(io, Dict(col => row[col] for col in names(df))) | |
| write(io, "\n") | |
| end | |
| end | |
| end | |
| """ | |
| # Function to send a DataFrame to a BigQuery table | |
| ## send_to_bq_table(df::DataFrame, dataset_name::String, table_name::String) | |
| - Send a DataFrame to a BigQuery table, which will append if the table already exists | |
| Arguments: | |
| - df: The DataFrame to upload | |
| - dataset_name: The BigQuery dataset name | |
| - table_name: The BigQuery table name | |
| # Example usage | |
| df = DataFrame(text = ["Alice", "Bob"], embed = [rand(3), rand(3)]) | |
| send_to_bq_table(df, "climate_truth", "embtest") | |
| # Upload a DataFrame | |
| using CSV, DataFrames | |
| import OstreaCultura as OC | |
| tdat = CSV.read("data/climate_test.csv", DataFrame) | |
| emb = OC.multi_embeddings(tdat) | |
| """ | |
| function send_to_bq_table(df::DataFrame, dataset_name::String, table_name::String) | |
| # Temporary JSON file | |
| json_file_path = tempname() * ".json" | |
| schema = create_bq_schema(df) | |
| ## Save schema to a file | |
| schema_file_path = tempname() * ".json" | |
| open(schema_file_path, "w") do io | |
| write(io, schema) | |
| end | |
| # Save DataFrame to JSON | |
| dataframe_to_json(df, json_file_path) | |
| # Use bq command-line tool to load JSON to BigQuery table with specified schema | |
| run(`bq load --source_format=NEWLINE_DELIMITED_JSON $dataset_name.$table_name $json_file_path $schema_file_path`) | |
| # Clean up and remove the temporary JSON file after upload | |
| rm(json_file_path) | |
| rm(schema_file_path) | |
| return nothing | |
| end | |
| """ | |
| ## bq(query::String) | |
| - Run a BigQuery query and return the result as a DataFrame | |
| Example: bq("SELECT * FROM ostreacultura.climate_truth.training LIMIT 10") | |
| """ | |
| function bq(query::String) | |
| tname = tempname() | |
| run(pipeline(`bq query --use_legacy_sql=false --format=csv $query`, tname)) | |
| return CSV.read(tname, DataFrame) | |
| end | |
| """ | |
| ## Function to average embeddings over some group | |
| example: | |
| avg_embeddings("ostreacultura.climate_truth.embtest", "text", "embed") | |
| """ | |
| function avg_embeddings(table::String, group::String, embedname::String) | |
| query = """ | |
| SELECT | |
| $group, | |
| ARRAY( | |
| SELECT AVG(value) | |
| FROM UNNEST($embedname) AS value WITH OFFSET pos | |
| GROUP BY pos | |
| ORDER BY pos | |
| ) AS averaged_array | |
| FROM ( | |
| SELECT $group, ARRAY_CONCAT_AGG($embedname) AS $embedname | |
| FROM $table | |
| GROUP BY $group | |
| ) | |
| """ | |
| return query | |
| end | |
| """ | |
| ## SAVE results of query to a CSV file | |
| Example: | |
| bq_csv("SELECT * FROM ostreacultura.climate_truth.training LIMIT 10", "data/test.csv") | |
| """ | |
| function bq_csv(query::String, path::String) | |
| run(pipeline(`bq query --use_legacy_sql=false --format=csv $query`, path)) | |
| end | |