세마포어

세마포어는 N개 프로세스가 동시에 락을 획득할 수 있도록 허용하는 뮤텍스 기반의 동기 처리 기법이다. 물리적인 계산 리소스 이용에 제한을 적용하고 싶은 경우 사용할 수 있다.

스핀락 기반 세마포어 구현

#define NUM 4

void semaphore_acquire(volatile int *cnt) { // 락을 얻은 프로세스 수를 의미하는 공유 변수 포인터를 받는다.
    while (1) {
        while (*cnt >= NUM); // 락을 얻은 프로세스가 NUM 이상이면 스핀하며 대기한다.
        __sync_fetch_and_add(cnt, 1); // NUM 미만이면 값을 아토믹하게 증가한다.
        if (*cnt <= NUM) { // 증가한 공유 변수 값이 NUM 이하라면 루프를 벗어나 락을 얻는다.
            break;
        }
        __sync_fetch_and_sub(cnt, 1); // 증가한 공유 변수 값이 NUM을 초과하면 다시 시도한다.
    }
}

void semaphore_release(int *cnt) {
    __sync_fetch_and_sub(cnt, 1);
}
int cnt = 0;

void do_something() {
    while (1) {
        semaphore_acquire(&cnt);
        // critical section
        semaphore_release(&cnt);
    }
}

러스트 세마포어 구현

use std::sync::{Condvar, Mutex};

pub struct Semaphore {
    mutex: Mutex<isize>,
    cond: Condvar,
    max: isize, // 동시에 락을 획득할 수 있는 프로세스 최대 수
}

impl Semaphore {
    pub fn new(max: iszie) -> Self {
        Semaphore {
            mutex: Mutex::new(0),
            cond: Condvar::new(),
            max,
        }
    }

    pub fn wait(&self) {
        let mut cnt = self.mutex.lock().unwrap();
        while *cnt >= self.max { // 락을 획득한 프로세스가 이미 최대치라면 대기한다.
            cnt = self.cond.wait(cnt).unwrap();
        }
        *cnt += 1; // 락을 획득할 수 있으면 카운터를 증가하고 임계 구역으로 진입한다.
    }

    pub fn post(&self) {
        let mut cnt = self.mutex.lock().unwrap();
        *cnt -= 1;
        if *cnt <= self.max {
            self.cond.notify_one();
        }
    }
}
use semaphore::Semaphore;
use std::sync::atomic::{AtomicUsize, Ordering}
use std::sync::Arc;

const NUM_LOOP: usize = 100000;
const NUM_THREADS: usize = 8;
const SEM_NUM: isize = 4;

static mut CNT: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let mut v = Vec::new();
    let sem = Arc::new(Semaphore::new(SEM_NUM)); // SEM_NUM만큼 동시 실행 가능한 세마포어

    for i in 0..NUM_THREADS {
        let s = sem.clone();

        let t  = std::thread::spawn(move || {
            for _ in 0..NUM_LOOP {
                s.wait();

                unsafe { CNT.fetch_add(1, Ordering::SeqCst) };
                let n = unsafe { CNT.load(Ordering::SeqCst) };
                assert!((n as isize) <= SEM_NUM);
                unsafe { CNT.fetch_sub(1, Ordering::SeqCst) };

                s.post();
            }
        });

        v.push(t);
    }

    for t in v {
        t.join().unwrap();
    }
}
  • std::sync::Arc: Atomically Reference Counted
    • 스레드 세이프한 레퍼런스 카운팅 포인터. Rc 타입과 다르게 원자적으로 명령을 수행한다.
    • Arc<T> 타입은 타입 T의 값에 대한 공유된 오너십을 제공한다. (힙 메모리에 할당된다.)
    • clone을 호출하면 같은 힙 메모리 주소를 가리키는 새로운 Arc 인스턴스를 만들어낸다:
      let foo = Arc::new(vec![1.0, 2.0, 3.0]);
      
      let a = foo.clone();
      let b = Arc::clone(&foo); // `foo.clone()`과 동일하다.
      
      // `a`, `b`, `foo`는 모두 같은 메모리 주소를 가리키는 `Arc` 인스턴스다.
      
    • 스레드 사이에 데이터를 공유하고 싶을 때 Arc를 사용한다:
      let apple = Arc::new("the same apple");
      
      for _ in 0..10 {
          let apple = Arc::clone(&apple);
      
          thread::spawn(move || {
              println!("{:?}", apple);
          });
      }
      
  • Rc 타입에 대한 자세한 내용은 『TRPL Ch 15. Smart Pointers』를 참고.

이 문서를 인용한 문서