Asynchroniczne REST Api – jak i po co?

Asynchroniczne REST Api – jak i po co?

W tym artykule wyjaśnię Ci, czym jest asynchroniczne REST API, czemu i kiedy takie tworzyć. Poza tym powiem Ci jak sobie poradzić z synchronicznym REST API, żeby bez dostępu do kodu uzyskać wersję asynchroniczną. Takie czary 🙂

W międzyczasie ubrudzimy sobie trochę ręce. Przykładowe kody umieściłem na GitHubie. No to jedziemy.

Jak działa REST API?

REST Api możemy podzielić na dwie wersje – synchroniczne i asynchroniczne. Jeśli chodzi o synchroniczne, to sprawa jest dość prosta. Wysyłasz żądanie i czekasz na odpowiedź:

public async Task GenerateReport(string jsonData)
{
    var result = await _httpClient.PostAsJsonAsync("orders/report", jsonData);
}

Kod jest prosty i całe flow też. Zobaczmy jak to wygląda w żądaniach HTTP:

  • klient wysyła żądanie do serwera o wygenerowanie jakiegoś raportu, np.:
POST https://example.com/orders/report
{
  "startDate": "2023-01-01",
  "endDate": "2023-01-31"
}
  • serwer odpowiada kodem 400, jeśli przekazane dane są błędne lub 200 OK:
HTTP/1.1 200 OK
Content-Type: application/json
{
 "data": <data>
}

Tutaj oczywiście dane raportu mogą pojawić się w BODY lub też dostaniesz adres do pobrania pliku z raportem – w zależności od API.

Gorzej, gdy operacja na serwerze trwa długo. Kilkadziesiąt sekund lub kilka minut. Wtedy taka robota może zakończyć się kilkoma problemami:

  • możesz otrzymać time-out (brak odpowiedzi z serwera)
  • tak, czy inaczej używasz wątku, który czeka (na odpowiedź). Może to powodować problemy w wydajności aplikacji, szybsze jej skalowanie w górę i wzrost kosztów związany z chwilowym zwiększonym zapotrzebowaniem na zasoby (zwłaszcza jeśli ruch jest duży).
  • podatność na atak DDoS

Asynchroniczne REST Api

Jeśli operacja może trwać nieco dłużej niż kilka sekund, lepiej rozważyć jest zrobienie asynchronicznego API. Możesz je zaprojektować na kilka sposobów. Klient może odpytywać serwer co jakiś czas o status operacji lub klient może przekazać webhooka, na który serwer da znać, gdy operacja się zakończy. Prześledźmy obie możliwości:

Odpytywanie serwera

  • klient wysyła żądanie do serwera, np.:
POST https://example.com/orders/report
{
  "startDate": "2023-01-01",
  "endDate": "2023-01-31"
}
  • serwer odpowiada kodem 202 Accepted, dodając nagłówek Location, który wskazuje na endpoint, którym możesz odpytywać o status operacji
HTTP/1.1 202 Accepted
Location: orders/report/status/<id>
Retry-After: 60
  • serwer rozpoczyna operację (lub częściej – przekazuje ją dalej do wykonania)
  • co jakiś czas (Retry-After) pytasz o stan operacji, wysyłając żądanie na końcówkę otrzymaną w kroku 2
GET https://example.com/orders/report/status/<id>
  • możesz dostać odpowiedź 200 OK, wraz z opisem statusu lub 303 See Other ze wskazaniem miejsca, z którego pobierasz rezultat. Przy czym kod 303 oznacza, że operacja się zakończyła.

Przykładowa odpowiedź na operację, która jest w toku:

HTTP/1.1 200 OK
Content-Type: application/json
Retry-After: 60
{
 "status" : "InProgress"
}

Przykładowa odpowiedź na zakończoną operację:

HTTP/1.1 303 See Other
Location: orders/report/<id>
  • wysyłasz żądanie po rezultat na końcówkę z nagłówka Location
GET https://example.com/orders/report/<id>

W tym momencie żaden wątek klienta nie był zblokowany i nie czekał aż operacja się wykona. Co więcej, jeśli serwer przekazał operację do wykonania dalej, żaden wątek serwera też nie został zblokowany. Po prostu klient zlecił jakieś zadanie i co jakiś czas odpytywał, czy jest już zrobione (jak to bywa w życiu ;)).

Oczywiście serwer może odpowiedzieć na różne sposoby. W pewnym momencie może się coś wywalić i wtedy pytając o status klient powinien otrzymać informację o błędzie.

Jeśli klient przesyła niepoprawne dane w pierwszym żądaniu, serwer powinien odpowiedzieć kodem 400 Bad Request zamiast 202 Accepted – jak w przypadku synchronicznej wersji.

A niech to serwer… odpowie

Czasem nie chcesz, żeby klient pytał co jakiś czas o stan zadania i wychodzisz z założenia: „Panie, będzie to będzie”. Tak też można. Tutaj sprawa jest nieco prostsza.

  • klient wysyła żądanie wraz z adresem, na który serwer ma dać odpowiedź
POST https://example.com/orders/report
{
  "startDate": "2023-01-01",
  "endDate": "2023-01-31",
  "callbackUrl": "https://application.com/callback"
}
  • serwer odpowiada 202 Accepted (lub 400, jeśli dane w żądaniu są nieprawidłowe). Zauważ, że nie podaje tutaj już końcówki do sprawdzania stanu – nagłówka Location. Po prostu – „będzie zrobione, jak się zrobi”
HTTP/1.1 202 Accepted
  • no i jak już się zrobiło, to tym razem SERWER wysyła żądanie do klienta na wcześniej przekazany callback
POST https://application.com/callback
{
  "status" : "Completed",
  "links" : [{
    "rel" : "reports",
    "href" : "orders/reports/<id>"
  }]
}

Na koniec klient powinien zapytać się o konkretny raport, strzelając na podany endpoint. Oczywiście w zależności od API, serwer też może już w callbacku wysłać wynikowe dane.

Jak to wygląda w praktyce

Zazwyczaj, żeby móc korzystać z czyjegoś API, musisz zarejestrować swojego klienta (swoją aplikację, która będzie to API wykorzystywać). Często podczas rejestracji można podać od razu adres callback, na który serwer ma dawać znać o zakończonym zadaniu lub po prostu wysyłać do Ciebie różne komunikaty – to już zależy od konkretnego API.

Jednak często jest też możliwe wysłanie adresu callbacka w żądaniu, jak to było zrobione w tym przykładzie.

API, po skończonym zadaniu, może od razu wysłać Ci rezultat zamiast odpowiedzi o zakończonym statusie (tak jak w powyższym przykładzie).

Piszemy asynchroniczny serwer

Teraz napiszemy sobie przykładowy asynchroniczny serwer. Zauważ kilka rzeczy:

  • to jest przykład – dość prosty, acz użyteczny
  • nie ma tutaj mechanizmu autoryzacji, który powinien być w prawdziwym rozwiązaniu
  • nie ma tutaj wykonywania prawdziwej operacji, w rzeczywistym przypadku to może być robione na różne sposoby
  • nie ma tutaj żadnej abstrakcji, piszemy najprościej jak się da, jednak staram się stosować zasady czystego kodu

UWAGA! Ta wersja kodu jest wersją prostą. Bez użycia Azure (albo innej chmury). Weź pod uwagę, że to nie jest do końca asynchroniczne rozwiązanie, jednak jeśli nie znasz Azure, to ta wersja dużo bardziej ułatwi Ci zrozumienie o co w tym chodzi. Wersja wykorzystująca Azure jest opisana niżej.

Na początek stwórzmy sobie standardowe WebApi z kontrolerem do pogody, który za chwilę zmienimy. Do tego stwórzmy serwis WeatherForecastService, który na początek będzie pusty:

public class WeatherForecastService
{
    public Task GetForecast(Guid requestId, DateOnly date)
    {
        return Task.CompletedTask;
    }
}

Serwis zarejestrujemy jako Scoped:

builder.Services.AddScoped<WeatherForecastService>();

I wstrzykniemy go do naszego lekko zmienionego kontrolera:

[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{
    private WeatherForecastService _service;

    public WeatherForecastController(WeatherForecastService service)
    {
        _service = service;
    }

    [HttpGet(Name = "GetWeatherForecast/{date}")]
    public async Task<IActionResult> Get(DateOnly date)
    {
        Guid requestId = Guid.NewGuid();
        _ = _service.GetForecast(requestId, date);

        return await Task.FromResult(Accepted($"WeatherForecast/Status/{requestId}"));
    }
}

Do tej pory wszystko powinno być jasne. Użytkownik pyta o pogodę na dany dzień. Kontroler wywołuje metodę w serwisie. Dodatkowo kontroler tworzy ID dla tego żądania, które zwraca użytkownikowi. Tutaj ważne jest, że nie czekamy na to aż wykona się Task GetForecast. Nie ma tutaj wywołania z await. To znaczy, że kontroler może zakończyć swoją pracę od razu, a Task będzie działał w tle.

Na koniec kontroler zwraca odpowiednią odpowiedź – 202 Accepted. Zwróć uwagę, że kontroler nie czeka na wykonanie operacji przez serwis. Co to oznacza? W tym momencie nigdy nie dowiemy się, czy operacja się wykonała i jaki jest jej wynik. Dlatego też ten wynik trzeba gdzieś zapisać…

Baza danych

Tak, takie operacje zapisuje się w bazie danych. Stwórzmy więc prosty model bazodanowy i banalną bazę danych – opakowany słownik (weź pod uwagę, że w rzeczywistości powinna być to prawilna baza, ten kod poniżej jest tylko ze względu na prostotę):

public enum OperationStatus
{
    NotStarted,
    InProgress,
    Finished
}
public class WeatherDatabaseItem
{
    public Guid RequestId { get; set; }
    public OperationStatus Status { get; set; }
    public WeatherForecast Data { get; set; }
}

A teraz nasza baza danych:

public class Database
{
    private Dictionary<Guid, WeatherDatabaseItem> _weatherForecasts = new();

    public void UpsertForecast(Guid id, OperationStatus status, WeatherForecast forecast)
    {
        WeatherDatabaseItem item = GetOrCreate(id);
        item.Status = status;
        item.Data = forecast;

        _weatherForecasts[id] = item;
    }

    private WeatherDatabaseItem GetOrCreate(Guid id)
    {
        WeatherDatabaseItem result = null;

        if (!_weatherForecasts.TryGetValue(id, out result))
            return new WeatherDatabaseItem { RequestId = id };

        return result;
    }
}

Żeby taka „baza” miał sens, musimy klasę Database zarejestrować jako singleton:

builder.Services.AddSingleton<Database>();

Serwis

OK, mając bazę danych i kontroler, możemy teraz zrobić jakąś prawdziwą robotę – sprawdzić/wyliczyć prognozę pogody w serwisie. Wstrzyknijmy mu bazę danych i dajmy nieco ciałka:

public class WeatherForecastService
{
    private readonly Database _database;

    public WeatherForecastService(Database database)
    {
        _database = database;
    }

    public async Task GetForecast(Guid requestId, DateOnly date)
    {
        _database.UpsertForecast(requestId, OperationStatus.InProgress, null);

        await Task.Delay(30000); //symulacja długiej operacji

        var result = new WeatherForecast
        {
            Date = date,
            Summary = "Sunny",
            TemperatureC = 25
        };

        _database.UpsertForecast(requestId, OperationStatus.Finished, result);
    }
}

Zwróć uwagę na dwie rzeczy. Po pierwsze na początku (linia 12) dodajemy pusty rekord do bazy z uzyskanym ID i statusem ustawionym na InProgress. To znaczy, że obliczenia są w trakcie.

Następnie mamy jakiś Delay, który ma tylko symulować długotrwałą operację.

Na koniec dodajemy do bazy gotową prognozę pogody. W międzyczasie klienci mogą pytać się o status operacji i dostaną odpowiedź InProgress.

Zatem musimy jeszcze zrobić dwa endpointy w kontrolerze:

  • sprawdzanie statusu operacji
  • pobieranie wyników

Sprawdzanie statusu operacji

Najpierw w bazie danych dodajmy metodę do pobierania odpowiedniego rekordu:

public class Database
{
    private Dictionary<Guid, DatabaseItem> _weatherForecasts = new();

    public void UpsertForecast(Guid id, OperationStatus status, WeatherForecast forecast)
    {
        DatabaseItem item = GetOrCreate(id);
        item.Status = status;
        item.Data = forecast;

        _weatherForecasts[id] = item;
    }

    public DatabaseItem GetById(Guid id)
    {
        DatabaseItem result = null;
        _weatherForecasts.TryGetValue(id, out result);
        return result;
    }

    private DatabaseItem GetOrCreate(Guid id)
    {
        DatabaseItem result = null;

        if (!_weatherForecasts.TryGetValue(id, out result))
            return new DatabaseItem { RequestId = id };

        return result;
    }
}

Teraz w serwisie powinna znaleźć się metoda do pobierania statusu:

public class WeatherForecastService
{
    private readonly Database _database;

    public WeatherForecastService(Database database)
    {
        _database = database;
    }

    public async Task GetForecast(Guid requestId, DateOnly date)
    {
        _database.UpsertForecast(requestId, OperationStatus.InProgress, null);

        await Task.Delay(30000);

        var result = new WeatherForecast
        {
            Date = date,
            Summary = "Sunny",
            TemperatureC = 25
        };

        _database.UpsertForecast(requestId, OperationStatus.Finished, result);
        return Task.CompletedTask;
    }

    public async Task<OperationStatus> GetRequestStatus(Guid requestId)
    {
        var status = await Task.Run(() =>
        {
            var item = _database.GetById(requestId);
            if (item == null)
                return OperationStatus.NotStarted;
            else
                return item.Status;
        });

        return status;
    }
}

Możesz się czepić o to, że metoda GetRequestStatus jest oznaczona jako asynchroniczna, bo w tym przypadku ta asynchroniczność niczego nie daje. Zrobiłem tak tylko po to, żeby utrzymać konwencję pobierania danych z bazy danych jako operację asynchroniczną.

No i na koniec endpoint w kontrolerze:

[HttpGet("Status/{requestId}")]
public async Task<IActionResult> GetStatus(Guid requestId)
{
    var status = await _service.GetRequestStatus(requestId);
    if(status == Models.OperationStatus.Finished)
    {
        Response.Headers.Add("Location", $"/WeatherForecast/{requestId}");
        return StatusCode(StatusCodes.Status303SeeOther);
    } else
    {
        var data = new
        {
            status = status
        };
        return Ok(data);
    }
}

Najpierw pobieramy status z serwisu i sprawdzamy go. Jeśli operacja jest zakończona, to odsyłamy klientowi odpowiedź 303 See Other wraz z linkiem do pobrania danych.

Jeśli jednak operacja jest w trakcie, to odsyłamy klientowi rezultat Ok z odpowiednimi danymi.

UWAGA!

Standardowi klienci (przeglądarki internetowe, Postman, a nawet klasa HttpClient) mają wbudowany mechanizm „follow redirects„, co oznacza że po odebraniu odpowiedzi 301, 302 lub 303 automatycznie przeniosą Cię na adres, który będzie w nagłówku Location.

Z jednej strony to fajnie, bo masz załatwioną część pracy. Z drugiej strony jeśli chcesz to przetestować krok po kroku np. Postmanem, to musisz mu wyłączyć opcję „Follow redirect” albo globalnie, albo na poziomie konkretnego requestu:

Niestety nie da się tego wyłączyć w Swaggerze.

Minusy takiego serwera

Oczywiście takie rozwiązanie (opisany serwer) ma swoje minusy, które w zależności od sytuacji można albo zaakceptować, albo nie.

Przede wszystkim tutaj serwer, mimo wszystko, odpowiada za przeprowadzenie długotrwałej operacji. Fakt, żadne wątki nie są blokowane, komunikacja z klientami jest szybka, natomiast jeśli uruchomimy kilka takich zadań, bo dostaniemy żądania od kilku klientów, wtedy nie skorzystamy za bardzo z asynchroniczności. Może się okazać, że aplikacja szybko będzie potrzebowała nowych zasobów i albo je dostanie (wzrost opłat za hosting), albo przestanie odpowiadać i się zblokuje. Daje to też możliwą podatność na atak DDoS.

Oczywiście, jeśli z apki korzysta kilkadziesiąt klientów raz na jakiś czas, to raczej nie ma to znaczenia. Ale już przy kilkuset czy kilku tysiącach, to jest nie do pomyślenia.

Jak zatem sobie z tym poradzić w rzeczywistości?

Odpowiedzią jest chmura.

Serwer asynchroniczny z użyciem chmurki

Pokażę Ci to na przykładzie Microsoft Azure. Jeśli nie wiesz, czym jest Azure, to ten akapit i następne nie dadzą Ci za wiele i na tym możesz skończyć czytanie. Jeśli jednak coś tam wiesz albo jesteś po prostu ciekawy, to czytaj dalej 🙂 Najpierw, z poczucia obowiązku, opiszę Ci bardzo ogólnie trzy usługi, z których będziemy korzystać.

Zakładam że posiadasz subskrypcję Azure’ową i wiesz jak rejestrować podstawowe usługi.

Poniższy przykład nie ma nic wspólnego z bezpieczeństwem. Ze względu na prostotę, wszystkie klucze i hasła będą przekazywane aplikacji w jawny sposób. Nie używamy tutaj KeyVaulta, żeby nie zaciemniać obrazu.

Usługi

Zauważ, że metod na rozwiązanie tego problemu jest zapewne kilka. Ja przedstawię Ci tylko jedną z nich. Oto usługi, z jakich będziemy korzystać:

  • Storage Queue
  • Azure Functions
  • CosmosDb

Oględny opis usług

Storage Queue

StorageQueue to usługa, która daje Ci kolejkę. Możesz kłaść do niej wiadomości, odczytywać je, a także zdejmować je z kolejki.

Azure Functions

Są to funkcje, które mogą być „serverless„, tzn. nie potrzebujesz do ich utrzymywania żadnego serwera. Ma to swoje plusy i minusy. Plusem zdecydowanie są (bardzo) małe koszty.

Potraktuj AzureFunction jak zwykłą funkcję lub metodę. Możesz ją napisać na kilka sposobów i w różnych językach. My się skupimy tutaj na kompilowanej wersji C#.

Dodatkowo funkcje Azurowe mają tzw. triggery – czyli coś, co je uruchamia. Jest wiele wbudowanych triggerów i one właściwie wystarczają. Jednym z nich jest np. wywołanie HTTP, innym – którego będziemy używać – dodanie nowej wiadomości do kolejki Storage Queue.

CosmosDB

CosmosDb to baza danych typu NoSQL. Na Azure znajduje się jej wersja „serverless”, dzięki czemu w prostych zastosowaniach koszty takiej bazy są naprawdę mikroskopijne.

W tej bazie będziemy trzymać dane dotyczące naszych prognoz.

Jeśli nie chcesz tworzyć usług ręcznie, w przykładowym kodzie są pliki BICEP, które utworzą infrastrukturę (o tym jak to zrobić piszę niżej).

Jak utworzyć usługi automatem?

W przykładowych kodach znajdują się pliki BICEP z opisaną strukturą usług. Teraz musisz się upewnić, że masz zainstalowane narzędzie az bicep:

az bicep version

lub je zainstalować:

az bicep install

Następnie za pomocą az musisz zalogować się do swojej subskrypcji na Azure i stworzyć grupę zasobów (dajmy na to: rg-rest-api).

Teraz, mając odpowiednią grupę, możesz uruchomić tworzenie usług. Przejdź w konsoli do katalogu deployment w przykładowych kodach, a następnie:

az deployment group create --resource-group "rg-rest-api" --template-file .\main.bicep

Po chwili wszystkie wymagane usługi będą utworzone w twojej resource grupie.

Tworzymy bazę danych

Na początek utwórzmy bazę danych CosmosDb (For NoSQL).

Tak naprawdę potrzebujemy tylko jednego kontenera. Nazwijmy go operations, a partitionKey ustawmy na requestId. W tym kontenerze będziemy trzymać wszystkie informacje.

Tworzymy kolejkę

Teraz stwórzmy kolejkę (Blob Storage Queue). Nowe wiadomości wpadające do tej kolejki będą odpalać Azurową funkcję, która będzie robiła całą robotę. W tym celu musimy utworzyć StorageAccount.

Jak widzisz, ja utworzyłem storage account o nazwie masterbranchweatherst, a w środku kolejkę o nazwie weather-requests-queue.

Super, została teraz już tylko logika do zrobienia. A to wymaga utworzenia funkcji Azurowej.

Tworzymy funkcję Azurową

Funkcja będzie uruchamiana wcześniej utworzoną kolejką. To znaczy, że jeśli w kolejce znajdzie się jakaś wiadomość, to to zdarzenie uruchomi funkcję i przekaże do niej konkretną wiadomość.

Oczywiście, jeśli w kolejce będzie 100 wiadomości, to jest szansa, że uruchomi się 100 funkcji równolegle. To jednak zależy od kilku czynników, którymi nie będziemy się zajmować w tym artykule. To co jest ważne, to to, że jeśli funkcje Azurowe nie będą w stanie obrobić wszystkich wiadomości od razu, te wiadomości będą po prostu czekać na swoją… kolej. Jak to w kolejce 🙂 Dzięki temu system wciąż będzie wydolny i nie zobaczymy żadnego przeciążenia. Po prostu niektóre wyniki będą nieco później dostępne.

Taka funkcja w najprostszej postaci może wyglądać jak w przykładzie:

public class AnalyzeWeather
{
    private readonly ILogger<AnalyzeWeather> _logger;
    private readonly WeatherRepository _weatherRepository;
    private readonly Randomizer _randomizer;

    public AnalyzeWeather(ILogger<AnalyzeWeather> logger, 
        WeatherRepository weatherRepository, 
        Randomizer randomizer)
    {
        _logger = logger;
        _weatherRepository = weatherRepository;
        _randomizer = randomizer;
    }

    [Function(nameof(AnalyzeWeather))]
    public async Task Run([QueueTrigger("weather-requests-queue", Connection = "QueueConnectionString")] QueueMessage message)
    {
        _logger.LogInformation("Weather analyzing started");

        WeatherQueueItem msgItem = message.Body.ToObjectFromJson<WeatherQueueItem>();

        WeatherDatabaseItem dbItem = new WeatherDatabaseItem();
        dbItem.Status = OperationStatus.InProgress;
        dbItem.RequestId = msgItem.Data.RequestId;
        dbItem.Data = msgItem.Data.Data;

        await _weatherRepository.UpsertWeatherOperation(dbItem);

        //symulacja
        await Task.Delay(30000);

        dbItem.Data.Summary = "Warm";
        dbItem.Data.TemperatureC = _randomizer.GetInt(20, 29);
        dbItem.Status = OperationStatus.Finished;

        await _weatherRepository.UpsertWeatherOperation(dbItem);            
    }
}

Nie omawiam tutaj, jak działają funkcje Azurowe i czym są, zakładam że wiesz to. Ale jeśli chciałbyś przeczytać o tym artykuł daj znać w komentarzu.

Tutaj sprawa jest prosta. Do funkcji trafia wiadomość z kolejki (parametr message). Wiadomość ma właściwość Body, w której znajdzie się json, którego wcześniej wysyłamy (o tym za chwilę).

To, co robimy w linii 22, to deserializujemy tego JSONa do konkretnego obiektu. Następnie (w linii 29) zapisujemy stan naszej operacji w bazie danych – zauważ, że ze statusem InProgress.

Potem symulujemy jakąś długą analizę, na koniec aktualizujemy nasz rekord w bazie danych częściowo losowymi danymi (to tak dla picu, żeby się działo :))

Wszystkie kody zobaczysz w przykładzie, nie jest celem tego artykułu opisywanie ich, bo byłby straaasznie długi. A to zwykła obsługa funkcji azurowej i CosmosDb.

OK, skoro już mamy funkcję azurową uruchamianą przez kolejkę, to teraz trzeba coś do tej kolejki dodać. I tu jest właśnie przeniesienie pracy i rozdzielenie naszej aplikacji na mniejsze części.

Dodajemy wiadomość do kolejki.

Tutaj sprawa jest prosta. To ma działać tak:

  • klient wysyła żądanie z pytaniem o prognozę pogody
  • jego żądanie jest wrzucane do kolejki
  • zwracamy mu odpowiedź 202 Accepted wraz z linkiem do pobierania informacji o statusie jego żądania

Najpierw pokażę Ci klasę, którą napisałem do wrzucenia żądania do kolejki:

public class StorageQueueService
{
    private readonly QueueOptions _queueOptions;

    public StorageQueueService(IOptions<QueueOptions> queueOptions)
    {
        _queueOptions = queueOptions.Value;
    }
    public async Task SendWeatherRequest(WeatherDatabaseItem item)
    {
        QueueClientOptions clientOptions = new QueueClientOptions
        {
            MessageEncoding = QueueMessageEncoding.Base64
        };

        var client = new QueueClient(_queueOptions.ConnectionString, 
            _queueOptions.WeatherQueueName, clientOptions);
        
        WeatherQueueItem queueItem = new WeatherQueueItem
        {
            Data = item
        };

        var serializedData = JsonSerializer.Serialize(queueItem);
        await client.SendMessageAsync(serializedData);
    }
}

Klasa jest dość prymitywna. Tworzymy ją z opcjami QueueOptions – to jest zwykła klasa trzymająca opcje, które pozwalają na połączenie się z kolejką:

public class QueueOptions
{
    public string ConnectionString {  get; set; }
    public string WeatherQueueName { get; set; }
}

Czyli mamy tutaj connection string do kolejki (w dokładniej do AzureBlobStorage), a także nazwę kolejki, do której chcemy wrzucać te żądania. Więcej o opcjach w .NET pisałem w tym artykule.

Następnie mamy tylko jedną metodę: SendWeatherRequest, która jest odpowiedzialna właśnie za wrzucenie konkretnego żądania na kolejkę. Wrzucamy to w postaci JSON, później otrzymamy to jako wiadomość w naszej funkcji Azure.

WeatherQueueItem to zwykła klasa, którą wykorzystuję dla danych wysyłanych do kolejki. Równie dobrze mógłbym posłużyć się WeatherDatabaseItem, jednak uznałem że tak będzie bardziej prawilnie. Ostatecznie ta wiadomość w kolejce może mieć jakieś dodatkowe dane. Natomiast, jak widzisz, WeatherDatabaseItem jest składnikiem WeatherQueueItem:

public class WeatherQueueItem
{
    public WeatherDatabaseItem Data {  get; set; }
}

StorageQueueService rejestrujemy jako Scoped (przy okazji konfigurujące opcje):

builder.Services.AddScoped<StorageQueueService>();
builder.Services.Configure<QueueOptions>(builder.Configuration.GetSection("StorageQueueOptions"));

Aktualizujemy WeatherForecastService

Teraz tylko zaktualizujemy sobie WeatherForecastService, bo kontroler będzie korzystał z niego:

public class WeatherForecastService
{
    private readonly Database _database;
    private readonly StorageQueueService _queueService;

    public WeatherForecastService(Database database, 
        StorageQueueService queueService)
    {
        _database = database;
        _queueService = queueService;
    }

    public async Task SetForecastRequestToQueue(Guid requestId, DateOnly date)
    {
        WeatherForecast forecast = new WeatherForecast
        {
            Date = date
        };

        WeatherDatabaseItem dbItem = new WeatherDatabaseItem
        {
            RequestId = requestId,
            Data = forecast,
            Status = OperationStatus.NotStarted,
        };

        await _queueService.SendWeatherRequest(dbItem);
    }
    //reszta bez zmian
}

No i na koniec dodajmy nowy endpoint do kontrolera:

[HttpGet(Name = "GetWeatherForecast/{date}")]
public async Task<IActionResult> Get(DateOnly date)
{
    Guid requestId = Guid.NewGuid();
    _ = _service.GetForecast(requestId, date);

    return await Task.FromResult(Accepted($"/WeatherForecast/Status/{requestId}"));
}

[HttpPost("GetAsyncWeatherForecast/{date}")]
public async Task<IActionResult> GetByQueue(DateOnly date)
{
    Guid requestId = Guid.NewGuid();
    await _service.SetForecastRequestToQueue(requestId, date);

    return Accepted($"/AsyncWeatherForecast/Status/{requestId}");
}

Podsumowanie

Ok, mamy prawie działający serwer asynchroniczny. Prawie, ponieważ nie ma tutaj końcówki do pobierania stanu ani wyniku. To będzie po prostu pobranie danych z CosmosDb, a działanie analogiczne do wersji serwera bez chmury.

Teraz, gdy wywołasz końcówkę GetByQueue, do kolejki zostanie dodana odpowiednia wiadomość. Dodanie tej wiadomości uruchomi funkcję Azurową. Funkcja odczyta tę wiadomość i zrobi odpowiednie wpisy w CosmosDb. To tyle.

Tworzenie asynchroniczności przy API synchronicznym

Może się zdarzyć taka sytuacja, że używasz API, które jest synchroniczne, jednak poszczególne żądania działają zbyt długo jak na Twoje wymagania.

W takiej sytuacji również możesz posłużyć się mechanizmem jak wyżej. I nie ma żadnego znaczenia, że nie masz dostępu do kodów API.

Musisz po prostu stworzyć jakąś kolejkę, funkcję Azure’ową i jakąś bazę danych. Teraz zamiast wysyłać żądanie do docelowego API, po prostu umieścisz odpowiedni komunikat w kolejce. Kolejka uruchomi funkcję Azurową, która strzeli do docelowego API. Po tym jak praca się skończy, funkcja Azure’owa może albo wysłać Ci powiadomienie (callback), albo po prostu zmienić dane w bazie, żebyś wiedział, że zadanie zostało zakończone.

Diagram takiej pracy może wyglądać w taki sposób (worker to docelowe API):


To tyle jeśli chodzi o asynchroniczne API. Nie przeczę, że temat może być zawiły zwłaszcza dla osób nie znających chmury lub juniorów. Jeśli czegoś nie zrozumiałeś lub znalazłeś w artykule błąd, koniecznie daj znać w komentarzu.

Podziel się artykułem na: