This page is a static HTML representation of the slides at
https://peteasa.github.io/parapg/parapg-MapReduce.html




MapReduce using the Parallella
by

Peter Saunderson

2016




Contents

PeterSaunderson 1st December 2016 at 11:14am

Why Word Counting

PeterSaunderson 3rd December 2016 at 6:19pm

  • MapReduce (Dean2008) has many applications
  • Word counting is one of the simplest
  • Uses the Epiphany cores to count words in a book
    • word counting is a convenient vehicle to develop a framework
    • the framework will provide tools to help with solving other problems
    • builds on the platform provided by parapg
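As a concrete illustration of the idea, word counting can be written as a map step that splits a line into words and a reduce step that folds each word into a running count. The sketch below is a minimal, self-contained C example, not the parapg API: the `map`, `reduce` and `count_of` names and the fixed-size table are assumptions made for the example.

```c
#include <assert.h>
#include <string.h>

#define MAX_WORDS 64

/* Hypothetical stand-ins for the framework's emit/reduce path. */
struct kv { char word[32]; int count; };
static struct kv table[MAX_WORDS];
static int nwords;

/* "Reduce": fold one (word, 1) pair into the running counts. */
static void reduce(const char *word)
{
    for (int i = 0; i < nwords; i++)
        if (strcmp(table[i].word, word) == 0) { table[i].count++; return; }
    strncpy(table[nwords].word, word, sizeof table[nwords].word - 1);
    table[nwords++].count = 1;
}

/* "Map": split a sentence into words and emit each one.
 * A real framework would queue the emitted items for distribution
 * rather than reducing them immediately. */
static void map(char *line)
{
    for (char *w = strtok(line, " \t\n"); w; w = strtok(NULL, " \t\n"))
        reduce(w);
}

static int count_of(const char *word)
{
    for (int i = 0; i < nwords; i++)
        if (strcmp(table[i].word, word) == 0) return table[i].count;
    return 0;
}
```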


Big Picture

PeterSaunderson 2nd December 2016 at 8:57am

  • Target is matrix maths on Epiphany cores
    • Counting is a simple start
    • Proves data movement and algorithm control
    • Implementing helps scale the problem
  • Provides Controller, Parallella clustering and Epiphany algorithm


Map Reduce Design

PeterSaunderson 2nd December 2016 at 4:12pm

  • The framework provides message channels for commands
  • The user provided map function splits a sentence into words
  • The user provided reduce function processes the words counting the occurrence of the words
  • One eCore software image provides more than one function
    • Simple priority / interrupt based non-preemptive scheduler

Next: System Block Diagram

System Block Diagram

PeterSaunderson 3rd December 2016 at 5:47pm

[System block diagram: the Launcher and Daemon on the Controller connect over Ethernet to the Daemon and Application on each Parallella; the Application feeds word input to the eCores through per-core I/P and O/P circular buffers, a Workspace, uConsumers and shared memory (Shmem).]
The user remotely launches the Application on the Parallella from the Controller. The Application manages the workload on the Epiphany eCore.

Next: Controller

Application

PeterSaunderson 3rd December 2016 at 8:51am


Next: Message Channel

Message Channel

PeterSaunderson 5th December 2016 at 11:10am

  • The same command structure is used on the data path in all channels
  • Contention resolution is different for each type of channel
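One way to realise a single command structure across every channel is a fixed-size header carried ahead of the payload. The layout and field names below are assumptions made for illustration, not taken from the parapg source; they show how a command code, the producer-assigned order, the random tie-breaker and the producer's rank could travel with every command regardless of channel type.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical command header; the real parapg layout may differ.
 * The same header travels over shared-memory and local-memory
 * channels alike; only the arbitration differs per channel. */
typedef struct {
    uint16_t cmd;      /* command identifier */
    uint16_t order;    /* scheduling order assigned by the producer */
    uint16_t random;   /* non-zero tie-breaker for contention */
    uint16_t rank;     /* rank of the producing eCore */
    uint32_t length;   /* payload bytes that follow the header */
} cmd_header_t;
```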
Next: Shared Memory Channel

Distribution

PeterSaunderson 3rd December 2016 at 5:49pm

  • The results of the Map operation are sorted and distributed
    • the relatively small amount of per-eCore memory in the Epiphany architecture implies small queues
    • reverse order processing (Fold-right) is not implemented
  • A user provided allocate function selects the eCore for each emitted item
  • A linked list queue per consumer eCore (uConsumers) is used to queue the items
  • When the Map operation for the current Workspace is complete the items are distributed to the selected eCore
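A minimal allocate function might simply hash the emitted word, so that every occurrence of the same word is sent to the same consumer eCore. The `el_t` layout and the body of `mr_allocate` below are hypothetical, written only to illustrate the idea; the signature follows the one given later in this deck.

```c
#include <assert.h>

/* Hypothetical element type standing in for the framework's el_t. */
typedef struct { const char *word; } el_t;

/* Sketch of an allocate function: hash the word so that the same
 * word always lands on the same consumer eCore (0 .. maxid - 1),
 * letting each core hold complete counts for "its" words. */
static unsigned mr_allocate(el_t *item, unsigned maxid)
{
    unsigned h = 5381;                     /* djb2 string hash */
    for (const char *p = item->word; *p; p++)
        h = h * 33 + (unsigned char)*p;
    return h % maxid;
}
```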

Next: Message Packing - eCore

Simple Scheduler

PeterSaunderson 3rd December 2016 at 9:03am

  • Simple non-preemptive run loop
    1. complete processing of workspace
    2. consume all eCore messages in the required order
    3. if appropriate consume host messages
    4. sleep until interrupted
  • Interrupted by host or by eCore completing a requested command
  • Each message is parsed so that a cancel message could be handled
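The four steps above can be sketched as a plain C run loop. The helper names (`process_workspace`, `consume_ecore_msgs` and so on) are stand-ins invented for this example, here reduced to stubs that simply record the order in which the scheduler calls them.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-ins that record the call order; the real
 * functions would touch the workspace, InQueue and host channel. */
static char trace[32];
static void record(char c)            { trace[strlen(trace)] = c; }
static void process_workspace(void)   { record('w'); }
static void consume_ecore_msgs(void)  { record('e'); }
static int  host_msgs_allowed(void)   { return 1; }
static void consume_host_msgs(void)   { record('h'); }
static int  wait_for_interrupt(void)  { record('s'); return 0; /* 0 = stop */ }

/* Non-preemptive run loop following the four steps above. */
static void run_loop(void)
{
    int running = 1;
    while (running) {
        process_workspace();            /* 1. finish current workspace */
        consume_ecore_msgs();           /* 2. eCore messages, in order */
        if (host_msgs_allowed())
            consume_host_msgs();        /* 3. host messages if allowed */
        running = wait_for_interrupt(); /* 4. sleep until interrupted  */
    }
}
```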

Next: Daemon Design

Controller

PeterSaunderson 2nd December 2016 at 4:20pm

The Controller is a Linux workstation running a Daemon

  • the Daemon learns the addresses of connected Parallella devices
    • the daemon provides a listening socket so that all the Parallella boards can register.
  • the Launcher launches the Application on a remote Parallella board
    • the user application can provide a listening socket so that data can be streamed to the Application on the Parallella board
    • in the word counting example each line of the book is read from the Controller on demand


Next: Application

Parallella Software

PeterSaunderson 2nd December 2016 at 1:32pm

  • the Daemon running on the Parallella runs the Application in the cl user environment
  • the Application is run as if it were run by the cl user
    • it can use the Daemon to reserve other Parallella boards
    • it can send messages to reserved Parallella boards to share workload
    • it can use the msg_channel_producer to send messages to one of the Epiphany cores (eCore)


eCore Software

PeterSaunderson 3rd December 2016 at 4:04pm

  • The Application sends commands to the eCore
  • User provided functions to perform specific activities
  • Local core memory and eCore to eCore transfers are fast
    • data is loaded and distributed locally for processing
    • one output buffer per eCore
    • busy / free flag set on the remote eCore
  • Application access is slower but has more memory
    • multi-functional eCore program is loaded once
    • the Reduce operation is run quickly to save memory
  • (future) Upload less frequently used items to slower memory
    • offload work to other Parallella boards

Shared Memory Channel

PeterSaunderson 3rd December 2016 at 6:05pm

Next: Local Memory Channel 1

Local Memory Channel 1

PeterSaunderson 1st December 2016 at 3:36pm

Next: Local Memory Channel 2

Local Memory Channel 2

PeterSaunderson 3rd December 2016 at 6:04pm

  • The consumer must choose which of the available messages (from InQueue[16]) to process
    • each command is assigned an order by the producer
    • a reduce command would be assigned order = 3
    • a map command would be assigned a higher order (say 4)
    • the command with the lowest order number is run first
  • To assist with contention resolution a non-zero random number may also be assigned
    • the command with the lowest order wins
    • for two commands with the same order the lowest (non-zero) random number wins
    • for two commands with the same order and random number the lowest rank wins
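The rules above amount to a lexicographic comparison of the tuple (order, random, rank). A minimal sketch, with illustrative field names rather than the real InQueue encoding:

```c
#include <assert.h>

/* One queued command as seen by the consumer; the field names are
 * illustrative, not the actual InQueue layout. */
typedef struct { unsigned order, random, rank; } entry_t;

/* Returns non-zero if a should run before b: the lowest order wins,
 * then the lowest non-zero random number, then the lowest rank. */
static int runs_first(const entry_t *a, const entry_t *b)
{
    if (a->order != b->order)   return a->order < b->order;
    if (a->random != b->random) return a->random < b->random;
    return a->rank < b->rank;
}
```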

Next: Distribution

Message Packing - eCore

PeterSaunderson 3rd December 2016 at 9:01am

  • The user provided mr_map and mr_reduce functions control the format of the data
    • during distribution the mr_mappack function is used
    • each item emitted from the map operation may be packed into the per-core output queue (inqueuestore)
  • On reception the user provided mr_unpackwk is called
    • for each item unpacked the mr_reduce function is called
    • the distribution may also be to the current eCore
    • the mr_reduce function is called for local distribution

Next: Simple Scheduler

Daemon Design

PeterSaunderson 2nd December 2016 at 5:18pm

A simple job control system is required

  • In part inspired by grun, a Daemon is used
    • running on the Controller and the Parallella
    • enables connected devices to reserve processors
    • launch applications on remote systems
  • Configured to run as master on Controller
  • Configured to run as slave on Parallella


Daemon Requirements

PeterSaunderson 2nd December 2016 at 4:56pm

  • Remote launch of command
    • prevent multiple commands on one system
    • report results (stdout, stderr) of the command
  • List network addresses of participating boards
  • Simple configuration


Launching Commands

PeterSaunderson 2nd December 2016 at 6:08pm

  • Use subprocess.Popen to start command
  • User cl (gid 2000, uid 2000) is used to run the command
  • A command found in that user's path is run as if run by that user


ØMQ Networking Library

PeterSaunderson 2nd December 2016 at 5:16pm

ØMQ

  • An excellent abstraction for networked applications
    • easy to start simple and then grow the system
    • support for multicast
    • support for C, C++, and Python
  • Use ØMQ to pass messages connecting Controller and Parallella

Next: Message Types

Message Types

PeterSaunderson 2nd December 2016 at 5:29pm

Next: Packing Message Data

Packing Message Data

PeterSaunderson 3rd December 2016 at 9:13am

  • Need a simple framing solution that works with Python and C
  • msgpack and Pickle are complex and have no support for Epiphany
  • Pack and Unpack methods are implemented
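A minimal example of such framing is a little-endian length prefix followed by the payload bytes, which is straightforward to mirror on the Python side with `struct.pack("<I", len)`. This is a sketch of the approach, not the project's actual Pack and Unpack methods:

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Write a little-endian u32 length followed by the payload bytes;
 * returns the number of bytes written. */
static size_t pack(uint8_t *dst, const void *src, uint32_t len)
{
    dst[0] = len & 0xff;
    dst[1] = (len >> 8) & 0xff;
    dst[2] = (len >> 16) & 0xff;
    dst[3] = (len >> 24) & 0xff;
    memcpy(dst + 4, src, len);
    return 4 + len;
}

/* Read one frame: copy the payload out and return its length. */
static uint32_t unpack(const uint8_t *src, void *out)
{
    uint32_t len = (uint32_t)src[0] | (uint32_t)src[1] << 8
                 | (uint32_t)src[2] << 16 | (uint32_t)src[3] << 24;
    memcpy(out, src + 4, len);
    return len;
}
```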


Future Work

PeterSaunderson 5th December 2016 at 8:37am

At the time of writing, each major component of the system has been prototyped

  • Daemon cluster management tool discovery needs completion
  • The Application needs to be updated to use ØMQ
  • The eCore software needs to be optimised
    • better use of memory
    • DMA transfers need to be added

Contributors or sponsors for this work are always welcome!

@paracpg #parapg on Twitter Peter on GitHub

The End

PeterSaunderson 3rd December 2016 at 9:27am

Additional Material

PeterSaunderson 2nd December 2016 at 2:02pm

Detailed Design

PeterSaunderson 2nd December 2016 at 7:14pm

eCore Details

PeterSaunderson 3rd December 2016 at 4:44pm

Application eCore Interface

PeterSaunderson 3rd December 2016 at 4:30pm

libecomms

PeterSaunderson 2nd December 2016 at 7:12pm

Essential interface functions for ARM and Epiphany software. Built separately with ARM (e-hal.h) and the Epiphany (e_lib.h) libraries so that application code can use the same interface in ARM code and Epiphany code.

msg_channel_t

Launcher Application Interface

PeterSaunderson 2nd December 2016 at 6:17pm

User Provided Functions

PeterSaunderson 2nd December 2016 at 8:30pm

  1. mr_map performs the Map operation
  2. mr_allocate allocates eCore to receive Map output
  3. mr_mappack used to pack Map results into inqueuestore
  4. mr_unpackwk unpacks Map results to user provided store
  5. mr_mvuc moves Map results to user provided store
  6. mr_reduce performs the Reduce operation
  7. mr_reportpack used to pack Reduce result

Framework Provided Functions

PeterSaunderson 2nd December 2016 at 9:17pm

  1. mr_mapemit emits Map results from mr_map to the framework
  2. mr_emitus emits results from mr_unpackwk to the framework

workspace

PeterSaunderson 3rd December 2016 at 5:23pm

Used during the MapReduce operation as a workspace for received messages

char workspace[tbd];


uConsumers

PeterSaunderson 3rd December 2016 at 5:22pm

Used to hold an interim linked list of data for eCore consumers

  • Holds the Map operation output
  • Contains a list of pointers to significant data
  • Distributed to the eCore after Map is complete


InQueue

PeterSaunderson 3rd December 2016 at 4:59pm

Used to manage communications between eCore. Uses "Busy" (non-zero) / "Free" arbitration.

unsigned InQueue[MAXRANK]; // one entry per producing eCore
/* Pack the order into the upper half-word and the random tie-breaker
 * into the lower: the shift is 4 * sizeof(unsigned) = 16 bits for a
 * 32-bit unsigned. */
#define SETENTRY(o,r)                          \
    ((((unsigned)(o) << 4 * sizeof(unsigned)) & ORDMASK) + ((unsigned)(r) & RNDMASK))
typedef enum {
    WL_IDLE = 0,      // entry is Free
    WL_CANCEL,        // cancel the current command
    WL_CURR,          // command currently being processed
    WL_SEQ,           // numbers to 0xFFFE are available to sequence commands
    WL_SYNC = 0xFFFF, // wait for all work to complete
} work_order_t;
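The slide does not show the definitions of ORDMASK and RNDMASK. The values below assume the order occupies the upper half-word and the random tie-breaker the lower, matching the 4 * sizeof(unsigned) = 16-bit shift in SETENTRY for a 32-bit unsigned; the GETORDER and GETRAND helpers are added here purely for illustration.

```c
#include <assert.h>

/* Assumed masks: order in the upper 16 bits, random number in the
 * lower 16 bits (not shown on the original slide). */
#define ORDMASK 0xFFFF0000u
#define RNDMASK 0x0000FFFFu

/* SETENTRY as given on the slide. */
#define SETENTRY(o,r)                          \
    ((((unsigned)(o) << 4 * sizeof(unsigned)) & ORDMASK) + ((unsigned)(r) & RNDMASK))

/* Illustrative inverses of SETENTRY. */
#define GETORDER(e) (((e) & ORDMASK) >> 4 * sizeof(unsigned))
#define GETRAND(e)  ((e) & RNDMASK)
```

Placing the order in the high bits means a plain unsigned comparison of two entries already ranks them by order first, then by random number.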


inqueuestore

PeterSaunderson 3rd December 2016 at 5:07pm

Used to pass commands between eCore. Uses InQueue for arbitration.

  • Managed using msg_channel_store_prd and msg_channel_store_cons
    • msg_channel_store_prd manages local memory
    • read commands from remote memory with msg_channel_store_cons

char inqueuestore[MAXRANK][tbd];
msg_channel_t *msg_channel_store_cons;
msg_channel_t *msg_channel_store_prd[MAXRANK];


msg_channel_inqueue_cons

PeterSaunderson 3rd December 2016 at 6:05pm

  • Provides management of local InQueue memory
  • Single word per eCore
  • Busy (non-zero) / Free flag
  • Addressed by rank of the eCore
    • this eCore "owns" all Busy entries
    • this eCore "owns" all remote Free entries with its rank
  • Paired with msg_channel_inqueue_prd used to manage the remote InQueue memory

msg_channel_t *msg_channel_inqueue_cons;


msg_channel_inqueue_prd

PeterSaunderson 3rd December 2016 at 6:05pm

  • Provides management of remote InQueue memory
  • Single word per eCore
  • Busy (non-zero) / Free flag
  • Addressed by rank of the eCore
    • this eCore "owns" all Free entries with its rank
    • this eCore "owns" all Busy entries in local InQueue memory
  • Paired with msg_channel_inqueue_cons used to manage the local InQueue memory

msg_channel_t *msg_channel_inqueue_prd;


msg_channel_producer

PeterSaunderson 3rd December 2016 at 4:28pm

  • Configured by the Application
  • Written by the Application
  • Read by the eCore
  • Simplified by ensuring only one eCore can send to the Application

msg_channel_t *msg_channel_producer;


msg_channel_consumer

PeterSaunderson 3rd December 2016 at 4:28pm

  • Configured by the Application
  • Written by the eCore
  • Read by the Application
  • Simplified by ensuring only one eCore can send to the Application

msg_channel_t *msg_channel_consumer;


msg_channel_core

PeterSaunderson 3rd December 2016 at 4:29pm

  • Configured by the Application
  • Written by any of the eCore
  • Read by the Application
  • Simplified by segmenting the memory so that each eCore uses a different segment

msg_channel_t *msg_channel_core;
msg_goto_next_sgmnt(msg_channel_core);


mr_map

PeterSaunderson 2nd December 2016 at 3:55pm

  • Performs the Map operation

int (*mr_map)(char **wkp, char *wklimit, el_t **mqnxtfree, char *mqlimit);


mr_allocate

PeterSaunderson 2nd December 2016 at 3:51pm

  • Allocates eCore to receive Map output
  • Part of the Sort operation
    • shares the output among several eCore

unsigned (*mr_allocate)(el_t *item, unsigned maxid);


mr_mappack

PeterSaunderson 2nd December 2016 at 8:29pm

int (*mr_mappack)(el_t *ucitem, char **msp, char* msplimit, size_t *size);


mr_reduce

PeterSaunderson 2nd December 2016 at 9:33pm

  • Performs Reduce operation
    • called following reception of CMD_RDC command
    • may also be called during the distribute operation
    • processes data from user provided store

void (*mr_reduce)(el_t *usitem);


mr_reportpack

PeterSaunderson 2nd December 2016 at 3:55pm

  • Used to pack Reduce result message into inqueuestore
    • delivered to the eCore requesting the results

int (*mr_reportpack)(char **msp, char* msplimit, size_t *size);


mr_mvuc

PeterSaunderson 2nd December 2016 at 8:38pm

  • Used to move Map results to user provided store
    • Map results generated locally
    • then passes items to mr_reduce for processing
    • empties uConsumers linked list (ucitem)
    • saves data to user provided store

int (*mr_mvuc)(el_t *ucitem);

mr_unpackwk

PeterSaunderson 2nd December 2016 at 9:00pm

  • Used to unpack Map results from the workspace
    • receiving eCore unpacks
    • then passes items to mr_reduce for processing
    • processes data in the workspace
    • saves data to user provided store (usitem)

int (*mr_unpackwk)(char **wkp, char* wklimit);


mr_emitus

PeterSaunderson 2nd December 2016 at 9:17pm

  • Enables the framework to perform Reduce operation
    • used in user provided mr_unpackwk function
    • called following reception of the CMD_RDC command
    • yields each Map result received with the message
    • framework passes items to mr_reduce for processing

int mr_emitus(el_t *usitem);


mr_mapemit

PeterSaunderson 2nd December 2016 at 9:07pm

  • Enables the framework to queue the result of a Map operation

int mr_mapemit(el_t *ucitem);


msg_channel_t

PeterSaunderson 3rd December 2016 at 4:18pm

typedef struct {
    unsigned row; // row of remote eCore (or 0 for shm)
    unsigned col; // col of remote eCore (or 0 for shm)
    size_t initialoffset;
    size_t offset;
    size_t sgmntsize;
    size_t limit;
    size_t size;
    void *mem; // Either &e_group_config or &emem
#ifdef __epiphany__
    e_memseg_t emem;
#else
    e_mem_t emem;
#endif
} msg_channel_t;


Command Message

PeterSaunderson 2nd December 2016 at 6:04pm

Designed to carry the command and arguments to the remote system and the results back to the caller


Request Message

PeterSaunderson 2nd December 2016 at 6:17pm

Designed to carry one of a number of different requests to the provider


Glossary

PeterSaunderson 23rd August 2016 at 3:56pm

BSP
Bulk Synchronous Parallel: library to assist in running tasks in a parallel processing system
Fold-left
apply operator on first map item first
Fold-right
apply operator on last map item first
grun
a lightweight job queueing system
MapReduce
an architecture framework designed for creating and processing large volumes of data using clusters of computers
rank
a unique number allocated to each core in the system

BSP

PeterSaunderson 3rd December 2016 at 5:13pm

Bulk Synchronous Parallel: library to assist in running tasks in a parallel processing system


Fold-left

PeterSaunderson 3rd December 2016 at 5:54pm

Map operates on a list of values in order to produce a new list of values, by applying the same computation to each value.

The order in which the elements are accumulated can be important. Fold-left combines items in the sequence they were created. Fold-right goes in the other direction.
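The direction matters whenever the combining operator is not associative. A small C illustration (not framework code): folding {1, 2, 3} with subtraction gives different answers left-to-right and right-to-left.

```c
#include <assert.h>

/* Fold-left combines items in the order they were produced:
 * ((acc - v[0]) - v[1]) - v[2] ... */
static int fold_left(const int *v, int n, int acc)
{
    for (int i = 0; i < n; i++)
        acc = acc - v[i];
    return acc;
}

/* Fold-right starts from the last item:
 * v[0] - (v[1] - (v[2] - acc)) ... */
static int fold_right(const int *v, int n, int acc)
{
    for (int i = n - 1; i >= 0; i--)
        acc = v[i] - acc;
    return acc;
}
```

With {1, 2, 3} and a starting accumulator of 0, fold-left gives ((0-1)-2)-3 = -6 while fold-right gives 1-(2-(3-0)) = 2.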


Fold-right

PeterSaunderson 3rd December 2016 at 5:57pm

Map operates on a list of values in order to produce a new list of values, by applying the same computation to each value.

The order in which the elements are accumulated can be important. Fold-left combines items in the sequence they were created. Fold-right goes in the other direction.


grun

PeterSaunderson 3rd December 2016 at 5:13pm

https://github.com/earonesty/grun

A lightweight replacement for job queueing systems like LSF, Torque, condor, SGE, for private clusters.


MapReduce

PeterSaunderson 3rd December 2016 at 5:57pm

Map Reduce is an architecture framework designed for creating and processing large volumes of data using clusters of computers. Google's paper Dean2008 MapReduce: Simplified Data Processing on Large Clusters is now often cited as the introduction of Map Reduce to the problem of data processing.

Map operates on a list of values in order to produce a new list of values, by applying the same computation to each value. Reduce operates on the output of Map to produce the final result. The order in which the elements are accumulated can be important. Fold-left combines items in the sequence they were created. Fold-right goes in the other direction.


rank

PeterSaunderson 3rd December 2016 at 6:12pm

In the 16 core Epiphany chip the eCores are arranged in 4 rows and 4 columns. The rank of a given core is row × col. This provides a convenient way to allocate different tasks to each eCore with a single number.


Dean2008

PeterSaunderson 30th November 2016 at 10:54am

Dean2008
Dean, Jeffrey and Ghemawat, Sanjay MapReduce: Simplified Data Processing on Large Clusters Commun. ACM 2008


@article{Dean:2008:MSD:1327452.1327492,
  author    = {Dean, Jeffrey and Ghemawat, Sanjay},
  title     = {MapReduce: Simplified Data Processing on Large Clusters},
  journal   = {Commun. ACM},
  issue_date = {January 2008},
  volume    = {51},
  number    = {1},
  month     = jan,
  year      = {2008},
  issn      = {0001-0782},
  pages     = {107--113},
  numpages  = {7},
  url       = {http://doi.acm.org/10.1145/1327452.1327492},
  doi       = {10.1145/1327452.1327492},
  acmid     = {1327492},
  publisher = {ACM},
  address   = {New York, NY, USA},
}
