Source code for freneticlib.core.core

import ast
import logging
from typing import Dict, Iterator, List

import pandas as pd

from freneticlib.core.mutation import abstract_operators
from freneticlib.core.mutation.crossovers import AbstractCrossover
from freneticlib.core.objective import Objective
from freneticlib.executors.outcome import Outcome
from freneticlib.representations.abstract_representation import RoadRepresentation

logger = logging.getLogger(__name__)

TestIndividual = Dict
"""The type that contains a road and its execution data."""


[docs]class FreneticCore(object): """The core module of freneticlib, implementing the genetic algorithm.""" history: pd.DataFrame = None """Stores the history including road, execution outcome and parent info.""" crossover_max_visits = 10 """How many times an individual road can be selected as crossover parent, before it is "retired'.""" def __init__( self, representation: RoadRepresentation, objective: Objective, mutator: abstract_operators.AbstractMutator = None, crossover: AbstractCrossover = None, ): """ Args: representation (RoadRepresentation): The selected road representation. objective (Objective): The search objective. (Make sure it's the same as the one passed to the Executor). mutator (AbstractMutator): Holds the mutation operators for roads with :attr:`.Outcome.PASS`. exploiter (AbstractMutator): Special treatment to differently mutate roads with :attr:`.Outcome.FAIL`. crossover (AbstractCrossover): Defines which crossover operator(s) to apply. """ self._mutant_generator = None self.representation = representation self.objective = objective self.mutator = mutator self.crossover = crossover # Warnings when operators are None if not mutator: logger.warning("No mutator was chosen.") if not crossover: logger.warning("No crossover was chosen.")
[docs] def ask_random(self) -> TestIndividual: """Returns a random road in the specific road representation. Returs: (TestIndividual): A new, randomly generated road from :attr:`.representation`. """ test = self.representation.generate() assert self.representation.is_valid(test), "The newly generated test should be valid." return dict(test=test, method="random", visited=0, generation=0)
[docs] def tell(self, record: TestIndividual): """ Register the result of an execution. Args: record (TestIndividual): A dict containing the road and execution data (outcome, feature value, ...). """ logger.debug(f"Tell: {record}") # {road} -> {result}") if self.history is None: self.history = pd.DataFrame([record]) else: # self.df = self.df.append(record, ignore_index=True) self.history = pd.concat([self.history, pd.DataFrame([record])], ignore_index=True)
[docs] def ask(self) -> TestIndividual: """Create """ if self._mutant_generator is None: self._mutant_generator = self._ask() return next(self._mutant_generator)
[docs] def _ask(self) -> Iterator[Dict]: """This is actually a generator, it will produce roads as long as we ask it. Specifically, it will first create a list of mutants based on the best known individual. Then, using the history and the mutants, it will create the crossover children and yield those. Yields: (dict): A dictionary containing a road and additional information of the test. """ while True: # First we mutate (or generate random) mutated_tests = self.get_mutated_tests() if len(mutated_tests) == 0: logger.debug("No mutations. Generating a random test.") yield self.ask_random() for idx, test in enumerate(mutated_tests): # stop mutating if one of the mutants already produced a failure if idx > 0 and "outcome" in mutated_tests[idx - 1] and mutated_tests[idx - 1]["outcome"] == Outcome.FAIL: break # break on first fail else: yield test # then we crossover crossover_tests = self.get_crossover_tests() yield from crossover_tests # TODO: I don't understand this code here. Check with Ezequiel. # if self.crossover and 0 < self.crossover.frequency <= self.executed_count: # logger.info('Entering recombination phase.') # self.parents_recombination() # self.executed_count = 0 self.objective.recalculate_dynamic_threshold(self.history)
[docs] def get_mutated_tests(self) -> List[TestIndividual]: """ Searches for the best mutation parent, then performs a mutation depending on the parent's simulation outcome. :attr:`self.exploiter` will be applied if :attr:`.Outcome.FAIL`, :attr:`.Outcome.PASS` will trigger the :attr:`self.mutator` Returns: (List[TestIndividual]): The mutated test individuals. """ parent = self._get_best_mutation_parent() # returns a row if parent is None: logger.warning("Couldn't find a good parent. Skipping.") return [] # TODO: why isn't this in the filter? # Shouldn't we get the best parent of min_length? # if len(parent.test.item()) < self.min_length_to_mutate: # logger.debug("Best parent's test is too short.") # return [] logger.debug(f"Best unvisited parent for mutation is {parent.name}") self.history.at[parent.name, "visited"] = 1 parent_info = self.get_parent_info(parent.name) mutants = self.mutator(self.representation, parent) for m in mutants: m.update(parent_info) return mutants
[docs] def _get_best_mutation_parent(self) -> pd.Series: if self.history is None or len(self.history) <= 0: logger.warning("Empty history. Cannot get best parent.") return None assert self.objective.feature in self.history.columns, "Target feature is recorded in history records." # we take the best parent that hasn't been visited yet, whose feature is below/above the threshold selection = self._select_by_maxvisits_and_threshold(max_visits=0) # only use those parents where all mutation operators are applicable operator_filter = selection.apply(lambda x: True, axis=1) # note, this weird loop form is required, because pandas removes columns when selecting on an empty selection... for op in self.mutator.get_all(): operator_filter = operator_filter & selection.test.apply(op.is_applicable) selection = selection[operator_filter] return self.objective.get_best(selection)
[docs] def _select_by_maxvisits_and_threshold(self, max_visits=0): pass_fail_filter = self.history.outcome.isin( [Outcome.PASS, Outcome.FAIL] ) # TODO: this is domain-specific and needs to be dropped max_visit_filter = self.history.visited <= max_visits return self.objective.filter_by_threshold(self.history[pass_fail_filter & max_visit_filter])
[docs] def get_parent_info(self, p_index) -> Dict: parent = self.history.iloc[p_index] return { "parent_1_index": p_index, "parent_1_outcome": parent["outcome"], "parent_1_" + self.objective.feature: parent[self.objective.feature], "generation": parent["generation"] + 1, }
[docs] def get_crossover_tests(self) -> List[TestIndividual]: """ Creates new tests by pairwise "mating" of simulation outcome. :attr:`self.exploiter` will be applied if :attr:`.Outcome.FAIL`, :attr:`.Outcome.PASS` will trigger the :attr:`self.mutator` Returns: (List[TestIndividual]): The mutated test individuals. """ if self.crossover is None: logger.info("No crossover defined. Skipping.") return [] # parents_recombination candidates = self._select_crossover_candidates() if len(candidates) <= 0: logger.warning("No candidates for crossover.") return [] child_tests = [] for child, method, info in self.crossover(self.representation, candidates): self.history.at[info["parent_1_index"], "visited"] = self.history.iloc[info["parent_1_index"]]["visited"] + 1 self.history.at[info["parent_2_index"], "visited"] = self.history.iloc[info["parent_2_index"]]["visited"] + 1 child_tests.append(dict(test=child, method=method, **info)) return child_tests
[docs] def _select_crossover_candidates(self) -> List[TestIndividual]: if len(self.history) <= 0: logger.warning("Empty history. Cannot get best parent.") return [] assert self.objective.feature in self.history.columns, "Target feature is recorded in history records." selection = self._select_by_maxvisits_and_threshold(self.crossover_max_visits) if not self.crossover.is_applicable(selection): logger.warning( "Couldn't select enough tests to generate crossover candidates. " + f"Select: {len(selection)}, Crossover min size: {self.crossover.min_number_candidates_for_crossover}" ) return [] candidates = [] for index, candidate in selection.iterrows(): test = candidate["test"] if type(test) == str: logger.warning("Test was stored as a string value in the data frame.") test = ast.literal_eval(test) candidates.append((test, self.get_parent_info(index))) return candidates