跳转至

EvoEngineer(进化工程师)

evotoolkit.evo_method.evoengineer.EvoEngineer

Bases: Method

Source code in src/evotoolkit/evo_method/evoengineer/evoengineer.py
@register_algorithm("evoengineer", config=EvoEngineerConfig)
class EvoEngineer(Method):
    """Evolutionary search that breeds candidate solutions with LLM-backed operators.

    ``run`` seeds the population with ``_get_init_sol()``, applies the init
    operators until ``config.interface.valid_require`` valid solutions exist,
    and then applies the offspring operators once per generation, trimming the
    population to ``config.pop_size`` after each one.
    """

    def __init__(self, config: EvoEngineerConfig):
        """Store the run configuration after the base class has been set up."""
        super().__init__(config)
        self.config = config

    def run(self):
        """Main EvoEngineer algorithm execution

        Seeds the population (first run only), initializes it, then evolves
        until ``max_generations`` or ``max_sample_nums`` is reached. Returns
        early, without marking the run as done, when initialization cannot
        produce ``valid_require`` valid solutions.

        Raises:
            SystemExit: if no seed solution can be obtained.
        """
        self.verbose_title("EvoEngineer ALGORITHM STARTED")

        self.run_state_dict.usage_history.setdefault("sample", [])

        # Initialize with seed solution if sol_history is empty
        if not self.run_state_dict.sol_history:
            init_sol = self._get_init_sol()
            if init_sol is None:
                # Raise SystemExit directly instead of calling exit(): that
                # helper is injected by the `site` module for interactive use
                # and is not guaranteed to exist in every interpreter setup.
                raise SystemExit
            # Don't use _register_solution as this doesn't consume samples
            self.run_state_dict.sol_history.append(init_sol)
            self.run_state_dict.population.append(init_sol)
            self._save_run_state_dict()
            self.verbose_info(
                f"Initialized with baseline solution (score: {init_sol.evaluation_res.score if init_sol.evaluation_res else 'None'})"
            )

        # Initialize population if starting from scratch
        if self.run_state_dict.generation == 0:
            self._initialize_population()

        # Check if we have enough individuals for selection
        valid_population = self._get_valid_population(self.run_state_dict.population)
        if len(valid_population) < self.config.interface.valid_require:
            self.verbose_info(
                f"The search is terminated since EvoEngineer unable to obtain {self.config.interface.valid_require} feasible algorithms during initialization."
            )
            return

        # Main evolution loop - moved loop control logic here
        while (self.run_state_dict.generation < self.config.max_generations) and (
            self.run_state_dict.tot_sample_nums < self.config.max_sample_nums
        ):
            # Snapshot of the loop-control counters, used to detect a failed
            # iteration that would otherwise be retried forever.
            progress_before = (
                self.run_state_dict.generation,
                self.run_state_dict.tot_sample_nums,
            )
            try:
                self.verbose_info(
                    f"Generation {self.run_state_dict.generation} - Sample {self.run_state_dict.tot_sample_nums + 1} - {self.run_state_dict.tot_sample_nums + self.config.num_samplers} / {self.config.max_sample_nums or 'unlimited'}"
                )

                # Apply offspring operators in parallel for this generation
                self._apply_operators_parallel(
                    self.config.get_offspring_operators(),
                    f"Gen {self.run_state_dict.generation}",
                )

                # Manage population size - keep only the best pop_size individuals
                self._manage_population_size()

                self.run_state_dict.generation += 1
                self._save_run_state_dict()

            except KeyboardInterrupt:
                self.verbose_info("Evolution interrupted by user")
                break
            except Exception as e:
                self.verbose_info(f"Evolution error: {str(e)}")
                progress_after = (
                    self.run_state_dict.generation,
                    self.run_state_dict.tot_sample_nums,
                )
                if progress_after == progress_before:
                    # Neither counter moved, so the loop condition can never
                    # become false by retrying: stop instead of spinning.
                    self.verbose_info(
                        "Evolution stopped: the failed generation made no progress"
                    )
                    break
                continue

        # Mark as done and save final state
        self.run_state_dict.is_done = True
        self._save_run_state_dict()

    def _initialize_population(self):
        """Initialize population using init operators - keep generating until we have enough valid solutions

        Stops when ``valid_require`` valid solutions exist, when the sample
        budget ``max_sample_nums`` is exhausted, or when an init round consumes
        no samples. ``generation`` is set to 1 only on success.
        """
        self.verbose_info("Initializing population...")

        valid_require = self.config.interface.valid_require
        initial_sample_limit = self.config.max_sample_nums  # Reasonable limit

        # Keep generating until we have valid_require valid solutions or hit sample limit
        while self.run_state_dict.tot_sample_nums < initial_sample_limit:
            samples_before = self.run_state_dict.tot_sample_nums

            # Apply init operators in parallel
            self._apply_operators_parallel(self.config.get_init_operators(), "Init")

            valid_count = len(
                self._get_valid_population(self.run_state_dict.population)
            )
            self.verbose_info(f"Valid solutions: {valid_count}/{self.config.pop_size}")

            self._save_run_state_dict()

            if valid_count >= valid_require:
                break

            if self.run_state_dict.tot_sample_nums == samples_before:
                # The loop condition depends only on tot_sample_nums, so a
                # round that registers nothing would repeat forever.
                self.verbose_info(
                    "Initialization stopped: init operators produced no samples"
                )
                break

        valid_population = self._get_valid_population(self.run_state_dict.population)
        if len(valid_population) >= valid_require:
            self.run_state_dict.generation = 1
            self._save_run_state_dict()
            self.verbose_info(
                f"Initialization completed with {len(valid_population)} valid solutions"
            )
        else:
            self.verbose_info(
                f"Warning: Only {len(valid_population)} valid solutions obtained, need at least {valid_require}"
            )

    def _get_valid_population(self, population: List[Solution]) -> List[Solution]:
        """Get valid solutions from population

        A solution counts as valid when it has an evaluation result whose
        ``valid`` flag is truthy.
        """
        return [
            sol for sol in population if sol.evaluation_res and sol.evaluation_res.valid
        ]

    def _get_best_valid_sol(self, sol_history: List[Solution]) -> Solution | None:
        """Get the best valid solution from sol_history

        Returns the valid solution with the highest non-None score, or None
        when no such solution exists.
        """
        valid_sols = [
            sol
            for sol in sol_history
            if sol.evaluation_res
            and sol.evaluation_res.valid
            and sol.evaluation_res.score is not None
        ]
        if valid_sols:
            return max(valid_sols, key=lambda x: x.evaluation_res.score)
        return None

    def _register_solution(self, solution: Solution):
        """Register a new solution to both sol_history and population

        Also records it in the current generation's solutions and consumes one
        sample from the budget.
        """
        self.run_state_dict.sol_history.append(solution)
        self.run_state_dict.population.append(solution)
        # Add to the current generation's history
        self.run_state_dict.current_gen_solutions.append(solution)
        self.run_state_dict.tot_sample_nums += 1

    def _apply_operators_parallel(self, operators: List, generation_label: str = ""):
        """Apply operators in parallel and register solutions

        Every operator is sampled ``num_samplers // len(operators)`` times.
        Non-empty generated solutions are evaluated with
        ``config.task.evaluate_code``; every generated solution is registered
        exactly once, whether or not it could be evaluated.

        Args:
            operators: Operators to apply; nothing happens when empty.
            generation_label: Text included in the per-solution log lines.
        """
        if not operators:
            return

        # Largest per-operator count such that the total stays <= num_samplers
        num_operators = len(operators)
        samples_per_operator = self.config.num_samplers // num_operators
        if samples_per_operator <= 0:
            # Make the otherwise silent no-op visible to the user.
            self.verbose_info(
                f"No samples generated: num_samplers ({self.config.num_samplers}) is smaller than the number of operators ({num_operators})"
            )
            return

        # Single executor for both generation and evaluation
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.config.num_samplers + self.config.num_evaluators
        ) as executor:
            # Generate samples: each operator gets exactly samples_per_operator samples
            future_to_operator = {}
            sample_id = 0
            for operator in operators:
                for _ in range(samples_per_operator):
                    selected_individuals = self._select_individuals_for_operator(
                        operator
                    )
                    future = executor.submit(
                        self._generate_single_solution,
                        operator,
                        selected_individuals,
                        sample_id,
                    )
                    future_to_operator[future] = operator.name
                    sample_id += 1

            # Process generations as they complete and immediately submit for evaluation
            eval_future_to_info = {}
            for future in concurrent.futures.as_completed(future_to_operator):
                operator_name = future_to_operator[future]
                try:
                    solution, usage = future.result()

                    # Add usage history
                    self.run_state_dict.usage_history["sample"].append(usage)
                    # Add to the current generation's usage history
                    self.run_state_dict.current_gen_usage.append(usage)

                    # Immediately submit for evaluation without waiting
                    if solution.sol_string.strip():
                        eval_future = executor.submit(
                            self.config.task.evaluate_code, solution.sol_string
                        )
                        eval_future_to_info[eval_future] = (solution, operator_name)
                    else:
                        self._register_solution(solution)
                        # Log result for empty solution
                        self.verbose_info(
                            f"{operator_name} {generation_label} - Score: None (Invalid)"
                        )

                except Exception as e:
                    self.verbose_info(f"Error generating {operator_name}: {str(e)}")
                    continue

            # Collect evaluation results
            for eval_future in concurrent.futures.as_completed(eval_future_to_info):
                solution, operator_name = eval_future_to_info[eval_future]
                # Only the evaluation itself is guarded, so a solution can
                # never be registered (and counted) twice.
                try:
                    evaluation_res = eval_future.result()
                except Exception as e:
                    self.verbose_info(f"Error evaluating {operator_name}: {str(e)}")
                    self._register_solution(solution)  # Add with no evaluation result
                    continue

                solution.evaluation_res = evaluation_res
                self._register_solution(solution)

                # Log result
                if evaluation_res and evaluation_res.score is not None:
                    score_str = f"{evaluation_res.score}"
                else:
                    score_str = "None"
                valid_str = (
                    "Valid" if evaluation_res and evaluation_res.valid else "Invalid"
                )
                self.verbose_info(
                    f"{operator_name} {generation_label} - Score: {score_str} ({valid_str})"
                )

    def _manage_population_size(self):
        """Manage population size - keep only the best pop_size individuals

        Valid solutions are kept first, highest score first; any remaining
        slots are filled with the most recently added invalid solutions.
        """
        population = self.run_state_dict.population
        pop_size = self.config.pop_size
        if len(population) <= pop_size:
            return

        # Separate valid and invalid solutions with the same validity test
        valid_solutions = self._get_valid_population(population)
        invalid_solutions = [
            sol
            for sol in population
            if not (sol.evaluation_res and sol.evaluation_res.valid)
        ]

        # Sort valid solutions by score (descending - higher is better)
        valid_solutions.sort(
            key=lambda x: x.evaluation_res.score
            if x.evaluation_res and x.evaluation_res.score is not None
            else float("-inf"),
            reverse=True,
        )

        # First, keep the best valid solutions
        new_population = valid_solutions[:pop_size]

        # If we need more individuals, add some invalid ones (most recent).
        # The > 0 guard matters: a slice of [-0:] would keep every element.
        remaining_slots = pop_size - len(new_population)
        if remaining_slots > 0 and invalid_solutions:
            new_population.extend(invalid_solutions[-remaining_slots:])

        self.run_state_dict.population = new_population

        valid_count = len(self._get_valid_population(new_population))
        self.verbose_info(
            f"Population managed: {len(new_population)} total ({valid_count} valid, {len(new_population) - valid_count} invalid)"
        )

    def _select_individuals_for_operator(self, operator) -> List[Solution]:
        """Select individuals for an operator using rank-based probability selection

        Candidates are the valid solutions with a finite score, ranked by
        descending score; rank ``r`` (0-based) of ``n`` candidates has weight
        ``1 / (r + n)``. Selection is with replacement, so the same individual
        may appear more than once.

        Returns:
            ``min(operator.selection_size, n)`` solutions; an empty list when
            ``selection_size <= 0``; or the first ``selection_size`` members of
            the population when there are no candidates.
        """
        import math

        import numpy as np

        if operator.selection_size <= 0:
            return []  # Init operators or invalid selection size

        # Filter valid solutions with finite scores (excludes inf and NaN)
        candidates = [
            sol
            for sol in self.run_state_dict.population
            if sol.evaluation_res
            and sol.evaluation_res.valid
            and sol.evaluation_res.score is not None
            and math.isfinite(sol.evaluation_res.score)
        ]

        if not candidates:
            # Fallback to any available solutions
            return self.run_state_dict.population[: operator.selection_size]

        # Sort by score (assuming higher is better)
        ranked = sorted(
            candidates, key=lambda sol: sol.evaluation_res.score, reverse=True
        )

        # Create rank-based probability distribution
        n = len(ranked)
        weights = 1.0 / (np.arange(n) + n)
        probabilities = weights / weights.sum()

        # Draw indices rather than the objects themselves, so numpy never has
        # to coerce Solution instances into an array.
        picks = np.random.choice(
            n, size=min(operator.selection_size, n), p=probabilities
        )
        return [ranked[i] for i in picks]

    def _generate_single_solution(
        self, operator, selected_individuals: List[Solution], sampler_id: int
    ) -> tuple[Solution, dict]:
        """Generate a single solution using an operator

        Builds the operator prompt, queries ``config.running_llm`` and parses
        the response. Never raises for ordinary errors: on failure it logs the
        error and returns an empty solution with an empty usage dict.

        Args:
            operator: Operator whose prompt is requested by name.
            selected_individuals: Parents passed to the prompt builder.
            sampler_id: Identifier used only in log messages.
        """
        try:
            # NOTE(review): _get_best_sol is not defined in this class;
            # presumably inherited from Method - confirm.
            current_best_sol = self._get_best_sol(self.run_state_dict.population)
            random_3_thought = self._get_n_random_thought(3)
            prompt_content = self.config.interface.get_operator_prompt(
                operator.name, selected_individuals, current_best_sol, random_3_thought
            )
            response, usage = self.config.running_llm.get_response(prompt_content)
            new_sol = self.config.interface.parse_response(response)
            self.verbose_info(
                f"Sampler {sampler_id}: Generated {operator.name} solution"
            )
            return new_sol, usage
        except Exception as e:
            self.verbose_info(
                f"Sampler {sampler_id}: Failed to generate {operator.name} solution - {str(e)}"
            )
            return Solution(""), {}

    def _get_n_random_thought(self, n: int) -> List[str]:
        """Get n random thoughts from solutions in the current population

        Returns every available non-empty thought when there are at most ``n``
        of them, otherwise ``n`` thoughts sampled without replacement.
        """
        import random

        # Collect the non-empty thoughts of the current population
        thoughts = [
            sol.other_info["thought"]
            for sol in self.run_state_dict.population
            if sol.other_info
            and "thought" in sol.other_info
            and sol.other_info["thought"]
        ]

        # If we don't have enough thoughts, return all available ones
        if len(thoughts) <= n:
            return thoughts

        # Randomly sample n thoughts without replacement
        return random.sample(thoughts, n)

    def _get_run_state_class(self) -> Type[BaseRunStateDict]:
        """Return the run-state class used by this algorithm."""
        return EvoEngineerRunStateDict

run

run()

Main EvoEngineer algorithm execution

Source code in src/evotoolkit/evo_method/evoengineer/evoengineer.py
def run(self):
    """Main EvoEngineer algorithm execution

    Seeds the population (first run only), initializes it, then evolves
    until ``max_generations`` or ``max_sample_nums`` is reached. Returns
    early, without marking the run as done, when initialization cannot
    produce ``valid_require`` valid solutions.

    Raises:
        SystemExit: if no seed solution can be obtained.
    """
    self.verbose_title("EvoEngineer ALGORITHM STARTED")

    self.run_state_dict.usage_history.setdefault("sample", [])

    # Initialize with seed solution if sol_history is empty
    if not self.run_state_dict.sol_history:
        init_sol = self._get_init_sol()
        if init_sol is None:
            # Raise SystemExit directly instead of calling exit(): that
            # helper is injected by the `site` module for interactive use
            # and is not guaranteed to exist in every interpreter setup.
            raise SystemExit
        # Don't use _register_solution as this doesn't consume samples
        self.run_state_dict.sol_history.append(init_sol)
        self.run_state_dict.population.append(init_sol)
        self._save_run_state_dict()
        self.verbose_info(
            f"Initialized with baseline solution (score: {init_sol.evaluation_res.score if init_sol.evaluation_res else 'None'})"
        )

    # Initialize population if starting from scratch
    if self.run_state_dict.generation == 0:
        self._initialize_population()

    # Check if we have enough individuals for selection
    valid_population = self._get_valid_population(self.run_state_dict.population)
    if len(valid_population) < self.config.interface.valid_require:
        self.verbose_info(
            f"The search is terminated since EvoEngineer unable to obtain {self.config.interface.valid_require} feasible algorithms during initialization."
        )
        return

    # Main evolution loop - moved loop control logic here
    while (self.run_state_dict.generation < self.config.max_generations) and (
        self.run_state_dict.tot_sample_nums < self.config.max_sample_nums
    ):
        # Snapshot of the loop-control counters, used to detect a failed
        # iteration that would otherwise be retried forever.
        progress_before = (
            self.run_state_dict.generation,
            self.run_state_dict.tot_sample_nums,
        )
        try:
            self.verbose_info(
                f"Generation {self.run_state_dict.generation} - Sample {self.run_state_dict.tot_sample_nums + 1} - {self.run_state_dict.tot_sample_nums + self.config.num_samplers} / {self.config.max_sample_nums or 'unlimited'}"
            )

            # Apply offspring operators in parallel for this generation
            self._apply_operators_parallel(
                self.config.get_offspring_operators(),
                f"Gen {self.run_state_dict.generation}",
            )

            # Manage population size - keep only the best pop_size individuals
            self._manage_population_size()

            self.run_state_dict.generation += 1
            self._save_run_state_dict()

        except KeyboardInterrupt:
            self.verbose_info("Evolution interrupted by user")
            break
        except Exception as e:
            self.verbose_info(f"Evolution error: {str(e)}")
            progress_after = (
                self.run_state_dict.generation,
                self.run_state_dict.tot_sample_nums,
            )
            if progress_after == progress_before:
                # Neither counter moved, so the loop condition can never
                # become false by retrying: stop instead of spinning.
                self.verbose_info(
                    "Evolution stopped: the failed generation made no progress"
                )
                break
            continue

    # Mark as done and save final state
    self.run_state_dict.is_done = True
    self._save_run_state_dict()

配置

evotoolkit.evo_method.evoengineer.EvoEngineerConfig

Bases: BaseConfig

Source code in src/evotoolkit/evo_method/evoengineer/run_config.py
class EvoEngineerConfig(BaseConfig):
    """Run configuration for EvoEngineer: budgets, parallelism and operators.

    The init and offspring operators are read from ``interface`` at
    construction time and validated immediately.
    """

    def __init__(
        self,
        interface: EvoEngineerInterface,
        output_path: str,
        running_llm: HttpsApi,
        verbose: bool = True,
        max_generations: int = 10,
        max_sample_nums: int = 45,
        pop_size: int = 5,
        num_samplers: int = 4,
        num_evaluators: int = 4,
        **kwargs,
    ):
        """Store the settings and fetch/validate the operators.

        Extra keyword arguments are accepted and ignored.

        Raises:
            ValueError: if ``interface`` yields no init operators, no
                offspring operators, or an init operator whose
                ``selection_size`` is not 0.
        """
        super().__init__(interface, output_path, verbose)
        self.running_llm = running_llm

        # Search budgets and population size
        self.max_generations = max_generations
        self.max_sample_nums = max_sample_nums
        self.pop_size = pop_size

        # Degree of parallelism
        self.num_samplers = num_samplers
        self.num_evaluators = num_evaluators

        # Operators are supplied by the interface
        self.init_operators = interface.get_init_operators()
        self.offspring_operators = interface.get_offspring_operators()

        # Both operator groups are mandatory
        if not self.init_operators:
            raise ValueError("Adapter must provide at least one init operator")
        if not self.offspring_operators:
            raise ValueError("Adapter must provide at least one offspring operator")

        # Init operators must not select parents: report the first offender
        offender = next(
            (
                candidate
                for candidate in self.init_operators
                if candidate.selection_size != 0
            ),
            None,
        )
        if offender is not None:
            raise ValueError(
                f"Init operator '{offender.name}' must have selection_size=0, got {offender.selection_size}"
            )

    def get_init_operators(self) -> List[Operator]:
        """Return the operators fetched from the interface for initialization."""
        init_ops = self.init_operators
        return init_ops

    def get_offspring_operators(self) -> List[Operator]:
        """Return the operators fetched from the interface for offspring."""
        offspring_ops = self.offspring_operators
        return offspring_ops

    def get_all_operators(self) -> List[Operator]:
        """Return the init operators followed by the offspring operators."""
        every_operator = self.init_operators + self.offspring_operators
        return every_operator

get_init_operators

get_init_operators() -> List[Operator]

Get initialization operators

Source code in src/evotoolkit/evo_method/evoengineer/run_config.py
def get_init_operators(self) -> List[Operator]:
    """Return the operators stored in ``self.init_operators``."""
    init_ops = self.init_operators
    return init_ops

get_offspring_operators

get_offspring_operators() -> List[Operator]

Get offspring operators

Source code in src/evotoolkit/evo_method/evoengineer/run_config.py
def get_offspring_operators(self) -> List[Operator]:
    """Return the operators stored in ``self.offspring_operators``."""
    offspring_ops = self.offspring_operators
    return offspring_ops

get_all_operators

get_all_operators() -> List[Operator]

Get all operators

Source code in src/evotoolkit/evo_method/evoengineer/run_config.py
def get_all_operators(self) -> List[Operator]:
    """Return a new list: the init operators followed by the offspring operators."""
    every_operator = self.init_operators + self.offspring_operators
    return every_operator