123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- # Copyright (c) Microsoft Corporation.
- # SPDX-License-Identifier: Apache-2.0
- # DeepSpeed Team
- import pytest
- import torch
- import deepspeed
- from deepspeed.ops.op_builder import QuantizerBuilder
- from deepspeed.accelerator import get_accelerator
- if not deepspeed.ops.__compatible_ops__[QuantizerBuilder.NAME]:
- pytest.skip("Inference ops are not available on this system", allow_module_level=True)
- inference_module = None
- def run_quantize_ds(activations, num_groups, q_bits, is_symmetric_quant):
- global inference_module
- if inference_module is None:
- inference_module = QuantizerBuilder().load()
- return inference_module.quantize(activations, num_groups, q_bits,
- inference_module.Symmetric if is_symmetric_quant else inference_module.Asymmetric)
- def run_dequantize_ds(activations, params, num_groups, q_bits, is_symmetric_quant):
- global inference_module
- if inference_module is None:
- inference_module = QuantizerBuilder().load()
- return inference_module.dequantize(
- activations,
- params,
- num_groups,
- q_bits,
- inference_module.Symmetric if is_symmetric_quant else inference_module.Asymmetric,
- )
- def get_q_props(q_bits):
- q_range = 2**q_bits
- q_min = -(2**(q_bits - 1))
- q_max = (2**(q_bits - 1) - 1)
- q_min = torch.IntTensor([q_min]).to(device=get_accelerator().device_name())
- q_max = torch.IntTensor([q_max]).to(device=get_accelerator().device_name())
- return q_range, q_max, q_min
- def get_scale_zero_point(q_bits, is_symmetric_quant, max, min, absmax, scales=None, zero_points=None):
- q_range, q_max, q_min = get_q_props(q_bits)
- if is_symmetric_quant:
- scale = torch.empty_like(absmax)
- for i, x in enumerate(absmax):
- scale[i] = torch.ones_like(x) if x == 0 else q_range / (2 * x)
- zero_point = torch.zeros(scale.shape, dtype=torch.float32, device=get_accelerator().device_name())
- else:
- scale = torch.empty_like(max)
- for i, x in enumerate(max):
- scale[i] = torch.ones_like(x) if max[i] == min[i] else q_range / (max[i] - min[i])
- zero_point = q_min - (min * scale)
- return scale, zero_point
- def int4x2to2xint4(int4X2tensor):
- high = int4X2tensor >> 4
- low = (int4X2tensor << 4) >> 4
- return torch.stack((high, low), dim=-1).flatten()
- def run_float_quantize(q_bits, is_symmetric_quant, activations_ref, num_groups):
- # Reference implementation
- # https://pytorch.org/docs/stable/quantization-support.html
- activations_ref = activations_ref.reshape(num_groups, -1).to(dtype=torch.float32)
- max_abs_activations_ref = torch.amax(torch.abs(activations_ref), dim=-1).view(num_groups, -1)
- max_activations_ref = torch.amax(activations_ref, dim=-1).view(num_groups, -1)
- min_activations_ref = torch.amin(activations_ref, dim=-1).view(num_groups, -1)
- _, q_max, q_min = get_q_props(q_bits)
- scale, zero_point = get_scale_zero_point(q_bits, is_symmetric_quant, max_activations_ref, min_activations_ref,
- max_abs_activations_ref)
- data_f = activations_ref * scale
- if not is_symmetric_quant:
- data_f = data_f + zero_point
- data_i32 = torch.round(data_f).to(dtype=torch.int32)
- data_i32 = torch.minimum(torch.maximum(data_i32, q_min.expand_as(data_i32)), q_max.expand_as(data_i32))
- data_i8 = data_i32.to(dtype=torch.int8)
- scales = (1.0 / scale).reshape(-1, 1)
- offsets = zero_point.reshape(-1, 1)
- params = torch.cat((scales, offsets), dim=-1)
- return data_i8, params
- def run_float_dequantize(q_bits, is_symmetric_quant, data_i8, params, num_groups):
- data_f = data_i8.reshape(num_groups, -1).to(dtype=torch.float32)
- scales = params[:, 0].reshape(-1, 1)
- offsets = params[:, 1].reshape(-1, 1)
- if not is_symmetric_quant:
- data_f = data_f - offsets
- else:
- assert offsets.allclose(torch.zeros_like(offsets))
- data_f = data_f * scales
- return data_f
- @pytest.mark.inference_ops
- @pytest.mark.parametrize("num_groups", [1, 13, 512])
- @pytest.mark.parametrize("num_elems", [8, 16, 32, 64, 128, 256, 4096, 8192, 12288, 16384])
- @pytest.mark.parametrize("is_symmetric_quant", [True, False])
- @pytest.mark.parametrize("q_bits", [4, 8])
- @pytest.mark.parametrize("directed_case", ["all_zeros", None])
- def test_float_quantize(num_elems, num_groups, is_symmetric_quant, q_bits, directed_case):
- # fix seed
- torch.manual_seed(num_elems)
- if directed_case == "all_zeros":
- activations_ds = torch.zeros((num_groups, num_elems),
- dtype=torch.float16,
- device=get_accelerator().device_name())
- else:
- activations_ds = torch.randn((num_groups, num_elems),
- dtype=torch.float16,
- device=get_accelerator().device_name())
- activations_ref = activations_ds.clone().detach()
- ref_out_tensor, ref_params = run_float_quantize(q_bits, is_symmetric_quant, activations_ref, num_groups)
- ref_dequantized_tensor = run_float_dequantize(q_bits, is_symmetric_quant, ref_out_tensor, ref_params, num_groups)
- # we need to convert the tensor to float64 to avoid overflow
- ref_quantization_error = torch.sum(torch.abs((activations_ref - ref_dequantized_tensor).to(torch.float64)))
- ds_out_tensor, ds_out_params = run_quantize_ds(activations_ds, num_groups, q_bits, is_symmetric_quant)
- ds_dequantized_tensor = run_dequantize_ds(ds_out_tensor, ds_out_params, num_groups, q_bits, is_symmetric_quant)
- assert torch.all(torch.isfinite(ds_dequantized_tensor))
- ds_quantization_error = torch.sum(torch.abs((activations_ds - ds_dequantized_tensor).to(torch.float64)))
- assert (ds_quantization_error <= ref_quantization_error * 1.05)
|