logo

Peterson の相互排除アルゴリズム |セット 2 (CPU サイクルとメモリ フェンス)

問題: 2 つのプロセス i と j があるとすると、追加のハードウェア サポートなしで 2 つのプロセス間の相互排他を保証できるプログラムを作成する必要があります。

CPUクロックサイクルの無駄遣い

平たく言えば、スレッドが順番を待っているときに、1 秒あたり何百万回も条件をテストする長い while ループが終了し、不必要な計算が実行されます。もっと良い待機方法があります。 '収率'



これが何を行うのかを理解するには、Linux でプロセス スケジューラがどのように動作するかを深く掘り下げる必要があります。ここで述べたアイデアはスケジューラの簡略化されたバージョンですが、実際の実装には多くの複雑な点があります。

次の例を考えてみましょう 
P1、P2、P3 の 3 つのプロセスがあります。プロセス P3 には、あまり有用な計算を実行しないコード内のループと同様の while ループがあり、P2 が実行を終了したときにのみループから存在します。スケジューラはそれらすべてをラウンドロビン キューに入れます。ここで、プロセッサのクロック速度が 1000000/秒で、各反復で各プロセスに 100 クロックが割り当てられるとします。次に、最初の P1 が 100 クロック (0.0001 秒) 実行され、次に P2 (0.0001 秒)、続いて P3 (0.0001 秒) が実行されます。これは、これ以上プロセスがないため、このサイクルは P2 が終了するまで繰り返され、続いて P3 が実行され、最終的に終了します。

これは 100 CPU クロック サイクルの完全な無駄です。これを回避するために、CPU タイム スライス、つまりイールドを相互に放棄し、実質的にこのタイム スライスを終了し、スケジューラは実行する次のプロセスを選択します。ここで、条件を一度テストしてから、CPU を放棄します。テストに 25 クロック サイクルかかることを考慮すると、タイム スライスでの計算の 75% が節約されます。これを図示すると
 



Peterson の相互排除アルゴリズム |セット 2 (CPU サイクルとメモリ フェンス)

プロセッサのクロック速度が 1MHz であることを考えると、これは大幅な節約になります。 
ディストリビューションが異なれば、この機能を実現するための機能も異なります。 Linux が提供する sched_yield()

C
void lock(int self) {  flag[self] = 1;  turn = 1-self;  while (flag[1-self] == 1 &&  turn == 1-self)    // Only change is the addition of  // sched_yield() call  sched_yield(); } 

思い出の柵。

前のチュートリアルのコードはほとんどのシステムで動作する可能性がありますが、100% 正しいわけではありません。ロジックは完璧でしたが、最新の CPU のほとんどはパフォーマンスの最適化を採用しており、その結果、実行順序が狂ってしまう可能性があります。このメモリ操作 (ロードとストア) の順序変更は、通常、単一スレッドの実行内では気付かれませんが、並行プログラムでは予期しない動作を引き起こす可能性があります。
この例を考えてみましょう 



C
 while (f == 0);    // Memory fence required here  print x; 

上記の例では、コンパイラーは 2 つのステートメントが互いに独立していると見なすため、それらのステートメントを再順序付けしてコード効率を向上させようとしますが、これが並行プログラムの問題を引き起こす可能性があります。これを回避するために、メモリ フェンスを配置して、バリアを越えたステートメント間の考えられる関係についてコンパイラにヒントを与えます。

したがって、ステートメントの順序は  

フラグ[自分] = 1; 
ターン = 1-自分; 
while (ターン状態チェック) 
収率(); 
 

ロックが機能するには、まったく同じである必要があります。そうでない場合は、デッドロック状態になります。

このコンパイラが、この障壁を越えてステートメントの順序付けを防止する命令を提供できるようにするためです。 gccの場合は、 __sync_synchronize()
したがって、変更されたコードは次のようになります 
C での完全な実装:

C++
// Filename: peterson_yieldlock_memoryfence.cpp // Use below command to compile: // g++ -pthread peterson_yieldlock_memoryfence.cpp -o peterson_yieldlock_memoryfence #include   #include #include   std::atomic<int> flag[2]; std::atomic<int> turn; const int MAX = 1e9; int ans = 0; void lock_init() {  // Initialize lock by resetting the desire of  // both the threads to acquire the locks.  // And giving turn to one of them.  flag[0] = flag[1] = 0;  turn = 0; } // Executed before entering critical section void lock(int self) {  // Set flag[self] = 1 saying you want  // to acquire lock  flag[self]=1;  // But first give the other thread the  // chance to acquire lock  turn = 1-self;  // Memory fence to prevent the reordering  // of instructions beyond this barrier.  std::atomic_thread_fence(std::memory_order_seq_cst);  // Wait until the other thread loses the  // desire to acquire lock or it is your  // turn to get the lock.  while (flag[1-self]==1 && turn==1-self)  // Yield to avoid wastage of resources.  std::this_thread::yield(); } // Executed after leaving critical section void unlock(int self) {  // You do not desire to acquire lock in future.  // This will allow the other thread to acquire  // the lock.  flag[self]=0; } // A Sample function run by two threads created // in main() void func(int s) {  int i = 0;  int self = s;  std::cout << 'Thread Entered: ' << self << std::endl;  lock(self);  // Critical section (Only one thread  // can enter here at a time)  for (i=0; i<MAX; i++)  ans++;  unlock(self); } // Driver code int main() {   // Initialize the lock   lock_init();  // Create two threads (both run func)  std::thread t1(func 0);  std::thread t2(func 1);  // Wait for the threads to end.  t1.join();  t2.join();  std::cout << 'Actual Count: ' << ans << ' | Expected Count: ' << MAX*2 << std::endl;  return 0; } 
C
// Filename: peterson_yieldlock_memoryfence.c // Use below command to compile: // gcc -pthread peterson_yieldlock_memoryfence.c -o peterson_yieldlock_memoryfence #include #include #include 'mythreads.h' int flag[2]; int turn; const int MAX = 1e9; int ans = 0; void lock_init() {  // Initialize lock by resetting the desire of  // both the threads to acquire the locks.  // And giving turn to one of them.  flag[0] = flag[1] = 0;  turn = 0; } // Executed before entering critical section void lock(int self) {  // Set flag[self] = 1 saying you want  // to acquire lock  flag[self]=1;  // But first give the other thread the  // chance to acquire lock  turn = 1-self;  // Memory fence to prevent the reordering  // of instructions beyond this barrier.  __sync_synchronize();  // Wait until the other thread loses the  // desire to acquire lock or it is your  // turn to get the lock.  while (flag[1-self]==1 && turn==1-self)  // Yield to avoid wastage of resources.  sched_yield(); } // Executed after leaving critical section void unlock(int self) {  // You do not desire to acquire lock in future.  // This will allow the other thread to acquire  // the lock.  flag[self]=0; } // A Sample function run by two threads created // in main() void* func(void *s) {  int i = 0;  int self = (int *)s;  printf('Thread Entered: %dn'self);  lock(self);  // Critical section (Only one thread  // can enter here at a time)  for (i=0; i<MAX; i++)  ans++;  unlock(self); } // Driver code int main() {   pthread_t p1 p2;  // Initialize the lock   lock_init();  // Create two threads (both run func)  Pthread_create(&p1 NULL func (void*)0);  Pthread_create(&p2 NULL func (void*)1);  // Wait for the threads to end.  Pthread_join(p1 NULL);  Pthread_join(p2 NULL);  printf('Actual Count: %d | Expected Count:'  ' %dn'ansMAX*2);  return 0; } 
Java
import java.util.concurrent.atomic.AtomicInteger; public class PetersonYieldLockMemoryFence {  static AtomicInteger[] flag = new AtomicInteger[2];  static AtomicInteger turn = new AtomicInteger();  static final int MAX = 1000000000;  static int ans = 0;  static void lockInit() {  flag[0] = new AtomicInteger();  flag[1] = new AtomicInteger();  flag[0].set(0);  flag[1].set(0);  turn.set(0);  }  static void lock(int self) {  flag[self].set(1);  turn.set(1 - self);  // Memory fence to prevent the reordering of instructions beyond this barrier.  // In Java volatile variables provide this guarantee implicitly.  // No direct equivalent to atomic_thread_fence is needed.  while (flag[1 - self].get() == 1 && turn.get() == 1 - self)  Thread.yield();  }  static void unlock(int self) {  flag[self].set(0);  }  static void func(int s) {  int i = 0;  int self = s;  System.out.println('Thread Entered: ' + self);  lock(self);  // Critical section (Only one thread can enter here at a time)  for (i = 0; i < MAX; i++)  ans++;  unlock(self);  }  public static void main(String[] args) {  // Initialize the lock  lockInit();  // Create two threads (both run func)  Thread t1 = new Thread(() -> func(0));  Thread t2 = new Thread(() -> func(1));  // Start the threads  t1.start();  t2.start();  try {  // Wait for the threads to end.  t1.join();  t2.join();  } catch (InterruptedException e) {  e.printStackTrace();  }  System.out.println('Actual Count: ' + ans + ' | Expected Count: ' + MAX * 2);  } } 
Python
import threading flag = [0 0] turn = 0 MAX = 10**9 ans = 0 def lock_init(): # This function initializes the lock by resetting the flags and turn. global flag turn flag = [0 0] turn = 0 def lock(self): # This function is executed before entering the critical section. It sets the flag for the current thread and gives the turn to the other thread. global flag turn flag[self] = 1 turn = 1 - self while flag[1-self] == 1 and turn == 1-self: pass def unlock(self): # This function is executed after leaving the critical section. It resets the flag for the current thread. global flag flag[self] = 0 def func(s): # This function is executed by each thread. It locks the critical section increments the shared variable and then unlocks the critical section. global ans self = s print(f'Thread Entered: {self}') lock(self) for _ in range(MAX): ans += 1 unlock(self) def main(): # This is the main function where the threads are created and started. lock_init() t1 = threading.Thread(target=func args=(0)) t2 = threading.Thread(target=func args=(1)) t1.start() t2.start() t1.join() t2.join() print(f'Actual Count: {ans} | Expected Count: {MAX*2}') if __name__ == '__main__': main() 
JavaScript
class PetersonYieldLockMemoryFence {  static flag = [0 0];  static turn = 0;  static MAX = 1000000000;  static ans = 0;  // Function to acquire the lock  static async lock(self) {  PetersonYieldLockMemoryFence.flag[self] = 1;  PetersonYieldLockMemoryFence.turn = 1 - self;  // Asynchronous loop with a small delay to yield  while (PetersonYieldLockMemoryFence.flag[1 - self] == 1 &&  PetersonYieldLockMemoryFence.turn == 1 - self) {  await new Promise(resolve => setTimeout(resolve 0));  }  }  // Function to release the lock  static unlock(self) {  PetersonYieldLockMemoryFence.flag[self] = 0;  }  // Function representing the critical section  static func(s) {  let i = 0;  let self = s;  console.log('Thread Entered: ' + self);    // Lock the critical section  PetersonYieldLockMemoryFence.lock(self).then(() => {  // Critical section (Only one thread can enter here at a time)  for (i = 0; i < PetersonYieldLockMemoryFence.MAX; i++) {  PetersonYieldLockMemoryFence.ans++;  }    // Release the lock  PetersonYieldLockMemoryFence.unlock(self);  });  }  // Main function  static main() {  // Create two threads (both run func)  const t1 = new Thread(() => PetersonYieldLockMemoryFence.func(0));  const t2 = new Thread(() => PetersonYieldLockMemoryFence.func(1));  // Start the threads  t1.start();  t2.start();  // Wait for the threads to end.  setTimeout(() => {  console.log('Actual Count: ' + PetersonYieldLockMemoryFence.ans + ' | Expected Count: ' + PetersonYieldLockMemoryFence.MAX * 2);  } 1000); // Delay for a while to ensure threads finish  } } // Define a simple Thread class for simulation class Thread {  constructor(func) {  this.func = func;  }  start() {  this.func();  } } // Run the main function PetersonYieldLockMemoryFence.main(); 
C++
// mythread.h (A wrapper header file with assert statements) #ifndef __MYTHREADS_h__ #define __MYTHREADS_h__ #include  #include  #include  // Function to lock a pthread mutex void Pthread_mutex_lock(pthread_mutex_t *m) {  int rc = pthread_mutex_lock(m);  assert(rc == 0); // Assert that the mutex was locked successfully }   // Function to unlock a pthread mutex void Pthread_mutex_unlock(pthread_mutex_t *m) {  int rc = pthread_mutex_unlock(m);  assert(rc == 0); // Assert that the mutex was unlocked successfully }   // Function to create a pthread void Pthread_create(pthread_t *thread const pthread_attr_t *attr   void *(*start_routine)(void*) void *arg) {  int rc = pthread_create(thread attr start_routine arg);  assert(rc == 0); // Assert that the thread was created successfully } // Function to join a pthread void Pthread_join(pthread_t thread void **value_ptr) {  int rc = pthread_join(thread value_ptr);  assert(rc == 0); // Assert that the thread was joined successfully } #endif // __MYTHREADS_h__ 
C
// mythread.h (A wrapper header file with assert // statements) #ifndef __MYTHREADS_h__ #define __MYTHREADS_h__ #include  #include    #include  void Pthread_mutex_lock(pthread_mutex_t *m) {  int rc = pthread_mutex_lock(m);  assert(rc == 0); }   void Pthread_mutex_unlock(pthread_mutex_t *m) {  int rc = pthread_mutex_unlock(m);  assert(rc == 0); }   void Pthread_create(pthread_t *thread const pthread_attr_t *attr   void *(*start_routine)(void*) void *arg) {  int rc = pthread_create(thread attr start_routine arg);  assert(rc == 0); } void Pthread_join(pthread_t thread void **value_ptr) {  int rc = pthread_join(thread value_ptr);  assert(rc == 0); } #endif // __MYTHREADS_h__ 
Python
import threading import ctypes # Function to lock a thread lock def Thread_lock(lock): lock.acquire() # Acquire the lock # No need for assert in Python acquire will raise an exception if it fails # Function to unlock a thread lock def Thread_unlock(lock): lock.release() # Release the lock # No need for assert in Python release will raise an exception if it fails # Function to create a thread def Thread_create(target args=()): thread = threading.Thread(target=target args=args) thread.start() # Start the thread # No need for assert in Python thread.start() will raise an exception if it fails # Function to join a thread def Thread_join(thread): thread.join() # Wait for the thread to finish # No need for assert in Python thread.join() will raise an exception if it fails 

出力: 

Thread Entered: 1  
Thread Entered: 0
Actual Count: 2000000000 | Expected Count: 2000000000