Direct Preference Optimization (DPO)

DPO (Direct Preference Optimization) is an elegant simplification of RLHF (Reinforcement Learning from Human Feedback) that makes preference learning more computationally efficient, especially for large language models.

The two key innovations are:

  • Eliminating the reward model: Instead of training a separate reward model to score outputs (which requires additional compute and memory), DPO directly optimizes the policy using preference data. It reparameterizes the reward function implicitly through the policy itself, deriving a closed-form solution for the optimal policy.

  • Preference-based optimization: DPO treats the preference learning problem as a classification task over pairs of responses. It maximizes the likelihood that preferred responses are ranked higher than rejected ones under the current policy, relative to a reference policy. This approach eliminates the need for sampling and reward model queries during training.
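The resulting objective is a simple classification-style loss over preference pairs. Below is a minimal, self-contained sketch of that loss (not AgileRL's internal implementation), assuming per-sequence log-probabilities have already been computed under both the trainable policy and the frozen reference policy:

import torch
import torch.nn.functional as F

def dpo_loss(
    policy_chosen_logps: torch.Tensor,    # log pi_theta(y_chosen | x), shape [batch]
    policy_rejected_logps: torch.Tensor,  # log pi_theta(y_rejected | x), shape [batch]
    ref_chosen_logps: torch.Tensor,       # log pi_ref(y_chosen | x), shape [batch]
    ref_rejected_logps: torch.Tensor,     # log pi_ref(y_rejected | x), shape [batch]
    beta: float = 0.001,
) -> torch.Tensor:
    # Implicit rewards are the beta-scaled log-ratios between policy and reference
    chosen_rewards = beta * (policy_chosen_logps - ref_chosen_logps)
    rejected_rewards = beta * (policy_rejected_logps - ref_rejected_logps)
    # Maximize the log-sigmoid margin between chosen and rejected implicit rewards
    return -F.logsigmoid(chosen_rewards - rejected_rewards).mean()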

These changes are particularly valuable for LLM training: they reduce computational overhead by removing the separate reward model and RL training loop, provide more stable training dynamics by avoiding the complexities of reinforcement learning, and simplify implementation while achieving comparable or better performance than traditional RLHF.

Example

from agilerl.algorithms import DPO
from agilerl.utils.llm_utils import PreferenceGym
from accelerate import Accelerator
from datasets import load_dataset
from peft import get_peft_model
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

# Instantiate the model and the associated tokenizer
model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen2.5-3B",
    torch_dtype=torch.bfloat16,
)
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-3B")

# Instantiate an accelerator object for distributed training
accelerator = Accelerator()

# Load the dataset into a PreferenceGym environment
raw_dataset = load_dataset("HumanLLMs/Human-Like-DPO-Dataset", split="train").shuffle(seed=42)
train_test_split = raw_dataset.train_test_split(test_size=0.1)
train_dataset = train_test_split["train"]
test_dataset = train_test_split["test"]

env = PreferenceGym(
    train_dataset=train_dataset,
    test_dataset=test_dataset,
    tokenizer=tokenizer,
    data_batch_size_per_gpu=16,
    accelerator=accelerator,
)

# Instantiate the agent
agent = DPO(
    env.observation_space,
    env.action_space,
    actor_network=model,
    pad_token_id=tokenizer.eos_token_id,
    pad_token=tokenizer.eos_token,
    device="cuda" if torch.cuda.is_available() else "cpu",
    batch_size=32,
    lr=0.000005,
    beta=0.001,
    update_epochs=1,
    seed=42,
    reduce_memory_peak=True,
    accelerator=accelerator,
)

Training a DPO agent

To train a DPO agent on a single PreferenceGym environment, use the finetune_llm_preference function:

from agilerl.training.train_llm import finetune_llm_preference

finetune_llm_preference(
    [agent],
    env,
    num_epochs=1,
    checkpoint_steps=250,
    accelerator=accelerator,
)

Saving and Loading Agents

To save an agent, use the save_llm_checkpoint function:

from agilerl.utils.utils import save_llm_checkpoint

save_llm_checkpoint(agent, "path/to/checkpoint")

To load a trained model, use the HuggingFace .from_pretrained method; AgileRL is compatible with HuggingFace and PEFT models:

from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
import torch

base_model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen2.5-3B",
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-3B")
model = PeftModel.from_pretrained(base_model, "path/to/model/directory")
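As a quick sanity check, the loaded model can be used for generation through the standard HuggingFace API. The prompt below is just an illustrative placeholder:

# Generate a response with the fine-tuned adapter applied
prompt = "What is your favourite way to spend a weekend?"
inputs = tokenizer(prompt, return_tensors="pt").to(base_model.device)
with torch.no_grad():
    output_ids = model.generate(**inputs, max_new_tokens=128)
print(tokenizer.decode(output_ids[0], skip_special_tokens=True))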

Parameters

class agilerl.algorithms.dpo.DPO(*args, **kwargs)

The DPO algorithm class. DPO paper: https://arxiv.org/pdf/2305.18290

Parameters:
  • pad_token_id (int) – Pad token id

  • pad_token (str) – Pad token

  • model_name (str, optional) – Model name

  • actor_network (PreTrainedModel) – HuggingFace LLM

  • model_config – Model configuration, to be used when creating the model from a name or path

  • hp_config (HyperparameterConfig, optional) – RL hyperparameter mutation configuration, defaults to None, whereby algorithm mutations are disabled.

  • index (int, optional) – Index to keep track of object instance during tournament selection and mutation, defaults to 0

  • batch_size (int, optional) – Batch size for training, defaults to 16

  • lr (float, optional) – Learning rate, defaults to 0.000005

  • beta (float, optional) – Beta parameter for DPO, defaults to 0.001

  • max_grad_norm (float, optional) – Maximum gradient norm, defaults to 0.1

  • update_epochs (int, optional) – Number of update epochs, defaults to 1

  • calc_position_embeddings (bool, optional) – Flag to indicate if position embeddings should be calculated, defaults to True

  • micro_batch_size_per_gpu (int, optional) – Micro batch size per GPU, defaults to None

  • reduce_memory_peak (bool, optional) – Flag to indicate if memory peak should be reduced, defaults to False

  • device (str, optional) – Device for accelerated computing, ‘cpu’ or ‘cuda’, defaults to ‘cpu’

  • lora_config (LoraConfig, optional) – Config for LoRA, defaults to None

  • accelerator (accelerate.Accelerator(), optional) – Accelerator for distributed computing, defaults to None

  • wrap (bool, optional) – Wrap models for distributed training upon creation, defaults to True

  • clone (bool, optional) – Flag to indicate if the instantiation is a cloning, defaults to False

  • use_separate_reference_adapter (bool, optional) – Flag to indicate if the reference policy should have a separate adapter, defaults to False

  • seed (int, optional) – Seed for the random number generator, defaults to 42

  • gradient_checkpointing (bool, optional) – Flag to indicate if gradient checkpointing should be used, defaults to True
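For reference, the sketch below shows how some of the optional parameters above might be combined, for example training only LoRA adapters with a separate reference adapter. It reuses the env, model, tokenizer, and accelerator from the example above; the LoRA settings and micro batch size are illustrative assumptions, not recommended values.

from peft import LoraConfig

# Illustrative LoRA configuration; values are assumptions, not tuned defaults
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    target_modules=["q_proj", "v_proj"],
    task_type="CAUSAL_LM",
)

agent = DPO(
    env.observation_space,
    env.action_space,
    actor_network=model,
    pad_token_id=tokenizer.eos_token_id,
    pad_token=tokenizer.eos_token,
    lora_config=lora_config,
    use_separate_reference_adapter=True,  # keep a separate adapter for the reference policy
    gradient_checkpointing=True,
    micro_batch_size_per_gpu=4,  # assumed value for illustration
    accelerator=accelerator,
)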

clean_up() None

Clean up the algorithm.

clone(index: int | None = None, wrap: bool = True)

Creates a clone of the algorithm.

Parameters:
  • index (Optional[int], optional) – The index of the clone, defaults to None

  • wrap (bool, optional) – If True, wrap the models in the clone with the accelerator, defaults to True

Returns:

A clone of the algorithm

Return type:

EvolvableAlgorithm
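A short usage sketch, assuming agent was instantiated as in the example above:

# Create a copy of the agent, e.g. for tournament selection in evolutionary HPO
agent_copy = agent.clone(index=1)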

static copy_attributes(agent: SelfEvolvableAlgorithm, clone: SelfEvolvableAlgorithm) SelfEvolvableAlgorithm

Copies the non-evolvable attributes of the algorithm to a clone.

Parameters:
  • agent (SelfEvolvableAlgorithm) – The agent to copy attributes from.

  • clone (SelfEvolvableAlgorithm) – The clone of the algorithm.

Returns:

The clone of the algorithm.

Return type:

SelfEvolvableAlgorithm

static create_prompt_masks(prompt_lengths: list[int], max_length: int) Tensor

Creates a mask for the prompts based on the prompt lengths (vectorized).

Parameters:
  • prompt_lengths (list[int]) – List of prompt lengths

  • max_length (int) – Maximum length of the prompts

Returns:

Mask tensor [batch_size, max_length]

Return type:

torch.Tensor
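A brief usage sketch of the documented signature; exactly which positions the returned mask marks (prompt versus completion tokens) is determined by the library, so treat the interpretation as an assumption.

# Build a [2, 6] mask for two prompts of lengths 3 and 5, up to max_length=6
masks = DPO.create_prompt_masks(prompt_lengths=[3, 5], max_length=6)
print(masks.shape)  # torch.Size([2, 6])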

evolvable_attributes(networks_only: bool = False) dict[str, EvolvableModule | ModuleDict | Optimizer | dict[str, Optimizer] | OptimizerWrapper]

Returns the attributes related to the evolvable networks in the algorithm. Includes attributes that are either EvolvableModule or ModuleDict objects, as well as the optimizers associated with the networks.

Parameters:

networks_only (bool, optional) – If True, only include evolvable networks, defaults to False

Returns:

A dictionary of network attributes.

Return type:

dict[str, Any]

get_action(obs: list[ReasoningPrompts], training: bool = True) tuple[list[Tensor], list[Tensor]]

Returns the action of the agent.

Parameters:
  • obs (LLMObsType) – The observation of the agent

  • training (bool) – Whether the agent is training or not

Returns:

The action of the agent

Return type:

tuple[list[torch.Tensor], list[torch.Tensor]]

static get_action_dim(action_space: Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary | list[Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary]) tuple[int, ...]

Returns the dimension of the action space as it pertains to the underlying networks (i.e. the output size of the networks).

Parameters:

action_space (spaces.Space or list[spaces.Space].) – The action space of the environment.

Returns:

The dimension of the action space.

Return type:

tuple[int, ...]

get_lr_names() list[str]

Returns the learning rates of the algorithm.

get_policy() EvolvableModule

Returns the policy network of the algorithm.

static get_state_dim(observation_space: Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary | list[Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary]) tuple[int, ...]

Returns the dimension of the state space as it pertains to the underlying networks (i.e. the input size of the networks).

Parameters:

observation_space (spaces.Space or list[spaces.Space].) – The observation space of the environment.

Returns:

The dimension of the state space.

Return type:

tuple[int, …].

property index: int

Returns the index of the algorithm.

static inspect_attributes(agent: SelfEvolvableAlgorithm, input_args_only: bool = False) dict[str, Any]

Inspect and retrieve the attributes of the current object, excluding attributes related to the underlying evolvable networks (i.e. EvolvableModule, torch.optim.Optimizer) and with an option to include only the attributes that are input arguments to the constructor.

Parameters:

input_args_only (bool) – If True, only include attributes that are input arguments to the constructor. Defaults to False.

Returns:

A dictionary of attribute names and their values.

Return type:

dict[str, Any]

learn(experiences: dict[str, ndarray | dict[str, ndarray] | tuple[ndarray, ...] | Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor] | Number | list[ReasoningPrompts]] | tuple[ndarray | dict[str, ndarray] | tuple[ndarray, ...] | Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor] | Number | list[ReasoningPrompts], ...], training: bool = True) tuple[float, float, float]

Updates agent network parameters to learn from preference data.

Parameters:
  • experiences (tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]) – Batched chosen_input_ids, rejected_input_ids, chosen_attention_mask, rejected_attention_mask and rewards

  • training (bool) – Whether the agent is training or not

Returns:

mean loss, mean chosen reward, mean rejected reward

Return type:

tuple[float, float, float]

classmethod load(path: str, device: str | device = 'cpu', accelerator: Accelerator | None = None) None

Loads an algorithm from a checkpoint.

Parameters:
  • path (string) – Location to load checkpoint from.

  • device (str, optional) – Device to load the algorithm on, defaults to ‘cpu’

  • accelerator (Optional[Accelerator], optional) – Accelerator object for distributed computing, defaults to None

Returns:

An instance of the algorithm

Return type:

RLAlgorithm
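A minimal sketch of the documented call; the path is a placeholder and assumes a checkpoint saved in a format this classmethod accepts:

# Restore a saved agent onto the GPU (placeholder path)
agent = DPO.load("path/to/checkpoint", device="cuda", accelerator=accelerator)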

load_checkpoint(path: str) None

Override the load_checkpoint method to provide guidance on the correct method to use.

Parameters:

path (string) – Location to load checkpoint from

property mut: Any

Returns the mutation object of the algorithm.

mutation_hook() None

Executes the hooks registered with the algorithm.

classmethod population(size: int, observation_space: Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary | list[Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary], action_space: Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary | list[Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary], wrapper_cls: type[SelfAgentWrapper] | None = None, wrapper_kwargs: dict[str, Any] = {}, **kwargs) list[SelfEvolvableAlgorithm | SelfAgentWrapper]

Creates a population of algorithms.

Parameters:

size (int.) – The size of the population.

Returns:

A list of algorithms.

Return type:

list[SelfEvolvableAlgorithm].

preprocess_observation(observation: ndarray | dict[str, ndarray] | tuple[ndarray, ...] | Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor] | Number | list[ReasoningPrompts]) Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor]

Dummy preprocessing of observations for the forward pass through the neural network.

Parameters:

observations (numpy.ndarray[float] or dict[str, numpy.ndarray[float]]) – Observations of environment

Returns:

Preprocessed observations

Return type:

torch.Tensor[float] or dict[str, torch.Tensor[float]] or tuple[torch.Tensor[float], …]

recompile() None

Recompiles the algorithm.

register_mutation_hook(hook: Callable) None

Registers a hook to be executed after a mutation is performed on the algorithm.

Parameters:

hook (Callable) – The hook to be executed after mutation.

register_network_group(group: NetworkGroup) None

Registers a network group with the algorithm.

Parameters:

group (NetworkGroup) – The network group to register.

reinit_optimizers(optimizer: OptimizerConfig | None = None) None

Reinitialize the optimizers of an algorithm. If no optimizer is passed, all optimizers are reinitialized.

Parameters:

optimizer (Optional[OptimizerConfig], optional) – The optimizer to reinitialize, defaults to None, in which case all optimizers are reinitialized.

save_checkpoint(path: str, weights_only: bool = True) None

Override the save_checkpoint method to provide guidance on the correct method to use.

Parameters:
  • path (string) – Location to save checkpoint at

  • weights_only (bool, optional) – If True, only save the weights of the model, defaults to True

select_policy(use_reference: bool = False) None

Select the policy.

set_reference_policy(reference_update_tracker: int) None

Update the reference policy when the reference policy update tracker is greater than the current reference policy update tracker.

Parameters:

reference_update_tracker (int) – The reference policy update tracker

set_training_mode(training: bool) None

Sets the training mode of the algorithm.

Parameters:

training (bool) – If True, set the algorithm to training mode.

test(env: PreferenceGym, loop: int = 1) Tensor

Returns the fitness (test) score tensor of the agent.

Parameters:
  • env (PreferenceGym environment) – The environment to be tested in

  • loop (int, optional) – Number of testing loops/episodes to complete. The returned score is the mean. Defaults to 1

Returns:

Test score tensor of the agent

Return type:

torch.Tensor
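A short usage sketch, reusing the env and agent from the examples above:

# Evaluate the agent on the test split of the PreferenceGym environment
fitness = agent.test(env, loop=1)
print(fitness)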

to_device(*experiences: Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor]) tuple[Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor], ...]

Moves experiences to the device.

Parameters:

experiences (tuple[torch.Tensor[float], ...]) – Experiences to move to device

Returns:

Experiences on the device

Return type:

tuple[torch.Tensor[float], …]

unwrap_models() None

Unwraps the models in the algorithm from the accelerator.

static update_lr(optimizer: Optimizer, lr: float, accelerator: Accelerator | None = None, scheduler_config: CosineLRScheduleConfig | None = None) tuple[Accelerator | None, SequentialLR | None]

Update the learning rate of the optimizer

Parameters:
  • optimizer (Optimizer) – Optimizer

  • lr (float) – Learning rate

  • accelerator (Optional[Accelerator]) – Accelerator

  • scheduler_config (Optional[CosineLRScheduleConfig]) – Scheduler configuration

Returns:

Tuple of accelerator and scheduler

Return type:

tuple[Optional[Accelerator], Optional[SequentialLR]]

wrap_models() None

Wrap the models in the accelerator; DeepSpeed objects must be wrapped at the same time, not individually.