#!/usr/bin/env python3 """Inference-only sample for Faster R-CNN split across three LiteRT models. This sample expects prebuilt TFLite files. It does not convert or compile models. LiteRT runs the tensor-heavy submodels on CPU, while TorchVision host code handles model-specific preprocessing, FPN, proposal decode/NMS, ROIAlign, and final postprocessing. """ from __future__ import annotations import argparse import io import math from collections import OrderedDict from pathlib import Path import urllib.request import numpy as np from PIL import Image, ImageDraw import torch from torchvision.models.detection import ( FasterRCNN_ResNet50_FPN_Weights, fasterrcnn_resnet50_fpn, ) from torchvision.models.detection.rpn import concat_box_prediction_layers from torchvision.transforms.functional import pil_to_tensor, to_pil_image from ai_edge_litert.compiled_model import CompiledModel from ai_edge_litert.hardware_accelerator import HardwareAccelerator DEFAULT_IMAGE = "https://github.com/pytorch/hub/raw/master/images/dog.jpg" SCRIPT_DIR = Path(__file__).resolve().parent DEFAULT_BACKBONE_MODEL = SCRIPT_DIR / "fasterrcnn_resnet50_fpn_backbone_body_dynamic_hw.tflite" DEFAULT_RPN_HEAD_MODEL = SCRIPT_DIR / "fasterrcnn_resnet50_fpn_rpn_head_dynamic_hw.tflite" DEFAULT_ROI_MODEL = SCRIPT_DIR / "fasterrcnn_resnet50_fpn_roi_box_dynamic_n.tflite" def _format_size(path: Path) -> str: return f"{path.stat().st_size / (1024.0 * 1024.0):.1f} MiB" def _categories() -> list[str]: meta = FasterRCNN_ResNet50_FPN_Weights.DEFAULT.meta return [str(name) for name in meta.get("categories", [])] def _load_model() -> torch.nn.Module: model = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT) model.eval() return model def _load_image(spec: str) -> torch.Tensor: if spec == "synthetic": y = torch.linspace(0.0, 1.0, 640, dtype=torch.float32).view(1, 640, 1) x = torch.linspace(0.0, 1.0, 960, dtype=torch.float32).view(1, 1, 960) red = y.expand(1, 640, 960) green = x.expand(1, 640, 960) blue = (1.0 - red * 0.5 - green * 0.5).clamp(0.0, 1.0) return torch.cat([red, green, blue], dim=0) if spec.startswith("http://") or spec.startswith("https://"): with urllib.request.urlopen(spec, timeout=30) as resp: image = Image.open(io.BytesIO(resp.read())).convert("RGB") else: image = Image.open(spec).convert("RGB") return pil_to_tensor(image).to(torch.float32) / 255.0 def _require_models(paths: list[Path]) -> None: missing = [path for path in paths if not path.exists()] if not missing: return formatted = "\n".join(f" - {path}" for path in missing) raise FileNotFoundError( "Missing prebuilt LiteRT model file(s):\n" f"{formatted}\n" "Run the conversion pipeline separately, then rerun this inference sample." ) def _div_floor(value: int, divisor: int) -> int: return int(math.floor(value / divisor)) def _backbone_output_shapes(input_shape: tuple[int, ...]) -> list[tuple[int, ...]]: _, _, height, width = [int(v) for v in input_shape] return [ (1, 256, _div_floor(height, 4), _div_floor(width, 4)), (1, 512, _div_floor(height, 8), _div_floor(width, 8)), (1, 1024, _div_floor(height, 16), _div_floor(width, 16)), (1, 2048, _div_floor(height, 32), _div_floor(width, 32)), ] def _rpn_output_shapes(feature_shape: torch.Size) -> list[tuple[int, ...]]: _, _, height, width = [int(v) for v in feature_shape] return [ (1, 3, height, width), (1, 12, height, width), ] def _pick_dtype(requirements: dict) -> np.dtype: types = requirements.get("supported_types") or [] if 1 in types or "FLOAT32" in types: return np.float32 if types == [4] or types == ["INT64"]: return np.int64 if types == [2] or types == ["INT32"]: return np.int32 raise ValueError(f"Unsupported LiteRT buffer types: {types}") def _read_buffer(buffer: object, dtype: np.dtype, shape: tuple[int, ...]) -> np.ndarray: count = int(np.prod(shape)) data = buffer.read(count, dtype) return np.asarray(data, dtype=dtype).reshape(shape) def _run_litert_model( model_path: Path, input_array: np.ndarray, output_shapes: list[tuple[int, ...]], ) -> list[np.ndarray]: compiled = CompiledModel.from_file( str(model_path), hardware_accel=HardwareAccelerator.CPU, ) compiled.resize_input_tensor_by_name( "main", "x", tuple(int(v) for v in input_array.shape), strict=True, ) input_buffers = compiled.create_input_buffers(0) output_buffers = compiled.create_output_buffers(0) input_req = compiled.get_input_buffer_requirements(0, 0) input_dtype = _pick_dtype(input_req) input_buffers[0].write(np.asarray(input_array, dtype=input_dtype).reshape(-1)) compiled.run_by_index(0, input_buffers, output_buffers) outputs: list[np.ndarray] = [] for i, shape in enumerate(output_shapes): req = compiled.get_output_buffer_requirements(i, 0) dtype = _pick_dtype(req) expected_bytes = int(np.prod(shape)) * np.dtype(dtype).itemsize if int(req.get("buffer_size", 0)) < expected_bytes: raise RuntimeError( f"{model_path} output {i} buffer is too small: " f"{req.get('buffer_size')} < {expected_bytes} for shape {shape}" ) outputs.append(_read_buffer(output_buffers[i], dtype, shape)) return outputs def _run_rpn_head( model_path: Path, fpn_features: OrderedDict[str, torch.Tensor], ) -> tuple[list[torch.Tensor], list[torch.Tensor]]: objectness: list[torch.Tensor] = [] bbox_deltas: list[torch.Tensor] = [] for feature in fpn_features.values(): outputs = _run_litert_model( model_path, feature.detach().cpu().numpy(), _rpn_output_shapes(feature.shape), ) objectness.append(torch.from_numpy(outputs[0]).to(dtype=feature.dtype)) bbox_deltas.append(torch.from_numpy(outputs[1]).to(dtype=feature.dtype)) return objectness, bbox_deltas def _rpn_proposals_from_head_outputs( model: torch.nn.Module, images: object, fpn_features: OrderedDict[str, torch.Tensor], objectness: list[torch.Tensor], pred_bbox_deltas: list[torch.Tensor], ) -> list[torch.Tensor]: feature_list = list(fpn_features.values()) anchors = model.rpn.anchor_generator(images, feature_list) num_images = len(anchors) num_anchors_per_level = [ int(shape[0] * shape[1] * shape[2]) for shape in [o[0].shape for o in objectness] ] objectness_flat, pred_bbox_deltas_flat = concat_box_prediction_layers( objectness, pred_bbox_deltas, ) proposals = model.rpn.box_coder.decode(pred_bbox_deltas_flat.detach(), anchors) proposals = proposals.view(num_images, -1, 4) boxes, _ = model.rpn.filter_proposals( proposals, objectness_flat, images.image_sizes, num_anchors_per_level, ) return boxes def _print_detections( detection: dict[str, torch.Tensor], *, topk: int, score_threshold: float, categories: list[str], ) -> None: scores = detection["scores"].detach().cpu() labels = detection["labels"].detach().cpu() boxes = detection["boxes"].detach().cpu() visible = [i for i in range(min(topk, int(scores.numel()))) if float(scores[i]) >= score_threshold] print(f"detections above {score_threshold:.2f}: {len(visible)}") for rank, i in enumerate(visible, start=1): label_id = int(labels[i]) name = categories[label_id] if 0 <= label_id < len(categories) else str(label_id) box = [round(float(v), 2) for v in boxes[i].tolist()] print(f" {rank:02d}: {name} score={float(scores[i]):.4f} box={box}") def _save_annotated_image( *, image: torch.Tensor, detection: dict[str, torch.Tensor], out_path: Path, topk: int, score_threshold: float, categories: list[str], ) -> None: pil = to_pil_image(image.detach().cpu().clamp(0.0, 1.0)) draw = ImageDraw.Draw(pil) scores = detection["scores"].detach().cpu() labels = detection["labels"].detach().cpu() boxes = detection["boxes"].detach().cpu() for rank in range(min(topk, int(scores.numel()))): score = float(scores[rank]) if score < score_threshold: continue label_id = int(labels[rank]) name = categories[label_id] if 0 <= label_id < len(categories) else str(label_id) x0, y0, x1, y1 = [float(v) for v in boxes[rank].tolist()] draw.rectangle((x0, y0, x1, y1), outline="red", width=3) draw.text((x0 + 3, y0 + 3), f"{name} {score:.2f}", fill="red") out_path.parent.mkdir(parents=True, exist_ok=True) pil.save(out_path) def run_sample(args: argparse.Namespace) -> int: torch.set_grad_enabled(False) model_paths = [args.backbone_model, args.rpn_head_model, args.roi_model] _require_models(model_paths) model = _load_model() image = _load_image(args.image) original_sizes = [(int(image.shape[-2]), int(image.shape[-1]))] images, _ = model.transform([image], None) image_tensor = images.tensors print("LiteRT models:") print(f" backbone body: {args.backbone_model} ({_format_size(args.backbone_model)})") print(f" RPN head: {args.rpn_head_model} ({_format_size(args.rpn_head_model)})") print(f" ROI box head: {args.roi_model} ({_format_size(args.roi_model)})") print(f"input image: original={original_sizes[0]} transformed={tuple(int(v) for v in image_tensor.shape)}") body_arrays = _run_litert_model( args.backbone_model, image_tensor.detach().cpu().numpy(), _backbone_output_shapes(tuple(int(v) for v in image_tensor.shape)), ) body_features = OrderedDict( (str(i), torch.from_numpy(array).to(dtype=image_tensor.dtype)) for i, array in enumerate(body_arrays) ) print("backbone body LiteRT outputs:") for key, value in body_features.items(): print(f" C{int(key) + 2}: {tuple(int(v) for v in value.shape)}") fpn_features = model.backbone.fpn(body_features) objectness, pred_bbox_deltas = _run_rpn_head(args.rpn_head_model, fpn_features) print("RPN head LiteRT outputs:") for i, (obj, bbox) in enumerate(zip(objectness, pred_bbox_deltas)): print(f" P{i + 2}: objectness={tuple(int(v) for v in obj.shape)} bbox={tuple(int(v) for v in bbox.shape)}") proposals = _rpn_proposals_from_head_outputs( model, images, fpn_features, objectness, pred_bbox_deltas, ) proposal_count = int(proposals[0].shape[0]) print(f"host proposal decode/NMS: {proposal_count} proposals") if proposal_count == 0: print("no proposals; skipping ROI stage") return 0 roi_features = model.roi_heads.box_roi_pool( fpn_features, proposals, images.image_sizes, ) roi_arrays = _run_litert_model( args.roi_model, roi_features.detach().cpu().numpy(), [(proposal_count, 91), (proposal_count, 364)], ) class_logits = torch.from_numpy(roi_arrays[0]).to(dtype=roi_features.dtype) box_regression = torch.from_numpy(roi_arrays[1]).to(dtype=roi_features.dtype) print( "ROI LiteRT outputs: " f"logits={tuple(int(v) for v in class_logits.shape)} " f"box_regression={tuple(int(v) for v in box_regression.shape)}" ) boxes, scores, labels = model.roi_heads.postprocess_detections( class_logits, box_regression, proposals, images.image_sizes, ) detections = [{"boxes": boxes[0], "scores": scores[0], "labels": labels[0]}] detections = model.transform.postprocess(detections, images.image_sizes, original_sizes) detection = detections[0] categories = _categories() _print_detections( detection, topk=args.topk, score_threshold=args.score_threshold, categories=categories, ) if args.annotated_out: _save_annotated_image( image=image, detection=detection, out_path=args.annotated_out, topk=args.topk, score_threshold=args.score_threshold, categories=categories, ) print(f"annotated image: {args.annotated_out}") return 0 def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--image", default=DEFAULT_IMAGE, help="Image path, URL, or 'synthetic'.", ) parser.add_argument("--backbone-model", type=Path, default=DEFAULT_BACKBONE_MODEL) parser.add_argument("--rpn-head-model", type=Path, default=DEFAULT_RPN_HEAD_MODEL) parser.add_argument("--roi-model", type=Path, default=DEFAULT_ROI_MODEL) parser.add_argument("--topk", type=int, default=5) parser.add_argument("--score-threshold", type=float, default=0.5) parser.add_argument( "--annotated-out", default="fasterrcnn_litert_cpu_sample.jpg", help="Optional output image with drawn detections. Pass '' to disable.", ) args = parser.parse_args() if args.annotated_out is not None and str(args.annotated_out).strip() == "": args.annotated_out = None elif args.annotated_out is not None: args.annotated_out = Path(args.annotated_out) return args if __name__ == "__main__": raise SystemExit(run_sample(parse_args()))