taskiq-sqs

This library provides an SQS broker and an S3 result backend for TaskIQ.

Installation

pip install taskiq-sqs

Basic usage

Here is an example of how to use the SQS broker with the S3 backend:

import asyncio
from taskiq_sqs import S3Bucket, S3ResultBackend, SQSBroker

QUEUE_NAME = "my-queue"
broker = SQSBroker(
    f"http://localhost:4566/000000000000/{QUEUE_NAME}",  # URL of an existing queue
    sqs_region_override="us-east-1",
).with_result_backend(
    S3ResultBackend(
        # By default, the backend creates the bucket if it does not exist.
        bucket=S3Bucket(name="response-bucket"),
    )
)

@broker.task()
async def i_love_aws() -> None:
    await asyncio.sleep(1)
    print("Hello there!")

async def main() -> None:
    await broker.startup()
    task = await i_love_aws.kiq()
    print(await task.wait_result())
    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

How to run:

  • Start the worker first: taskiq worker examples.example_broker:broker
  • Then run the script to send a task and wait for its result: python examples/example_broker.py

Message expiration

If you set the sqs_expiry label to a Unix timestamp, the worker discards the message if it receives it after that time.

import asyncio
from taskiq_sqs import SQSBroker

broker = SQSBroker("http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/my-queue")

@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    # Never forget to call startup in the beginning.
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result. This requires a result backend; none is configured
    # on this broker, so attach one with with_result_backend() as in Basic usage.
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
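
The example above sends the task without setting the sqs_expiry label. As a minimal sketch of the rule described in this section, the helpers below compute a label value and apply the "received after that time" check. The names expiry_in and is_expired are hypothetical and are not part of taskiq-sqs; the check is an illustration of the documented behaviour, not the library's actual code.

```python
import time
from typing import Any, Mapping, Optional


def expiry_in(seconds: float, now: Optional[float] = None) -> int:
    """Return a Unix timestamp `seconds` from now, suitable as an sqs_expiry value."""
    base = time.time() if now is None else now
    return int(base + seconds)


def is_expired(labels: Mapping[str, Any], now: Optional[float] = None) -> bool:
    """Hypothetical worker-side check: True if the message arrived after its expiry."""
    raw = labels.get("sqs_expiry")
    if raw is None:
        # No label set: the message never expires.
        return False
    current = time.time() if now is None else now
    # Accept both numeric and string label values (an assumption about the format).
    return current > float(raw)


if __name__ == "__main__":
    labels = {"sqs_expiry": expiry_in(60)}
    print(is_expired(labels))  # the label is 60 seconds in the future
```

With taskiq's kicker interface, such a value could presumably be attached when sending, for example await add_one.kicker().with_labels(sqs_expiry=expiry_in(60)).kiq(1). Whether the broker expects the label as an integer or a string is an assumption worth checking against the library.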
