如何使用modelarts训练海量数据

网友投稿 664 2022-05-29

ModelArts上使用notebook上使用evs空间默认大小是5G,能满足大部分文本和图片训练模型的需求。如果训练数据稍微超过这个限额,可以适当的扩增下空间。但如果训练对象是视频,或是实际生成过程中的海量数据,这个空间就显得小了,这时候扩增evs空间就显得很不经济了。

最近老山便碰到这样的案例,客户的训练数据大约在1T的量级,在obs上存储的数据结构大概如下图所示。

your-obs-name

└── ...

└── video

├── folder1

│   ├── text.txt

│   └── video.mp4

├── folder2

│   ├── text.txt

│   └── video.mp4

├── folder3

│   ├── text.txt

│   └── video.mp4

├── folder4

│   ├── text.txt

│   └── video.mp4

├── folder5

│   ├── text.txt

│   └── video.mp4

├── ...

虽然使用华为云自带的moxing模块可以直接读取obs的数据,但由于实质是通过http实时读取数据,这个速度比从evs的ssd硬盘上读取数据要慢得多。而解决方案也比较直接,在evs上开辟一个固定大小的空间作为缓存区,一方面不断把obs数据读入缓存区,如果缓存区满了,就等待其腾出空间,另一方面训练任务消费evs数据,当消费完后便删除数据。

程序上也自然选用生产者-消费者模型。程序定义了管道类Pipeline,有生产者线程producer用于将obs数据保存到evs;同时输出evs数据用于外部模型的消费。由于每个视频文件都单独放在一个文件夹下,所以程序的输出对象也是这个文件夹在evs上保存的地址,如folder1,folder2等。至于读取文件夹内部文件信息等消费工作,由用户自行定义。

不多说,直接上代码。

import moxing as mox

mox.file.shift('os', 'mox')

import os, shutil

from queue import Queue

from time import sleep

import threading

import logging

logging.basicConfig(level=logging.INFO,

format="%(asctime)s %(name)s %(levelname)s %(message)s",)

class ObsClient:

def __init__(self, root):

'''获取obs路径上需要读取的文件夹的相关信息'''

self.root = root

self.directory = self.list_directory()

self.maxSize = self.getMaxSize()

def getMaxSize(self):

'''最大的文件夹的大小'''

return max([size for *_, size in self.directory])

def list_directory(self):

'''输出用于训练的文件夹的路径,输出directory:

[(文件夹相对路径,文件夹绝对路径,文件夹大小), ...]

'''

directory = []

folders = mox.file.list_directory(self.root)

for folder in folders:

folderPath = os.path.join(self.root, folder)

if mox.file.is_directory(folderPath):

size = self.get_size(folderPath)

directory.append((folder, folderPath, size))

return directory

def get_size(self, path):

'''获取文件(夹)的大小'''

if mox.file.is_directory(path):

return self.get_size_folder(path)

return self.get_size_file(path)

def get_size_file(self, path):

如何使用modelarts训练海量数据

'''获取文件的大小'''

return mox.file.get_size(path)

def get_size_folder(self, path):

'''获取文件夹的大小'''

size = 0

for filename in mox.file.list_directory(path, recursive=True):

filepath = os.path.join(path, filename)

if not mox.file.is_directory(filepath):

size+= self.get_size_file(filepath)

return size

class EvsClient:

def __init__(self, root, memory, queue, directory, interval = 0.1):

self.root = root # evs缓存区根目录

self.directory = directory # obs文件夹信息

self.size = 0 # evs缓存区已使用的空间

self.memory = memory # evs上用于缓存的空间大小

self.queue = queue # 队列,存储了evs缓存区文件夹的信息

self.interval = interval # 如果缓存区满后,查询缓存大小的间隔时间

def remove(self, folder, size):

'''删除evs文件夹,在文件夹被消费后调用'''

logging.info(f"consumer: start removing folder {folder} with size {size}|{self.size}")

shutil.rmtree(folder, True)

self.size -= size

logging.info(f"consumer: end removing folder {folder} with size -{size}|{self.size}")

def work(self):

'''生成者主程序,用于从obs中copy文件夹到evs'''

for relObsFolder, absObsFolder, size in self.directory:

while True:

# 缓存区没满,就copy文件

if not self.waitOrDo(size):

self.copy(relObsFolder, absObsFolder, size)

break

# 如果缓存区满了,就等待

sleep(self.interval)

# 当所有文件都拷贝后,置入结束符(None, None)

self.queue.put((None, None))

def waitOrDo(self, size):

'''返回True时等待,返回False时工作'''

return self.size + size > self.memory

def copy(self, relObsFolder, absObsFolder, size):

'''从obs中copy文件夹到evs'''

evsFolder = os.path.join(self.root, relObsFolder)

logging.info(f"producer: start copying folder {relObsFolder} with size {size}|{self.size}")

mox.file.copy_parallel(absObsFolder, evsFolder)

self.queue.put((evsFolder, size))

self.size += size

logging.info(f"producer: end copying folder {relObsFolder} with size +{size}|{self.size}")

class Pipeline:

def __init__(self, evsRoot, obsRoot, memory = '1g', timeout = 300, interval = 0.1):

self.memory = self.rescript(memory) # evs上用于缓存的空间大小

self.timeout = timeout # 消费者获取evs缓存区文件夹的最长等待时间

self.queue = Queue() # 队列,存储了evs缓存区文件夹的信息

self.obsClient = ObsClient(obsRoot) # 存储obs上的文件夹信息

# evs上的操作

self.evsClient = EvsClient(evsRoot, self.memory, self.queue, self.obsClient.directory, interval)

self.checkMemory() # 验证evs上用于缓存的空间大小是否足够大

def checkMemory(self):

'''evs上用于缓存的空间大小不能小于obs上最大文件夹大小'''

if self.memory

raise Exception("memory should bigger than maxFolderSize!")

def rescript(self, memory):

'''将文本或数值类型的memory转写成数值'''

try:

if isinstance(memory, str):

if memory[-1].lower()=='g':

return int(float(memory[:-1])*1024*1024*1024)

elif memory[-1].lower()=='m':

return int(float(memory[:-1])*1024*1024)

elif memory[-1].lower()=='k':

return int(float(memory[:-1])*1024)

else:

return int(float(memory))

else:

return int(float(memory))

except:

raise Exception("Error when rescripting memory!")

def __iter__(self):

'''生成器,yield输出evs文件夹路径和大小'''

# 生产者线程

producer = threading.Thread(target = self.evsClient.work)

producer.start()

# 主程序提供生成器用于消费,输出evs文件夹路径和大小

while True:

logging.info(f"consumer: start to get the queue")

path, size = self.queue.get(timeout=self.timeout)

logging.info(f"consumer: get the queue {path}, {size} ")

if path is None and size is None:

break

yield path, size

self.evsClient.remove(path, size)

# 主程序等待

producer.join()

if __name__ == '__main__':

# 使用示例

for path, size in Pipeline('./video', 's3://your-obs-name/.../video'):

do_job(path, size)

如果你觉得老山的文章不错,不妨点击下关注。

大数据开发 ModelArts

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:30 个实例详解 TOP 命令
下一篇:9个超级实用的 ES6 特性,超级实用哦!
相关文章