Direct Preference Optimization (DPO)¶
DPO (Direct Preference Optimization) is an elegant simplification of RLHF (Reinforcement Learning from Human Feedback) that makes preference learning more computationally efficient, especially for large language models.
The two key innovations are:
Eliminating the reward model: Instead of training a separate reward model to score outputs (which requires additional compute and memory), DPO directly optimizes the policy using preference data. It reparameterizes the reward function implicitly through the policy itself, deriving a closed-form solution for the optimal policy.
Preference-based optimization: DPO treats the preference learning problem as a classification task over pairs of responses. It maximizes the likelihood that preferred responses are ranked higher than rejected ones under the current policy, relative to a reference policy. This approach eliminates the need for sampling and reward model queries during training.
These changes are particularly valuable for LLM training: they reduce computational overhead by removing the need for a separate reward model and RL training loop, provide more stable training dynamics by avoiding the complexities of reinforcement learning, and simplify implementation while achieving comparable or better performance than traditional RLHF.
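Concretely, DPO minimizes a logistic loss over the difference of policy-versus-reference log-probability ratios for the chosen and rejected responses. The snippet below is a minimal sketch of that per-pair loss in PyTorch, assuming sequence-level log-probabilities have already been computed; it illustrates the objective only and is not the AgileRL implementation:

```python
import torch
import torch.nn.functional as F


def dpo_loss(
    policy_chosen_logps: torch.Tensor,    # log pi_theta(y_w | x), shape (batch,)
    policy_rejected_logps: torch.Tensor,  # log pi_theta(y_l | x), shape (batch,)
    ref_chosen_logps: torch.Tensor,       # log pi_ref(y_w | x), shape (batch,)
    ref_rejected_logps: torch.Tensor,     # log pi_ref(y_l | x), shape (batch,)
    beta: float = 0.001,
) -> torch.Tensor:
    # Log-ratios of the trainable policy relative to the frozen reference policy
    chosen_logratios = policy_chosen_logps - ref_chosen_logps
    rejected_logratios = policy_rejected_logps - ref_rejected_logps

    # DPO objective: -log sigmoid(beta * (chosen log-ratio - rejected log-ratio))
    logits = beta * (chosen_logratios - rejected_logratios)
    return -F.logsigmoid(logits).mean()
```

Here beta plays the same role as the beta hyperparameter of the DPO class documented below: larger values keep the policy closer to the reference policy.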
Example¶
```python
from agilerl.algorithms import DPO
from agilerl.utils.llm_utils import PreferenceGym
from accelerate import Accelerator
from datasets import load_dataset
from peft import get_peft_model
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

# Instantiate the model and the associated tokenizer
model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen2.5-3B",
    torch_dtype=torch.bfloat16,
)
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-3B")

# Instantiate an accelerator object for distributed training
accelerator = Accelerator()

# Load the dataset into a PreferenceGym environment
raw_dataset = load_dataset("HumanLLMs/Human-Like-DPO-Dataset", split="train").shuffle(seed=42)
train_test_split = raw_dataset.train_test_split(test_size=0.1)
train_dataset = train_test_split["train"]
test_dataset = train_test_split["test"]
env = PreferenceGym(
    train_dataset=train_dataset,
    test_dataset=test_dataset,
    tokenizer=tokenizer,
    data_batch_size_per_gpu=16,
    accelerator=accelerator,
)

# Instantiate the agent
agent = DPO(
    env.observation_space,
    env.action_space,
    actor_network=model,
    pad_token_id=tokenizer.eos_token_id,
    pad_token=tokenizer.eos_token,
    device="cuda" if torch.cuda.is_available() else "cpu",
    batch_size=32,
    lr=0.000005,
    beta=0.001,
    update_epochs=1,
    seed=42,
    reduce_memory_peak=True,
    accelerator=accelerator,
)
```

Training a DPO agent¶
To train a DPO agent on a single preference gym environment, use the finetune_llm_preference function:
```python
from agilerl.training.train_llm import finetune_llm_preference

finetune_llm_preference(
    [agent],
    env,
    num_epochs=1,
    checkpoint_steps=250,
    accelerator=accelerator,
)
```

Saving and Loading Agents¶
To save an agent, use the save_llm_checkpoint function:
```python
from agilerl.utils.utils import save_llm_checkpoint

save_llm_checkpoint(agent, "path/to/checkpoint")
```

To load a trained model, you must use the HuggingFace .from_pretrained method; AgileRL is compatible with HuggingFace and PEFT models:
```python
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
import torch

base_model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen2.5-3B",
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-3B")
model = PeftModel.from_pretrained(base_model, "path/to/model/directory")
```

Parameters¶
- class agilerl.algorithms.dpo.DPO(*args, **kwargs)¶
The DPO algorithm class. DPO paper: https://arxiv.org/pdf/2305.18290
- Parameters:
pad_token_id (int) – Pad token id
pad_token (str) – Pad token
model_name (str, optional) – Model name
actor_network (PreTrainedModel) – HuggingFace LLM
model_config – Model configuration, to be used when creating the model from a name or path
hp_config (HyperparameterConfig, optional) – RL hyperparameter mutation configuration, defaults to None, whereby algorithm mutations are disabled.
index (int, optional) – Index to keep track of object instance during tournament selection and mutation, defaults to 0
batch_size (int, optional) – Batch size for training, defaults to 16
lr (float, optional) – Learning rate, defaults to 0.000005
beta (float, optional) – Beta parameter for DPO, defaults to 0.001
max_grad_norm (float, optional) – Maximum gradient norm, defaults to 0.1
update_epochs (int, optional) – Number of update epochs, defaults to 1
calc_position_embeddings (bool, optional) – Flag to indicate if position embeddings should be calculated, defaults to True
micro_batch_size_per_gpu (int, optional) – Micro batch size per GPU, defaults to None
reduce_memory_peak (bool, optional) – Flag to indicate if memory peak should be reduced, defaults to False
device (str, optional) – Device for accelerated computing, ‘cpu’ or ‘cuda’, defaults to ‘cpu’
lora_config (LoraConfig, optional) – Config for LoRA, defaults to None
accelerator (accelerate.Accelerator(), optional) – Accelerator for distributed computing, defaults to None
wrap (bool, optional) – Wrap models for distributed training upon creation, defaults to True
clone (bool, optional) – Flag to indicate if the instantiation is a cloning, defaults to False
use_separate_reference_adapter (bool, optional) – Flag to indicate if the reference policy should have a separate adapter, defaults to False
seed (int, optional) – Seed for the random number generator, defaults to 42
gradient_checkpointing (bool, optional) – Flag to indicate if gradient checkpointing should be used, defaults to True
- clone(index: int | None = None, wrap: bool = True)¶
Creates a clone of the algorithm.
- Parameters:
index (Optional[int], optional) – Index to keep track of the clone, defaults to None
wrap (bool, optional) – Wrap models for distributed training upon creation, defaults to True
- Returns:
A clone of the algorithm
- Return type:
SelfEvolvableAlgorithm
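For illustration, cloning the agent from the example above (as might happen during tournament selection) could look like the following sketch; the index value is arbitrary:

```python
# Sketch: clone the agent and assign it a new population index.
cloned_agent = agent.clone(index=1)
```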
- static copy_attributes(agent: SelfEvolvableAlgorithm, clone: SelfEvolvableAlgorithm) SelfEvolvableAlgorithm¶
Copies the non-evolvable attributes of the algorithm to a clone.
- Parameters:
agent (SelfEvolvableAlgorithm) – The agent to copy the attributes from.
clone (SelfEvolvableAlgorithm) – The clone of the algorithm.
- Returns:
The clone of the algorithm.
- Return type:
SelfEvolvableAlgorithm
- static create_prompt_masks(prompt_lengths: list[int], max_length: int) Tensor¶
Creates a mask for the prompts based on the prompt lengths (vectorized).
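A hedged usage sketch, assuming the returned mask marks prompt token positions within sequences padded to max_length (the exact mask convention is not documented here):

```python
# Hypothetical example: three prompts of lengths 2, 4 and 3, padded to length 6.
prompt_masks = DPO.create_prompt_masks(prompt_lengths=[2, 4, 3], max_length=6)
print(prompt_masks.shape)  # expected torch.Size([3, 6]) under the stated assumption
```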
- evolvable_attributes(networks_only: bool = False) dict[str, EvolvableModule | ModuleDict | Optimizer | dict[str, Optimizer] | OptimizerWrapper]¶
Returns the attributes related to the evolvable networks in the algorithm. Includes attributes that are either EvolvableModule or ModuleDict objects, as well as the optimizers associated with the networks.
- get_action(obs: list[ReasoningPrompts], training: bool = True) tuple[list[Tensor], list[Tensor]]¶
Returns the action of the agent.
- static get_action_dim(action_space: Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary | list[Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary]) tuple[int, ...]¶
Returns the dimension of the action space as it pertains to the underlying networks (i.e. the output size of the networks).
- Parameters:
action_space (spaces.Space or list[spaces.Space].) – The action space of the environment.
- Returns:
The dimension of the action space.
- Return type:
int.
- get_policy() EvolvableModule¶
Returns the policy network of the algorithm.
- static get_state_dim(observation_space: Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary | list[Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary]) tuple[int, ...]¶
Returns the dimension of the state space as it pertains to the underlying networks (i.e. the input size of the networks).
- static inspect_attributes(agent: SelfEvolvableAlgorithm, input_args_only: bool = False) dict[str, Any]¶
Inspect and retrieve the attributes of the current object, excluding attributes related to the underlying evolvable networks (i.e. EvolvableModule, torch.optim.Optimizer) and with an option to include only the attributes that are input arguments to the constructor.
- learn(experiences: dict[str, ndarray | dict[str, ndarray] | tuple[ndarray, ...] | Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor] | Number | list[ReasoningPrompts]] | tuple[ndarray | dict[str, ndarray] | tuple[ndarray, ...] | Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor] | Number | list[ReasoningPrompts], ...], training: bool = True) tuple[float, float, float]¶
Updates agent network parameters to learn from preference data.
- Parameters:
experiences – Batch of preference experiences to learn from.
training (bool, optional) – Flag to indicate if the algorithm is in training mode, defaults to True
- Returns:
mean loss, mean chosen reward, mean rejected reward
- Return type:
tuple[float, float, float]
- classmethod load(path: str, device: str | device = 'cpu', accelerator: Accelerator | None = None) None¶
Loads an algorithm from a checkpoint.
- Parameters:
path (string) – Location to load checkpoint from.
device (str, optional) – Device to load the algorithm on, defaults to ‘cpu’
accelerator (Optional[Accelerator], optional) – Accelerator object for distributed computing, defaults to None
- Returns:
An instance of the algorithm
- Return type:
- load_checkpoint(path: str) None¶
Override the load_checkpoint method to provide guidance on the correct method to use.
- Parameters:
path (string) – Location to load checkpoint from
- classmethod population(size: int, observation_space: Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary | list[Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary], action_space: Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary | list[Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary], wrapper_cls: type[SelfAgentWrapper] | None = None, wrapper_kwargs: dict[str, Any] = {}, **kwargs) list[SelfEvolvableAlgorithm | SelfAgentWrapper]¶
Creates a population of algorithms.
- Parameters:
size (int.) – The size of the population.
observation_space (spaces.Space or list[spaces.Space]) – The observation space of the environment.
action_space (spaces.Space or list[spaces.Space]) – The action space of the environment.
wrapper_cls (Optional[type[SelfAgentWrapper]], optional) – Wrapper class to apply to each agent, defaults to None
wrapper_kwargs (dict[str, Any], optional) – Keyword arguments for the wrapper class, defaults to {}
- Returns:
A list of algorithms.
- Return type:
list[SelfEvolvableAlgorithm].
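As an illustrative sketch, a population of DPO agents could be created as below; keyword arguments are presumably forwarded to the DPO constructor, and the values simply mirror the example above rather than being prescriptive:

```python
# Sketch: create a population of 4 DPO agents for evolutionary hyperparameter optimization.
population = DPO.population(
    size=4,
    observation_space=env.observation_space,
    action_space=env.action_space,
    actor_network=model,
    pad_token_id=tokenizer.eos_token_id,
    pad_token=tokenizer.eos_token,
    accelerator=accelerator,
)
```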
- preprocess_observation(observation: ndarray | dict[str, ndarray] | tuple[ndarray, ...] | Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor] | Number | list[ReasoningPrompts]) Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor]¶
Dummy preprocessing of observations for the forward pass through the neural network.
- register_mutation_hook(hook: Callable) None¶
Registers a hook to be executed after a mutation is performed on the algorithm.
- Parameters:
hook (Callable) – The hook to be executed after mutation.
- register_network_group(group: NetworkGroup) None¶
Sets the evaluation network for the algorithm.
- Parameters:
group (NetworkGroup) – The network group containing the evaluation network.
- reinit_optimizers(optimizer: OptimizerConfig | None = None) None¶
Reinitialize the optimizers of an algorithm. If no optimizer is passed, all optimizers are reinitialized.
- Parameters:
optimizer (Optional[OptimizerConfig], optional) – The optimizer to reinitialize, defaults to None, in which case all optimizers are reinitialized.
- save_checkpoint(path: str, weights_only: bool = True) None¶
Override the save_checkpoint method to provide guidance on the correct method to use.
- Parameters:
path (string) – Location to save checkpoint at
weights_only (bool, optional) – If True, only save the weights of the model, defaults to True
- set_reference_policy(reference_update_tracker: int) None¶
Update the reference policy when the reference policy update tracker is greater than the current reference policy update tracker.
- Parameters:
reference_update_tracker (int) – The reference policy update tracker
- set_training_mode(training: bool) None¶
Sets the training mode of the algorithm.
- Parameters:
training (bool) – If True, set the algorithm to training mode.
- test(env: PreferenceGym, loop: int = 1) Tensor¶
Returns the fitness (test) score tensor of the agent.
- Parameters:
env (PreferenceGym environment) – The environment to be tested in
loop (int, optional) – Number of testing loops/episodes to complete. The returned score is the mean. Defaults to 1
- Returns:
Test score tensor of the agent
- Return type:
torch.Tensor
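For example, evaluating the trained agent on the held-out preference data from the example above:

```python
# Compute the fitness (test) score of the agent on the PreferenceGym environment.
fitness = agent.test(env, loop=1)
print(fitness)
```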
- to_device(*experiences: Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor]) tuple[Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor], ...]¶
Moves experiences to the device.
- static update_lr(optimizer: Optimizer, lr: float, accelerator: Accelerator | None = None, scheduler_config: CosineLRScheduleConfig | None = None) tuple[Accelerator | None, SequentialLR | None]¶
Update the learning rate of the optimizer
- Parameters:
optimizer (Optimizer) – Optimizer
lr (float) – Learning rate
accelerator (Optional[Accelerator]) – Accelerator
scheduler_config (Optional[CosineLRScheduleConfig]) – Scheduler configuration
- Returns:
Tuple of accelerator and scheduler
- Return type:
tuple[Optional[Accelerator], Optional[SequentialLR]]
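As a minimal sketch, assuming a plain torch optimizer and no distributed setup (in which case both returned values are presumably None):

```python
import torch

# Hypothetical standalone usage: decay the optimizer's learning rate to 1e-6.
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-6)
new_accelerator, scheduler = DPO.update_lr(optimizer, lr=1e-6)
```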