Trang chủ Kiến Thức Công Nghệ Concurrency pattern trong Go – Phần 2
Công Nghệ

Concurrency pattern trong Go – Phần 2

Chia sẻ
Concurrency pattern trong Go – Phần 2
Chia sẻ

Trong bài Concurrency pattern trong Go – Phần 1, chúng ta đã tìm hiểu 3 pattern quan trọng là confinement, vòng lặp for select và ngăn goroutine leak. Ở phần này, chúng ta sẽ tìm hiểu tiếp 3 pattern quan trọng khác, bao gồm: or channel, xử lý lỗi và pipeline.

1. Or channel

Ở phần 1, tôi đề cập tới pattern done channel dùng để dừng một goroutine. Đôi khi, một goroutine có nhiều điều kiện dừng. Pattern or channel sinh ra để giải quyết bài toán này.

Or Channel cho phép một goroutine lắng nghe nhiều channel cùng một lúc và phản hồi ngay khi có channel nào đó sẵn sàng. Pattern này tạo ra một channel tổng hợp, giúp đơn giản hóa việc quản lý và lắng nghe trên nhiều goroutine chạy đồng thời, làm cho code gọn gàng và dễ đọc hơn.

Go

package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Now()
	<-Or(
		Sig(2*time.Hour),
		Sig(5*time.Minute),
		Sig(1*time.Second),
		Sig(1*time.Hour),
		Sig(1*time.Minute),
	)
	fmt.Printf("done after %v", time.Since(start))
}

// Hàm Sig tạo ra một channel sẽ đóng sau một khoảng thời gian xác định after.
func Sig(after time.Duration) <-chan any {
	c := make(chan interface{})
	go func() {
    	// Đóng channel sau một khoảng thời gian
		defer close(c)
		time.Sleep(after)
	}()
	return c
}

// Hàm `Or` nhận vào một slice các channel và trả ra duy nhất 1 channel tổng hợp.
func Or(channels ...<-chan any) <-chan any {
	switch len(channels) {
	case 0:
    	// Trường hợp không có channel nào, trả về một channel đã đóng để không xảy ra deadlock.
		closedCh := make(chan any)
		close(closedCh)
        return closedCh
	case 1:
    	// Chỉ có một channel, trả về chính channel đó.
		return channels[0]
	default:
    	// Nếu có từ 2 channel trở lên thì kết hợp nhiều channels thành 1 channel `orDone`
		orDone := make(chan any)
		go func() {
			defer close(orDone)

			select {
			case <-channels[0]:
			case <-channels[1]:
            // Gọi đệ quy cho tới khi channels chỉ còn một phần tử
			case <-Or(append(channels[2:], orDone)...):
			}
		}()
        // Trả ra 1 channel tổng hợp
		return orDone
	}
}
Ví dụ khởi tạo và sử dụng Or channel

Đoạn code trên trả ra kết quả như sau:

Bash

done after 1.001204583s

2. Xử lý lỗi

Trong Go, nhiều goroutine chạy đồng thời và độc lập với nhau và với cả process sinh ra chúng. Khi lỗi ở một goroutine, chúng ta sẽ gặp khó khăn xử lý lỗi nếu không có cơ chế giao tiếp phù hợp giữa các goroutine với nhau.

Điển hình là ví dụ dưới:

Go

package main

import (
	"fmt"
	"net/http"
)

func main() {
	done := make(chan interface{})
	defer close(done)

    urls := []string{"https://www.google.com", "https://badhost", "https://badhost2"}
    for response := range checkStatus(done, urls...) {
        fmt.Printf("Response: %vn", response.Status)
    }
}

func checkStatus(done <-chan interface{}, urls ...string) <-chan *http.Response {
    responses := make(chan *http.Response)
    go func() {
        defer close(responses)
        for _, url := range urls {
            resp, err := http.Get(url)
            if err != nil {
            	// Khi lỗi xảy ra, goroutine chỉ biết in ra console mà không có cách nào để thông báo lỗi tới nơi xử lý chúng.
                // Và không thể dừng checkStatus khi có lỗi xảy ra.
                fmt.Println(err)
                continue
            }
            select {
            case <-done:
                return
            case responses <- resp:
            }
        }
    }()
    return responses
}

Nếu có lỗi xảy ra, chương trình sẽ chỉ in ra màn hình kết quả:

Bash

Response: 200 OK
Get "https://badhost": dial tcp: lookup badhost: no such host
Get "https://badhost2": dial tcp: lookup badhost: no such host

Solution:

Để xử lý vấn đề này, tôi sẽ trả ra một struct chứa cả data và error thay vì chỉ data chứa http.Response.

Go

// Tạo thêm một struct chứa cả dữ liệu và error
type Result struct {
    Error error
    Response *http.Response
}

func checkStatus(done <-chan interface{}, urls ...string) <-chan Result {
    results := make(chan Result)
    go func() {
        defer close(results)

        for _, url := range urls {
            var result Result
            resp, err := http.Get(url)
            // Cho thêm error vào kết quả trả ra ở channel
            result = Result{Error: err, Response: resp}
            select {
            case <-done:
                return
            case results <- result:
            }
        }
    }()
    return results
}

Go

func main() {
	done := make(chan interface{})
    defer close(done)

    urls := []string{"https://www.google.com", "https://badhost"}
    for result := range checkStatus(done, urls...) {
        if result.Error != nil {
        	// Nếu có lỗi khi chạy goroutine thì in ra lỗi, sau đó, dừng vòng lặp
            fmt.Printf("error: %v", result.Error)
            return
        }
        // Nếu không có lỗi thì in kết quả ra màn hình.
        fmt.Printf("Response: %vn", result.Response.Status)
    }
}

Khi chạy tới url số 2, checkStatus lỗi nên chương trình dừng.

Go

Response: 200 OK
Get "https://badhost": dial tcp: lookup badhost: no such host

Ngoài ra, tôi có thể check nếu error vượt quá giới hạn thì mới dừng chương trình.

Go

done := make(chan interface{})
defer close(done)

errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
    if result.Error != nil {
        fmt.Printf("error: %vn", result.Error)
        errCount++
        // Dừng chương trình nếu 3 lỗi xảy ra
        if errCount >= 3 {
            fmt.Println("Too many errors, breaking!")
            break
        }
        continue
    }
    fmt.Printf("Response: %vn", result.Response.Status)
}

Chương trình sẽ chỉ dừng nếu gặp 3 lỗi

Go

error: Get "a": unsupported protocol scheme ""
Response: 200 OK
error: Get "b": unsupported protocol scheme ""
error: Get "c": unsupported protocol scheme ""
Too many errors, breaking!

3. Pipeline

Pipeline là kỹ thuật thiết kế cho phép quản lý luồng xử lý dữ liệu thông qua chuỗi các giai đoạn (gọi là pipeline). Mỗi giai đoạn (gọi là stage) sẽ nhận vào dữ liệu từ một nguồn, xử lý và gửi dữ liệu đã xử lý vào một nguồn khác.

Pipeline

Các stage hoạt động độc lập với nhau và việc chỉnh sửa một stage sẽ không ảnh hưởng tới các stage còn lại, do đó, chúng hoàn toàn có thể chạy đồng thời hoặc chạy theo thứ tự nhất định.

Sau đây là ví dụ đơn giản về stage:

Go

multiply := func(value, multiplier int) int {
    return value * multiplier
}

add := func(value, additive int) int {
    return value + additive
}
Ví dụ về stage

Hàm multiplyadd được coi là 2 stage khác nhau. Hàmmultiply trả về giá trị là tích của hai số cho sẵn. Hàm add trả về tổng của hai số cho sẵn.

Tôi có thể kết hợp hai stage này để trả về pipeline add(multiply(v, 2), 1)).

Go

ints := []int{1, 2, 3, 4}
for _, v := range ints {
    fmt.Println(multiply(add(multiply(v, 2), 1), 2))
}
Ví dụ về pipeline

Đoạn code trên sẽ in ra kết quả như sau:

Bash

6
10
14
18

Trong thực tế, các bài toán thường phức tạp hơn thế này. Trong mỗi vòng for, có thể là một hoặc nhiều pipeline bao gồm các stage đang chạy. Nếu một stage bị lỗi, làm sao để tôi dừng được cả pipeline cũng như dừng toàn bộ các pipeline khác? Hay khi một pipeline hoàn thành, làm sao để tôi dừng các pipeline khác?

Để giải quyết vấn đề trên, tôi sẽ thêm vào hàm add hai channel donein. Channel done dùng để dừng stage khi cần thiết. Channel in chứa dữ liệu cần xử lý.

Go

func add(done, in <-chan int, additive int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
        	select {
            case <-done:
                return
            case out <- i:
            }
            out <- n + additive
        }
        close(out)
    }()
    return out
}

Tôi cũng thêm tương tự với hàm multiply.

Go

func multiply(done, in <-chan int, multiplier int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
        	select {
            case <-done:
                return
            case out <- i:
            }
            out <- n * multiplier
        }
        close(out)
    }()
    return out
}

Pipeline mới của tôi sẽ có dạng như sau:

Go

pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

4. Tổng kết

  • Or Channel cho phép một goroutine lắng nghe nhiều channel cùng một lúc và phản hồi ngay khi có channel nào đó sẵn sàng.
  • Trong Go, các tác vụ chạy đồng thời sử dụng goroutine. Các goroutine chạy độc lập với nhau và với cả process sinh ra chúng, nên khi gặp lỗi, cần có cơ chế giao tiếp giữa các goroutine với nhau xử lý lỗi phát sinh.
  • Pipeline là kỹ thuật thiết kế cho phép quản lý luồng xử lý dữ liệu thông qua chuỗi các giai đoạn (gọi là pipeline). Mỗi giai đoạn (gọi là stage) sẽ nhận vào dữ liệu từ một nguồn, xử lý và gửi dữ liệu đã xử lý vào một nguồn khác.

5. Tài liệu tham khảo

  1. Concurrency in Go – Katherine Cox-Buday
  2. https://go.dev/talks/2012/concurrency.slide
  3. https://go.dev/blog/pipelines
Bài viết cùng chuyên mục
Tối ưu ứng dụng với cấu trúc dữ liệu cơ bản và bitwise
Công Nghệ

Tối ưu ứng dụng với cấu trúc dữ liệu cơ bản và bitwise

Trong bài viết này, 200Lab sẽ chia sẻ những trường hợp dễ...

Công Nghệ

So sánh Flutter vs React Native: Framework nào đáng học năm 2021

Điểm chung của Flutter, React Native đều là Cross-platform Mobile, build native...

HTTP/2 là gì? So sánh HTTP/2 và HTTP/1
Công Nghệ

HTTP/2 là gì? So sánh HTTP/2 và HTTP/1

Từ khi Internet ra đời, sự phát triển về các giao thức...

Upload File từ Frontend đến Backend mà rất nhiều bạn vẫn đang làm sai!!
Công Nghệ

Upload File từ Frontend đến Backend mà rất nhiều bạn vẫn đang làm sai!!

1. Client encode file (base64) rồi gởi về backend 200Lab đã từng...

Công Nghệ

React Native – Hướng dẫn làm việc với Polyline và Animated-Polyline trên Map

Vẽ đường đi trên bản đồ là một nghiệp vụ vô cùng...

Công Nghệ

Hybrid App và Native App: Những khác biệt to lớn

Bất cứ khi nào một công ty quyết định làm ứng dụng...

Web/System Architecture 101 – Kiến trúc web/hệ thống cơ bản cho người mới
Công Nghệ

Web/System Architecture 101 – Kiến trúc web/hệ thống cơ bản cho người mới

Đây là một kiến trúc cơ bản mà bất kì một người...

Công Nghệ

Tư duy kiến trúc thông qua các trò chơi mà rất nhiều bạn không biết

Tư duy kiến trúc là gì? Tư duy kiến trúc có thể...