производительность — конвертировать Matlab в C ++, bsxfun

Я пытаюсь преобразовать мой код MATLAB в C ++ и обнаружил, что есть проблема в следующей ситуации:

MATLAB

A = rand(1000,40000);
b = rand(1000,1);
tic;
ans = bsxfun(@ne,b,A);
toc

C ++

std::vector<std::vector<int> > A;
std::vector<int> b;
std::vector<int> ans(10000);

// initial A and b
const clock_t begin_time = clock();
for(int i = 0; i < 40000; ++i){
for(int j = 0; j < 1000; ++j){
if(A[i][j] != b[j])
ans[i]++;
}
}
double run_time = static_cast<double>((clock() - begin_time)) / CLOCKS_PER_SEC;

Я считаю, что C ++ дело в три раза медленнее, чем MATLAB. Я хотел бы спросить, если кто-нибудь знает, как изменить код C ++, чтобы я мог иметь такую ​​же или ту же производительность, что и bsxfun делает?

После поиска в Интернете я нахожу два возможных пути:

  1. включить библиотеки из броненосца
  2. включить библиотеки из октавы

Но дело в том, что я не уверен, как это сделать, я имею в виду, я не знаю деталей реализации.

Резюме:

  1. Я хотел бы спросить, если кто-нибудь знает, как изменить код C ++, чтобы я мог иметь такую ​​же или ту же производительность, что и bsxfun делает?
  2. Может ли кто-нибудь дать некоторые подсказки или шаги или пример, чтобы я мог узнать, как включить Armadillo или Octave для выполнения этой задачи.

РЕДАКТИРОВАТЬ:

Благодаря @Peter, я компилирую с опцией -O3 и тогда проблема «решена», я имею в виду скорость такая же, как у MATLAB.

1

Решение

1 — Вы запускаете свои петли в неправильном порядке. В C и C ++ двумерные массивы хранятся в мажорных строках, что означает A[j][i] а также A[j][i+1] рядом друг с другом в памяти. (Думайте об этом так: A[j] является первой индексной операцией, возвращающей ссылку на другой вектор, который вы затем снова добавляете [i]).

Хранение данных в кеше для максимально возможного количества операций является одним из ключей к производительности на современном процессоре, что означает, что вы хотите получить доступ к соседним элементам, когда можете. Итак, измените порядок циклов:

for(int j = 0; j < 1000; ++j){
for(int i = 0; i < 40000; ++i){

2- Опции компилятора имеют большое значение. Убедитесь, что вы работаете в режиме «Release» или с включенной оптимизацией.

3- Обычно двумерные массивы хранятся в C ++ как одномерные массивы, выполняя индексирование строк / столбцов с помощью умножений. То есть, A будет вектор размером 1000 * 40000, и A[j][i] будет вместо A[j*row_length + i], Это дает преимущество более непрерывной памяти (см. Пункт 1), меньшего количества динамического выделения памяти и лучшего использования кэша.

6

Другие решения

Как я уже упоминал в комментариях, в вашем коде MATLAB отсутствует вызов sum функция (в противном случае два кода вычисляют разные вещи!). Так и должно быть:

MATLAB

A = rand(1000,40000);
B = rand(1000,1);
tic
count = sum(bsxfun(@ne, A, B));
toc

На моей машине я получаю:

Elapsed time is 0.036931 seconds.

Помните, что приведенное выше утверждение Векторизованных (думаю, SIMD распараллеливание). MATLAB также может автоматически запускать этот многопоточный если размер достаточно велик.


Вот моя версия кода в C ++. Я использую простые классы для создания интерфейса вектор / матрица. Обратите внимание, что базовые данные в основном хранятся как одномерный массив с основной порядок столбца похож на MATLAB.

C ++

#include <iostream>
#include <cstdlib>        // rand
#include <ctime>          // time
#include <sys/time.h>     // gettimeofday

class Timer
{
private:
timeval t1, t2;
public:
Timer() {}
~Timer() {}
void start() { gettimeofday(&t1, NULL); }
void stop() { gettimeofday(&t2, NULL); }
double elapsedTime() { return (t2.tv_sec - t1.tv_sec)*1000.0 + (t2.tv_usec - t1.tv_usec)/1000; }
};

template<typename T>
class Vector
{
private:
T *data;
const size_t num;
public:
Vector(const size_t num) : num(num) { data = new T[num]; }
~Vector() { delete[] data; }
inline T& operator() (const size_t i) { return data[i]; }
inline const T& operator() (const size_t i) const { return data[i]; }
size_t size() const { return num; }
};

template<typename T>
class Matrix
{
private:
T *data;
const size_t nrows, ncols;
public:
Matrix(const size_t nr, const size_t nc) : nrows(nr), ncols(nc) { data = new T[nrows * ncols]; }
~Matrix() { delete[] data; }
inline T& operator() (const size_t r, const size_t c) { return data[c*nrows + r]; }
inline const T& operator() (const size_t r, const size_t c) const { return data[c*nrows + r]; }
size_t size1() const { return nrows; }
size_t size2() const { return ncols; }
};

inline double rand_double(double min=0.0, double max=1.0)
{
return (max - min) * (static_cast<double>(rand()) / RAND_MAX) + min;
}

int main() {
// seed random number generator
srand( static_cast<unsigned int>(time(NULL)) );

// intialize data
const int m = 1000, n = 40000;
Matrix<double> A(m,n);
Vector<double> B(m);
for(size_t i=0; i<A.size1(); i++) {
B(i) = rand_double();
for(size_t j=0; j<A.size2(); j++) {
A(i,j) = rand_double();
}
}

// measure timing
Timer timer;
timer.start();

// in MATLAB: count = sum(bsxfun(@ne, A, B))
Vector<double> count(n);
#pragma omp parallel for
for(int j=0; j<n; ++j) {
count(j) = 0.0;
for(int i=0; i<m; i++) {
count(j) += (A(i,j) != B(i));
}
}

timer.stop();

// elapsed time in milliseconds
std::cout << "Elapsed time is " << timer.elapsedTime() << " milliseconds." << std::endl;

return 0;
}

Результат:

$ g++ -Wall -O3 test.cpp -o test
$ ./test
Elapsed time is 63 milliseconds.

Если я компилирую и запускаю его с включенной поддержкой OpenMP, я получаю:

$ g++ -Wall -O3 -fopenmp test.cpp -o test_omp
$ ./test_omp
Elapsed time is 16 milliseconds.

Неплохое улучшение (почти в 4 раза быстрее), просто добавив одну строку в код ( pargma omp макро).

Этот последний побеждает 37 мс, которые я получаю в MATLAB (R2013b). Код был скомпилирован с использованием GCC 4.8.1 (MinGW-w64 работает на Windows 8, ноутбук Core i7).


Если вы действительно хотите расширить здесь ограничения для кода C ++, вам придется добавить векторизацию (встроенные функции SSE / AVX) в дополнение к многопоточности, достигнутой с OpenMP.

Вы также можете рассмотреть возможность использования Программирование GPGPU (CUDA, OpenCL). В MATLAB это очень легко сделать:

AA = gpuArray(A);
BB = gpuArray(B);
CC = sum(bsxfun(@ne, AA, BB));
C = gather(CC);

gpuArray(.) перенесет матрицу в графический процессор, после чего все операции, выполненные на нем, будут выполнены на устройстве с графическим процессором, а не с процессором. gather(.) перенесет массив обратно в рабочую область MATLAB. Однако проблема здесь в значительной степени связана с памятью, поэтому вряд ли увидит какое-либо улучшение (возможно, даже медленнее из-за накладных расходов при передаче данных).

1