Skip to content

seaman248/stream-handbook-russian

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

8 Commits
 
 

Repository files navigation

Данный текст является переводом stream-handbook

Руководство по Stream

Это руководство раскрывает основы написания node.js программ с использованием потоков.

Введение

Концепция потоков пришла к нам почти с момента зарождения unix и зарекомендовала себя как надежный способ построения больших систем из небольших компонентов, чья функция -- делать хорошо всего одну вещь. В unix поток можно реализовать при помощи shell путем добавления | pipe. В node.js присутствует встроенный модуль 'stream', используемый в ядре node, но его можно использовать и в пользовательских модулях. Как и в unix, модуль 'stream' прежде всего включает в состав оператор .pipe(), благодаря которому вы имеете возможность написать некий регулятор.

Потоки помогают разделить ваши задачи, поскольку ограничивают зону поверхности выполнения в последовательных интерфейсах, что способствует переиспользованию кода. В дальнейшем можно перенаправить выход одного потока на вход другого и использовать библиотеки, которые позволяют оперировать потоками на более высоком уровне абстракции.

Потоки - это очень важный компонент дизайна маленьких программ и философии unix, но есть и другие важные абстракции, которые стоит рассмотреть. Просто запомните, что технический долг технический долг - это враг и всегда необходимо искать лучшие абстракции для решаемой задачи.

Почему необходимо использовать потоки?

Операции ввода/вывода в node.js являются асинхронными, поэтому взаимодействие с диском и сетью включают в себя передачу коллбек-функций. У вас может возникнуть соблазн написать код, подобный этому:

	var http = require('http');
	var fs = require('fs');

	var server = http.createServer(function (req, res) {
		fs.readFile(__dirname + '/data.txt', function (err, data) {
			res.end(data);
		});
	});
	server.listen(8000);

Этот код будет работать, но он довольно громоздкий и сохраняет целый файл data.txt в памяти для каждого запроса, перед тем как отдать результат клиенту. Если data.txt очень большой, ваша программа будет съедать очень много памяти, так как она используется многими пользователями одновременно, особенно заметно это будет пользователям с медленным соединением.

Из-за этого обстоятельства обедняется UX, поскольку пользователь должен ждать пока весь файл на сервере попадет в буфер и только после этого получит его себе.

К счастью, оба аргумента req и res являются потоками, и это можно использовать для написания нашего файлового сервера. Более продуктивно в данном случае использовать метод fs.createReadStream(), а не fs.readFile():

	var http = require('http');
	var fs = require('fs');

	var server = http.createServer(function (req, res) {
		var stream = fs.createReadStream(__dirname + '/data.txt');
		stream.pipe(res);
	});
	server.listen(8000);

Метод .pipe() в этом примере заботится о прослушивании для событий 'data' и 'end' из метода fs.createReadStream(). Этот код еще не является идеальным, но не смотря на это, сейчас файл data.txt будет поступать клиенту порциями по мере их поступления с диска.

Используя .pipe(), можно получить и другие приемущества, такие как automatically handling backpressure - node.js не будет помещать куски данных в память без нужды, когда удаленный клиент использует очень медленное соединение, например.

Хотите сжать файл? Существуют потоковые модули для того, чтобы это сделать!

	var http = require('http');
	var fs = require('fs');
	var oppressor = require('oppressor');

	var server = http.createServer(function (req, res) {
		var stream = fs.createReadStream(__dirname + '/data.txt');
		stream.pipe(oppressor(req)).pipe(res);
	});
	server.listen(8000);

Сейчас файл data.txt сжимается для браузеров, которые поддерживают gzip или Deflate. Мы можем просто воспользоваться модулем oppressor для обработки всего контента.

Однажды изучив stream api, вы сможете просто объединять вместе потоковые модули, как кубики лего или САДОВЫЙ ШЛАНГ, вместо того, чтобы помнить как отправить данные через шаткие непотоковые самодельные API.

Основы

Существуют 5 типов потоков: - readable - writable - transform - duplex - "classic"

pipe

Все типы потоков используют .pipe() для объединения ввода и вывода.

.pipe() - это просто функция, которая принимает readable поток src и перенаправляет вывод в writable поток dst:

	src.pipe(dst)

.pipe(dst) возвращает dst, поэтому можно объединять вместе несколько .pipe():

	a.pipe(b).pipe(c).pipe(d)

, тоже самое можно записать так:

	a.pipe(b);
	b.pipe(c);
	c.pipe(d);

Это почти тоже самое, что следующая запись в командной строке unix:

	a | b | c | d

разница лишь в том, что мы используем node.js вместо shell.

Потоки для чтения (readable streams)

Потоки для чтения выводят данные, которые могут быть перенаправлены в поток для записи (writable), transform - потоки или duplex с помощью .pipe():

	readableStream.pipe(dst)

Создание потока для чтения (readable stream)

Давайте создадим поток для чтения!

	var Readable = require('stream').Readable;

	var rs = new Readable;
	rs.push('beep ');
	rs.push('boop\n');
	rs.push('null');

	rs.pipe(process.stdout);

	$ node read0.js
	beep boop

rs.push(null) говорит о том, что данные закончили отдаваться.

Следует помнить, что мы передаем контент в поток для чтения rs (с помощью метода .push()) до того, как перенаправить его (с помощью .pipe()) в поток для вывода (process.stdout), и сообщение о передаче null в поток для чтения должно быть записано до перенаправления в process.stdout.

Это необходимо, поскольку, .push() передает куски данных в поток и они сохраняются в буфере, пока следующее звено потока не будет готово принять их.

Тем не менее, было бы еще лучше, если бы мы могли избежать буферизации данных перед тем, как отдать их и генерировать их только тогда, когда потребитель просит об этом.

Мы можем отдать порцию данных по требованию, если определим функцию ._read:

	var Readable = require('stream').Readable;
	var rs = Readable();

	var c = 97;
	rs._read = function(){
		rs.push(String.fromCharCode(c++));
		if(c>'z'.charCodeAt(0)) rs.push(null);
	};

	rs.pipe(process.stdout);
	$ node read1.js
	abcdefghijklmnopqrstuvwxyz

В данном случае, мы передаем буквы с 'a' по 'z', но только после того, как потребитель готов прочитать их.

Функция _read принимает предварительный параметр size в качестве первого аргумента, который определяет размер порций в байтах, который готов принимать потребитель, но наша функция проигнорирует size, если потребитель его запросит.

Следует помнить, что вы можете также использовать метод util.inherits() встроенного модуля 'util' для того, чтобы наследовать Readable. Но такой подход не очень хорошо подходит для примеров, поскольку создает дополнительную сложность в понимании.

Для того, чтобы показать, что функция _read в настоящее время была вызвана при запросе потребителя, мы можем немного изменить наш код readable потока, добавив задержку:

	var Readable = require('stream').Readable;
	var rs = Readable();

	var c = 97 - 1;

	rs._read = function () {
		if (c >= 'z'.charCodeAt(0)) return rs.push(null);

		setTimeout(function () {
			rs.push(String.fromCharCode(++c));
		}, 100);
	};

	rs.pipe(process.stdout);

	process.on('exit', function () {
		console.error('\n_read() called ' + (c - 97) + ' times');
	});
	process.stdout.on('error', process.exit);

В результате работы этой программы, в консоли выйдет сообщение, о том, что функция _read() вызвана 5 раз, когда мы запросили только 5 байт вывода:

	$ node read2.js | head -c5
	abcde
	_read() called 5 times

Задержка, созданная с помошью setTimeout() необходима, поскольку операционной системе требуется некотороe время, чтобы послать соответствующий сигнал о закрытии pipe.

Обработчик process.stdout.on('error', fn) также необходим, поскольку операционная система посылает SIGPIPE к нашему process, когда head больше не интересует вывод нашей программы, которая принимает EPIPE ошибку в process.stdout.

Эти дополнительные осложнения нужны, когда происходят взаимодействия с внешней операционной системой, но когда потоки взаимодействуют внутри node, этого не требуется.

Если вы хотите создать поток для чтения, который принимает произвольные данные, а не только данные типа string и buffer, убедитесь, что создали ваш readable-поток с дополнительным параметром: Readable({ objectMode: true })

Использование потоков для чтения (Readable Stream)

Как правило, гораздо легче перенаправлять потоки для чтения в другие потоки или создавать их с помощью таких модулей как through или concat-stream, но иногда бывает необходимо потреблять поток напрямую.

	process.stdin.on('readable', function () {
		var buf = process.stdin.read();
		console.dir(buf);
	});
	$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
	<Buffer 61 62 63 0a>
	<Buffer 64 65 66 0a>
	<Buffer 67 68 69 0a>
	null

Когда данные доступны, событие 'readable' активируется и вы можете вызвать метод .read(), чтобы получить данные из буфера.

Когда поток закончился, .read() возвращает null, потому что послать больше нечего.

Вы можете таже вызвать .read(n), который вернет n байт данных. Чтение количества байт является консультативным и не работает для потоков объектов, однако все основные потоки поддерживают его.

Здесь приводится пример использования .read(n) для вывода 3 порций buffer:

	process.stdin.on('readable', function () {
		var buf = process.stdin.read(3);
		console.dir(buf);
	});

Запуск этого примера выдаст неполные данные:

	$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
	<Buffer 61 62 63>
	<Buffer 0a 64 65>
	<Buffer 66 0a 67>

Так получается, потому что есть дополнительные данные во внутреннем буфере, поэтому нам необходимо указать node.js, что нам не интересно читать данные после 3 байтов, которые мы уже читали. Сделать это можно простым указанием .read(0):

	process.stdin.on('readable', function () {
		var buf = process.stdin.read(3);
		console.dir(buf);
		process.stdin.read(0);
	});
	$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
	<Buffer 61 62 63>
	<Buffer 0a 64 65>
	<Buffer 66 0a 67>
	<Buffer 68 69 0a>

Вы можете также использовать .unshift() метод, чтобы положить обратно лишние данные и тогда будет срабатывать та же логика, что и в случае с .read(0):

Использование .unshift() предотвращает появление ненужных буферных копий.

	var offset = 0;

	process.stdin.on('readable', function () {
		var buf = process.stdin.read();
		if (!buf) return;
		for (; offset < buf.length; offset++) {
			if (buf[offset] === 0x0a) {
				console.dir(buf.slice(0, offset).toString());
				buf = buf.slice(offset + 1);
				offset = 0;
				process.stdin.unshift(buf);
				return;
			}
		}
		process.stdin.unshift(buf);
	});
	$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
	'hearties'
	'heartiest'
	'heartily'
	'heartiness'
	'heartiness\'s'
	'heartland'
	'heartland\'s'
	'heartlands'
	'heartless'
	'heartlessly'

Тем не менее, в npm существует модули, такие как split которые вы можете использовать вместо повторения кода со своей логикой парсинга на строки.

Потоки для записи (Writable streams)

Writable-потоки - это потоки, в которые можно перенаправлять поток но не из них:

	src.pipe(writableStream)

Создание writable-потоков

Просто определите функцию ._write(chunk, enc, next) и можно перенаправлять readable-поток во вновь созданный с помощью .pipe()

	var Writable = require('stream').Writable;
	var ws = Writable();
	ws._write = function (chunk, enc, next) {
		console.dir(chunk);
		next();
	};

	process.stdin.pipe(ws);		
	$ (echo beep; sleep 1; echo boop) | node write0.js 
	<Buffer 62 65 65 70 0a>
	<Buffer 62 6f 6f 70 0a>

Первый аргумент chunk - это данные, которые пришли из потока для чтения.

Второй аргумент enc - это кодировка, но вы можете писать строки, только если opts.decodeString = false.

Третий аргумент next(err) - это функция обратного вызова, которая сообщает потоку-потребителю, что он может записать больше данных. Опционально можно послать объект ошибки err в эту функцию, который запустит событие 'error' в экземпляре потока.

Если перенаправлямый поток для чтения содержал строки для записи, то поток будет конвертирован в объект Buffer, если только вы не указали Writable({ decodeStrings: false })

Если перенаправляемый поток для чтения будет включать объекты для записи, то при создании потока для записи укажите Writable({ objectMode: true }).

Запись writable-потоков

Для того, чтобы произвести запись в поток для записи, воспользуйтесь методом .write(data), где data - это те данные, которые необходимо передать.

	process.stdout.write('beep boop\n');

Для того, чтобы сказать потоку для записи, что вы закончили запись, вызовите метод .end(). Можно передать данные, которые будут записаны перед окончанием записи с помощью .end(data):

	var fs = require('fs');
	var ws = fs.createWriteStream('message.txt');

	ws.write('beep ');

	setTimeout(function () {
		ws.end('boop\n');
	}, 1000);
$ node writing1.js 
$ cat message.txt
beep boop

Метод .write() возвращает false, когда пришло больше данных, чем указано в opts.highWaterMark опции, которая передается Writable() из входящего буфера.

Если необходимо подождать, пока буфер будет снова пуст, то поможет событие 'drain'.

Потоки transform

Вы могли слышать про потоки transform, отсылающие нас к through потокам.

Through потоки - это простые readable/writable фильтры, которые изменяют входящие данные и отправляют результат на вывод.

Дуплексные потоки (Duplex streams)

Дуплексные потоки - это readable/writable двунаправленные потоки. Они как телефон направляют сообщения туда и обратно. RPC exchange - это хороший пример дуплексных потоков. Всякий раз, когда вы видите что-то вроде

	a.pipe(b).pipe(a)

вы имеете дело с дуплексными потоками.

Классические потоки

Классические потоки - это устаревший интерфейс, первоначально реализованный в node 0.4. Вы вероятно сталкивались с этим типом потоков довольно часто, поэтому не лишним будет разобраться в том как они работют.

Всякий раз, когда событие 'data' активно в каком-то потоке, этот поток переключается в "классический" режим и ведет себя в соответствии со старым API.

Классические потоки для чтения

Классические потоки для чтения - это всего лишь потомки eventEmmitter, который, когда принимает данные, активирует событие 'data', а когда поток данных завершается активируется событие 'end'.

Метод .pipe() убеждается что классический поток является потоком для чтения, в зависимости отзначения, которое возвращает stream.readable.

Здесь приводится очень простой пример потока для чтения, который выводит буквы с A по J:

	var Stream = require('stream');
	var stream = new Stream;
	stream.readable = true;

	var c = 64;
	var iv = setInterval(function () {
	    if (++c >= 75) {
	        clearInterval(iv);
	        stream.emit('end');
	    }
	    else stream.emit('data', String.fromCharCode(c));
	}, 100);

	stream.pipe(process.stdout);
	$ node classic0.js
	ABCDEFGHIJ

Чтобы читать из классического потока для чтения, необходимо определить callback-функции, которые будут вызываться при событиях 'data' и 'end'. Здесь приводится пример чтения из потока process.stdin в стиле старом стиле потоков для чтения.

	process.stdin.on('data', function (buf) {
	    console.log(buf);
	});
	process.stdin.on('end', function () {
	    console.log('__END__');
	});
	$ (echo beep; sleep 1; echo boop) | node classic1.js 
	<Buffer 62 65 65 70 0a>
	<Buffer 62 6f 6f 70 0a>
	__END__

Помните, что когда бы вы не определили слушатель события 'data', вы получаете поток в режиме совместимости и поэтому теряете приемущества нового api потоков.

Использование обработчиков событий 'data' и 'end' вообще лучше избегать. Если вам нужно взаимодействовать с существующими потоками, используйте библиотеки, которые позволят вам перенаправлять потоки напрямую с помощью метода .pipe().

К примеру, вы можете использовать модуль through, для того, чтобы избежать использования 'data' и 'end' обработчиков:

	var through = require('through');
	process.stdin.pipe(through(write, end));

	function write (buf) {
	    console.log(buf);
	}
	function end () {
	    console.log('__END__');
	}
	$ (echo beep; sleep 1; echo boop) | node through.js 
	<Buffer 62 65 65 70 0a>
	<Buffer 62 6f 6f 70 0a>
	__END__

или жe модуль concat-stream, чтобы сохранить в буфер все содержимое потока:

	var concat = require('concat-stream');
	process.stdin.pipe(concat(function (body) {
	    console.log(JSON.parse(body));
	}));
	$ echo '{"beep":"boop"}' | node concat.js 
	{ beep: 'boop' }

Классические потоки для чтения имеют методы .pause() .resume() для того, чтобы временно приостановить поток, но это всего лишь консультативные инструкции не обязательные к выполнению. Если вы собираетесь использовать эти методы с классическими потоками для чтения, вы должны использовать модуль through, чтобы обрабатывать буферизацию, а не писать ее самим.

About

Translation of stream-handbook

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published