def update(self, predictions: TargetsType, targets: TargetsType) -> torch.Tensor:
    """
    Accumulate the per-sample collision rate of a batch into the metric state.

    For every entry along the first dimension of
    ``predictions['predicted_tokenized_arrays']`` the collision rate is computed
    with ``gutils.calculate_collision_rate_batch`` and added to
    ``self.collision_rate``; ``self.batch`` counts how many entries were processed.

    :param predictions: model's predictions; must contain the key
        'predicted_tokenized_arrays'
    :param targets: ground truth targets from the dataset (not read by this method)
    :return: NOTE(review): the visible code only updates ``self.batch`` and
        ``self.collision_rate`` and returns nothing, despite the annotated
        ``torch.Tensor`` return type -- confirm against the base class contract.
    """
    predicted_tokenized_arrays = predictions['predicted_tokenized_arrays']

    # One collision-rate evaluation per entry of the leading (batch) dimension.
    for i in range(predicted_tokenized_arrays.shape[0]):
        collision_rate = gutils.calculate_collision_rate_batch(
            predicted_tokenized_arrays[i], ego_only=self._is_ego
        )
        self.batch += 1
        self.collision_rate += collision_rate
def metric_offroad_rate(scene_tensor, valid_mask, map_tensor, map_valid, lane_threshold=2.75, obs_frames=5):
    """
    Computes the off-road rate for vehicles based on their distances to lane lines.

    A vehicle is considered "in-lane" if at least one valid frame in the first
    ``obs_frames`` is within ``lane_threshold`` of a lane point. An in-lane vehicle
    is marked off-road if any valid frame after ``obs_frames`` has a distance
    greater than ``lane_threshold``.

    The off-road rate is computed as:
        off-road rate = (# vehicles that went off-road) / (# vehicles that were initially in-lane)

    Parameters:
        scene_tensor (torch.Tensor): [B, NA, NT, D] vehicle tensor (first two dims of D are x, y)
        valid_mask (torch.Tensor): [B, NA, NT] boolean tensor indicating valid vehicle frames
        map_tensor (torch.Tensor): [B, L, N, D] lane tensor (first two dims of D are x, y)
        map_valid (torch.Tensor): boolean tensor indicating valid lane points; it is
            passed straight to ``compute_min_distance_torch``
        lane_threshold (float): distance threshold for being "in-lane"
        obs_frames (int): number of initial frames for observing in-lane behavior

    Returns:
        offroad_rate (float): Ratio of vehicles that went off-road among those
        initially in-lane. Returns 0.0 when no vehicle was initially in-lane.
        NOTE(review): the zero-denominator convention is chosen here to avoid a
        division by zero -- confirm it matches what callers expect.

    Raises:
        ValueError: if ``scene_tensor`` is not 4-dimensional.
    """
    if scene_tensor.dim() != 4:
        raise ValueError(
            f"scene_tensor must have shape [B, NA, NT, D], got {tuple(scene_tensor.shape)}"
        )

    # Minimum distance from each vehicle position to any lane point: [B, NA, NT]
    min_dist = compute_min_distance_torch(scene_tensor, map_tensor, map_valid)

    valid = valid_mask.bool()
    observed_valid = valid[:, :, :obs_frames]
    future_valid = valid[:, :, obs_frames:]

    # In-lane: some valid observed frame lies within the threshold. A vehicle with
    # no valid observed frame can never satisfy this, so it is excluded implicitly.
    in_lane = ((min_dist[:, :, :obs_frames] <= lane_threshold) & observed_valid).any(dim=-1)

    # Off-road: an in-lane vehicle with some valid later frame beyond the threshold.
    # NOTE(review): restricting off-road to in-lane vehicles follows the documented
    # ratio ("among those initially in-lane") -- confirm against the intended metric.
    left_lane_later = ((min_dist[:, :, obs_frames:] > lane_threshold) & future_valid).any(dim=-1)
    offroad = in_lane & left_lane_later

    num_in_lane = int(in_lane.sum())
    if num_in_lane == 0:
        return 0.0
    return int(offroad.sum()) / num_in_lane
# Instantiate the model only to query which feature/target builders it needs.
model = build_torch_module_wrapper(cfg.model)
feature_builders = model.get_list_of_required_feature()
target_builders = model.get_list_of_computed_target()
# The model itself is not used after the builders are extracted; drop the reference.
del model
def compute_features(
    self, scenario: AbstractScenario, iteration: int = 0
) -> Tuple[FeaturesType, TargetsType, List[CacheMetadataEntry]]:
    """
    Compute features for a scenario, in case cache_path is set, features will be stored in cache,
    otherwise just recomputed
    * perform try/except when not versatile_cache
    :param scenario: for which features and targets should be computed
    :param iteration: defaults to 0; presumably the scenario iteration to compute for --
        its use is not visible in this excerpt, TODO confirm
    :return: model features and targets and cache metadata
    """
    # Declarations of the values that make up the documented result.
    all_features: FeaturesType
    all_feature_cache_metadata: List[CacheMetadataEntry]
    all_targets: TargetsType
    all_targets_cache_metadata: List[CacheMetadataEntry]
    # NOTE(review): only the signature, docstring and the declarations above are
    # present here; nothing is computed or returned in the visible code.
def _compute_all_features(
    self,
    scenario: AbstractScenario,
    builders: List[Union[AbstractFeatureBuilder, AbstractTargetBuilder]],
    iteration: int,
) -> Tuple[Union[FeaturesType, TargetsType], List[Optional[CacheMetadataEntry]]]:
    """
    Compute all features/targets from builders for scenario
    * perform try/except when versatile_cache
    :param scenario: for which features should be computed
    :param builders: to use for feature computation
    :param iteration: presumably the scenario iteration to compute for -- its use is
        not visible in this excerpt, TODO confirm
    :return: computed features/targets and the metadata entries for the computed features/targets
    """
    # Features to be computed
    all_features: FeaturesType = {}
    all_features_metadata_entries: List[CacheMetadataEntry] = []
    # NOTE(review): only the accumulator initialisation is present here; the
    # per-builder computation and the return statement are not visible.
def compute_or_load_feature(
    scenario: AbstractScenario,
    cache_path: Optional[pathlib.Path],
    builder: Union[AbstractFeatureBuilder, AbstractTargetBuilder],
    storing_mechanism: FeatureCache,
    force_feature_computation: bool,
    iteration: int = 0,
    versatile_cache: bool = False,
) -> Tuple[AbstractModelFeature, Optional[CacheMetadataEntry]]:
    """
    Compute features if non existent in cache, otherwise load them from cache
    :param scenario: for which features should be computed
    :param cache_path: location of cached features; None disables caching
    :param builder: which builder should compute the features
    :param storing_mechanism: a way to store features
    :param force_feature_computation: if true, even if cache exists, it will be overwritten
    :param iteration: accepted for interface compatibility; not read by this function body
    :param versatile_cache: accepted for interface compatibility; not read by this function body
    :return: features computed with builder and the metadata entry for the computed feature
        if the feature was computed and stored successfully, otherwise None as metadata.
    :raises ValueError: if a computation is required for a CachedScenario, or if the
        builder is neither a feature builder nor a target builder.
    """
    cache_path_available = cache_path is not None

    # Filename of the cached features/targets
    file_name = (
        cache_path / scenario.log_name / scenario.scenario_type / scenario.token / builder.get_feature_unique_name()
        if cache_path_available
        else None
    )

    # If feature recomputation is desired or the cached file does not exist, compute the feature.
    # Short-circuiting keeps exists_feature_cache from being called with file_name=None.
    need_to_compute_feature = (
        force_feature_computation
        or not cache_path_available
        or not storing_mechanism.exists_feature_cache(file_name)
    )
    feature_stored_successfully = False

    if need_to_compute_feature:
        logger.debug("Computing feature...")
        if isinstance(scenario, CachedScenario):
            raise ValueError(
                textwrap.dedent(
                    f"""
                Attempting to recompute scenario with CachedScenario.
                This should typically never happen, and usually means that the scenario is missing from the cache.
                Check the cache to ensure that the scenario is present.

                If it was intended to re-compute the feature on the fly, re-run with `cache.use_cache_without_dataset=False`.

                Debug information: Scenario type: {scenario.scenario_type}. Scenario log name: {scenario.log_name}. Scenario token: {scenario.token}.
                """
                )
            )

        if isinstance(builder, AbstractFeatureBuilder):
            feature = builder.get_features_from_scenario(scenario)
        elif isinstance(builder, AbstractTargetBuilder):
            feature = builder.get_targets(scenario)
        else:
            raise ValueError(f"Unknown builder type: {type(builder)}")

        # If caching is enabled, store the feature (invalid features are never cached)
        if feature.is_valid and cache_path_available:
            logger.debug(f"Saving feature: {file_name} to a file...")
            file_name.parent.mkdir(parents=True, exist_ok=True)
            feature_stored_successfully = storing_mechanism.store_computed_feature_to_folder(file_name, feature)
    else:
        # In case the feature exists in the cache, load it
        logger.debug(f"Loading feature: {file_name} from a file...")
        feature = storing_mechanism.load_computed_feature_from_folder(file_name, builder.get_feature_type())
        assert feature.is_valid, 'Invalid feature loaded from cache!'

    # Metadata is reported only for features freshly computed and written in this call.
    return (
        feature,
        CacheMetadataEntry(file_name=file_name)
        if (need_to_compute_feature and feature_stored_successfully)
        else None,
    )