Source code for l2l.optimizees.community_detection.optimizee_hybrid

import os
import numpy as np

import time
from collections import namedtuple
from l2l.optimizees.optimizee import Optimizee
from .helpers import create_config, result_csv_hybrid
from dwave.cloud.client import Client
from dimod import BinaryQuadraticModel
from itertools import combinations
from dwave.system import LeapHybridSampler
import networkx as nx
from collections import defaultdict

HybridCommunityOptimizeeParameters = namedtuple(
    'HybridCommunityOptimizeeParameters', ['APIToken', 'config_path', 'num_partitions', 'Graph','result_path', 
                                           'one_hot_strength', 'weight'])


[docs] class HybridCommunityOptimizee(Optimizee): def __init__(self, traj, parameters): super().__init__(traj) self.num_partitions = parameters.num_partitions self.one_hot_strength = parameters.one_hot_strength self.G = parameters.Graph self.weight = parameters.weight self.ind_idx = traj.individual.ind_idx self.generation = traj.individual.generation self.config_path = os.path.join(parameters.config_path, "dwave.conf") os.makedirs(parameters.result_path, exist_ok=True) self.result_path = os.path.join(parameters.result_path, "result.csv") if not os.path.exists(self.config_path): create_config(parameters.APIToken, parameters.path)
[docs] def create_individual(self): """ Creates and returns the individual """ # create individual individual = {'num_partitions': self.num_partitions, 'one_hot_strength': self.one_hot_strength} return individual
[docs] def bounding_func(self, individual): """ Bounds the individual within the required bounds via coordinate clipping """ return {'num_partitons': np.clip(individual['num_partitions'], a_min=1, a_max=len(self.G.nodes)), 'one_hot_strength': np.clip(individual['one_hot_strength'], a_min=0.01, a_max=50)}
[docs] def simulate(self, traj): """ Performs community detection on the graph using the D-Wave Quantum Computer. Args: - traj: The trajectory object containing the individual's parameters. Returns: - A tuple containing the fitness value (1/modularity) of the clustering. """ # Extract metadata from trajectory object self.ind_idx = traj.individual.ind_idx self.generation = traj.individual.generation # Define partitions (clusters) and initialize variables partitions = range(int(traj.individual.num_partitions)) # Compute the modularity matrix of the graph B = nx.modularity_matrix(self.G, weight=self.weight) # Initialize a binary quadratic model (BQM) bqm = BinaryQuadraticModel('BINARY') # One-hot encoding constraint strength one_hot_strength = traj.individual.one_hot_strength # Add variables for each node in each partition (one-hot encoding) for i in self.G.nodes(): for c in partitions: var = f"{i}__{c}" bqm.add_variable(var, 0.0) # Add modularity-based interactions (encodes community structure objective) for i in self.G.nodes(): for j in self.G.nodes(): if i == j: continue for c in partitions: vi = f"{i}__{c}" vj = f"{j}__{c}" # Negative interaction to maximize modularity bqm.add_interaction(vi, vj, -B[i, j]) # Apply one-hot constraints: each node must belong to exactly one partition for i in self.G.nodes(): vars_i = [f"{i}__{c}" for c in partitions] # Add a small positive bias to all variables for v in vars_i: bqm.add_variable(v, bqm.get_linear(v) + one_hot_strength) # Penalize assigning a node to multiple clusters for u, v in combinations(vars_i, 2): bqm.add_interaction(u, v, 2 * one_hot_strength) # Subtract linear term to center the penalty for v in vars_i: bqm.add_variable(v, bqm.get_linear(v) - 2 * one_hot_strength) # Add constant offset for the one-hot penalty bqm.offset += one_hot_strength try: # Load D-Wave quantum sampler client client = Client.from_config(config_file=self.config_path) sampler = LeapHybridSampler() start = time.time() sampleset = sampler.sample(bqm, label='Hybrid-BQM-Community') run_time = (time.time() - start) best_sample = sampleset.first.sample client.close() # Iterate over samples in the sampleset valid_sample = None for sample in sampleset.samples(): is_valid = True for node in self.G.nodes(): # Count how many partitions this node is assigned to (i.e., how many variables for this node are 1) assigned = sum(sample[f"{node}__{c}"] for c in partitions) if assigned != 1: is_valid = False break # No need to continue if one node already violates the constraint if is_valid: valid_sample = sample break sample = valid_sample if valid_sample is not None else best_sample #Robust Decoding raw_assignments = defaultdict(list) # Collect all 1-valued variables per node for var, value in sample.items(): if value == 1: node_str, part_str = var.split("__") node = int(node_str) part = int(part_str) raw_assignments[node].append(part) # Clean and fix assignments assignment = {} for node in self.G.nodes(): assigned_parts = raw_assignments.get(node, []) if len(assigned_parts) == 1: assignment[node] = assigned_parts[0] else: # Invalid case: node unassigned or overassignment max_community = None max_edges = -1 for c in partitions: edges = sum(1 for neighbor in self.G.neighbors(node) if assignment.get(neighbor) == c) if edges > max_edges: max_edges = edges max_community = c if max_community is None: max_community = np.random.choice(partitions) # fallback assignment[node] = max_community #Build community sets communities = [] for c in partitions: comm = {node for node, cluster in assignment.items() if cluster == c} if comm: communities.append(comm) modularity = nx.community.modularity(self.G, communities) # Write the results to the result file result_csv_hybrid(self.result_path, run_time, self.generation, self.ind_idx, best_sample, communities, modularity) except Exception as e: with open(self.result_path, "a", encoding="utf-8") as f: import traceback f.write("An error occurred\n") traceback.print_exc(file=f) return (modularity,)