Introduction to Concurrency and Parallelism

Last time

  • Python package: Pandas

Today

Concurrency vs. parallelism

Concurrency is about dealing with lots of things at once.
Parallelism is about doing lots of things at once.

                                      - Rob Pike

Practical example


Multi-threading

Python built-in module threading

  • Thread-based parallelism

  • Suitable for I/O-bound tasks (e.g. reading and writing files)

import threading
import time

def job1(msg, t=1):
    print("Current thread:", threading.current_thread())
    # print("Active threads:", threading.active_count())
    print("Print message: {} is working.".format(msg))
    time.sleep(t)
    print("{} is done.".format(msg))

def job2(msg, t=5):
    print("Current thread:", threading.current_thread())
    # print("Active threads:", threading.active_count())
    print("Print message: {} is working.".format(msg))
    time.sleep(t)
    print("{} is done.".format(msg))

Example 1: create two threads with Thread

thread1 = threading.Thread(target=job1, name="T1", args=("Thread 1",))
thread2 = threading.Thread(target=job2, name="T2", args=("Thread 2",))

print("Current thread:", threading.current_thread())

print("="*50)
thread1.start()
thread2.start()
print("="*50)

print("Current thread:", threading.current_thread())
print("Active threads:", threading.active_count())
Current thread: <_MainThread(MainThread, started 9388)>
==================================================
Current thread: <Thread(T1, started 10720)>
Print message: Thread 1 is working.
Current thread: <Thread(T2, started 10480)>
Print message: Thread 2 is working.
==================================================
Current thread: <_MainThread(MainThread, started 9388)>
Active threads: 7

Example 2: join

  • join() blocks the calling thread (here the MainThread) until the thread it is called on has finished.

thread1 = threading.Thread(target=job1, name="T1", args=("Thread 1",))
thread2 = threading.Thread(target=job2, name="T2", args=("Thread 2",))

print("Current thread:", threading.current_thread())

print("="*50)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("="*50)

print("Current thread:", threading.current_thread())
print("Active threads:", threading.active_count())
Current thread: <_MainThread(MainThread, started 9388)>
==================================================
Current thread: <Thread(T1, started 11576)>
Print message: Thread 1 is working.
Current thread: <Thread(T2, started 18960)>
Print message: Thread 2 is working.
Thread 1 is done.
Thread 1 is done.
Thread 2 is done.
Thread 2 is done.
==================================================
Current thread: <_MainThread(MainThread, started 9388)>
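Active threads: 5

Note: the duplicated "is done" lines most likely come from the two threads started in Example 1. They were never joined, so they finish while this cell is running. The active-thread counts also include helper threads of the IPython kernel.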

Example 3: daemon thread

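  • A daemon thread (created with daemon=True, set before start()) does not keep the program alive: once the MainThread and all other non-daemon threads have finished, the program exits and any daemon threads are stopped abruptly.

  • This cannot be observed inside an IPython (Jupyter) kernel, because the kernel process keeps running after a cell finishes; try the code in example3.py as a script instead.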
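Since example3.py is not reproduced here, the following is only a minimal sketch of the idea; the function name background_job and the sleep times are illustrative assumptions. Run it as a script: the main thread ends after about half a second, and the final "is done" message is normally never printed, because Python does not wait for daemon threads before exiting.

```python
import threading
import time

def background_job(msg, t=2):
    # Hypothetical long-running task
    print("{} is working.".format(msg))
    time.sleep(t)
    print("{} is done.".format(msg))  # normally never reached in a script

# daemon=True has to be set before start()
worker = threading.Thread(target=background_job, name="D1",
                          args=("Daemon thread",), daemon=True)
worker.start()

time.sleep(0.5)  # let the daemon thread start its work
print("Is daemon:", worker.daemon)  # True
print("Still alive:", worker.is_alive())
print("MainThread finishes; the daemon thread is stopped at exit.")
```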


Example 4: combine threads with a class

import threading
import time

class multithread1:
    def __init__(self, input_list):
        # Copy every row so the threads do not modify the caller's list
        # (input_list.copy() alone is shallow and shares the inner lists)
        self.data = [row.copy() for row in input_list]

    def get_data(self):
        return self.data

    def job1(self, data, idx):
        """
        Square every element of row idx in place (one second per element)
        """
        print("Thread {} is starting.".format(idx))
        for j in range(len(data)):
            self.data[idx][j] = data[j]**2
            time.sleep(1)

    def run(self):
        all_thread = []

        # Create one thread per row
        for i in range(len(self.data)):
            thread = threading.Thread(target=self.job1, name="T{}".format(i), args=(self.data[i], i))

            thread.start()     # What is the difference?
            all_thread.append(thread)

        # Block the main thread until every worker thread has finished
        for thread in all_thread:
            # thread.start()     # What is the difference?
            thread.join()

arr1 = [[1,2,3],[4,5,6],[7,8,9],[10,11,12]]

test1 = multithread1(arr1)
test1.run()
print(test1.get_data())
Thread 0 is starting.
Thread 1 is starting.
Thread 2 is starting.
Thread 3 is starting.
[[1, 4, 9], [16, 25, 36], [49, 64, 81], [100, 121, 144]]
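The two "What is the difference?" comments contrast starting every thread before joining any of them with starting and joining each thread inside the same loop. A minimal timing sketch of that contrast, using a hypothetical work function that only sleeps (exact timings vary by machine):

```python
import threading
import time

def work(t=0.2):
    # Hypothetical I/O-bound task, simulated with sleep
    time.sleep(t)

def start_all_then_join(n=4):
    """Start every thread first, then join: the sleeps overlap."""
    threads = [threading.Thread(target=work) for _ in range(n)]
    begin = time.perf_counter()
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return time.perf_counter() - begin

def start_and_join_each(n=4):
    """Start and join in the same loop: each thread ends before the next starts."""
    threads = [threading.Thread(target=work) for _ in range(n)]
    begin = time.perf_counter()
    for th in threads:
        th.start()
        th.join()
    return time.perf_counter() - begin

overlapped = start_all_then_join()
sequential = start_and_join_each()
print("start all, then join: {:.2f} s".format(overlapped))     # about 0.2 s
print("start + join per thread: {:.2f} s".format(sequential))  # about 0.8 s
```

In the second version each join() blocks before the next thread is started, so the threads run one after another and the benefit of threading is lost.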

Brief summary of multi-threading

  • Quick and easy to use

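  • Achieves concurrency by switching the CPU between threads (context switching). In standard CPython builds, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so threads effectively use one CPU core and do not speed up CPU-bound code.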

Multi-processing

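  • Uses multiple processes, which can run on different CPU cores at the same time

  • Each process has its own memory, so data must be transferred (pickled) between processes during execution

  • Suitable for computation-heavy tasks, where \(\text{execution time} \ggg \text{data transfer time}\)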

Example 5: mp.Process

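  • This example does not run correctly inside an IPython (Jupyter) kernel when processes are started with the spawn method (the default on Windows and macOS), because the child processes cannot import functions defined in the notebook; try the code in example5.py as a script instead.

  • There is NO guarantee that the processes will finish in the order they were started.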
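example5.py is not reproduced here; the following is a minimal sketch in the same spirit, with a hypothetical job function whose sleep time shrinks for later processes, so they tend to finish first. The if __name__ == "__main__": guard is required when processes are started with spawn, because each child re-imports the main module.

```python
import multiprocessing as mp
import os
import time

def job(msg, t):
    # Runs in a separate process, so it reports a different PID
    print("{} (pid {}) is working.".format(msg, os.getpid()))
    time.sleep(t)
    print("{} is done.".format(msg))

def main(n=3):
    # Later processes sleep less, so they may finish before earlier ones
    procs = [mp.Process(target=job, name="P{}".format(i),
                        args=("Process {}".format(i), 0.1 * (n - i)))
             for i in range(n)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # wait for every process to terminate
    return [p.exitcode for p in procs]

if __name__ == "__main__":
    exit_codes = main()
    print("Exit codes:", exit_codes)  # 0 means the process ended normally
```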


import multiprocessing as mp

print("CPU count:", mp.cpu_count())
CPU count: 6

Example 6: mp.Pool

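  • As with Example 5, this does not run correctly inside an IPython kernel; try the code in example6.py.

  • Use mp.Pool to create a pool of worker processes.

  • Use mp.Pool.map to distribute the elements of an iterable across the workers; it blocks until all results are ready and returns them as a list in input order.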
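A minimal sketch of mp.Pool with mp.Pool.map, assuming a hypothetical top-level function square; example6.py itself is not reproduced here. Using the pool as a context manager terminates the workers when the with block ends.

```python
import multiprocessing as mp

def square(x):
    # Defined at module level so that worker processes can import it
    return x * x

if __name__ == "__main__":
    # A pool of 4 worker processes; map splits range(10) among them
    with mp.Pool(processes=4) as pool:
        result = pool.map(square, range(10))
    print(result)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```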


Example 7: mp.Pool.apply & mp.Pool.apply_async

  • As with Example 5, this does not run correctly inside an IPython kernel; try the code in example7.py.

  • mp.Pool.apply blocks the main process until the result is ready.

  • mp.Pool.apply_async returns an AsyncResult object immediately; call its get() method to obtain the value (get() waits until the result is ready).
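A minimal sketch contrasting the two calls, with a hypothetical slow_add function that sleeps to simulate work; example7.py is not reproduced here.

```python
import multiprocessing as mp
import time

def slow_add(a, b, t=0.2):
    # Simulate a slow computation
    time.sleep(t)
    return a + b

if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        # apply: the main process waits here until the single call returns
        blocking = pool.apply(slow_add, (1, 2))

        # apply_async: returns AsyncResult objects right away;
        # the calls run in the worker processes
        pending = [pool.apply_async(slow_add, (i, i)) for i in range(4)]

        # get() waits for each result (here for at most 5 seconds each)
        values = [r.get(timeout=5) for r in pending]

    print(blocking)  # 3
    print(values)    # [0, 2, 4, 6]
```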


Example 8: mp.Pool.map & mp.Pool.map_async

  • As with Example 5, this does not run correctly inside an IPython kernel; try the code in example7.py.

  • Like apply_async, map_async returns an AsyncResult object; call its get() method to obtain the list of results.
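A minimal sketch of map versus map_async, assuming a hypothetical top-level function cube; both calls produce the same list, but map_async lets the main process continue until get() is called.

```python
import multiprocessing as mp

def cube(x):
    return x ** 3

if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        # map: blocks until the whole list of results is ready
        blocking = pool.map(cube, range(5))

        # map_async: returns an AsyncResult immediately
        async_result = pool.map_async(cube, range(5))
        # ... the main process is free to do other work here ...
        non_blocking = async_result.get(timeout=5)  # wait for the list

    print(blocking)      # [0, 1, 8, 27, 64]
    print(non_blocking)  # [0, 1, 8, 27, 64]
```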

Shared memory and Lock

Basic shared memory: mp.Value and mp.Array

  • Use mp.Value to create a single value allocated in shared memory.

  • Use mp.Array to create a one-dimensional array (all elements of one type) allocated in shared memory.

  • List of common data types:

    Type code   C type          Python type   Minimum size in bytes
    "b"         signed char     int           1
    "B"         unsigned char   int           1
    "i"         signed int      int           2 or 4
    "I"         unsigned int    int           2 or 4
    "f"         float           float         4
    "d"         double          float         8

# Example of shared memory

import multiprocessing as mp

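value1 = mp.Value('i', 0)       # shared C int, initial value 0
value2 = mp.Value('d', 3.14)    # shared C double, initial value 3.14

array = mp.Array('i', [1, 2, 3, 4])   # shared array of C ints

# Access the contents through .value (Value) or indexing/slicing (Array)
print(value1.value, value2.value, array[:])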
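To show that child processes really write into the same memory, here is a minimal sketch (square_slot is a hypothetical helper) in which each process squares one element of a shared mp.Array. Because every process touches a different index, no extra lock is needed.

```python
import multiprocessing as mp

def square_slot(arr, idx):
    # Each process reads and writes only its own index
    arr[idx] = arr[idx] ** 2

def main():
    array = mp.Array('i', [1, 2, 3, 4])
    procs = [mp.Process(target=square_slot, args=(array, i))
             for i in range(len(array))]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return array[:]  # copy the shared contents into a normal list

if __name__ == "__main__":
    result = main()
    print(result)  # [1, 4, 9, 16]
```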

Example 9: Race condition

  • When multiple threads or processes access the same memory (e.g. a shared counter) and at least one of them writes to it, the result can depend on the unpredictable order in which they run. This is called a race condition.
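A minimal sketch of a race condition, assuming a hypothetical add_many helper: four processes each add 1 to a shared counter 10000 times. counter.value += 1 is a read followed by a write, and the built-in lock of mp.Value only protects the read and the write separately, so increments from different processes can overwrite each other. The final total is often below 40000, but it differs from run to run and may occasionally be correct.

```python
import multiprocessing as mp

def add_many(counter, n):
    for _ in range(n):
        # Read, add, then write back: not atomic as a whole
        counter.value += 1

def main(n_procs=4, n=10000):
    counter = mp.Value('i', 0)
    procs = [mp.Process(target=add_many, args=(counter, n))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == "__main__":
    total = main()
    print("Expected: 40000, got:", total)
```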


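Example 10: Create a Lock

  • Use mp.Lock() to create a Lock.

  • Call lock.acquire() (where lock = mp.Lock()) before accessing the shared memory: any other process or thread that calls acquire() on the same lock blocks until it is released.

  • Call lock.release() when done, so that others can acquire the lock.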
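A minimal sketch, assuming a hypothetical add_many helper: four processes each add 1 to a shared mp.Value counter 10000 times, and every increment is wrapped in acquire()/release() on one shared mp.Lock, so no update is lost and the total is 40000.

```python
import multiprocessing as mp

def add_many(counter, lock, n):
    for _ in range(n):
        lock.acquire()           # other processes wait here
        try:
            counter.value += 1   # only one process at a time runs this
        finally:
            lock.release()       # let the next process in

def main(n_procs=4, n=10000):
    counter = mp.Value('i', 0)
    lock = mp.Lock()
    procs = [mp.Process(target=add_many, args=(counter, lock, n))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == "__main__":
    total = main()
    print("Total:", total)  # 40000
```

The acquire/try/finally/release pattern in add_many can be written more compactly as a with lock: block around the increment.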


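Example 11: Create a Semaphore

  • Use mp.Semaphore(n) to create a Semaphore that up to n processes can hold at the same time.

  • Unlike mp.Lock, which admits only one process or thread at a time, mp.Semaphore can grant access to several processes at once.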
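A minimal sketch, assuming a hypothetical limited_job function: six processes compete for an mp.Semaphore(2), and two shared counters record how many are inside the protected section at once. The recorded peak never exceeds 2 (it can be lower if the processes start slowly and do not overlap).

```python
import multiprocessing as mp
import time

def limited_job(sem, active, peak, lock):
    with sem:  # at most 2 processes get past this line at once
        with lock:  # protect the bookkeeping counters
            active.value += 1
            peak.value = max(peak.value, active.value)
        time.sleep(0.1)  # simulated work while holding the semaphore
        with lock:
            active.value -= 1

def main(n_procs=6):
    sem = mp.Semaphore(2)
    lock = mp.Lock()
    active = mp.Value('i', 0)  # processes currently inside
    peak = mp.Value('i', 0)    # highest number seen inside at once
    procs = [mp.Process(target=limited_job, args=(sem, active, peak, lock))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return peak.value

if __name__ == "__main__":
    max_inside = main()
    print("Max processes inside at once:", max_inside)
```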


RLock

  • A recursive lock

  • Use mp.RLock() to create an RLock

  • Unlike a Lock, an RLock can be acquired multiple times by the thread or process that already holds it. If an RLock has been acquired \(N\) times with acquire(), it stays locked until release() has been called \(N\) times.

  • Difference:

    Acquiring: a Lock cannot be acquired again by any thread or process, not even the one holding it, until it is released. An RLock can be acquired again by the thread or process that owns it; all others block.

    Releasing: a Lock can be released by any thread or process. An RLock can only be released by the thread or process that owns it.

    Speed: a Lock is generally faster; an RLock is relatively slower.
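A minimal single-process sketch of the acquiring difference, with a hypothetical recursive countdown function: the RLock is re-acquired at every level of recursion, while a second acquire() on a plain Lock by its own holder would block, so a timeout is used to make it return False instead of hanging.

```python
import multiprocessing as mp

def countdown(rlock, n, log):
    # Each recursive call acquires the same RLock again
    with rlock:
        log.append(n)
        if n > 0:
            countdown(rlock, n - 1, log)

rlock = mp.RLock()
log = []
countdown(rlock, 3, log)  # acquired and released 4 times in total
print(log)  # [3, 2, 1, 0]

lock = mp.Lock()
lock.acquire()
# The holder itself cannot acquire a plain Lock again; the timeout
# makes acquire() return False instead of blocking forever
second = lock.acquire(timeout=0.1)
print(second)  # False
lock.release()
```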

Summary

                    Multi-threading           Multi-processing
    Processors      Single processor          Multiple processors
    Ease of use     Simple and intuitive      Requires data transfer
    Suitable for    Data I/O tasks            Heavy computation tasks