Feat/add zeropp #256

Open · wants to merge 6 commits into base: develop
211 changes: 211 additions & 0 deletions configs/7B_llama2_zeropp.py
@@ -0,0 +1,211 @@
JOB_NAME = "7b_llama2_train"
model_type = "LLAMA2"
DO_ALERT = False

VOCAB_SIZE = 32000
SEQ_LEN = 2048
HIDDEN_SIZE = 4096
NUM_ATTENTION_HEAD = 32
NUM_KV_ATTENTION_HEAD = 8
MLP_RATIO = 3.5
NUM_LAYER = 32


MODEL_ONLY_FOLDER = "local:llm_ckpts/xxxx"
# Ckpt folder format:
# fs: 'local:/mnt/nfs/XXX'
SAVE_CKPT_FOLDER = "local:llm_ckpts"
LOAD_CKPT_FOLDER = "local:llm_ckpts/49"

# boto3 Ckpt folder format:
# import os
# BOTO3_IP = os.environ["BOTO3_IP"] # boto3 bucket endpoint
# SAVE_CKPT_FOLDER = f"boto3:s3://model_weights.{BOTO3_IP}/internlm"
# LOAD_CKPT_FOLDER = f"boto3:s3://model_weights.{BOTO3_IP}/internlm/snapshot/1/"
CHECKPOINT_EVERY = 50
ckpt = dict(
enable_save_ckpt=False, # enable ckpt save.
save_ckpt_folder=SAVE_CKPT_FOLDER, # Path to save training ckpt.
# 'auto_resume' is designed to automatically load the latest checkpoint from 'save_ckpt_folder' when encountering
# training interruptions/hangs caused by hardware failures, using a scheduling system (such as k8s/slurm)
# with an automatic restart mechanism upon training reboot.
# Please be aware that if `auto_resume` is not set (its default value is True), it will not load the checkpoint
# path specified in `load_ckpt_info` by default.
# If you want to initialize your model weights from another model, you must set `auto_resume` to False.
# If you want to train from scratch, please set `auto_resume` to False and 'load_ckpt_info' to None.
auto_resume=False,
checkpoint_every=CHECKPOINT_EVERY,
async_upload=True, # async ckpt upload. (only works for boto3 ckpt)
async_upload_tmp_folder="/dev/shm/internlm_tmp_ckpt/", # path for temporary files during asynchronous upload.
oss_snapshot_freq=int(CHECKPOINT_EVERY / 2), # snapshot ckpt save frequency.
)

TRAIN_FOLDER = None
VALID_FOLDER = None # "/path/to/dataset"
data = dict(
seq_len=SEQ_LEN,
# micro_num is the number of micro-batches in one gradient update
micro_num=4,
# unit_schedule_size is used with ZeroPP and sets the number of micro-batches in one scheduling unit.
# If unit_schedule_size is not set, ZeroPP uses the interleaved 1F1B schedule; otherwise it uses interleaved GPipe.
unit_schedule_size=4,
# packed_length = micro_bsz * SEQ_LEN
micro_bsz=1,
# defaults to the value of micro_num
valid_micro_num=4,
# defaults to 0, which means evaluation is disabled
valid_every=0,
pack_sample_into_one=False,
total_steps=20,
skip_batches="",
# rampup_batch_size (str): A string with three space-separated integers representing the
# starting batch size, the increment, and the number of steps between
# each increment. For example, "192 24 8" means that the batch size (micro_num)
# starts at 192 and increases by 24 every 8 steps. Defaults to None.
# (IMPORTANT): The interval step size is 'micro_bsz'.
rampup_batch_size="",
# Datasets with fewer than 50 rows will be discarded
min_length=50,
train_folder=TRAIN_FOLDER,
valid_folder=VALID_FOLDER,
empty_cache_and_diag_interval=200,
diag_outlier_ratio=1.1,
)

grad_scaler = dict(
fp16=dict(
# the initial loss scale, defaults to 2**16
initial_scale=2**16,
# the minimum loss scale, defaults to None
min_scale=1,
# the number of steps to increase loss scale when no overflow occurs
growth_interval=1000,
),
# the multiplication factor for increasing loss scale, defaults to 2
growth_factor=2,
# the multiplication factor for decreasing loss scale, defaults to 0.5
backoff_factor=0.5,
# the maximum loss scale, defaults to None
max_scale=2**24,
# the number of overflows before decreasing loss scale, defaults to 2
hysteresis=2,
)

hybrid_zero_optimizer = dict(
# Enable low_level_optimizer overlap_communication
overlap_sync_grad=False,
overlap_sync_param=False,
# bucket size for nccl communication params
reduce_bucket_size=512 * 1024 * 1024,
# grad clipping
clip_grad_norm=1.0,
)

loss = dict(
label_smoothing=0,
)

adam = dict(
lr=1e-4,
adam_beta1=0.9,
adam_beta2=0.95,
adam_beta2_c=0,
adam_eps=1e-8,
weight_decay=0.01,
)

lr_scheduler = dict(
total_steps=data["total_steps"],
init_steps=0, # optimizer_warmup_step
warmup_ratio=0.01,
eta_min=1e-5,
last_epoch=-1,
)

beta2_scheduler = dict(
init_beta2=adam["adam_beta2"],
c=adam["adam_beta2_c"],
cur_iter=-1,
)

use_fp32_norm = False
model = dict(
checkpoint=False,
num_chunks=4,
num_attention_heads=NUM_ATTENTION_HEAD,
embed_split_hidden=True,
vocab_size=VOCAB_SIZE,
embed_grad_scale=1,
parallel_output=True,
hidden_size=HIDDEN_SIZE,
num_layers=NUM_LAYER,
no_bias=True,
mlp_ratio=MLP_RATIO,
apply_post_layer_norm=False,
dtype="torch.bfloat16",
norm_type="rmsnorm",
layer_norm_epsilon=1e-5,
num_kv_attention_heads=NUM_KV_ATTENTION_HEAD,
use_flash_attn=True,
# Whether the odd and even columns of query and key in the model are interleaved in the normal order.
# If True, the odd and even columns are ordered normally; if False, the model has already
# concatenated all odd columns in front and all even columns behind, to improve the
# computational efficiency of RoPE.
# Example:
# qk_interleaved = True: q[-1] = [q1,q2,q3,q4,q5,q6,...], k[-1] = [k1,k2,k3,k4,k5,k6,...]
# qk_interleaved = False: q[-1] = [q1,q3,q5,...,q2,q4,q6,...], k[-1] = [k1,k3,k5,...,k2,k4,k6,...]
qk_interleaved=False,
)

"""
zero1 parallel (dict):
1. size: int
* if size <= 0, the size of the zero process group is equal to the size of the dp process group,
so parameters will be divided within the range of dp.
* if size == 1, zero is not used, and all dp groups retain the full amount of model parameters.
* if size > 1 and size <= dp world size, the world size of zero is a subset of dp world size.
For smaller models, it is usually a better choice to split the parameters within nodes with a setting <= 8.
2. fsdp: bool, enable/disable torch's fully sharded data parallel, defaults to False.
tensor parallel (dict):
1. size: int, the size of tensor parallel.
2. mode: str, the tensor parallel mode, should be in ['mtp', 'msp', 'fsp', 'isp'],
defaults to 'mtp', which means pure megatron tensor parallel without sequence parallel.
msp: megatron tensor parallel with sequence parallel, sequence parallel size = tensor parallel size.
fsp: tensor parallel by flash-attn with sequence parallel, sequence parallel size = tensor parallel size.
isp: custom intern sequence parallel without tensor parallel, can be used with weight parallel.
pipeline parallel (dict):
1. size: int, the size of pipeline parallel.
2. interleaved_overlap: bool, enable/disable communication overlap when using interleaved pipeline scheduler,
defaults to False.
weight parallel (dict):
1. size: int, the size of weight parallel.
2. overlap: bool, enable/disable all_gather/reduce_scatter communication overlap, defaults to False.
3. memory_pool: bool, enable/disable memory pool, defaults to False.
"""
parallel = dict(
zero1=dict(size=-1),
tensor=dict(size=1, mode="isp"),
pipeline=dict(size=4, interleaved_overlap=True, use_zeropp=True, decouple_grad=True),
weight=dict(size=8, overlap=True, memory_pool=False),
)

cudnn_deterministic = False
cudnn_benchmark = False

monitor = dict(
# feishu alert configs
alert=dict(
enable_feishu_alert=DO_ALERT,
feishu_alert_address=None, # feishu webhook to send alert message
light_monitor_address=None, # light_monitor address to send heartbeat
alert_file_path=f"llm_alter/{JOB_NAME}_alert.log",
),
tensorboard=dict(
queue_max_length=10,
),
)

# metric_dtype can be "fp32" or another string
# only when set to "fp32" will metrics be calculated in fp32
# metric_dtype = "fp32"
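A minimal sketch (not part of this PR) of how the scheduling quantities in this config fit together; only the config keys come from the file above, the divisibility assumption and the rest are illustrative:

# Values taken from configs/7B_llama2_zeropp.py above.
micro_num = 4           # micro-batches per gradient update (data["micro_num"])
unit_schedule_size = 4  # micro-batches per ZeroPP scheduling unit (data["unit_schedule_size"])
num_chunks = 4          # model chunks per pipeline rank (model["num_chunks"])
pipeline_size = 4       # parallel["pipeline"]["size"]

# Each pipeline rank holds num_chunks virtual stages, so the interleaved
# schedule cycles through pipeline_size * num_chunks virtual stages in total.
virtual_stages = pipeline_size * num_chunks  # 16

# Presumably micro_num should divide evenly into scheduling units so that
# every unit is full; with these values there is one unit per step.
assert micro_num % unit_schedule_size == 0
units_per_step = micro_num // unit_schedule_size  # 1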
6 changes: 5 additions & 1 deletion internlm/core/context/parallel_context.py
@@ -19,6 +19,7 @@
from internlm.utils.common import SingletonMeta
from internlm.utils.logger import get_logger
from internlm.utils.timeout import LLM_NCCL_TIMEOUT
from internlm.utils.zeropp_manager import ZeroppManager

from . import process_group_initializer as pgroup_initializer
from .process_group_initializer import ParallelMode
@@ -666,7 +667,10 @@ def set_seed(self, seed: int, dpseed_with_tpoffset: bool = False):
def set_virtual_pipeline_parallel_size(self, size):
self.virtual_pipeline_parallel_size = size

def set_virtual_pipeline_parallel_rank(self, rank):
def set_virtual_pipeline_parallel_rank(self, rank, clear_zeropp_cache=True):
if rank != self.virtual_pipeline_parallel_rank:
if clear_zeropp_cache:
ZeroppManager.clear_cached_wp_parameters()
self.virtual_pipeline_parallel_rank = rank


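A rough usage sketch of the new flag. gpc and set_virtual_pipeline_parallel_rank come from this diff; the loop shape and the chunk count are hypothetical stand-ins:

from internlm.core.context import global_context as gpc  # import idiom used elsewhere in InternLM

num_model_chunks = 4  # matches model["num_chunks"] in the config above

for chunk_id in range(num_model_chunks):
    # Switching chunks presumably invalidates ZeroPP's cached weight-parallel
    # parameters, so the setter clears the cache by default on a rank change.
    gpc.set_virtual_pipeline_parallel_rank(chunk_id)
    ...  # run the forward pass for this chunk (omitted)

# A caller that knows the cache is still valid can skip the clear:
gpc.set_virtual_pipeline_parallel_rank(0, clear_zeropp_cache=False)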
9 changes: 7 additions & 2 deletions internlm/core/parallel/shard.py
@@ -95,14 +95,19 @@ def pipeline_parallel_sharding_wrapper(

models = []

for start, end in parts:
use_checkpoint = kwargs["checkpoint"]
for part_idx, (start, end) in enumerate(parts):
kwargs["num_layers"] = end - start
kwargs["first"] = start == 0
# Mark this part as the last only when it ends at the final layer and the final part is not empty.
kwargs["last"] = end == num_layers and len(all_parts[-1]) != 0
kwargs["device"] = device
kwargs["start_layer_idx"] = start

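# Presumably the final chunk on the last stage is not checkpointed because its
# backward follows its forward immediately, so recomputation would add compute
# without saving activation memory.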
if hasattr(gpc.config.parallel.pipeline, "use_zeropp") and gpc.config.parallel.pipeline.use_zeropp:
if part_idx == num_chunks - 1 and gpc.is_pipeline_last_stage(ignore_virtual=True):
kwargs["checkpoint"] = False
else:
kwargs["checkpoint"] = use_checkpoint
chunk = model_builder(**kwargs).to(device)
setattr(chunk, "first_layer", start)
setattr(chunk, "last_layer", end)
7 changes: 6 additions & 1 deletion internlm/core/scheduler/__init__.py
@@ -1,10 +1,15 @@
from .base_scheduler import BaseScheduler
from .no_pipeline_scheduler import NonPipelineScheduler
from .pipeline_scheduler import InterleavedPipelineScheduler, PipelineScheduler
from .pipeline_scheduler import (
InterleavedPipelineScheduler,
PipelineScheduler,
ZeroPPScheduler,
)

__all__ = [
"BaseScheduler",
"NonPipelineScheduler",
"InterleavedPipelineScheduler",
"PipelineScheduler",
"ZeroPPScheduler",
]
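With ZeroPPScheduler now exported, trainer code can pick a scheduler from the pipeline config. A minimal sketch of such selection logic; only the class names and the use_zeropp config key appear in this PR, and since the constructor signatures are not shown in the diff, only classes are returned:

from internlm.core.scheduler import (
    InterleavedPipelineScheduler,
    PipelineScheduler,
    ZeroPPScheduler,
)

def pick_scheduler_class(pipeline_cfg, num_chunks):
    # Hypothetical helper: choose a scheduler class based on the config.
    if getattr(pipeline_cfg, "use_zeropp", False):
        return ZeroPPScheduler
    if num_chunks > 1:
        return InterleavedPipelineScheduler
    return PipelineScheduler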