облачные вычисления
параллельность
отказоустойчивость
масштабирование
map: (k1, v1) [(k2, v2)]
reduce: (k2, [v2]) [(k3, v3)]
метаинформация
table A: chunks = 1, 2, 3, ...
table B: chunks = 314, 159, ...
...
расположение чанков
slave 0: chunks = 271, 828, 182, ...
slave 1: chunks = 161, 803, 39, ...
...
вся метаинформация в одной точке
упрощённый дизайн мастера
единственная точка отказа
узкое место при масштабировании
таблица: массив записей
запись: (key, value)
key, value: массивы байт
сравнение лексикографическое
распределённое хранилище произвольное множество таблиц
(k1, v1)
распределённое хранилище
несортированная таблица
(k2, v2)
храним в мастере сэмплы - один ключ на чанк
распределённое хранилище сортированная таблица
(k2, [v2])
распределённое хранилище
несортированная таблица
(k3, v3)
con: дополнительная материализация
pros:
один sort, много reduce
"сортированность" интуитивно понятна
в сортированной таблице можно искать по ключу
необходимо:
группировать по ключу
сортировать по ключу и чему-то ещё ("источник")
вариант:
включаем источник в значение
опциональная пользовательская сортировка по значению
вариант:
включаем источник в ключ
используем пользовательский partitioner
для группировки без источника
несколько петабайт
сотни операций в день
десятки тысяч транзакций
расчёт некоторых факторов поиска
статистика по логам
...
Что дальше?
масштабирование: распределённые мастера
эффективность: поколоночное хранение
Yet Another MapReduce
Александр Дмитриев
YaC, 1 октября 2010 года
Yet Another MapReduce
материализация в локальные файлы
SELECT Person.Name, City.Name
FROM Person JOIN City
ON Person.CityId = City.Id
INNER
OUTER LEFT
OUTER RIGHT
OUTER FULL
CROSS
пользовательская функция
из (k1, v1) в [(k2, v2)]
1, 1
...
1, R
input 1 map partition
пользовательская функция
из (k2, [v2]) в [(k3, v3)]
по умолчанию hash(k2) mod R
возможность переопределения пользователем
обычно распределённая файловая система
возможен другой источник с переходником в пары (k1, v1)
sort reduce output 1
пишем в распределённую файловую систему
или в какое-либо другое хранилище
в случае, если объём данных слишком велик,
необходима ещё одна локальная материализация
City
(Id, Name)
(1, Moscow)
(2, Vladimir)
(3, Kostroma)
простота
...
sort reduce output
пересылка почти всей промежуточной таблицы по сети
input map partition
Reduce
Person
(Name, CityId)
(Alexander, 1)
(Ivan, 2)
(Dmitry, 1)
(Andrey, 4)
Map 1
(Person, City)
(Alexander, Moscow)
(Dmitry, Moscow)
(Ivan, Vladimir)
(k, v)
(1, Alexander)
(2, Ivan)
(1, Dmitry)
(4, Andrey)
sort reduce output R
(k, v)
(1, Alexander)
(1, Moscow)
(1, Dmitry)
(2, Vladimir)
(2, Ivan)
(3, Kostroma)
(4, Andrey)
Person
(Name, CityId)
(Alexander, 1)
(Ivan, 2)
(Dmitry, 1)
(Andrey, 4)
input M map partition
M, 1
...
M, R
Result
(Person, City)
(Alexander, Moscow)
(Ivan, Vladimir)
(Dmitry, Moscow)
Map 2
(k, v)
(1, Moscow)
(2, Vladimir)
(3, Kostroma)
City
(Id, Name)
(1, Moscow)
(2, Vladimir)
(3, Kostroma)
Как используем?
reduce
map
подсчёт встречаемости слов
(k1, v1): (docId, text)
(k2, v2): (word, 1)
(k3, v3): (word, count)
Map
...
(3, "the black cat")
...
(25, "the white cat")
...
Reduce
...
("black", 1)
...
("cat", 1)
("cat", 1)
...
("the", 1)
("the", 1)
...
("white", 1)
...
...
("the", 1)
("black", 1)
("cat", 1)
...
("the", 1)
("white", 1)
("cat", 1)
...
...
("black", 1)
...
("cat", 2)
...
("the", 2)
...
("white", 1)
...
subkey
(key, value)
reduce всегда по key
сортировка по (key, subkey)
subkey опционален
join
собственная реализация, C++
легкая (около 50 тыс. строк)
больше мапредюсов, хороших и разных!
1, 1
...
1, S
partition 1
hash(k2) mod S не подходит
для полной сортировки необходимо знать множество исходных ключей
sort 1
Вопросы?
reduce
monster@yandex-team.ru
map
диспетчер заданий
op X: 1000 tasks
op Y: 500 tasks
op Z: 600 tasks
master
...
sort
partition
reduce
merge
reduce
map
sort S
task: python
task (Z, 17)
task: perl
task (Y, 30)
task: C++
task (X, 2)
streaming
partition P
P, 1
...
P, R
reduce
merge
slave
chunk 42
chunk 314
chunk 1
chunk 17
репликация
балансировка
header
k, v
k, v
k, v
crc
reduce
merge
reduce
sort
map
sort выделен в отдельный примитив
sort не использует пользовательского кода
map и reduce могут быть выполнены по отдельности
reduce требует на входе сортированную таблицу
merge_reduce
merge
chunk 1