wake-up-neo.com

Ausführen von TensorFlow auf mehreren Kernen und Threads

Ich sollte sagen, dass mir jegliche Art von Parallelität/Multithreading/Multiprocessing-Programmierung völlig neu ist.

Jetzt habe ich die Möglichkeit, meinen TensorFlow CNN auf 32 Kernen (mit jeweils 2 Hyperthreads) auszuführen. Ich habe viel Zeit damit verbracht, zu verstehen, wie ich meinen Code ändern soll (wenn ich muss), um all diese Rechenleistung auszunutzen. Leider bin ich zu nichts gekommen. Ich habe gehofft, dass TF dies automatisch tun kann, aber wenn ich mein Modell starte und mit top die CPU-Auslastung überprüfe, sehe ich die meiste Zeit eine 100% ige CPU-Auslastung und einige 200% -Spitzen. Wenn alle Kerne verwendet würden, würde ich eine Auslastung von 100 * 64 = 6400% erwarten (richtig?). Wie kann ich das erreichen? Sollte ich etwas Ähnliches tun, wie es erklärt wird hier ? Wenn dies der Fall ist, verstehe ich richtig, dass das gesamte Multithreading nur auf Berechnungen angewendet wird, die die Warteschlange betreffen? Ist dies wirklich alles, was getan werden kann, um die gesamte verfügbare Rechenleistung zu nutzen (da die Warteschlange meines Erachtens nur zum Lesen und Zusammenstellen von Trainingsmustern verwendet wird)?

So sieht mein Code bei Bedarf aus: (main.py)

# pylint: disable=missing-docstring
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf
from pylab import *

import argparse
import cnn
import freader_2

training_feats_file = ["file_name"]
training_lbls_file = ["file_name"]
test_feats_file = 'file_name'
test_lbls_file = 'file_name'
learning_rate = 0.1
testset_size = 1000
batch_size = 1000
testset_size = 793
tot_samples = 810901
max_steps = 3300

def placeholder_inputs(batch_size):

    images_placeholder = tf.placeholder(tf.float32, shape=(testset_size, cnn.IMAGE_HEIGHT, cnn.IMAGE_WIDTH, 1))
    labels_placeholder = tf.placeholder(tf.float32, shape=(testset_size, 15))
    return images_placeholder, labels_placeholder

def reader(images_file, lbls_file, images_pl, labels_pl, im_height, im_width):

    images = loadtxt(images_file)
    labels_feed = loadtxt(lbls_file)
    images_feed = reshape(images, [images.shape[0], im_height, im_width, 1])

    feed_dict = {
        images_pl: images_feed,
        labels_pl: labels_feed,
    }

    return feed_dict

tot_training_loss = []
tot_test_loss = []
tot_grad = []

print('Starting TensorFlow session...')
with tf.Graph().as_default():

    DS = freader_2.XICSDataSet()
    images, labels = DS.trainingset_files_reader(training_feats_file, training_lbls_file)
    keep_prob = tf.placeholder(tf.float32) 
    logits = cnn.inference(images, batch_size, keep_prob)
    loss = cnn.loss(logits, labels)
    global_step = tf.Variable(0, trainable=False)
    train_op, grad_norm = cnn.training(loss, learning_rate, global_step)
    summary_op = tf.merge_all_summaries()   

    test_images_pl, test_labels_pl = placeholder_inputs(testset_size)
    test_pred = cnn.inference(test_images_pl, testset_size, keep_prob, True)
    test_loss = cnn.loss(test_pred, test_labels_pl)

    saver = tf.train.Saver()
    sess = tf.Session()
    summary_writer = tf.train.SummaryWriter("CNN", sess.graph)

    init = tf.initialize_all_variables()
    sess.run(init)
    tf.train.start_queue_runners(sess=sess)
    test_feed = reader(test_feats_file, test_lbls_file, test_images_pl, test_labels_pl, DS.height, DS.width)
    test_feed[keep_prob] = 1.    

    # Start the training loop.
    print('Starting training loop...')
    start_time = time.time()
    for step in xrange(max_steps):

        _, grad, loss_value= sess.run([train_op, grad_norm, loss], feed_dict = {keep_prob:0.5})  
        tot_training_loss.append(loss_value)
        tot_grad.append(grad)

        _, test_loss_val = sess.run([test_pred, test_loss], feed_dict=test_feed)
        tot_test_loss.append(test_loss_val)

        if step % 1 == 0:        
            duration = time.time() - start_time
            print('Step %d (%.3f sec):\n training loss = %f\n test loss = %f ' % (step, duration, loss_value, test_loss_val))
            print(' gradient = %f'%grad)
#            summary_str = sess.run(summary_op)#, feed_dict=feed_dict)
#            summary_writer.add_summary(summary_str, step)
#            summary_writer.flush()

        if (step+1) % 100 == 0:
            print('Saving checkpoint...')
            saver.save(sess, "chkpts/medias-res", global_step = global_step)

        if test_loss_val < 0.01:# or grad < 0.01:
            print("Stopping condition reached.")
            break

    print('Saving final network...')
    saver.save(sess, "chkpts/final.chkpt")
    print('Total training time: ' + str((time.time() - start_time)/3600) + ' h')

cnn.py:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math

import tensorflow as tf

NUM_OUTPUT = 15

IMAGE_WIDTH = 195
IMAGE_HEIGHT = 20
IMAGE_PIXELS = IMAGE_WIDTH * IMAGE_HEIGHT

def inference(images, num_samples, keep_prob, reuse=None):

    with tf.variable_scope('conv1', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[3, 30, 1, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))        
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(images, kernel, [1, 1, 5, 1], padding='VALID')
        # output dim: 18x34
        biases = tf.Variable(tf.constant(0.0, name='biases', shape=[5]))
        bias = tf.nn.bias_add(conv, biases)
        conv1 = tf.nn.relu(bias, name='conv1')

    pool1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool1')    
    #output dim: 9x17

    with tf.variable_scope('conv2', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[2, 2, 5, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(pool1, kernel, [1, 1, 1, 1], padding='VALID')
        #output dim: 8x16
        biases = tf.Variable(tf.constant(0.1, name='biases', shape=[5]))
        bias = tf.nn.bias_add(conv, biases)
        conv2 = tf.nn.relu(bias, name='conv2')


    pool2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool2')
    #output dim: 4x8

    h_fc1_drop = tf.nn.dropout(pool2, keep_prob)

    with tf.variable_scope('fully_connected', reuse=reuse):
        reshape = tf.reshape(h_fc1_drop, [num_samples, -1])
        dim = reshape.get_shape()[1].value
        weights = tf.get_variable(name='weights', shape=[dim, 20], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([20], name='biases'))
        fully_connected = tf.nn.relu(tf.matmul(reshape, weights) + biases, name='fully_connected')

    with tf.variable_scope('identity', reuse=reuse):
        weights = tf.get_variable(name='weights', shape=[20,NUM_OUTPUT], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([NUM_OUTPUT], name='biases'))
        output = tf.matmul(fully_connected, weights) + biases

    return output


def loss(outputs, labels):

    rmse = tf.sqrt(tf.reduce_mean(tf.square(tf.sub(labels, outputs))), name="rmse")
    loss_list = tf.get_collection('losses')
    loss_list.append(rmse)
    rmse_tot = tf.add_n(loss_list, name='total_loss')  
    return rmse_tot


def training(loss, starter_learning_rate, global_step):

      tf.scalar_summary(loss.op.name, loss)
#      optimizer = tf.train.AdamOptimizer()
      learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step, 200, 0.8, staircase=True)
      optimizer = tf.train.MomentumOptimizer(learning_rate, 0.8)
      grads_and_vars = optimizer.compute_gradients(loss)
      grad_norms = [tf.nn.l2_loss(g[0]) for g in grads_and_vars]      
      grad_norm = tf.add_n(grad_norms)
      train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)
#      train_op = optimizer.minimize(loss, global_step=global_step)
      return train_op, grad_norm

freader_2.py:

# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os 
import collections
import numpy as np

from six.moves import xrange  
import tensorflow as tf

class XICSDataSet:    
    def __init__(self, height=20, width=195, batch_size=1000, noutput=15):
        self.depth = 1
        self.height = height
        self.width = width
        self.batch_size = batch_size
        self.noutput = noutput

    def trainingset_files_reader(self, im_file_name, lb_file_name, nfiles=1):

        im_filename_queue = tf.train.string_input_producer(im_file_name, shuffle=False)
        lb_filename_queue = tf.train.string_input_producer(lb_file_name, shuffle=False)

        imreader = tf.TextLineReader()
        lbreader = tf.TextLineReader()
        imkey, imvalue = imreader.read(im_filename_queue)
        lbkey, lbvalue = lbreader.read(lb_filename_queue)
        im_record_defaults = [[.0]]*self.height*self.width
        lb_record_defaults = [[.0]]*self.noutput
        im_data_Tuple = tf.decode_csv(imvalue, record_defaults=im_record_defaults, field_delim = ' ')
        lb_data_Tuple = tf.decode_csv(lbvalue, record_defaults=lb_record_defaults, field_delim = ' ')
        features = tf.pack(im_data_Tuple)
        label = tf.pack(lb_data_Tuple)

        depth_major = tf.reshape(features, [self.height, self.width, self.depth])

        min_after_dequeue = 10
        capacity = min_after_dequeue + 3 * self.batch_size
        example_batch, label_batch = tf.train.shuffle_batch([depth_major, label], batch_size=self.batch_size, capacity=capacity,
                                                            min_after_dequeue=min_after_dequeue)

        return example_batch, label_batch
16
bored_to_death

Nach Tensorflow :

Die beiden unten aufgeführten Konfigurationen werden verwendet, um die CPU-Leistung durch Anpassen der Thread-Pools zu optimieren.

  • intra_op_parallelism_threads: Knoten, die mehrere Threads verwenden können, um ihre Ausführung zu parallelisieren, planen die einzelnen Teile in diesem Pool.
  • inter_op_parallelism_threads: Alle bereiten Knoten werden in diesem Pool eingeplant.

Diese Konfigurationen werden über das tf.ConfigProto und übergeben an tf.Session im config-Attribut, wie im folgenden Snippet gezeigt. Wenn beide Konfigurationsoptionen deaktiviert oder auf 0 gesetzt sind, wird standardmäßig die Anzahl der logischen CPU-Kerne verwendet. Tests haben gezeigt, dass die Standardeinstellung für Systeme von einer CPU mit 4 Kernen bis zu mehreren CPUs mit über 70 kombinierten logischen Kernen wirksam ist. Eine übliche alternative Optimierung besteht darin, die Anzahl der Threads in beiden Pools auf die Anzahl der physischen Kerne und nicht auf die Anzahl der logischen Kerne festzulegen

config = tf.ConfigProto()
config.intra_op_parallelism_threads = 44
config.inter_op_parallelism_threads = 44
tf.session(config=config)

In TensorFlow-Versionen vor 1.2 wird empfohlen, aus Gründen der Leistung auf Warteschlangen basierende Eingabe-Pipelines mit mehreren Threads zu verwenden. Ab TensorFlow 1.4 wird jedoch empfohlen, stattdessen das Modul tf.data zu verwenden.


Ja, unter Linux können Sie die CPU-Auslastung mit top überprüfen und auf drücken 1 um die Auslastung pro CPU anzuzeigen. Hinweis: Der Prozentsatz hängt vom Irix/Solaris-Modus ab.

6
Marco D.G.

Dies ist ein Kommentar, aber ich poste ihn als Antwort, weil ich noch nicht genug Repräsentanten habe, um Kommentare zu posten. Die Antwort von Marco D.G. ist richtig, ich wollte nur die lustige Tatsache hinzufügen, dass with tf.device('/cpu:0') automatisch versucht, alle verfügbaren Kerne zu verwenden. Viel Spaß beim Fließen!

3
TheLoneDeranger

Bei mir hat es so geklappt:

from multiprocessing.dummy import Pool as ThreadPool 
....
pool = ThreadPool()
outputs = pool.starmap(run_on_sess,[(tf_vars,data1),(tf_vars,data2),])
pool.close()
pool.join()

Sie sollten die Sitzung initialisieren und sitzungsbezogene Variablen als Teil von tf_vars Global verfügbar machen. Erstellen Sie eine Funktion run_on_sess, Die den Schritt sess.run Und andere hintere Berechnungen für einzelne Stapel mit den Namen data1 und data2 in einer Pythonic-Multithread-Umgebung ausführt.

1
burglarhobbit