数据集准备
- Step 1. 定义一个数据类,继承 object
- Step 2. 实现__init__函数,定义数据初始化等操作,在实例化数据集对象时被调用。
- Step 3. 实现__getitem__函数,定义该函数后可使其支持随机访问,能够根据给定的索引值
index
,获取数据集中的数据并返回。数据返回值类型是由 NumPy 数组组成的 Tuple。 - Step 4. 实现__len__函数,返回数据集的样本数量。
- Step 5. 实例化数据类,并将其传入 mindspore.dataset.GeneratorDataset 函数的 source 参数中。该函数返回一个 DataSet 对象
GeneratorDataset
MindSpore 的数据集由 GeneratorDataset 类产生,该函数位于 mindspore.dateset 中。
class_mindspore.dataset.GeneratorDataset(
source:Union[Callable, Iterable, Random Accessible],
column_names:Union[str, list[str]]=None,
column_types:list[mindspore.dtype]=None,
schema:Union[Schema, str]=None,
num_samples:int=None,
num_parallel_workers:int=1,
shuffle:bool=None,
sampler:Union[Sampler, Iterable]=None,
num_shards:int=None,
shard_id:int=None,
python_multiprocessing:bool=True,
max_rowsize:int=6)
该函数的参数定义如下所示:
-
source (Union[Callable, Iterable, Random Accessible]) - 一个 Python 的可调用对象,可以是可迭代的 Python 对象,或支持随机访问的 Python 对象。
-
如果 source 是可调用对象,要求 source 对象可以通过 source().next() 的方式返回一个由NumPy数组构成的元组。
-
如果 source 是可迭代对象,要求 source 对象通过 iter(source).next() 的方式返回一个由NumPy数组构成的元组。
-
如果 source 是支持随机访问的对象,要求 source 对象通过 source[idx] 的方式返回一个由NumPy数组构成的元组。
-
-
column_names (Union[str, list[str]],可选) - 指定数据集生成的列名,默认值:None,不指定。用户可以通过此参数或 schema 参数指定列名。
-
column_types (list[mindspore.dtype],可选) - 指定生成数据集各个数据列的数据类型,默认值:None,不指定。 如果未指定该参数,则自动推断类型;如果指定了该参数,将在数据输出时做类型匹配检查。
-
schema (Union[Schema, str],可选) - 读取模式策略,用于指定读取数据列的数据类型、数据维度等信息。 支持传入JSON文件路径或 mindspore.dataset.Schema 构造的对象。默认值:None,不指定。 用户可以通过提供 column_names 或 schema 指定数据集的列名,但如果同时指定两者,则将优先从 schema 中获取列名信息。
-
num_samples (int,可选) - 指定从数据集中读取的样本数,默认值:None,读取全部样本。
-
num_parallel_workers (int,可选) - 指定读取数据的工作进程数/线程数(由参数 python_multiprocessing 决定当前为多进程模式或多线程模式),默认值:1。
-
shuffle (bool,可选) - 是否混洗数据集。只有输入的 source 参数带有可随机访问属性(getitem)时,才可以指定该参数。默认值:None,下表中会展示不同配置的预期行为。
-
sampler (Union[Sampler, Iterable],可选) - 指定从数据集中选取样本的采样器。只有输入的 source 参数带有可随机访问属性(getitem)时,才可以指定该参数。默认值:None,下表中会展示不同配置的预期行为。
-
num_shards (int, 可选) - 指定分布式训练时将数据集进行划分的分片数,默认值:None。指定此参数后, num_samples 表示每个分片的最大样本数。
-
shard_id (int, 可选) - 指定分布式训练时使用的分片ID号,默认值:None。只有当指定了 num_shards 时才能指定此参数。
-
python_multiprocessing (bool,可选) - 启用Python多进程模式加速运算,默认值:True。当传入 source 的Python对象的计算量很大时,开启此选项可能会有较好效果。
-
max_rowsize (int,可选) - 指定在多进程之间复制数据时,共享内存分配的最大空间,默认值:6,单位为 MB。仅当参数 python_multiprocessing 设为 True 时,此参数才会生效。
该函数抛出的异常如下:
-
RuntimeError - Python 对象 source 在执行期间引发异常。
-
RuntimeError - column_names 参数指定的列名数量与 source 参数输出的数据数量不匹配。
-
ValueError - num_parallel_workers 参数超过最大线程数。
-
ValueError - 同时指定了 sampler 和 shuffle 参数。
-
ValueError - 同时指定了 sampler 和 num_shards 参数或同时指定了 sampler 和 shard_id 参数。
-
ValueError - 指定了 num_shards 参数,但是未指定 shard_id 参数。
-
ValueError - 指定了 shard_id 参数,但是未指定 num_shards 参数。
-
ValueError - shard_id 参数值错误(小于0或者大于等于 num_shards )。
数据集准备示例
import numpy as np
# 1) Multidimensional generator function as callable input.
def generator_multidimensional():
for i in range(64):
yield (np.array([[i, i + 1], [i + 2, i + 3]]),)
dataset = ds.GeneratorDataset(source=generator_multidimensional, column_names=["multi_dimensional_data"])
# 2) Multi-column generator function as callable input.
def generator_multi_column():
for i in range(64):
yield np.array([i]), np.array([[i, i + 1], [i + 2, i + 3]])
dataset = ds.GeneratorDataset(source=generator_multi_column, column_names=["col1", "col2"])
# 3) Iterable dataset as iterable input.
class MyIterable:
def __init__(self):
self._index = 0
self._data = np.random.sample((5, 2))
self._label = np.random.sample((5, 1))
def __next__(self):
if self._index >= len(self._data):
raise StopIteration
else:
item = (self._data[self._index], self._label[self._index])
self._index += 1
return item
def __iter__(self):
self._index = 0
return self
def __len__(self):
return len(self._data)
dataset = ds.GeneratorDataset(source=MyIterable(), column_names=["data", "label"])
# 4) Random accessible dataset as random accessible input.
class MyAccessible:
def __init__(self):
self._data = np.random.sample((5, 2))
self._label = np.random.sample((5, 1))
def __getitem__(self, index):
return self._data[index], self._label[index]
def __len__(self):
return len(self._data)
dataset = ds.GeneratorDataset(source=MyAccessible(), column_names=["data", "label"])
# list, dict, tuple of Python is also random accessible
dataset = ds.GeneratorDataset(source=[(np.array(0),), (np.array(1),), (np.array(2),)], column_names=["col"])
#########################################################################
# 单标签数据集生成
import numpy as np
from mindspore import dataset as ds
def get_singlelabel_data(num):
"""定义生成单标签数据集函数"""
for _ in range(num):
data = np.random.uniform(-10.0, 10.0) # 自定义数据
label = np.random.uniform(-1.0, 1.0) # 自定义标签
yield np.array([data]).astype(np.float32), np.array([label]).astype(np.float32)
# 定义数据集
dataset = ds.GeneratorDataset(list(get_singlelabel_data(5)), column_names=['data', 'label'])
# 打印数据集
for data in dataset.create_dict_iterator():
print("data:[{:9.5f}] ".format(data['data'].asnumpy()[0]),
"label:[{:9.5f}] ".format(data['label'].asnumpy()[0]))
# 打印数据条数
print("data size:", dataset.get_dataset_size())
#######################################################################
# 多标签数据集生成
import numpy as np
from mindspore import dataset as ds
def get_multilabel_data(num, w=2.0, b=3.0):
"""定义生成多标签数据集函数"""
for _ in range(num):
data = np.random.uniform(-10.0, 10.0) # 自定义数据
label1 = np.random.uniform(-1.0, 1.0) # 自定义标签1
label2 = np.random.uniform(1.0, 2.0) # 自定义标签2
yield np.array([data]).astype(np.float32), np.array([label1]).astype(np.float32), np.array([label2]).astype(np.float32)
# 定义数据集
dataset = ds.GeneratorDataset(list(get_multilabel_data(5)), column_names=['data', 'label1', 'label2'])
# 打印数据集
for data in dataset.create_dict_iterator():
print("data:[{:9.5f}] ".format(data['data'].asnumpy()[0]),
"label1:[{:9.5f}] ".format(data['label1'].asnumpy()[0]),
"label2:[{:9.5f}]".format(data['label2'].asnumpy()[0]))
# 打印数据条数
print("data size:", dataset.get_dataset_size())
网络搭建
from mindspore import nn
class Net(nn.Cell):
def __init__(self):
super(Net, self).__init__()
self.conv = nn.Conv2d(10, 20, 3, has_bias=True, weight_init='normal')
def construct(self, x):
out = self.conv(x)
return out
自定义损失函数
内置损失函数的使用
import numpy as np
import mindspore.nn as nn
import mindspore as ms
# 输出loss均值
loss = nn.L1Loss()
# 输出loss和
loss_sum = nn.L1Loss(reduction='sum')
# 输出loss原值
loss_none = nn.L1Loss(reduction='none')
input_data = ms.Tensor(np.array([[1, 2, 3], [2, 3, 4]]).astype(np.float32))
target_data = ms.Tensor(np.array([[0, 2, 5], [3, 1, 1]]).astype(np.float32))
print("loss:", loss(input_data, target_data))
print("loss_sum:", loss_sum(input_data, target_data))
print("loss_none:\n", loss_none(input_data, target_data))
自定义损失函数
- 基于
nn.Cell
构造损失函数
import mindspore.ops as ops
class MAELoss(nn.Cell):
"""自定义损失函数MAELoss"""
def __init__(self):
"""初始化"""
super(MAELoss, self).__init__()
self.abs = ops.Abs()
self.reduce_mean = ops.ReduceMean()
def construct(self, base, target):
"""调用算子"""
x = self.abs(base - target)
return self.reduce_mean(x)
loss = MAELoss()
input_data = ms.Tensor(np.array([0.1, 0.2, 0.3]).astype(np.float32)) # 生成预测值
target_data = ms.Tensor(np.array([0.1, 0.2, 0.2]).astype(np.float32)) # 生成真实值
output = loss(input_data, target_data)
print(output)
- 基于
nn.LossBase
构造损失函数
class MAELoss(nn.LossBase):
"""自定义损失函数MAELoss"""
def __init__(self, reduction="mean"):
"""初始化并求loss均值"""
super(MAELoss, self).__init__(reduction)
self.abs = ops.Abs() # 求绝对值算子
def construct(self, base, target):
x = self.abs(base - target)
return self.get_loss(x) # 返回loss均值
loss = MAELoss()
input_data = ms.Tensor(np.array([0.1, 0.2, 0.3]).astype(np.float32)) # 生成预测值
target_data = ms.Tensor(np.array([0.1, 0.2, 0.2]).astype(np.float32)) # 生成真实值
output = loss(input_data, target_data)
print(output)
模型训练
import mindspore as ms
from mindspore import dataset as ds
from mindspore.common.initializer import Normal
from mindvision.engine.callback import LossMonitor
def get_data(num, w=2.0, b=3.0):
"""生成数据及对应标签"""
for _ in range(num):
x = np.random.uniform(-10.0, 10.0)
noise = np.random.normal(0, 1)
y = x * w + b + noise
yield np.array([x]).astype(np.float32), np.array([y]).astype(np.float32)
def create_dataset(num_data, batch_size=16):
"""加载数据集"""
dataset = ds.GeneratorDataset(list(get_data(num_data)), column_names=['data', 'label'])
dataset = dataset.batch(batch_size)
return dataset
class LinearNet(nn.Cell):
"""定义线性回归网络"""
def __init__(self):
super(LinearNet, self).__init__()
self.fc = nn.Dense(1, 1, Normal(0.02), Normal(0.02))
def construct(self, x):
return self.fc(x)
ds_train = create_dataset(num_data=160)
net = LinearNet()
loss = MAELoss()
opt = nn.Momentum(net.trainable_params(), learning_rate=0.005, momentum=0.9)
# 使用model接口将网络、损失函数和优化器关联起来
model = ms.Model(net, loss, opt)
model.train(epoch=1, train_dataset=ds_train, callbacks=[LossMonitor(0.005)])