Spaces:
Runtime error
Runtime error
| # docker build -t reward-simulator .docker run -p 7860:7860 -v $(pwd)/data:/app/data reward-simulator | |
| from PIL import Image | |
| import numpy as np | |
| import io | |
| import faiss | |
| import requests | |
| import torch | |
| from request import get_ft, get_topk | |
| from flickrapi import FlickrAPI | |
| from flask import Flask, request, render_template, jsonify, send_from_directory | |
| app = Flask(__name__) | |
| PRESET_IMAGES = { | |
| 1: "static/1.webp", | |
| 2: "static/2.webp", | |
| 3: "static/3.webp" | |
| } | |
| # Add Flickr configuration | |
| FLICKR_API_KEY = '80ef21a6f7eb0984ea613c316a89ca69' | |
| FLICKR_API_SECRET = '4d0e8ce6734f4b3f' | |
| flickr = FlickrAPI(FLICKR_API_KEY, FLICKR_API_SECRET, format='parsed-json', store_token=False) | |
| def get_photo_id(url): | |
| """Extract photo ID from Flickr URL""" | |
| try: | |
| return url.split('/')[-1].split('_')[0] | |
| except: | |
| return None | |
| def get_other_info(url): | |
| """Get author information from Flickr""" | |
| try: | |
| photo_id = get_photo_id(url) | |
| if photo_id: | |
| photo_info = flickr.photos.getInfo(photo_id=photo_id) | |
| license = photo_info['photo']['license'] | |
| owner = photo_info['photo']['owner'] | |
| flickr_url = f"https://www.flickr.com/photos/{owner.get('nsid', '')}/{photo_id}" | |
| return { | |
| 'username': owner.get('username', ''), | |
| 'realname': owner.get('realname', ''), | |
| 'nsid': owner.get('nsid', ''), | |
| 'flickr_url': flickr_url, | |
| 'license': license | |
| } | |
| except: | |
| pass | |
| return { | |
| 'username': 'Unknown', | |
| 'realname': 'Unknown', | |
| 'nsid': '', | |
| 'flickr_url': '', | |
| 'license': 'Unknown' | |
| } | |
| def load_model(): | |
| """Load DINOv2 model once and cache it""" | |
| torch.hub.set_dir('static') | |
| model = torch.hub.load('facebookresearch/dinov2', 'dinov2_vits14') | |
| model.eval() | |
| model.to(torch.device('cuda' if torch.cuda.is_available() else 'cpu')) | |
| return model | |
| def load_index(index_path): | |
| """Load FAISS index once and cache it""" | |
| return faiss.read_index(index_path) | |
| def distance_to_similarity(distances, temp=1e-4): | |
| """Convert distance to similarity""" | |
| for ii in range(len(distances)): | |
| contribs = distances[ii].max() - distances[ii] | |
| contribs = contribs / temp | |
| sum_contribs = np.exp(contribs).sum() | |
| distances[ii] = np.exp(contribs) / sum_contribs | |
| return distances | |
| def calculate_rewards(subscription, num_generations, author_share, ro_share, num_users_k, similarities, num_authors=1800): | |
| """Calculate rewards based on user inputs and similarities""" | |
| num_users = num_users_k * 1000 | |
| # Monthly revenue allocated to authors | |
| authors_monthly_revenue = subscription * num_users * (author_share / 100) | |
| rewards = [] | |
| for sim in similarities[0]: | |
| # Attribution bonus based on similarity score and number of neighbors | |
| attribution_bonus = sim * len(similarities[0]) | |
| # Calculate monthly rewards | |
| author_month_reward = (authors_monthly_revenue / num_authors) * attribution_bonus | |
| ro_month_reward = author_month_reward / (author_share / 100) * (ro_share / 100) | |
| rewards.append({ | |
| 'paid_per_month': f"{subscription:.0f}€", | |
| 'attribution': f"{sim*100:.0f}%", | |
| 'author_month_reward': f"{author_month_reward:.0f}€", | |
| 'ro_month_reward': f"{ro_month_reward:.0f}€" | |
| # 'paid_per_month': f"{subscription:.0f}€", | |
| # 'paid_per_gen': f"{paid_per_gen:.2f}€", | |
| # 'aro_share': f"{aro_share:.2f}c€", | |
| # 'attribution': f"{sim*100:.0f}%", | |
| # 'training_data_reward': f"{training_data_reward:.2f}c€", | |
| # 'author_month_reward': f"{author_month_reward:.0f}€", | |
| # 'ro_month_reward': f"{ro_month_reward:.0f}€" | |
| }) | |
| return rewards | |
| # Global variables for model and index | |
| model = None | |
| index = None | |
| urls = None | |
| def init_model(): | |
| global model, index, urls | |
| model = load_model() | |
| index = load_index("data/openimages_index.bin") | |
| with open("data/openimages_urls.txt", "r") as f: | |
| urls = f.readlines() | |
| def home(): | |
| return render_template('index.html') | |
| def serve_static(filename): | |
| return send_from_directory('static', filename) | |
| DEFAULT_PARAMS = { | |
| 'subscription': 12, | |
| 'num_generations': 60, | |
| 'author_share': 5, | |
| 'ro_share': 10, | |
| 'num_users_k': 500, | |
| 'num_neighbors': 10, | |
| 'num_authors': 2000 | |
| } | |
| def select_preset(preset_id): | |
| if preset_id not in PRESET_IMAGES: | |
| return jsonify({'error': 'Invalid preset ID'}), 400 | |
| try: | |
| image_path = PRESET_IMAGES[preset_id] | |
| image = Image.open(image_path).convert('RGB') | |
| # Use default parameters for presets | |
| params = DEFAULT_PARAMS.copy() | |
| # Get features and search | |
| features = get_ft(model, image) | |
| distances, indices = get_topk(index, features, topk=params['num_neighbors']) | |
| # Collect valid results first | |
| valid_results = [] | |
| valid_similarities = [] | |
| for i in range(params['num_neighbors']): | |
| image_url = urls[indices[0][i]].strip() | |
| try: | |
| response = requests.head(image_url) | |
| if response.status_code == 200: | |
| valid_results.append({ | |
| 'index': i, | |
| 'url': image_url | |
| }) | |
| valid_similarities.append(distances[0][i]) | |
| except requests.RequestException: | |
| continue | |
| # Renormalize similarities for valid results | |
| if valid_similarities: | |
| similarities = distance_to_similarity(np.array([valid_similarities]), temp=1e-5) | |
| # Calculate rewards with renormalized similarities | |
| rewards = calculate_rewards( | |
| params['subscription'], | |
| params['num_generations'], | |
| params['author_share'], | |
| params['ro_share'], | |
| params['num_users_k'], | |
| similarities, | |
| params['num_authors'] | |
| ) | |
| # Build final results | |
| results = [] | |
| for i, result in enumerate(valid_results): | |
| other_info = get_other_info(result['url']) | |
| results.append({ | |
| 'image_url': result['url'], | |
| 'rewards': rewards[i], | |
| 'other': other_info | |
| }) | |
| return jsonify({'results': results}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def process_image(): | |
| if 'image' not in request.files: | |
| return jsonify({'error': 'No image provided'}), 400 | |
| try: | |
| image_file = request.files['image'] | |
| image = Image.open(io.BytesIO(image_file.read())).convert('RGB') | |
| # Use default parameters if none provided | |
| params = DEFAULT_PARAMS.copy() | |
| if request.form: | |
| params.update({ | |
| 'subscription': float(request.form.get('subscription', params['subscription'])), | |
| 'num_generations': int(request.form.get('num_generations', params['num_generations'])), | |
| 'author_share': float(request.form.get('author_share', params['author_share'])), | |
| 'ro_share': float(request.form.get('ro_share', params['ro_share'])), | |
| 'num_users_k': int(request.form.get('num_users_k', params['num_users_k'])), | |
| 'num_neighbors': int(request.form.get('num_neighbors', params['num_neighbors'])), | |
| 'num_authors': int(request.form.get('num_authors', DEFAULT_PARAMS['num_authors'])), | |
| }) | |
| # Process image | |
| features = get_ft(model, image) | |
| distances, indices = get_topk(index, features, topk=params['num_neighbors']) | |
| # Collect valid results first | |
| valid_results = [] | |
| valid_similarities = [] | |
| for i in range(params['num_neighbors']): | |
| image_url = urls[indices[0][i]].strip() | |
| try: | |
| response = requests.head(image_url) | |
| if response.status_code == 200: | |
| valid_results.append({ | |
| 'index': i, | |
| 'url': image_url | |
| }) | |
| valid_similarities.append(distances[0][i]) | |
| except requests.RequestException: | |
| continue | |
| # Renormalize similarities for valid results | |
| if valid_similarities: | |
| similarities = distance_to_similarity(np.array([valid_similarities]), temp=1e-5) | |
| # Calculate rewards with renormalized similarities | |
| rewards = calculate_rewards( | |
| params['subscription'], | |
| params['num_generations'], | |
| params['author_share'], | |
| params['ro_share'], | |
| params['num_users_k'], | |
| similarities, | |
| params['num_authors'] | |
| ) | |
| # Build final results | |
| results = [] | |
| for i, result in enumerate(valid_results): | |
| other_info = get_other_info(result['url']) | |
| results.append({ | |
| 'image_url': result['url'], | |
| 'rewards': rewards[i], | |
| 'other': other_info | |
| }) | |
| return jsonify({'results': results}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| if __name__ == '__main__': | |
| init_model() | |
| app.run(host='0.0.0.0', port=7860) | |